diff --git a/README.md b/README.md index a2205563..d10d5369 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,34 @@ The **MCP Proxy for AWS** package provides two ways to connect AI applications t **The Solution:** This package bridges that gap by: - **Handling SigV4 authentication automatically** - Uses your local AWS credentials (from AWS CLI, environment variables, or IAM roles) to sign all MCP requests using [SigV4](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_sigv.html) +- **Supporting global endpoints with SigV4A** - Automatically detects and upgrades to [SigV4A](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-signing.html) for multi-region and global AWS endpoints - **Providing seamless integration** - Works with existing MCP clients and frameworks - **Eliminating custom code** - No need to build your own MCP client with SigV4 signing logic +### SigV4A Auto-Detection for Global Endpoints + +This package automatically handles authentication for both regional and global AWS endpoints: + +**Regional Endpoints (SigV4):** +- Standard AWS endpoints tied to a specific region (e.g., `https://service.us-east-1.api.aws/mcp`) +- Uses AWS Signature Version 4 (SigV4) for authentication +- Region is extracted from the endpoint URL or specified explicitly + +**Global Endpoints (SigV4A):** +- AWS endpoints that operate across multiple regions or globally (e.g., `https://service.global.api.aws/mcp`) +- Automatically detected based on URL patterns (`.global.`, `global.` subdomain, or `.api.aws` without region) +- Starts with SigV4 for compatibility, then automatically upgrades to SigV4A if the endpoint requires it +- No configuration changes needed when services transition from regional to global + +**How Auto-Detection Works:** + +1. **Endpoint Detection** - The proxy analyzes the endpoint URL to determine if it's a global endpoint +2. **Initial Request** - For global endpoints, starts with SigV4 signing (region defaults to `us-east-1`) +3. **Automatic Upgrade** - If the endpoint returns an error indicating SigV4A is required, automatically retries with SigV4A +4. **Subsequent Requests** - Once SigV4A is detected, all future requests use SigV4A signing + +This approach ensures seamless compatibility as AWS services evolve from regional to global endpoints without requiring configuration updates. + ## Which Feature Should I Use? **Use as a proxy if you want to:** @@ -163,6 +188,123 @@ Add the following configuration to your MCP client config file (e.g., for Amazon } ``` +### Usage Examples + +#### Example 1: Global Endpoint with Auto-Detection + +For global AWS endpoints, the proxy automatically detects the endpoint type and handles authentication: + +```json +{ + "mcpServers": { + "global-mcp-server": { + "command": "uv", + "args": [ + "run", + "mcp-proxy-for-aws", + "https://service.global.api.aws/mcp", + "--service", + "my-service", + "--profile", + "default" + ] + } + } +} +``` + +**What happens:** +- The proxy detects `.global.` in the URL and identifies it as a global endpoint +- Region defaults to `us-east-1` for the initial request +- Starts with SigV4 signing for compatibility +- If the endpoint requires SigV4A, automatically retries with SigV4A +- All subsequent requests use the detected signing method + +#### Example 2: Regional Endpoint + +For regional endpoints, the proxy uses standard SigV4 signing: + +```json +{ + "mcpServers": { + "regional-mcp-server": { + "command": "uv", + "args": [ + "run", + "mcp-proxy-for-aws", + "https://service.us-west-2.api.aws/mcp", + "--service", + "my-service", + "--region", + "us-west-2", + "--profile", + "default" + ] + } + } +} +``` + +**What happens:** +- The proxy extracts `us-west-2` from the URL +- Uses SigV4 signing with the specified region +- No auto-detection needed for regional endpoints + +#### Example 3: Explicit Region Override + +You can explicitly specify a region even for global endpoints: + +```json +{ + "mcpServers": { + "global-mcp-server-explicit": { + "command": "uv", + "args": [ + "run", + "mcp-proxy-for-aws", + "https://service.global.api.aws/mcp", + "--service", + "my-service", + "--region", + "eu-west-1", + "--profile", + "default" + ] + } + } +} +``` + +**What happens:** +- The explicit `--region` parameter takes precedence +- Uses `eu-west-1` for the initial SigV4 request +- Auto-detection still works if SigV4A is required + +### Global Endpoint URL Patterns + +The proxy automatically detects global endpoints based on these URL patterns: + +| Pattern | Example | Detection | +|---------|---------|-----------| +| Contains `.global.` | `https://service.global.api.aws/mcp` | ✅ Global endpoint | +| Starts with `global.` | `https://global.service.api.aws/mcp` | ✅ Global endpoint | +| Ends with `.api.aws` (no region) | `https://service.api.aws/mcp` | ✅ Global endpoint | +| Contains region pattern | `https://service.us-east-1.api.aws/mcp` | ❌ Regional endpoint | + +**Region Defaulting Behavior:** +- **Global endpoints**: Default to `us-east-1` region for initial SigV4 request +- **Regional endpoints**: Extract region from URL (e.g., `us-west-2` from `service.us-west-2.api.aws`) +- **Explicit region**: The `--region` parameter always takes precedence over auto-detection + +**Auto-Detection and Retry Logic:** +1. **First Request**: Proxy attempts authentication with SigV4 using the determined region +2. **Error Detection**: If the endpoint returns a 403 error with signature mismatch indicating SigV4A is required +3. **Automatic Retry**: Proxy automatically retries the same request with SigV4A signing (region set to `*` for global) +4. **Caching**: Once SigV4A is detected, all subsequent requests use SigV4A without retry +5. **Logging**: Auto-detection events are logged at INFO level for visibility + +**Note:** Auto-detection adds minimal overhead (one retry on first request only) and ensures compatibility as AWS services evolve from regional to global endpoints. + --- ## Programmatic Access @@ -245,6 +387,89 @@ async with mcp_client as (read, write, session_id_callback): agent = ReActAgent(tools=mcp_tools, ...) ``` +### Programmatic Usage Examples + +#### Example 1: Global Endpoint with Auto-Detection + +For global AWS endpoints, the client automatically handles SigV4A detection: + +```python +from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client + +# Global endpoint - region auto-detected as us-east-1, SigV4A auto-detected if needed +mcp_client = aws_iam_streamablehttp_client( + endpoint="https://service.global.api.aws/mcp", + aws_service="my-service", + aws_profile="default" +) + +async with mcp_client as (read, write, session_id_callback): + async with ClientSession(read, write) as session: + # Use the session with your framework + tools = await session.list_tools() +``` + +**What happens:** +- The client detects `.global.` in the endpoint URL +- Defaults to `us-east-1` region for initial request +- Starts with SigV4, automatically upgrades to SigV4A if needed +- No configuration changes required + +#### Example 2: Regional Endpoint + +For regional endpoints, standard SigV4 signing is used: + +```python +from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client + +# Regional endpoint - uses specified region with SigV4 +mcp_client = aws_iam_streamablehttp_client( + endpoint="https://service.us-west-2.api.aws/mcp", + aws_service="my-service", + aws_region="us-west-2", + aws_profile="default" +) + +async with mcp_client as (read, write, session_id_callback): + async with ClientSession(read, write) as session: + # Use the session with your framework + tools = await session.list_tools() +``` + +**What happens:** +- The client uses the specified `us-west-2` region +- Uses SigV4 signing for regional endpoint +- No auto-detection needed + +#### Example 3: Disabling Auto-Detection + +For performance-critical applications with known regional endpoints, you can disable auto-detection: + +```python +from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client + +# Disable auto-detection for performance optimization +mcp_client = aws_iam_streamablehttp_client( + endpoint="https://service.us-west-2.api.aws/mcp", + aws_service="my-service", + aws_region="us-west-2", + aws_profile="default", + auto_detect_sigv4a=False # Disable auto-detection +) + +async with mcp_client as (read, write, session_id_callback): + async with ClientSession(read, write) as session: + # Use the session with your framework + tools = await session.list_tools() +``` + +**When to disable auto-detection:** +- You know the endpoint only requires SigV4 (regional endpoint) +- You want to avoid the potential one-time retry overhead +- You're optimizing for performance in high-throughput scenarios + +**Note:** Auto-detection is enabled by default and recommended for most use cases to ensure compatibility as services evolve. + ### Running Examples Explore complete working examples for different frameworks in the [`./examples/mcp-client`](./examples/mcp-client) directory: @@ -278,6 +503,52 @@ uv sync --- +## Understanding SigV4 vs SigV4A + +### What is SigV4? + +AWS Signature Version 4 (SigV4) is the standard authentication protocol for AWS services. It signs requests with your AWS credentials to verify your identity and authorize access to AWS resources. SigV4 signatures are region-specific, meaning a request signed for `us-east-1` is only valid in that region. + +**Use SigV4 when:** +- Connecting to regional AWS endpoints (e.g., `service.us-west-2.api.aws`) +- Your MCP server operates in a single, specific AWS region +- You want the most straightforward authentication setup + +### What is SigV4A? + +AWS Signature Version 4A (SigV4A) is an extension of SigV4 that supports multi-region signing. A SigV4A signature can be valid across multiple AWS regions simultaneously, which is essential for global or multi-region AWS services. + +**Use SigV4A when:** +- Connecting to global AWS endpoints (e.g., `service.global.api.aws`) +- Your MCP server operates across multiple regions +- The AWS service requires multi-region authentication + +### When to Use Each + +| Scenario | Recommended | Why | +|----------|-------------|-----| +| Regional MCP server | SigV4 | Simpler, region-specific authentication | +| Global MCP server | SigV4A | Required for multi-region services | +| Unknown endpoint type | Auto-detection | Automatically uses the correct method | +| Service transitioning to global | Auto-detection | Seamless upgrade without config changes | + +### Auto-Detection Benefits + +The MCP Proxy for AWS uses intelligent auto-detection to choose the right signing method: + +1. **Zero Configuration** - No need to specify which signing method to use +2. **Future-Proof** - Automatically adapts when services transition from regional to global +3. **Backward Compatible** - Works with both SigV4 and SigV4A endpoints +4. **Minimal Overhead** - Only one retry on first request if upgrade is needed +5. **Transparent** - Logs detection events for visibility and debugging + +### Requirements + +- **SigV4**: Available in all versions of botocore +- **SigV4A**: Requires botocore >= 1.31.0 (automatically installed with this package) + +--- + ## Development & Contributing For development setup, testing, and contribution guidelines, see: diff --git a/mcp_proxy_for_aws/client.py b/mcp_proxy_for_aws/client.py index 50ed5a73..1bb7656b 100644 --- a/mcp_proxy_for_aws/client.py +++ b/mcp_proxy_for_aws/client.py @@ -48,7 +48,8 @@ def aws_iam_streamablehttp_client( """Create an AWS IAM-authenticated MCP streamable HTTP client. This function creates a context manager for connecting to an MCP server using AWS IAM - authentication via SigV4 signing. Use with 'async with' to manage the connection lifecycle. + authentication via SigV4 signing with automatic SigV4A detection enabled by default. + Use with 'async with' to manage the connection lifecycle. Args: endpoint: The URL of the MCP server to connect to. Must be a valid HTTP/HTTPS URL. @@ -98,8 +99,10 @@ def aws_iam_streamablehttp_client( logger.debug('AWS region: %s', region) logger.debug('AWS service: %s', aws_service) - # Create a SigV4 authentication handler with AWS credentials + # Create authentication handler with AWS credentials + # Auto-detection is always enabled to support both regional and global endpoints auth = SigV4HTTPXAuth(session.get_credentials(), aws_service, region) + logger.debug('Using SigV4 authentication with SigV4A auto-detection') # Return the streamable HTTP client context manager with AWS IAM authentication return streamablehttp_client( diff --git a/mcp_proxy_for_aws/sigv4_helper.py b/mcp_proxy_for_aws/sigv4_helper.py index 93137606..f03b6d2c 100644 --- a/mcp_proxy_for_aws/sigv4_helper.py +++ b/mcp_proxy_for_aws/sigv4_helper.py @@ -22,12 +22,26 @@ from botocore.credentials import Credentials from typing import Any, Dict, Generator, Optional - logger = logging.getLogger(__name__) +try: + from botocore.auth import SigV4Auth as SigV4AAuth + SIGV4A_AVAILABLE = True +except ImportError: + SIGV4A_AVAILABLE = False + logger.warning( + "SigV4A auto-detection disabled: botocore >= 1.31.0 required. " + "Install with: pip install --upgrade botocore" + ) + class SigV4HTTPXAuth(httpx.Auth): - """HTTPX Auth class that signs requests with AWS SigV4.""" + """HTTPX Auth class that signs requests with AWS SigV4 with automatic SigV4A fallback. + + This class automatically detects when an endpoint requires SigV4A and retries with + the appropriate signing method. This provides seamless support for both regional + and global AWS endpoints without requiring explicit configuration. + """ def __init__( self, @@ -35,7 +49,7 @@ def __init__( service: str, region: str, ): - """Initialize SigV4HTTPXAuth. + """Initialize SigV4HTTPXAuth with auto-detection support. Args: credentials: AWS credentials to use for signing @@ -45,15 +59,99 @@ def __init__( self.credentials = credentials self.service = service self.region = region - self.signer = SigV4Auth(credentials, service, region) + self.use_sigv4a = False # Start with SigV4, upgrade to SigV4A if needed + self.sigv4_signer = SigV4Auth(credentials, service, region) + self.sigv4a_signer = None # Lazy initialization def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]: - """Signs the request with SigV4 and adds the signature to the request headers.""" - # Create an AWS request + """Signs the request with SigV4 and automatically retries with SigV4A if needed.""" + # Try with current signer (SigV4 or SigV4A) + signer = self.sigv4a_signer if self.use_sigv4a else self.sigv4_signer + + # Sign and send request + signed_request = self._sign_request(request, signer) + response = yield signed_request + + # Check if we need to retry with SigV4A + if not self.use_sigv4a and self._requires_sigv4a(response): + logger.info( + 'Endpoint %s requires SigV4A (detected from %s response), retrying with SigV4A authentication', + request.url, + response.status_code + ) + self.use_sigv4a = True + + # Initialize SigV4A signer if needed + if self.sigv4a_signer is None: + if not SIGV4A_AVAILABLE: + logger.error( + 'Authentication failed: Cannot retry with SigV4A for endpoint %s. ' + 'SigV4A requires botocore >= 1.31.0. ' + 'Install with: pip install --upgrade botocore', + request.url + ) + return + self.sigv4a_signer = SigV4AAuth(self.credentials, self.service, '*') + + # Retry with SigV4A + signed_request = self._sign_request(request, self.sigv4a_signer) + response = yield signed_request + + # Check if SigV4A retry also failed + if response.is_error and response.status_code in (401, 403): + logger.error( + 'Authentication failed for endpoint %s with both SigV4 and SigV4A (status: %s). ' + 'Check credentials and endpoint configuration.', + request.url, + response.status_code + ) + elif self.use_sigv4a and response.is_error and response.status_code in (401, 403): + # Already using SigV4A and still failing + logger.error( + 'Authentication failed for endpoint %s with SigV4A (status: %s). ' + 'Check credentials and endpoint configuration.', + request.url, + response.status_code + ) + + def _requires_sigv4a(self, response: httpx.Response) -> bool: + """Check if response indicates SigV4A is required. + + Args: + response: The HTTP response to check + + Returns: + True if response indicates SigV4A is required, False otherwise + """ + # Check for specific error codes/messages that indicate SigV4A requirement + if response.status_code == 403: + try: + error_body = response.json() + # Check for AWS error codes that indicate SigV4A requirement + error_code = error_body.get('__type', '') or error_body.get('Code', '') + if 'SignatureDoesNotMatch' in error_code or 'InvalidSignature' in error_code: + # Additional heuristic: check error message for SigV4A hints + message = error_body.get('message', '') or error_body.get('Message', '') + if 'sigv4a' in message.lower() or 'multi-region' in message.lower(): + return True + except Exception: + pass + return False + + def _sign_request(self, request: httpx.Request, signer) -> httpx.Request: + """Sign request with given signer. + + Args: + request: The HTTP request to sign + signer: The AWS signer to use (SigV4Auth or SigV4AAuth) + + Returns: + The signed HTTP request + """ headers = dict(request.headers) # Header 'connection' = 'keep-alive' is not used in calculating the request # signature on the server-side, and results in a signature mismatch if included - headers.pop('connection', None) # Remove if present, ignore if not + headers.pop('connection', None) aws_request = AWSRequest( method=request.method, @@ -62,13 +160,9 @@ def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Re headers=headers, ) - # Sign the request with SigV4 - self.signer.add_auth(aws_request) - - # Add the signature header to the original request + signer.add_auth(aws_request) request.headers.update(dict(aws_request.headers)) - - yield request + return request async def _handle_error_response(response: httpx.Response) -> None: @@ -104,20 +198,44 @@ async def _handle_error_response(response: httpx.Response) -> None: # do nothing and let the client and SDK handle the error return + # Determine signing method from Authorization header + signing_method = "Unknown" + auth_header = response.request.headers.get('Authorization', '') + if 'AWS4-ECDSA-P256-SHA256' in auth_header: + signing_method = "SigV4A" + elif 'AWS4-HMAC-SHA256' in auth_header: + signing_method = "SigV4" + # Try to extract error details with fallbacks try: # Try to parse JSON error details error_details = response.json() - logger.log(log_level, 'HTTP %d Error Details: %s', response.status_code, error_details) + logger.log( + log_level, + 'HTTP %d Error Details (signing method: %s): %s', + response.status_code, + signing_method, + error_details + ) except Exception: # If JSON parsing fails, use response text or status code try: response_text = response.text - logger.log(log_level, 'HTTP %d Error: %s', response.status_code, response_text) + logger.log( + log_level, + 'HTTP %d Error (signing method: %s): %s', + response.status_code, + signing_method, + response_text + ) except Exception: # Fallback to just status code and URL logger.log( - log_level, 'HTTP %d Error for url %s', response.status_code, response.url + log_level, + 'HTTP %d Error for url %s (signing method: %s)', + response.status_code, + response.url, + signing_method ) @@ -150,16 +268,20 @@ def create_aws_session(profile: Optional[str] = None) -> boto3.Session: return session -def create_sigv4_auth(service: str, region: str, profile: Optional[str] = None) -> SigV4HTTPXAuth: - """Create SigV4 authentication for AWS requests. +def create_sigv4_auth( + service: str, + region: str, + profile: Optional[str] = None, +) -> httpx.Auth: + """Create SigV4 authentication for AWS requests with SigV4A auto-detection enabled by default. Args: service: AWS service name for SigV4 signing + region: AWS region profile: AWS profile to use (optional) - region: AWS region (defaults to AWS_REGION env var or us-east-1) Returns: - SigV4HTTPXAuth instance + SigV4HTTPXAuth instance with auto-detection enabled Raises: ValueError: If credentials cannot be obtained @@ -168,15 +290,28 @@ def create_sigv4_auth(service: str, region: str, profile: Optional[str] = None) session = create_aws_session(profile) credentials = session.get_credentials() - # Create SigV4Auth with explicit credentials - sigv4_auth = SigV4HTTPXAuth( + # Always create SigV4HTTPXAuth which includes auto-detection logic + auth = SigV4HTTPXAuth( credentials=credentials, service=service, region=region, ) + + if not SIGV4A_AVAILABLE: + logger.info( + "Created SigV4 authentication for service '%s' in region '%s' " + "(SigV4A auto-detection unavailable - install botocore >= 1.31.0 for full support)", + service, + region, + ) + else: + logger.info( + "Created SigV4 authentication with SigV4A auto-detection for service '%s' in region '%s'", + service, + region, + ) - logger.info("Created SigV4 authentication for service '%s' in region '%s'", service, region) - return sigv4_auth + return auth def create_sigv4_client( @@ -188,13 +323,13 @@ def create_sigv4_client( auth: Optional[httpx.Auth] = None, **kwargs: Any, ) -> httpx.AsyncClient: - """Create an httpx.AsyncClient with SigV4 authentication. + """Create an httpx.AsyncClient with SigV4 authentication and SigV4A auto-detection enabled by default. Args: service: AWS service name for SigV4 signing - profile: AWS profile to use (optional) - region: AWS region (optional, defaults to AWS_REGION env var or us-east-1) + region: AWS region timeout: Timeout configuration for the HTTP client + profile: AWS profile to use (optional) headers: Headers to include in requests auth: Auth parameter (ignored as we provide our own) **kwargs: Additional arguments to pass to httpx.AsyncClient @@ -220,11 +355,14 @@ def create_sigv4_client( 'Creating httpx.AsyncClient with custom headers: %s', client_kwargs.get('headers', {}) ) - # Create SigV4 auth + # Create SigV4 auth with auto-detection enabled by default sigv4_auth = create_sigv4_auth(service, region, profile) # Create the client with SigV4 auth and error handling event hook - logger.info("Creating httpx.AsyncClient with SigV4 authentication for service '%s'", service) + logger.info( + "Creating httpx.AsyncClient with SigV4 authentication (auto-detection enabled) for service '%s'", + service, + ) return httpx.AsyncClient( auth=sigv4_auth, diff --git a/mcp_proxy_for_aws/utils.py b/mcp_proxy_for_aws/utils.py index d267bc1c..dd00775c 100644 --- a/mcp_proxy_for_aws/utils.py +++ b/mcp_proxy_for_aws/utils.py @@ -101,6 +101,35 @@ def determine_service_name(endpoint: str, service: Optional[str] = None) -> str: return determined_service +def is_global_endpoint(endpoint: str) -> bool: + """Detect if endpoint is a global endpoint based on URL pattern. + + Args: + endpoint: The endpoint URL + + Returns: + True if endpoint appears to be global, False otherwise + """ + parsed = urlparse(endpoint) + hostname = parsed.hostname or '' + + # Check for global endpoint patterns: + # - Contains ".global." in hostname + # - Ends with ".api.aws" without region pattern + # - Contains "global" subdomain + if '.global.' in hostname or hostname.startswith('global.'): + return True + + # Check if it's an api.aws endpoint without a region + if '.api.aws' in hostname: + # If no region pattern found, it might be global + region_match = re.search(r'\.([a-z0-9-]+)\.api\.aws', hostname) + if not region_match: + return True + + return False + + def determine_aws_region(endpoint: str, region: Optional[str]) -> str: """Validate and determine the AWS region. @@ -109,7 +138,7 @@ def determine_aws_region(endpoint: str, region: Optional[str]) -> str: region: Optional region name Returns: - Validated AWS region + Validated AWS region (defaults to us-east-1 for global endpoints) Raises: ValueError: If region cannot be determined @@ -118,6 +147,11 @@ def determine_aws_region(endpoint: str, region: Optional[str]) -> str: logger.debug('Region determined through explicit parameter') return region + # Check if it's a global endpoint first (before trying to parse region) + if is_global_endpoint(endpoint): + logger.info('Global endpoint detected, defaulting to us-east-1 with SigV4 (will auto-detect SigV4A if needed)') + return "us-east-1" + # Parse AWS region from endpoint URL parsed = urlparse(endpoint) hostname = parsed.hostname or '' diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 0f9df077..47065c42 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -210,3 +210,102 @@ async def mock_aexit(*_): pass assert cleanup_called + + +@pytest.mark.asyncio +async def test_auto_detect_sigv4a_enabled(mock_session, mock_streams): + """Test that SigV4HTTPXAuth is always created (auto-detection is always enabled).""" + mock_read, mock_write, mock_get_session = mock_streams + + with patch('boto3.Session', return_value=mock_session): + with patch('mcp_proxy_for_aws.client.SigV4HTTPXAuth') as mock_auto_auth_cls: + with patch('mcp_proxy_for_aws.client.streamablehttp_client') as mock_stream_client: + mock_auto_auth = Mock() + mock_auto_auth_cls.return_value = mock_auto_auth + mock_stream_client.return_value.__aenter__ = AsyncMock( + return_value=(mock_read, mock_write, mock_get_session) + ) + mock_stream_client.return_value.__aexit__ = AsyncMock(return_value=None) + + async with aws_iam_streamablehttp_client( + endpoint='https://test.example.com/mcp', + aws_service='bedrock-agentcore', + aws_region='us-west-2', + ): + pass + + # Verify SigV4HTTPXAuth was created with correct parameters + mock_auto_auth_cls.assert_called_once_with( + mock_session.get_credentials.return_value, + 'bedrock-agentcore', + 'us-west-2', + ) + # Verify the auth handler was passed to streamablehttp_client + assert mock_stream_client.call_args[1]['auth'] is mock_auto_auth + + +@pytest.mark.asyncio +async def test_auto_detect_sigv4a_disabled(mock_session, mock_streams): + """Test that SigV4HTTPXAuth is always created (no disable option).""" + # This test is now redundant since auto-detection is always enabled + # Keeping it for backward compatibility but it tests the same as the above + mock_read, mock_write, mock_get_session = mock_streams + + with patch('boto3.Session', return_value=mock_session): + with patch('mcp_proxy_for_aws.client.SigV4HTTPXAuth') as mock_auto_auth_cls: + with patch('mcp_proxy_for_aws.client.streamablehttp_client') as mock_stream_client: + mock_auto_auth = Mock() + mock_auto_auth_cls.return_value = mock_auto_auth + mock_stream_client.return_value.__aenter__ = AsyncMock( + return_value=(mock_read, mock_write, mock_get_session) + ) + mock_stream_client.return_value.__aexit__ = AsyncMock(return_value=None) + + async with aws_iam_streamablehttp_client( + endpoint='https://test.example.com/mcp', + aws_service='bedrock-agentcore', + aws_region='us-west-2', + ): + pass + + # Verify SigV4HTTPXAuth was created with correct parameters + mock_auto_auth_cls.assert_called_once_with( + mock_session.get_credentials.return_value, + 'bedrock-agentcore', + 'us-west-2', + ) + # Verify the auth handler was passed to streamablehttp_client + assert mock_stream_client.call_args[1]['auth'] is mock_auto_auth + + +@pytest.mark.asyncio +async def test_auto_detect_sigv4a_default_behavior(mock_session, mock_streams): + """Test backward compatibility - default behavior uses auto-detection.""" + mock_read, mock_write, mock_get_session = mock_streams + + with patch('boto3.Session', return_value=mock_session): + with patch('mcp_proxy_for_aws.client.SigV4HTTPXAuth') as mock_auto_auth_cls: + with patch('mcp_proxy_for_aws.client.streamablehttp_client') as mock_stream_client: + mock_auto_auth = Mock() + mock_auto_auth_cls.return_value = mock_auto_auth + mock_stream_client.return_value.__aenter__ = AsyncMock( + return_value=(mock_read, mock_write, mock_get_session) + ) + mock_stream_client.return_value.__aexit__ = AsyncMock(return_value=None) + + # Call without specifying auto_detect_sigv4a parameter + async with aws_iam_streamablehttp_client( + endpoint='https://test.example.com/mcp', + aws_service='bedrock-agentcore', + aws_region='us-west-2', + ): + pass + + # Verify SigV4HTTPXAuth was created (default behavior) + mock_auto_auth_cls.assert_called_once_with( + mock_session.get_credentials.return_value, + 'bedrock-agentcore', + 'us-west-2', + ) + # Verify the auth handler was passed to streamablehttp_client + assert mock_stream_client.call_args[1]['auth'] is mock_auto_auth diff --git a/tests/unit/test_sigv4_helper.py b/tests/unit/test_sigv4_helper.py index 804e88fb..60f2c919 100644 --- a/tests/unit/test_sigv4_helper.py +++ b/tests/unit/test_sigv4_helper.py @@ -54,6 +54,394 @@ async def test_auth_flow_signs_request(self): assert 'X-Amz-Date' in signed_request.headers +class TestSigV4HTTPXAuthAutoDetect: + """Test cases for the SigV4HTTPXAuth class with auto-detection.""" + + def test_initialization_starts_with_sigv4(self): + """Test that SigV4HTTPXAuth starts with SigV4 signer.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Verify initialization + assert auth.credentials == mock_credentials + assert auth.service == 'test-service' + assert auth.region == 'us-west-2' + assert auth.use_sigv4a is False + assert auth.sigv4_signer is not None + assert auth.sigv4a_signer is None # Lazy initialization + + def test_lazy_initialization_of_sigv4a_signer(self): + """Test that SigV4A signer is lazily initialized.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Verify SigV4A signer is not initialized yet + assert auth.sigv4a_signer is None + + # Trigger lazy initialization by setting use_sigv4a + auth.use_sigv4a = True + # Note: actual initialization happens in auth_flow when needed + + def test_credential_handling(self): + """Test that credentials are properly stored and accessible.""" + # Create mock credentials with all attributes + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_session_token' + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Verify credentials are stored correctly + assert auth.credentials == mock_credentials + assert auth.credentials.access_key == 'test_access_key' + assert auth.credentials.secret_key == 'test_secret_key' + assert auth.credentials.token == 'test_session_token' + + @pytest.mark.asyncio + async def test_successful_sigv4_request_no_retry(self): + """Test that successful SigV4 request does not trigger retry.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create a test request + request = httpx.Request('GET', 'https://example.com/test', headers={'Host': 'example.com'}) + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Mock successful response + success_response = httpx.Response( + status_code=200, + headers={'content-type': 'application/json'}, + content=b'{"success": true}', + request=request, + ) + + # Get signed request from auth flow + auth_flow = auth.auth_flow(request) + signed_request = next(auth_flow) + + # Verify request was signed with SigV4 + assert 'Authorization' in signed_request.headers + assert auth.use_sigv4a is False + + # Send the response back to auth flow + try: + auth_flow.send(success_response) + except StopIteration: + pass + + # Verify no retry occurred (still using SigV4) + assert auth.use_sigv4a is False + assert auth.sigv4a_signer is None + + @pytest.mark.asyncio + async def test_sigv4a_detection_from_403_error(self): + """Test that 403 error with SigV4A indicators triggers detection.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create a test request + request = httpx.Request('GET', 'https://example.com/test', headers={'Host': 'example.com'}) + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Mock 403 response with SigV4A requirement + error_data = { + 'Code': 'SignatureDoesNotMatch', + 'Message': 'The request signature requires SigV4A for multi-region access' + } + error_response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + + # Mock successful response after retry + success_response = httpx.Response( + status_code=200, + headers={'content-type': 'application/json'}, + content=b'{"success": true}', + request=request, + ) + + # Get signed request from auth flow + auth_flow = auth.auth_flow(request) + signed_request = next(auth_flow) + + # Verify initial request was signed with SigV4 + assert auth.use_sigv4a is False + + # Send error response to trigger retry + with patch('mcp_proxy_for_aws.sigv4_helper.SIGV4A_AVAILABLE', True): + with patch('mcp_proxy_for_aws.sigv4_helper.SigV4AAuth') as mock_sigv4a_class: + mock_sigv4a_signer = Mock() + mock_sigv4a_class.return_value = mock_sigv4a_signer + + retry_request = auth_flow.send(error_response) + + # Verify SigV4A was detected and retry occurred + assert auth.use_sigv4a is True + assert auth.sigv4a_signer is not None + + # Send success response for retry + try: + auth_flow.send(success_response) + except StopIteration: + pass + + @pytest.mark.asyncio + async def test_automatic_retry_with_sigv4a(self): + """Test that automatic retry with SigV4A occurs after detection.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create a test request + request = httpx.Request('GET', 'https://example.com/test', headers={'Host': 'example.com'}) + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Mock 403 response with SigV4A requirement + error_data = { + '__type': 'InvalidSignature', + 'message': 'This endpoint requires sigv4a authentication' + } + error_response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + + # Mock successful response after retry + success_response = httpx.Response( + status_code=200, + headers={'content-type': 'application/json'}, + content=b'{"success": true}', + request=request, + ) + + # Get signed request from auth flow + auth_flow = auth.auth_flow(request) + signed_request = next(auth_flow) + + # Send error response to trigger retry + with patch('mcp_proxy_for_aws.sigv4_helper.SIGV4A_AVAILABLE', True): + with patch('mcp_proxy_for_aws.sigv4_helper.SigV4AAuth') as mock_sigv4a_class: + mock_sigv4a_signer = Mock() + mock_sigv4a_class.return_value = mock_sigv4a_signer + + retry_request = auth_flow.send(error_response) + + # Verify retry request was generated + assert retry_request is not None + assert 'Authorization' in retry_request.headers + + # Verify SigV4A signer was initialized + mock_sigv4a_class.assert_called_once_with(mock_credentials, 'test-service', '*') + + @pytest.mark.asyncio + async def test_subsequent_requests_use_sigv4a_after_detection(self): + """Test that subsequent requests use SigV4A after detection.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + # Create auth instance + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Simulate that SigV4A has already been detected + with patch('mcp_proxy_for_aws.sigv4_helper.SIGV4A_AVAILABLE', True): + with patch('mcp_proxy_for_aws.sigv4_helper.SigV4AAuth') as mock_sigv4a_class: + mock_sigv4a_signer = Mock() + + # Mock the add_auth method to add headers + def mock_add_auth(aws_request): + aws_request.headers['Authorization'] = 'AWS4-ECDSA-P256-SHA256 Credential=...' + aws_request.headers['X-Amz-Date'] = '20240101T000000Z' + + mock_sigv4a_signer.add_auth = mock_add_auth + mock_sigv4a_class.return_value = mock_sigv4a_signer + + auth.use_sigv4a = True + auth.sigv4a_signer = mock_sigv4a_signer + + # Create a new request + request = httpx.Request('GET', 'https://example.com/test2', headers={'Host': 'example.com'}) + + # Mock successful response + success_response = httpx.Response( + status_code=200, + headers={'content-type': 'application/json'}, + content=b'{"success": true}', + request=request, + ) + + # Get signed request from auth flow + auth_flow = auth.auth_flow(request) + signed_request = next(auth_flow) + + # Verify request was signed (SigV4A should be used) + assert 'Authorization' in signed_request.headers + + # Send success response + try: + auth_flow.send(success_response) + except StopIteration: + pass + + # Verify SigV4A is still being used + assert auth.use_sigv4a is True + + def test_requires_sigv4a_with_signature_error(self): + """Test _requires_sigv4a() detects SigV4A requirement from error response.""" + # Create mock credentials + mock_credentials = Mock() + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Create request + request = httpx.Request('GET', 'https://example.com/test') + + # Test with SignatureDoesNotMatch error and sigv4a hint + error_data = { + 'Code': 'SignatureDoesNotMatch', + 'Message': 'This endpoint requires sigv4a for multi-region access' + } + response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + assert auth._requires_sigv4a(response) is True + + # Test with InvalidSignature error and sigv4a hint + error_data = { + '__type': 'InvalidSignature', + 'message': 'Please use SigV4A authentication' + } + response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + assert auth._requires_sigv4a(response) is True + + def test_requires_sigv4a_returns_false_for_non_sigv4a_errors(self): + """Test _requires_sigv4a() returns False for non-SigV4A errors.""" + # Create mock credentials + mock_credentials = Mock() + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Create request + request = httpx.Request('GET', 'https://example.com/test') + + # Test with 403 but no SigV4A indicators + error_data = { + 'Code': 'AccessDenied', + 'Message': 'User is not authorized' + } + response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + assert auth._requires_sigv4a(response) is False + + # Test with 404 error + response = httpx.Response( + status_code=404, + headers={'content-type': 'application/json'}, + content=b'{"error": "Not Found"}', + request=request, + ) + assert auth._requires_sigv4a(response) is False + + # Test with 500 error + response = httpx.Response( + status_code=500, + headers={'content-type': 'text/plain'}, + content=b'Internal Server Error', + request=request, + ) + assert auth._requires_sigv4a(response) is False + + # Test with SignatureDoesNotMatch but no sigv4a hint + error_data = { + 'Code': 'SignatureDoesNotMatch', + 'Message': 'The signature does not match' + } + response = httpx.Response( + status_code=403, + headers={'content-type': 'application/json'}, + content=json.dumps(error_data).encode(), + request=request, + ) + assert auth._requires_sigv4a(response) is False + + def test_sign_request_removes_connection_header(self): + """Test _sign_request() removes connection header.""" + # Create mock credentials + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + + auth = SigV4HTTPXAuth(mock_credentials, 'test-service', 'us-west-2') + + # Create request with connection header + request = httpx.Request( + 'GET', + 'https://example.com/test', + headers={ + 'Host': 'example.com', + 'Connection': 'keep-alive', + 'User-Agent': 'test-agent' + } + ) + + # Sign the request + signed_request = auth._sign_request(request, auth.sigv4_signer) + + # Verify connection header was removed from signing + # (The original request headers should still have it, but it shouldn't be in the signature) + assert 'Authorization' in signed_request.headers + assert 'X-Amz-Date' in signed_request.headers + + # Verify the request was signed (has signature headers) + assert signed_request is not None + + class TestHandleErrorResponse: """Test cases for the _handle_error_response function.""" @@ -218,7 +606,7 @@ def test_create_sigv4_auth_default(self, mock_create_session): # Test auth creation result = create_sigv4_auth('test-service', 'test-region') - # Verify auth was created correctly + # Verify auth was created correctly (with auto-detection enabled by default) assert isinstance(result, SigV4HTTPXAuth) assert result.service == 'test-service' assert result.region == 'test-region' # default region @@ -239,12 +627,105 @@ def test_create_sigv4_auth_with_explicit_region(self, mock_create_session): # Test auth creation with explicit region result = create_sigv4_auth('test-service', region='ap-southeast-1') - # Verify auth was created with explicit region + # Verify auth was created with explicit region (with auto-detection enabled by default) assert isinstance(result, SigV4HTTPXAuth) assert result.service == 'test-service' assert result.region == 'ap-southeast-1' assert result.credentials == mock_credentials + @patch('mcp_proxy_for_aws.sigv4_helper.SIGV4A_AVAILABLE', True) + @patch('mcp_proxy_for_aws.sigv4_helper.create_aws_session') + def test_create_sigv4_auth_returns_auto_detect_when_enabled(self, mock_create_session): + """Test that create_sigv4_auth returns SigV4HTTPXAuth by default.""" + # Mock session and credentials + mock_session = Mock() + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + mock_session.get_credentials.return_value = mock_credentials + mock_create_session.return_value = mock_session + + # Test auth creation (auto-detection is always enabled) + result = create_sigv4_auth('test-service', 'us-west-2') + + # Verify SigV4HTTPXAuth is returned + assert isinstance(result, SigV4HTTPXAuth) + assert result.service == 'test-service' + assert result.region == 'us-west-2' + assert result.credentials == mock_credentials + assert result.use_sigv4a is False # Starts with SigV4 + assert result.sigv4_signer is not None + assert result.sigv4a_signer is None # Lazy initialization + + @patch('mcp_proxy_for_aws.sigv4_helper.create_aws_session') + def test_create_sigv4_auth_returns_sigv4_when_disabled(self, mock_create_session): + """Test that create_sigv4_auth always returns SigV4HTTPXAuth (no disable option).""" + # This test is now redundant since auto-detection is always enabled + # Keeping it for backward compatibility but it tests the same as the above + mock_session = Mock() + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + mock_session.get_credentials.return_value = mock_credentials + mock_create_session.return_value = mock_session + + # Test auth creation + result = create_sigv4_auth('test-service', 'us-west-2') + + # Verify SigV4HTTPXAuth is returned (always enabled now) + assert isinstance(result, SigV4HTTPXAuth) + assert result.service == 'test-service' + assert result.region == 'us-west-2' + assert result.credentials == mock_credentials + + @patch('mcp_proxy_for_aws.sigv4_helper.SIGV4A_AVAILABLE', False) + @patch('mcp_proxy_for_aws.sigv4_helper.create_aws_session') + def test_create_sigv4_auth_falls_back_when_sigv4a_unavailable(self, mock_create_session): + """Test that create_sigv4_auth falls back to SigV4HTTPXAuth when SigV4A is unavailable.""" + # Mock session and credentials + mock_session = Mock() + mock_credentials = Mock() + mock_credentials.access_key = 'test_access_key' + mock_credentials.secret_key = 'test_secret_key' + mock_credentials.token = 'test_token' + mock_session.get_credentials.return_value = mock_credentials + mock_create_session.return_value = mock_session + + # Test auth creation when SigV4A is unavailable + result = create_sigv4_auth('test-service', 'us-west-2') + + # Verify SigV4HTTPXAuth is returned as fallback + assert isinstance(result, SigV4HTTPXAuth) + assert result.service == 'test-service' + assert result.region == 'us-west-2' + assert result.credentials == mock_credentials + + @patch('mcp_proxy_for_aws.sigv4_helper.create_aws_session') + def test_create_sigv4_auth_credential_handling(self, mock_create_session): + """Test that create_sigv4_auth properly handles credentials.""" + # Mock session and credentials with all attributes + mock_session = Mock() + mock_credentials = Mock() + mock_credentials.access_key = 'AKIAIOSFODNN7EXAMPLE' + mock_credentials.secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' + mock_credentials.token = 'test_session_token_12345' + mock_session.get_credentials.return_value = mock_credentials + mock_create_session.return_value = mock_session + + # Test auth creation with profile + result = create_sigv4_auth('test-service', 'eu-west-1', profile='test-profile') + + # Verify credentials are properly passed through + assert result.credentials == mock_credentials + assert result.credentials.access_key == 'AKIAIOSFODNN7EXAMPLE' + assert result.credentials.secret_key == 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' + assert result.credentials.token == 'test_session_token_12345' + + # Verify session was created with profile + mock_create_session.assert_called_once_with('test-profile') + class TestCreateSigv4Client: """Test cases for the create_sigv4_client function.""" @@ -262,7 +743,7 @@ def test_create_sigv4_client_default(self, mock_client_class, mock_create_auth): # Test client creation result = create_sigv4_client(service='test-service', region='test-region') - # Verify client was created correctly + # Verify client was created correctly (auto-detection is always enabled) mock_create_auth.assert_called_once_with('test-service', 'test-region', None) # Check that AsyncClient was called with correct parameters @@ -316,7 +797,7 @@ def test_create_sigv4_client_with_custom_service_and_region( service='custom-service', profile='test-profile', region='us-east-1' ) - # Verify auth was created with custom parameters + # Verify auth was created with custom parameters (auto-detection is always enabled) mock_create_auth.assert_called_once_with('custom-service', 'us-east-1', 'test-profile') assert result == mock_client @@ -369,7 +850,7 @@ def test_create_sigv4_client_with_prompt_context(self, mock_client_class, mock_c service='test-service', headers=prompt_context_headers, region='us-west-2' ) - # Verify client was created correctly with prompt context + # Verify client was created correctly with prompt context (auto-detection is always enabled) mock_create_auth.assert_called_once_with('test-service', 'us-west-2', None) # Check that AsyncClient was called with correct parameters including prompt headers diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 4c18e339..01347f96 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -20,6 +20,7 @@ create_transport_with_sigv4, determine_aws_region, determine_service_name, + is_global_endpoint, ) from unittest.mock import MagicMock, patch @@ -225,3 +226,85 @@ def test_determine_region_from_environment(self, mock_getenv): # Assert assert result == 'us-west-1' mock_getenv.assert_called_once_with('AWS_REGION') + + @patch('os.getenv') + def test_determine_region_global_endpoint_with_dot_global(self, mock_getenv): + """Test determination for global endpoint with .global. pattern.""" + endpoint = 'https://service.global.api.aws/mcp' + mock_getenv.return_value = None + + result = determine_aws_region(endpoint, None) + + assert result == 'us-east-1' + # Environment variable should not be checked when global endpoint is detected + + @patch('os.getenv') + def test_determine_region_global_endpoint_with_global_subdomain(self, mock_getenv): + """Test determination for global endpoint with global. subdomain.""" + endpoint = 'https://global.service.api.aws/mcp' + mock_getenv.return_value = None + + result = determine_aws_region(endpoint, None) + + assert result == 'us-east-1' + + @patch('os.getenv') + def test_determine_region_global_endpoint_without_region_pattern(self, mock_getenv): + """Test determination for global endpoint without region pattern.""" + endpoint = 'https://service.api.aws/mcp' + mock_getenv.return_value = None + + result = determine_aws_region(endpoint, None) + + assert result == 'us-east-1' + + @patch('os.getenv') + def test_determine_region_explicit_region_overrides_global(self, mock_getenv): + """Test that explicit region parameter takes precedence over global endpoint detection.""" + endpoint = 'https://service.global.api.aws/mcp' + region = 'eu-west-1' + mock_getenv.return_value = None + + result = determine_aws_region(endpoint, region) + + assert result == 'eu-west-1' + mock_getenv.assert_not_called() + + +class TestIsGlobalEndpoint: + """Test cases for is_global_endpoint function.""" + + def test_is_global_endpoint_with_dot_global(self): + """Test detection of .global. pattern in URLs.""" + endpoint = 'https://service.global.api.aws/mcp' + assert is_global_endpoint(endpoint) is True + + def test_is_global_endpoint_with_global_subdomain(self): + """Test detection of global. subdomain.""" + endpoint = 'https://global.service.api.aws/mcp' + assert is_global_endpoint(endpoint) is True + + def test_is_global_endpoint_without_region_in_api_aws(self): + """Test detection of .api.aws without region.""" + endpoint = 'https://service.api.aws/mcp' + assert is_global_endpoint(endpoint) is True + + def test_is_global_endpoint_regional_endpoint_returns_false(self): + """Test that regional endpoint patterns return False.""" + endpoint = 'https://service.us-east-1.api.aws/mcp' + assert is_global_endpoint(endpoint) is False + + def test_is_global_endpoint_non_aws_endpoint_returns_false(self): + """Test that non-AWS endpoints return False.""" + endpoint = 'https://service.example.com/mcp' + assert is_global_endpoint(endpoint) is False + + def test_is_global_endpoint_regional_with_complex_service_name(self): + """Test that regional endpoints with complex service names return False.""" + endpoint = 'https://my-service-beta.us-west-2.api.aws/mcp' + assert is_global_endpoint(endpoint) is False + + def test_is_global_endpoint_empty_hostname(self): + """Test handling of endpoint with empty hostname.""" + endpoint = 'https://' + assert is_global_endpoint(endpoint) is False