Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/opal-client/opal_client/data/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from opal_common.fetcher import FetchingEngine
from opal_common.fetcher.events import FetcherConfig
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig
from opal_common.http_utils import redact_url
from opal_common.logger import logger
from opal_common.utils import get_authorization_header, tuple_to_dict

Expand Down Expand Up @@ -62,20 +63,20 @@ async def handle_url(
) -> Optional[JsonableValue]:
"""Helper function wrapping self._engine.handle_url."""
if data is not None:
logger.info("Data provided inline for url: {url}", url=url)
logger.info("Data provided inline for url: {url}", url=redact_url(url))
return data

if url is None:
logger.error("Invalid data update: no embedded data or URL")
return None

logger.info("Fetching data from url: {url}", url=url)
logger.info("Fetching data from url: {url}", url=redact_url(url))
try:
# ask the engine to get our data
response = await self._engine.handle_url(url, config=config)
return response
except asyncio.TimeoutError as e:
logger.exception("Timeout while fetching url: {url}", url=url)
logger.exception("Timeout while fetching url: {url}", url=redact_url(url))
raise

async def handle_urls(
Expand Down
25 changes: 15 additions & 10 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from opal_common.async_utils import TasksPool, repeated_call
from opal_common.config import opal_common_config
from opal_common.http_utils import is_http_error_response
from opal_common.http_utils import is_http_error_response, redact_url
from opal_common.schemas.data import (
DataEntryReport,
DataSourceConfig,
Expand Down Expand Up @@ -475,14 +475,17 @@ async def _update_policy_data(self, update: DataUpdate) -> None:

for entry in update.entries:
if not entry.topics:
logger.debug("Data entry {entry} has no topics, skipping", entry=entry)
logger.debug(
"Data entry for url {url} has no topics, skipping",
url=redact_url(entry.url),
)
continue

# Only process entries that match one of our subscribed data topics
if set(entry.topics).isdisjoint(set(self._data_topics)):
logger.debug(
"Data entry {entry} has no topics matching the data topics, skipping",
entry=entry,
"Data entry for url {url} has no topics matching the data topics, skipping",
url=redact_url(entry.url),
)
continue

Expand Down Expand Up @@ -589,27 +592,29 @@ async def _fetch_data(self, entry: DataSourceEntry) -> JsonableValue:
)
except Exception as e:
logger.exception(
"Failed to fetch data for entry {entry} with exception {exc}",
entry=entry,
"Failed to fetch data for entry url {url} with exception {exc}",
url=redact_url(entry.url),
exc=e,
)
Comment thread
Zivxx marked this conversation as resolved.
raise Exception(f"Failed to fetch data for entry {entry.url}: {e}")
raise Exception(
f"Failed to fetch data for entry {redact_url(entry.url)}: {e}"
)

if result is None:
raise Exception(f"Fetched data is empty for entry {entry.url}")
raise Exception(f"Fetched data is empty for entry {redact_url(entry.url)}")

if isinstance(result, aiohttp.ClientResponse) and is_http_error_response(
result
):
error_content = await result.text()
logger.error(
"Failed to decode response from url: '{url}', got response code {status} with response: {error}",
url=entry.url,
url=redact_url(entry.url),
status=result.status,
error=error_content,
)
raise Exception(
f"Failed to decode response from url: '{entry.url}', got response code {result.status} with response: {error_content}"
f"Failed to decode response from url: '{redact_url(entry.url)}', got response code {result.status} with response: {error_content}"
)

return result
Expand Down
8 changes: 7 additions & 1 deletion packages/opal-common/opal_common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ class OpalCommonConfig(Confi):
"LOG_TRACEBACK", True, description="Include traceback in log messages"
)
LOG_DIAGNOSE = confi.bool(
"LOG_DIAGNOSE", True, description="Include diagnosis in log messages"
"LOG_DIAGNOSE",
False,
description="Include diagnosis (local variable values) in tracebacks. "
"Off by default because loguru renders raw variable values, which can "
"leak credentials (e.g. auth headers/tokens) into logs - only enable for "
"local debugging. Also gates verbose git protocol tracing "
"(GIT_TRACE/GIT_CURL_VERBOSE), which can leak auth headers.",
)
LOG_COLORIZE = confi.bool("LOG_COLORIZE", True, description="Colorize log messages")
LOG_SERIALIZE = confi.bool(
Expand Down
16 changes: 12 additions & 4 deletions packages/opal-common/opal_common/fetcher/events.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
from typing import List, Optional
from typing import ClassVar, List, Optional, Set

from opal_common.logging_utils.redaction import RedactedReprMixin
from pydantic import BaseModel, Field


class FetcherConfig(BaseModel):
class FetcherConfig(RedactedReprMixin, BaseModel):
"""The configuration of a fetcher, used as part of a FetchEvent Fetch
Provider's have their own uniqueue events and configurations.
Provider's have their own unique events and configurations.

Configurations

Note: subclasses commonly carry credentials (e.g. auth headers). They must
list any secret-bearing fields in ``_redacted_repr_fields`` so they never
leak into logs - see ``RedactedReprMixin``.
"""

fetcher: Optional[str] = Field(
Expand All @@ -16,12 +21,15 @@ class FetcherConfig(BaseModel):
)


class FetchEvent(BaseModel):
class FetchEvent(RedactedReprMixin, BaseModel):
"""Event used to describe an queue fetching tasks Design note -

By using a Pydantic model - we can create a potentially transfer FetchEvents to be handled by other network nodes (perhaps via RPC)
"""

# ``config`` may carry a FetcherConfig with credentials - mask it in repr/str.
_redacted_repr_fields: ClassVar[Set[str]] = {"config"}
Comment thread
Zivxx marked this conversation as resolved.

# Event id to be filled by the engine
id: str = None
# optional name of the specific event
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Simple HTTP get data fetcher using requests supports."""

from typing import ClassVar, Set

from fastapi_websocket_rpc.rpc_methods import RpcMethodsBase
from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient
from opal_common.fetcher.events import FetcherConfig, FetchEvent
Expand All @@ -12,6 +14,9 @@
class FastApiRpcFetchConfig(FetcherConfig):
"""Config for FastApiRpcFetchConfig's Adding HTTP headers."""

# ``rpc_arguments`` may carry credentials - mask it in repr/str.
_redacted_repr_fields: ClassVar[Set[str]] = {"rpc_arguments"}

rpc_method_name: str
rpc_arguments: dict

Expand All @@ -38,8 +43,9 @@ async def _fetch_(self):
args = self._event.config.rpc_arguments
method = self._event.config.rpc_method_name
result = None
# Note: ``args`` (rpc_arguments) may carry credentials - never log it.
logger.info(
f"{self.__class__.__name__} fetching from {self._url} with RPC call {method}({args})"
f"{self.__class__.__name__} fetching from {self._url} with RPC call {method}"
)
Comment thread
Zivxx marked this conversation as resolved.
async with WebSocketRpcClient(
self._url,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
"""Simple HTTP get data fetcher using requests supports."""

from enum import Enum
from typing import Any, Union, cast
from typing import Any, ClassVar, Set, Union, cast

import httpx
from aiohttp import ClientResponse, ClientSession, ClientTimeout
from opal_common.config import opal_common_config
from opal_common.fetcher.events import FetcherConfig, FetchEvent
from opal_common.fetcher.fetch_provider import BaseFetchProvider
from opal_common.fetcher.logger import get_logger
from opal_common.http_utils import is_http_error_response
from opal_common.http_utils import is_http_error_response, redact_url
from opal_common.security.sslcontext import get_custom_ssl_context
from pydantic import validator

Expand All @@ -28,6 +28,10 @@ class HttpMethods(Enum):
class HttpFetcherConfig(FetcherConfig):
"""Config for HttpFetchProvider's Adding HTTP headers."""

# ``headers`` carries Authorization tokens and ``data`` the (possibly
# sensitive) payload - mask both in repr/str so they never leak into logs.
_redacted_repr_fields: ClassVar[Set[str]] = {"headers", "data"}

headers: dict = None
is_json: bool = True
process_data: bool = True
Expand Down Expand Up @@ -91,7 +95,7 @@ async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
await self._session.__aexit__(exc_type, exc_val, tb)

async def _fetch_(self):
logger.debug(f"{self.__class__.__name__} fetching from {self._url}")
logger.debug(f"{self.__class__.__name__} fetching from {redact_url(self._url)}")
http_method = self.match_http_method_from_type(
self._session, self._event.config.method
)
Expand Down
11 changes: 8 additions & 3 deletions packages/opal-common/opal_common/git_utils/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ def provide_git_ssh_environment(url: str, ssh_key: str):
if not is_ssh_repo_url(url) or ssh_key is None:
return {} # no ssh config
git_ssh_identity_file = save_ssh_key_to_pem_file(ssh_key)
return {
env = {
"GIT_SSH_COMMAND": f"ssh -o StrictHostKeyChecking=no -o IdentitiesOnly=yes -i {git_ssh_identity_file}",
"GIT_TRACE": "1",
"GIT_CURL_VERBOSE": "1",
}
# GIT_TRACE / GIT_CURL_VERBOSE make git dump verbose protocol output to
Comment thread
Zivxx marked this conversation as resolved.
Outdated
# stderr - including HTTP Authorization headers - which OPAL captures into
# its logs. Only enable them when diagnosis logging is explicitly turned on.
if opal_common_config.LOG_DIAGNOSE:
env["GIT_TRACE"] = "1"
env["GIT_CURL_VERBOSE"] = "1"
return env
13 changes: 10 additions & 3 deletions packages/opal-common/opal_common/git_utils/repo_cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from opal_common.config import opal_common_config
from opal_common.git_utils.env import provide_git_ssh_environment
from opal_common.git_utils.exceptions import GitFailed
from opal_common.http_utils import redact_url, redact_url_in_text
from opal_common.logger import logger
from opal_common.utils import get_filepaths_with_glob
from tenacity import RetryError, retry, stop, wait
Expand Down Expand Up @@ -179,7 +180,7 @@ async def clone(self) -> CloneResult:
"""
logger.info(
"Cloning repo from '{url}' to '{to_path}' (branch: '{branch}')",
url=self.url,
url=redact_url(self.url),
to_path=self.path,
branch=self.branch_name,
)
Expand All @@ -196,7 +197,10 @@ def _attempt_clone_from_url(self) -> CloneResult:
except (GitError, GitCommandError) as e:
raise GitFailed(e)
except RetryError as e:
logger.exception("cannot clone policy repo: {error}", error=e)
logger.exception(
"cannot clone policy repo: {error}",
error=redact_url_in_text(str(e), self.url),
)
raise GitFailed(e)
else:
logger.info("Clone succeeded", repo_path=self.path)
Expand All @@ -208,5 +212,8 @@ def _clone(self, env) -> Repo:
url=self.url, to_path=self.path, branch=self.branch_name, env=env
)
except (GitError, GitCommandError) as e:
logger.error("cannot clone policy repo: {error}", error=e)
logger.error(
"cannot clone policy repo: {error}",
error=redact_url_in_text(str(e), self.url),
)
raise
83 changes: 83 additions & 0 deletions packages/opal-common/opal_common/http_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,91 @@
import re
from typing import Union
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

import aiohttp
import httpx

#: Query-string parameter names whose values may carry credentials.
SENSITIVE_QUERY_PARAMS = frozenset(
{
"token",
"access_token",
"api_key",
"apikey",
"key",
"password",
"secret",
"sig",
"signature",
}
)

#: Matches ``scheme://userinfo@`` anywhere in free text.
_USERINFO_RE = re.compile(r"(?P<scheme>[a-zA-Z][a-zA-Z0-9+.\-]*://)[^/@\s]+@")


def redact_url(url: str) -> str:
"""Strip embedded credentials from a URL so it is safe to log.

Data source / policy repo URLs may be of the form
``https://user:token@host/path`` or carry a credential in a query parameter
(e.g. ``?token=...``). We replace any ``user:password@`` userinfo with
``***@`` and mask the values of known sensitive query parameters, while
keeping the host, port, path and non-sensitive params intact for debugging.
Returns the input byte-for-byte unchanged if it is empty, cannot be parsed,
or carries nothing sensitive.
"""
if not url:
return url
try:
parts = urlsplit(url)
except ValueError:
return url

changed = False
netloc = parts.netloc
if parts.username or parts.password:
host = parts.hostname or ""
if ":" in host: # IPv6 literal - urlsplit strips the surrounding brackets
host = f"[{host}]"
if parts.port is not None:
Comment thread
Zivxx marked this conversation as resolved.
Outdated
host = f"{host}:{parts.port}"
netloc = f"***@{host}"
changed = True

query = parts.query
if query:
pairs = parse_qsl(query, keep_blank_values=True)
if any(key.lower() in SENSITIVE_QUERY_PARAMS for key, _ in pairs):
query = urlencode(
[
(key, "***" if key.lower() in SENSITIVE_QUERY_PARAMS else value)
for key, value in pairs
],
safe="*",
)
changed = True

if not changed:
return url
return urlunsplit((parts.scheme, netloc, parts.path, query, parts.fragment))
Comment thread
Zivxx marked this conversation as resolved.
Outdated


def redact_url_in_text(text: str, url: str = "") -> str:
"""Redact embedded credentials from free text such as a git error message.

Scrubs any ``scheme://user:password@`` userinfo found anywhere in the text
(a regex, so it is robust to the exact URL form git happens to print), and
additionally replaces verbatim occurrences of ``url`` with its fully redacted
form so query-string tokens of a known URL are masked too.
"""
if not text:
return text
scrubbed = _USERINFO_RE.sub(lambda m: f"{m.group('scheme')}***@", text)
if url:
scrubbed = scrubbed.replace(url, redact_url(url))
Comment thread
Zivxx marked this conversation as resolved.
return scrubbed


def is_http_error_response(
response: Union[aiohttp.ClientResponse, httpx.Response]
Expand Down
Loading
Loading