forked from aws/mcp-proxy-for-aws
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproxy.py
More file actions
127 lines (103 loc) · 4.91 KB
/
proxy.py
File metadata and controls
127 lines (103 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import httpx
import logging
from fastmcp import Client
from fastmcp.client.transports import ClientTransport
from fastmcp.server.providers.proxy import ProxyClient as _ProxyClient
from mcp import McpError
from mcp.types import InitializeRequest, JSONRPCError, JSONRPCMessage
from typing_extensions import override
# Module-level logger, named after this module, used by the classes below.
logger = logging.getLogger(__name__)
class AWSMCPProxyClient(_ProxyClient):
    """Proxy client that handles HTTP errors when connection fails.

    While connecting, an ``httpx.HTTPStatusError`` whose response body is a
    JSON-RPC error is re-raised as ``McpError``; a ``RuntimeError`` triggers a
    forced disconnect followed by a bounded number of reconnect attempts.
    """

    def __init__(self, transport: ClientTransport, max_connect_retry: int = 3, **kwargs):
        """Constructor of AWSMCPProxyClient.

        Args:
            transport: Transport passed to the parent proxy client.
            max_connect_retry: Maximum number of reconnect attempts that
                ``_connect`` makes after a ``RuntimeError``.
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(transport, **kwargs)
        self._max_connect_retry = max_connect_retry

    @override
    async def _connect(self, retry: int = 0):
        """Connect through the parent client, translating and retrying failures.

        Args:
            retry: Number of reconnect attempts already made. Callers normally
                leave this at 0; it is incremented on each recursive reconnect.

        Returns:
            Whatever the parent ``_connect`` returns.

        Raises:
            McpError: The HTTP error response carried a JSON-RPC error, or a
                ``RuntimeError`` was caused by an ``McpError``.
            httpx.HTTPStatusError: The HTTP error response was not a JSON-RPC
                error message.
            RuntimeError: Connecting still fails after ``max_connect_retry``
                reconnect attempts.
        """
        logger.debug('Connecting %s', self)
        try:
            result = await super(AWSMCPProxyClient, self)._connect()
            logger.debug('Connected %s', self)
            return result
        except httpx.HTTPStatusError as http_error:
            logger.exception('Connection failed')
            response = http_error.response
            try:
                body = await response.aread()
                jsonrpc_msg = JSONRPCMessage.model_validate_json(body).root
            except Exception as e:
                # The body could not be read or parsed as a JSON-RPC message,
                # so surface the original HTTP error untouched.
                logger.debug('HTTP error is not a valid MCP message.', exc_info=e)
                raise http_error
            if isinstance(jsonrpc_msg, JSONRPCError):
                logger.debug('Converting HTTP error to MCP error', exc_info=http_error)
                # raising McpError so that the sdk can handle the exception properly
                raise McpError(error=jsonrpc_msg.error) from http_error
            raise http_error
        except RuntimeError as e:
            # Unwrap an McpError that was wrapped in a RuntimeError; it is not retried.
            if isinstance(e.__cause__, McpError):
                raise e.__cause__
            # `retry` counts reconnects already performed, so stop once it
            # reaches the limit (`>` here would allow one extra reconnect).
            if retry >= self._max_connect_retry:
                raise
            try:
                logger.warning('encountered runtime error, try force disconnect.', exc_info=e)
                await self._disconnect(force=True)
            except httpx.TimeoutException:
                # _disconnect awaits on the session_task,
                # which raises the timeout error that caused the client session to be terminated.
                # the error is ignored as long as the counter is force set to 0.
                # TODO: investigate how timeout error is handled by fastmcp and httpx
                logger.exception(
                    'Session was terminated due to timeout error, ignore and reconnect'
                )
            return await self._connect(retry + 1)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the client connected when the async context exits.

        The MCP Proxy for AWS project is a proxy from stdio to http (sigv4).
        We want the client to remain connected until the stdio connection is closed.

        https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#stdio

        1. close stdin
        2. terminate subprocess

        There is no equivalent of the streamable-http DELETE concept in stdio to terminate
        a session. Hence the connection will be terminated only at program exit.

        This method intentionally does nothing and returns ``None``, so an exception
        raised inside the ``async with`` body is not suppressed.
        """
class AWSMCPProxyClientFactory:
    """Factory that lazily creates a single proxy client and returns it on every request."""

    def __init__(self, transport: ClientTransport) -> None:
        """Store the transport; the client itself is created on first use."""
        self._client: AWSMCPProxyClient | None = None
        self._initialize_request: InitializeRequest | None = None
        self._transport = transport

    def set_init_params(self, initialize_request: InitializeRequest):
        """Remember the initialize request supplied by the caller."""
        self._initialize_request = initialize_request

    async def get_client(self) -> Client:
        """Return the cached client, building it from the transport on the first call."""
        client = self._client
        if client is None:
            client = AWSMCPProxyClient(self._transport)
            self._client = client
        return client

    async def __call__(self) -> Client:
        """Make the factory awaitable-callable; delegates to ``get_client``."""
        client = await self.get_client()
        return client

    async def disconnect(self):
        """Force-disconnect the cached client, logging instead of raising on failure."""
        try:
            client = self._client
            if not client:
                # Nothing was ever created (or the client is falsy): nothing to close.
                return
            await client._disconnect(force=True)
        except Exception:
            logger.exception('Failed to disconnect client.')