Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions documentation/docs/getting-started/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,19 @@ Whether to include traceback information in log messages.

#### OPAL_LOG_DIAGNOSE

Default: `True`
Default: `False`

Include diagnosis in log messages.
Include diagnosis (local variable values) in tracebacks. Also gates verbose git
SSH protocol tracing (`GIT_TRACE`/`GIT_CURL_VERBOSE`).

_Added in OPAL v0.6.0_

:::caution Default changed
The default is now `False` (previously `True`). Operators upgrading lose
diagnostic tracebacks and verbose git tracing unless they opt back in with
`OPAL_LOG_DIAGNOSE=true`.
:::

:::danger
When `OPAL_LOG_DIAGNOSE` is enabled, logs may contain sensitive information such as secrets.
:::
Expand Down
11 changes: 6 additions & 5 deletions packages/opal-client/opal_client/callbacks/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opal_client.callbacks.register import CallbackConfig, CallbacksRegister
from opal_client.data.fetcher import DataFetcher
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig
from opal_common.http_utils import is_http_error_response
from opal_common.http_utils import is_http_error_response, redact_url
from opal_common.logger import logger
from opal_common.schemas.data import DataUpdateReport

Expand Down Expand Up @@ -60,16 +60,17 @@ async def report_update_results(
config.data = report_data
callback_requests.append((url, config, None))

# log only the URLs — FetcherConfig may carry Authorization headers and the full report payload
urls = [request[0] for request in callback_requests]
# log only the URLs — FetcherConfig may carry Authorization headers and the full report payload.
# Callback URLs are user-supplied and can embed credentials, so redact them too.
urls = [redact_url(request[0]) for request in callback_requests]
logger.info("Reporting the update to requested callbacks", urls=urls)
report_results = await self._fetcher.handle_urls(callback_requests)
# log reports which we failed to send
for url, config, result in report_results:
if isinstance(result, Exception):
logger.error(
"Failed to send report to {url}, info={exc_info}",
url=url,
url=redact_url(url),
exc_info=repr(result),
)
if isinstance(
Expand All @@ -83,7 +84,7 @@ async def report_update_results(
error_content = await result.text()
logger.error(
"Failed to send report to {url}, got response code {status} with error: {error}",
url=url,
url=redact_url(url),
status=result.status,
error=error_content,
)
Expand Down
7 changes: 4 additions & 3 deletions packages/opal-client/opal_client/data/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from opal_common.fetcher import FetchingEngine
from opal_common.fetcher.events import FetcherConfig
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig
from opal_common.http_utils import redact_url
from opal_common.logger import logger
from opal_common.utils import get_authorization_header, tuple_to_dict

Expand Down Expand Up @@ -62,20 +63,20 @@ async def handle_url(
) -> Optional[JsonableValue]:
"""Helper function wrapping self._engine.handle_url."""
if data is not None:
logger.info("Data provided inline for url: {url}", url=url)
logger.info("Data provided inline for url: {url}", url=redact_url(url))
return data

if url is None:
logger.error("Invalid data update: no embedded data or URL")
return None

logger.info("Fetching data from url: {url}", url=url)
logger.info("Fetching data from url: {url}", url=redact_url(url))
try:
# ask the engine to get our data
response = await self._engine.handle_url(url, config=config)
return response
except asyncio.TimeoutError as e:
logger.exception("Timeout while fetching url: {url}", url=url)
logger.exception("Timeout while fetching url: {url}", url=redact_url(url))
raise

async def handle_urls(
Expand Down
64 changes: 45 additions & 19 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
)
from opal_common.async_utils import TasksPool, repeated_call
from opal_common.config import opal_common_config
from opal_common.http_utils import is_http_error_response
from opal_common.http_utils import (
is_http_error_response,
redact_url,
redact_url_in_text,
)
from opal_common.schemas.data import (
DataEntryReport,
DataSourceConfig,
Expand Down Expand Up @@ -227,7 +231,10 @@ async def get_policy_data_config(self, url: str = None) -> DataSourceConfig:
"""
if url is None:
url = self._data_sources_config_url
logger.info("Getting data-sources configuration from '{source}'", source=url)
logger.info(
"Getting data-sources configuration from '{source}'",
source=redact_url(url),
)

try:
async with ClientSession(
Expand Down Expand Up @@ -446,8 +453,10 @@ def calc_hash(data: JsonableValue) -> str:
if not isinstance(data, str):
data = json.dumps(data, default=pydantic_encoder)
return hashlib.sha256(data.encode("utf-8")).hexdigest()
except Exception as e:
logger.exception(f"Failed to calculate hash for data {data}: {e}")
except Exception:
# Don't interpolate ``data`` - it may be an inline credential-bearing
# payload. logger.exception already records the traceback.
logger.exception("Failed to calculate hash for data")
return ""

async def _update_policy_data(self, update: DataUpdate) -> None:
Expand Down Expand Up @@ -475,14 +484,17 @@ async def _update_policy_data(self, update: DataUpdate) -> None:

for entry in update.entries:
if not entry.topics:
logger.debug("Data entry {entry} has no topics, skipping", entry=entry)
logger.debug(
"Data entry for url {url} has no topics, skipping",
url=redact_url(entry.url),
)
continue

# Only process entries that match one of our subscribed data topics
if set(entry.topics).isdisjoint(set(self._data_topics)):
logger.debug(
"Data entry {entry} has no topics matching the data topics, skipping",
entry=entry,
"Data entry for url {url} has no topics matching the data topics, skipping",
url=redact_url(entry.url),
)
continue

Expand Down Expand Up @@ -546,26 +558,34 @@ async def _fetch_and_save_data(
try:
result = await self._fetch_data(entry)
except Exception as e:
# url is persisted into the transaction log and the OPA-failed error
# log; redact it (and the exception text) - the functional fetch
# already used the real entry.url above.
store_transaction._update_remote_status(
url=entry.url, status=False, error=str(e)
url=redact_url(entry.url),
status=False,
error=redact_url_in_text(str(e), entry.url),
)
return DataEntryReport(entry=entry, fetched=False, saved=False)

try:
await self._store_fetched_data(entry, result, store_transaction)
except Exception as e:
logger.exception("Failed to save data update to policy-store: {exc}", exc=e)
# logger.exception already records the traceback; don't bind the raw
# exception (str(e) can carry a credentialed URL).
logger.exception("Failed to save data update to policy-store")
store_transaction._update_remote_status(
url=entry.url,
url=redact_url(entry.url),
status=False,
error=f"Failed to save data to policy store: {e}",
error=f"Failed to save data to policy store: "
f"{redact_url_in_text(str(e), entry.url)}",
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=False
)
else:
store_transaction._update_remote_status(
url=entry.url, status=True, error=""
url=redact_url(entry.url), status=True, error=""
)
return DataEntryReport(
entry=entry, hash=self.calc_hash(result), fetched=True, saved=True
Expand All @@ -588,28 +608,34 @@ async def _fetch_data(self, entry: DataSourceEntry) -> JsonableValue:
data=entry.data,
)
except Exception as e:
# logger.exception already records the active traceback; don't bind
# the raw exception into the message - str(e) on a 3rd-party error
# (e.g. aiohttp) can carry a credentialed URL and would be serialized
# verbatim under LOG_SERIALIZE.
logger.exception(
"Failed to fetch data for entry {entry} with exception {exc}",
entry=entry,
exc=e,
"Failed to fetch data for entry url {url}",
url=redact_url(entry.url),
)
Comment thread
Zivxx marked this conversation as resolved.
raise Exception(
f"Failed to fetch data for entry {redact_url(entry.url)}: "
f"{redact_url_in_text(str(e), entry.url)}"
)
raise Exception(f"Failed to fetch data for entry {entry.url}: {e}")

if result is None:
raise Exception(f"Fetched data is empty for entry {entry.url}")
raise Exception(f"Fetched data is empty for entry {redact_url(entry.url)}")

if isinstance(result, aiohttp.ClientResponse) and is_http_error_response(
result
):
error_content = await result.text()
logger.error(
"Failed to decode response from url: '{url}', got response code {status} with response: {error}",
url=entry.url,
url=redact_url(entry.url),
status=result.status,
error=error_content,
)
raise Exception(
f"Failed to decode response from url: '{entry.url}', got response code {result.status} with response: {error_content}"
f"Failed to decode response from url: '{redact_url(entry.url)}', got response code {result.status} with response: {error_content}"
)

return result
Expand Down
9 changes: 8 additions & 1 deletion packages/opal-common/opal_common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@ class OpalCommonConfig(Confi):
"LOG_TRACEBACK", True, description="Include traceback in log messages"
)
LOG_DIAGNOSE = confi.bool(
"LOG_DIAGNOSE", True, description="Include diagnosis in log messages"
"LOG_DIAGNOSE",
False,
description="Include diagnosis (local variable values) in tracebacks. "
"Off by default because loguru renders raw variable values, which can "
"leak credentials (e.g. auth headers/tokens) into logs - only enable for "
"local debugging. Also gates verbose git SSH protocol tracing "
"(GIT_TRACE/GIT_CURL_VERBOSE on SSH clones), which adds noisy protocol/"
"host disclosure to logs.",
)
LOG_COLORIZE = confi.bool("LOG_COLORIZE", True, description="Colorize log messages")
LOG_SERIALIZE = confi.bool(
Expand Down
19 changes: 15 additions & 4 deletions packages/opal-common/opal_common/fetcher/events.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
from typing import List, Optional
from typing import ClassVar, List, Optional, Set

from opal_common.logging_utils.redaction import RedactedReprMixin
from pydantic import BaseModel, Field


class FetcherConfig(BaseModel):
class FetcherConfig(RedactedReprMixin, BaseModel):
"""The configuration of a fetcher, used as part of a FetchEvent Fetch
Provider's have their own uniqueue events and configurations.
Provider's have their own unique events and configurations.

Configurations

Note: subclasses commonly carry credentials (e.g. auth headers). They must
list any secret-bearing fields in ``_redacted_repr_fields`` so they never
leak into logs - see ``RedactedReprMixin``.
"""

fetcher: Optional[str] = Field(
Expand All @@ -16,12 +21,18 @@ class FetcherConfig(BaseModel):
)


class FetchEvent(BaseModel):
class FetchEvent(RedactedReprMixin, BaseModel):
"""Event used to describe an queue fetching tasks Design note -

By using a Pydantic model - we can create a potentially transfer FetchEvents to be handled by other network nodes (perhaps via RPC)
"""

# ``config`` may carry a FetcherConfig with credentials - mask it in repr/str.
_redacted_repr_fields: ClassVar[Set[str]] = {"config"}
Comment thread
Zivxx marked this conversation as resolved.
# ``url`` can embed credentials (``user:token@host`` / ``?token=``); strip
# them via redact_url while keeping host/path visible for debugging.
_redacted_url_fields: ClassVar[Set[str]] = {"url"}

# Event id to be filled by the engine
id: str = None
# optional name of the specific event
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
"""Simple HTTP get data fetcher using requests supports."""

from typing import ClassVar, Set

from fastapi_websocket_rpc.rpc_methods import RpcMethodsBase
from fastapi_websocket_rpc.websocket_rpc_client import WebSocketRpcClient
from opal_common.fetcher.events import FetcherConfig, FetchEvent
from opal_common.fetcher.fetch_provider import BaseFetchProvider
from opal_common.fetcher.logger import get_logger
from opal_common.http_utils import redact_url

logger = get_logger("rpc_fetch_provider")


class FastApiRpcFetchConfig(FetcherConfig):
"""Config for FastApiRpcFetchConfig's Adding HTTP headers."""

# ``rpc_arguments`` may carry credentials - mask it in repr/str.
_redacted_repr_fields: ClassVar[Set[str]] = {"rpc_arguments"}

rpc_method_name: str
rpc_arguments: dict

Expand All @@ -38,8 +44,9 @@ async def _fetch_(self):
args = self._event.config.rpc_arguments
method = self._event.config.rpc_method_name
result = None
# Note: ``args`` (rpc_arguments) may carry credentials - never log it.
logger.info(
f"{self.__class__.__name__} fetching from {self._url} with RPC call {method}({args})"
f"{self.__class__.__name__} fetching from {redact_url(self._url)} with RPC call {method}"
)
Comment thread
Zivxx marked this conversation as resolved.
async with WebSocketRpcClient(
self._url,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
"""Simple HTTP get data fetcher using requests supports."""

from enum import Enum
from typing import Any, Union, cast
from typing import Any, ClassVar, Set, Union, cast

import httpx
from aiohttp import ClientResponse, ClientSession, ClientTimeout
from opal_common.config import opal_common_config
from opal_common.fetcher.events import FetcherConfig, FetchEvent
from opal_common.fetcher.fetch_provider import BaseFetchProvider
from opal_common.fetcher.logger import get_logger
from opal_common.http_utils import is_http_error_response
from opal_common.http_utils import is_http_error_response, redact_url
from opal_common.security.sslcontext import get_custom_ssl_context
from pydantic import validator

Expand All @@ -28,6 +28,10 @@ class HttpMethods(Enum):
class HttpFetcherConfig(FetcherConfig):
"""Config for HttpFetchProvider's Adding HTTP headers."""

# ``headers`` carries Authorization tokens and ``data`` the (possibly
# sensitive) payload - mask both in repr/str so they never leak into logs.
_redacted_repr_fields: ClassVar[Set[str]] = {"headers", "data"}

headers: dict = None
is_json: bool = True
process_data: bool = True
Expand Down Expand Up @@ -91,7 +95,7 @@ async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
await self._session.__aexit__(exc_type, exc_val, tb)

async def _fetch_(self):
logger.debug(f"{self.__class__.__name__} fetching from {self._url}")
logger.debug(f"{self.__class__.__name__} fetching from {redact_url(self._url)}")
http_method = self.match_http_method_from_type(
self._session, self._event.config.method
)
Expand Down
16 changes: 13 additions & 3 deletions packages/opal-common/opal_common/git_utils/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,18 @@ def provide_git_ssh_environment(url: str, ssh_key: str):
if not is_ssh_repo_url(url) or ssh_key is None:
return {} # no ssh config
git_ssh_identity_file = save_ssh_key_to_pem_file(ssh_key)
return {
env = {
"GIT_SSH_COMMAND": f"ssh -o StrictHostKeyChecking=no -o IdentitiesOnly=yes -i {git_ssh_identity_file}",
"GIT_TRACE": "1",
"GIT_CURL_VERBOSE": "1",
}
# This function only runs for SSH clones (non-SSH urls early-return above),
# so the value of gating GIT_TRACE / GIT_CURL_VERBOSE here is reducing the
# verbose SSH protocol noise / host disclosure git dumps to stderr (which
# OPAL captures into its logs) - not closing an HTTP Authorization-header
# leak, since SSH uses key auth, not HTTP headers. (HTTPS clones, where
# GIT_CURL_VERBOSE would dump Authorization headers, never enter this path;
# that would have to be handled where the HTTPS clone env is built.)
# Only enable verbose tracing when diagnosis logging is explicitly turned on.
if opal_common_config.LOG_DIAGNOSE:
env["GIT_TRACE"] = "1"
env["GIT_CURL_VERBOSE"] = "1"
return env
Loading
Loading