From df8bb3c727adaf5e61a78ebca43ffc26ac18ee41 Mon Sep 17 00:00:00 2001 From: limityan Date: Fri, 5 Jun 2026 20:05:03 +0800 Subject: [PATCH] refactor(stream): move stream contracts out of adapters --- docs/architecture/core-decomposition.md | 4 +- docs/plans/core-decomposition-completed.md | 1 + docs/plans/core-decomposition-plan.md | 1 + src/crates/adapters/ai-adapters/AGENTS.md | 6 +- src/crates/adapters/ai-adapters/Cargo.toml | 7 + src/crates/adapters/ai-adapters/README.md | 7 +- .../ai-adapters/src/stream/types/unified.rs | 82 +- .../ai-adapters/src/tool_call_accumulator.rs | 1090 +---------------- .../adapters/ai-adapters/src/types/ai.rs | 69 ++ .../tests/common/fixture_loader.rs | 0 .../ai-adapters}/tests/common/mod.rs | 0 .../tests/common/sse_fixture_server.rs | 0 .../tests/common/stream_test_harness.rs | 0 .../anthropic/closed_after_message_delta.sse | 0 ...y_thinking_signature_text_and_tool_use.sse | 0 .../stream/anthropic/extended_thinking.sse | 0 .../stream/anthropic/inline_think_text.sse | 0 .../interleaved_parallel_tool_use.sse | 0 .../malformed_content_block_delta.sse | 0 .../malformed_tool_arguments_extra_brace.sse | 0 .../gemini/function_call_string_args.sse | 0 ...y_reasoning_content_text_and_tool_call.sse | 0 .../stream/openai/inline_think_text.sse | 0 ...nterleaved_parallel_tool_args_by_index.sse | 0 ...hree_tools_with_empty_toolcall_anomaly.sse | 0 .../openai/tool_args_snapshot_stop_reason.sse | 0 .../openai/tool_args_split_with_usage.sse | 0 .../openai/tool_call_missing_type_field.sse | 0 ..._call_trailing_empty_args_finish_chunk.sse | 0 .../openai/tool_id_only_orphan_filtered.sse | 0 ...ool_id_prelude_then_payload_without_id.sse | 0 ...st_final_chunk_contains_orphan_id_only.sse | 0 .../malformed_function_call_arguments.sse | 0 .../tests/stream_processor_anthropic.rs | 0 .../tests/stream_processor_openai.rs | 0 .../tests/stream_processor_tool_arguments.rs | 0 .../tests/stream_replay_regressions.rs | 0 .../ai-adapters}/tests/stream_test_harness.rs | 0 .../src/agentic/execution/stream_processor.rs | 2 +- src/crates/execution/AGENTS-CN.md | 8 +- src/crates/execution/AGENTS.md | 11 +- src/crates/execution/agent-stream/AGENTS.md | 23 +- src/crates/execution/agent-stream/Cargo.toml | 3 - src/crates/execution/agent-stream/src/lib.rs | 22 +- .../agent-stream/src/tool_call_accumulator.rs | 1089 ++++++++++++++++ .../execution/agent-stream/src/unified.rs | 81 ++ 46 files changed, 1293 insertions(+), 1213 deletions(-) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/common/fixture_loader.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/common/mod.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/common/sse_fixture_server.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/common/stream_test_harness.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/closed_after_message_delta.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/empty_thinking_signature_text_and_tool_use.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/extended_thinking.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/inline_think_text.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/interleaved_parallel_tool_use.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/malformed_content_block_delta.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/anthropic/malformed_tool_arguments_extra_brace.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/gemini/function_call_string_args.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/empty_reasoning_content_text_and_tool_call.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/inline_think_text.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/interleaved_parallel_tool_args_by_index.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/thinking_text_three_tools_with_empty_toolcall_anomaly.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_args_snapshot_stop_reason.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_args_split_with_usage.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_call_missing_type_field.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_call_trailing_empty_args_finish_chunk.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_id_only_orphan_filtered.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/tool_id_prelude_then_payload_without_id.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/openai/two_tools_first_final_chunk_contains_orphan_id_only.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/fixtures/stream/responses/malformed_function_call_arguments.sse (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/stream_processor_anthropic.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/stream_processor_openai.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/stream_processor_tool_arguments.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/stream_replay_regressions.rs (100%) rename src/crates/{execution/agent-stream => adapters/ai-adapters}/tests/stream_test_harness.rs (100%) create mode 100644 src/crates/execution/agent-stream/src/tool_call_accumulator.rs create mode 100644 src/crates/execution/agent-stream/src/unified.rs diff --git a/docs/architecture/core-decomposition.md b/docs/architecture/core-decomposition.md index bcc67d315..2362fec25 100644 --- a/docs/architecture/core-decomposition.md +++ b/docs/architecture/core-decomposition.md @@ -226,7 +226,7 @@ flowchart TB ### 7.3 适配层(Adapters) -适配层负责协议、transport、外部 provider 和宿主通信转换,物理位置是 `src/crates/adapters`。其中 `ai-adapters` 负责 AI provider 请求/响应与 stream 协议,`api-layer` 负责产品宿主共用的后端 API adapter,`transport` 负责事件投递和 host transport adapter,`webdriver` 负责 WebDriver 协议和浏览器自动化 adapter。适配层不拥有产品能力选择,也不承载可复用 OS service 实现。 +适配层负责协议、transport、外部 provider 和宿主通信转换,物理位置是 `src/crates/adapters`。其中 `ai-adapters` 负责 AI provider 请求/响应映射和 provider stream 协议解析,解析结果应转换为 execution 层拥有的统一 stream 契约;`api-layer` 负责产品宿主共用的后端 API adapter,`transport` 负责事件投递和 host transport adapter,`webdriver` 负责 WebDriver 协议和浏览器自动化 adapter。适配层不拥有产品能力选择,也不承载可复用 OS service 实现。 ### 7.4 服务实现层(Services) @@ -234,7 +234,7 @@ flowchart TB ### 7.5 执行原语层(Execution Primitives) -执行原语层提供 provider-neutral 的 runtime building blocks,物理位置是 `src/crates/execution`。`agent-runtime`、`agent-stream`、`harness`、`runtime-services`、`tool-contracts`、`tool-provider-groups` 和 `tool-execution` 分别定义 agent loop facts、stream normalization、workflow descriptor、typed service bundle、tool manifest / permission / result policy、tool group facts 和低层 tool execution helper。当前 Cargo package / lib 名保持兼容,但物理目录按职责命名。它们只能依赖稳定契约或明确的 provider-neutral DTO,不直接创建 Tauri handle、filesystem manager、Git provider、MCP client、AI client 或 host process。 +执行原语层提供 provider-neutral 的 runtime building blocks,物理位置是 `src/crates/execution`。`agent-runtime`、`agent-stream`、`harness`、`runtime-services`、`tool-contracts`、`tool-provider-groups` 和 `tool-execution` 分别定义 agent loop facts、统一 stream DTO / tool-call 累积 / replay 契约、workflow descriptor、typed service bundle、tool manifest / permission / result policy、tool group facts 和低层 tool execution helper。当前 Cargo package / lib 名保持兼容,但物理目录按职责命名。它们只能依赖稳定契约或明确的 provider-neutral DTO,不直接创建 Tauri handle、filesystem manager、Git provider、MCP client、AI client 或 host process。 ### 7.6 稳定契约与产品领域层(Stable Contracts and Product Domains) diff --git a/docs/plans/core-decomposition-completed.md b/docs/plans/core-decomposition-completed.md index f52714bea..62094c6d4 100644 --- a/docs/plans/core-decomposition-completed.md +++ b/docs/plans/core-decomposition-completed.md @@ -43,6 +43,7 @@ - `src/crates` 已按六层物理布局整理:`interfaces/`、`assembly/`、`adapters/`、`services/`、`execution/`、`contracts/`。 - 旧 `surfaces` 和 `providers` 目标层级已被移除:协议入口归入 `interfaces`,协议/transport/provider 转换归入 `adapters`,OS/runtime infrastructure 具体实现归入 `services`。 - execution 下 tool 相关目录已按职责命名:`tool-contracts`、`tool-provider-groups`、`tool-execution`。Cargo package / lib 名保持兼容。 +- `agent-stream` 已成为统一 stream DTO、tool-call 累积和 replay 契约 owner;provider stream 解析测试归属 `ai-adapters`。 - AGENTS、README、DeepReview path classifier、core boundary rules 和 Cargo workspace path 已同步到当前分层。 ## 2. 已建立的保护 diff --git a/docs/plans/core-decomposition-plan.md b/docs/plans/core-decomposition-plan.md index 5647d39a0..0d49f3a8f 100644 --- a/docs/plans/core-decomposition-plan.md +++ b/docs/plans/core-decomposition-plan.md @@ -23,6 +23,7 @@ - Desktop / CLI / ACP 仍通过 `bitfun-core/product-full` 获取完整能力;Server / Web / Mobile Web 不直接依赖 core,但尚未完成按交付形态裁剪最小 feature / dependency。 - `runtime-services` 已有 typed builder、capability availability 和 core product runtime provider adapter,但不少 concrete provider 仍在 core 创建或持有。 - `tool-contracts` 已承接 provider-neutral tool manifest、admission、catalog、result policy 等纯策略;`tool-execution` 只承接部分低层 IO/search helper;Bash、terminal lifecycle、indexed search、remote shell、permission wait、checkpoint orchestration 和完整 execution pipeline 仍未完全迁移。 +- `agent-stream` 已承接统一 stream DTO、tool-call 累积和 replay 契约;provider SSE / 响应解析测试归属 `ai-adapters`。 - `harness` 当前主要承接 descriptor / route plan / registry contract;Deep Review、DeepResearch、MiniApp 的 concrete workflow execution 仍在 core 或产品路径。 - `product-domains` 已承接 MiniApp / function-agent 的部分纯领域逻辑;worker、host side effect、AI acquisition、marker IO 等 concrete path 仍未完成 owner 迁移。 diff --git a/src/crates/adapters/ai-adapters/AGENTS.md b/src/crates/adapters/ai-adapters/AGENTS.md index 20fc6686f..803f6722e 100644 --- a/src/crates/adapters/ai-adapters/AGENTS.md +++ b/src/crates/adapters/ai-adapters/AGENTS.md @@ -3,8 +3,8 @@ Scope: this guide applies to `src/crates/adapters/ai-adapters`. `bitfun-ai-adapters` owns provider-specific request/response mapping and stream -normalization. Keep provider quirks here instead of leaking them into core tool -contracts or product runtime logic. +protocol parsing. Keep provider quirks here, then convert stream chunks into the +provider-neutral contracts owned by `bitfun-agent-stream`. ## Guardrails @@ -16,6 +16,8 @@ contracts or product runtime logic. count. - Do not change shared stream or usage semantics without updating the focused adapter tests and downstream usage expectations. +- Do not move provider-neutral stream DTOs, replay policy, or tool-call + accumulation ownership back into this crate. ## Verification diff --git a/src/crates/adapters/ai-adapters/Cargo.toml b/src/crates/adapters/ai-adapters/Cargo.toml index 93c9392af..28ecbce6a 100644 --- a/src/crates/adapters/ai-adapters/Cargo.toml +++ b/src/crates/adapters/ai-adapters/Cargo.toml @@ -11,6 +11,7 @@ crate-type = ["rlib"] [dependencies] anyhow = { workspace = true } +bitfun-agent-stream = { path = "../../execution/agent-stream" } bitfun-core-types = { path = "../../contracts/core-types" } chrono = { workspace = true } eventsource-stream = { workspace = true } @@ -22,3 +23,9 @@ serde_json = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } urlencoding = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } +axum = { workspace = true } +bitfun-events = { path = "../../contracts/events" } +tokio-util = { workspace = true } diff --git a/src/crates/adapters/ai-adapters/README.md b/src/crates/adapters/ai-adapters/README.md index e67790f70..b2c3e21c9 100644 --- a/src/crates/adapters/ai-adapters/README.md +++ b/src/crates/adapters/ai-adapters/README.md @@ -6,8 +6,7 @@ This crate owns the portable AI integration layer: - provider request building - provider-specific message conversion -- SSE / stream parsing -- streamed tool-call aggregation +- SSE / stream parsing into provider-neutral stream contracts - shared AI-facing transport types - provider model discovery - connection health checks @@ -26,8 +25,8 @@ and re-exports this crate where convenient. - `client`: shared HTTP transport, retries, aggregation, health checks - `providers`: OpenAI / Anthropic / Gemini request and discovery adapters -- `stream`: provider SSE parsing into unified streaming events -- `tool_call_accumulator`: reconstruct structured tool calls from streamed deltas +- `stream`: provider SSE parsing into unified streaming events from `bitfun-agent-stream` +- `tool_call_accumulator`: compatibility re-export; canonical implementation lives in `bitfun-agent-stream` - `types`: portable request/response/config/message types ## Design Rule diff --git a/src/crates/adapters/ai-adapters/src/stream/types/unified.rs b/src/crates/adapters/ai-adapters/src/stream/types/unified.rs index fb5948eea..ba7964bb9 100644 --- a/src/crates/adapters/ai-adapters/src/stream/types/unified.rs +++ b/src/crates/adapters/ai-adapters/src/stream/types/unified.rs @@ -1,81 +1 @@ -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::borrow::Cow; -use std::fmt; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UnifiedToolCall { - #[serde(skip_serializing_if = "Option::is_none")] - pub tool_call_index: Option, - pub id: Option, - pub name: Option, - pub arguments: Option, - #[serde(default)] - pub arguments_is_snapshot: bool, -} - -/// Unified AI response format -#[derive(Clone, Serialize, Deserialize, Default)] -pub struct UnifiedResponse { - pub text: Option, - pub reasoning_content: Option, - /// Signature for Anthropic extended thinking (returned in multi-turn conversations) - #[serde(skip_serializing_if = "Option::is_none")] - pub thinking_signature: Option, - pub tool_call: Option, - pub usage: Option, - pub finish_reason: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub provider_metadata: Option, -} - -impl fmt::Debug for UnifiedResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let reasoning_summary = self.reasoning_content.as_ref().map(|s| { - if s.len() > 100 { - let end = s - .char_indices() - .take_while(|(i, _)| *i < 100) - .last() - .map(|(i, c)| i + c.len_utf8()) - .unwrap_or(0); - // Guard against multi-byte chars pushing end past the string length - let end = end.min(s.len()); - Cow::Owned(format!("{}... ({} bytes)", &s[..end], s.len())) - } else { - Cow::Borrowed(s.as_str()) - } - }); - f.debug_struct("UnifiedResponse") - .field("text", &self.text) - .field("reasoning_content", &reasoning_summary) - .field("thinking_signature", &"") - .field("tool_call", &self.tool_call) - .field("usage", &self.usage) - .field("finish_reason", &self.finish_reason) - .field("provider_metadata", &"") - .finish() - } -} - -/// Unified token usage statistics -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UnifiedTokenUsage { - pub prompt_token_count: u32, - pub candidates_token_count: u32, - pub total_token_count: u32, - #[serde(skip_serializing_if = "Option::is_none")] - pub reasoning_token_count: Option, - /// Cache READ tokens (i.e., served from cache this call). Universal across - /// providers: OpenAI `cached_tokens`, DeepSeek `prompt_cache_hit_tokens`, - /// Anthropic `cache_read_input_tokens`, Gemini `cachedContentTokenCount`. - /// Hit rate consumers must use this as numerator and `prompt_token_count` - /// as denominator. - #[serde(skip_serializing_if = "Option::is_none")] - pub cached_content_token_count: Option, - /// Cache WRITE tokens (only Anthropic reports this per-token; others either - /// have no creation concept or bill creation by storage time). Disjoint from - /// `cached_content_token_count`. Do NOT include in hit-rate numerator. - #[serde(skip_serializing_if = "Option::is_none", default)] - pub cache_creation_token_count: Option, -} +pub use bitfun_agent_stream::{UnifiedResponse, UnifiedTokenUsage, UnifiedToolCall}; diff --git a/src/crates/adapters/ai-adapters/src/tool_call_accumulator.rs b/src/crates/adapters/ai-adapters/src/tool_call_accumulator.rs index 9175672f1..761db6bf4 100644 --- a/src/crates/adapters/ai-adapters/src/tool_call_accumulator.rs +++ b/src/crates/adapters/ai-adapters/src/tool_call_accumulator.rs @@ -1,1089 +1 @@ -use log::{error, warn}; -use serde_json::{json, Value}; -use std::collections::BTreeMap; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ToolCallBoundary { - NewTool, - FinishReason, - StreamEnd, - GracefulShutdown, - EndOfAggregation, -} - -impl ToolCallBoundary { - fn as_str(self) -> &'static str { - match self { - Self::NewTool => "new_tool", - Self::FinishReason => "finish_reason", - Self::StreamEnd => "stream_end", - Self::GracefulShutdown => "graceful_shutdown", - Self::EndOfAggregation => "end_of_aggregation", - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum ToolCallStreamKey { - Indexed(usize), - Unindexed, -} - -impl From> for ToolCallStreamKey { - fn from(value: Option) -> Self { - match value { - Some(index) => Self::Indexed(index), - None => Self::Unindexed, - } - } -} - -#[derive(Debug, Clone, Default)] -pub struct PendingToolCall { - tool_id: String, - tool_name: String, - raw_arguments: String, - early_detected_emitted: bool, -} - -#[derive(Debug, Clone)] -pub struct FinalizedToolCall { - pub tool_id: String, - pub tool_name: String, - pub raw_arguments: String, - pub arguments: Value, - pub is_error: bool, - /// True when the raw stream produced unparseable JSON (e.g. truncated by - /// `max_tokens`) and we successfully patched the trailing brackets/strings - /// to make it parse. The recovered call still executes, but downstream - /// consumers should warn the model that the content may be incomplete. - pub recovered_from_truncation: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct EarlyDetectedToolCall { - pub tool_id: String, - pub tool_name: String, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ToolCallParamsChunk { - pub tool_id: String, - pub tool_name: String, - pub params_chunk: String, -} - -#[derive(Debug, Clone, Default)] -pub struct ToolCallDeltaOutcome { - pub finalized_previous: Option, - pub early_detected: Option, - pub params_partial: Option, -} - -#[derive(Debug, Clone, Default)] -pub struct PendingToolCalls { - pending: BTreeMap, -} - -/// Tools where executing a truncated tool call is **safe and meaningful** — -/// the model intended to write content and a partial file is strictly more -/// useful than a hard failure. For everything else (Bash, Edit, Task, ...) we -/// surface the truncation as an error: a partial shell command or a partial -/// `old_string`/`new_string` for Edit can change semantics destructively. -pub fn is_write_like_tool_name(tool_name: &str) -> bool { - matches!(tool_name, "Write" | "file_write" | "write_notebook") -} - -fn is_truncation_safe_to_recover(tool_name: &str) -> bool { - is_write_like_tool_name(tool_name) || matches!(tool_name, "AskUserQuestion" | "TodoWrite") -} - -/// Attempt to repair a JSON document that was truncated mid-stream (typically -/// because the model hit `max_tokens`). Closes any open string literal and any -/// unclosed `{`/`[` brackets in their correct nesting order. Returns `None` -/// when the truncation occurs at a position where we would have to invent a -/// missing value (e.g. trailing `,` or `:`) since blindly closing in those -/// states would silently corrupt the semantics. -fn repair_truncated_json(raw: &str) -> Option { - let mut in_string = false; - let mut escape = false; - let mut stack: Vec = Vec::new(); - let mut last_significant: Option = None; - - for &b in raw.as_bytes() { - if escape { - escape = false; - continue; - } - if in_string { - match b { - b'\\' => escape = true, - b'"' => { - in_string = false; - last_significant = Some(b'"'); - } - _ => {} - } - continue; - } - match b { - b'"' => { - in_string = true; - last_significant = Some(b'"'); - } - b'{' => { - stack.push(b'{'); - last_significant = Some(b'{'); - } - b'[' => { - stack.push(b'['); - last_significant = Some(b'['); - } - b'}' => { - if stack.pop() != Some(b'{') { - return None; - } - last_significant = Some(b'}'); - } - b']' => { - if stack.pop() != Some(b'[') { - return None; - } - last_significant = Some(b']'); - } - b' ' | b'\t' | b'\n' | b'\r' => {} - other => last_significant = Some(other), - } - } - - // Nothing to repair (parser failed for some other reason). - if !in_string && stack.is_empty() { - return None; - } - - // Refuse to fabricate values when truncated mid-pair. - if !in_string { - if let Some(b',') | Some(b':') = last_significant { - return None; - } - } - - let mut out = String::with_capacity(raw.len() + stack.len() + 1); - out.push_str(raw); - if in_string { - out.push('"'); - } - while let Some(c) = stack.pop() { - out.push(match c { - b'{' => '}', - b'[' => ']', - _ => unreachable!(), - }); - } - Some(out) -} - -impl PendingToolCall { - fn strip_argument_wrapping(raw_arguments: &str) -> &str { - let trimmed = raw_arguments.trim(); - let Some(stripped) = trimmed - .strip_prefix("```") - .and_then(|value| value.strip_suffix("```")) - else { - return trimmed.trim_matches('`').trim(); - }; - - let stripped = stripped.trim(); - if let Some((first_line, rest)) = stripped.split_once('\n') { - if first_line - .chars() - .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') - { - return rest.trim(); - } - } - - stripped - } - - /// Best-effort repair for Git tool calls whose arguments came back as a raw - /// shell-style command (e.g. `git status`, `"git diff --staged"`). - fn parse_git_command_arguments(raw_arguments: &str) -> Option { - let trimmed = Self::strip_argument_wrapping(raw_arguments); - let command = trimmed - .strip_prefix("git ") - .map(str::trim) - .unwrap_or(trimmed); - let mut parts = command.splitn(2, char::is_whitespace); - let operation = parts.next()?.trim(); - if operation.is_empty() - || !operation - .chars() - .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') - { - return None; - } - - let args = parts.next().map(str::trim).filter(|args| !args.is_empty()); - let mut value = json!({ "operation": operation }); - if let Some(args) = args { - value["args"] = json!(args); - } - Some(value) - } - - fn normalize_git_tool_arguments(arguments: Value) -> Value { - if let Value::String(raw) = &arguments { - if let Some(repaired) = Self::parse_git_command_arguments(raw) { - warn!("Git tool call arguments repaired from JSON string command"); - return repaired; - } - } - arguments - } - - fn parse_arguments(tool_name: &str, raw_arguments: &str) -> Result { - match serde_json::from_str::(raw_arguments) { - Ok(arguments) => { - if tool_name == "Git" { - Ok(Self::normalize_git_tool_arguments(arguments)) - } else { - Ok(arguments) - } - } - Err(primary_error) => { - if tool_name == "Git" { - if let Some(arguments) = Self::parse_git_command_arguments(raw_arguments) { - warn!("Git tool call arguments repaired from raw command"); - return Ok(arguments); - } - } - Err(primary_error.to_string()) - } - } - } - - pub fn has_pending(&self) -> bool { - !self.tool_id.is_empty() - } - - pub fn has_meaningful_payload(&self) -> bool { - !self.tool_name.is_empty() || !self.raw_arguments.is_empty() - } - - pub fn tool_id(&self) -> &str { - &self.tool_id - } - - pub fn tool_name(&self) -> &str { - &self.tool_name - } - - pub fn start_new(&mut self, tool_id: String, tool_name: Option) { - self.tool_id = tool_id; - self.tool_name = tool_name.unwrap_or_default(); - self.raw_arguments.clear(); - self.early_detected_emitted = false; - } - - pub fn update_tool_name_if_missing(&mut self, tool_name: Option) { - if self.tool_name.is_empty() { - self.tool_name = tool_name.unwrap_or_default(); - } - } - - pub fn append_arguments(&mut self, arguments_chunk: &str) { - self.raw_arguments.push_str(arguments_chunk); - } - - pub fn replace_arguments(&mut self, arguments_snapshot: &str) { - self.raw_arguments.clear(); - self.raw_arguments.push_str(arguments_snapshot); - } - - pub fn raw_arguments(&self) -> &str { - &self.raw_arguments - } - - pub fn finalize(&mut self, boundary: ToolCallBoundary) -> Option { - if !self.has_pending() { - return None; - } - - if !self.has_meaningful_payload() { - self.tool_id.clear(); - self.tool_name.clear(); - self.raw_arguments.clear(); - self.early_detected_emitted = false; - return None; - } - - let tool_id = std::mem::take(&mut self.tool_id); - let tool_name = std::mem::take(&mut self.tool_name); - let raw_arguments = std::mem::take(&mut self.raw_arguments); - self.early_detected_emitted = false; - let parsed_arguments = Self::parse_arguments(&tool_name, &raw_arguments); - - let (arguments, is_error, recovered_from_truncation) = match parsed_arguments { - Ok(value) => (value, false, false), - Err(parse_err) => { - let repaired = repair_truncated_json(&raw_arguments) - .and_then(|candidate| Self::parse_arguments(&tool_name, &candidate).ok()); - match repaired { - Some(value) if is_truncation_safe_to_recover(&tool_name) => { - warn!( - "Tool call arguments recovered from truncation at boundary={}: tool_id={}, tool_name={}, raw_len={}", - boundary.as_str(), - tool_id, - tool_name, - raw_arguments.len() - ); - (value, false, true) - } - Some(_) => { - // We *could* repair but the tool's semantics make - // executing a partial call unsafe (Bash, Edit, ...). - // Surface as an error so the user/model knows the - // truncation happened and can retry sensibly. - warn!( - "Tool call arguments truncated at boundary={}: tool_id={}, tool_name={} — refusing to execute partial call (tool not in safe-recovery list)", - boundary.as_str(), - tool_id, - tool_name - ); - (json!({}), true, true) - } - None => { - error!( - "Tool call arguments parsing failed at boundary={}: tool_id={}, tool_name={}, error={}, raw_arguments={}", - boundary.as_str(), - tool_id, - tool_name, - parse_err, - raw_arguments - ); - (json!({}), true, false) - } - } - } - }; - - Some(FinalizedToolCall { - tool_id, - tool_name, - raw_arguments, - arguments, - is_error, - recovered_from_truncation, - }) - } -} - -impl PendingToolCalls { - pub fn new() -> Self { - Self { - pending: BTreeMap::new(), - } - } - - pub fn apply_delta( - &mut self, - key: ToolCallStreamKey, - tool_id: Option, - tool_name: Option, - arguments: Option, - arguments_is_snapshot: bool, - ) -> ToolCallDeltaOutcome { - let mut outcome = ToolCallDeltaOutcome::default(); - - let has_tool_id = tool_id.as_ref().is_some_and(|tool_id| !tool_id.is_empty()); - if !self.pending.contains_key(&key) { - if has_tool_id { - self.pending.insert(key.clone(), PendingToolCall::default()); - } else { - return outcome; - } - } - - let Some(pending) = self.pending.get_mut(&key) else { - return outcome; - }; - - if let Some(tool_id) = tool_id.filter(|tool_id| !tool_id.is_empty()) { - let is_new_tool = pending.tool_id() != tool_id; - if is_new_tool { - outcome.finalized_previous = pending.finalize(ToolCallBoundary::NewTool); - pending.start_new(tool_id, tool_name.clone()); - } else { - pending.update_tool_name_if_missing(tool_name.clone()); - } - } else if tool_name - .as_ref() - .is_some_and(|tool_name| !tool_name.is_empty()) - { - pending.update_tool_name_if_missing(tool_name.clone()); - } - - if pending.has_pending() - && !pending.tool_name().is_empty() - && !pending.early_detected_emitted - { - pending.early_detected_emitted = true; - outcome.early_detected = Some(EarlyDetectedToolCall { - tool_id: pending.tool_id().to_string(), - tool_name: pending.tool_name().to_string(), - }); - } - - if let Some(arguments) = arguments.filter(|arguments| !arguments.is_empty()) { - if pending.has_pending() { - if arguments_is_snapshot { - pending.replace_arguments(&arguments); - } else { - pending.append_arguments(&arguments); - } - let tool_name = pending.tool_name().to_string(); - let params_chunk = arguments; - if !params_chunk.is_empty() { - outcome.params_partial = Some(ToolCallParamsChunk { - tool_id: pending.tool_id().to_string(), - tool_name, - params_chunk, - }); - } - } - } - - outcome - } - - pub fn finalize_key( - &mut self, - key: &ToolCallStreamKey, - boundary: ToolCallBoundary, - ) -> Option { - let mut pending = self.pending.remove(key)?; - pending.finalize(boundary) - } - - pub fn finalize_all(&mut self, boundary: ToolCallBoundary) -> Vec { - let keys: Vec<_> = self.pending.keys().cloned().collect(); - keys.into_iter() - .filter_map(|key| self.finalize_key(&key, boundary)) - .collect() - } -} - -#[cfg(test)] -mod tests { - use super::{ - repair_truncated_json, EarlyDetectedToolCall, PendingToolCall, PendingToolCalls, - ToolCallBoundary, ToolCallParamsChunk, ToolCallStreamKey, - }; - use serde_json::json; - - #[test] - fn finalizes_complete_json_only_at_boundary() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("tool_a".to_string())); - pending.append_arguments("{\"a\":1}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.tool_id, "call_1"); - assert_eq!(finalized.tool_name, "tool_a"); - assert_eq!(finalized.arguments, json!({"a": 1})); - assert!(!finalized.is_error); - assert!(!pending.has_pending()); - } - - #[test] - fn invalid_json_becomes_error_with_empty_object() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("tool_a".to_string())); - pending.append_arguments("{\"a\":"); - - let finalized = pending - .finalize(ToolCallBoundary::StreamEnd) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn repairs_git_raw_command_arguments() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Git".to_string())); - pending.append_arguments("git status"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.raw_arguments, "git status"); - assert_eq!(finalized.arguments, json!({"operation": "status"})); - assert!(!finalized.is_error); - } - - #[test] - fn repairs_git_json_string_command_arguments() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Git".to_string())); - pending.append_arguments("\"git diff --staged\""); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!( - finalized.arguments, - json!({"operation": "diff", "args": "--staged"}) - ); - assert!(!finalized.is_error); - } - - #[test] - fn git_args_only_object_is_left_for_tool_schema_diagnostic() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Git".to_string())); - pending.append_arguments("{\"args\": \"--since=\\\"2026-05-02\\\" --oneline\"}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!( - finalized.arguments, - json!({"args": "--since=\"2026-05-02\" --oneline"}) - ); - assert!(!finalized.is_error); - } - - #[test] - fn git_duplicate_subcommand_in_args_is_left_for_tool_schema_diagnostic() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Git".to_string())); - pending.append_arguments("{\"args\": \"log --oneline -10\"}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({"args": "log --oneline -10"})); - assert!(!finalized.is_error); - } - - #[test] - fn does_not_infer_git_operation_from_ambiguous_args_only_object() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Git".to_string())); - pending.append_arguments("{\"args\": \"--stat\"}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({"args": "--stat"})); - assert!(!finalized.is_error); - } - - #[test] - fn raw_string_arguments_for_single_field_tools_stay_invalid_json() { - let cases = [ - ("Bash", "pnpm test"), - ("Skill", "openai-docs"), - ("Read", "src/main.rs"), - ("GetFileDiff", "src/lib.rs"), - ("LS", "src/crates"), - ("Delete", "tmp/output.log"), - ("Glob", "**/*.rs"), - ("Grep", "Arguments are invalid JSON"), - ("WebSearch", "OpenAI Agents SDK"), - ("WebFetch", "https://example.com"), - ("InitMiniApp", "Markdown Viewer"), - ]; - - for (tool_name, raw_arguments) in cases { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some(tool_name.to_string())); - pending.append_arguments(raw_arguments); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({}), "tool={tool_name}"); - assert!(finalized.is_error, "tool={tool_name}"); - } - } - - #[test] - fn incomplete_json_object_for_single_field_tools_stays_invalid() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments( - "{\"command\": \"git log --since=\\\"2026-05-02\\\" --oneline --stat", - ); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn does_not_wrap_incomplete_json_object_as_raw_string_argument() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments("{\"command\": "); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn does_not_repair_incomplete_json_object_for_multifield_tools() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Task".to_string())); - pending.append_arguments( - "{\"description\":\"Explore BitFun project structure\",\"prompt\":\"read README\\n\\nthoroughness: very", - ); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn does_not_repair_object_without_key_value_payload() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments("{"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn does_not_execute_truncated_incomplete_json_object() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments("{\"command\": \"git log --since=\\\"2026-05-02\\\" --on"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn json_string_arguments_for_single_field_tools_are_schema_errors_not_rewritten() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments("\"git status\""); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!("git status")); - assert!(!finalized.is_error); - } - - #[test] - fn fenced_raw_arguments_for_single_field_tools_stay_invalid_json() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments("```bash\npnpm run lint:web\n```"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn does_not_repair_raw_string_arguments_for_multifield_tools() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Edit".to_string())); - pending.append_arguments("src/main.rs"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn json_with_one_extra_trailing_right_brace_stays_invalid() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("tool_a".to_string())); - pending.append_arguments("{\"a\":1}}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.raw_arguments, "{\"a\":1}}"); - assert_eq!(finalized.arguments, json!({})); - assert!(finalized.is_error); - } - - #[test] - fn finalized_arguments_preserve_object_fields() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("tool_a".to_string())); - pending.append_arguments("{\"a\":1,\"b\":\"x\"}"); - - let finalized = pending - .finalize(ToolCallBoundary::EndOfAggregation) - .expect("finalized tool"); - - assert_eq!(finalized.arguments["a"], json!(1)); - assert_eq!(finalized.arguments["b"], json!("x")); - } - - #[test] - fn replace_arguments_overwrites_partial_buffer() { - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("tool_a".to_string())); - pending.append_arguments("{\"city\":\"Bei"); - pending.replace_arguments("{\"city\":\"Beijing\"}"); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert_eq!(finalized.arguments, json!({"city": "Beijing"})); - assert!(!finalized.is_error); - } - - #[test] - fn manages_multiple_pending_tool_calls_by_index() { - let mut pending = PendingToolCalls::default(); - - assert_eq!( - pending - .apply_delta( - ToolCallStreamKey::Indexed(0), - Some("call_1".to_string()), - Some("tool_a".to_string()), - None, - false, - ) - .early_detected, - Some(EarlyDetectedToolCall { - tool_id: "call_1".to_string(), - tool_name: "tool_a".to_string(), - }) - ); - assert_eq!( - pending - .apply_delta( - ToolCallStreamKey::Indexed(1), - Some("call_2".to_string()), - Some("tool_b".to_string()), - None, - false, - ) - .early_detected, - Some(EarlyDetectedToolCall { - tool_id: "call_2".to_string(), - tool_name: "tool_b".to_string(), - }) - ); - - pending.apply_delta( - ToolCallStreamKey::Indexed(0), - None, - None, - Some("{\"a\":1}".to_string()), - false, - ); - pending.apply_delta( - ToolCallStreamKey::Indexed(1), - None, - None, - Some("{\"b\":2}".to_string()), - false, - ); - - let finalized = pending.finalize_all(ToolCallBoundary::FinishReason); - assert_eq!(finalized.len(), 2); - assert_eq!(finalized[0].tool_id, "call_1"); - assert_eq!(finalized[0].arguments, json!({"a": 1})); - assert_eq!(finalized[1].tool_id, "call_2"); - assert_eq!(finalized[1].arguments, json!({"b": 2})); - } - - #[test] - fn id_only_prelude_is_attached_to_following_payload_without_id() { - let mut pending = PendingToolCalls::default(); - - let prelude = pending.apply_delta( - ToolCallStreamKey::Indexed(0), - Some("call_1".to_string()), - None, - None, - false, - ); - assert_eq!(prelude.early_detected, None); - assert_eq!(prelude.params_partial, None); - - let payload = pending.apply_delta( - ToolCallStreamKey::Indexed(0), - None, - Some("tool_a".to_string()), - Some("{\"a\":1}".to_string()), - false, - ); - assert_eq!( - payload.early_detected, - Some(EarlyDetectedToolCall { - tool_id: "call_1".to_string(), - tool_name: "tool_a".to_string(), - }) - ); - assert_eq!( - payload.params_partial, - Some(ToolCallParamsChunk { - tool_id: "call_1".to_string(), - tool_name: "tool_a".to_string(), - params_chunk: "{\"a\":1}".to_string(), - }) - ); - } - - #[test] - fn id_only_orphan_is_dropped_on_finalize() { - let mut pending = PendingToolCalls::default(); - - let outcome = pending.apply_delta( - ToolCallStreamKey::Indexed(1), - Some("call_orphan".to_string()), - None, - None, - false, - ); - assert!(outcome.finalized_previous.is_none()); - assert!(outcome.early_detected.is_none()); - assert!(outcome.params_partial.is_none()); - assert!(pending - .finalize_all(ToolCallBoundary::FinishReason) - .is_empty()); - } - - #[test] - fn empty_argument_delta_is_ignored() { - let mut pending = PendingToolCalls::default(); - - let header = pending.apply_delta( - ToolCallStreamKey::Indexed(0), - Some("call_1".to_string()), - Some("tool_a".to_string()), - Some(String::new()), - false, - ); - assert_eq!( - header.early_detected, - Some(EarlyDetectedToolCall { - tool_id: "call_1".to_string(), - tool_name: "tool_a".to_string(), - }) - ); - assert!(header.params_partial.is_none()); - - let empty_delta = pending.apply_delta( - ToolCallStreamKey::Indexed(0), - None, - None, - Some(String::new()), - false, - ); - assert!(empty_delta.finalized_previous.is_none()); - assert!(empty_delta.early_detected.is_none()); - assert!(empty_delta.params_partial.is_none()); - } - - // ------------------------------------------------------------------ - // Truncation recovery tests - // ------------------------------------------------------------------ - - #[test] - fn write_truncated_mid_content_string_is_recovered() { - // Reproduces the deep-research dump: the model hit max_tokens while - // streaming `content`, so the JSON ends inside the string literal - // with no closing `"` and no closing `}`. - let raw = "{\"file_path\": \"/tmp/report.md\", \"content\": \"# Report\\n\\nA long body that was cut"; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Write".to_string())); - pending.append_arguments(raw); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert!(!finalized.is_error, "Write recovery should succeed"); - assert!(finalized.recovered_from_truncation); - assert_eq!( - finalized.arguments, - json!({ - "file_path": "/tmp/report.md", - "content": "# Report\n\nA long body that was cut" - }) - ); - } - - #[test] - fn write_truncated_with_chinese_multibyte_is_recovered() { - let raw = "{\"file_path\": \"/tmp/r.md\", \"content\": \"深度研究报告:未完"; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Write".to_string())); - pending.append_arguments(raw); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert!(!finalized.is_error); - assert!(finalized.recovered_from_truncation); - assert_eq!( - finalized.arguments["content"].as_str(), - Some("深度研究报告:未完") - ); - } - - #[test] - fn bash_truncated_mid_command_still_errors_but_records_truncation() { - let raw = r#"{"command": "git log --since=\"2026-05-02\" --on"#; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("Bash".to_string())); - pending.append_arguments(raw); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - // We never execute a partial shell command. - assert!(finalized.is_error); - assert_eq!(finalized.arguments, json!({})); - // But the truncation is recorded so the surface error message and - // diagnostic dump can distinguish "truncated" from "model emitted - // bad JSON". - assert!(finalized.recovered_from_truncation); - } - - #[test] - fn repair_refuses_truncation_after_colon() { - // We can't invent the missing value, so this must not auto-repair. - assert!(repair_truncated_json(r#"{"a": 1, "b":"#).is_none()); - } - - #[test] - fn repair_refuses_truncation_after_comma() { - assert!(repair_truncated_json(r#"{"a": 1,"#).is_none()); - } - - #[test] - fn repair_returns_none_for_already_valid_json() { - // Already balanced — repair has nothing to do (parser would have - // succeeded anyway). - assert!(repair_truncated_json(r#"{"a": 1}"#).is_none()); - } - - #[test] - fn repair_closes_nested_brackets_in_correct_order() { - let raw = r#"{"a": [1, 2, {"b": "incomplete"#; - let repaired = repair_truncated_json(raw).expect("repaired"); - let parsed: serde_json::Value = - serde_json::from_str(&repaired).expect("repaired is valid JSON"); - assert_eq!(parsed, json!({"a": [1, 2, {"b": "incomplete"}]})); - } - - #[test] - fn repair_preserves_escaped_quote_inside_truncated_string() { - let raw = r#"{"content": "she said \"hello\" and then"#; - let repaired = repair_truncated_json(raw).expect("repaired"); - let parsed: serde_json::Value = serde_json::from_str(&repaired).expect("valid JSON"); - assert_eq!( - parsed["content"].as_str(), - Some("she said \"hello\" and then") - ); - } - - #[test] - fn ask_user_question_truncated_mid_chinese_string_is_recovered() { - let raw = r#"{"questions": [{"header": "重试场景", "multiSelect": true, "options": [{"description": "当消息发送后后端返回失败(消息气泡显示为红色失败状态,有 model rounds 但 status='error'),在失败气泡旁增加重试按钮,点击后重新发送该消息", "label": "失败消息气泡上加重试按钮"}]}]}"#; - // Truncate mid-Chinese-string, after a colon that opened the value - let truncated = &raw[..raw.find("消息气泡显示为红色失败状态").unwrap() - + "消息气泡显示为红色失败状态".len()]; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("AskUserQuestion".to_string())); - pending.append_arguments(truncated); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert!(!finalized.is_error); - assert!(finalized.recovered_from_truncation); - } - - #[test] - fn ask_user_question_truncated_mid_options_is_recovered() { - // Truncation right after a completed description value's closing quote + comma - let raw = r#"{"questions": [{"header": "场景", "multiSelect": true, "options": [{"description": "第一条描述", "label": "选项一"}, {"description": "第二条描"#; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("AskUserQuestion".to_string())); - pending.append_arguments(raw); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert!(!finalized.is_error); - assert!(finalized.recovered_from_truncation); - let questions = finalized.arguments["questions"].as_array().unwrap(); - assert_eq!(questions.len(), 1); - assert_eq!(questions[0]["options"].as_array().unwrap().len(), 2); - } - - #[test] - fn todo_write_truncated_mid_content_is_recovered() { - let raw = r#"{"todos": [{"id": "1", "content": "完成重构并优化性能", "status": "in_progress"}, {"id": "2", "content": "编写单元测"#; - let mut pending = PendingToolCall::default(); - pending.start_new("call_1".to_string(), Some("TodoWrite".to_string())); - pending.append_arguments(raw); - - let finalized = pending - .finalize(ToolCallBoundary::FinishReason) - .expect("finalized tool"); - - assert!(!finalized.is_error); - assert!(finalized.recovered_from_truncation); - let todos = finalized.arguments["todos"].as_array().unwrap(); - assert_eq!(todos.len(), 2); - } -} +pub use bitfun_agent_stream::tool_call_accumulator::*; diff --git a/src/crates/adapters/ai-adapters/src/types/ai.rs b/src/crates/adapters/ai-adapters/src/types/ai.rs index ee8982611..79f9edc5f 100644 --- a/src/crates/adapters/ai-adapters/src/types/ai.rs +++ b/src/crates/adapters/ai-adapters/src/types/ai.rs @@ -37,6 +37,32 @@ pub struct GeminiUsage { pub cache_creation_token_count: Option, } +impl From for GeminiUsage { + fn from(usage: bitfun_agent_stream::UnifiedTokenUsage) -> Self { + Self { + prompt_token_count: usage.prompt_token_count, + candidates_token_count: usage.candidates_token_count, + total_token_count: usage.total_token_count, + reasoning_token_count: usage.reasoning_token_count, + cached_content_token_count: usage.cached_content_token_count, + cache_creation_token_count: usage.cache_creation_token_count, + } + } +} + +impl From for bitfun_agent_stream::UnifiedTokenUsage { + fn from(usage: GeminiUsage) -> Self { + Self { + prompt_token_count: usage.prompt_token_count, + candidates_token_count: usage.candidates_token_count, + total_token_count: usage.total_token_count, + reasoning_token_count: usage.reasoning_token_count, + cached_content_token_count: usage.cached_content_token_count, + cache_creation_token_count: usage.cache_creation_token_count, + } + } +} + /// Structured message codes for localized connection test messaging. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -108,4 +134,47 @@ mod tests { assert_eq!(parsed.cached_content_token_count, Some(3)); assert_eq!(parsed.cache_creation_token_count, None); } + + #[test] + fn gemini_usage_converts_to_and_from_unified_token_usage() { + let usage = GeminiUsage { + prompt_token_count: 100, + candidates_token_count: 20, + total_token_count: 120, + reasoning_token_count: Some(7), + cached_content_token_count: Some(30), + cache_creation_token_count: Some(20), + }; + + let unified: bitfun_agent_stream::UnifiedTokenUsage = usage.clone().into(); + assert_eq!(unified.prompt_token_count, usage.prompt_token_count); + assert_eq!(unified.candidates_token_count, usage.candidates_token_count); + assert_eq!(unified.total_token_count, usage.total_token_count); + assert_eq!(unified.reasoning_token_count, usage.reasoning_token_count); + assert_eq!( + unified.cached_content_token_count, + usage.cached_content_token_count + ); + assert_eq!( + unified.cache_creation_token_count, + usage.cache_creation_token_count + ); + + let roundtrip: GeminiUsage = unified.into(); + assert_eq!(roundtrip.prompt_token_count, usage.prompt_token_count); + assert_eq!( + roundtrip.candidates_token_count, + usage.candidates_token_count + ); + assert_eq!(roundtrip.total_token_count, usage.total_token_count); + assert_eq!(roundtrip.reasoning_token_count, usage.reasoning_token_count); + assert_eq!( + roundtrip.cached_content_token_count, + usage.cached_content_token_count + ); + assert_eq!( + roundtrip.cache_creation_token_count, + usage.cache_creation_token_count + ); + } } diff --git a/src/crates/execution/agent-stream/tests/common/fixture_loader.rs b/src/crates/adapters/ai-adapters/tests/common/fixture_loader.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/common/fixture_loader.rs rename to src/crates/adapters/ai-adapters/tests/common/fixture_loader.rs diff --git a/src/crates/execution/agent-stream/tests/common/mod.rs b/src/crates/adapters/ai-adapters/tests/common/mod.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/common/mod.rs rename to src/crates/adapters/ai-adapters/tests/common/mod.rs diff --git a/src/crates/execution/agent-stream/tests/common/sse_fixture_server.rs b/src/crates/adapters/ai-adapters/tests/common/sse_fixture_server.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/common/sse_fixture_server.rs rename to src/crates/adapters/ai-adapters/tests/common/sse_fixture_server.rs diff --git a/src/crates/execution/agent-stream/tests/common/stream_test_harness.rs b/src/crates/adapters/ai-adapters/tests/common/stream_test_harness.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/common/stream_test_harness.rs rename to src/crates/adapters/ai-adapters/tests/common/stream_test_harness.rs diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/closed_after_message_delta.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/closed_after_message_delta.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/closed_after_message_delta.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/closed_after_message_delta.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/empty_thinking_signature_text_and_tool_use.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/empty_thinking_signature_text_and_tool_use.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/empty_thinking_signature_text_and_tool_use.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/empty_thinking_signature_text_and_tool_use.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/extended_thinking.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/extended_thinking.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/extended_thinking.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/extended_thinking.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/inline_think_text.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/inline_think_text.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/inline_think_text.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/inline_think_text.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/interleaved_parallel_tool_use.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/interleaved_parallel_tool_use.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/interleaved_parallel_tool_use.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/interleaved_parallel_tool_use.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/malformed_content_block_delta.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/malformed_content_block_delta.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/malformed_content_block_delta.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/malformed_content_block_delta.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/malformed_tool_arguments_extra_brace.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/malformed_tool_arguments_extra_brace.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/anthropic/malformed_tool_arguments_extra_brace.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/anthropic/malformed_tool_arguments_extra_brace.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/gemini/function_call_string_args.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/gemini/function_call_string_args.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/gemini/function_call_string_args.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/gemini/function_call_string_args.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/empty_reasoning_content_text_and_tool_call.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/empty_reasoning_content_text_and_tool_call.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/empty_reasoning_content_text_and_tool_call.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/empty_reasoning_content_text_and_tool_call.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/inline_think_text.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/inline_think_text.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/inline_think_text.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/inline_think_text.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/interleaved_parallel_tool_args_by_index.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/interleaved_parallel_tool_args_by_index.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/interleaved_parallel_tool_args_by_index.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/interleaved_parallel_tool_args_by_index.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/thinking_text_three_tools_with_empty_toolcall_anomaly.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/thinking_text_three_tools_with_empty_toolcall_anomaly.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/thinking_text_three_tools_with_empty_toolcall_anomaly.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/thinking_text_three_tools_with_empty_toolcall_anomaly.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_args_snapshot_stop_reason.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_args_snapshot_stop_reason.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_args_snapshot_stop_reason.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_args_snapshot_stop_reason.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_args_split_with_usage.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_args_split_with_usage.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_args_split_with_usage.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_args_split_with_usage.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_call_missing_type_field.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_call_missing_type_field.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_call_missing_type_field.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_call_missing_type_field.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_call_trailing_empty_args_finish_chunk.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_call_trailing_empty_args_finish_chunk.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_call_trailing_empty_args_finish_chunk.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_call_trailing_empty_args_finish_chunk.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_id_only_orphan_filtered.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_id_only_orphan_filtered.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_id_only_orphan_filtered.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_id_only_orphan_filtered.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_id_prelude_then_payload_without_id.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_id_prelude_then_payload_without_id.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/tool_id_prelude_then_payload_without_id.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/tool_id_prelude_then_payload_without_id.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/openai/two_tools_first_final_chunk_contains_orphan_id_only.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/two_tools_first_final_chunk_contains_orphan_id_only.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/openai/two_tools_first_final_chunk_contains_orphan_id_only.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/openai/two_tools_first_final_chunk_contains_orphan_id_only.sse diff --git a/src/crates/execution/agent-stream/tests/fixtures/stream/responses/malformed_function_call_arguments.sse b/src/crates/adapters/ai-adapters/tests/fixtures/stream/responses/malformed_function_call_arguments.sse similarity index 100% rename from src/crates/execution/agent-stream/tests/fixtures/stream/responses/malformed_function_call_arguments.sse rename to src/crates/adapters/ai-adapters/tests/fixtures/stream/responses/malformed_function_call_arguments.sse diff --git a/src/crates/execution/agent-stream/tests/stream_processor_anthropic.rs b/src/crates/adapters/ai-adapters/tests/stream_processor_anthropic.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/stream_processor_anthropic.rs rename to src/crates/adapters/ai-adapters/tests/stream_processor_anthropic.rs diff --git a/src/crates/execution/agent-stream/tests/stream_processor_openai.rs b/src/crates/adapters/ai-adapters/tests/stream_processor_openai.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/stream_processor_openai.rs rename to src/crates/adapters/ai-adapters/tests/stream_processor_openai.rs diff --git a/src/crates/execution/agent-stream/tests/stream_processor_tool_arguments.rs b/src/crates/adapters/ai-adapters/tests/stream_processor_tool_arguments.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/stream_processor_tool_arguments.rs rename to src/crates/adapters/ai-adapters/tests/stream_processor_tool_arguments.rs diff --git a/src/crates/execution/agent-stream/tests/stream_replay_regressions.rs b/src/crates/adapters/ai-adapters/tests/stream_replay_regressions.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/stream_replay_regressions.rs rename to src/crates/adapters/ai-adapters/tests/stream_replay_regressions.rs diff --git a/src/crates/execution/agent-stream/tests/stream_test_harness.rs b/src/crates/adapters/ai-adapters/tests/stream_test_harness.rs similarity index 100% rename from src/crates/execution/agent-stream/tests/stream_test_harness.rs rename to src/crates/adapters/ai-adapters/tests/stream_test_harness.rs diff --git a/src/crates/assembly/core/src/agentic/execution/stream_processor.rs b/src/crates/assembly/core/src/agentic/execution/stream_processor.rs index 01001efdb..1c3f7777a 100644 --- a/src/crates/assembly/core/src/agentic/execution/stream_processor.rs +++ b/src/crates/assembly/core/src/agentic/execution/stream_processor.rs @@ -39,7 +39,7 @@ impl From for StreamResult { thinking_signature: result.thinking_signature, full_text: result.full_text, tool_calls: result.tool_calls.into_iter().map(Into::into).collect(), - usage: result.usage, + usage: result.usage.map(Into::into), provider_metadata: result.provider_metadata, has_effective_output: result.has_effective_output, first_chunk_ms: result.first_chunk_ms, diff --git a/src/crates/execution/AGENTS-CN.md b/src/crates/execution/AGENTS-CN.md index 49f94e74d..8abeb062e 100644 --- a/src/crates/execution/AGENTS-CN.md +++ b/src/crates/execution/AGENTS-CN.md @@ -9,7 +9,7 @@ | Crate | 职责 | 本地文档 | |---|---|---| | `agent-runtime` | Agent registry、scheduler、prompt cache、hooks、goal 和 runtime control 契约 | [AGENTS.md](agent-runtime/AGENTS.md) | -| `agent-stream` | Provider stream 归一化和 replay 契约 | [AGENTS.md](agent-stream/AGENTS.md) | +| `agent-stream` | Provider-neutral stream DTO、tool-call 累积和 replay 契约 | [AGENTS.md](agent-stream/AGENTS.md) | | `tool-contracts` | Tool 契约、execution gate、input validation 和 result presentation 契约;Cargo package 仍为 `bitfun-agent-tools` | [AGENTS.md](tool-contracts/AGENTS.md) | | `harness` | Harness workflow 契约和 registry primitive | [AGENTS.md](harness/AGENTS.md) | | `runtime-services` | Typed runtime service assembly 和 service availability facts | [AGENTS.md](runtime-services/AGENTS.md) | @@ -18,7 +18,7 @@ ## 放置规则 -- 可移植 execution 编排、agent lifecycle 契约、tool 契约和 provider-neutral execution facts 放到这里。 +- 可移植 execution 编排、agent lifecycle 契约、tool 契约、provider-neutral stream 契约和 execution facts 放到这里。 - 具体 filesystem、git、terminal、MCP server、remote SSH、OS 行为应放到 `services`,除非只是纯底层 tool primitive。 - 协议 projection 与外部 provider 请求整形放到 `adapters`。 - 产品 feature 选择和 delivery-profile 决策放到 `assembly`,不要放入 execution primitive。 @@ -26,6 +26,6 @@ ## 依赖边界 -- Execution primitive crate 可以依赖 `contracts`,在 provider stream 归一化场景下可以窄依赖 provider-neutral DTO。 +- Execution primitive crate 可以依赖 `contracts`,也可以依赖本层拥有的窄 provider-neutral DTO。 - Execution primitive crate 不得依赖 `assembly/core`、`src/apps`、前端代码、Tauri API 或产品形态 lifecycle。 -- 新增对 `adapters` 或 `services` 的依赖时,必须在最近的模块文档或 PR 描述里说明边界原因。 +- 本层不得依赖 `adapters`。新增对 `services` 的依赖时,必须在最近的模块文档或 PR 描述里说明边界原因。 diff --git a/src/crates/execution/AGENTS.md b/src/crates/execution/AGENTS.md index dff1a5f41..bef05687e 100644 --- a/src/crates/execution/AGENTS.md +++ b/src/crates/execution/AGENTS.md @@ -13,7 +13,7 @@ delivery form. | Crate | Responsibility | Local doc | |---|---|---| | `agent-runtime` | Agent registry, scheduler, prompt cache, hooks, goals, and runtime control contracts | [AGENTS.md](agent-runtime/AGENTS.md) | -| `agent-stream` | Provider stream normalization and stream replay contracts | [AGENTS.md](agent-stream/AGENTS.md) | +| `agent-stream` | Provider-neutral stream DTOs, tool-call accumulation, and replay contracts | [AGENTS.md](agent-stream/AGENTS.md) | | `tool-contracts` | Tool contracts, execution gates, input validation, and result presentation contracts. Cargo package remains `bitfun-agent-tools`. | [AGENTS.md](tool-contracts/AGENTS.md) | | `harness` | Harness workflow contracts and registry primitives | [AGENTS.md](harness/AGENTS.md) | | `runtime-services` | Typed runtime service assembly and service availability facts | [AGENTS.md](runtime-services/AGENTS.md) | @@ -23,7 +23,7 @@ delivery form. ## Placement Rules - Put portable execution orchestration, agent lifecycle contracts, tool - contracts, and provider-neutral execution facts here. + contracts, provider-neutral stream contracts, and execution facts here. - Keep concrete filesystem, git, terminal, MCP server, remote SSH, and OS behavior in `services` unless the code is a pure low-level tool primitive. - Keep protocol projection and external provider request shaping in `adapters`. @@ -35,8 +35,9 @@ delivery form. ## Dependency Boundaries - Execution primitive crates may depend on `contracts` and narrowly scoped - provider-neutral DTOs when needed for stream normalization. + provider-neutral DTOs owned by this layer. - Execution primitive crates must not depend on `assembly/core`, `src/apps`, frontend code, Tauri APIs, or product-surface lifecycle. -- Any new dependency on `adapters` or `services` needs an explicit boundary - reason in the nearest module doc or PR description. +- Dependencies on `adapters` are not allowed from this layer. New dependencies + on `services` need an explicit boundary reason in the nearest module doc or + PR description. diff --git a/src/crates/execution/agent-stream/AGENTS.md b/src/crates/execution/agent-stream/AGENTS.md index b782b0401..18528d0a9 100644 --- a/src/crates/execution/agent-stream/AGENTS.md +++ b/src/crates/execution/agent-stream/AGENTS.md @@ -2,20 +2,22 @@ Scope: this guide applies to `src/crates/execution/agent-stream`. -`bitfun-agent-stream` owns provider stream normalization and replayable stream -processing contracts. It should preserve provider wire behavior while exposing a -portable stream surface to higher layers. +`bitfun-agent-stream` owns provider-neutral stream DTOs, tool-call accumulation, +and replayable stream processing contracts. Provider wire parsing belongs in +`src/crates/adapters/ai-adapters`, which converts provider chunks into these +portable stream contracts. ## Guardrails - Do not depend on `bitfun-core`, app crates, Tauri, concrete services, - transport adapters, terminal, tool-runtime, or product-domain implementations. -- Keep provider-specific parsing isolated to stream normalization. Do not add - session lifecycle, tool execution, prompt policy, or product orchestration - behavior here. -- Stream fixture changes must preserve ordering, tool-call reconstruction, + transport adapters, AI adapters, terminal, tool-runtime, or product-domain + implementations. +- Keep provider-specific SSE or response parsing in `bitfun-ai-adapters`; this + crate only owns provider-neutral stream assembly and replay behavior. +- Do not add session lifecycle, tool execution, prompt policy, or product + orchestration behavior here. +- Stream contract changes must preserve ordering, tool-call reconstruction, reasoning/thinking fields, usage accounting, and malformed-chunk handling. -- New provider quirks need fixture coverage rather than broad catch-all parsing. ## Verification @@ -24,4 +26,7 @@ cargo test -p bitfun-agent-stream node scripts/check-core-boundaries.mjs ``` +When provider fixture parsing changes, run the focused `bitfun-ai-adapters` +stream tests as well. + For documentation-only changes, run `git diff --check`. diff --git a/src/crates/execution/agent-stream/Cargo.toml b/src/crates/execution/agent-stream/Cargo.toml index 265fced28..a6e86d703 100644 --- a/src/crates/execution/agent-stream/Cargo.toml +++ b/src/crates/execution/agent-stream/Cargo.toml @@ -12,7 +12,6 @@ crate-type = ["rlib"] [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -bitfun-ai-adapters = { path = "../../adapters/ai-adapters" } bitfun-events = { path = "../../contracts/events" } futures = { workspace = true } log = { workspace = true } @@ -23,6 +22,4 @@ tokio-util = { workspace = true } uuid = { workspace = true } [dev-dependencies] -axum = { workspace = true } -reqwest = { workspace = true } tokio-stream = { workspace = true } diff --git a/src/crates/execution/agent-stream/src/lib.rs b/src/crates/execution/agent-stream/src/lib.rs index 47f0734cf..82e1cd2a0 100644 --- a/src/crates/execution/agent-stream/src/lib.rs +++ b/src/crates/execution/agent-stream/src/lib.rs @@ -2,10 +2,12 @@ //! //! Processes AI streaming responses, supports tool pre-detection and parameter streaming -use bitfun_ai_adapters::tool_call_accumulator::{ +pub mod tool_call_accumulator; +mod unified; + +use crate::tool_call_accumulator::{ FinalizedToolCall, PendingToolCalls, ToolCallBoundary, ToolCallStreamKey, }; -use bitfun_ai_adapters::{GeminiUsage, UnifiedResponse, UnifiedTokenUsage, UnifiedToolCall}; use bitfun_events::{AgenticEvent, AgenticEventPriority as EventPriority, ToolEventData}; use futures::{Stream, StreamExt}; use log::{debug, error, trace}; @@ -16,6 +18,7 @@ use std::fmt; use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc; +pub use unified::{UnifiedResponse, UnifiedTokenUsage, UnifiedToolCall}; /// Minimal tool-call value emitted by the stream processor. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -169,7 +172,7 @@ pub struct StreamResult { pub full_text: String, pub tool_calls: Vec, /// Token usage statistics (from model response) - pub usage: Option, + pub usage: Option, /// Provider-specific metadata captured from the stream tail. pub provider_metadata: Option, /// Whether this stream produced any user-visible output (text/thinking/tool events) @@ -218,7 +221,7 @@ struct StreamContext { thinking_signature: Option, full_text: String, tool_calls: Vec, - usage: Option, + usage: Option, provider_metadata: Option, // Current tool call state @@ -570,14 +573,7 @@ impl StreamProcessor { /// Handle usage statistics fn handle_usage(&self, ctx: &mut StreamContext, response_usage: &UnifiedTokenUsage) { - ctx.usage = Some(GeminiUsage { - prompt_token_count: response_usage.prompt_token_count, - candidates_token_count: response_usage.candidates_token_count, - total_token_count: response_usage.total_token_count, - reasoning_token_count: response_usage.reasoning_token_count, - cached_content_token_count: response_usage.cached_content_token_count, - cache_creation_token_count: response_usage.cache_creation_token_count, - }); + ctx.usage = Some(response_usage.clone()); debug!( "Received token usage stats: input={}, output={}, total={}", response_usage.prompt_token_count, @@ -996,7 +992,7 @@ impl StreamProcessor { #[cfg(test)] mod tests { use super::{StreamEventSink, StreamProcessOptions, StreamProcessor}; - use bitfun_ai_adapters::{UnifiedResponse, UnifiedTokenUsage, UnifiedToolCall}; + use super::{UnifiedResponse, UnifiedTokenUsage, UnifiedToolCall}; use bitfun_events::{AgenticEvent, AgenticEventPriority as EventPriority}; use futures::StreamExt; use serde_json::json; diff --git a/src/crates/execution/agent-stream/src/tool_call_accumulator.rs b/src/crates/execution/agent-stream/src/tool_call_accumulator.rs new file mode 100644 index 000000000..9175672f1 --- /dev/null +++ b/src/crates/execution/agent-stream/src/tool_call_accumulator.rs @@ -0,0 +1,1089 @@ +use log::{error, warn}; +use serde_json::{json, Value}; +use std::collections::BTreeMap; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ToolCallBoundary { + NewTool, + FinishReason, + StreamEnd, + GracefulShutdown, + EndOfAggregation, +} + +impl ToolCallBoundary { + fn as_str(self) -> &'static str { + match self { + Self::NewTool => "new_tool", + Self::FinishReason => "finish_reason", + Self::StreamEnd => "stream_end", + Self::GracefulShutdown => "graceful_shutdown", + Self::EndOfAggregation => "end_of_aggregation", + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ToolCallStreamKey { + Indexed(usize), + Unindexed, +} + +impl From> for ToolCallStreamKey { + fn from(value: Option) -> Self { + match value { + Some(index) => Self::Indexed(index), + None => Self::Unindexed, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct PendingToolCall { + tool_id: String, + tool_name: String, + raw_arguments: String, + early_detected_emitted: bool, +} + +#[derive(Debug, Clone)] +pub struct FinalizedToolCall { + pub tool_id: String, + pub tool_name: String, + pub raw_arguments: String, + pub arguments: Value, + pub is_error: bool, + /// True when the raw stream produced unparseable JSON (e.g. truncated by + /// `max_tokens`) and we successfully patched the trailing brackets/strings + /// to make it parse. The recovered call still executes, but downstream + /// consumers should warn the model that the content may be incomplete. + pub recovered_from_truncation: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EarlyDetectedToolCall { + pub tool_id: String, + pub tool_name: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ToolCallParamsChunk { + pub tool_id: String, + pub tool_name: String, + pub params_chunk: String, +} + +#[derive(Debug, Clone, Default)] +pub struct ToolCallDeltaOutcome { + pub finalized_previous: Option, + pub early_detected: Option, + pub params_partial: Option, +} + +#[derive(Debug, Clone, Default)] +pub struct PendingToolCalls { + pending: BTreeMap, +} + +/// Tools where executing a truncated tool call is **safe and meaningful** — +/// the model intended to write content and a partial file is strictly more +/// useful than a hard failure. For everything else (Bash, Edit, Task, ...) we +/// surface the truncation as an error: a partial shell command or a partial +/// `old_string`/`new_string` for Edit can change semantics destructively. +pub fn is_write_like_tool_name(tool_name: &str) -> bool { + matches!(tool_name, "Write" | "file_write" | "write_notebook") +} + +fn is_truncation_safe_to_recover(tool_name: &str) -> bool { + is_write_like_tool_name(tool_name) || matches!(tool_name, "AskUserQuestion" | "TodoWrite") +} + +/// Attempt to repair a JSON document that was truncated mid-stream (typically +/// because the model hit `max_tokens`). Closes any open string literal and any +/// unclosed `{`/`[` brackets in their correct nesting order. Returns `None` +/// when the truncation occurs at a position where we would have to invent a +/// missing value (e.g. trailing `,` or `:`) since blindly closing in those +/// states would silently corrupt the semantics. +fn repair_truncated_json(raw: &str) -> Option { + let mut in_string = false; + let mut escape = false; + let mut stack: Vec = Vec::new(); + let mut last_significant: Option = None; + + for &b in raw.as_bytes() { + if escape { + escape = false; + continue; + } + if in_string { + match b { + b'\\' => escape = true, + b'"' => { + in_string = false; + last_significant = Some(b'"'); + } + _ => {} + } + continue; + } + match b { + b'"' => { + in_string = true; + last_significant = Some(b'"'); + } + b'{' => { + stack.push(b'{'); + last_significant = Some(b'{'); + } + b'[' => { + stack.push(b'['); + last_significant = Some(b'['); + } + b'}' => { + if stack.pop() != Some(b'{') { + return None; + } + last_significant = Some(b'}'); + } + b']' => { + if stack.pop() != Some(b'[') { + return None; + } + last_significant = Some(b']'); + } + b' ' | b'\t' | b'\n' | b'\r' => {} + other => last_significant = Some(other), + } + } + + // Nothing to repair (parser failed for some other reason). + if !in_string && stack.is_empty() { + return None; + } + + // Refuse to fabricate values when truncated mid-pair. + if !in_string { + if let Some(b',') | Some(b':') = last_significant { + return None; + } + } + + let mut out = String::with_capacity(raw.len() + stack.len() + 1); + out.push_str(raw); + if in_string { + out.push('"'); + } + while let Some(c) = stack.pop() { + out.push(match c { + b'{' => '}', + b'[' => ']', + _ => unreachable!(), + }); + } + Some(out) +} + +impl PendingToolCall { + fn strip_argument_wrapping(raw_arguments: &str) -> &str { + let trimmed = raw_arguments.trim(); + let Some(stripped) = trimmed + .strip_prefix("```") + .and_then(|value| value.strip_suffix("```")) + else { + return trimmed.trim_matches('`').trim(); + }; + + let stripped = stripped.trim(); + if let Some((first_line, rest)) = stripped.split_once('\n') { + if first_line + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + { + return rest.trim(); + } + } + + stripped + } + + /// Best-effort repair for Git tool calls whose arguments came back as a raw + /// shell-style command (e.g. `git status`, `"git diff --staged"`). + fn parse_git_command_arguments(raw_arguments: &str) -> Option { + let trimmed = Self::strip_argument_wrapping(raw_arguments); + let command = trimmed + .strip_prefix("git ") + .map(str::trim) + .unwrap_or(trimmed); + let mut parts = command.splitn(2, char::is_whitespace); + let operation = parts.next()?.trim(); + if operation.is_empty() + || !operation + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + { + return None; + } + + let args = parts.next().map(str::trim).filter(|args| !args.is_empty()); + let mut value = json!({ "operation": operation }); + if let Some(args) = args { + value["args"] = json!(args); + } + Some(value) + } + + fn normalize_git_tool_arguments(arguments: Value) -> Value { + if let Value::String(raw) = &arguments { + if let Some(repaired) = Self::parse_git_command_arguments(raw) { + warn!("Git tool call arguments repaired from JSON string command"); + return repaired; + } + } + arguments + } + + fn parse_arguments(tool_name: &str, raw_arguments: &str) -> Result { + match serde_json::from_str::(raw_arguments) { + Ok(arguments) => { + if tool_name == "Git" { + Ok(Self::normalize_git_tool_arguments(arguments)) + } else { + Ok(arguments) + } + } + Err(primary_error) => { + if tool_name == "Git" { + if let Some(arguments) = Self::parse_git_command_arguments(raw_arguments) { + warn!("Git tool call arguments repaired from raw command"); + return Ok(arguments); + } + } + Err(primary_error.to_string()) + } + } + } + + pub fn has_pending(&self) -> bool { + !self.tool_id.is_empty() + } + + pub fn has_meaningful_payload(&self) -> bool { + !self.tool_name.is_empty() || !self.raw_arguments.is_empty() + } + + pub fn tool_id(&self) -> &str { + &self.tool_id + } + + pub fn tool_name(&self) -> &str { + &self.tool_name + } + + pub fn start_new(&mut self, tool_id: String, tool_name: Option) { + self.tool_id = tool_id; + self.tool_name = tool_name.unwrap_or_default(); + self.raw_arguments.clear(); + self.early_detected_emitted = false; + } + + pub fn update_tool_name_if_missing(&mut self, tool_name: Option) { + if self.tool_name.is_empty() { + self.tool_name = tool_name.unwrap_or_default(); + } + } + + pub fn append_arguments(&mut self, arguments_chunk: &str) { + self.raw_arguments.push_str(arguments_chunk); + } + + pub fn replace_arguments(&mut self, arguments_snapshot: &str) { + self.raw_arguments.clear(); + self.raw_arguments.push_str(arguments_snapshot); + } + + pub fn raw_arguments(&self) -> &str { + &self.raw_arguments + } + + pub fn finalize(&mut self, boundary: ToolCallBoundary) -> Option { + if !self.has_pending() { + return None; + } + + if !self.has_meaningful_payload() { + self.tool_id.clear(); + self.tool_name.clear(); + self.raw_arguments.clear(); + self.early_detected_emitted = false; + return None; + } + + let tool_id = std::mem::take(&mut self.tool_id); + let tool_name = std::mem::take(&mut self.tool_name); + let raw_arguments = std::mem::take(&mut self.raw_arguments); + self.early_detected_emitted = false; + let parsed_arguments = Self::parse_arguments(&tool_name, &raw_arguments); + + let (arguments, is_error, recovered_from_truncation) = match parsed_arguments { + Ok(value) => (value, false, false), + Err(parse_err) => { + let repaired = repair_truncated_json(&raw_arguments) + .and_then(|candidate| Self::parse_arguments(&tool_name, &candidate).ok()); + match repaired { + Some(value) if is_truncation_safe_to_recover(&tool_name) => { + warn!( + "Tool call arguments recovered from truncation at boundary={}: tool_id={}, tool_name={}, raw_len={}", + boundary.as_str(), + tool_id, + tool_name, + raw_arguments.len() + ); + (value, false, true) + } + Some(_) => { + // We *could* repair but the tool's semantics make + // executing a partial call unsafe (Bash, Edit, ...). + // Surface as an error so the user/model knows the + // truncation happened and can retry sensibly. + warn!( + "Tool call arguments truncated at boundary={}: tool_id={}, tool_name={} — refusing to execute partial call (tool not in safe-recovery list)", + boundary.as_str(), + tool_id, + tool_name + ); + (json!({}), true, true) + } + None => { + error!( + "Tool call arguments parsing failed at boundary={}: tool_id={}, tool_name={}, error={}, raw_arguments={}", + boundary.as_str(), + tool_id, + tool_name, + parse_err, + raw_arguments + ); + (json!({}), true, false) + } + } + } + }; + + Some(FinalizedToolCall { + tool_id, + tool_name, + raw_arguments, + arguments, + is_error, + recovered_from_truncation, + }) + } +} + +impl PendingToolCalls { + pub fn new() -> Self { + Self { + pending: BTreeMap::new(), + } + } + + pub fn apply_delta( + &mut self, + key: ToolCallStreamKey, + tool_id: Option, + tool_name: Option, + arguments: Option, + arguments_is_snapshot: bool, + ) -> ToolCallDeltaOutcome { + let mut outcome = ToolCallDeltaOutcome::default(); + + let has_tool_id = tool_id.as_ref().is_some_and(|tool_id| !tool_id.is_empty()); + if !self.pending.contains_key(&key) { + if has_tool_id { + self.pending.insert(key.clone(), PendingToolCall::default()); + } else { + return outcome; + } + } + + let Some(pending) = self.pending.get_mut(&key) else { + return outcome; + }; + + if let Some(tool_id) = tool_id.filter(|tool_id| !tool_id.is_empty()) { + let is_new_tool = pending.tool_id() != tool_id; + if is_new_tool { + outcome.finalized_previous = pending.finalize(ToolCallBoundary::NewTool); + pending.start_new(tool_id, tool_name.clone()); + } else { + pending.update_tool_name_if_missing(tool_name.clone()); + } + } else if tool_name + .as_ref() + .is_some_and(|tool_name| !tool_name.is_empty()) + { + pending.update_tool_name_if_missing(tool_name.clone()); + } + + if pending.has_pending() + && !pending.tool_name().is_empty() + && !pending.early_detected_emitted + { + pending.early_detected_emitted = true; + outcome.early_detected = Some(EarlyDetectedToolCall { + tool_id: pending.tool_id().to_string(), + tool_name: pending.tool_name().to_string(), + }); + } + + if let Some(arguments) = arguments.filter(|arguments| !arguments.is_empty()) { + if pending.has_pending() { + if arguments_is_snapshot { + pending.replace_arguments(&arguments); + } else { + pending.append_arguments(&arguments); + } + let tool_name = pending.tool_name().to_string(); + let params_chunk = arguments; + if !params_chunk.is_empty() { + outcome.params_partial = Some(ToolCallParamsChunk { + tool_id: pending.tool_id().to_string(), + tool_name, + params_chunk, + }); + } + } + } + + outcome + } + + pub fn finalize_key( + &mut self, + key: &ToolCallStreamKey, + boundary: ToolCallBoundary, + ) -> Option { + let mut pending = self.pending.remove(key)?; + pending.finalize(boundary) + } + + pub fn finalize_all(&mut self, boundary: ToolCallBoundary) -> Vec { + let keys: Vec<_> = self.pending.keys().cloned().collect(); + keys.into_iter() + .filter_map(|key| self.finalize_key(&key, boundary)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::{ + repair_truncated_json, EarlyDetectedToolCall, PendingToolCall, PendingToolCalls, + ToolCallBoundary, ToolCallParamsChunk, ToolCallStreamKey, + }; + use serde_json::json; + + #[test] + fn finalizes_complete_json_only_at_boundary() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("tool_a".to_string())); + pending.append_arguments("{\"a\":1}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.tool_id, "call_1"); + assert_eq!(finalized.tool_name, "tool_a"); + assert_eq!(finalized.arguments, json!({"a": 1})); + assert!(!finalized.is_error); + assert!(!pending.has_pending()); + } + + #[test] + fn invalid_json_becomes_error_with_empty_object() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("tool_a".to_string())); + pending.append_arguments("{\"a\":"); + + let finalized = pending + .finalize(ToolCallBoundary::StreamEnd) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn repairs_git_raw_command_arguments() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Git".to_string())); + pending.append_arguments("git status"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.raw_arguments, "git status"); + assert_eq!(finalized.arguments, json!({"operation": "status"})); + assert!(!finalized.is_error); + } + + #[test] + fn repairs_git_json_string_command_arguments() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Git".to_string())); + pending.append_arguments("\"git diff --staged\""); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!( + finalized.arguments, + json!({"operation": "diff", "args": "--staged"}) + ); + assert!(!finalized.is_error); + } + + #[test] + fn git_args_only_object_is_left_for_tool_schema_diagnostic() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Git".to_string())); + pending.append_arguments("{\"args\": \"--since=\\\"2026-05-02\\\" --oneline\"}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!( + finalized.arguments, + json!({"args": "--since=\"2026-05-02\" --oneline"}) + ); + assert!(!finalized.is_error); + } + + #[test] + fn git_duplicate_subcommand_in_args_is_left_for_tool_schema_diagnostic() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Git".to_string())); + pending.append_arguments("{\"args\": \"log --oneline -10\"}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({"args": "log --oneline -10"})); + assert!(!finalized.is_error); + } + + #[test] + fn does_not_infer_git_operation_from_ambiguous_args_only_object() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Git".to_string())); + pending.append_arguments("{\"args\": \"--stat\"}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({"args": "--stat"})); + assert!(!finalized.is_error); + } + + #[test] + fn raw_string_arguments_for_single_field_tools_stay_invalid_json() { + let cases = [ + ("Bash", "pnpm test"), + ("Skill", "openai-docs"), + ("Read", "src/main.rs"), + ("GetFileDiff", "src/lib.rs"), + ("LS", "src/crates"), + ("Delete", "tmp/output.log"), + ("Glob", "**/*.rs"), + ("Grep", "Arguments are invalid JSON"), + ("WebSearch", "OpenAI Agents SDK"), + ("WebFetch", "https://example.com"), + ("InitMiniApp", "Markdown Viewer"), + ]; + + for (tool_name, raw_arguments) in cases { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some(tool_name.to_string())); + pending.append_arguments(raw_arguments); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({}), "tool={tool_name}"); + assert!(finalized.is_error, "tool={tool_name}"); + } + } + + #[test] + fn incomplete_json_object_for_single_field_tools_stays_invalid() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments( + "{\"command\": \"git log --since=\\\"2026-05-02\\\" --oneline --stat", + ); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn does_not_wrap_incomplete_json_object_as_raw_string_argument() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments("{\"command\": "); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn does_not_repair_incomplete_json_object_for_multifield_tools() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Task".to_string())); + pending.append_arguments( + "{\"description\":\"Explore BitFun project structure\",\"prompt\":\"read README\\n\\nthoroughness: very", + ); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn does_not_repair_object_without_key_value_payload() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments("{"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn does_not_execute_truncated_incomplete_json_object() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments("{\"command\": \"git log --since=\\\"2026-05-02\\\" --on"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn json_string_arguments_for_single_field_tools_are_schema_errors_not_rewritten() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments("\"git status\""); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!("git status")); + assert!(!finalized.is_error); + } + + #[test] + fn fenced_raw_arguments_for_single_field_tools_stay_invalid_json() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments("```bash\npnpm run lint:web\n```"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn does_not_repair_raw_string_arguments_for_multifield_tools() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Edit".to_string())); + pending.append_arguments("src/main.rs"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn json_with_one_extra_trailing_right_brace_stays_invalid() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("tool_a".to_string())); + pending.append_arguments("{\"a\":1}}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.raw_arguments, "{\"a\":1}}"); + assert_eq!(finalized.arguments, json!({})); + assert!(finalized.is_error); + } + + #[test] + fn finalized_arguments_preserve_object_fields() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("tool_a".to_string())); + pending.append_arguments("{\"a\":1,\"b\":\"x\"}"); + + let finalized = pending + .finalize(ToolCallBoundary::EndOfAggregation) + .expect("finalized tool"); + + assert_eq!(finalized.arguments["a"], json!(1)); + assert_eq!(finalized.arguments["b"], json!("x")); + } + + #[test] + fn replace_arguments_overwrites_partial_buffer() { + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("tool_a".to_string())); + pending.append_arguments("{\"city\":\"Bei"); + pending.replace_arguments("{\"city\":\"Beijing\"}"); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert_eq!(finalized.arguments, json!({"city": "Beijing"})); + assert!(!finalized.is_error); + } + + #[test] + fn manages_multiple_pending_tool_calls_by_index() { + let mut pending = PendingToolCalls::default(); + + assert_eq!( + pending + .apply_delta( + ToolCallStreamKey::Indexed(0), + Some("call_1".to_string()), + Some("tool_a".to_string()), + None, + false, + ) + .early_detected, + Some(EarlyDetectedToolCall { + tool_id: "call_1".to_string(), + tool_name: "tool_a".to_string(), + }) + ); + assert_eq!( + pending + .apply_delta( + ToolCallStreamKey::Indexed(1), + Some("call_2".to_string()), + Some("tool_b".to_string()), + None, + false, + ) + .early_detected, + Some(EarlyDetectedToolCall { + tool_id: "call_2".to_string(), + tool_name: "tool_b".to_string(), + }) + ); + + pending.apply_delta( + ToolCallStreamKey::Indexed(0), + None, + None, + Some("{\"a\":1}".to_string()), + false, + ); + pending.apply_delta( + ToolCallStreamKey::Indexed(1), + None, + None, + Some("{\"b\":2}".to_string()), + false, + ); + + let finalized = pending.finalize_all(ToolCallBoundary::FinishReason); + assert_eq!(finalized.len(), 2); + assert_eq!(finalized[0].tool_id, "call_1"); + assert_eq!(finalized[0].arguments, json!({"a": 1})); + assert_eq!(finalized[1].tool_id, "call_2"); + assert_eq!(finalized[1].arguments, json!({"b": 2})); + } + + #[test] + fn id_only_prelude_is_attached_to_following_payload_without_id() { + let mut pending = PendingToolCalls::default(); + + let prelude = pending.apply_delta( + ToolCallStreamKey::Indexed(0), + Some("call_1".to_string()), + None, + None, + false, + ); + assert_eq!(prelude.early_detected, None); + assert_eq!(prelude.params_partial, None); + + let payload = pending.apply_delta( + ToolCallStreamKey::Indexed(0), + None, + Some("tool_a".to_string()), + Some("{\"a\":1}".to_string()), + false, + ); + assert_eq!( + payload.early_detected, + Some(EarlyDetectedToolCall { + tool_id: "call_1".to_string(), + tool_name: "tool_a".to_string(), + }) + ); + assert_eq!( + payload.params_partial, + Some(ToolCallParamsChunk { + tool_id: "call_1".to_string(), + tool_name: "tool_a".to_string(), + params_chunk: "{\"a\":1}".to_string(), + }) + ); + } + + #[test] + fn id_only_orphan_is_dropped_on_finalize() { + let mut pending = PendingToolCalls::default(); + + let outcome = pending.apply_delta( + ToolCallStreamKey::Indexed(1), + Some("call_orphan".to_string()), + None, + None, + false, + ); + assert!(outcome.finalized_previous.is_none()); + assert!(outcome.early_detected.is_none()); + assert!(outcome.params_partial.is_none()); + assert!(pending + .finalize_all(ToolCallBoundary::FinishReason) + .is_empty()); + } + + #[test] + fn empty_argument_delta_is_ignored() { + let mut pending = PendingToolCalls::default(); + + let header = pending.apply_delta( + ToolCallStreamKey::Indexed(0), + Some("call_1".to_string()), + Some("tool_a".to_string()), + Some(String::new()), + false, + ); + assert_eq!( + header.early_detected, + Some(EarlyDetectedToolCall { + tool_id: "call_1".to_string(), + tool_name: "tool_a".to_string(), + }) + ); + assert!(header.params_partial.is_none()); + + let empty_delta = pending.apply_delta( + ToolCallStreamKey::Indexed(0), + None, + None, + Some(String::new()), + false, + ); + assert!(empty_delta.finalized_previous.is_none()); + assert!(empty_delta.early_detected.is_none()); + assert!(empty_delta.params_partial.is_none()); + } + + // ------------------------------------------------------------------ + // Truncation recovery tests + // ------------------------------------------------------------------ + + #[test] + fn write_truncated_mid_content_string_is_recovered() { + // Reproduces the deep-research dump: the model hit max_tokens while + // streaming `content`, so the JSON ends inside the string literal + // with no closing `"` and no closing `}`. + let raw = "{\"file_path\": \"/tmp/report.md\", \"content\": \"# Report\\n\\nA long body that was cut"; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Write".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error, "Write recovery should succeed"); + assert!(finalized.recovered_from_truncation); + assert_eq!( + finalized.arguments, + json!({ + "file_path": "/tmp/report.md", + "content": "# Report\n\nA long body that was cut" + }) + ); + } + + #[test] + fn write_truncated_with_chinese_multibyte_is_recovered() { + let raw = "{\"file_path\": \"/tmp/r.md\", \"content\": \"深度研究报告:未完"; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Write".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error); + assert!(finalized.recovered_from_truncation); + assert_eq!( + finalized.arguments["content"].as_str(), + Some("深度研究报告:未完") + ); + } + + #[test] + fn bash_truncated_mid_command_still_errors_but_records_truncation() { + let raw = r#"{"command": "git log --since=\"2026-05-02\" --on"#; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + // We never execute a partial shell command. + assert!(finalized.is_error); + assert_eq!(finalized.arguments, json!({})); + // But the truncation is recorded so the surface error message and + // diagnostic dump can distinguish "truncated" from "model emitted + // bad JSON". + assert!(finalized.recovered_from_truncation); + } + + #[test] + fn repair_refuses_truncation_after_colon() { + // We can't invent the missing value, so this must not auto-repair. + assert!(repair_truncated_json(r#"{"a": 1, "b":"#).is_none()); + } + + #[test] + fn repair_refuses_truncation_after_comma() { + assert!(repair_truncated_json(r#"{"a": 1,"#).is_none()); + } + + #[test] + fn repair_returns_none_for_already_valid_json() { + // Already balanced — repair has nothing to do (parser would have + // succeeded anyway). + assert!(repair_truncated_json(r#"{"a": 1}"#).is_none()); + } + + #[test] + fn repair_closes_nested_brackets_in_correct_order() { + let raw = r#"{"a": [1, 2, {"b": "incomplete"#; + let repaired = repair_truncated_json(raw).expect("repaired"); + let parsed: serde_json::Value = + serde_json::from_str(&repaired).expect("repaired is valid JSON"); + assert_eq!(parsed, json!({"a": [1, 2, {"b": "incomplete"}]})); + } + + #[test] + fn repair_preserves_escaped_quote_inside_truncated_string() { + let raw = r#"{"content": "she said \"hello\" and then"#; + let repaired = repair_truncated_json(raw).expect("repaired"); + let parsed: serde_json::Value = serde_json::from_str(&repaired).expect("valid JSON"); + assert_eq!( + parsed["content"].as_str(), + Some("she said \"hello\" and then") + ); + } + + #[test] + fn ask_user_question_truncated_mid_chinese_string_is_recovered() { + let raw = r#"{"questions": [{"header": "重试场景", "multiSelect": true, "options": [{"description": "当消息发送后后端返回失败(消息气泡显示为红色失败状态,有 model rounds 但 status='error'),在失败气泡旁增加重试按钮,点击后重新发送该消息", "label": "失败消息气泡上加重试按钮"}]}]}"#; + // Truncate mid-Chinese-string, after a colon that opened the value + let truncated = &raw[..raw.find("消息气泡显示为红色失败状态").unwrap() + + "消息气泡显示为红色失败状态".len()]; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("AskUserQuestion".to_string())); + pending.append_arguments(truncated); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error); + assert!(finalized.recovered_from_truncation); + } + + #[test] + fn ask_user_question_truncated_mid_options_is_recovered() { + // Truncation right after a completed description value's closing quote + comma + let raw = r#"{"questions": [{"header": "场景", "multiSelect": true, "options": [{"description": "第一条描述", "label": "选项一"}, {"description": "第二条描"#; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("AskUserQuestion".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error); + assert!(finalized.recovered_from_truncation); + let questions = finalized.arguments["questions"].as_array().unwrap(); + assert_eq!(questions.len(), 1); + assert_eq!(questions[0]["options"].as_array().unwrap().len(), 2); + } + + #[test] + fn todo_write_truncated_mid_content_is_recovered() { + let raw = r#"{"todos": [{"id": "1", "content": "完成重构并优化性能", "status": "in_progress"}, {"id": "2", "content": "编写单元测"#; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("TodoWrite".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error); + assert!(finalized.recovered_from_truncation); + let todos = finalized.arguments["todos"].as_array().unwrap(); + assert_eq!(todos.len(), 2); + } +} diff --git a/src/crates/execution/agent-stream/src/unified.rs b/src/crates/execution/agent-stream/src/unified.rs new file mode 100644 index 000000000..fb5948eea --- /dev/null +++ b/src/crates/execution/agent-stream/src/unified.rs @@ -0,0 +1,81 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::borrow::Cow; +use std::fmt; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UnifiedToolCall { + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_call_index: Option, + pub id: Option, + pub name: Option, + pub arguments: Option, + #[serde(default)] + pub arguments_is_snapshot: bool, +} + +/// Unified AI response format +#[derive(Clone, Serialize, Deserialize, Default)] +pub struct UnifiedResponse { + pub text: Option, + pub reasoning_content: Option, + /// Signature for Anthropic extended thinking (returned in multi-turn conversations) + #[serde(skip_serializing_if = "Option::is_none")] + pub thinking_signature: Option, + pub tool_call: Option, + pub usage: Option, + pub finish_reason: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub provider_metadata: Option, +} + +impl fmt::Debug for UnifiedResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let reasoning_summary = self.reasoning_content.as_ref().map(|s| { + if s.len() > 100 { + let end = s + .char_indices() + .take_while(|(i, _)| *i < 100) + .last() + .map(|(i, c)| i + c.len_utf8()) + .unwrap_or(0); + // Guard against multi-byte chars pushing end past the string length + let end = end.min(s.len()); + Cow::Owned(format!("{}... ({} bytes)", &s[..end], s.len())) + } else { + Cow::Borrowed(s.as_str()) + } + }); + f.debug_struct("UnifiedResponse") + .field("text", &self.text) + .field("reasoning_content", &reasoning_summary) + .field("thinking_signature", &"") + .field("tool_call", &self.tool_call) + .field("usage", &self.usage) + .field("finish_reason", &self.finish_reason) + .field("provider_metadata", &"") + .finish() + } +} + +/// Unified token usage statistics +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UnifiedTokenUsage { + pub prompt_token_count: u32, + pub candidates_token_count: u32, + pub total_token_count: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub reasoning_token_count: Option, + /// Cache READ tokens (i.e., served from cache this call). Universal across + /// providers: OpenAI `cached_tokens`, DeepSeek `prompt_cache_hit_tokens`, + /// Anthropic `cache_read_input_tokens`, Gemini `cachedContentTokenCount`. + /// Hit rate consumers must use this as numerator and `prompt_token_count` + /// as denominator. + #[serde(skip_serializing_if = "Option::is_none")] + pub cached_content_token_count: Option, + /// Cache WRITE tokens (only Anthropic reports this per-token; others either + /// have no creation concept or bill creation by storage time). Disjoint from + /// `cached_content_token_count`. Do NOT include in hit-rate numerator. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub cache_creation_token_count: Option, +}