diff --git a/src/mcp/server/mcpserver/utilities/func_metadata.py b/src/mcp/server/mcpserver/utilities/func_metadata.py index be4afb4e9..26d8440ba 100644 --- a/src/mcp/server/mcpserver/utilities/func_metadata.py +++ b/src/mcp/server/mcpserver/utilities/func_metadata.py @@ -33,6 +33,33 @@ def _is_input_required_type(obj: Any) -> bool: return isinstance(obj, type) and issubclass(obj, InputRequiredResult) +def _unwrap_annotated(annotation: Any) -> Any: + """Strip `Annotated[...]` down to its underlying type, e.g. for + `Annotated[str, Field(description=...)]` (idiomatic for described params). + """ + if get_origin(annotation) is Annotated: + return get_args(annotation)[0] + return annotation + + +def _is_optional_str(annotation: Any) -> bool: + """Whether `annotation` is `str | None` (`Optional[str]`), modulo `None` + and `Annotated` wrapping. + + Used by `pre_parse_json` to skip JSON pre-parsing for such annotations: + unlike e.g. `str | list[str]`, no member of this union other than `str` + could ever be produced by decoding a string as JSON, so attempting to + parse it only risks corrupting a valid string value that happens to look + like a JSON object/array (e.g. a JSON-serialized payload passed as a + plain string parameter). + """ + origin = get_origin(annotation) + if not is_union_origin(origin): + return False + non_none_args = [_unwrap_annotated(arg) for arg in get_args(annotation) if arg is not type(None)] + return non_none_args == [str] + + class StrictJsonSchema(GenerateJsonSchema): """A JSON schema generator that raises exceptions instead of emitting warnings. @@ -169,7 +196,11 @@ def pre_parse_json(self, data: dict[str, Any]) -> dict[str, Any]: continue field_info = key_to_field_info[data_key] - if isinstance(data_value, str) and field_info.annotation is not str: + if ( + isinstance(data_value, str) + and field_info.annotation is not str + and not _is_optional_str(field_info.annotation) + ): try: pre_parsed = json.loads(data_value) except json.JSONDecodeError: diff --git a/tests/server/mcpserver/test_func_metadata.py b/tests/server/mcpserver/test_func_metadata.py index 62a9612b9..080ace672 100644 --- a/tests/server/mcpserver/test_func_metadata.py +++ b/tests/server/mcpserver/test_func_metadata.py @@ -204,6 +204,53 @@ def func_with_str_types(str_or_list: str | list[str]): # pragma: no cover assert result["str_or_list"] == ["hello", "world"] +def test_optional_str_is_never_json_pre_parsed(): + """`str | None` must never be JSON-decoded, even if the value looks like a + JSON object/array (e.g. a JSON-serialized payload passed as a plain string). + + Regression test: `field_info.annotation is not str` used to be True for + `str | None`, so a value like a JSON-serialized template body would get + silently turned into a dict and fail pydantic validation downstream. + """ + + def func_with_optional_str(body: str | None = None): # pragma: no cover + return body + + meta = func_metadata(func_with_optional_str) + + json_payload = '{"blocks": [{"type": "text", "value": "hi"}]}' + result = meta.pre_parse_json({"body": json_payload}) + assert result["body"] == json_payload + + json_list_payload = '["hello", "world"]' + result = meta.pre_parse_json({"body": json_list_payload}) + assert result["body"] == json_list_payload + + # Plain strings and None are unaffected. + result = meta.pre_parse_json({"body": "hello"}) + assert result["body"] == "hello" + result = meta.pre_parse_json({"body": None}) + assert result["body"] is None + + +def test_annotated_optional_str_is_never_json_pre_parsed(): + """`Annotated[str, Field(...)] | None` — idiomatic FastMCP style for a + described optional string param — must get the same treatment as plain + `str | None`: the `Annotated` wrapper must not defeat the `str` check. + """ + + def func_with_annotated_str( + body: Annotated[str, Field(description="a body")] | None = None, + ): # pragma: no cover + return body + + meta = func_metadata(func_with_annotated_str) + + json_payload = '{"blocks": ["a", "b"]}' + result = meta.pre_parse_json({"body": json_payload}) + assert result["body"] == json_payload + + def test_skip_names(): """Test that skipped parameters are not included in the model"""