From ffd2506ac2b2d663e9d954cee8a6a8f63aa2c255 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 3 Jun 2026 15:10:36 +0200 Subject: [PATCH 1/2] fix(acp): Serialize proxy metadata as _meta --- .../tests/meta_propagation.rs | 197 ++++++++++++++++++ .../src/schema/proxy_protocol.rs | 40 +++- .../tests/meta_propagation.rs | 126 +++++++++++ 3 files changed, 353 insertions(+), 10 deletions(-) create mode 100644 src/agent-client-protocol-conductor/tests/meta_propagation.rs create mode 100644 src/agent-client-protocol/tests/meta_propagation.rs diff --git a/src/agent-client-protocol-conductor/tests/meta_propagation.rs b/src/agent-client-protocol-conductor/tests/meta_propagation.rs new file mode 100644 index 0000000..6083e7a --- /dev/null +++ b/src/agent-client-protocol-conductor/tests/meta_propagation.rs @@ -0,0 +1,197 @@ +use std::sync::{Arc, Mutex}; + +use agent_client_protocol::schema::{ + AgentCapabilities, ContentBlock, InitializeProxyRequest, InitializeRequest, InitializeResponse, + Meta, NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse, ProtocolVersion, + SessionId, StopReason, TextContent, +}; +use agent_client_protocol::util::MatchDispatchFrom; +use agent_client_protocol::{ + Agent, Client, Conductor, ConnectTo, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, + JsonRpcResponse, Proxy, SentRequest, +}; +use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use serde_json::Value; +use tokio::io::duplex; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +async fn recv( + response: SentRequest, +) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + response.on_receiving_result(async move |result| { + tx.send(result) + .map_err(|_| agent_client_protocol::Error::internal_error()) + })?; + rx.await + .map_err(|_| agent_client_protocol::Error::internal_error())? +} + +fn trace_context_meta() -> Meta { + let mut meta = Meta::new(); + meta.insert( + "traceparent".into(), + Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".into()), + ); + meta.insert( + "tracestate".into(), + Value::String("rojo=00f067aa0ba902b7".into()), + ); + meta.insert("baggage".into(), Value::String("tenant=acme".into())); + meta +} + +struct PassthroughProxy; + +impl ConnectTo for PassthroughProxy { + async fn connect_to( + self, + client: impl ConnectTo, + ) -> Result<(), agent_client_protocol::Error> { + Proxy + .builder() + .name("passthrough-proxy") + .on_receive_request_from( + Client, + async |request: InitializeProxyRequest, responder, cx| { + cx.send_request_to(Agent, request.initialize) + .forward_response_to(responder) + }, + agent_client_protocol::on_receive_request!(), + ) + .with_handler(ForwardMessages) + .connect_to(client) + .await + } +} + +struct ForwardMessages; + +impl HandleDispatchFrom for ForwardMessages { + async fn handle_dispatch_from( + &mut self, + message: Dispatch, + connection: ConnectionTo, + ) -> Result, agent_client_protocol::Error> { + MatchDispatchFrom::new(message, &connection) + .if_message_from(Client, async |message: Dispatch| { + connection.send_proxied_message_to(Agent, message)?; + Ok(Handled::Yes) + }) + .await + .if_message_from(Agent, async |message: Dispatch| { + connection.send_proxied_message_to(Client, message)?; + Ok(Handled::Yes) + }) + .await + .done() + } + + fn describe_chain(&self) -> impl std::fmt::Debug { + "ForwardMessages" + } +} + +struct RecordingAgent { + prompt_meta: Arc>>, +} + +impl ConnectTo for RecordingAgent { + async fn connect_to( + self, + client: impl ConnectTo, + ) -> Result<(), agent_client_protocol::Error> { + let prompt_meta = self.prompt_meta; + + Agent + .builder() + .name("recording-agent") + .on_receive_request( + async |request: InitializeRequest, responder, _cx| { + responder.respond( + InitializeResponse::new(request.protocol_version) + .agent_capabilities(AgentCapabilities::new()), + ) + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_request( + async |_request: NewSessionRequest, responder, _cx| { + responder.respond(NewSessionResponse::new(SessionId::new("session-1"))) + }, + agent_client_protocol::on_receive_request!(), + ) + .on_receive_request( + async move |request: PromptRequest, responder, _cx| { + *prompt_meta.lock().expect("not poisoned") = request.meta; + responder.respond(PromptResponse::new(StopReason::EndTurn)) + }, + agent_client_protocol::on_receive_request!(), + ) + .connect_to(client) + .await + } +} + +async fn run_with_conductor( + components: ProxiesAndAgent, + editor_task: impl AsyncFnOnce(ConnectionTo) -> Result<(), agent_client_protocol::Error>, +) -> Result<(), agent_client_protocol::Error> { + let (editor_out, conductor_in) = duplex(4096); + let (conductor_out, editor_in) = duplex(4096); + + let transport = + agent_client_protocol::ByteStreams::new(editor_out.compat_write(), editor_in.compat()); + + Client + .builder() + .name("editor") + .with_spawned(|_cx| async move { + ConductorImpl::new_agent("conductor", components) + .run(agent_client_protocol::ByteStreams::new( + conductor_out.compat_write(), + conductor_in.compat(), + )) + .await + }) + .connect_with(transport, editor_task) + .await +} + +#[tokio::test] +async fn conductor_proxy_chain_preserves_prompt_meta() -> Result<(), agent_client_protocol::Error> { + let observed_meta = Arc::new(Mutex::new(None)); + let expected_meta = trace_context_meta(); + let agent = RecordingAgent { + prompt_meta: Arc::clone(&observed_meta), + }; + + run_with_conductor( + ProxiesAndAgent::new(agent).proxy(PassthroughProxy), + async |connection| { + recv(connection.send_request(InitializeRequest::new(ProtocolVersion::LATEST))).await?; + + let session = recv(connection.send_request(NewSessionRequest::new("/"))).await?; + recv( + connection.send_request( + PromptRequest::new( + session.session_id, + vec![ContentBlock::Text(TextContent::new("hello"))], + ) + .meta(expected_meta.clone()), + ), + ) + .await?; + + Ok(()) + }, + ) + .await?; + + assert_eq!( + *observed_meta.lock().expect("not poisoned"), + Some(expected_meta) + ); + + Ok(()) +} diff --git a/src/agent-client-protocol/src/schema/proxy_protocol.rs b/src/agent-client-protocol/src/schema/proxy_protocol.rs index da18da4..4dce6ee 100644 --- a/src/agent-client-protocol/src/schema/proxy_protocol.rs +++ b/src/agent-client-protocol/src/schema/proxy_protocol.rs @@ -22,8 +22,12 @@ pub struct SuccessorMessage { #[serde(flatten)] pub message: M, - /// Optional metadata - #[serde(skip_serializing_if = "Option::is_none")] + /// Optional `_meta` metadata. + #[serde( + rename = "_meta", + alias = "meta", + skip_serializing_if = "Option::is_none" + )] pub meta: Option, } @@ -82,8 +86,12 @@ pub struct McpConnectRequest { /// The ACP identifier for the server (e.g., "acp:uuid"), matching `McpServerAcp.id` pub acp_id: String, - /// Optional metadata - #[serde(skip_serializing_if = "Option::is_none")] + /// Optional `_meta` metadata. + #[serde( + rename = "_meta", + alias = "meta", + skip_serializing_if = "Option::is_none" + )] pub meta: Option, } @@ -94,8 +102,12 @@ pub struct McpConnectResponse { /// Unique identifier for the established MCP connection pub connection_id: String, - /// Optional metadata - #[serde(skip_serializing_if = "Option::is_none")] + /// Optional `_meta` metadata. + #[serde( + rename = "_meta", + alias = "meta", + skip_serializing_if = "Option::is_none" + )] pub meta: Option, } @@ -109,8 +121,12 @@ pub struct McpDisconnectNotification { /// The id of the connection to disconnect. pub connection_id: String, - /// Optional metadata - #[serde(skip_serializing_if = "Option::is_none")] + /// Optional `_meta` metadata. + #[serde( + rename = "_meta", + alias = "meta", + skip_serializing_if = "Option::is_none" + )] pub meta: Option, } @@ -131,8 +147,12 @@ pub struct McpOverAcpMessage { #[serde(flatten)] pub message: M, - /// Optional metadata - #[serde(skip_serializing_if = "Option::is_none")] + /// Optional `_meta` metadata. + #[serde( + rename = "_meta", + alias = "meta", + skip_serializing_if = "Option::is_none" + )] pub meta: Option, } diff --git a/src/agent-client-protocol/tests/meta_propagation.rs b/src/agent-client-protocol/tests/meta_propagation.rs new file mode 100644 index 0000000..fc88822 --- /dev/null +++ b/src/agent-client-protocol/tests/meta_propagation.rs @@ -0,0 +1,126 @@ +use agent_client_protocol::schema::{ + ContentBlock, McpOverAcpMessage, Meta, PromptRequest, SessionId, SuccessorMessage, TextContent, +}; +use agent_client_protocol::{JsonRpcMessage, UntypedMessage}; +use serde_json::{Value, json}; + +fn trace_context_meta() -> Meta { + let mut meta = Meta::new(); + meta.insert( + "traceparent".into(), + Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".into()), + ); + meta.insert( + "tracestate".into(), + Value::String("rojo=00f067aa0ba902b7".into()), + ); + meta.insert("baggage".into(), Value::String("tenant=acme".into())); + meta +} + +fn prompt_request(meta: Meta) -> PromptRequest { + PromptRequest::new( + SessionId::new("session-1"), + vec![ContentBlock::Text(TextContent::new("hello"))], + ) + .meta(meta) +} + +fn trace_context_meta_value() -> Value { + Value::Object(trace_context_meta()) +} + +#[test] +fn prompt_request_meta_round_trips_with_root_trace_context_keys() +-> Result<(), agent_client_protocol::Error> { + let meta = trace_context_meta(); + let request = prompt_request(meta.clone()); + + let untyped = request.to_untyped_message()?; + + assert_eq!(untyped.method(), "session/prompt"); + assert_eq!(untyped.params()["_meta"], Value::Object(meta.clone())); + assert!(untyped.params().get("meta").is_none()); + + let parsed = PromptRequest::parse_message(untyped.method(), untyped.params())?; + assert_eq!(parsed.meta, Some(meta)); + + Ok(()) +} + +#[test] +fn successor_message_meta_serializes_as_reserved_meta_field() +-> Result<(), agent_client_protocol::Error> { + let envelope_meta = trace_context_meta_value(); + let inner_meta = trace_context_meta(); + let message = SuccessorMessage { + message: prompt_request(inner_meta.clone()), + meta: Some(envelope_meta.clone()), + }; + + let untyped = message.to_untyped_message()?; + + assert_eq!(untyped.method(), "_proxy/successor"); + assert_eq!(untyped.params()["_meta"], envelope_meta); + assert_eq!( + untyped.params()["params"]["_meta"], + Value::Object(inner_meta.clone()) + ); + assert!(untyped.params().get("meta").is_none()); + + let parsed = + SuccessorMessage::::parse_message(untyped.method(), untyped.params())?; + assert_eq!(parsed.meta, Some(envelope_meta)); + assert_eq!(parsed.message.meta, Some(inner_meta)); + + Ok(()) +} + +#[test] +fn successor_message_accepts_legacy_meta_alias() -> Result<(), agent_client_protocol::Error> { + let envelope_meta = trace_context_meta_value(); + let mut params = SuccessorMessage { + message: prompt_request(Meta::new()), + meta: Some(envelope_meta.clone()), + } + .to_untyped_message()? + .params; + + let params_object = params.as_object_mut().expect("params should be an object"); + let meta = params_object + .remove("_meta") + .expect("successor metadata should be present"); + params_object.insert("meta".into(), meta); + + let parsed = SuccessorMessage::::parse_message("_proxy/successor", ¶ms)?; + assert_eq!(parsed.meta, Some(envelope_meta)); + + Ok(()) +} + +#[test] +fn mcp_over_acp_message_meta_serializes_as_reserved_meta_field() +-> Result<(), agent_client_protocol::Error> { + let meta = trace_context_meta_value(); + let inner = UntypedMessage::new("tools/list", json!({ "cursor": "abc" }))?; + let message = McpOverAcpMessage { + connection_id: "connection-1".into(), + message: inner, + meta: Some(meta.clone()), + }; + + let untyped = message.to_untyped_message()?; + + assert_eq!(untyped.method(), "_mcp/message"); + assert_eq!(untyped.params()["connectionId"], "connection-1"); + assert_eq!(untyped.params()["method"], "tools/list"); + assert_eq!(untyped.params()["params"]["cursor"], "abc"); + assert_eq!(untyped.params()["_meta"], meta); + assert!(untyped.params().get("meta").is_none()); + + let parsed = + McpOverAcpMessage::::parse_message(untyped.method(), untyped.params())?; + assert_eq!(parsed.meta, Some(meta)); + + Ok(()) +} From bd920f98eed3884913e742f9f5308e93d7584e48 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Wed, 3 Jun 2026 15:15:09 +0200 Subject: [PATCH 2/2] fix typos --- src/agent-client-protocol-conductor/tests/meta_propagation.rs | 4 ++-- src/agent-client-protocol/tests/meta_propagation.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/agent-client-protocol-conductor/tests/meta_propagation.rs b/src/agent-client-protocol-conductor/tests/meta_propagation.rs index 6083e7a..ae9710e 100644 --- a/src/agent-client-protocol-conductor/tests/meta_propagation.rs +++ b/src/agent-client-protocol-conductor/tests/meta_propagation.rs @@ -31,11 +31,11 @@ fn trace_context_meta() -> Meta { let mut meta = Meta::new(); meta.insert( "traceparent".into(), - Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".into()), + Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0b902b7-01".into()), ); meta.insert( "tracestate".into(), - Value::String("rojo=00f067aa0ba902b7".into()), + Value::String("rojo=00f067aa0b902b7".into()), ); meta.insert("baggage".into(), Value::String("tenant=acme".into())); meta diff --git a/src/agent-client-protocol/tests/meta_propagation.rs b/src/agent-client-protocol/tests/meta_propagation.rs index fc88822..fbf5d35 100644 --- a/src/agent-client-protocol/tests/meta_propagation.rs +++ b/src/agent-client-protocol/tests/meta_propagation.rs @@ -8,11 +8,11 @@ fn trace_context_meta() -> Meta { let mut meta = Meta::new(); meta.insert( "traceparent".into(), - Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".into()), + Value::String("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0b902b7-01".into()), ); meta.insert( "tracestate".into(), - Value::String("rojo=00f067aa0ba902b7".into()), + Value::String("rojo=00f067aa0b902b7".into()), ); meta.insert("baggage".into(), Value::String("tenant=acme".into())); meta