Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 75 additions & 2 deletions connexion/middleware/routing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import typing as t
from contextvars import ContextVar
from types import SimpleNamespace

import starlette.convertors
from starlette.routing import Router
Expand All @@ -15,17 +17,67 @@
from connexion.resolver import Resolver
from connexion.spec import Specification

logger = logging.getLogger(__name__)

_scope: ContextVar[dict] = ContextVar("SCOPE")

# Type for routing hook callbacks
# Callback receives: (route_path, operation_id, scope)
RoutingHook = t.Callable[[str, t.Optional[str], Scope], None]


class RoutingOperation:
def __init__(self, operation_id: t.Optional[str], next_app: ASGIApp) -> None:
"""Represents a routed operation that attaches routing context to the ASGI scope."""

# Class-level list of hooks invoked after routing resolution completes.
# Use after_routing_resolution() to register hooks.
_routing_hooks: t.ClassVar[t.List[RoutingHook]] = []

def __init__(
self,
operation_id: t.Optional[str],
next_app: ASGIApp,
path: t.Optional[str] = None,
) -> None:
self.operation_id = operation_id
self.next_app = next_app
self.path = path

@classmethod
def from_operation(cls, operation: AbstractOperation, next_app: ASGIApp):
return cls(operation.operation_id, next_app)
return cls(operation.operation_id, next_app, path=operation.path)

@classmethod
def after_routing_resolution(cls, hook: RoutingHook) -> None:
"""Register a hook to run after routing resolution, before the operation is called.

This hook runs after Connexion determines which operation handles the request,
but before the operation function is invoked. Use cases include:

- Observability: Set span attributes, update trace names (OTEL, Datadog, etc.)
- Logging: Log route information with structured context
- Metrics: Record routing metrics or counters
- Auditing: Track which operations are called

The hook receives:
- route_path: The OpenAPI path template (e.g., "/v1/users/{user_id}")
- operation_id: The operation identifier from the OpenAPI spec
- scope: The ASGI scope dict with request information

Example:
from connexion.middleware.routing import RoutingOperation

def log_route(route_path, operation_id, scope):
print(f"Routing {scope['method']} to {route_path}")

RoutingOperation.after_routing_resolution(log_route)
"""
cls._routing_hooks.append(hook)

@classmethod
def clear_routing_hooks(cls) -> None:
"""Clear all registered routing hooks. Useful for testing."""
cls._routing_hooks.clear()

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""Attach operation to scope and pass it to the next app"""
Expand All @@ -45,6 +97,27 @@ def get_root_path(scope: Scope) -> str:
connexion_routing.update(
{"api_base_path": api_base_path, "operation_id": self.operation_id}
)

# Set scope["route"] for OpenTelemetry instrumentation compatibility.
# OTEL's ASGI middleware reads scope["route"].path to populate http.route
# attribute on spans and metrics, which is required for proper transaction
# naming in APM tools like NewRelic.
if self.path is not None:
full_route_path = f"{api_base_path}{self.path}"
original_scope["route"] = SimpleNamespace(path=full_route_path)

# Invoke registered routing hooks
for hook in self._routing_hooks:
try:
hook(full_route_path, self.operation_id, original_scope)
except Exception:
# Don't let hook errors break request processing
logger.debug(
"Routing hook error for %s (ignored)",
full_route_path,
exc_info=True,
)

await self.next_app(original_scope, receive, send)


Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ exclude_lines = [
"@t.overload",
]

[tool.bandit]
exclude_dirs = ["tests"]
skips = ["B101"]

[[tool.mypy.overrides]]
module = "referencing.jsonschema.*"
follow_imports = "skip"
Expand Down
144 changes: 144 additions & 0 deletions tests/test_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,33 @@ async def patched_send(message):
await self.app(scope, receive, patched_send)


class RoutePathMiddleware:
"""Middleware to check if scope["route"].path is set for OTEL compatibility."""

__test__ = False

def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
# Read scope["route"].path which OTEL ASGI middleware uses for http.route
route_obj = scope.get("route")
route_path = route_obj.path if route_obj else ""

async def patched_send(message):
if message["type"] != "http.response.start":
await send(message)
return

message.setdefault("headers", [])
headers = MutableHeaders(scope=message)
headers["x-route-path"] = route_path

await send(message)

await self.app(scope, receive, patched_send)


@pytest.fixture(scope="session")
def middleware_app(spec, app_class):
middlewares = ConnexionMiddleware.default_middlewares + [TestMiddleware]
Expand All @@ -44,6 +71,14 @@ def middleware_app(spec, app_class):
)


@pytest.fixture(scope="session")
def route_path_app(spec, app_class):
middlewares = ConnexionMiddleware.default_middlewares + [RoutePathMiddleware]
return build_app_from_fixture(
"simple", app_class=app_class, spec_file=spec, middlewares=middlewares
)


def test_routing_middleware(middleware_app):
app_client = middleware_app.test_client()

Expand All @@ -54,6 +89,35 @@ def test_routing_middleware(middleware_app):
), response.status_code


def test_route_path_for_otel(route_path_app):
"""Test that scope['route'].path is set for OpenTelemetry instrumentation.

