Skip to content

Commit fbe06f8

Browse files
committed
support streamable http transport
1 parent cfe1373 commit fbe06f8

File tree

4 files changed

+525
-19
lines changed

4 files changed

+525
-19
lines changed

fastapi_mcp/server.py

Lines changed: 141 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from fastapi_mcp.openapi.convert import convert_openapi_to_mcp_tools
1212
from fastapi_mcp.transport.sse import FastApiSseTransport
13+
from fastapi_mcp.transport.http import FastApiStreamableHttpTransport
1314
from fastapi_mcp.types import HTTPRequestInfo, AuthConfig
1415

1516
import logging
@@ -35,7 +36,7 @@ def decorator(
3536

3637
async def handler(req: types.CallToolRequest):
3738
try:
38-
# Pull the original HTTP request info from the MCP message. It was injected in
39+
# HACK: Pull the original HTTP request info from the MCP message. It was injected in
3940
# `FastApiSseTransport.handle_fastapi_post_message()`
4041
if hasattr(req.params, "_http_request_info") and req.params._http_request_info is not None:
4142
http_request_info = HTTPRequestInfo.model_validate(req.params._http_request_info)
@@ -241,6 +242,32 @@ def _register_mcp_endpoints_sse(
241242
self._register_mcp_connection_endpoint_sse(router, transport, mount_path, dependencies)
242243
self._register_mcp_messages_endpoint_sse(router, transport, mount_path, dependencies)
243244

245+
def _register_mcp_http_endpoint(
246+
self,
247+
router: FastAPI | APIRouter,
248+
transport: FastApiStreamableHttpTransport,
249+
mount_path: str,
250+
dependencies: Optional[Sequence[params.Depends]],
251+
):
252+
@router.api_route(
253+
mount_path,
254+
methods=["GET", "POST", "DELETE"],
255+
include_in_schema=False,
256+
operation_id="mcp_http",
257+
dependencies=dependencies,
258+
)
259+
async def handle_mcp_streamable_http(request: Request):
260+
return await transport.handle_fastapi_request(request)
261+
262+
def _register_mcp_endpoints_http(
263+
self,
264+
router: FastAPI | APIRouter,
265+
transport: FastApiStreamableHttpTransport,
266+
mount_path: str,
267+
dependencies: Optional[Sequence[params.Depends]],
268+
):
269+
self._register_mcp_http_endpoint(router, transport, mount_path, dependencies)
270+
244271
def _setup_auth_2025_03_26(self):
245272
from fastapi_mcp.auth.proxy import (
246273
setup_oauth_custom_metadata,
@@ -296,7 +323,7 @@ def _setup_auth(self):
296323
else:
297324
logger.info("No auth config provided, skipping auth setup")
298325

299-
def mount(
326+
def mount_http(
300327
self,
301328
router: Annotated[
302329
Optional[FastAPI | APIRouter],
@@ -317,17 +344,64 @@ def mount(
317344
"""
318345
),
319346
] = "/mcp",
320-
transport: Annotated[
321-
Literal["sse"],
347+
) -> None:
348+
"""
349+
Mount the MCP server with HTTP transport to **any** FastAPI app or APIRouter.
350+
351+
There is no requirement that the FastAPI app or APIRouter is the same as the one that the MCP
352+
server was created from.
353+
"""
354+
# Normalize mount path
355+
if not mount_path.startswith("/"):
356+
mount_path = f"/{mount_path}"
357+
if mount_path.endswith("/"):
358+
mount_path = mount_path[:-1]
359+
360+
if not router:
361+
router = self.fastapi
362+
363+
assert isinstance(router, (FastAPI, APIRouter)), f"Invalid router type: {type(router)}"
364+
365+
http_transport = FastApiStreamableHttpTransport()
366+
dependencies = self._auth_config.dependencies if self._auth_config else None
367+
368+
self._register_mcp_endpoints_http(router, http_transport, mount_path, dependencies)
369+
self._setup_auth()
370+
371+
# HACK: If we got a router and not a FastAPI instance, we need to re-include the router so that
372+
# FastAPI will pick up the new routes we added. The problem with this approach is that we assume
373+
# that the router is a sub-router of self.fastapi, which may not always be the case.
374+
#
375+
# TODO: Find a better way to do this.
376+
if isinstance(router, APIRouter):
377+
self.fastapi.include_router(router)
378+
379+
logger.info(f"MCP HTTP server listening at {mount_path}")
380+
381+
def mount_sse(
382+
self,
383+
router: Annotated[
384+
Optional[FastAPI | APIRouter],
322385
Doc(
323386
"""
324-
The transport type for the MCP server. Currently only 'sse' is supported.
387+
The FastAPI app or APIRouter to mount the MCP server to. If not provided, the MCP
388+
server will be mounted to the FastAPI app.
325389
"""
326390
),
327-
] = "sse",
391+
] = None,
392+
mount_path: Annotated[
393+
str,
394+
Doc(
395+
"""
396+
Path where the MCP server will be mounted.
397+
Mount path is appended to the root path of FastAPI router, or to the prefix of APIRouter.
398+
Defaults to '/sse'.
399+
"""
400+
),
401+
] = "/sse",
328402
) -> None:
329403
"""
330-
Mount the MCP server to **any** FastAPI app or APIRouter.
404+
Mount the MCP server with SSE transport to **any** FastAPI app or APIRouter.
331405
332406
There is no requirement that the FastAPI app or APIRouter is the same as the one that the MCP
333407
server was created from.
@@ -347,14 +421,9 @@ def mount(
347421
messages_path = f"{base_path}/messages/"
348422

349423
sse_transport = FastApiSseTransport(messages_path)
350-
351424
dependencies = self._auth_config.dependencies if self._auth_config else None
352425

353-
if transport == "sse":
354-
self._register_mcp_endpoints_sse(router, sse_transport, mount_path, dependencies)
355-
else: # pragma: no cover
356-
raise ValueError(f"Invalid transport: {transport}") # pragma: no cover
357-
426+
self._register_mcp_endpoints_sse(router, sse_transport, mount_path, dependencies)
358427
self._setup_auth()
359428

360429
# HACK: If we got a router and not a FastAPI instance, we need to re-include the router so that
@@ -365,7 +434,65 @@ def mount(
365434
if isinstance(router, APIRouter):
366435
self.fastapi.include_router(router)
367436

368-
logger.info(f"MCP server listening at {mount_path}")
437+
logger.info(f"MCP SSE server listening at {mount_path}")
438+
439+
def mount(
440+
self,
441+
router: Annotated[
442+
Optional[FastAPI | APIRouter],
443+
Doc(
444+
"""
445+
The FastAPI app or APIRouter to mount the MCP server to. If not provided, the MCP
446+
server will be mounted to the FastAPI app.
447+
"""
448+
),
449+
] = None,
450+
mount_path: Annotated[
451+
str,
452+
Doc(
453+
"""
454+
Path where the MCP server will be mounted.
455+
Mount path is appended to the root path of FastAPI router, or to the prefix of APIRouter.
456+
Defaults to '/mcp'.
457+
"""
458+
),
459+
] = "/mcp",
460+
transport: Annotated[
461+
Literal["sse"],
462+
Doc(
463+
"""
464+
The transport type for the MCP server. Currently only 'sse' is supported.
465+
This parameter is deprecated.
466+
"""
467+
),
468+
] = "sse",
469+
) -> None:
470+
"""
471+
[DEPRECATED] Mount the MCP server to **any** FastAPI app or APIRouter.
472+
473+
This method is deprecated and will be removed in a future version.
474+
Use mount_http() for HTTP transport (recommended) or mount_sse() for SSE transport instead.
475+
476+
For backwards compatibility, this method defaults to SSE transport.
477+
478+
There is no requirement that the FastAPI app or APIRouter is the same as the one that the MCP
479+
server was created from.
480+
"""
481+
import warnings
482+
483+
warnings.warn(
484+
"mount() is deprecated and will be removed in a future version. "
485+
"Use mount_http() for HTTP transport (recommended) or mount_sse() for SSE transport instead.",
486+
DeprecationWarning,
487+
stacklevel=2,
488+
)
489+
490+
if transport == "sse":
491+
self.mount_sse(router, mount_path)
492+
else: # pragma: no cover
493+
raise ValueError( # pragma: no cover
494+
f"Unsupported transport: {transport}. Use mount_sse() or mount_http() instead."
495+
)
369496

370497
async def _execute_api_tool(
371498
self,

fastapi_mcp/transport/http.py

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import logging
2+
import json
3+
4+
from fastapi import Request, Response, HTTPException
5+
from mcp.server.streamable_http import StreamableHTTPServerTransport
6+
from mcp.server.transport_security import TransportSecuritySettings
7+
from mcp.types import JSONRPCMessage
8+
from pydantic import ValidationError
9+
from fastapi_mcp.types import HTTPRequestInfo
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class FastApiStreamableHttpTransport(StreamableHTTPServerTransport):
15+
def __init__(
16+
self,
17+
mcp_session_id: str | None = None,
18+
is_json_response_enabled: bool = True, # Default to JSON for HTTP transport
19+
event_store=None,
20+
security_settings: TransportSecuritySettings | None = None,
21+
):
22+
super().__init__(
23+
mcp_session_id=mcp_session_id,
24+
is_json_response_enabled=is_json_response_enabled,
25+
event_store=event_store,
26+
security_settings=security_settings,
27+
)
28+
logger.debug(f"FastApiStreamableHttpTransport initialized with session_id: {mcp_session_id}")
29+
30+
async def handle_fastapi_request(self, request: Request) -> Response:
31+
"""
32+
FastAPI-native request handler that adapts the SDK's handle_request method.
33+
34+
The approach here is necessarily different from FastApiSseTransport.
35+
In FastApiSseTransport, we reimplement the SSE transport logic to have a more FastAPI-native transport.
36+
It proved to be less bug-prone since it avoids deconstructing and reconstructing raw ASGI objects.
37+
38+
But, we took a different approach here because StreamableHTTPServerTransport handles more complexity,
39+
and multiple request methods (GET/POST/DELETE), so we want to leverage that logic and avoid reimplementing.
40+
41+
So we use an enhanced adapter pattern: intercept and enhance POST requests for HTTPRequestInfo injection,
42+
while delegating the complex protocol handling to the SDK.
43+
"""
44+
logger.debug(f"Handling FastAPI request: {request.method} {request.url.path}")
45+
46+
if request.method == "POST":
47+
return await self._handle_post_with_injection(request)
48+
else:
49+
# For GET and DELETE requests, delegate directly to SDK since they don't need injection
50+
return await self._delegate_to_sdk(request)
51+
52+
async def _handle_post_with_injection(self, request: Request) -> Response:
53+
"""
54+
Handle POST requests with HTTPRequestInfo injection.
55+
56+
This mirrors the approach in FastApiSseTransport.handle_fastapi_post_message()
57+
to ensure consistency in how we handle authentication context and header forwarding.
58+
59+
The injection happens at the JSON-RPC message level, just like in SSE transport,
60+
so that the downstream tool handlers receive the same request context regardless
61+
of transport type.
62+
"""
63+
try:
64+
# Read and parse the request body first, just like SSE transport does
65+
body = await request.body()
66+
logger.debug(f"Received JSON: {body.decode()}")
67+
68+
try:
69+
raw_message = json.loads(body)
70+
except json.JSONDecodeError as e:
71+
logger.error(f"Failed to parse JSON: {e}")
72+
raise HTTPException(status_code=400, detail=f"Parse error: {str(e)}")
73+
74+
try:
75+
message = JSONRPCMessage.model_validate(raw_message)
76+
except ValidationError as e:
77+
logger.error(f"Failed to validate message: {e}")
78+
raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
79+
80+
# HACK to inject the HTTP request info into the MCP message,
81+
# so we can use it for auth.
82+
# It is then used in our custom `LowlevelMCPServer.call_tool()` decorator.
83+
if hasattr(message.root, "params") and message.root.params is not None:
84+
message.root.params["_http_request_info"] = HTTPRequestInfo(
85+
method=request.method,
86+
path=request.url.path,
87+
headers=dict(request.headers),
88+
cookies=request.cookies,
89+
query_params=dict(request.query_params),
90+
body=body.decode(),
91+
).model_dump(mode="json")
92+
logger.debug("Injected HTTPRequestInfo into message for auth context")
93+
94+
modified_body = message.model_dump_json(by_alias=True, exclude_none=True).encode()
95+
modified_request = self._create_modified_request(request, modified_body)
96+
97+
# Delegate to SDK with the modified request
98+
return await self._delegate_to_sdk(modified_request)
99+
100+
except HTTPException:
101+
# Re-raise FastAPI HTTPExceptions directly for proper error handling
102+
raise
103+
except Exception:
104+
logger.exception("Error processing POST request")
105+
raise HTTPException(status_code=500, detail="Internal server error")
106+
107+
def _create_modified_request(self, original_request: Request, modified_body: bytes) -> Request:
108+
"""
109+
Create a new Request object with modified body content.
110+
111+
This is necessary because we need to inject HTTPRequestInfo into the JSON-RPC message
112+
before passing it to the SDK, but Request objects are immutable.
113+
"""
114+
115+
# Create a new receive callable that returns our modified body
116+
async def modified_receive():
117+
return {
118+
"type": "http.request",
119+
"body": modified_body,
120+
"more_body": False,
121+
}
122+
123+
# Create new request with modified receive
124+
return Request(original_request.scope, modified_receive)
125+
126+
async def _delegate_to_sdk(self, request: Request) -> Response:
127+
"""
128+
Delegate request handling to the underlying StreamableHTTPServerTransport.
129+
130+
This captures the ASGI response from the SDK and converts it to a FastAPI Response,
131+
maintaining the adapter pattern while providing FastAPI-native integration.
132+
"""
133+
# Capture the response from the SDK's handle_request method
134+
response_started = False
135+
response_status = 200
136+
response_headers = []
137+
response_body = b""
138+
139+
async def capture_send(message):
140+
nonlocal response_started, response_status, response_headers, response_body
141+
142+
if message["type"] == "http.response.start":
143+
response_started = True
144+
response_status = message["status"]
145+
response_headers = message.get("headers", [])
146+
elif message["type"] == "http.response.body":
147+
response_body += message.get("body", b"")
148+
149+
try:
150+
# Delegate to the SDK's handle_request method with ASGI interface
151+
await self.handle_request(request.scope, request.receive, capture_send)
152+
153+
# Convert the captured ASGI response to a FastAPI Response
154+
headers_dict = {name.decode(): value.decode() for name, value in response_headers}
155+
156+
return Response(
157+
content=response_body,
158+
status_code=response_status,
159+
headers=headers_dict,
160+
)
161+
162+
except Exception as e:
163+
logger.exception(f"Error in StreamableHTTPServerTransport: {e}")
164+
raise HTTPException(status_code=500, detail="Internal server error")

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ dependencies = [
2929
"fastapi>=0.100.0",
3030
"typer>=0.9.0",
3131
"rich>=13.0.0",
32-
"mcp>=1.8.1",
32+
"mcp>=1.12.0",
3333
"pydantic>=2.0.0",
3434
"pydantic-settings>=2.5.2",
3535
"uvicorn>=0.20.0",
@@ -49,6 +49,7 @@ dev = [
4949
"pre-commit>=4.2.0",
5050
"pyjwt>=2.10.1",
5151
"cryptography>=44.0.2",
52+
"types-jsonschema>=4.25.0.20250720",
5253
]
5354

5455
[project.urls]

0 commit comments

Comments
 (0)