From 4195ffc1dd0eb2269a275407c593a442fdd96326 Mon Sep 17 00:00:00 2001 From: CloudSigma Date: Wed, 3 Jun 2026 18:09:05 +0000 Subject: [PATCH 1/2] Remove requester bridge from TaaS affinity plugin --- README.md | 158 ++++++---- index.ts | 486 +++++-------------------------- test/requester-bridge.test.ts | 518 --------------------------------- test/requester-runtime.test.ts | 122 ++++++++ test/smoke.mjs | 7 +- 5 files changed, 293 insertions(+), 998 deletions(-) delete mode 100644 test/requester-bridge.test.ts create mode 100644 test/requester-runtime.test.ts diff --git a/README.md b/README.md index 54b4bc9..1ede020 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # openclaw-token-cache-optimizer -An [OpenClaw](https://openclaw.ai) plugin that maximises prompt-cache hit rates when using [CloudSigma TaaS](https://www.cloudsigma.com) as your LLM provider. +An [OpenClaw](https://openclaw.ai) provider plugin that maximises prompt-cache hit rates when using [CloudSigma TaaS](https://www.cloudsigma.com) as your LLM provider. It injects a stable, per-conversation session ID into every outbound request so TaaS can pin your conversation to the same upstream slot (OAuth token, Bedrock region, or Claude Code node) from the very first turn — giving you consistent prompt-cache reuse instead of cold starts on every message. +The plugin is intentionally narrow: requester-side tool execution is handled by Claude Code, TaaS, and the OpenClaw gateway Direction-2 path. This plugin does **not** lease requester bridges, poll for bridge work, invoke local tools, or alter OpenAI tool payloads. + --- ## The problem it solves @@ -11,7 +13,7 @@ It injects a stable, per-conversation session ID into every outbound request so TaaS routes LLM requests across a pool of upstream slots. Without a session signal, it uses heuristics to guess which requests belong to the same conversation: | Method | Confidence | Works when | -|---|---|---| +|---|---:|---| | Tool-use ID chain | 1.0 | Tool-result follow-up turns only | | Structural inference | 0.85 | Mid-conversation, after a few turns | | New session fallback | 0.30 | First turn — no prior context | @@ -24,7 +26,7 @@ This plugin passes a stable `session_id` derived from your OpenClaw workspace so ## How it works -OpenClaw's `wrapStreamFn` hook intercepts the outbound request payload before it is sent to TaaS. The plugin adds session affinity fields plus a sanitized requester runtime envelope: +OpenClaw's `wrapStreamFn` hook intercepts the outbound request payload before it is sent to TaaS. The plugin adds session affinity fields plus a small sanitized requester runtime envelope: ```json { @@ -32,41 +34,59 @@ OpenClaw's `wrapStreamFn` hook intercepts the outbound request payload before it "session_id": "oc:edebc39a82a8a041", "sticky_key": "oc:edebc39a82a8a041", "requester_runtime": { - "schema_version": "2026-05-18", + "schema_version": "2026-06-03", "source": "openclaw-token-cache-optimizer", - "capture_mode": "advisory_only", "session_key": "oc:edebc39a82a8a041", + "openclaw_session_id": "oc:edebc39a82a8a041", + "requester_host_id": "host:1a2b3c4d5e6f7890", "repo_name": "my-repo", - "available_bridges": [], - "required_execution_mode": "advisory_only" + "git_branch_hint": "dev", + "git_dirty_hint": false, + "provider": "cloudsigma", + "model_id": "cloudsigma/auto", + "session_source_hint": "source:4ae2870a2e73027c", + "tool_execution": "direction_2_gateway", + "metadata_classification": { + "identifiers": "hashed", + "repository": "name_branch_dirty_only", + "local_paths": "omitted_by_default" + }, + "redaction_policy": "no_secrets;no_raw_local_paths;no_env_values;no_git_remotes;no_status_or_diffs;no_extra_params" } } } ``` -- `session_id` — read by TaaS's OpenAI and Codex affinity paths -- `sticky_key` — additionally read by the Anthropic substrate routing layer -- `requester_runtime` — advisory requester-side runtime hints for downstream Claude Code routing/guardrails - -All metadata fields are no-overwrite: if the caller already supplied them, the plugin leaves them intact. Additionally, the plugin injects an `X-Session-Id` request header via the `resolveTransportTurnState` hook, for transport layers that support per-turn native headers. +- `session_id` — read by TaaS's OpenAI and Codex affinity paths. +- `sticky_key` — additionally read by the Anthropic substrate routing layer. +- `requester_runtime` — safe advisory hints for downstream routing and diagnostics. +- `X-Session-Id` — injected by `resolveTransportTurnState` for transports that support per-turn native headers. -### Requester runtime capture and bridge leasing +All metadata fields are no-overwrite: if the caller already supplied `metadata.session_id`, `metadata.sticky_key`, or `metadata.requester_runtime`, the plugin leaves them intact. -The runtime envelope is intentionally small and sanitized. It can include `workspace_dir` / `agent_dir` when OpenClaw exposes them to the provider hook, bounded repo hints derived from the workspace (`repo_root_hint`, `repo_name`, `git_branch_hint`, `git_dirty_hint`), hashed host/source identifiers, and provider/model hints. +### Requester runtime metadata -It never includes raw environment variables, tokens, git remotes, full status output, diffs, or arbitrary provider `extraParams`. Git probes are bounded with a short timeout. +The runtime envelope is intentionally small and sanitized. By default it contains: -For CloudSigma TaaS provider requests, bridge leasing is enabled by default. The plugin requests a short-lived requester bridge lease and forwards only the returned opaque descriptor in `requester_runtime.available_bridges`. If the lease service is unavailable, requests automatically fall back to advisory-only metadata with an empty bridge list. +- required affinity/session fields: `session_key`, `openclaw_session_id` +- hashed identifiers: `requester_host_id`, `session_source_hint` +- bounded repo hints when available: `repo_name`, `git_branch_hint`, `git_dirty_hint` +- provider/model hints when available: `provider`, `model_id` +- explicit execution-mode marker: `tool_execution: "direction_2_gateway"` +- metadata classification and redaction policy -The bridge is multi-tenant by design: proxy nodes receive only opaque descriptors and call TaaS; requester-owned tools execute in the requester runtime through this plugin's poller. Non-scaffold `requester.tool.invoke` operations are relayed to the requester-local OpenClaw Gateway `/tools/invoke` endpoint, not to any proxy-node backend. Use `TAAS_REQUESTER_LOCAL_GATEWAY_URL` / `TAAS_REQUESTER_LOCAL_GATEWAY_TOKEN` only on the requester host if its gateway is not reachable via the default local settings. The plugin falls back to `OPENCLAW_GATEWAY_URL`, `OPENCLAW_GATEWAY_TOKEN` / `OPENCLAW_GATEWAY_PASSWORD`, then `http://127.0.0.1:18789`. +It does **not** include raw local paths (`workspace_dir`, `agent_dir`, `repo_root_hint`) by default. It also never includes environment variables, tokens, git remotes, full status output, diffs, or arbitrary provider `extraParams`. Git probes are bounded with a short timeout. -Default lease URL: +### Tool execution model: Direction-2 -```text -https://taas.cloudsigma.com/internal/requester-bridges/leases -``` +Requester-side tools are handled outside this plugin by Claude Code / TaaS / OpenClaw gateway Direction-2. Consequently this plugin: -If the TaaS provider config exposes a custom `baseUrl`, the plugin derives the lease URL from that base. Set `TAAS_REQUESTER_BRIDGE_LEASE_URL` only for staging or custom deployments. +- does not call `/internal/requester-bridges/leases` +- does not inject `requester_runtime.available_bridges` +- does not set `capture_mode: "bridge_capable"` +- does not poll `/internal/requester-bridges/poll` or post `/internal/requester-bridges/results` +- does not invoke requester-local `/tools/invoke` +- does not intercept OpenAI `tools`, `tool_calls`, or `role: "tool"` messages ### Session ID derivation @@ -87,29 +107,30 @@ The ID is a SHA-256 hash of the session source, truncated to 16 hex chars and pr | **Resets on `/new`** | New conversation = new workspace = new ID | | **Namespaced** | `oc:` prefix avoids collision with Claude Code and other TaaS clients | -Example IDs: -``` -oc:edebc39a82a8a041 ← main agent session -oc:4ae2870a2e73027c ← subagent spawned from the above -oc:a1b2c3d4e5f60718 ← same agent, next conversation +### Autorouter capture + +The wrapper captures TaaS `X-TaaS-*` response headers for autorouted requests and exposes the most recent route via the gateway RPC: + +```text +taas.autorouter.lastRoute ``` -### Sub-agent behaviour +Callers can query by `workspaceDir`, direct `sessionId`, or `agentId`. Captured values include the autorouted model, algorithm/mode, algorithm source, thinking level applied, and routed context window. -OpenClaw sub-agents run in isolated processes and may not receive a `workspaceDir` in their `wrapStreamFn` context. Without this plugin, all sub-agent requests arrive at TaaS with `session_id = 'none'`, breaking affinity. +### Sub-agent behaviour -The tier fallback system ensures sub-agents always get a deterministic session ID: +OpenClaw sub-agents run in isolated processes and may not receive a `workspaceDir` in their `wrapStreamFn` context. The tier fallback system ensures sub-agents always get a deterministic session ID: -1. **If the sub-agent has a workspace** (Tier 1) — derives a unique ID from it. Sub-agents get their own ID, separate from the parent. -2. **If the parent agent workspace is visible** via globalThis (Tier 2) — reuses the parent's ID. This is correct behaviour: sub-agents from the same parent conversation share upstream slot affinity. +1. **If the sub-agent has a workspace** (Tier 1) — derives a unique ID from it. +2. **If the parent agent workspace is visible** via globalThis (Tier 2) — reuses the parent's ID. 3. **If OpenClaw injects env vars** (Tiers 3–4) — uses those for a stable per-agent ID. -4. **Last resort** (Tier 5) — falls back to the state dir hash. All sub-agents on the same install share this ID, which still beats `session_id = 'none'` for cache-hit purposes. +4. **Last resort** (Tier 5) — falls back to the state dir hash. -#### Enabling debug logging +#### Debug logging Set `OPENCLAW_DEBUG=1` (or `NODE_ENV=development`) to emit the session ID source on each request: -``` +```text [taas-affinity] wrapStreamFn sessionId=oc:edebc39a82a8a041 source=workspaceDir:/home/user/.openclaw/... [taas-affinity] resolveTransportTurnState sessionId=oc:edebc39a82a8a041 source=workspaceDir:... turnId=abc attempt=1 ``` @@ -130,28 +151,30 @@ Older OpenClaw builds may fail to load the plugin or may load it without applyin ## Installation -### Option 1 - npm install (recommended) +### Option 1 - npm install ```bash openclaw plugins install openclaw-token-cache-optimizer openclaw gateway restart ``` -The published npm package ships pre-built JavaScript in `dist/` and works on OpenClaw `2026.4.27` and later (see [Compatibility](#compatibility)). +The published npm package ships pre-built JavaScript in `dist/` and works on OpenClaw `2026.4.27` and later. -### Option 2 - Manual install from source +### Option 2 - manual install from source ```bash git clone https://github.com/cloudsigma/openclaw-token-cache-optimizer \ ~/.openclaw/extensions/openclaw-token-cache-optimizer cd ~/.openclaw/extensions/openclaw-token-cache-optimizer -npm install # runs the `prepare` script which builds dist/ +git checkout dev +npm ci +npm run build openclaw gateway restart ``` -`npm install` triggers the `prepare` lifecycle script which compiles TypeScript to `dist/index.js`. OpenClaw `2026.5.x` and later require this compiled output for installed plugins; older gateways will still load it directly. +`npm ci` runs the `prepare` lifecycle script, which compiles TypeScript to `dist/index.js`. Re-run `npm run build` after pulling new source changes. -That's it. No `openclaw.json` changes are required - the plugin auto-activates for all requests to the `cloudsigma` provider. +No `openclaw.json` changes are required - the plugin auto-activates for all requests to the `cloudsigma` and `cloudsigma-staging` providers. ### Verify it loaded @@ -160,7 +183,7 @@ openclaw gateway status openclaw plugins info openclaw-taas-affinity ``` -You should see `Status: loaded` and the source pointing at `dist/index.js` (or `/index.ts` on legacy gateways). +You should see `Status: loaded` and the source pointing at `dist/index.js`. --- @@ -169,10 +192,11 @@ You should see `Status: loaded` and the source pointing at `dist/index.js` (or ` | OpenClaw gateway | Status | Notes | |---|---|---| | >= 2026.5.x | Supported | Loads compiled `dist/index.js` | -| 2026.4.27 - 2026.4.x | Supported | Loads compiled `dist/index.js`; bridge polling endpoints may not exist on the gateway-side TaaS API yet - plugin falls back to advisory-only metadata | -| < 2026.4.27 | Not supported | Hooks the plugin relies on (`wrapStreamFn`, `hookAliases`, `resolveTransportTurnState`) are not exposed | +| 2026.4.27 - 2026.4.x | Supported | Loads compiled `dist/index.js`; transport/header support depends on gateway hook availability | +| < 2026.4.27 | Not supported | Hooks the plugin relies on are not exposed | + +--- -The plugin is **forward and backward compatible** within this range from a single build artefact: bridge leasing, polling, and result submission all degrade gracefully when the corresponding TaaS endpoints are missing. ## Verification ### Local validation @@ -180,22 +204,30 @@ The plugin is **forward and backward compatible** within this range from a singl For repository/CI validation, install dev dependencies and run the test suite: ```bash -npm install +npm ci npm test +npm run build ``` This runs: - `npm run typecheck` — validates the TypeScript source against the OpenClaw plugin SDK and Node typings -- `npm run smoke` — imports the plugin, registers the provider hook, and verifies payload/header injection +- `npm run smoke` — imports the plugin, registers the provider hook, and verifies payload/header injection plus autorouter capture +- `npm run unit` — validates sweeper/status/auto-abort behaviour and Direction-2 regressions -If validation fails because `openclaw/plugin-sdk/core` cannot be resolved, the local OpenClaw SDK dependency is missing or too old. Install/upgrade OpenClaw before deploying. +Direction-2 regression coverage includes: + +- requester bridge lease endpoint is not called +- `available_bridges` and bridge capture metadata are not injected +- OpenAI `tools` pass through untouched +- assistant `tool_calls` and `role: "tool"` messages are not intercepted +- existing metadata fields are not overwritten ### TaaS logs After installing, the first turn of every new conversation should show: -``` +```text match_reason: "external_id_new" ← first turn (new session in Redis) match_reason: "external_id" ← subsequent turns (known session) ``` @@ -217,28 +249,32 @@ Replace the ID with your actual session ID. A non-null response confirms TaaS ha | Session type | ID scope | |---|---| | Main agent | Own stable ID for the conversation lifetime | -| Spawned subagent | Own ID (separate `workspaceDir`) | -| Cron / isolated run | Own ID (isolated workspace per run) | +| Spawned subagent | Own ID when a separate `workspaceDir` is present; otherwise tier fallback | +| Cron / isolated run | Own ID when an isolated workspace/env source exists | | New conversation (`/new`, `/reset`) | New workspace → new ID | -| Parallel conversations | Each gets a separate ID | +| Parallel conversations | Each gets a separate ID when OpenClaw supplies separate workspaces/env sources | --- ## Configuration -None required for standard CloudSigma TaaS use. The plugin works out of the box with zero configuration: - -- Bridge leasing is enabled by default for `cloudsigma` and `cloudsigma-staging` provider requests. -- The standard lease endpoint is `https://taas.cloudsigma.com/internal/requester-bridges/leases`. -- If the provider supplies a custom `baseUrl`, the lease URL is derived from that base instead. +None required for standard CloudSigma TaaS use. -Optional environment variables: +Supported environment variables: | Variable | Default | Purpose | |---|---|---| -| `TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED` | enabled | Set to `0`, `false`, `no`, or `off` to disable bridge leasing explicitly. | -| `TAAS_REQUESTER_BRIDGE_LEASE_URL` | derived from provider base URL, then `https://taas.cloudsigma.com/internal/requester-bridges/leases` | Override only for staging/custom deployments. | -| `TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS` | `1000` | Poll interval for bridge operations. | +| `OPENCLAW_DEBUG` | unset | Emit debug logs for session source and autorouter capture | +| `OPENCLAW_SESSION_ID` | unset | Optional session source fallback | +| `OPENCLAW_AGENT_ID` / `OPENCLAW_RUN_ID` | unset | Optional per-agent session source fallback | +| `OPENCLAW_STATE_DIR` | `~/.openclaw` | Last-resort stable source for fallback session ID | +| `TAAS_AFFINITY_SWEEP_INTERVAL_MS` | `3600000` | Background trash sweeper interval | +| `TAAS_AFFINITY_SWEEP_STALE_DAYS` | `7` | Age threshold for stale `.deleted` agent directories | +| `TAAS_AFFINITY_RUNS_STATUS_PATH` | `~/.openclaw/alien-studio/runs-status.json` | Stuck-run status JSON path | +| `TAAS_AFFINITY_AUTO_ABORT_ZOMBIES` | `false` | Opt-in zombie run auto-abort check | +| `TAAS_AFFINITY_AUTO_ABORT_DRY_RUN` | `false` | Log zombie abort candidates without aborting | + +Requester bridge variables such as `TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED`, `TAAS_REQUESTER_BRIDGE_LEASE_URL`, and `TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS` are obsolete and ignored by this plugin version. --- diff --git a/index.ts b/index.ts index 1eef8e5..2636317 100644 --- a/index.ts +++ b/index.ts @@ -17,31 +17,15 @@ import type { * outbound request to CloudSigma TaaS providers so the session-affinity layer * achieves confidence=1.0 from turn 1, maximising prompt-cache hit rates. * - * For CloudSigma TaaS providers, the plugin also creates/refreshes a short - * requester bridge lease and injects the returned opaque descriptor into - * metadata.requester_runtime.available_bridges. Set - * TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED=0 to disable this behaviour explicitly. - * The bridge remains requester-authorized: TaaS is the relay/audit/transport - * layer, while requester/plugin-side permissions decide actual tool execution. + * Requester-side tool execution is handled by OpenClaw / Claude Code / TaaS + * Direction-2. This plugin only provides affinity metadata, the X-Session-Id + * transport header, and TaaS autorouter response-header capture. */ const SESSION_ID_PREFIX = "oc:" -const REQUESTER_RUNTIME_SCHEMA_VERSION = "2026-05-23" +const REQUESTER_RUNTIME_SCHEMA_VERSION = "2026-06-03" const REQUESTER_RUNTIME_SOURCE = "openclaw-token-cache-optimizer" -const REQUESTER_BRIDGE_PLUGIN_FLAG = "TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED" -const DEFAULT_TAAS_BASE_URL = "https://taas.cloudsigma.com" -const REQUESTER_BRIDGE_LEASE_PATH = "/internal/requester-bridges/leases" -const REQUESTER_BRIDGE_POLL_PATH = "/internal/requester-bridges/poll" -const REQUESTER_BRIDGE_RESULTS_PATH = "/internal/requester-bridges/results" -const REQUESTER_BRIDGE_CAPABILITY = "requester.tool.invoke" -const REQUESTER_BRIDGE_CAPABILITY_LEGACY = "openclaw.tool.invoke" -const REQUESTER_BRIDGE_DEFAULT_TTL_SECONDS = 5 * 60 const GIT_PROBE_TIMEOUT_MS = 250 -const LEASE_REQUEST_TIMEOUT_MS = 1200 -const POLL_REQUEST_TIMEOUT_MS = 1200 -const DEFAULT_POLL_INTERVAL_MS = 1000 -const DEFAULT_GATEWAY_URL = "http://127.0.0.1:18789" -const MAX_ECHO_BYTES = 4096 type RequesterRuntime = Record @@ -53,27 +37,6 @@ type RuntimeContextHints = { provider?: string } -type LeaseResponse = { - ok?: boolean - descriptor?: unknown - lease_id?: unknown - bridge_id?: unknown -} - -type BridgePollOperation = { - operation_id?: unknown - audit_id?: unknown - lease_id?: unknown - bridge_id?: unknown - operation?: unknown - arguments?: unknown - claim_id?: unknown - claim_expires_at?: unknown - delivery_attempt?: unknown -} - -type PollerState = { timer?: NodeJS.Timeout; inFlight: boolean; stopped?: boolean; leaseId?: string } - // OpenClaw stores the active registry state (including workspaceDir) on globalThis // under this well-known symbol key. const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState") @@ -81,18 +44,6 @@ const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState") const isDev = process.env.NODE_ENV === "development" || Boolean(process.env.OPENCLAW_DEBUG) -const activePollers = new Map() - -function envFlagDisabled(name: string): boolean { - return ["0", "false", "no", "off"].includes( - (process.env[name] ?? "").trim().toLowerCase() - ) -} - -function requesterBridgePluginEnabled(): boolean { - return !envFlagDisabled(REQUESTER_BRIDGE_PLUGIN_FLAG) -} - /** * Resolves the best available session source string, working through the * fallback tier list. Returns undefined only when no source is found at all @@ -268,156 +219,70 @@ function buildRequesterRuntime( return { schema_version: REQUESTER_RUNTIME_SCHEMA_VERSION, source: REQUESTER_RUNTIME_SOURCE, - capture_mode: "advisory_only", session_key: sessionId, openclaw_session_id: sessionId, - ...(hints.workspaceDir && { workspace_dir: path.resolve(hints.workspaceDir) }), - ...(hints.agentDir && { agent_dir: path.resolve(hints.agentDir) }), - ...(hints.repoRoot && { repo_root_hint: hints.repoRoot, repo_name: path.basename(hints.repoRoot) }), + requester_host_id: stableHash(os.hostname(), "host"), + ...(hints.repoRoot && { repo_name: path.basename(hints.repoRoot) }), ...(hints.repoRoot && { git_branch_hint: readGitHeadBranch(hints.repoRoot) }), ...(hints.repoRoot && { git_dirty_hint: readGitDirtyHint(hints.repoRoot) }), - requester_host_id: stableHash(os.hostname(), "host"), ...(hints.provider && { provider: hints.provider }), ...(hints.modelId && { model_id: hints.modelId }), session_source_hint: stableHash(source, "source"), - available_bridges: [], - required_execution_mode: "advisory_only", - redaction_policy: "no_secrets;bounded_paths;no_env_values;no_git_remotes;no_status_or_diffs;no_extra_params", - } -} - -function isTaasProvider(ctx: ProviderWrapStreamFnContext): boolean { - const provider = safeString((ctx as unknown as Record).provider) - return provider === "cloudsigma" || provider === "cloudsigma-staging" -} - -function providerBaseUrl(payload: Record, ctx: ProviderWrapStreamFnContext): string | undefined { - const direct = safeString(payload.base_url) ?? safeString(payload.baseURL) - if (direct) return direct - const modelRecord = asRecord((ctx as unknown as Record).model) - return safeString(modelRecord?.baseUrl) ?? safeString(modelRecord?.baseURL) -} - -function requesterBridgeLeaseUrl(payload: Record, ctx: ProviderWrapStreamFnContext): string | undefined { - const explicit = safeString(process.env.TAAS_REQUESTER_BRIDGE_LEASE_URL) - if (explicit) return explicit - const base = providerBaseUrl(payload, ctx) ?? DEFAULT_TAAS_BASE_URL - try { - return new URL(REQUESTER_BRIDGE_LEASE_PATH, base.endsWith("/") ? base : `${base}/`).toString() - } catch { - return undefined + tool_execution: "direction_2_gateway", + metadata_classification: { + identifiers: "hashed", + repository: "name_branch_dirty_only", + local_paths: "omitted_by_default", + }, + redaction_policy: "no_secrets;no_raw_local_paths;no_env_values;no_git_remotes;no_status_or_diffs;no_extra_params", } } -function bridgeSiblingUrl(leaseUrl: string, pathName: string): string | undefined { - try { - const url = new URL(leaseUrl) - url.pathname = pathName - url.search = "" - return url.toString() - } catch { - return undefined +function patchPayloadMetadata( + payload: Record, + sessionId: string, + requesterRuntime?: RequesterRuntime +): Record { + const existingMeta = asRecord(payload.metadata) ?? {} + // Never overwrite existing metadata fields — the caller owns them. + const needsSessionId = !existingMeta.session_id + const needsStickyKey = !existingMeta.sticky_key + const needsRequesterRuntime = requesterRuntime && !existingMeta.requester_runtime + if (!needsSessionId && !needsStickyKey && !needsRequesterRuntime) return payload + return { + ...payload, + metadata: { + ...existingMeta, + ...(needsSessionId && { session_id: sessionId }), + ...(needsStickyKey && { sticky_key: sessionId }), + ...(needsRequesterRuntime && { requester_runtime: requesterRuntime }), + }, } } -function pollIntervalMs(): number { - const value = Number.parseInt(process.env.TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS ?? "", 10) - if (Number.isFinite(value) && value >= 50) return Math.min(value, 30_000) - return DEFAULT_POLL_INTERVAL_MS -} - -function boundedJson(value: unknown): unknown { - const encoded = JSON.stringify(value ?? null) - if (encoded.length <= MAX_ECHO_BYTES) return value - return { truncated: true, message: "echo payload exceeded bridge scaffold size limit" } -} - -function requesterGatewayUrl(): string { - return safeString(process.env.TAAS_REQUESTER_LOCAL_GATEWAY_URL) - ?? safeString(process.env.OPENCLAW_GATEWAY_URL) - ?? (safeString(process.env.OPENCLAW_GATEWAY_PORT) ? `http://127.0.0.1:${process.env.OPENCLAW_GATEWAY_PORT}` : DEFAULT_GATEWAY_URL) -} - -function requesterGatewayToken(): string | undefined { - return safeString(process.env.TAAS_REQUESTER_LOCAL_GATEWAY_TOKEN) - ?? safeString(process.env.OPENCLAW_GATEWAY_TOKEN) - ?? safeString(process.env.OPENCLAW_GATEWAY_PASSWORD) -} - -async function invokeRequesterLocalTool(tool: string, args: Record): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> { - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), POLL_REQUEST_TIMEOUT_MS) - try { - const headers: Record = { "Content-Type": "application/json" } - const token = requesterGatewayToken() - if (token) headers.Authorization = `Bearer ${token}` - const response = await fetch(`${requesterGatewayUrl().replace(/\/+$/, "")}/tools/invoke`, { - method: "POST", - headers, - body: JSON.stringify({ tool, args }), - signal: controller.signal, - }) - const json = await response.json().catch(() => undefined) as { ok?: boolean; result?: unknown; error?: { type?: string; code?: string; message?: string } } | undefined - if (!response.ok || !json || json.ok !== true) { - const code = json?.error?.code ?? json?.error?.type ?? `http_${response.status}` - const message = json?.error?.message ?? `Requester local tool invoke failed: ${code}` - return { ok: false, error: { code, message } } - } - return { ok: true, result: json.result } - } catch (err) { - const name = err instanceof Error && err.name ? err.name : "requester_tool_invoke_failed" - const message = err instanceof Error && err.message ? err.message : String(err) - return { ok: false, error: { code: name, message } } - } finally { - clearTimeout(timer) - } +/** + * Captured autorouter metadata per-session, populated by the onResponse callback + * installed in `buildWrapper`. Exposed via the `taas.autorouter.lastRoute` gateway + * RPC so Alien AI Studio (or any other client) can pull the latest TaaS routing + * decision for a session — including the actual model chosen by the autorouter, + * the algorithm used, the source of that algorithm (org/dept/key/user/system), + * the thinking level applied, and the chosen model's context window. + * + * The map is keyed by the affinity session ID we already derive in + * `resolveSessionId(ctx.workspaceDir)`. Stored values are bounded — see + * `LAST_ROUTE_LIMIT` — to avoid unbounded growth in long-lived gateways. + */ +type AutorouterCapture = { + sessionId: string + capturedAt: number + autorouterModel: string | null + autorouterAlgo: string | null + autorouterAlgoSource: string | null + thinkingApplied: string | null + routedContextWindow: number | null } -async function executeSafeBridgeOperation(operation: BridgePollOperation): Promise<{ ok: true; result: unknown } | { ok: false; error: { code: string; message: string } }> { - if (operation.operation !== REQUESTER_BRIDGE_CAPABILITY && operation.operation !== REQUESTER_BRIDGE_CAPABILITY_LEGACY) { - return { ok: false, error: { code: "unsupported_operation", message: "Unsupported requester bridge operation" } } - } - const args = asRecord(operation.arguments) ?? {} - const tool = safeString(args.tool) ?? safeString(args.tool_name) ?? safeString(args.name) - const toolArgs = asRecord(args.arguments) ?? asRecord(args.input) ?? {} - if (!tool) return { ok: false, error: { code: "missing_tool", message: "Requester bridge operation missing tool name" } } - if (tool === "bridge.ping") { - return { ok: true, result: { pong: true, echo: boundedJson(toolArgs), scaffold: true } } - } - if (tool === "bridge.echo") { - return { ok: true, result: { echo: boundedJson(toolArgs), scaffold: true } } - } - return invokeRequesterLocalTool(tool, toolArgs) -} -async function postBridgeResult(resultsUrl: string, descriptor: Record, operation: BridgePollOperation, outcome: Awaited>): Promise { - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), POLL_REQUEST_TIMEOUT_MS) - try { - const claimId = typeof operation.claim_id === "string" ? operation.claim_id : undefined - await fetch(resultsUrl, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - schema_version: REQUESTER_RUNTIME_SCHEMA_VERSION, - lease_id: descriptor.lease_id, - bridge_id: descriptor.bridge_id, - auth_context_id: descriptor.auth_context_id, - operation_id: operation.operation_id, - audit_id: operation.audit_id, - ...(claimId && { claim_id: claimId }), - ...outcome, - }), - signal: controller.signal, - }) - } catch (err) { - if (isDev) console.debug(`[taas-affinity] requester bridge result post failed: ${err instanceof Error ? err.name : "unknown"}`) - } finally { - clearTimeout(timer) - } -} - -const DEFAULT_POLL_WAIT_MS = 25_000 // ── Trash sweeper configuration ────────────────────────────────────────────── const SWEEP_INTERVAL_MS = Number(process.env.TAAS_AFFINITY_SWEEP_INTERVAL_MS) || 3_600_000 const SWEEP_STALE_DAYS = Number(process.env.TAAS_AFFINITY_SWEEP_STALE_DAYS) || 7 @@ -463,224 +328,6 @@ function setAbortRunFn(fn: (sessionKey: string) => Promise): void { abortRun = fn } -function stopRequesterBridgePoller(leaseId?: string): void { - if (!leaseId) return - const state = activePollers.get(leaseId) - if (!state) return - state.stopped = true - if (state.timer) clearInterval(state.timer) - activePollers.delete(leaseId) -} - -async function pollRequesterBridgeOnce(pollUrl: string, resultsUrl: string, descriptor: Record, state: PollerState): Promise { - if (state.inFlight || state.stopped) return - state.inFlight = true - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), Math.max(POLL_REQUEST_TIMEOUT_MS, DEFAULT_POLL_WAIT_MS + 5_000)) - try { - const response = await fetch(pollUrl, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - schema_version: REQUESTER_RUNTIME_SCHEMA_VERSION, - lease_id: descriptor.lease_id, - bridge_id: descriptor.bridge_id, - auth_context_id: descriptor.auth_context_id, - max_operations: 10, - wait_ms: DEFAULT_POLL_WAIT_MS, - }), - signal: controller.signal, - }) - if (!response.ok) { - const json = await response.json().catch(() => undefined) as { error?: { code?: unknown } } | undefined - const code = safeString(json?.error?.code) - if (response.status === 404 || code === "lease_expired_or_unknown") { - // Lease state is server/cache-owned and can disappear on rollout or TTL expiry. - // Stop polling this dead lease immediately; the next provider turn will create - // a fresh lease instead of hammering TaaS/Redis with stale IDs. - stopRequesterBridgePoller(state.leaseId ?? safeString(descriptor.lease_id)) - } - return - } - const json = await response.json() as { operations?: unknown } - const operations = Array.isArray(json.operations) ? json.operations : [] - for (const raw of operations) { - const operation = asRecord(raw) as BridgePollOperation | undefined - if (!operation || !operation.operation_id) continue - const outcome = await executeSafeBridgeOperation(operation) - await postBridgeResult(resultsUrl, descriptor, operation, outcome) - } - } catch (err) { - if (isDev) console.debug(`[taas-affinity] requester bridge poll failed: ${err instanceof Error ? err.name : "unknown"}`) - } finally { - clearTimeout(timer) - state.inFlight = false - } -} - -function startRequesterBridgePoller(leaseUrl: string, descriptor: Record): void { - const leaseId = safeString(descriptor.lease_id) - if (!leaseId || activePollers.has(leaseId)) return - const pollUrl = bridgeSiblingUrl(leaseUrl, REQUESTER_BRIDGE_POLL_PATH) - const resultsUrl = bridgeSiblingUrl(leaseUrl, REQUESTER_BRIDGE_RESULTS_PATH) - if (!pollUrl || !resultsUrl) return - const state: PollerState = { inFlight: false, leaseId } - activePollers.set(leaseId, state) - const tick = () => { void pollRequesterBridgeOnce(pollUrl, resultsUrl, descriptor, state) } - state.timer = setInterval(tick, pollIntervalMs()) - state.timer.unref?.() - tick() -} - -function isSafeDescriptor(value: unknown): value is Record { - const descriptor = asRecord(value) - if (!descriptor) return false - if ("bridge_required" in descriptor) return false - const capabilities = descriptor.capabilities - if (!Array.isArray(capabilities) || (capabilities.includes(REQUESTER_BRIDGE_CAPABILITY) === false && capabilities.includes(REQUESTER_BRIDGE_CAPABILITY_LEGACY) === false)) { - return false - } - const encoded = JSON.stringify(descriptor) - const lowered = encoded.toLowerCase() - return ![ - "endpoint_url", - "access_token", - "refresh_token", - "authorization", - "bearer ", - "api_key", - "apikey", - "password", - "secret", - ].some((needle) => lowered.includes(needle)) -} - -async function createRequesterBridgeLease( - url: string, - runtime: RequesterRuntime, - ctx: ProviderWrapStreamFnContext -): Promise | undefined> { - const workspaceDir = safeString(runtime.workspace_dir) - const repoRoot = safeString(runtime.repo_root_hint) - const repoName = safeString(runtime.repo_name) - const controller = new AbortController() - const timer = setTimeout(() => controller.abort(), LEASE_REQUEST_TIMEOUT_MS) - try { - const body = { - schema_version: REQUESTER_RUNTIME_SCHEMA_VERSION, - session_key: runtime.session_key, - openclaw_session_id: runtime.openclaw_session_id, - requester_host_id: runtime.requester_host_id, - workspace: { - ...(workspaceDir && { workspace_dir: workspaceDir }), - ...(repoRoot && { repo_root: repoRoot }), - ...(repoName && { repo_name: repoName }), - }, - capabilities: [REQUESTER_BRIDGE_CAPABILITY], - ttl_s: REQUESTER_BRIDGE_DEFAULT_TTL_SECONDS, - callback: { mode: "poll" }, - client: { - source: REQUESTER_RUNTIME_SOURCE, - provider: runtime.provider, - model_id: runtime.model_id, - }, - } - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - "X-Session-Id": String(runtime.session_key), - }, - body: JSON.stringify(body), - signal: controller.signal, - }) - if (!response.ok) { - if (isDev) console.debug(`[taas-affinity] requester bridge lease failed status=${response.status}`) - return undefined - } - const json = (await response.json()) as LeaseResponse - if (!json.ok || !isSafeDescriptor(json.descriptor)) return undefined - const descriptor = json.descriptor - startRequesterBridgePoller(url, descriptor) - return descriptor - } catch (err) { - if (isDev) console.debug(`[taas-affinity] requester bridge lease unavailable: ${err instanceof Error ? err.name : "unknown"}`) - return undefined - } finally { - clearTimeout(timer) - } -} - -async function maybeUpgradeRequesterRuntimeWithBridge( - runtime: RequesterRuntime, - payload: Record, - ctx: ProviderWrapStreamFnContext -): Promise { - if (!requesterBridgePluginEnabled() || !isTaasProvider(ctx)) return runtime - const url = requesterBridgeLeaseUrl(payload, ctx) - if (!url) { - if (isDev) console.debug("[taas-affinity] requester bridge enabled but no TaaS base URL found") - return runtime - } - const descriptor = await createRequesterBridgeLease(url, runtime, ctx) - if (!descriptor) return runtime - return { - ...runtime, - capture_mode: "bridge_capable", - available_bridges: [descriptor], - required_execution_mode: "advisory_only", - } -} - -async function patchPayloadMetadata( - payload: Record, - sessionId: string, - requesterRuntime?: RequesterRuntime, - ctx?: ProviderWrapStreamFnContext -): Promise> { - const existingMeta = asRecord(payload.metadata) ?? {} - // Never overwrite existing metadata fields — the caller owns them. - const needsSessionId = !existingMeta.session_id - const needsStickyKey = !existingMeta.sticky_key - const needsRequesterRuntime = requesterRuntime && !existingMeta.requester_runtime - if (!needsSessionId && !needsStickyKey && !needsRequesterRuntime) return payload - const runtime = needsRequesterRuntime && requesterRuntime && ctx - ? await maybeUpgradeRequesterRuntimeWithBridge(requesterRuntime, payload, ctx) - : requesterRuntime - return { - ...payload, - metadata: { - ...existingMeta, - ...(needsSessionId && { session_id: sessionId }), - ...(needsStickyKey && { sticky_key: sessionId }), - ...(needsRequesterRuntime && { requester_runtime: runtime }), - }, - } -} - -/** - * Captured autorouter metadata per-session, populated by the onResponse callback - * installed in `buildWrapper`. Exposed via the `taas.autorouter.lastRoute` gateway - * RPC so Alien AI Studio (or any other client) can pull the latest TaaS routing - * decision for a session — including the actual model chosen by the autorouter, - * the algorithm used, the source of that algorithm (org/dept/key/user/system), - * the thinking level applied, and the chosen model's context window. - * - * The map is keyed by the affinity session ID we already derive in - * `resolveSessionId(ctx.workspaceDir)`. Stored values are bounded — see - * `LAST_ROUTE_LIMIT` — to avoid unbounded growth in long-lived gateways. - */ -type AutorouterCapture = { - sessionId: string - capturedAt: number - autorouterModel: string | null - autorouterAlgo: string | null - autorouterAlgoSource: string | null - thinkingApplied: string | null - routedContextWindow: number | null -} - - // ── Trash sweeper ──────────────────────────────────────────────────────────── let sweepInProgress = false @@ -1014,28 +661,31 @@ function runAbortCheck(agentsDir?: string): void { // ── Background task scheduler ──────────────────────────────────────────────── const backgroundTimers: (NodeJS.Timeout)[] = [] +function trackBackgroundTimer(timer: NodeJS.Timeout): NodeJS.Timeout { + timer.unref?.() + backgroundTimers.push(timer) + return timer +} + function startBackgroundTasks(): void { // Trash sweeper — randomised initial delay (0-30s) to stagger const sweepDelay = Math.floor(Math.random() * 30_000) - const sweepInit = setTimeout(() => { + trackBackgroundTimer(setTimeout(() => { runTrashSweep() - backgroundTimers.push(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS)) - }, sweepDelay) - backgroundTimers.push(sweepInit) + trackBackgroundTimer(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS)) + }, sweepDelay)) // Stuck-run status writer — 5s initial delay, then every 30s - const statusInit = setTimeout(() => { + trackBackgroundTimer(setTimeout(() => { writeRunStatus() - backgroundTimers.push(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS)) - }, 5_000) - backgroundTimers.push(statusInit) + trackBackgroundTimer(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS)) + }, 5_000)) // Zombie auto-abort — 10s initial delay, then every AUTO_ABORT_CHECK_INTERVAL_MS - const abortInit = setTimeout(() => { + trackBackgroundTimer(setTimeout(() => { runAbortCheck() - backgroundTimers.push(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS)) - }, 10_000) - backgroundTimers.push(abortInit) + trackBackgroundTimer(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS)) + }, 10_000)) } const LAST_ROUTE_LIMIT = 256 @@ -1156,7 +806,7 @@ function buildWrapper(ctx: ProviderWrapStreamFnContext) { if (prevOnPayload) return prevOnPayload(payload, payloadModel) return payload } - const patched = await patchPayloadMetadata(payloadRecord, sessionId, requesterRuntime, ctx) + const patched = patchPayloadMetadata(payloadRecord, sessionId, requesterRuntime) if (prevOnPayload) return prevOnPayload(patched, payloadModel) return patched } diff --git a/test/requester-bridge.test.ts b/test/requester-bridge.test.ts deleted file mode 100644 index 8c5e2cd..0000000 --- a/test/requester-bridge.test.ts +++ /dev/null @@ -1,518 +0,0 @@ -import assert from "node:assert/strict" -import { createServer } from "node:http" -import { test } from "node:test" - -async function loadPlugin(env: Record = {}) { - const oldEnv: Record = {} - for (const [key, value] of Object.entries(env)) { - oldEnv[key] = process.env[key] - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - const mod = await import(`../index.ts?cacheBust=${Date.now()}-${Math.random()}`) - return { - plugin: mod.default, - restore() { - for (const [key, value] of Object.entries(oldEnv)) { - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - }, - } -} - -function captureWrapper(plugin: any) { - let hook: any - plugin.register({ registerProvider(provider: any) { hook = provider } }) - return hook.wrapStreamFn -} - -async function runPayload(wrapperFactory: any, payload: any, ctxExtra: Record = {}) { - let captured: any - const streamFn = (_model: any, _context: any, options: any) => options.onPayload(payload, _model).then((result: any) => { - captured = result - return result - }) - const wrapper = wrapperFactory({ - provider: "cloudsigma", - modelId: "claude-code", - workspaceDir: process.cwd(), - model: { id: "claude-code", baseUrl: ctxExtra.baseUrl }, - streamFn, - ...ctxExtra, - }) - await wrapper({}, {}, {}) - return captured -} - -test("bridge can be explicitly disabled and preserves advisory-only requester runtime", async () => { - const { plugin, restore } = await loadPlugin({ TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED: "0" }) - try { - const payload = await runPayload(captureWrapper(plugin), { messages: [] }) - assert.equal(payload.metadata.requester_runtime.capture_mode, "advisory_only") - assert.deepEqual(payload.metadata.requester_runtime.available_bridges, []) - assert.equal("bridge_required" in payload.metadata.requester_runtime, false) - } finally { - restore() - } -}) - -test("plugin enabled creates lease and injects bridge-capable descriptor", async () => { - let requestBody: any - const server = createServer((req, res) => { - assert.equal(req.method, "POST") - assert.equal(req.url, "/internal/requester-bridges/leases") - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - requestBody = JSON.parse(body) - res.setHeader("Content-Type", "application/json") - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_test", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const baseUrl = `http://127.0.0.1:${address.port}` - const { plugin, restore } = await loadPlugin({}) - try { - const payload = await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl }) - assert.equal(requestBody.schema_version, "2026-05-23") - assert.deepEqual(requestBody.capabilities, ["requester.tool.invoke"]) - assert.equal(payload.metadata.requester_runtime.capture_mode, "bridge_capable") - assert.equal(payload.metadata.requester_runtime.available_bridges[0].lease_id, "brl_test") - assert.equal("bridge_required" in payload.metadata.requester_runtime, false) - assert.equal(JSON.stringify(payload).includes("access_token"), false) - } finally { - restore() - server.close() - } -}) - -test("lease failure falls back to advisory-only empty bridges", async () => { - const server = createServer((_req, res) => { - res.statusCode = 503 - res.end(JSON.stringify({ ok: false })) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({}) - try { - const payload = await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - assert.equal(payload.metadata.requester_runtime.capture_mode, "advisory_only") - assert.deepEqual(payload.metadata.requester_runtime.available_bridges, []) - } finally { - restore() - server.close() - } -}) - - -test("plugin enabled polls and executes safe bridge scaffold operation", async () => { - let pollCount = 0 - let resultBody: any - const server = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_poll", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollCount += 1 - res.end(JSON.stringify({ - ok: true, - operations: pollCount === 1 ? [{ - operation_id: "bro_test", - audit_id: "bra_test", - lease_id: "brl_poll", - bridge_id: "br_test", - operation: "requester.tool.invoke", - arguments: { tool: "bridge.ping", arguments: { message: "hi" } }, - }] : [], - })) - return - } - if (req.url === "/internal/requester-bridges/results") { - resultBody = JSON.parse(body) - res.end(JSON.stringify({ ok: true })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - await new Promise((resolve) => setTimeout(resolve, 150)) - assert.equal(resultBody.operation_id, "bro_test") - assert.equal(resultBody.ok, true) - assert.equal(resultBody.result.pong, true) - assert.equal(resultBody.result.scaffold, true) - assert.equal(JSON.stringify(resultBody).includes("bridge_required"), false) - } finally { - restore() - server.close() - } -}) - -test("plugin handles legacy openclaw.tool.invoke operation name in poll", async () => { - let pollCount = 0 - let resultBody: any - const server = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_legacy", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollCount += 1 - res.end(JSON.stringify({ - ok: true, - operations: pollCount === 1 ? [{ - operation_id: "bro_legacy", - audit_id: "bra_legacy", - lease_id: "brl_legacy", - bridge_id: "br_test", - operation: "openclaw.tool.invoke", - arguments: { tool: "bridge.echo", arguments: { msg: "legacy compat" } }, - }] : [], - })) - return - } - if (req.url === "/internal/requester-bridges/results") { - resultBody = JSON.parse(body) - res.end(JSON.stringify({ ok: true })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - await new Promise((resolve) => setTimeout(resolve, 150)) - assert.equal(resultBody.operation_id, "bro_legacy") - assert.equal(resultBody.ok, true) - assert.equal(resultBody.result.echo.msg, "legacy compat") - assert.equal(resultBody.result.scaffold, true) - } finally { - restore() - server.close() - } -}) - -test("plugin executes non-scaffold requester tools through requester-local gateway", async () => { - let pollCount = 0 - let resultBody: any - let gatewayBody: any - const gateway = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - gatewayBody = JSON.parse(body) - assert.equal(req.url, "/tools/invoke") - assert.equal(req.headers.authorization, "Bearer requester-token") - res.setHeader("Content-Type", "application/json") - res.end(JSON.stringify({ ok: true, result: { rows: [{ title: "PRD" }] } })) - }) - }) - await new Promise((resolve) => gateway.listen(0, "127.0.0.1", resolve)) - const gatewayAddress = gateway.address() - assert(gatewayAddress && typeof gatewayAddress === "object") - - const taas = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_tool", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollCount += 1 - res.end(JSON.stringify({ - ok: true, - operations: pollCount === 1 ? [{ - operation_id: "bro_tool", - audit_id: "bra_tool", - lease_id: "brl_tool", - bridge_id: "br_test", - operation: "requester.tool.invoke", - arguments: { tool: "prd_list", arguments: { query: "requester bridge" } }, - }] : [], - })) - return - } - if (req.url === "/internal/requester-bridges/results") { - resultBody = JSON.parse(body) - res.end(JSON.stringify({ ok: true })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => taas.listen(0, "127.0.0.1", resolve)) - const taasAddress = taas.address() - assert(taasAddress && typeof taasAddress === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - TAAS_REQUESTER_LOCAL_GATEWAY_URL: `http://127.0.0.1:${gatewayAddress.port}`, - TAAS_REQUESTER_LOCAL_GATEWAY_TOKEN: "requester-token", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${taasAddress.port}` }) - await new Promise((resolve) => setTimeout(resolve, 150)) - assert.deepEqual(gatewayBody, { tool: "prd_list", args: { query: "requester bridge" } }) - assert.equal(resultBody.operation_id, "bro_tool") - assert.equal(resultBody.ok, true) - assert.deepEqual(resultBody.result, { rows: [{ title: "PRD" }] }) - } finally { - restore() - taas.close() - gateway.close() - } -}) - -test("plugin includes claim_id in result submission when poll returns one", async () => { - let pollCount = 0 - let resultBody: any - const server = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_claim", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollCount += 1 - res.end(JSON.stringify({ - ok: true, - operations: pollCount === 1 ? [{ - operation_id: "bro_claim", - audit_id: "bra_claim", - lease_id: "brl_claim", - bridge_id: "br_test", - operation: "requester.tool.invoke", - arguments: { tool: "bridge.ping" }, - claim_id: "brc_abc123claim", - claim_expires_at: "2026-05-23T20:00:00Z", - delivery_attempt: 1, - }] : [], - })) - return - } - if (req.url === "/internal/requester-bridges/results") { - resultBody = JSON.parse(body) - res.end(JSON.stringify({ ok: true })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - await new Promise((resolve) => setTimeout(resolve, 150)) - assert.equal(resultBody.operation_id, "bro_claim") - assert.equal(resultBody.claim_id, "brc_abc123claim") - assert.equal(resultBody.ok, true) - } finally { - restore() - server.close() - } -}) - -test("plugin stops polling stale leases on lease_expired_or_unknown", async () => { - let pollCount = 0 - const server = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - void body - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_stale", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollCount += 1 - res.statusCode = 404 - res.end(JSON.stringify({ ok: false, error: { code: "lease_expired_or_unknown" } })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - await new Promise((resolve) => setTimeout(resolve, 220)) - assert.equal(pollCount, 1) - } finally { - restore() - server.close() - } -}) - -test("plugin sends wait_ms in poll request for long-polling", async () => { - let pollBody: any - const server = createServer((req, res) => { - let body = "" - req.on("data", (chunk) => { body += chunk }) - req.on("end", () => { - res.setHeader("Content-Type", "application/json") - if (req.url === "/internal/requester-bridges/leases") { - res.end(JSON.stringify({ - ok: true, - descriptor: { - name: "requester-workspace", - version: "2026-05-23", - status: "verified", - bridge_id: "br_test", - lease_id: "brl_wait", - capabilities: ["requester.tool.invoke"], - endpoint_ref: "epref_test", - auth_context_id: "authctx_test", - expires_at: "2026-05-23T19:00:00Z", - }, - })) - return - } - if (req.url === "/internal/requester-bridges/poll") { - pollBody = JSON.parse(body) - res.end(JSON.stringify({ ok: true, operations: [] })) - return - } - res.statusCode = 404 - res.end(JSON.stringify({ ok: false })) - }) - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const { plugin, restore } = await loadPlugin({ - TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS: "50", - }) - try { - await runPayload(captureWrapper(plugin), { messages: [] }, { baseUrl: `http://127.0.0.1:${address.port}` }) - await new Promise((resolve) => setTimeout(resolve, 150)) - assert.equal(pollBody.wait_ms, 25000) - assert.equal(pollBody.max_operations, 10) - } finally { - restore() - server.close() - } -}) diff --git a/test/requester-runtime.test.ts b/test/requester-runtime.test.ts new file mode 100644 index 0000000..6dae355 --- /dev/null +++ b/test/requester-runtime.test.ts @@ -0,0 +1,122 @@ +import assert from "node:assert/strict" +import { createServer } from "node:http" +import { test } from "node:test" + +async function loadPlugin(env: Record = {}) { + const oldEnv: Record = {} + for (const [key, value] of Object.entries(env)) { + oldEnv[key] = process.env[key] + if (value === undefined) delete process.env[key] + else process.env[key] = value + } + const mod = await import(`../index.ts?cacheBust=${Date.now()}-${Math.random()}`) + return { + plugin: mod.default, + restore() { + for (const [key, value] of Object.entries(oldEnv)) { + if (value === undefined) delete process.env[key] + else process.env[key] = value + } + }, + } +} + +function captureProvider(plugin: any) { + let provider: any + plugin.register({ registerProvider(candidate: any) { provider = candidate } }) + return provider +} + +async function runPayload(provider: any, payload: any, ctxExtra: Record = {}) { + let captured: any + const streamFn = (_model: any, _context: any, options: any) => options.onPayload(payload, _model).then((result: any) => { + captured = result + return result + }) + const wrapper = provider.wrapStreamFn({ + provider: "cloudsigma", + modelId: "cloudsigma/auto", + workspaceDir: process.cwd(), + model: { id: "cloudsigma/auto", baseUrl: ctxExtra.baseUrl }, + streamFn, + ...ctxExtra, + }) + await wrapper("model", { messages: [] }, {}) + return captured +} + +test("Direction-2 metadata does not create requester bridge leases or descriptors", async () => { + let called = false + const server = createServer((_req, res) => { + called = true + res.statusCode = 500 + res.end("unexpected") + }) + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) + const address = server.address() + assert(address && typeof address === "object") + const baseUrl = `http://127.0.0.1:${address.port}` + const { plugin, restore } = await loadPlugin({ + TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED: "1", + TAAS_REQUESTER_BRIDGE_LEASE_URL: `${baseUrl}/internal/requester-bridges/leases`, + }) + try { + const payload = await runPayload(captureProvider(plugin), { messages: [], metadata: {} }, { baseUrl }) + const runtime = payload.metadata.requester_runtime + assert.equal(called, false, "lease endpoint must not be called") + assert.equal("available_bridges" in runtime, false) + assert.equal("capture_mode" in runtime, false) + assert.equal(runtime.tool_execution, "direction_2_gateway") + assert.equal("workspace_dir" in runtime, false) + assert.equal("agent_dir" in runtime, false) + assert.equal("repo_root_hint" in runtime, false) + } finally { + restore() + await new Promise((resolve) => server.close(() => resolve())) + } +}) + +test("OpenAI tool declarations pass through untouched", async () => { + const { plugin, restore } = await loadPlugin() + try { + const tools = [{ type: "function", function: { name: "lookup", parameters: { type: "object" } } }] + const payload = await runPayload(captureProvider(plugin), { messages: [], tools, tool_choice: "auto" }) + assert.equal(payload.tools, tools) + assert.equal(payload.tool_choice, "auto") + } finally { + restore() + } +}) + +test("assistant tool_calls and role=tool messages are not intercepted", async () => { + const { plugin, restore } = await loadPlugin() + try { + const messages = [ + { role: "assistant", tool_calls: [{ id: "call_1", type: "function", function: { name: "lookup", arguments: "{}" } }] }, + { role: "tool", tool_call_id: "call_1", content: "result" }, + ] + const payload = await runPayload(captureProvider(plugin), { messages }) + assert.equal(payload.messages, messages) + } finally { + restore() + } +}) + +test("existing affinity metadata is never overwritten", async () => { + const { plugin, restore } = await loadPlugin() + try { + const payload = await runPayload(captureProvider(plugin), { + messages: [], + metadata: { + session_id: "external-session", + sticky_key: "external-sticky", + requester_runtime: { source: "caller" }, + }, + }) + assert.equal(payload.metadata.session_id, "external-session") + assert.equal(payload.metadata.sticky_key, "external-sticky") + assert.deepEqual(payload.metadata.requester_runtime, { source: "caller" }) + } finally { + restore() + } +}) diff --git a/test/smoke.mjs b/test/smoke.mjs index 550ff31..2b695e5 100644 --- a/test/smoke.mjs +++ b/test/smoke.mjs @@ -58,8 +58,13 @@ assert.equal( ) assert.equal( capturedPayload.metadata.requester_runtime.redaction_policy, - "no_secrets;bounded_paths;no_env_values;no_git_remotes;no_status_or_diffs;no_extra_params" + "no_secrets;no_raw_local_paths;no_env_values;no_git_remotes;no_status_or_diffs;no_extra_params" ) +assert.equal( + capturedPayload.metadata.requester_runtime.tool_execution, + "direction_2_gateway" +) +assert.equal("available_bridges" in capturedPayload.metadata.requester_runtime, false) const transportState = provider.resolveTransportTurnState({ provider: "cloudsigma", From 347bf7dc171abf8180bb405983e4bfb0e4fe08b6 Mon Sep 17 00:00:00 2001 From: cloudsigma Date: Thu, 4 Jun 2026 16:02:11 +0000 Subject: [PATCH 2/2] refactor: remove trash sweeper, stuck-run status, zombie auto-abort, and background timers Streamline the plugin to focus on its core purpose: - Session affinity (session_id, sticky_key metadata injection) - X-Session-Id transport header - Requester runtime metadata (sanitized for TaaS proxy) - Autorouter response-header capture + taas.autorouter.lastRoute gateway RPC - Add activation.onStartup for reliable plugin loading Removed dead-end features that belong in the TaaS proxy layer, not the OpenClaw plugin: - Trash sweeper (periodic run-directory cleanup) - Stuck-run status writer (JSON status files) - Zombie auto-abort (detect idle runs, emit chat.abort) - Background timer scheduler (setInterval orchestration) These features were either no-ops (zombie abort had no SDK dispatch capability) or redundant with the TaaS proxy's own lifecycle management. --- README.md | 307 +++++-------- index.ts | 812 ++++++--------------------------- openclaw.plugin.json | 4 + test/auto-abort.test.ts | 229 ---------- test/requester-runtime.test.ts | 20 +- test/smoke.mjs | 7 +- test/sweeper.test.ts | 303 ------------ 7 files changed, 243 insertions(+), 1439 deletions(-) delete mode 100644 test/auto-abort.test.ts delete mode 100644 test/sweeper.test.ts diff --git a/README.md b/README.md index 1ede020..8ae4db2 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,57 @@ -# openclaw-token-cache-optimizer +# openclaw-taas-affinity -An [OpenClaw](https://openclaw.ai) provider plugin that maximises prompt-cache hit rates when using [CloudSigma TaaS](https://www.cloudsigma.com) as your LLM provider. +CloudSigma TaaS affinity provider hook for OpenClaw. -It injects a stable, per-conversation session ID into every outbound request so TaaS can pin your conversation to the same upstream slot (OAuth token, Bedrock region, or Claude Code node) from the very first turn — giving you consistent prompt-cache reuse instead of cold starts on every message. +This plugin is intentionally narrow after the Claude Code Direction-2 lane update. It does **not** lease requester bridges, poll for bridge work, invoke requester-local tools, rewrite OpenAI tool payloads, or run OpenClaw maintenance sidecars. -The plugin is intentionally narrow: requester-side tool execution is handled by Claude Code, TaaS, and the OpenClaw gateway Direction-2 path. This plugin does **not** lease requester bridges, poll for bridge work, invoke local tools, or alter OpenAI tool payloads. +## What it does ---- +For requests routed through the `cloudsigma` or `cloudsigma-staging` provider IDs, the plugin: -## The problem it solves +- derives a stable affinity session ID with the form `oc:` +- injects `metadata.session_id` when absent +- injects `metadata.sticky_key` when absent +- injects a sanitized `metadata.requester_runtime` envelope when absent +- injects transport header `X-Session-Id` +- captures TaaS autorouter response headers +- exposes the latest route capture via gateway method `taas.autorouter.lastRoute` -TaaS routes LLM requests across a pool of upstream slots. Without a session signal, it uses heuristics to guess which requests belong to the same conversation: +## Startup compatibility -| Method | Confidence | Works when | -|---|---:|---| -| Tool-use ID chain | 1.0 | Tool-result follow-up turns only | -| Structural inference | 0.85 | Mid-conversation, after a few turns | -| New session fallback | 0.30 | First turn — no prior context | - -That **0.30 confidence on turn 1** means the first message in every conversation is likely routed to a random slot, breaking prompt-cache continuity right from the start. +The manifest explicitly asks OpenClaw to import the plugin at gateway startup: -This plugin passes a stable `session_id` derived from your OpenClaw workspace so TaaS short-circuits heuristic matching and achieves **confidence 1.0 from turn 1**. +```json +{ + "activation": { + "onStartup": true, + "onProviders": ["cloudsigma", "cloudsigma-staging"] + } +} +``` ---- +This is required because gateway RPC handlers must be attached during gateway startup. Provider/lazy activation is not enough for `taas.autorouter.lastRoute` to be present in the live gateway dispatch table. -## How it works +## Request metadata -OpenClaw's `wrapStreamFn` hook intercepts the outbound request payload before it is sent to TaaS. The plugin adds session affinity fields plus a small sanitized requester runtime envelope: +Example injected metadata: ```json { "metadata": { - "session_id": "oc:edebc39a82a8a041", - "sticky_key": "oc:edebc39a82a8a041", + "session_id": "oc:0123456789abcdef", + "sticky_key": "oc:0123456789abcdef", "requester_runtime": { - "schema_version": "2026-06-03", - "source": "openclaw-token-cache-optimizer", - "session_key": "oc:edebc39a82a8a041", - "openclaw_session_id": "oc:edebc39a82a8a041", + "schema_version": "2026-06-04", + "source": "openclaw-taas-affinity", + "session_key": "oc:0123456789abcdef", + "openclaw_session_id": "oc:0123456789abcdef", "requester_host_id": "host:1a2b3c4d5e6f7890", - "repo_name": "my-repo", + "repo_name": "example-repo", "git_branch_hint": "dev", "git_dirty_hint": false, "provider": "cloudsigma", "model_id": "cloudsigma/auto", - "session_source_hint": "source:4ae2870a2e73027c", + "session_source_hint": "source:1a2b3c4d5e6f7890", "tool_execution": "direction_2_gateway", "metadata_classification": { "identifiers": "hashed", @@ -57,231 +64,121 @@ OpenClaw's `wrapStreamFn` hook intercepts the outbound request payload before it } ``` -- `session_id` — read by TaaS's OpenAI and Codex affinity paths. -- `sticky_key` — additionally read by the Anthropic substrate routing layer. -- `requester_runtime` — safe advisory hints for downstream routing and diagnostics. -- `X-Session-Id` — injected by `resolveTransportTurnState` for transports that support per-turn native headers. - -All metadata fields are no-overwrite: if the caller already supplied `metadata.session_id`, `metadata.sticky_key`, or `metadata.requester_runtime`, the plugin leaves them intact. +All metadata fields are no-overwrite. If the caller already supplied `metadata.session_id`, `metadata.sticky_key`, or `metadata.requester_runtime`, the plugin leaves them intact. -### Requester runtime metadata +The plugin does not include raw local paths (`workspace_dir`, `agent_dir`, `repo_root_hint`), environment variables, tokens, git remotes, full git status output, diffs, or arbitrary provider `extraParams`. -The runtime envelope is intentionally small and sanitized. By default it contains: +## Direction-2 tool handling -- required affinity/session fields: `session_key`, `openclaw_session_id` -- hashed identifiers: `requester_host_id`, `session_source_hint` -- bounded repo hints when available: `repo_name`, `git_branch_hint`, `git_dirty_hint` -- provider/model hints when available: `provider`, `model_id` -- explicit execution-mode marker: `tool_execution: "direction_2_gateway"` -- metadata classification and redaction policy +Requester-side tool execution is handled outside this plugin by the Claude Code / TaaS / OpenClaw Direction-2 path. -It does **not** include raw local paths (`workspace_dir`, `agent_dir`, `repo_root_hint`) by default. It also never includes environment variables, tokens, git remotes, full status output, diffs, or arbitrary provider `extraParams`. Git probes are bounded with a short timeout. +The plugin intentionally leaves these payload structures untouched: -### Tool execution model: Direction-2 +- OpenAI `tools` +- `tool_choice` +- assistant `tool_calls` +- `role: "tool"` continuation messages -Requester-side tools are handled outside this plugin by Claude Code / TaaS / OpenClaw gateway Direction-2. Consequently this plugin: +It also does not inject: -- does not call `/internal/requester-bridges/leases` -- does not inject `requester_runtime.available_bridges` -- does not set `capture_mode: "bridge_capable"` -- does not poll `/internal/requester-bridges/poll` or post `/internal/requester-bridges/results` -- does not invoke requester-local `/tools/invoke` -- does not intercept OpenAI `tools`, `tool_calls`, or `role: "tool"` messages +- `requester_runtime.available_bridges` +- `capture_mode: "bridge_capable"` +- bridge operation names such as `requester.tool.invoke`, `openclaw.tool.invoke`, `bridge.ping`, or `bridge.echo` -### Session ID derivation +## Autorouter route capture -The ID is a SHA-256 hash of the session source, truncated to 16 hex chars and prefixed `oc:`. The plugin walks through a tier list to find the best available source: +TaaS may return response headers such as: -| Tier | Source | Notes | -|---|---|---| -| 1 | `ctx.workspaceDir` (explicit) | Best signal — populated for main agent and many subagents | -| 2 | `globalThis[pluginRegistryState].workspaceDir` | Parent agent workspace via plugin registry | -| 3 | `process.env.OPENCLAW_SESSION_ID` | If OpenClaw sets this env var for sub-agents in future | -| 4 | `process.env.OPENCLAW_AGENT_ID` / `OPENCLAW_RUN_ID` | Any stable per-agent env var | -| 5 | `OPENCLAW_STATE_DIR` hash | Per-installation fallback — least specific | +- `X-TaaS-Autorouted: true` +- `X-TaaS-Autorouter-Model` +- `X-TaaS-Autorouter-Mode` +- `X-TaaS-Autorouter-Algorithm-Source` +- `X-TaaS-Thinking-Applied` +- `X-TaaS-Routed-Context-Window` -| Property | Detail | -|---|---| -| **Stable** | Same value for every API turn within one conversation | -| **Unique** | Different workspaces / env vars → different IDs | -| **Resets on `/new`** | New conversation = new workspace = new ID | -| **Namespaced** | `oc:` prefix avoids collision with Claude Code and other TaaS clients | +The plugin stores the latest bounded capture per affinity session and per derived OpenClaw agent ID. -### Autorouter capture +Query latest route by agent: -The wrapper captures TaaS `X-TaaS-*` response headers for autorouted requests and exposes the most recent route via the gateway RPC: - -```text -taas.autorouter.lastRoute +```bash +openclaw gateway call taas.autorouter.lastRoute \ + --params '{"agentId":"new-agent-2"}' \ + --json ``` -Callers can query by `workspaceDir`, direct `sessionId`, or `agentId`. Captured values include the autorouted model, algorithm/mode, algorithm source, thinking level applied, and routed context window. - -### Sub-agent behaviour - -OpenClaw sub-agents run in isolated processes and may not receive a `workspaceDir` in their `wrapStreamFn` context. The tier fallback system ensures sub-agents always get a deterministic session ID: - -1. **If the sub-agent has a workspace** (Tier 1) — derives a unique ID from it. -2. **If the parent agent workspace is visible** via globalThis (Tier 2) — reuses the parent's ID. -3. **If OpenClaw injects env vars** (Tiers 3–4) — uses those for a stable per-agent ID. -4. **Last resort** (Tier 5) — falls back to the state dir hash. - -#### Debug logging - -Set `OPENCLAW_DEBUG=1` (or `NODE_ENV=development`) to emit the session ID source on each request: +Query by explicit affinity session ID: -```text -[taas-affinity] wrapStreamFn sessionId=oc:edebc39a82a8a041 source=workspaceDir:/home/user/.openclaw/... -[taas-affinity] resolveTransportTurnState sessionId=oc:edebc39a82a8a041 source=workspaceDir:... turnId=abc attempt=1 +```bash +openclaw gateway call taas.autorouter.lastRoute \ + --params '{"sessionId":"oc:0123456789abcdef"}' \ + --json ``` ---- - -## Requirements - -- **OpenClaw** ≥ 2026.4.27 - - Requires the provider plugin hooks exposed via `openclaw/plugin-sdk/core`, including `wrapStreamFn`, `hookAliases`, and `resolveTransportTurnState`. -- **Node.js** 22+ -- **TaaS** with session-affinity short-circuit support (commit `61a9960`+, April 2026) -- A CloudSigma account with TaaS access - -Older OpenClaw builds may fail to load the plugin or may load it without applying the transport/header hook. Upgrade OpenClaw before deploying this plugin to production instances. - ---- - -## Installation - -### Option 1 - npm install +Query by workspace path, deriving the same affinity session ID as the wrapper: ```bash -openclaw plugins install openclaw-token-cache-optimizer -openclaw gateway restart +openclaw gateway call taas.autorouter.lastRoute \ + --params '{"workspaceDir":"/home/cloudsigma/.openclaw/workspace-new-agent-2"}' \ + --json ``` -The published npm package ships pre-built JavaScript in `dist/` and works on OpenClaw `2026.4.27` and later. - -### Option 2 - manual install from source +## Install / update from checkout ```bash -git clone https://github.com/cloudsigma/openclaw-token-cache-optimizer \ - ~/.openclaw/extensions/openclaw-token-cache-optimizer -cd ~/.openclaw/extensions/openclaw-token-cache-optimizer -git checkout dev +cd /home/cloudsigma/openclaw-taas-affinity npm ci +npm test npm run build -openclaw gateway restart +rsync -a --delete \ + --exclude '.git' \ + --exclude 'node_modules' \ + --exclude 'package-lock.json' \ + ./ ~/.openclaw/extensions/openclaw-taas-affinity/ +cd ~/.openclaw/extensions/openclaw-taas-affinity +npm ci --omit=dev --ignore-scripts ``` -`npm ci` runs the `prepare` lifecycle script, which compiles TypeScript to `dist/index.js`. Re-run `npm run build` after pulling new source changes. +Then restart the managed gateway service during a controlled window: -No `openclaw.json` changes are required - the plugin auto-activates for all requests to the `cloudsigma` and `cloudsigma-staging` providers. +```bash +systemctl --user restart openclaw-gateway.service +``` -### Verify it loaded +Verify: ```bash openclaw gateway status openclaw plugins info openclaw-taas-affinity +openclaw gateway call taas.autorouter.lastRoute --params '{"agentId":"new-agent-2"}' --json ``` -You should see `Status: loaded` and the source pointing at `dist/index.js`. - ---- - -## Compatibility - -| OpenClaw gateway | Status | Notes | -|---|---|---| -| >= 2026.5.x | Supported | Loads compiled `dist/index.js` | -| 2026.4.27 - 2026.4.x | Supported | Loads compiled `dist/index.js`; transport/header support depends on gateway hook availability | -| < 2026.4.27 | Not supported | Hooks the plugin relies on are not exposed | - ---- - -## Verification - -### Local validation - -For repository/CI validation, install dev dependencies and run the test suite: +## Development ```bash -npm ci +npm run typecheck +npm run smoke +npm run unit npm test npm run build ``` -This runs: - -- `npm run typecheck` — validates the TypeScript source against the OpenClaw plugin SDK and Node typings -- `npm run smoke` — imports the plugin, registers the provider hook, and verifies payload/header injection plus autorouter capture -- `npm run unit` — validates sweeper/status/auto-abort behaviour and Direction-2 regressions - -Direction-2 regression coverage includes: - -- requester bridge lease endpoint is not called -- `available_bridges` and bridge capture metadata are not injected -- OpenAI `tools` pass through untouched -- assistant `tool_calls` and `role: "tool"` messages are not intercepted -- existing metadata fields are not overwritten +Current tests cover: -### TaaS logs +- manifest startup activation +- provider hook registration for `cloudsigma` and `cloudsigma-staging` +- metadata/header injection +- no-overwrite behavior +- absence of requester bridge descriptors +- OpenAI tool payload pass-through +- autorouter capture and lookup by workspace/session/agent -After installing, the first turn of every new conversation should show: - -```text -match_reason: "external_id_new" ← first turn (new session in Redis) -match_reason: "external_id" ← subsequent turns (known session) -``` - -Previously turn 1 would show `match_reason: "new"` with `confidence: 0.30`. - -### Redis (from a TaaS pod) - -```bash -redis-cli -h redis.taas.svc.cluster.local get "anth:session:oc:edebc39a82a8a041" -``` - -Replace the ID with your actual session ID. A non-null response confirms TaaS has bound the session to a slot. - ---- - -## Behaviour by session type - -| Session type | ID scope | -|---|---| -| Main agent | Own stable ID for the conversation lifetime | -| Spawned subagent | Own ID when a separate `workspaceDir` is present; otherwise tier fallback | -| Cron / isolated run | Own ID when an isolated workspace/env source exists | -| New conversation (`/new`, `/reset`) | New workspace → new ID | -| Parallel conversations | Each gets a separate ID when OpenClaw supplies separate workspaces/env sources | - ---- - -## Configuration - -None required for standard CloudSigma TaaS use. - -Supported environment variables: +## Environment variables | Variable | Default | Purpose | -|---|---|---| +|---|---:|---| | `OPENCLAW_DEBUG` | unset | Emit debug logs for session source and autorouter capture | -| `OPENCLAW_SESSION_ID` | unset | Optional session source fallback | -| `OPENCLAW_AGENT_ID` / `OPENCLAW_RUN_ID` | unset | Optional per-agent session source fallback | -| `OPENCLAW_STATE_DIR` | `~/.openclaw` | Last-resort stable source for fallback session ID | -| `TAAS_AFFINITY_SWEEP_INTERVAL_MS` | `3600000` | Background trash sweeper interval | -| `TAAS_AFFINITY_SWEEP_STALE_DAYS` | `7` | Age threshold for stale `.deleted` agent directories | -| `TAAS_AFFINITY_RUNS_STATUS_PATH` | `~/.openclaw/alien-studio/runs-status.json` | Stuck-run status JSON path | -| `TAAS_AFFINITY_AUTO_ABORT_ZOMBIES` | `false` | Opt-in zombie run auto-abort check | -| `TAAS_AFFINITY_AUTO_ABORT_DRY_RUN` | `false` | Log zombie abort candidates without aborting | +| `OPENCLAW_SESSION_ID` | unset | Preferred stable session source when supplied by OpenClaw | +| `OPENCLAW_AGENT_ID` / `OPENCLAW_RUN_ID` | unset | Fallback stable agent/session source | +| `OPENCLAW_STATE_DIR` | `~/.openclaw` | Last-resort stable fallback source | Requester bridge variables such as `TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED`, `TAAS_REQUESTER_BRIDGE_LEASE_URL`, and `TAAS_REQUESTER_BRIDGE_POLL_INTERVAL_MS` are obsolete and ignored by this plugin version. - ---- - -## Contributing - -Issues and PRs welcome. The core logic lives in [`index.ts`](./index.ts). - -## License - -MIT — see [LICENSE](./LICENSE). diff --git a/index.ts b/index.ts index 2636317..963b221 100644 --- a/index.ts +++ b/index.ts @@ -13,54 +13,70 @@ import type { /** * openclaw-taas-affinity * - * Injects a stable per-conversation session ID (oc:) into every - * outbound request to CloudSigma TaaS providers so the session-affinity layer - * achieves confidence=1.0 from turn 1, maximising prompt-cache hit rates. + * Narrow CloudSigma TaaS provider hook for the post-requester-bridge Claude Code + * lane. It only provides: + * - stable affinity metadata (`metadata.session_id`, `metadata.sticky_key`) + * - sanitized advisory requester runtime metadata + * - `X-Session-Id` transport header injection + * - TaaS autorouter response-header capture exposed through one gateway method * - * Requester-side tool execution is handled by OpenClaw / Claude Code / TaaS - * Direction-2. This plugin only provides affinity metadata, the X-Session-Id - * transport header, and TaaS autorouter response-header capture. + * Requester bridge leasing/polling/tool execution and OpenClaw run-maintenance + * sidecars intentionally do not live in this plugin. */ const SESSION_ID_PREFIX = "oc:" -const REQUESTER_RUNTIME_SCHEMA_VERSION = "2026-06-03" -const REQUESTER_RUNTIME_SOURCE = "openclaw-token-cache-optimizer" +const REQUESTER_RUNTIME_SCHEMA_VERSION = "2026-06-04" +const REQUESTER_RUNTIME_SOURCE = "openclaw-taas-affinity" const GIT_PROBE_TIMEOUT_MS = 250 +const LAST_ROUTE_LIMIT = 256 + +// OpenClaw stores active registry state (including workspaceDir) on globalThis +// under this well-known symbol key. +const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState") +const isDev = process.env.NODE_ENV === "development" || Boolean(process.env.OPENCLAW_DEBUG) type RequesterRuntime = Record -type RuntimeContextHints = { - workspaceDir?: string - agentDir?: string - repoRoot?: string - modelId?: string - provider?: string +type AutorouterCapture = { + sessionId: string + capturedAt: number + autorouterModel: string | null + autorouterAlgo: string | null + autorouterAlgoSource: string | null + thinkingApplied: string | null + routedContextWindow: number | null } -// OpenClaw stores the active registry state (including workspaceDir) on globalThis -// under this well-known symbol key. -const PLUGIN_REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState") +function asRecord(value: unknown): Record | undefined { + return value !== null && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined +} -const isDev = - process.env.NODE_ENV === "development" || Boolean(process.env.OPENCLAW_DEBUG) +function safeString(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined +} + +function stableHash(value: string, prefix: string, length = 16): string { + const hex = createHash("sha256").update(value, "utf8").digest("hex") + return `${prefix}:${hex.slice(0, length)}` +} + +function deriveSessionId(source: string): string { + const normalised = source.startsWith("env:") || source.startsWith("agent:") + ? source + : path.resolve(source) + const hex = createHash("sha256").update(normalised, "utf8").digest("hex") + return `${SESSION_ID_PREFIX}${hex.slice(0, 16)}` +} -/** - * Resolves the best available session source string, working through the - * fallback tier list. Returns undefined only when no source is found at all - * (practically impossible — Tier 5 always produces a value via fallbackSessionId). - * - * Tier 1 is handled by the caller (ctx.workspaceDir) before reaching this. - */ function getActiveSessionSource(): string | undefined { - // Tier 3: explicit env var set by OpenClaw for sub-agents const envSessionId = process.env.OPENCLAW_SESSION_ID if (envSessionId) return `env:${envSessionId}` - // Tier 4: stable per-agent env vars const envAgentId = process.env.OPENCLAW_AGENT_ID ?? process.env.OPENCLAW_RUN_ID if (envAgentId) return `agent:${envAgentId}` - // Tier 2: workspace dir from plugin registry state (parent agent) const state = (globalThis as Record)[PLUGIN_REGISTRY_STATE] as | { workspaceDir?: string } | null @@ -68,30 +84,12 @@ function getActiveSessionSource(): string | undefined { return state?.workspaceDir } -function deriveSessionId(source: string): string { - const normalised = source.startsWith("env:") || source.startsWith("agent:") - ? source // already a stable unique token, hash as-is - : path.resolve(source) - const hex = createHash("sha256").update(normalised, "utf8").digest("hex") - return `${SESSION_ID_PREFIX}${hex.slice(0, 16)}` -} - -function fallbackSessionId(): string { - // Tier 5: stable per-installation but not per-session. - // Uses the OpenClaw state dir as a last-resort stable source. +function fallbackSessionSource(): string { const stateDir = process.env.OPENCLAW_STATE_DIR ?? path.join(os.homedir(), ".openclaw") - return deriveSessionId(stateDir) + return `stateDir:${stateDir}` } -/** - * Resolves the session ID, walking through Tiers 1–5 in order. - * Returns both the derived ID and the source string used (for debug logging). - */ -function resolveSessionId(workspaceDirFromCtx?: string): { - sessionId: string - source: string -} { - // Tier 1: explicit from wrapStreamFn context +function resolveSessionId(workspaceDirFromCtx?: string): { sessionId: string; source: string } { if (workspaceDirFromCtx) { return { sessionId: deriveSessionId(workspaceDirFromCtx), @@ -99,47 +97,13 @@ function resolveSessionId(workspaceDirFromCtx?: string): { } } - // Tiers 2–4 via getActiveSessionSource() - const activeSource = getActiveSessionSource() - if (activeSource) { - return { - sessionId: deriveSessionId(activeSource), - source: activeSource, - } - } - - // Tier 5 fallback - const stateDir = process.env.OPENCLAW_STATE_DIR ?? path.join(os.homedir(), ".openclaw") + const activeSource = getActiveSessionSource() ?? fallbackSessionSource() return { - sessionId: deriveSessionId(stateDir), - source: `stateDir:${stateDir}`, + sessionId: deriveSessionId(activeSource), + source: activeSource, } } -function asRecord(value: unknown): Record | undefined { - return value !== null && typeof value === "object" && !Array.isArray(value) - ? (value as Record) - : undefined -} - -function safeString(value: unknown): string | undefined { - return typeof value === "string" && value.trim().length > 0 ? value : undefined -} - -function stableHash(value: string, prefix: string, length = 16): string { - const hex = createHash("sha256").update(value, "utf8").digest("hex") - return `${prefix}:${hex.slice(0, length)}` -} - -function resolveWorkspaceDir(ctx: ProviderWrapStreamFnContext): string | undefined { - return safeString(ctx.workspaceDir) -} - -function resolveAgentDir(ctx: ProviderWrapStreamFnContext): string | undefined { - const maybeAgentDir = (ctx as unknown as Record).agentDir - return safeString(maybeAgentDir) -} - function findRepoRoot(startDir?: string): string | undefined { if (!startDir) return undefined let current = path.resolve(startDir) @@ -171,8 +135,7 @@ function readGitHeadBranch(repoRoot?: string): string | undefined { if (gitHead && gitHead !== "HEAD") return gitHead.slice(0, 120) try { - const headPath = path.join(repoRoot, ".git", "HEAD") - const head = fs.readFileSync(headPath, "utf8").trim() + const head = fs.readFileSync(path.join(repoRoot, ".git", "HEAD"), "utf8").trim() const match = /^ref: refs\/heads\/(.+)$/.exec(head) return match?.[1]?.slice(0, 120) } catch { @@ -187,46 +150,41 @@ function readGitDirtyHint(repoRoot?: string): boolean | undefined { return status.length > 0 } -function runtimeContextHints(ctx: ProviderWrapStreamFnContext): RuntimeContextHints { - const workspaceDir = resolveWorkspaceDir(ctx) - const agentDir = resolveAgentDir(ctx) - const repoRoot = findRepoRoot(workspaceDir) - const ctxRecord = ctx as unknown as Record - const modelRecord = asRecord(ctxRecord.model) - const modelId = safeString(ctxRecord.modelId) ?? safeString(modelRecord?.id) - return { - workspaceDir, - agentDir, - repoRoot, - modelId, - provider: safeString(ctxRecord.provider), - } +function deriveAgentIdForCapture(ctx: { agentDir?: string; workspaceDir?: string }): string | null { + const envAgent = process.env.OPENCLAW_AGENT_ID ?? process.env.OPENCLAW_RUN_ID + if (envAgent && envAgent.trim()) return envAgent.trim() + const base = ctx.agentDir ?? ctx.workspaceDir + if (!base) return null + const seg = path.basename(path.resolve(base)) + if (!seg) return null + if (seg === "workspace") return "main" + if (seg.startsWith("workspace-")) return seg.slice("workspace-".length) + return seg } -/** - * Builds a small, sanitized requester runtime envelope for downstream routing. - * - * This intentionally includes only bounded, locally-derived hints. It never - * serializes process.env, git remotes, full git status/diffs, tokens, or - * arbitrary provider extraParams. - */ function buildRequesterRuntime( ctx: ProviderWrapStreamFnContext, sessionId: string, source: string ): RequesterRuntime { - const hints = runtimeContextHints(ctx) + const ctxRecord = ctx as unknown as Record + const modelRecord = asRecord(ctxRecord.model) + const workspaceDir = safeString(ctx.workspaceDir) + const repoRoot = findRepoRoot(workspaceDir) + const modelId = safeString(ctxRecord.modelId) ?? safeString(modelRecord?.id) + const provider = safeString(ctxRecord.provider) + return { schema_version: REQUESTER_RUNTIME_SCHEMA_VERSION, source: REQUESTER_RUNTIME_SOURCE, session_key: sessionId, openclaw_session_id: sessionId, requester_host_id: stableHash(os.hostname(), "host"), - ...(hints.repoRoot && { repo_name: path.basename(hints.repoRoot) }), - ...(hints.repoRoot && { git_branch_hint: readGitHeadBranch(hints.repoRoot) }), - ...(hints.repoRoot && { git_dirty_hint: readGitDirtyHint(hints.repoRoot) }), - ...(hints.provider && { provider: hints.provider }), - ...(hints.modelId && { model_id: hints.modelId }), + ...(repoRoot && { repo_name: path.basename(repoRoot) }), + ...(repoRoot && { git_branch_hint: readGitHeadBranch(repoRoot) }), + ...(repoRoot && { git_dirty_hint: readGitDirtyHint(repoRoot) }), + ...(provider && { provider }), + ...(modelId && { model_id: modelId }), session_source_hint: stableHash(source, "source"), tool_execution: "direction_2_gateway", metadata_classification: { @@ -244,7 +202,6 @@ function patchPayloadMetadata( requesterRuntime?: RequesterRuntime ): Record { const existingMeta = asRecord(payload.metadata) ?? {} - // Never overwrite existing metadata fields — the caller owns them. const needsSessionId = !existingMeta.session_id const needsStickyKey = !existingMeta.sticky_key const needsRequesterRuntime = requesterRuntime && !existingMeta.requester_runtime @@ -260,492 +217,41 @@ function patchPayloadMetadata( } } -/** - * Captured autorouter metadata per-session, populated by the onResponse callback - * installed in `buildWrapper`. Exposed via the `taas.autorouter.lastRoute` gateway - * RPC so Alien AI Studio (or any other client) can pull the latest TaaS routing - * decision for a session — including the actual model chosen by the autorouter, - * the algorithm used, the source of that algorithm (org/dept/key/user/system), - * the thinking level applied, and the chosen model's context window. - * - * The map is keyed by the affinity session ID we already derive in - * `resolveSessionId(ctx.workspaceDir)`. Stored values are bounded — see - * `LAST_ROUTE_LIMIT` — to avoid unbounded growth in long-lived gateways. - */ -type AutorouterCapture = { - sessionId: string - capturedAt: number - autorouterModel: string | null - autorouterAlgo: string | null - autorouterAlgoSource: string | null - thinkingApplied: string | null - routedContextWindow: number | null -} - - -// ── Trash sweeper configuration ────────────────────────────────────────────── -const SWEEP_INTERVAL_MS = Number(process.env.TAAS_AFFINITY_SWEEP_INTERVAL_MS) || 3_600_000 -const SWEEP_STALE_DAYS = Number(process.env.TAAS_AFFINITY_SWEEP_STALE_DAYS) || 7 -const SWEEP_TRAJECTORY_MAX_MB = Number(process.env.TAAS_AFFINITY_SWEEP_TRAJECTORY_MAX_MB) || 50 -const SWEEP_TRAJECTORY_KEEP_MB = Number(process.env.TAAS_AFFINITY_SWEEP_TRAJECTORY_KEEP_MB) || 10 -const SWEEP_LOCK_ORPHAN_MIN = Number(process.env.TAAS_AFFINITY_SWEEP_LOCK_ORPHAN_MIN) || 60 -const SWEEP_CHECKPOINT_KEEP = Number(process.env.TAAS_AFFINITY_SWEEP_CHECKPOINT_KEEP) || 3 - -// ── Stuck-run status writer configuration ──────────────────────────────────── -const STATUS_INTERVAL_MS = 30_000 -const STATUS_WARN_MS = 5 * 60_000 -const STATUS_STUCK_MS = 15 * 60_000 -const STATUS_ZOMBIE_MS = 60 * 60_000 -const STATUS_PATH = process.env.TAAS_AFFINITY_RUNS_STATUS_PATH - || path.join(os.homedir(), ".openclaw", "alien-studio", "runs-status.json") - -// ── Zombie auto-abort configuration ─────────────────────────────────────────── -// ⚠️ PLUGIN BLOCKED: No plugin-side dispatch for chat.abort exists in the OpenClaw -// plugin SDK. The default abortRun function logs a warning and is a no-op. -// When the SDK adds api.runtime.chat.abort() or dispatchGatewayMethod(), -// replace the default abortRun in register() below. -// Set TAAS_AFFINITY_AUTO_ABORT_ZOMBIES=true to enable (currently opt-in). -const AUTO_ABORT_ENABLED = process.env.TAAS_AFFINITY_AUTO_ABORT_ZOMBIES === "true" -const AUTO_ABORT_DRY_RUN = process.env.TAAS_AFFINITY_AUTO_ABORT_DRY_RUN === "true" -const AUTO_ABORT_THRESHOLD_MS = Number(process.env.TAAS_AFFINITY_AUTO_ABORT_THRESHOLD_MS) || STATUS_ZOMBIE_MS -const AUTO_ABORT_CHECK_INTERVAL_MS = Number(process.env.TAAS_AFFINITY_AUTO_ABORT_CHECK_INTERVAL_MS) || 60_000 -const AUTO_ABORT_ABORTED_SET_MAX = 1000 - -/** In-memory set of sessionKeys already aborted in this process (LRU-bounded). */ -const abortedInThisProcess: string[] = [] - -/** - * Injected abort function. Default logs a warning because the plugin SDK does not - * expose a way to dispatch chat.abort from inside a plugin. Replace via - * setAbortRunFn() in tests or when the SDK adds the capability. - */ -let abortRun: (sessionKey: string) => Promise = async (sessionKey: string) => { - console.warn("[taas-affinity] auto-abort: chat.abort not available via plugin SDK; sessionKey=" + sessionKey) -} - -/** Replace the abort function (for testing or future SDK integration). */ -function setAbortRunFn(fn: (sessionKey: string) => Promise): void { - abortRun = fn -} - -// ── Trash sweeper ──────────────────────────────────────────────────────────── -let sweepInProgress = false - -function runTrashSweep(agentsDir?: string): void { - if (sweepInProgress) return - sweepInProgress = true - const t0 = Date.now() - let deleted = 0 - let truncated = 0 - let orphanedLocks = 0 - try { - const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents") - let agents: string[] - try { agents = fs.readdirSync(base) } catch { sweepInProgress = false; return } - const staleMs = SWEEP_STALE_DAYS * 24 * 60 * 60 * 1000 - const trajectoryMaxBytes = SWEEP_TRAJECTORY_MAX_MB * 1024 * 1024 - const trajectoryKeepBytes = SWEEP_TRAJECTORY_KEEP_MB * 1024 * 1024 - const lockOrphanMs = SWEEP_LOCK_ORPHAN_MIN * 60 * 1000 - - for (const agentId of agents) { - const sessionsDir = path.join(base, agentId, "sessions") - let entries: string[] - try { entries = fs.readdirSync(sessionsDir) } catch { continue } - - // Group checkpoint files by base sessionId - const checkpointMap = new Map() - for (const name of entries) { - const m = name.match(/^(.+?)\.checkpoint\.\d+\.jsonl$/) - if (m) { - const arr = checkpointMap.get(m[1]) || [] - arr.push(name) - checkpointMap.set(m[1], arr) - } - } - - for (const name of entries) { - const fp = path.join(sessionsDir, name) - let st: fs.Stats - try { st = fs.statSync(fp) } catch { continue } - const age = Date.now() - st.mtimeMs - - // .deleted.* and .reset.* files - if (name.match(/\.deleted\.\d+\.jsonl(\.lock)?$/) || name.match(/\.reset\.\d+\.jsonl(\.lock)?$/)) { - if (age > staleMs) { - try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete", fp, e) } - } - continue - } - - // .checkpoint files — handled in batch below - if (name.match(/\.checkpoint\.\d+\.jsonl$/)) continue - - // .trajectory.jsonl oversized - if (name.endsWith(".trajectory.jsonl") && st.size > trajectoryMaxBytes) { - try { - const fh = fs.openSync(fp, "r") - const keepFrom = Math.max(0, st.size - trajectoryKeepBytes) - const buf = Buffer.alloc(st.size - keepFrom) - fs.readSync(fh, buf, 0, buf.length, keepFrom) - fs.closeSync(fh) - // Find first newline to avoid partial line - let nlIdx = buf.indexOf(10) // \n - const dataBuf = nlIdx >= 0 ? buf.slice(nlIdx + 1) : buf - // Backup original - const bakPath = fp + ".pre-truncate-" + Date.now() + ".bak" - fs.renameSync(fp, bakPath) - fs.writeFileSync(fp, dataBuf) - truncated++ - } catch (e) { console.warn("[taas-affinity] trash sweep: failed to truncate", fp, e) } - continue - } - - // .lock files — orphan detection - if (name.endsWith(".jsonl.lock")) { - if (age > lockOrphanMs) { - try { - const content = fs.readFileSync(fp, "utf8").trim() - const pidMatch = content.match(/^\d+/) - if (pidMatch) { - const pid = parseInt(pidMatch[0], 10) - if (!fs.existsSync("/proc/" + pid)) { - fs.unlinkSync(fp); orphanedLocks++ - } - } else { - fs.unlinkSync(fp); orphanedLocks++ - } - } catch (e) { console.warn("[taas-affinity] trash sweep: failed to process lock", fp, e) } - } - continue - } - } - - // Prune excess checkpoints per session - for (const [baseSessionId, files] of checkpointMap) { - const sorted = files - .map(f => ({ f, mtime: fs.statSync(path.join(sessionsDir, f)).mtimeMs })) - .sort((a, b) => b.mtime - a.mtime) - // Remove beyond KEEP limit - for (let i = SWEEP_CHECKPOINT_KEEP; i < sorted.length; i++) { - const fp = path.join(sessionsDir, sorted[i].f) - try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete checkpoint", fp, e) } - } - // Remove any stale checkpoints - for (const entry of sorted) { - const age2 = Date.now() - entry.mtime - if (age2 > staleMs) { - const fp = path.join(sessionsDir, entry.f) - try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete stale checkpoint", fp, e) } - } - } - } - } - } catch (e) { - console.warn("[taas-affinity] trash sweep error", e) - } finally { - sweepInProgress = false - console.info("[taas-affinity] trash sweep: deleted=" + deleted + " truncated=" + truncated + " orphaned_locks=" + orphanedLocks + " elapsed=" + (Date.now() - t0) + "ms") - } -} - -// ── Stuck-run status writer ────────────────────────────────────────────────── -interface RunState { - agentId: string - sessionUuid: string - sessionKey: string - lockMtime: number - idleMs: number - state: "active" | "warn" | "stuck" | "zombie" - pid: number | null - pidAlive: boolean | null -} - -interface RunsStatus { - generatedAt: number - thresholds: { warnMs: number; stuckMs: number; zombieMs: number } - counts: { active: number; warn: number; stuck: number; zombie: number } - runs: RunState[] -} - -let statusInProgress = false - -function writeRunStatus(agentsDir?: string, statusPath?: string): void { - if (statusInProgress) return - statusInProgress = true - try { - const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents") - const outPath = statusPath || STATUS_PATH - let agents: string[] - try { agents = fs.readdirSync(base) } catch { agents = [] } - - const runs: RunState[] = [] - - for (const agentId of agents) { - const sessionsDir = path.join(base, agentId, "sessions") - let entries: string[] - try { entries = fs.readdirSync(sessionsDir) } catch { continue } - - // Try to read identity.json for mainKey - let mainKey: string | null = null - try { - const idJson = fs.readFileSync(path.join(base, agentId, "agent", "identity.json"), "utf8") - const parsed = JSON.parse(idJson) - if (typeof parsed.mainKey === "string") mainKey = parsed.mainKey - } catch { /* no identity file */ } - - // Build main session UUID from mainKey - let mainUuid: string | null = null - if (mainKey) { - // mainKey format like "agent::main" => no UUID, or "agent::session:" - // But for the main session, it's typically "agent::main" - // We'll compare by checking if sessionUuid appears in mainKey - const parts = mainKey.split(":") - const lastPart = parts[parts.length - 1] - if (lastPart !== "main" && lastPart.length >= 8) { - mainUuid = lastPart - } - } - - for (const name of entries) { - if (!name.endsWith(".jsonl.lock")) continue - const sessionUuid = name.replace(/\.jsonl\.lock$/, "") - const fp = path.join(sessionsDir, name) - let st: fs.Stats - try { st = fs.statSync(fp) } catch { continue } - - const lockMtime = st.mtimeMs - const idleMs = Date.now() - lockMtime - - let state: RunState["state"] - if (idleMs < STATUS_WARN_MS) state = "active" - else if (idleMs < STATUS_STUCK_MS) state = "warn" - else if (idleMs < STATUS_ZOMBIE_MS) state = "stuck" - else state = "zombie" - - let pid: number | null = null - try { - const content = fs.readFileSync(fp, "utf8").trim() - const pidMatch = content.match(/^\d+/) - if (pidMatch) pid = parseInt(pidMatch[0], 10) - } catch { /* empty */ } - - let pidAlive: boolean | null = null - if (pid !== null) { - try { pidAlive = fs.existsSync("/proc/" + pid) } catch { pidAlive = null } - } - - let sessionKey: string - if (mainKey && sessionUuid === mainUuid) { - sessionKey = "agent:" + agentId + ":main" - } else { - sessionKey = "agent:" + agentId + ":session:" + sessionUuid - } - - runs.push({ agentId, sessionUuid, sessionKey, lockMtime, idleMs, state, pid, pidAlive }) - } - } - - runs.sort((a, b) => b.idleMs - a.idleMs) - - const counts = { active: 0, warn: 0, stuck: 0, zombie: 0 } - for (const r of runs) counts[r.state]++ - - const output: RunsStatus = { - generatedAt: Date.now(), - thresholds: { warnMs: STATUS_WARN_MS, stuckMs: STATUS_STUCK_MS, zombieMs: STATUS_ZOMBIE_MS }, - counts, - runs, - } - - // Atomic write - const dir = path.dirname(outPath) - try { fs.mkdirSync(dir, { recursive: true }) } catch { /* may already exist */ } - const tmpPath = path.join(dir, ".runs-status.tmp." + process.pid) - fs.writeFileSync(tmpPath, JSON.stringify(output, null, 2)) - fs.renameSync(tmpPath, outPath) - } catch (e) { - console.warn("[taas-affinity] runs-status write error", e) - } finally { - statusInProgress = false - } -} - -// ── Zombie auto-abort ────────────────────────────────────────────────────────── -let abortCheckInProgress = false - -function runAbortCheck(agentsDir?: string): void { - if (abortCheckInProgress) return - abortCheckInProgress = true - try { - const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents") - let agents: string[] - try { agents = fs.readdirSync(base) } catch { agents = [] } - - const zombies: Array<{ sessionKey: string; idleMs: number }> = [] - - for (const agentId of agents) { - const sessionsDir = path.join(base, agentId, "sessions") - let entries: string[] - try { entries = fs.readdirSync(sessionsDir) } catch { continue } - - let mainKey: string | null = null - try { - const idJson = fs.readFileSync(path.join(base, agentId, "agent", "identity.json"), "utf8") - const parsed = JSON.parse(idJson) - if (typeof parsed.mainKey === "string") mainKey = parsed.mainKey - } catch { /* no identity file */ } - - let mainUuid: string | null = null - if (mainKey) { - const parts = mainKey.split(":") - const lastPart = parts[parts.length - 1] - if (lastPart !== "main" && lastPart.length >= 8) { - mainUuid = lastPart - } - } - - for (const name of entries) { - if (!name.endsWith(".jsonl.lock")) continue - const sessionUuid = name.replace(/\.jsonl\.lock$/, "") - const fp = path.join(sessionsDir, name) - let st: fs.Stats - try { st = fs.statSync(fp) } catch { continue } - - const idleMs = Date.now() - st.mtimeMs - if (idleMs < AUTO_ABORT_THRESHOLD_MS) continue - - let sessionKey: string - if (mainKey && sessionUuid === mainUuid) { - sessionKey = "agent:" + agentId + ":main" - } else { - sessionKey = "agent:" + agentId + ":session:" + sessionUuid - } - - zombies.push({ sessionKey, idleMs }) - } - } - - // When auto-abort is disabled (default), just log candidates - if (!AUTO_ABORT_ENABLED) { - if (zombies.length > 0) { - console.info("[taas-affinity] zombie candidates (auto-abort disabled): " + zombies.length + " runs: " + zombies.map(z => z.sessionKey).join(", ")) - } - return - } - - // Auto-abort is enabled — abort each zombie not already aborted in this process - for (const z of zombies) { - if (abortedInThisProcess.includes(z.sessionKey)) continue - // Cap the set at AUTO_ABORT_ABORTED_SET_MAX (FIFO eviction) - if (abortedInThisProcess.length >= AUTO_ABORT_ABORTED_SET_MAX) { - abortedInThisProcess.shift() - } - abortedInThisProcess.push(z.sessionKey) - - if (AUTO_ABORT_DRY_RUN) { - console.info("[taas-affinity] auto-abort DRY RUN: would abort sessionKey=" + z.sessionKey + " idleMs=" + z.idleMs) - } else { - console.warn("[taas-affinity] auto-aborting zombie run sessionKey=" + z.sessionKey + " idleMs=" + z.idleMs) - abortRun(z.sessionKey).catch((e: unknown) => { - console.warn("[taas-affinity] auto-abort failed sessionKey=" + z.sessionKey, e) - }) - } - } - } catch (e) { - console.warn("[taas-affinity] abort-check error", e) - } finally { - abortCheckInProgress = false - } -} - -// ── Background task scheduler ──────────────────────────────────────────────── -const backgroundTimers: (NodeJS.Timeout)[] = [] - -function trackBackgroundTimer(timer: NodeJS.Timeout): NodeJS.Timeout { - timer.unref?.() - backgroundTimers.push(timer) - return timer -} - -function startBackgroundTasks(): void { - // Trash sweeper — randomised initial delay (0-30s) to stagger - const sweepDelay = Math.floor(Math.random() * 30_000) - trackBackgroundTimer(setTimeout(() => { - runTrashSweep() - trackBackgroundTimer(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS)) - }, sweepDelay)) - - // Stuck-run status writer — 5s initial delay, then every 30s - trackBackgroundTimer(setTimeout(() => { - writeRunStatus() - trackBackgroundTimer(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS)) - }, 5_000)) - - // Zombie auto-abort — 10s initial delay, then every AUTO_ABORT_CHECK_INTERVAL_MS - trackBackgroundTimer(setTimeout(() => { - runAbortCheck() - trackBackgroundTimer(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS)) - }, 10_000)) -} - -const LAST_ROUTE_LIMIT = 256 const lastRouteBySessionId = new Map() const lastRouteByAgentId = new Map() function pruneLastRouteMap(): void { - if (lastRouteBySessionId.size <= LAST_ROUTE_LIMIT) return - // Drop oldest entries by capturedAt ascending until we're back under the cap. - const entries = [...lastRouteBySessionId.entries()].sort( - (a, b) => a[1].capturedAt - b[1].capturedAt - ) - const toDrop = entries.length - LAST_ROUTE_LIMIT - for (let i = 0; i < toDrop; i++) { - lastRouteBySessionId.delete(entries[i][0]) + if (lastRouteBySessionId.size > LAST_ROUTE_LIMIT) { + const entries = [...lastRouteBySessionId.entries()].sort( + (a, b) => a[1].capturedAt - b[1].capturedAt + ) + for (let i = 0; i < entries.length - LAST_ROUTE_LIMIT; i++) { + lastRouteBySessionId.delete(entries[i][0]) + } } - if (lastRouteByAgentId.size <= LAST_ROUTE_LIMIT) return - const aEntries = [...lastRouteByAgentId.entries()].sort( - (a, b) => a[1].capturedAt - b[1].capturedAt - ) - const aDrop = aEntries.length - LAST_ROUTE_LIMIT - for (let i = 0; i < aDrop; i++) { - lastRouteByAgentId.delete(aEntries[i][0]) + if (lastRouteByAgentId.size > LAST_ROUTE_LIMIT) { + const entries = [...lastRouteByAgentId.entries()].sort( + (a, b) => a[1].capturedAt - b[1].capturedAt + ) + for (let i = 0; i < entries.length - LAST_ROUTE_LIMIT; i++) { + lastRouteByAgentId.delete(entries[i][0]) + } } } -/** - * Derive a stable agent identifier from the OpenClaw runtime context. Prefers - * explicit env vars set by the gateway for sub-agents (OPENCLAW_AGENT_ID / - * OPENCLAW_RUN_ID), then falls back to the trailing path segment of agentDir - * or workspaceDir (e.g. /home/u/.openclaw/workspace-new-agent-3 -> "new-agent-3", - * /home/u/.openclaw/workspace -> "main"). - */ -function deriveAgentIdForCapture( - ctx: { agentDir?: string; workspaceDir?: string } -): string | null { - const envAgent = process.env.OPENCLAW_AGENT_ID ?? process.env.OPENCLAW_RUN_ID - if (envAgent && envAgent.trim()) return envAgent.trim() - const base = ctx.agentDir ?? ctx.workspaceDir - if (!base) return null - const seg = path.basename(path.resolve(base)) - if (!seg) return null - if (seg === "workspace") return "main" - if (seg.startsWith("workspace-")) return seg.slice("workspace-".length) - return seg -} - function captureAutorouterFromHeaders( sessionId: string, headers: Record, agentId: string | null ): void { - // Header names from TaaS proxy are emitted in canonical "X-TaaS-*" form - // but Node/undici lowercases incoming response headers. Read case-insensitively. const lowered: Record = {} for (const [k, v] of Object.entries(headers)) { if (typeof v === "string") lowered[k.toLowerCase()] = v } - const autorouted = lowered["x-taas-autorouted"] - if (autorouted !== "true") return // ignore non-autorouted responses + if (lowered["x-taas-autorouted"] !== "true") return + + const rawContextWindow = lowered["x-taas-routed-context-window"] + const parsedContextWindow = rawContextWindow ? Number(rawContextWindow) : null const capture: AutorouterCapture = { sessionId, capturedAt: Date.now(), @@ -753,16 +259,16 @@ function captureAutorouterFromHeaders( autorouterAlgo: lowered["x-taas-autorouter-mode"] ?? null, autorouterAlgoSource: lowered["x-taas-autorouter-algorithm-source"] ?? null, thinkingApplied: lowered["x-taas-thinking-applied"] ?? null, - routedContextWindow: (() => { - const raw = lowered["x-taas-routed-context-window"] - if (!raw) return null - const n = Number(raw) - return Number.isFinite(n) && n > 0 ? n : null - })(), + routedContextWindow: + parsedContextWindow && Number.isFinite(parsedContextWindow) && parsedContextWindow > 0 + ? parsedContextWindow + : null, } + lastRouteBySessionId.set(sessionId, capture) if (agentId) lastRouteByAgentId.set(agentId, capture) pruneLastRouteMap() + if (isDev) { console.debug( `[taas-affinity] captured autorouter sessionId=${sessionId} ` + @@ -786,47 +292,27 @@ function buildWrapper(ctx: ProviderWrapStreamFnContext) { if (!streamFn) return undefined const { sessionId, source } = resolveSessionId(ctx.workspaceDir) - const agentIdForCapture = deriveAgentIdForCapture(ctx) + const agentIdForCapture = deriveAgentIdForCapture(ctx as { agentDir?: string; workspaceDir?: string }) const requesterRuntime = buildRequesterRuntime(ctx, sessionId, source) - if (isDev) { - console.debug(`[taas-affinity] wrapStreamFn sessionId=${sessionId} source=${source}`) - } + if (isDev) console.debug(`[taas-affinity] wrapStreamFn sessionId=${sessionId} source=${source}`) const inner = streamFn return function taasAffinityStreamFn(...args: Parameters) { const [model, context, options] = args const prevOnPayload = options?.onPayload - const onPayload: NonNullable["onPayload"] = async ( - payload, - payloadModel - ) => { + const onPayload: NonNullable["onPayload"] = async (payload, payloadModel) => { const payloadRecord = asRecord(payload) - if (!payloadRecord) { - if (prevOnPayload) return prevOnPayload(payload, payloadModel) - return payload - } + if (!payloadRecord) return prevOnPayload ? prevOnPayload(payload, payloadModel) : payload const patched = patchPayloadMetadata(payloadRecord, sessionId, requesterRuntime) - if (prevOnPayload) return prevOnPayload(patched, payloadModel) - return patched + return prevOnPayload ? prevOnPayload(patched, payloadModel) : patched } const prevOnResponse = options?.onResponse - const onResponse: NonNullable["onResponse"] = async ( - response, - responseModel - ) => { + const onResponse: NonNullable["onResponse"] = async (response, responseModel) => { try { - captureAutorouterFromHeaders( - sessionId, - response?.headers ?? {}, - agentIdForCapture - ) + captureAutorouterFromHeaders(sessionId, response?.headers ?? {}, agentIdForCapture) } catch (err) { - if (isDev) { - console.debug( - `[taas-affinity] onResponse capture failed: ${(err as Error)?.message ?? err}` - ) - } + if (isDev) console.debug(`[taas-affinity] onResponse capture failed: ${(err as Error)?.message ?? err}`) } if (prevOnResponse) await prevOnResponse(response, responseModel) } @@ -834,40 +320,16 @@ function buildWrapper(ctx: ProviderWrapStreamFnContext) { } as typeof inner } -/** - * Injects X-Session-Id as a per-turn transport header. - * - * resolveTransportTurnState is called by generic HTTP and WebSocket transports - * to attach provider-native headers on every request turn. This is the correct - * SDK hook for header injection — onPayload only controls the body. - * - * Note: ctx.sessionId here is OpenClaw's own internal ephemeral session UUID, - * not the TaaS affinity ID we derive. We derive our own ID from workspaceDir / - * env vars so the TaaS affinity signal is stable across retries within a turn. - */ -function buildTransportTurnState( - ctx: ProviderResolveTransportTurnStateContext -): ProviderTransportTurnState | null { - // We don't have ctx.workspaceDir here (it's not on this context type), - // so use the active session source tiers directly. - const activeSource = getActiveSessionSource() - const sessionId = activeSource - ? deriveSessionId(activeSource) - : fallbackSessionId() - +function buildTransportTurnState(ctx: ProviderResolveTransportTurnStateContext): ProviderTransportTurnState | null { + const activeSource = getActiveSessionSource() ?? fallbackSessionSource() + const sessionId = deriveSessionId(activeSource) if (isDev) { console.debug( `[taas-affinity] resolveTransportTurnState sessionId=${sessionId} ` + - `source=${activeSource ?? "stateDir-fallback"} ` + - `turnId=${ctx.turnId} attempt=${ctx.attempt}` + `source=${activeSource} turnId=${ctx.turnId} attempt=${ctx.attempt}` ) } - - return { - headers: { - "X-Session-Id": sessionId, - }, - } + return { headers: { "X-Session-Id": sessionId } } } export default { @@ -878,8 +340,6 @@ export default { "pin sessions to the same upstream slot from turn 1, maximising prompt-cache hit rates.", register(api: OpenClawPluginApi) { - // Unique id avoids conflicting with the config-driven "cloudsigma" provider. - // hookAliases routes cloudsigma/cloudsigma-staging requests to this hook. api.registerProvider({ id: "taas-affinity-hook", label: "CloudSigma TaaS Token Cache Optimizer", @@ -889,27 +349,14 @@ export default { resolveTransportTurnState: buildTransportTurnState, }) - // Expose captured TaaS autorouter metadata to gateway clients (Studio, etc.). - // The Alien AI Studio polls this after each turn to populate the model/algo/ - // thinking/context-window fields in the AgentChatPanel for cloudsigma/auto and - // other autorouted requests. See PRD "Alien AI Studio - Auto-Routing Model UX" - // (Confluence 1901363271). if (typeof api.registerGatewayMethod === "function") api.registerGatewayMethod( "taas.autorouter.lastRoute", async ({ params, respond }) => { - // Accept either { workspaceDir } (preferred — derives sessionId the - // same way the wrapper does) or { sessionId } (direct lookup). const pp = (params ?? {}) as Record - const directAgentId = - typeof pp.agentId === "string" && pp.agentId.trim() - ? pp.agentId.trim() - : null - const directSessionId = - typeof pp.sessionId === "string" ? pp.sessionId : null - const workspaceDir = - typeof pp.workspaceDir === "string" ? pp.workspaceDir : undefined - - // Prefer agent-keyed lookup when the caller supplied an agentId. + const directAgentId = safeString(pp.agentId) ?? null + const directSessionId = safeString(pp.sessionId) ?? null + const workspaceDir = safeString(pp.workspaceDir) + if (directAgentId) { const captured = getLastRouteForAgent(directAgentId) respond(true, { @@ -920,27 +367,22 @@ export default { return } - const resolvedSessionId = - directSessionId ?? resolveSessionId(workspaceDir).sessionId - const captured = getLastRouteForSession(resolvedSessionId) - respond(true, { sessionId: resolvedSessionId, capture: captured }) + const resolvedSessionId = directSessionId ?? resolveSessionId(workspaceDir).sessionId + respond(true, { + sessionId: resolvedSessionId, + capture: getLastRouteForSession(resolvedSessionId), + }) }, { scope: "operator.read" } ) - - // Start background tasks (trash sweeper + stuck-run status writer). - // Timers run for the lifetime of the gateway process. - try { startBackgroundTasks() } catch (e) { console.warn("[taas-affinity] failed to start background tasks", e) } }, + _testExports: { - runTrashSweep, - writeRunStatus, - runAbortCheck, - setAbortRunFn, - resetSweepInProgress: () => { sweepInProgress = false }, - resetStatusInProgress: () => { statusInProgress = false }, - resetAbortCheckInProgress: () => { abortCheckInProgress = false }, - clearAbortedInThisProcess: () => { abortedInThisProcess.length = 0 }, - getAbortedInThisProcess: () => [...abortedInThisProcess], + buildRequesterRuntime, + patchPayloadMetadata, + resolveSessionId, + captureAutorouterFromHeaders, + getLastRouteForAgent, + getLastRouteForSession, }, } diff --git a/openclaw.plugin.json b/openclaw.plugin.json index db3581f..9265524 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -1,5 +1,9 @@ { "id": "openclaw-taas-affinity", + "activation": { + "onStartup": true, + "onProviders": ["cloudsigma", "cloudsigma-staging"] + }, "providers": ["cloudsigma", "cloudsigma-staging"], "configSchema": { "type": "object", diff --git a/test/auto-abort.test.ts b/test/auto-abort.test.ts deleted file mode 100644 index c04af0d..0000000 --- a/test/auto-abort.test.ts +++ /dev/null @@ -1,229 +0,0 @@ -import assert from "node:assert/strict" -import fs from "node:fs" -import os from "node:os" -import path from "node:path" -import { test } from "node:test" - -// Helper: dynamic import with cache bust -async function loadPlugin(env: Record = {}) { - const oldEnv: Record = {} - for (const [key, value] of Object.entries(env)) { - oldEnv[key] = process.env[key] - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - const mod = await import(`../index.ts?bust=${Date.now()}-${Math.random()}`) - for (const [key, value] of Object.entries(oldEnv)) { - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - return mod -} - -// Helper: create a temp agents dir with zombie lock files -function createTempAgentsDir(structure: Array<{ - agentId: string - sessions: Array<{ uuid: string; mtimeAgeMs: number; pid?: number }> - identityJson?: Record -}>) { - const base = fs.mkdtempSync(path.join(os.tmpdir(), "taas-abort-test-")) - for (const { agentId, sessions = [], identityJson } of structure) { - const sessDir = path.join(base, agentId, "sessions") - fs.mkdirSync(sessDir, { recursive: true }) - for (const s of sessions) { - const lockPath = path.join(sessDir, s.uuid + ".jsonl.lock") - fs.writeFileSync(lockPath, s.pid != null ? String(s.pid) : "12345") - const newMtime = new Date(Date.now() - s.mtimeAgeMs) - fs.utimesSync(lockPath, newMtime, newMtime) - } - if (identityJson) { - const agentDir = path.join(base, agentId, "agent") - fs.mkdirSync(agentDir, { recursive: true }) - fs.writeFileSync(path.join(agentDir, "identity.json"), JSON.stringify(identityJson)) - } - } - return base -} - -// Collect console output -function collectConsole() { - const infoLogs: string[] = [] - const warnLogs: string[] = [] - const origInfo = console.info - const origWarn = console.warn - console.info = (...args: unknown[]) => { infoLogs.push(args.map(String).join(" ")) } - console.warn = (...args: unknown[]) => { warnLogs.push(args.map(String).join(" ")) } - return { - infoLogs, - warnLogs, - restore() { - console.info = origInfo - console.warn = origWarn - }, - } -} - -// AC-AUTO-ABORT.1: With auto-abort enabled, classify one zombie session, mock the abort callback, assert it was called with the right sessionKey. -test("AC-AUTO-ABORT.1: auto-abort enabled calls abortRun for zombie session", async () => { - const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" }) - const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports - - clearAbortedInThisProcess() - resetAbortCheckInProgress() - - const aborted: Array<{ sessionKey: string }> = [] - setAbortRunFn(async (sessionKey: string) => { - aborted.push({ sessionKey }) - }) - - // Create a zombie: lock file older than 60 min - const agentsDir = createTempAgentsDir([{ - agentId: "test-agent", - sessions: [{ uuid: "abc12345-6789-def0-1234-567890abcdef", mtimeAgeMs: 65 * 60 * 1000 }], - }]) - - const logs = collectConsole() - try { - runAbortCheck(agentsDir) - } finally { - logs.restore() - } - - assert.ok(aborted.length === 1, `Expected 1 abort, got ${aborted.length}`) - assert.match(aborted[0].sessionKey, /^agent:test-agent:session:abc12345/) -}) - -// AC-AUTO-ABORT.2: With auto-abort disabled (default), classify zombies, assert the abort callback was NOT called and one log line listing candidates. -test("AC-AUTO-ABORT.2: auto-abort disabled logs candidates without aborting", async () => { - const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: undefined }) - const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports - - clearAbortedInThisProcess() - resetAbortCheckInProgress() - - const aborted: string[] = [] - setAbortRunFn(async (sessionKey: string) => { - aborted.push(sessionKey) - }) - - // Create two zombie lock files - const agentsDir = createTempAgentsDir([{ - agentId: "main", - sessions: [ - { uuid: "deadbeef-0000-0000-0000-000000000001", mtimeAgeMs: 70 * 60 * 1000 }, - { uuid: "deadbeef-0000-0000-0000-000000000002", mtimeAgeMs: 65 * 60 * 1000 }, - ], - }]) - - const logs = collectConsole() - try { - runAbortCheck(agentsDir) - } finally { - logs.restore() - } - - assert.equal(aborted.length, 0, "abortRun should NOT be called when auto-abort is disabled") - // Check that we logged the candidates - const candidateLog = logs.infoLogs.find(l => l.includes("zombie candidates (auto-abort disabled)")) - assert.ok(candidateLog, `Expected candidate log, got: ${JSON.stringify(logs.infoLogs)}`) - assert.ok(candidateLog!.includes("2 runs"), `Expected '2 runs' in log, got: ${candidateLog}`) -}) - -// AC-AUTO-ABORT.3: Same zombie classified twice in successive ticks → abort callback called ONLY ONCE (idempotence via the in-memory set). -test("AC-AUTO-ABORT.3: idempotent — same zombie only aborted once across two ticks", async () => { - const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" }) - const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports - - clearAbortedInThisProcess() - resetAbortCheckInProgress() - - const aborted: string[] = [] - setAbortRunFn(async (sessionKey: string) => { - aborted.push(sessionKey) - }) - - const agentsDir = createTempAgentsDir([{ - agentId: "test-agent", - sessions: [{ uuid: "idempotent-0000-0000-000000000001", mtimeAgeMs: 66 * 60 * 1000 }], - }]) - - // First tick - resetAbortCheckInProgress() - runAbortCheck(agentsDir) - assert.equal(aborted.length, 1, "First tick should abort once") - - // Second tick — same zombie is still there - resetAbortCheckInProgress() - runAbortCheck(agentsDir) - assert.equal(aborted.length, 1, "Second tick should NOT abort again — idempotent") -}) - -// AC-AUTO-ABORT.4: 1001 distinct zombies all aborted → the in-memory set caps at 1000 entries. -test("AC-AUTO-ABORT.4: aborted set caps at 1000 entries (LRU eviction)", async () => { - const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" }) - const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess, getAbortedInThisProcess } = mod.default._testExports - - clearAbortedInThisProcess() - resetAbortCheckInProgress() - - const aborted: string[] = [] - setAbortRunFn(async (sessionKey: string) => { - aborted.push(sessionKey) - }) - - // Create 1100 zombie lock files (more than the 1000 cap) - const sessions = [] - for (let i = 0; i < 1100; i++) { - sessions.push({ - uuid: `zombie-${String(i).padStart(8, "0")}-0000-000000000000`, - mtimeAgeMs: (61 + (i % 10)) * 60 * 1000, // 61-70 min idle - }) - } - const agentsDir = createTempAgentsDir([{ - agentId: "load-agent", - sessions, - }]) - - resetAbortCheckInProgress() - runAbortCheck(agentsDir) - - // All 1100 should have been passed to abortRun (they're new each time) - assert.equal(aborted.length, 1100, `Expected 1100 aborted calls, got ${aborted.length}`) - - // The in-memory set should be capped at 1000 - const set = getAbortedInThisProcess() - assert.ok(set.length <= 1000 && set.length > 0, `Expected set <= 1000, got ${set.length}`) -}) - -// Cleanup temp dirs (node:test handles most, but ensure) -test("AC-AUTO-ABORT.5: dry-run mode logs but does not call abortRun", async () => { - const mod = await loadPlugin({ - TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true", - TAAS_AFFINITY_AUTO_ABORT_DRY_RUN: "true", - }) - const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports - - clearAbortedInThisProcess() - resetAbortCheckInProgress() - - const aborted: string[] = [] - setAbortRunFn(async (sessionKey: string) => { - aborted.push(sessionKey) - }) - - const agentsDir = createTempAgentsDir([{ - agentId: "dry-agent", - sessions: [{ uuid: "dryrun-test-0000-0000-000000000001", mtimeAgeMs: 65 * 60 * 1000 }], - }]) - - const logs = collectConsole() - try { - runAbortCheck(agentsDir) - } finally { - logs.restore() - } - - assert.equal(aborted.length, 0, "dry-run should NOT call abortRun") - const dryLog = logs.infoLogs.find(l => l.includes("auto-abort DRY RUN")) - assert.ok(dryLog, `Expected DRY RUN log, got: ${JSON.stringify(logs.infoLogs)}`) -}) diff --git a/test/requester-runtime.test.ts b/test/requester-runtime.test.ts index 6dae355..e1614b1 100644 --- a/test/requester-runtime.test.ts +++ b/test/requester-runtime.test.ts @@ -1,5 +1,4 @@ import assert from "node:assert/strict" -import { createServer } from "node:http" import { test } from "node:test" async function loadPlugin(env: Record = {}) { @@ -45,34 +44,23 @@ async function runPayload(provider: any, payload: any, ctxExtra: Record { - let called = false - const server = createServer((_req, res) => { - called = true - res.statusCode = 500 - res.end("unexpected") - }) - await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)) - const address = server.address() - assert(address && typeof address === "object") - const baseUrl = `http://127.0.0.1:${address.port}` +test("Direction-2 metadata has no requester bridge descriptors or raw local paths", async () => { const { plugin, restore } = await loadPlugin({ TAAS_REQUESTER_BRIDGE_PLUGIN_ENABLED: "1", - TAAS_REQUESTER_BRIDGE_LEASE_URL: `${baseUrl}/internal/requester-bridges/leases`, + TAAS_REQUESTER_BRIDGE_LEASE_URL: "http://127.0.0.1:9/internal/requester-bridges/leases", }) try { - const payload = await runPayload(captureProvider(plugin), { messages: [], metadata: {} }, { baseUrl }) + const payload = await runPayload(captureProvider(plugin), { messages: [], metadata: {} }) const runtime = payload.metadata.requester_runtime - assert.equal(called, false, "lease endpoint must not be called") assert.equal("available_bridges" in runtime, false) assert.equal("capture_mode" in runtime, false) assert.equal(runtime.tool_execution, "direction_2_gateway") + assert.equal(runtime.source, "openclaw-taas-affinity") assert.equal("workspace_dir" in runtime, false) assert.equal("agent_dir" in runtime, false) assert.equal("repo_root_hint" in runtime, false) } finally { restore() - await new Promise((resolve) => server.close(() => resolve())) } }) diff --git a/test/smoke.mjs b/test/smoke.mjs index 2b695e5..a7b1ae2 100644 --- a/test/smoke.mjs +++ b/test/smoke.mjs @@ -1,6 +1,11 @@ import assert from "node:assert/strict" +import fs from "node:fs" import plugin from "../index.ts" +const manifest = JSON.parse(fs.readFileSync(new URL("../openclaw.plugin.json", import.meta.url), "utf8")) +assert.equal(manifest.activation.onStartup, true, "plugin explicitly loads at gateway startup") +assert.deepEqual(manifest.activation.onProviders, ["cloudsigma", "cloudsigma-staging"]) + assert.equal(plugin.id, "openclaw-taas-affinity") assert.equal(typeof plugin.register, "function") @@ -42,7 +47,7 @@ assert.match(capturedPayload.metadata.session_id, /^oc:[a-f0-9]{16}$/) assert.equal(capturedPayload.metadata.sticky_key, capturedPayload.metadata.session_id) assert.equal( capturedPayload.metadata.requester_runtime.source, - "openclaw-token-cache-optimizer" + "openclaw-taas-affinity" ) assert.equal( capturedPayload.metadata.requester_runtime.session_key, diff --git a/test/sweeper.test.ts b/test/sweeper.test.ts deleted file mode 100644 index a764ec0..0000000 --- a/test/sweeper.test.ts +++ /dev/null @@ -1,303 +0,0 @@ -import assert from "node:assert/strict" -import fs from "node:fs" -import os from "node:os" -import path from "node:path" -import { test } from "node:test" - -// Helper: dynamic import with cache bust -async function loadPlugin(env: Record = {}) { - const oldEnv: Record = {} - for (const [key, value] of Object.entries(env)) { - oldEnv[key] = process.env[key] - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - const mod = await import(`../index.ts?bust=${Date.now()}-${Math.random()}`) - for (const [key, value] of Object.entries(oldEnv)) { - if (value === undefined) delete process.env[key] - else process.env[key] = value - } - return mod -} - -// Helper: create a temp agents dir with specified structure -function createTempAgentsDir(structure: { - agentId: string - files: Array<{ name: string; content?: string; sizeKb?: number; mtimeAgeMs?: number }> - identityJson?: Record -}[]) { - const base = fs.mkdtempSync(path.join(os.tmpdir(), "taas-sweep-test-")) - for (const { agentId, files = [], identityJson } of structure) { - const sessDir = path.join(base, agentId, "sessions") - fs.mkdirSync(sessDir, { recursive: true }) - for (const f of files) { - const fp = path.join(sessDir, f.name) - if (f.sizeKb) { - const data = "x".repeat(f.sizeKb * 1024) - fs.writeFileSync(fp, data) - } else { - fs.writeFileSync(fp, f.content || "test content for " + f.name) - } - if (f.mtimeAgeMs) { - const newMtime = new Date(Date.now() - f.mtimeAgeMs) - fs.utimesSync(fp, newMtime, newMtime) - } - } - if (identityJson) { - const agentDir = path.join(base, agentId, "agent") - fs.mkdirSync(agentDir, { recursive: true }) - fs.writeFileSync(path.join(agentDir, "identity.json"), JSON.stringify(identityJson)) - } - } - return base -} - -// AC-SWEEP.1: full sweep logic -test("AC-SWEEP.1: sweeper deletes stale .deleted, keeps fresh, prunes checkpoints, truncates trajectory, removes orphan lock", async () => { - const mod = await loadPlugin({ - TAAS_AFFINITY_SWEEP_STALE_DAYS: "7", - TAAS_AFFINITY_SWEEP_TRAJECTORY_MAX_MB: "50", - TAAS_AFFINITY_SWEEP_TRAJECTORY_KEEP_MB: "10", - TAAS_AFFINITY_SWEEP_LOCK_ORPHAN_MIN: "60", - TAAS_AFFINITY_SWEEP_CHECKPOINT_KEEP: "3", - }) - const { runTrashSweep, resetSweepInProgress } = mod.default._testExports - - const tmpDir = createTempAgentsDir([ - { - agentId: "test-agent", - files: [ - // stale deleted (8 days old) -> should be removed - { name: "abc.deleted.123.jsonl", mtimeAgeMs: 8 * 24 * 60 * 60 * 1000 }, - // fresh deleted (1 day old) -> should be kept - { name: "def.deleted.456.jsonl", mtimeAgeMs: 1 * 24 * 60 * 60 * 1000 }, - // 5 checkpoint files for same session (keep newest 3) - { name: "sess1.checkpoint.100.jsonl", mtimeAgeMs: 5 * 24 * 60 * 60 * 1000 }, - { name: "sess1.checkpoint.200.jsonl", mtimeAgeMs: 4 * 24 * 60 * 60 * 1000 }, - { name: "sess1.checkpoint.300.jsonl", mtimeAgeMs: 3 * 24 * 60 * 60 * 1000 }, - { name: "sess1.checkpoint.400.jsonl", mtimeAgeMs: 2 * 24 * 60 * 60 * 1000 }, - { name: "sess1.checkpoint.500.jsonl", mtimeAgeMs: 1 * 24 * 60 * 60 * 1000 }, - // normal file -> should be untouched - { name: "normal.jsonl", mtimeAgeMs: 0 }, - ], - }, - ]) - - const sessDir = path.join(tmpDir, "test-agent", "sessions") - - // Create a large trajectory file (~60MB -> will be truncated) - const trajPath = path.join(sessDir, "big.trajectory.jsonl") - const lineObj = JSON.stringify({ type: "msg", content: "x".repeat(200) }) + "\n" - const lineSize = Buffer.byteLength(lineObj) - const targetLines = Math.ceil((60 * 1024 * 1024) / lineSize) - const trajFd = fs.openSync(trajPath, "w") - for (let i = 0; i < targetLines; i++) { - fs.writeSync(trajFd, lineObj) - } - fs.closeSync(trajFd) - // Make it old so mtime check passes - fs.utimesSync(trajPath, new Date(Date.now() - 60000), new Date(Date.now() - 60000)) - - // Orphaned lock with bogus PID (old enough) - const lockPath = path.join(sessDir, "run1.jsonl.lock") - fs.writeFileSync(lockPath, "99999") - fs.utimesSync(lockPath, new Date(Date.now() - 120 * 60 * 1000), new Date(Date.now() - 120 * 60 * 1000)) - - // Run sweep - resetSweepInProgress() - runTrashSweep(tmpDir) - - // Assert: stale .deleted should be removed - assert.ok(!fs.existsSync(path.join(sessDir, "abc.deleted.123.jsonl")), "stale .deleted removed") - // Assert: fresh .deleted should be kept - assert.ok(fs.existsSync(path.join(sessDir, "def.deleted.456.jsonl")), "fresh .deleted kept") - // Assert: only 3 newest checkpoints remain (checkpoint.300, .400, .500) - assert.ok(!fs.existsSync(path.join(sessDir, "sess1.checkpoint.100.jsonl")), "oldest checkpoint removed") - assert.ok(!fs.existsSync(path.join(sessDir, "sess1.checkpoint.200.jsonl")), "second-oldest checkpoint removed") - assert.ok(fs.existsSync(path.join(sessDir, "sess1.checkpoint.300.jsonl")), "checkpoint 300 kept") - assert.ok(fs.existsSync(path.join(sessDir, "sess1.checkpoint.400.jsonl")), "checkpoint 400 kept") - assert.ok(fs.existsSync(path.join(sessDir, "sess1.checkpoint.500.jsonl")), "checkpoint 500 kept") - // Assert: trajectory was truncated + backup created - assert.ok(fs.existsSync(trajPath), "trajectory file still exists") - const newStat = fs.statSync(trajPath) - assert.ok(newStat.size < 60 * 1024 * 1024, "trajectory was truncated (smaller than original)") - assert.ok(newStat.size > 0, "trajectory is not empty") - // Check backup file exists - const bakFiles = fs.readdirSync(sessDir).filter(f => f.startsWith("big.trajectory.jsonl.pre-truncate-")) - assert.ok(bakFiles.length === 1, "exactly one backup file created") - // Assert: orphaned lock removed - assert.ok(!fs.existsSync(lockPath), "orphaned lock removed") - // Assert: normal file untouched - assert.ok(fs.existsSync(path.join(sessDir, "normal.jsonl")), "normal file untouched") - - // Cleanup - fs.rmSync(tmpDir, { recursive: true, force: true }) -}) - -// AC-SWEEP.2: sweeper no-op when called twice within interval -test("AC-SWEEP.2: sweeper runs correctly on fresh call after reset", async () => { - const mod = await loadPlugin() - const { runTrashSweep, resetSweepInProgress } = mod.default._testExports - - const tmpDir = createTempAgentsDir([{ - agentId: "noop-agent", - files: [ - { name: "stale.deleted.999.jsonl", mtimeAgeMs: 8 * 24 * 60 * 60 * 1000 }, - ], - }]) - - const sessDir = path.join(tmpDir, "noop-agent", "sessions") - const stalePath = path.join(sessDir, "stale.deleted.999.jsonl") - - // First sweep should work - resetSweepInProgress() - runTrashSweep(tmpDir) - assert.ok(!fs.existsSync(stalePath), "stale file deleted in first sweep") - - // Recreate for second sweep - fs.writeFileSync(stalePath, "should also be deleted") - fs.utimesSync(stalePath, new Date(Date.now() - 8 * 24 * 60 * 60 * 1000), new Date(Date.now() - 8 * 24 * 60 * 60 * 1000)) - - resetSweepInProgress() - runTrashSweep(tmpDir) - assert.ok(!fs.existsSync(stalePath), "stale file also deleted in second sweep") - - fs.rmSync(tmpDir, { recursive: true, force: true }) -}) - -// AC-STATUS.1: stuck-run status classification -test("AC-STATUS.1: status writer classifies locks as active/warn/stuck/zombie", async () => { - const mod = await loadPlugin() - const { writeRunStatus, resetStatusInProgress } = mod.default._testExports - - const tmpDir = createTempAgentsDir([ - { - agentId: "agent-a", - files: [ - // Fresh lock (1 min old) -> active - { name: "fresh.jsonl.lock", content: "12345", mtimeAgeMs: 1 * 60 * 1000 }, - // Warn lock (6 min old) -> warn - { name: "warn-sess.jsonl.lock", content: "23456", mtimeAgeMs: 6 * 60 * 1000 }, - ], - }, - { - agentId: "agent-b", - files: [ - // Stuck lock (20 min old) -> stuck - { name: "stuck.jsonl.lock", content: "34567", mtimeAgeMs: 20 * 60 * 1000 }, - ], - }, - { - agentId: "agent-c", - files: [ - // Zombie lock (70 min old) -> zombie - { name: "zombie.jsonl.lock", content: "45678", mtimeAgeMs: 70 * 60 * 1000 }, - ], - }, - ]) - - const statusPath = path.join(os.tmpdir(), "runs-status-test1-" + Date.now() + ".json") - - resetStatusInProgress() - writeRunStatus(tmpDir, statusPath) - - const content = fs.readFileSync(statusPath, "utf8") - const status = JSON.parse(content) - - // Assert structure - assert.ok(typeof status.generatedAt === "number", "generatedAt is number") - assert.deepEqual(Object.keys(status.thresholds).sort(), ["stuckMs", "warnMs", "zombieMs"]) - assert.deepEqual(Object.keys(status.counts).sort(), ["active", "stuck", "warn", "zombie"]) - - // Assert counts - assert.equal(status.counts.active, 1, "1 active") - assert.equal(status.counts.warn, 1, "1 warn") - assert.equal(status.counts.stuck, 1, "1 stuck") - assert.equal(status.counts.zombie, 1, "1 zombie") - - // Assert 4 runs total - assert.equal(status.runs.length, 4, "4 runs total") - - // Assert sorted by idleMs descending (worst offenders first) - for (let i = 1; i < status.runs.length; i++) { - assert.ok(status.runs[i - 1].idleMs >= status.runs[i].idleMs, "sorted by idleMs desc") - } - - // Check states - const states = status.runs.map((r: any) => r.state).sort() - assert.deepEqual(states, ["active", "stuck", "warn", "zombie"]) - - // Check each run has expected fields - for (const r of status.runs) { - assert.ok(typeof r.agentId === "string", "agentId is string") - assert.ok(typeof r.sessionUuid === "string", "sessionUuid is string") - assert.ok(typeof r.sessionKey === "string", "sessionKey is string") - assert.ok(typeof r.lockMtime === "number", "lockMtime is number") - assert.ok(typeof r.idleMs === "number", "idleMs is number") - assert.ok(["active", "warn", "stuck", "zombie"].includes(r.state), "valid state") - assert.ok(typeof r.pid === "number" || r.pid === null, "pid is number or null") - assert.ok(typeof r.pidAlive === "boolean" || r.pidAlive === null, "pidAlive is boolean or null") - } - - fs.rmSync(tmpDir, { recursive: true, force: true }) - fs.unlinkSync(statusPath) -}) - -// AC-STATUS.2: no lock files -> empty state -test("AC-STATUS.2: no lock files produces valid JSON with empty runs and zero counts", async () => { - const mod = await loadPlugin() - const { writeRunStatus, resetStatusInProgress } = mod.default._testExports - - // Empty agents dir - const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "taas-status-empty-")) - fs.mkdirSync(path.join(tmpDir, "some-agent", "sessions"), { recursive: true }) - - const statusPath = path.join(os.tmpdir(), "runs-status-empty-" + Date.now() + ".json") - - resetStatusInProgress() - writeRunStatus(tmpDir, statusPath) - - const content = fs.readFileSync(statusPath, "utf8") - const status = JSON.parse(content) - - assert.equal(status.counts.active, 0) - assert.equal(status.counts.warn, 0) - assert.equal(status.counts.stuck, 0) - assert.equal(status.counts.zombie, 0) - assert.ok(Array.isArray(status.runs)) - assert.equal(status.runs.length, 0) - assert.ok(typeof status.generatedAt === "number") - - fs.rmSync(tmpDir, { recursive: true, force: true }) - fs.unlinkSync(statusPath) -}) - -// AC-STATUS.3: atomic write produces valid JSON -test("AC-STATUS.3: atomic write produces valid JSON even with rapid successive writes", async () => { - const mod = await loadPlugin() - const { writeRunStatus, resetStatusInProgress } = mod.default._testExports - - const tmpDir = createTempAgentsDir([{ - agentId: "atomic-agent", - files: [ - { name: "lock1.jsonl.lock", content: "11111", mtimeAgeMs: 1000 }, - ], - }]) - - const statusPath = path.join(os.tmpdir(), "runs-status-atomic-" + Date.now() + ".json") - - // Write twice in rapid succession - resetStatusInProgress() - writeRunStatus(tmpDir, statusPath) - resetStatusInProgress() - writeRunStatus(tmpDir, statusPath) - - // Both writes should produce valid JSON - const content = fs.readFileSync(statusPath, "utf8") - const status = JSON.parse(content) - assert.ok(status.generatedAt, "has generatedAt") - assert.ok(Array.isArray(status.runs), "has runs array") - - fs.rmSync(tmpDir, { recursive: true, force: true }) - fs.unlinkSync(statusPath) -})