OTEL ASGI middleware reads scope["route"].path to populate the http.route
attribute on spans and metrics, which is required for proper transaction
naming in APM tools like NewRelic.
"""
app_client = route_path_app.test_client()

response = app_client.post("/v1.0/greeting/robbe")

# The route path should be the OpenAPI path template with base path
assert ( # nosec B101
response.headers.get("x-route-path") == "/v1.0/greeting/{name}"
), f"Expected /v1.0/greeting/{{name}}, got {response.headers.get('x-route-path')}"


def test_route_path_with_multiple_params(route_path_app):
"""Test route path with multiple path parameters."""
app_client = route_path_app.test_client()

response = app_client.post("/v1.0/greeting/robbe/extra/path")

# Path with remainder parameter
assert ( # nosec B101
response.headers.get("x-route-path") == "/v1.0/greeting/{name}/{remainder}"
), f"Expected /v1.0/greeting/{{name}}/{{remainder}}, got {response.headers.get('x-route-path')}"


def test_add_middleware(spec, app_class):
"""Test adding middleware via the `add_middleware` method."""
app = build_app_from_fixture("simple", app_class=app_class, spec_file=spec)
Expand Down Expand Up @@ -109,3 +173,83 @@ def __call__(
app_client.post("/v1.0/greeting/robbe")

mock.assert_called_once()


class TestRoutingHooks:
"""Tests for the after_routing_resolution hook mechanism."""

def test_hook_receives_route_info(self, spec, app_class):
"""Test that registered hooks receive route path and operation_id."""
from connexion.middleware.routing import RoutingOperation

# Clear any existing hooks from other tests
RoutingOperation.clear_routing_hooks()

captured = {}

def capture_route_info(route_path, operation_id, scope):
captured["route_path"] = route_path
captured["operation_id"] = operation_id
captured["method"] = scope.get("method")

RoutingOperation.after_routing_resolution(capture_route_info)

try:
app = build_app_from_fixture("simple", app_class=app_class, spec_file=spec)
app_client = app.test_client()
app_client.post("/v1.0/greeting/robbe")

assert captured["route_path"] == "/v1.0/greeting/{name}" # nosec B101
assert (
captured["operation_id"] == "fakeapi.hello.post_greeting"
) # nosec B101
assert captured["method"] == "POST" # nosec B101
finally:
RoutingOperation.clear_routing_hooks()

def test_multiple_hooks(self, spec, app_class):
"""Test that multiple hooks are all invoked."""
from connexion.middleware.routing import RoutingOperation

RoutingOperation.clear_routing_hooks()

call_count = {"first": 0, "second": 0}

def first_hook(route_path, operation_id, scope):
call_count["first"] += 1

def second_hook(route_path, operation_id, scope):
call_count["second"] += 1

RoutingOperation.after_routing_resolution(first_hook)
RoutingOperation.after_routing_resolution(second_hook)

try:
app = build_app_from_fixture("simple", app_class=app_class, spec_file=spec)
app_client = app.test_client()
app_client.post("/v1.0/greeting/robbe")

assert call_count["first"] == 1 # nosec B101
assert call_count["second"] == 1 # nosec B101
finally:
RoutingOperation.clear_routing_hooks()

def test_hook_error_does_not_break_request(self, spec, app_class):
"""Test that hook errors don't break request processing."""
from connexion.middleware.routing import RoutingOperation

RoutingOperation.clear_routing_hooks()

def failing_hook(route_path, operation_id, scope):
raise RuntimeError("Intentional error")

RoutingOperation.after_routing_resolution(failing_hook)

try:
app = build_app_from_fixture("simple", app_class=app_class, spec_file=spec)
app_client = app.test_client()
# Request should still succeed despite hook error
response = app_client.post("/v1.0/greeting/robbe")
assert response.status_code == 200 # nosec B101
finally:
RoutingOperation.clear_routing_hooks()