Skip to content

Commit 9a2b35e

Browse files
committed
fix(streamable-http): reject duplicate in-flight request ids
The transport keys per-request routing by request id and assigned the slot without checking for an existing entry, so a second concurrent POST with the same id silently overwrote the first request's routing slot. One request received the other's payload and the other hung. Reject a POST whose request id is already in flight on the session with HTTP 400 and JSON-RPC -32600. Ids may still be reused once the earlier request completes, which deployed clients rely on. Fixes #3060.
1 parent 53117cb commit 9a2b35e

2 files changed

Lines changed: 146 additions & 0 deletions

File tree

src/mcp/server/streamable_http.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,20 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
549549

550550
request_id = str(message.id)
551551

552+
# Reject duplicate in-flight request ids: `_request_streams` is keyed by
553+
# request id, so a second concurrent request with the same id would
554+
# silently overwrite the first one's routing slot and cross-wire their
555+
# responses (one request receives the other's payload, the other hangs).
556+
# The spec requires ids to be unique within a session; ids may still be
557+
# reused once the earlier request has completed. See #3060.
558+
if request_id in self._request_streams:
559+
response = self._create_error_response(
560+
f"Bad Request: Request id {request_id} is already in flight for this session",
561+
HTTPStatus.BAD_REQUEST,
562+
)
563+
await response(scope, receive, send)
564+
return
565+
552566
if self.is_json_response_enabled:
553567
self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](
554568
REQUEST_STREAM_BUFFER_SIZE

tests/shared/test_streamable_http.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,14 @@ def first_sse_data(response: httpx.Response) -> dict[str, Any]:
9494
raise ValueError("No data event in SSE response") # pragma: no cover
9595

9696

97+
async def next_sse_data(lines: AsyncIterator[str]) -> dict[str, Any]:
98+
"""Return the next SSE `data:` payload from a live line iterator, parsed as JSON."""
99+
while True:
100+
line = await anext(lines)
101+
if line.startswith("data: "):
102+
return json.loads(line.removeprefix("data: "))
103+
104+
97105
def extract_protocol_version_from_sse(response: httpx.Response) -> str:
98106
"""Extract the negotiated protocol version from an SSE initialization response."""
99107
return first_sse_data(response)["result"]["protocolVersion"]
@@ -680,6 +688,130 @@ async def test_response(basic_app: Starlette) -> None:
680688
assert tools_response.headers.get("Content-Type") == "text/event-stream"
681689

682690

691+
@pytest.mark.anyio
692+
async def test_duplicate_in_flight_request_id_rejected(basic_app: Starlette) -> None:
693+
"""A request whose id is already in flight on the session is rejected with 400.
694+
695+
The per-request routing in the transport is keyed by request id, so a second
696+
concurrent request with the same id would overwrite the in-flight request's
697+
routing slot and cross-wire the two responses (see #3060). The duplicate is
698+
rejected and the in-flight request completes unaffected.
699+
"""
700+
async with make_client(basic_app) as client:
701+
response = await client.post(
702+
"/mcp",
703+
headers={
704+
"Accept": "application/json, text/event-stream",
705+
"Content-Type": "application/json",
706+
},
707+
json=INIT_REQUEST,
708+
)
709+
assert response.status_code == 200
710+
headers = {
711+
"Accept": "application/json, text/event-stream",
712+
"Content-Type": "application/json",
713+
MCP_SESSION_ID_HEADER: response.headers[MCP_SESSION_ID_HEADER],
714+
MCP_PROTOCOL_VERSION_HEADER: extract_protocol_version_from_sse(response),
715+
}
716+
717+
# Request A blocks server-side on the lock, keeping its id in flight.
718+
async with client.stream(
719+
"POST",
720+
"/mcp",
721+
headers=headers,
722+
json={
723+
"jsonrpc": "2.0",
724+
"id": 1,
725+
"method": "tools/call",
726+
"params": {"name": "wait_for_lock_with_notification", "arguments": {}},
727+
},
728+
) as response_a:
729+
assert response_a.status_code == 200
730+
lines_a = response_a.aiter_lines()
731+
# The tool's first notification confirms request A is in flight.
732+
with anyio.fail_after(5):
733+
notification = await next_sse_data(lines_a)
734+
assert notification["params"]["data"] == "First notification before lock"
735+
736+
# A second request reusing id 1 while A is in flight is rejected.
737+
response_b = await client.post(
738+
"/mcp",
739+
headers=headers,
740+
json={
741+
"jsonrpc": "2.0",
742+
"id": 1,
743+
"method": "tools/call",
744+
"params": {"name": "test_tool", "arguments": {}},
745+
},
746+
)
747+
assert response_b.status_code == 400
748+
error = response_b.json()["error"]
749+
assert error["code"] == INVALID_REQUEST
750+
assert "already in flight" in error["message"]
751+
752+
# Request A is unaffected: release the lock and it completes normally.
753+
release_response = await client.post(
754+
"/mcp",
755+
headers=headers,
756+
json={
757+
"jsonrpc": "2.0",
758+
"id": 2,
759+
"method": "tools/call",
760+
"params": {"name": "release_lock", "arguments": {}},
761+
},
762+
)
763+
assert release_response.status_code == 200
764+
765+
with anyio.fail_after(5):
766+
notification = await next_sse_data(lines_a)
767+
final = await next_sse_data(lines_a)
768+
assert notification["params"]["data"] == "Second notification after lock"
769+
assert final["id"] == 1
770+
assert final["result"]["content"][0]["text"] == "Completed"
771+
772+
773+
@pytest.mark.anyio
774+
async def test_request_id_reuse_after_completion_allowed(basic_app: Starlette) -> None:
775+
"""A request id can be reused once the earlier request with that id has completed.
776+
777+
Only concurrent requests with the same id are ambiguous to route; sequential
778+
reuse (which some deployed clients rely on, sending every request with id 1)
779+
keeps working (see #3060).
780+
"""
781+
async with make_client(basic_app) as client:
782+
response = await client.post(
783+
"/mcp",
784+
headers={
785+
"Accept": "application/json, text/event-stream",
786+
"Content-Type": "application/json",
787+
},
788+
json=INIT_REQUEST,
789+
)
790+
assert response.status_code == 200
791+
headers = {
792+
"Accept": "application/json, text/event-stream",
793+
"Content-Type": "application/json",
794+
MCP_SESSION_ID_HEADER: response.headers[MCP_SESSION_ID_HEADER],
795+
MCP_PROTOCOL_VERSION_HEADER: extract_protocol_version_from_sse(response),
796+
}
797+
798+
for _ in range(2):
799+
response = await client.post(
800+
"/mcp",
801+
headers=headers,
802+
json={
803+
"jsonrpc": "2.0",
804+
"id": 1,
805+
"method": "tools/call",
806+
"params": {"name": "test_tool", "arguments": {}},
807+
},
808+
)
809+
assert response.status_code == 200
810+
body = first_sse_data(response)
811+
assert body["id"] == 1
812+
assert body["result"]["content"][0]["text"] == "Called test_tool"
813+
814+
683815
@pytest.mark.anyio
684816
async def test_json_response(json_app: Starlette) -> None:
685817
"""With JSON response mode enabled, requests are answered with application/json bodies."""

0 commit comments

Comments
 (0)