diff --git a/DOCUMENTATION_INDEX.md b/DOCUMENTATION_INDEX.md index 11509c2..a3b1af7 100644 --- a/DOCUMENTATION_INDEX.md +++ b/DOCUMENTATION_INDEX.md @@ -35,20 +35,21 @@ Package root: `src/conclave/` (installed as the `conclave` package; console scri | Module | Path | Responsibility | |--------|------|----------------| | Package API | [`src/conclave/__init__.py`](src/conclave/__init__.py) | Public exports: `Council`, `CouncilResult`, `ModelAnswer`, `TokenUsage`, `DebateRound`, `AdversarialResult`, `ConclaveConfig`, `load_config`, `__version__`. | -| Council | [`src/conclave/council.py`](src/conclave/council.py) | Primary importable entry point. Reusable primitives (`fan_out`, `synthesize_blocks`) + the public mode API (`ask`/`debate`/`adversarial`, async + sync). | +| Council | [`src/conclave/council.py`](src/conclave/council.py) | Primary importable entry point. Reusable primitives (`fan_out`, `synthesize_blocks`) + the public mode API (`ask`/`debate`/`adversarial`, async + sync) + streaming (`ask_stream`/`stream_sync`, synthesize/raw). | | Modes | [`src/conclave/modes.py`](src/conclave/modes.py) | `debate` (multi-round, anonymized peers, drop-out) and `adversarial` (propose → refute → verdict) orchestration, built on `Council.fan_out`/`synthesize_blocks`. | +| Streaming | [`src/conclave/streaming.py`](src/conclave/streaming.py) | `stream_ask` — council-level streaming engine behind `Council.ask_stream`: concurrent member interleaving via an `asyncio.Queue`, optional synthesizer streaming, terminal `done` event with the full `CouncilResult` (synthesize/raw only). | | Prompts | [`src/conclave/prompts.py`](src/conclave/prompts.py) | Role/template strings for debate + adversarial and the anonymized peer-block builder. | -| Providers | [`src/conclave/providers.py`](src/conclave/providers.py) | Single async `call_model` path: resolves the adapter, reads the key by name at call time, calls transport, parses; latency/usage/redacted-error capture; never raises. | -| Transport | [`src/conclave/transport.py`](src/conclave/transport.py) | `post_json` — the single async httpx network boundary for the whole provider highway. | +| Providers | [`src/conclave/providers.py`](src/conclave/providers.py) | Async `call_model` (buffered) + `call_model_stream` (SSE) paths: resolve the adapter, read the key by name at call time, call transport, parse; latency/usage/redacted-error capture; never raises (partial text preserved on mid-stream failure). | +| Transport | [`src/conclave/transport.py`](src/conclave/transport.py) | `post_json` + `stream_sse` — the single async httpx network boundary (buffered POST and `client.stream(...)` SSE) for the whole provider highway. | | Adapter registry | [`src/conclave/adapters/__init__.py`](src/conclave/adapters/__init__.py) | `resolve_adapter(model_id, config)` — provider registry + **extension seam** (one registration per family; config-only for OpenAI-compatible endpoints). | -| Adapter base | [`src/conclave/adapters/base.py`](src/conclave/adapters/base.py) | `ProviderAdapter` protocol, `ProviderError`, and `redact()` (error-string secret scrubber). | +| Adapter base | [`src/conclave/adapters/base.py`](src/conclave/adapters/base.py) | `ProviderAdapter` protocol (`build_request`/`parse_response` + `stream_request`/`parse_sse_event`), `SSEDelta`, `ProviderError`, and `redact()` (error-string secret scrubber). | | OpenAI-compat adapter | [`src/conclave/adapters/openai_compat.py`](src/conclave/adapters/openai_compat.py) | `OpenAICompatAdapter` — openai/xai/perplexity + custom OpenAI-compatible endpoints. | | Anthropic adapter | [`src/conclave/adapters/anthropic.py`](src/conclave/adapters/anthropic.py) | `AnthropicAdapter` — native `/v1/messages`, system-prompt hoist, required `max_tokens`. | | Gemini adapter | [`src/conclave/adapters/gemini.py`](src/conclave/adapters/gemini.py) | `GeminiAdapter` — native `generateContent`, OpenAI-role mapping, `usageMetadata`. | | Registry | [`src/conclave/registry.py`](src/conclave/registry.py) | Friendly-name → model-id defaults; provider → env-var mapping; key **presence** logic (never values). | | Config | [`src/conclave/config.py`](src/conclave/config.py) | Loads/merges `~/.conclave/config.yml` over defaults; resolves model ids and named/CSV councils; parses the `endpoints:` section (custom OpenAI-compatible providers). | -| Models | [`src/conclave/models.py`](src/conclave/models.py) | Pydantic result contract: `TokenUsage`, `ModelAnswer`, `DebateRound`, `AdversarialResult`, `CouncilResult` (`mode`/`rounds`/`adversarial`). Stable downstream surface. | -| CLI | [`src/conclave/cli.py`](src/conclave/cli.py) | `conclave ask` (synthesize/raw/debate/adversarial; `--rounds`/`--proposer`) + `conclave providers`; rich panels and `--json`; never prints key values. | +| Models | [`src/conclave/models.py`](src/conclave/models.py) | Pydantic result contract: `TokenUsage`, `ModelAnswer`, `StreamEvent`, `DebateRound`, `AdversarialResult`, `CouncilResult` (`mode`/`rounds`/`adversarial`). Stable downstream surface. | +| CLI | [`src/conclave/cli.py`](src/conclave/cli.py) | `conclave ask` (synthesize/raw/debate/adversarial; `--rounds`/`--proposer`/`--stream`) + `conclave providers`; rich panels, live `--stream` output, and `--json`; never prints key values. | | Logging | [`src/conclave/logging.py`](src/conclave/logging.py) | Logger factory; stderr; verbosity via `CONCLAVE_LOG_LEVEL` (default `WARNING`). | ## Tests @@ -62,6 +63,7 @@ Package root: `src/conclave/` (installed as the `conclave` package; console scri | Registry/config tests | [`tests/test_registry_config.py`](tests/test_registry_config.py) | Name resolution, key-presence logic, config merge. | | CLI tests | [`tests/test_cli.py`](tests/test_cli.py) | Typer `CliRunner`: exit-code contract (0 success / 1 zero-usable-answers / 2 usage error), `--json` payload + exit code, human renderers per mode, `providers` table never prints secrets, aclose lifecycle. | | Transport tests | [`tests/test_transport.py`](tests/test_transport.py) | `post_json` via httpx `MockTransport`: success/error-status/non-JSON fallback, timeout & connect/HTTP errors → `TransportError` (key never leaks), client reuse/pooling, aclose idempotency. | +| Streaming tests | [`tests/test_streaming.py`](tests/test_streaming.py) | Per-adapter SSE via `MockTransport` (openai-compat/anthropic/gemini): incremental chunks + assembled answer == concatenation == buffered result; mid-stream malformed-frame/connection-drop/non-2xx → error set with partial text preserved (never raises); key redaction in stream errors; buffered `ask()` never opens a stream; `Council.ask_stream` interleaving + terminal `done` shape; CLI `--stream` smoke + exit-code contract + debate rejection; `--stream` + cache one-shot replay. | | Logging tests | [`tests/test_logging.py`](tests/test_logging.py) | `CONCLAVE_LOG_LEVEL` resolution (default `WARNING`, case-insensitive, unknown → `WARNING`), factory contract, one-shot configuration. | | Fixtures | [`tests/conftest.py`](tests/conftest.py) | Shared fixtures; mocks the httpx transport so the suite needs no network and no API keys. | diff --git a/README.md b/README.md index 8083773..bb2e5a4 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,18 @@ the synthesizer *and* the adversarial judge. of a council defined in your config (see below). The built-in `default` council is all known providers. +Add `--stream` to render member (and synthesizer) tokens live as they arrive +(`synthesize`/`raw` modes only): + +```bash +conclave ask "Explain CRDTs in two paragraphs." -c grok,gemini,claude --stream +``` + +Streaming and the non-streaming default produce the **same** final +`CouncilResult`; `--stream` only changes how output is rendered. It is ignored +with `--json` (which always emits the full structured payload), and on a cache +hit the cached text is rendered in one shot rather than as a fake token stream. + ## Quickstart (library) ```python @@ -131,6 +143,28 @@ for answer in result.answers: print("SYNTHESIS:\n", result.synthesis) ``` +### Streaming (synthesize/raw) + +`Council.ask_stream` is an async generator that yields incremental `StreamEvent`s +as member (and synthesizer) tokens arrive, then a terminal `done` event carrying +the full `CouncilResult` — the same shape `ask()` returns, so downstream code is +unaffected: + +```python +async for event in council.ask_stream("What is the capital of France?"): + if event.type in ("member_delta", "synthesis_delta"): + print(event.text, end="", flush=True) # live token + elif event.type == "done": + result = event.result # full CouncilResult +``` + +Event types: `member_delta` / `member_done` (per member, interleaved), +`synthesis_delta` / `synthesis_done` (when synthesizing), and the final `done`. +A member that cannot stream, or any mid-stream failure, degrades gracefully — +partial text is preserved and the error lands on that member's `ModelAnswer`, +never raising (the same never-raises contract as `ask`). Streaming applies to +`synthesize`/`raw` only; `debate`/`adversarial` are not streamed. + ### Debate and adversarial modes ```python diff --git a/SYSTEM_CONTEXT_DIAGRAM.md b/SYSTEM_CONTEXT_DIAGRAM.md index 69ff47d..40b127e 100644 --- a/SYSTEM_CONTEXT_DIAGRAM.md +++ b/SYSTEM_CONTEXT_DIAGRAM.md @@ -105,6 +105,15 @@ flowchart TB speaks native `generateContent` (OpenAI roles mapped, `systemInstruction` hoisted, `usageMetadata` parsed). Every adapter builds a request and hands it to the **single** network boundary — `transport.post_json` (`transport.py`), one async httpx call site. +- **Streaming shares the same boundary (PDD §9 #5).** A `--stream` run (and the library + `Council.ask_stream` async generator) flows through a streaming sibling of the call path: + `call_model_stream` (`providers.py`) → `transport.stream_sse` (`transport.py`, the single + streaming httpx call site, `client.stream(...)`) → each adapter's `stream_request` + + `parse_sse_event` (OpenAI-compat `data:`/`[DONE]` deltas; Anthropic named SSE events; + Gemini `streamGenerateContent?alt=sse`). `streaming.py` interleaves members concurrently + and emits `StreamEvent`s, ending with a `done` event whose `CouncilResult` matches the + non-streaming shape. Streaming covers `synthesize`/`raw` only; the never-raises + + `redact()` invariants hold identically, with partial text preserved on mid-stream failure. - **`resolve_adapter` is the extension seam.** Adding a *new provider family* is one registration in `adapters/__init__.py`; adding an *OpenAI-compatible endpoint* is **config-only** — a `~/.conclave/config.yml` `endpoints:` entry, no code. That is why diff --git a/docs/PRODUCT_DESIGN_DOCUMENT.md b/docs/PRODUCT_DESIGN_DOCUMENT.md index 665ce93..0129d9e 100644 --- a/docs/PRODUCT_DESIGN_DOCUMENT.md +++ b/docs/PRODUCT_DESIGN_DOCUMENT.md @@ -246,7 +246,8 @@ CLI (cli.py, typer+rich) Library (from conclave import Council) | `modes.py` | Deliberation orchestration: `run_debate` (multi-round, anonymized peers, drop-out) and `run_adversarial` (propose → refute → verdict). Built entirely on `Council.fan_out` + `synthesize_blocks` — no duplicated concurrency or synthesizer code. | | `prompts.py` | Role/template strings for debate and adversarial (member, critic, judge, debate-final system prompts) and the anonymized peer-block builder. Separates *what each role is told* from *when to call whom*. | | `providers.py` | `call_model` — the single async call path. Resolves the adapter for a model id, reads the key value *by name at call time*, calls the adapter+transport, parses the reply, and captures latency, token usage, and any (redacted) error into a `ModelAnswer`; never raises for provider-side failures. Signature and never-raises contract unchanged from v0.1/v0.2. | -| `transport.py` | The single async network boundary: `post_json` — one httpx call site for the whole highway. Nothing else in conclave touches the network. | +| `transport.py` | The single async network boundary: `post_json` (buffered) + `stream_sse` (SSE via `client.stream(...)`, issue #7) — the only two httpx call sites for the whole highway. Nothing else in conclave touches the network. | +| `streaming.py` | Council-level streaming engine (issue #7): `stream_ask` fans members out concurrently (interleaved via an `asyncio.Queue`), optionally streams the synthesizer, and emits `StreamEvent`s ending with a `done` event whose `CouncilResult` matches the buffered shape. Behind `Council.ask_stream`/`stream_sync`. synthesize/raw only. | | `adapters/__init__.py` | `resolve_adapter(model_id, config)` — the provider registry and **extension seam**. Maps a model-id prefix (or a config `endpoints:` entry) to the adapter that serves it. Adding a provider family = one registration here; adding an OpenAI-compatible endpoint = config-only. | | `adapters/base.py` | The `ProviderAdapter` protocol, `ProviderError`, and `redact()` — the secret-scrubber applied to every error string before it reaches `ModelAnswer.error`. | | `adapters/openai_compat.py` | `OpenAICompatAdapter` — serves openai / xai / perplexity / groq / deepseek / mistral / together and any custom OpenAI-compatible endpoint. Per-provider full completions URL (note: Perplexity has no `/v1` segment; Groq nests its OpenAI surface under `/openai/v1`). | @@ -341,7 +342,8 @@ LLM-SDK dependency.** Packaged with hatchling; console script `conclave = concla - **No hosted/proxied token path.** No conclave-operated endpoint that sees user prompts or takes a margin. BYO-keys, direct-to-provider, always. (Permanent.) - **No persistence/caching of results** in v0.1 (caching is Roadmap, §9). -- **No streaming** in v0.1 (Roadmap, §9). +- **No streaming** in v0.1 (Roadmap, §9 — **LANDED for synthesize/raw in v0.3**, issue #7; + `debate`/`adversarial` streaming still out of scope). - **No server mode** in v0.1 (possible Roadmap, §9). - **No `vote` mode** yet (Roadmap, §9 — flagged for build, not for removal). `debate` and `adversarial` shipped in v0.2. @@ -378,7 +380,23 @@ Ordered roughly by strategic value to the origin use case and to mcp-warden. defaults stay open under §12 #5. Struck through for traceability. 4. **Caching** — optional result cache keyed on (prompt, council, mode, model ids) to make repeated/eval runs cheap. Must remain off by default and never persist keys. -5. **Streaming** — stream member answers and/or the synthesis to the terminal/library. +5. ~~**Streaming** — stream member answers and/or the synthesis to the terminal/library.~~ + **LANDED** (issue #7): streaming for the `synthesize`/`raw` path. Library API is the + `Council.ask_stream` async generator (plus `Council.stream_sync` for non-async callers), + yielding `StreamEvent`s (`member_delta`/`member_done`, `synthesis_delta`/`synthesis_done`, + terminal `done` carrying the full `CouncilResult`); CLI flag is `--stream`. The single + streaming network boundary is `transport.stream_sse` (`client.stream(...)`), reusing the + pooled client/timeout/`redact()` plumbing; each adapter parses its own SSE delta shape + (OpenAI-compat `choices[].delta.content` + `[DONE]` with `stream_options.include_usage`; + Anthropic `content_block_delta` text + `message_start`/`message_delta` usage + + `message_stop`; Gemini `streamGenerateContent?alt=sse` `parts[].text` + cumulative + `usageMetadata`). A non-streamable model or any mid-stream error degrades gracefully — + partial text preserved, error on `ModelAnswer`, never raises — and the assembled + `ModelAnswer` (text + usage) matches the buffered result, so synthesis/`CouncilResult` + are unaffected. Non-streaming remains the default and is byte-for-byte unchanged. + `--stream` + cache: a hit renders the cached final text in one shot (no fake token + stream). `debate`/`adversarial` streaming is intentionally **out of scope** (a later + issue may extend them). Struck through for traceability. 6. **Local HTTP/server mode (under evaluation)** — a *local* server for convenience only; must not become a hosted token path or violate the no-middleman non-goal. 7. ~~**Key-leak hardening** — scrub/limit provider-originated error strings before they land diff --git a/src/conclave/__init__.py b/src/conclave/__init__.py index 9f67584..110447e 100644 --- a/src/conclave/__init__.py +++ b/src/conclave/__init__.py @@ -9,6 +9,13 @@ result = council.ask_sync("Your prompt") # sync result = await council.ask("Your prompt") # async + # streaming (synthesize/raw only): incremental StreamEvents + async for event in council.ask_stream("Your prompt"): + if event.type in ("member_delta", "synthesis_delta"): + print(event.text, end="", flush=True) + elif event.type == "done": + result = event.result # full CouncilResult, same shape as ask() + # multi-round debate result = await council.debate("Your prompt", rounds=3) result = council.debate_sync("Your prompt", rounds=3) @@ -39,6 +46,7 @@ CouncilResult, DebateRound, ModelAnswer, + StreamEvent, TokenUsage, ) from .transport import aclose @@ -52,6 +60,7 @@ "TokenUsage", "DebateRound", "AdversarialResult", + "StreamEvent", "ConclaveConfig", "load_config", "aclose", diff --git a/src/conclave/adapters/anthropic.py b/src/conclave/adapters/anthropic.py index 94ce2ee..a6ccb93 100644 --- a/src/conclave/adapters/anthropic.py +++ b/src/conclave/adapters/anthropic.py @@ -16,9 +16,11 @@ from __future__ import annotations +import json + from ..models import TokenUsage from ..registry import PROVIDER_ENV_VARS -from .base import ProviderError, status_error +from .base import ProviderError, SSEDelta, status_error ANTHROPIC_URL = "https://api.anthropic.com/v1/messages" ANTHROPIC_VERSION = "2023-06-01" @@ -34,6 +36,7 @@ class AnthropicAdapter: prefix = "anthropic" completions_url = ANTHROPIC_URL + supports_streaming = True def __init__(self, max_tokens: int = DEFAULT_MAX_TOKENS) -> None: self.max_tokens = max_tokens @@ -113,6 +116,87 @@ def parse_response(self, status: int, payload: object) -> tuple[str, TokenUsage usage = _parse_usage(payload.get("usage")) return text, usage + def stream_request( + self, + model_id: str, + messages: list[dict[str, str]], + temperature: float | None, + timeout: float, + api_key: str, + ) -> tuple[str, dict[str, str], dict]: + """Build the streaming POST: ``build_request`` + ``stream: true``. + + Anthropic streams the same ``/v1/messages`` endpoint with ``stream: + true``; no other body change is needed. See + :meth:`ProviderAdapter.stream_request`. + """ + url, headers, body = self.build_request(model_id, messages, temperature, timeout, api_key) + body["stream"] = True + return url, headers, body + + def parse_sse_event(self, event: str, data: str) -> SSEDelta: + """Parse one Anthropic SSE frame by its named ``event`` type. + + Event flow handled (verified against the Anthropic Messages streaming + reference): + + * ``message_start`` -- carries ``message.usage.input_tokens`` (the + prompt accounting) -> a usage frame with ``prompt_tokens`` set. + * ``content_block_delta`` with ``delta.type == "text_delta"`` -> + ``delta.text`` is a text delta. (``input_json_delta`` / + ``thinking_delta`` blocks carry no answer text and are skipped.) + * ``message_delta`` -- carries the *cumulative* ``usage.output_tokens`` + -> a usage frame with ``completion_tokens`` set (last wins). + * ``message_stop`` -> ``done=True``. + * ``error`` -- a structured stream error -> :class:`ProviderError`. + + Other events (``content_block_start``/``stop``, ``ping``) yield an empty + :class:`SSEDelta`. See :meth:`ProviderAdapter.parse_sse_event`. + """ + if event == "message_stop": + return SSEDelta(done=True) + try: + frame = json.loads(data) + except (ValueError, TypeError) as exc: + raise ProviderError( + f"anthropic: malformed stream frame ({type(exc).__name__})" + ) from exc + if not isinstance(frame, dict): + raise ProviderError("anthropic: malformed stream frame (non-object)") + + frame_type = frame.get("type") + if frame_type == "error" or event == "error": + raise ProviderError(status_error("anthropic", 200, frame, secondary_keys=("type",))) + + if frame_type == "content_block_delta": + delta = frame.get("delta") + if isinstance(delta, dict) and delta.get("type") == "text_delta": + text = delta.get("text") + if isinstance(text, str): + return SSEDelta(text=text) + return SSEDelta() + + if frame_type == "message_start": + message = frame.get("message") + usage = message.get("usage") if isinstance(message, dict) else None + if isinstance(usage, dict): + prompt = int(usage.get("input_tokens", 0) or 0) + if prompt: + # Leave total at 0 so _merge_usage recomputes it from the + # merged prompt + completion (Anthropic never sends a sum). + return SSEDelta(usage=TokenUsage(prompt_tokens=prompt)) + return SSEDelta() + + if frame_type == "message_delta": + usage = frame.get("usage") + if isinstance(usage, dict): + completion = int(usage.get("output_tokens", 0) or 0) + if completion: + return SSEDelta(usage=TokenUsage(completion_tokens=completion)) + return SSEDelta() + + return SSEDelta() + def _parse_usage(raw: object) -> TokenUsage | None: """Map Anthropic ``input_tokens``/``output_tokens`` to :class:`TokenUsage`.""" diff --git a/src/conclave/adapters/base.py b/src/conclave/adapters/base.py index f439ed1..1501f5f 100644 --- a/src/conclave/adapters/base.py +++ b/src/conclave/adapters/base.py @@ -24,6 +24,7 @@ import os import re +from dataclasses import dataclass from typing import Protocol, runtime_checkable from ..models import TokenUsage @@ -152,6 +153,32 @@ def redact(text: str) -> str: return cleaned +@dataclass +class SSEDelta: + """The result of interpreting one Server-Sent Event from a stream (issue #7). + + An adapter's :meth:`ProviderAdapter.parse_sse_event` turns each raw + ``(event, data)`` pair from :func:`conclave.transport.stream_sse` into one of + these. All fields are optional because a single SSE frame may carry just text + (a content delta), just usage (a final accounting frame), the end-of-stream + signal, or nothing relevant (a control/ping frame the adapter skips). + + Attributes: + text: Incremental answer text in this frame, or ``""`` when the frame + carries no text. + usage: Token usage if this frame is the provider's final usage accounting + (OpenAI's ``include_usage`` chunk, Anthropic's ``message_delta``, + Gemini's per-chunk ``usageMetadata``), else ``None``. + done: ``True`` when this frame signals end-of-stream (OpenAI's + ``[DONE]`` sentinel, Anthropic's ``message_stop``); the caller stops + consuming after a done frame. + """ + + text: str = "" + usage: TokenUsage | None = None + done: bool = False + + class ProviderError(Exception): """A provider-side failure: non-2xx status or a malformed/empty payload. @@ -179,6 +206,7 @@ class ProviderAdapter(Protocol): prefix: str env_vars: tuple[str, ...] completions_url: str + supports_streaming: bool def build_request( self, @@ -219,3 +247,48 @@ def parse_response(self, status: int, payload: object) -> tuple[str, TokenUsage message already scrubbed of secrets. """ ... + + def stream_request( + self, + model_id: str, + messages: list[dict[str, str]], + temperature: float | None, + timeout: float, + api_key: str, + ) -> tuple[str, dict[str, str], dict]: + """Build ``(url, headers, json_body)`` for a STREAMING request (issue #7). + + Mirrors :meth:`build_request` but sets the provider's stream-enabling + flag (OpenAI ``stream: true`` + ``stream_options.include_usage``, + Anthropic ``stream: true``, Gemini ``?alt=sse``). Adapters that cannot + stream do not implement this and report ``supports_streaming = False``; + the provider call path then falls back to a buffered request and emits + the text in one chunk. + + Args and return mirror :meth:`build_request`. + """ + ... + + def parse_sse_event(self, event: str, data: str) -> SSEDelta: + """Interpret one raw SSE ``(event, data)`` pair into an :class:`SSEDelta`. + + Called once per frame yielded by :func:`conclave.transport.stream_sse`. + Returns the incremental text, a final usage accounting if this frame + carries it, and/or the end-of-stream signal. A frame the adapter does + not care about (a control/ping/role frame) yields an empty + :class:`SSEDelta`. + + Args: + event: The SSE ``event:`` name (``""`` for OpenAI/Gemini streams, + e.g. ``"content_block_delta"`` for Anthropic). + data: The raw ``data:`` payload (JSON, or the ``[DONE]`` sentinel). + + Returns: + An :class:`SSEDelta` describing this frame. + + Raises: + ProviderError: When a frame is a structured provider error event or + is irrecoverably malformed; the caller captures it as a + non-raising ``ModelAnswer.error`` with partial text preserved. + """ + ... diff --git a/src/conclave/adapters/gemini.py b/src/conclave/adapters/gemini.py index d16b71e..0f89e56 100644 --- a/src/conclave/adapters/gemini.py +++ b/src/conclave/adapters/gemini.py @@ -18,9 +18,11 @@ from __future__ import annotations +import json + from ..models import TokenUsage from ..registry import PROVIDER_ENV_VARS -from .base import ProviderError, status_error +from .base import ProviderError, SSEDelta, status_error GEMINI_BASE = "https://generativelanguage.googleapis.com/v1beta/models" DEFAULT_MAX_OUTPUT_TOKENS = 4096 @@ -40,6 +42,7 @@ class GeminiAdapter: # The concrete URL embeds the model and is built per-request; this base is # exposed for parity with the protocol's ``completions_url`` attribute. completions_url = GEMINI_BASE + supports_streaming = True def __init__(self, max_output_tokens: int = DEFAULT_MAX_OUTPUT_TOKENS) -> None: self.max_output_tokens = max_output_tokens @@ -118,6 +121,65 @@ def parse_response(self, status: int, payload: object) -> tuple[str, TokenUsage usage = _parse_usage(payload.get("usageMetadata")) return text, usage + def stream_request( + self, + model_id: str, + messages: list[dict[str, str]], + temperature: float | None, + timeout: float, + api_key: str, + ) -> tuple[str, dict[str, str], dict]: + """Build the streaming POST against ``streamGenerateContent?alt=sse``. + + Same body as :meth:`build_request`, but the URL targets the streaming + method with ``?alt=sse`` so Gemini emits standard SSE frames (without + ``alt=sse`` it returns a single JSON array, not a stream -- verified + against the Gemini API streaming reference). See + :meth:`ProviderAdapter.stream_request`. + """ + _url, headers, body = self.build_request(model_id, messages, temperature, timeout, api_key) + model = self._bare_model(model_id) + url = f"{GEMINI_BASE}/{model}:streamGenerateContent?alt=sse" + return url, headers, body + + def parse_sse_event(self, event: str, data: str) -> SSEDelta: + """Parse one Gemini SSE frame (a partial ``GenerateContentResponse``). + + Each frame carries ``candidates[0].content.parts[*].text`` (a text + delta) and may carry a *cumulative* ``usageMetadata`` accounting (last + wins). Gemini has no ``[DONE]`` sentinel -- the stream simply ends -- so + no frame sets ``done``; the transport's end-of-iteration terminates the + loop. A frame whose JSON is malformed raises :class:`ProviderError`; a + frame carrying a structured ``error`` likewise raises. A safety-blocked + or otherwise text-less candidate yields a usage-only / empty delta. See + :meth:`ProviderAdapter.parse_sse_event`. + """ + try: + frame = json.loads(data) + except (ValueError, TypeError) as exc: + raise ProviderError(f"gemini: malformed stream frame ({type(exc).__name__})") from exc + if not isinstance(frame, dict): + raise ProviderError("gemini: malformed stream frame (non-object)") + + if isinstance(frame.get("error"), (dict, str)): + raise ProviderError(status_error("gemini", 200, frame, secondary_keys=("status",))) + + text = "" + candidates = frame.get("candidates") + if isinstance(candidates, list) and candidates: + candidate = candidates[0] + content = candidate.get("content") if isinstance(candidate, dict) else None + parts = content.get("parts") if isinstance(content, dict) else None + if isinstance(parts, list): + text = "".join( + part.get("text", "") + for part in parts + if isinstance(part, dict) and "text" in part + ) + + usage = _parse_usage(frame.get("usageMetadata")) + return SSEDelta(text=text, usage=usage) + def _parse_usage(raw: object) -> TokenUsage | None: """Map Gemini ``usageMetadata`` counts to :class:`TokenUsage`.""" diff --git a/src/conclave/adapters/openai_compat.py b/src/conclave/adapters/openai_compat.py index 2d976c7..71cf8fc 100644 --- a/src/conclave/adapters/openai_compat.py +++ b/src/conclave/adapters/openai_compat.py @@ -15,8 +15,10 @@ from __future__ import annotations +import json + from ..models import TokenUsage -from .base import ProviderError, status_error +from .base import ProviderError, SSEDelta, status_error # Verified per-provider full completions URLs. Note Perplexity has NO ``/v1`` # segment while xAI/OpenAI do, and Groq nests its OpenAI surface under @@ -46,6 +48,10 @@ class OpenAICompatAdapter: parameter is omitted so the provider applies its own default. """ + # Every OpenAI-compatible vendor conclave ships speaks the standard + # streaming protocol (``stream: true`` -> SSE deltas -> ``[DONE]``). + supports_streaming = True + def __init__( self, prefix: str, @@ -122,6 +128,70 @@ def parse_response(self, status: int, payload: object) -> tuple[str, TokenUsage usage = _parse_usage(payload.get("usage")) return content, usage + def stream_request( + self, + model_id: str, + messages: list[dict[str, str]], + temperature: float | None, + timeout: float, + api_key: str, + ) -> tuple[str, dict[str, str], dict]: + """Build the streaming POST: ``build_request`` + ``stream`` flags. + + Sets ``stream: true`` and ``stream_options.include_usage: true`` so the + provider emits incremental ``choices[0].delta.content`` chunks followed + by a final chunk with empty ``choices`` and a top-level ``usage`` object + (verified against the OpenAI chat-completions streaming reference). See + :meth:`ProviderAdapter.stream_request`. + """ + url, headers, body = self.build_request(model_id, messages, temperature, timeout, api_key) + body["stream"] = True + body["stream_options"] = {"include_usage": True} + return url, headers, body + + def parse_sse_event(self, event: str, data: str) -> SSEDelta: + """Parse one OpenAI-style SSE frame. + + Frame shapes handled (verified against the OpenAI chat-completions + streaming reference): + + * ``[DONE]`` -- the terminating sentinel -> ``done=True``. + * a chunk with ``choices[0].delta.content`` -> a text delta. + * the final ``include_usage`` chunk: ``choices == []`` and a top-level + ``usage`` object -> a usage frame. + + A frame whose JSON is malformed raises :class:`ProviderError`; a frame + that simply carries no content (role-only delta, ``finish_reason`` only) + yields an empty :class:`SSEDelta`. See + :meth:`ProviderAdapter.parse_sse_event`. + """ + if data == "[DONE]": + return SSEDelta(done=True) + try: + chunk = json.loads(data) + except (ValueError, TypeError) as exc: + raise ProviderError( + f"{self.prefix}: malformed stream frame ({type(exc).__name__})" + ) from exc + if not isinstance(chunk, dict): + raise ProviderError(f"{self.prefix}: malformed stream frame (non-object)") + + # A structured error can arrive mid-stream as a normal data frame. + if isinstance(chunk.get("error"), (dict, str)): + raise ProviderError(status_error(self.prefix, 200, chunk, secondary_keys=("type",))) + + text = "" + choices = chunk.get("choices") + if isinstance(choices, list) and choices: + delta = choices[0].get("delta") if isinstance(choices[0], dict) else None + if isinstance(delta, dict): + content = delta.get("content") + if isinstance(content, str): + text = content + + usage = _parse_usage(chunk.get("usage")) + return SSEDelta(text=text, usage=usage) + def _parse_usage(raw: object) -> TokenUsage | None: """Map an OpenAI-style ``usage`` block to :class:`TokenUsage`, or ``None``.""" diff --git a/src/conclave/cli.py b/src/conclave/cli.py index dbddce1..76fa56b 100644 --- a/src/conclave/cli.py +++ b/src/conclave/cli.py @@ -21,7 +21,7 @@ from . import __version__ from .config import load_config from .council import Council -from .models import CouncilResult +from .models import CouncilResult, StreamEvent from .registry import DEFAULT_MODELS, key_present, key_source app = typer.Typer( @@ -129,6 +129,59 @@ def _render_adversarial(result: CouncilResult) -> None: err_console.print(f"[yellow]No verdict: {adv.verdict_error}[/yellow]") +def _stream_to_terminal(council: Council, prompt: str, *, synthesize: bool) -> CouncilResult: + """Render a live token stream to the terminal and return the final result. + + Drives :meth:`Council.stream_sync`, printing each member's (and the + synthesizer's) tokens inline as they arrive under a header. A failed member + is shown in red at its ``member_done`` event. The returned + :class:`CouncilResult` is the same structure the buffered path produces, so + the caller applies the usual exit-code contract. + + Headers are tracked per source so a header prints exactly once -- on the + first delta for a streaming source, or at the done event for a source that + streamed no live tokens (e.g. a cache hit emits a single delta; a failed + member emits none). + """ + # Track which sources have had a header printed so we open each section once. + started: set[str] = set() + + def _ensure_header(label: str, key: str, style: str) -> None: + if key not in started: + started.add(key) + console.print(f"\n[bold {style}]{label}[/bold {style}]") + + def on_event(event: StreamEvent) -> None: + if event.type == "member_delta": + _ensure_header(f"{event.name} ({event.model_id})", f"m:{event.name}", "cyan") + console.print(event.text or "", end="", soft_wrap=True, highlight=False) + elif event.type == "member_done": + ans = event.answer + if ans is not None and not ans.ok: + _ensure_header(f"{event.name} (failed)", f"m:{event.name}", "red") + console.print(f"[red]{ans.error}[/red]") + else: + # Close the streamed block with a newline so the next header is clean. + console.print() + elif event.type == "synthesis_delta": + _ensure_header(f"SYNTHESIS ({event.name} · {event.model_id})", "synthesis", "green") + console.print(event.text or "", end="", soft_wrap=True, highlight=False) + elif event.type == "synthesis_done": + ans = event.answer + if ans is not None and not ans.ok: + err_console.print(f"\n[yellow]No synthesis: {ans.error}[/yellow]") + else: + console.print() + + result = council.stream_sync(prompt, on_event, synthesize=synthesize) + _print_skipped(result) + # Surface a synthesis short-circuit reason that produced no synthesis_done + # event (e.g. synthesizer had no key, or no usable answers to synthesize). + if synthesize and result.synthesis is None and result.synthesis_error: + err_console.print(f"[yellow]No synthesis: {result.synthesis_error}[/yellow]") + return result + + # Mode name -> human renderer. JSON output bypasses this via model_dump. _RENDERERS = { "synthesize": _render_human, @@ -234,6 +287,15 @@ def ask( "the providers. The cache never stores API keys." ), ), + stream: bool = typer.Option( + False, + "--stream", + help=( + "Stream member (and synthesizer) tokens live to the terminal as they " + "arrive (synthesize/raw modes only). Ignored with --json. On a cache " + "hit the cached text is rendered in one shot (no live token stream)." + ), + ), ) -> None: """Fan PROMPT out to a council and synthesize, debate, or adversarially review. @@ -253,6 +315,12 @@ def ask( ) raise typer.Exit(code=2) + if stream and mode_lower not in ("synthesize", "raw"): + err_console.print( + f"[red]--stream is only supported for synthesize/raw modes, not '{mode_lower}'.[/red]" + ) + raise typer.Exit(code=2) + cfg = load_config() members = cfg.resolve_council(council) if not members: @@ -260,6 +328,19 @@ def ask( raise typer.Exit(code=2) c = Council(models=members, synthesizer=synthesizer, config=cfg, cache=cache) + + # Streaming path: live token output (synthesize/raw only, not with --json). + # It produces the same final CouncilResult, so the exit-code contract below + # applies identically. + if stream and not as_json: + result = _stream_to_terminal(c, prompt, synthesize=(mode_lower == "synthesize")) + if not result.successful_answers: + err_console.print( + "[red]No usable council answers. Run 'conclave providers' to check keys.[/red]" + ) + raise typer.Exit(code=1) + return + if mode_lower == "debate": threshold = _resolve_converge_threshold( converge, converge_threshold, cfg.converge_threshold diff --git a/src/conclave/council.py b/src/conclave/council.py index 3ee28f9..6f7508f 100644 --- a/src/conclave/council.py +++ b/src/conclave/council.py @@ -13,13 +13,13 @@ from __future__ import annotations import asyncio -from collections.abc import Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable from . import cache as cache_mod from . import transport from .config import ConclaveConfig, load_config from .logging import get_logger -from .models import CouncilResult, ModelAnswer +from .models import CouncilResult, ModelAnswer, StreamEvent from .providers import call_model from .registry import key_present @@ -255,6 +255,108 @@ async def _ask_uncached(self, prompt: str, synthesize: bool = True) -> CouncilRe await self._synthesize(result) return result + async def ask_stream(self, prompt: str, synthesize: bool = True) -> AsyncIterator[StreamEvent]: + """Stream a synthesize/raw run, yielding incremental :class:`StreamEvent`s. + + The streaming counterpart of :meth:`ask` (issue #7). Members are fanned + out concurrently and their tokens are interleaved as ``member_delta`` / + ``member_done`` events; when ``synthesize`` is ``True`` the synthesizer's + tokens follow as ``synthesis_delta`` / ``synthesis_done``; a terminal + ``done`` event carries the fully-assembled :class:`CouncilResult`, whose + shape matches the non-streaming path exactly. Streaming applies to the + synthesize/raw path only -- ``debate``/``adversarial`` are not streamed. + + **Cache interaction.** When the result cache is enabled and an identical + prior run is cached, there are no live provider tokens to stream: the + cached final text is rendered in **one shot** -- a single + ``member_delta`` per member (and a single ``synthesis_delta`` if a + synthesis was cached) followed by the matching ``*_done`` events and the + terminal ``done`` (with ``result.cached is True``). The providers are not + called. On a cache **miss**, the live stream runs and, on completion, the + assembled result is stored so a later ``--stream`` or buffered run hits. + + Args: + prompt: The user prompt to fan out. + synthesize: When ``True`` (default), stream the synthesizer too. + + Yields: + :class:`StreamEvent` objects; the last one is always ``type="done"``. + """ + from .streaming import stream_ask + + mode = "synthesize" if synthesize else "raw" + + if self.cache_enabled: + key = self._cache_key(prompt, mode) + hit = cache_mod.load(key) + if hit is not None: + logger.info("cache hit for %s stream (%s)", mode, key[:12]) + for event in self._replay_cached(hit): + yield event + return + + # Live miss: stream, capture the terminal result, then store it. + final: CouncilResult | None = None + async for event in stream_ask(self, prompt, synthesize=synthesize): + if event.type == "done" and event.result is not None: + final = event.result + yield event + if final is not None: + cache_mod.store(key, final) + return + + async for event in stream_ask(self, prompt, synthesize=synthesize): + yield event + + @staticmethod + def _replay_cached(result: CouncilResult) -> list[StreamEvent]: + """Render a cached :class:`CouncilResult` as one-shot stream events. + + A cache hit has no live tokens, so each member's full cached answer is + emitted as a single ``member_delta`` + ``member_done`` (errors emit only + ``member_done``), the cached synthesis as a single ``synthesis_delta`` + + ``synthesis_done``, and finally the terminal ``done`` carrying the cached + result verbatim (``cached is True``). This keeps the streaming consumer's + event contract intact without fabricating a fake token-by-token stream. + """ + events: list[StreamEvent] = [] + for ans in result.answers: + if ans.answer: + events.append( + StreamEvent( + type="member_delta", + name=ans.name, + model_id=ans.model_id, + text=ans.answer, + ) + ) + events.append( + StreamEvent(type="member_done", name=ans.name, model_id=ans.model_id, answer=ans) + ) + if result.synthesis is not None: + events.append( + StreamEvent( + type="synthesis_delta", + name=result.synthesizer, + model_id=result.synthesizer_model_id, + text=result.synthesis, + ) + ) + events.append( + StreamEvent( + type="synthesis_done", + name=result.synthesizer, + model_id=result.synthesizer_model_id, + answer=ModelAnswer( + name=result.synthesizer or "synthesizer", + model_id=result.synthesizer_model_id or "", + answer=result.synthesis, + ), + ) + ) + events.append(StreamEvent(type="done", result=result)) + return events + async def _synthesize(self, result: CouncilResult) -> None: """Run the synthesizer over the successful answers, mutating ``result``.""" usable = result.successful_answers @@ -426,6 +528,43 @@ def ask_sync(self, prompt: str, synthesize: bool = True) -> CouncilResult: """ return self._run_sync(lambda: self.ask(prompt, synthesize=synthesize), "ask_sync") + def stream_sync( + self, + prompt: str, + on_event: Callable[[StreamEvent], None], + synthesize: bool = True, + ) -> CouncilResult: + """Drive :meth:`ask_stream` synchronously, invoking ``on_event`` per event. + + For non-async callers (the CLI ``--stream`` path). Each + :class:`StreamEvent` is passed to ``on_event`` as it arrives so live + output can be rendered; the fully-assembled :class:`CouncilResult` (from + the terminal ``done`` event) is returned. Closes the pooled HTTP client + when the loop ends, like the other ``*_sync`` wrappers. Raises + ``RuntimeError`` if invoked from inside a running event loop -- iterate + :meth:`ask_stream` directly there instead. + + Args: + prompt: The user prompt to fan out. + on_event: Callback invoked once per :class:`StreamEvent` in order. + synthesize: When ``True`` (default), stream the synthesizer too. + + Returns: + The final :class:`CouncilResult` carried by the ``done`` event. + """ + + async def _consume() -> CouncilResult: + final: CouncilResult | None = None + async for event in self.ask_stream(prompt, synthesize=synthesize): + on_event(event) + if event.type == "done" and event.result is not None: + final = event.result + # ask_stream always ends with a done event carrying a result; fall + # back to an empty result only as a defensive guard. + return final if final is not None else CouncilResult(prompt=prompt) + + return self._run_sync(_consume, "stream_sync") + def debate_sync( self, prompt: str, rounds: int = 2, converge_threshold: float | None = None ) -> CouncilResult: diff --git a/src/conclave/models.py b/src/conclave/models.py index 2fac00e..dec2693 100644 --- a/src/conclave/models.py +++ b/src/conclave/models.py @@ -42,6 +42,47 @@ def ok(self) -> bool: return self.error is None and self.answer is not None +class StreamEvent(BaseModel): + """One incremental event from a streaming council run (issue #7). + + Streaming yields a flat sequence of these so a consumer can render live + output without knowing the council's internals. The terminal ``done`` event + carries the fully-assembled :class:`CouncilResult`, so a consumer that only + wants the final structured result can ignore every chunk and read + ``done`` -- the result shape is byte-for-byte the same as the + non-streaming path. + + Attributes: + type: The event kind: + + * ``"member_delta"`` -- an incremental text chunk from one council + member. ``name``/``model_id`` identify the member and ``text`` + carries the new tokens. + * ``"member_done"`` -- a member finished (or failed). ``answer`` + carries that member's final :class:`ModelAnswer` (with ``error`` + set on failure, partial text preserved if any). + * ``"synthesis_delta"`` -- an incremental text chunk from the + synthesizer (only when ``synthesize=True`` and synthesis runs). + * ``"synthesis_done"`` -- the synthesizer finished; ``answer`` holds + its final :class:`ModelAnswer`. + * ``"done"`` -- the run is complete; ``result`` holds the full + :class:`CouncilResult`. + name: Friendly member/synthesizer name for delta/done events. + model_id: Resolved model id for delta/done events. + text: The incremental text for ``*_delta`` events. + answer: The final :class:`ModelAnswer` for ``member_done`` / + ``synthesis_done`` events. + result: The full :class:`CouncilResult` for the terminal ``done`` event. + """ + + type: str + name: str | None = None + model_id: str | None = None + text: str | None = None + answer: ModelAnswer | None = None + result: CouncilResult | None = None + + class DebateRound(BaseModel): """One round of a multi-round debate. @@ -148,3 +189,9 @@ def successful_answers(self) -> list[ModelAnswer]: def failed_answers(self) -> list[ModelAnswer]: """Members that were attempted but errored.""" return [a for a in self.answers if not a.ok] + + +# ``StreamEvent.result`` forward-references ``CouncilResult`` (defined after it +# under ``from __future__ import annotations``); resolve that ref now that the +# class exists so ``StreamEvent`` validates correctly. +StreamEvent.model_rebuild() diff --git a/src/conclave/providers.py b/src/conclave/providers.py index de5c72e..f90a6c0 100644 --- a/src/conclave/providers.py +++ b/src/conclave/providers.py @@ -16,13 +16,14 @@ import os import time +from collections.abc import AsyncIterator from . import transport from .adapters import ProviderError, resolve_adapter from .adapters.base import ProviderAdapter, redact from .config import ConclaveConfig, load_config from .logging import get_logger -from .models import ModelAnswer +from .models import ModelAnswer, TokenUsage from .transport import TransportError logger = get_logger("providers") @@ -121,3 +122,173 @@ async def call_model( message = redact(f"{type(exc).__name__}: {exc}") logger.warning("%s (%s) unexpected error: %s", name, model_id, message) return ModelAnswer(name=name, model_id=model_id, latency_s=latency, error=message) + + +def _merge_usage(acc: TokenUsage | None, frame: TokenUsage | None) -> TokenUsage | None: + """Merge a per-frame usage accounting into the running total, field-wise. + + Providers split usage across frames differently: OpenAI sends one final + chunk with all three counts; Anthropic sends ``prompt_tokens`` on + ``message_start`` and *cumulative* ``completion_tokens`` on each + ``message_delta``; Gemini repeats a cumulative ``usageMetadata`` per chunk. + Taking the last non-zero value per field (and recomputing ``total`` when the + provider did not send one) yields the same final accounting the buffered + ``parse_response`` would have produced, regardless of which scheme applies. + """ + if frame is None: + return acc + if acc is None: + acc = TokenUsage() + prompt = frame.prompt_tokens or acc.prompt_tokens + completion = frame.completion_tokens or acc.completion_tokens + # An explicit combined total from a frame wins (OpenAI sends one). Otherwise + # derive it from the merged components -- Anthropic/Gemini split prompt and + # completion across frames and never send a true sum, so a stale per-component + # value must not masquerade as the total. + if frame.total_tokens: + total = frame.total_tokens + elif prompt or completion: + total = prompt + completion + else: + total = acc.total_tokens + return TokenUsage(prompt_tokens=prompt, completion_tokens=completion, total_tokens=total) + + +async def call_model_stream( + name: str, + model_id: str, + messages: list[dict[str, str]], + *, + temperature: float = 0.7, + timeout: float = 120.0, + config: ConclaveConfig | None = None, +) -> AsyncIterator[str | ModelAnswer]: + """Stream a single model's answer, yielding text deltas then a final answer. + + The streaming counterpart of :func:`call_model` (issue #7). It yields zero or + more ``str`` text chunks in arrival order, then **exactly one** + :class:`ModelAnswer` as the final item -- the fully-assembled result whose + ``answer`` equals the concatenation of every yielded chunk and whose + ``usage`` matches what the buffered path would produce. A consumer + distinguishes the two by type:: + + async for item in call_model_stream(...): + if isinstance(item, str): + render(item) # live token + else: + final = item # ModelAnswer (last item) + + Never-raises contract (identical to :func:`call_model`): an unknown + provider, a missing key, a model that cannot stream, or any mid-stream error + is captured on the final :class:`ModelAnswer.error`; **any partial text seen + before the failure is preserved** both in the chunks already yielded and in + the final answer's ``answer`` field. This coroutine never propagates a + provider/network exception. + + A provider whose adapter reports ``supports_streaming = False`` is served by + falling back to the buffered :func:`call_model`, whose full answer text is + emitted as a single chunk -- so a non-streaming provider degrades to a + one-shot render rather than an error. + + Args mirror :func:`call_model`. + + Yields: + ``str`` text deltas, then a final :class:`ModelAnswer`. + """ + started = time.perf_counter() + + try: + resolved_config = config if config is not None else load_config() + adapter = resolve_adapter(model_id, resolved_config) + except ProviderError as exc: + latency = time.perf_counter() - started + logger.warning("%s (%s) unresolved: %s", name, model_id, exc) + yield ModelAnswer(name=name, model_id=model_id, latency_s=latency, error=str(exc)) + return + + # Providers without a streaming path degrade to a single-chunk render so the + # caller's streaming code path still works uniformly. + if not getattr(adapter, "supports_streaming", False): + answer = await call_model( + name, + model_id, + messages, + temperature=temperature, + timeout=timeout, + config=resolved_config, + ) + if answer.answer: + yield answer.answer + yield answer + return + + api_key = _resolve_key(adapter) + if api_key is None: + latency = time.perf_counter() - started + names = " or ".join(adapter.env_vars) or "(none)" + msg = f"no API key in environment (set {names})" + logger.warning("%s (%s) %s", name, model_id, msg) + yield ModelAnswer(name=name, model_id=model_id, latency_s=latency, error=msg) + return + + parts: list[str] = [] + usage: TokenUsage | None = None + try: + url, headers, body = adapter.stream_request( + model_id, messages, temperature, timeout, api_key + ) + async for event, data in transport.stream_sse(url, headers, body, timeout): + delta = adapter.parse_sse_event(event, data) + if delta.text: + parts.append(delta.text) + yield delta.text + usage = _merge_usage(usage, delta.usage) + if delta.done: + break + + text = "".join(parts) + latency = time.perf_counter() - started + if not text: + # A stream that closed without any text is a failure, mirroring the + # buffered adapters' "empty response" ProviderError. + yield ModelAnswer( + name=name, + model_id=model_id, + latency_s=latency, + error=f"{adapter.prefix}: empty response (no streamed content)", + ) + return + logger.info("%s (%s) streamed ok in %.2fs", name, model_id, latency) + yield ModelAnswer( + name=name, + model_id=model_id, + answer=text, + latency_s=latency, + usage=usage, + ) + except (ProviderError, TransportError) as exc: + # Mid-stream failure: preserve any partial text already collected, set + # the (redacted) error, never raise out of the call path. + latency = time.perf_counter() - started + message = redact(str(exc)) + logger.warning("%s (%s) stream failed: %s", name, model_id, message) + yield ModelAnswer( + name=name, + model_id=model_id, + answer="".join(parts) or None, + latency_s=latency, + usage=usage, + error=message, + ) + except Exception as exc: # noqa: BLE001 -- never let an unexpected raise kill the run + latency = time.perf_counter() - started + message = redact(f"{type(exc).__name__}: {exc}") + logger.warning("%s (%s) unexpected stream error: %s", name, model_id, message) + yield ModelAnswer( + name=name, + model_id=model_id, + answer="".join(parts) or None, + latency_s=latency, + usage=usage, + error=message, + ) diff --git a/src/conclave/streaming.py b/src/conclave/streaming.py new file mode 100644 index 0000000..e943567 --- /dev/null +++ b/src/conclave/streaming.py @@ -0,0 +1,249 @@ +"""Streaming orchestration for the synthesize/raw path (issue #7). + +This module owns the council-level streaming logic so :mod:`conclave.council` +stays focused on the buffered modes. :func:`stream_ask` is the single +async-generator engine behind :meth:`conclave.council.Council.ask_stream`: it + +* fans the prompt out to every available member **concurrently**, interleaving + each member's incremental text into one flat :class:`conclave.models.StreamEvent` + sequence (``member_delta`` / ``member_done``), +* optionally streams the synthesizer over the successful answers + (``synthesis_delta`` / ``synthesis_done``), and +* emits a terminal ``done`` event carrying the fully-assembled + :class:`conclave.models.CouncilResult` whose shape is **byte-for-byte + identical** to the non-streaming :meth:`Council.ask` result -- so downstream + consumers (and the cache) are unaffected. + +Streaming applies to the synthesize/raw path only; ``debate``/``adversarial`` +are intentionally out of scope for this issue. + +Interleaving is done with an :class:`asyncio.Queue`: each member runs as its own +task draining :func:`conclave.providers.call_model_stream` and pushing events +onto the queue, while the generator yields whatever arrives first. This gives +true concurrency (slow members do not block fast ones) with deterministic +final assembly (the final :class:`CouncilResult.answers` list is ordered by the +members list, not by completion order). +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from typing import TYPE_CHECKING + +from .logging import get_logger +from .models import CouncilResult, ModelAnswer, StreamEvent +from .providers import call_model_stream +from .registry import key_present + +if TYPE_CHECKING: # avoid an import cycle at runtime + from .council import Council + +logger = get_logger("streaming") + +# Sentinel pushed onto the queue when a single member's stream is exhausted, so +# the consumer knows when all members have finished without polling task state. +_MEMBER_DONE = object() + + +async def _drive_member( + council: Council, + name: str, + model_id: str, + messages: list[dict[str, str]], + queue: asyncio.Queue, +) -> None: + """Drain one member's token stream onto the shared queue. + + Pushes ``("delta", StreamEvent)`` for each text chunk and a final + ``("answer", ModelAnswer)`` carrying the assembled answer, then the + ``_MEMBER_DONE`` sentinel. Never raises: :func:`call_model_stream` already + captures provider failures onto the final ``ModelAnswer`` (partial text + preserved), and any unexpected error here is converted to an error answer so + one bad member can never wedge the queue or abort the run. + """ + try: + async for item in call_model_stream( + name, + model_id, + messages, + temperature=council.temperature, + timeout=council.timeout, + config=council.config, + ): + if isinstance(item, ModelAnswer): + await queue.put(("answer", item)) + else: + await queue.put( + ( + "delta", + StreamEvent(type="member_delta", name=name, model_id=model_id, text=item), + ) + ) + except Exception as exc: # noqa: BLE001 -- a member must never wedge the run + logger.warning("%s streaming raised unexpectedly: %s", name, exc) + await queue.put( + ( + "answer", + ModelAnswer( + name=name, + model_id=model_id, + error=f"{type(exc).__name__}: {exc}", + ), + ) + ) + finally: + await queue.put((_MEMBER_DONE, None)) + + +async def stream_ask( + council: Council, prompt: str, synthesize: bool = True +) -> AsyncIterator[StreamEvent]: + """Stream a synthesize/raw council run as a flat :class:`StreamEvent` sequence. + + See the module docstring and :meth:`conclave.council.Council.ask_stream`. + + Args: + council: The :class:`Council` whose members/synthesizer/config drive the + run. + prompt: The user prompt to fan out. + synthesize: When ``True`` (default), stream the synthesizer over the + successful member answers after every member finishes. + + Yields: + ``member_delta`` / ``member_done`` events per member (interleaved), + then ``synthesis_delta`` / ``synthesis_done`` when synthesis runs, then + a terminal ``done`` event whose ``result`` is the full + :class:`CouncilResult`. + """ + members, skipped = council._available_members() + result = CouncilResult( + prompt=prompt, + mode="synthesize" if synthesize else "raw", + skipped=skipped, + ) + + if not members: + logger.warning("no council members have keys available; nothing to stream") + yield StreamEvent(type="done", result=result) + return + + base_messages = [{"role": "user", "content": prompt}] + queue: asyncio.Queue = asyncio.Queue() + tasks = [ + asyncio.create_task(_drive_member(council, name, model_id, base_messages, queue)) + for name, model_id in members + ] + + # Collect finished answers keyed by name so the final list can be reordered + # to match the members list (deterministic shape) regardless of arrival. + by_name: dict[str, ModelAnswer] = {} + remaining = len(members) + try: + while remaining > 0: + kind, payload = await queue.get() + if kind == "delta": + yield payload + elif kind == "answer": + by_name[payload.name] = payload + yield StreamEvent( + type="member_done", + name=payload.name, + model_id=payload.model_id, + answer=payload, + ) + elif kind is _MEMBER_DONE: + remaining -= 1 + finally: + # Ensure every member task is awaited even if the consumer stops early. + for task in tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + # Reassemble answers in members order for a stable, non-streaming-equivalent + # CouncilResult. A member that produced no answer event (should not happen -- + # the driver always emits one) is recorded as an explicit error. + result.answers = [ + by_name.get(name) or ModelAnswer(name=name, model_id=model_id, error="no answer produced") + for name, model_id in members + ] + + if synthesize: + async for event in _stream_synthesis(council, result): + yield event + + yield StreamEvent(type="done", result=result) + + +async def _stream_synthesis(council: Council, result: CouncilResult) -> AsyncIterator[StreamEvent]: + """Stream the synthesizer over ``result``'s successful answers, mutating it. + + Mirrors :meth:`Council._synthesize` (no-usable-answers and no-key short + circuits set ``synthesis_error`` exactly the same way), but streams the + synthesizer's tokens as ``synthesis_delta`` events and finishes with a + ``synthesis_done`` event. On any short circuit nothing is yielded (there is + no live token stream) -- the reason lands on ``result.synthesis_error`` and + is visible in the terminal ``done`` event. + """ + from .council import _SYNTH_SYSTEM + + usable = result.successful_answers + if not usable: + result.synthesis_error = "no successful member answers to synthesize" + logger.warning(result.synthesis_error) + return + + synth_id = council.config.resolve_model_id(council.synthesizer) + result.synthesizer = council.synthesizer + result.synthesizer_model_id = synth_id + + if not key_present(synth_id): + result.synthesis_error = ( + f"synthesizer '{council.synthesizer}' ({synth_id}) has no API key; " + "returning raw answers only" + ) + logger.warning(result.synthesis_error) + return + + blocks = "\n\n".join(f"### Answer from {a.name} ({a.model_id})\n{a.answer}" for a in usable) + user_content = ( + f"Original prompt:\n{result.prompt}\n\n" + f"Council answers:\n\n{blocks}\n\n" + "Now produce the consolidated answer." + ) + messages = [ + {"role": "system", "content": _SYNTH_SYSTEM}, + {"role": "user", "content": user_content}, + ] + + final: ModelAnswer | None = None + async for item in call_model_stream( + council.synthesizer, + synth_id, + messages, + temperature=council.temperature, + timeout=council.timeout, + config=council.config, + ): + if isinstance(item, ModelAnswer): + final = item + else: + yield StreamEvent( + type="synthesis_delta", + name=council.synthesizer, + model_id=synth_id, + text=item, + ) + + if final is not None and final.ok: + result.synthesis = final.answer + elif final is not None: + result.synthesis_error = final.error + if final is not None: + yield StreamEvent( + type="synthesis_done", + name=council.synthesizer, + model_id=synth_id, + answer=final, + ) diff --git a/src/conclave/transport.py b/src/conclave/transport.py index d056df3..1107984 100644 --- a/src/conclave/transport.py +++ b/src/conclave/transport.py @@ -13,6 +13,8 @@ from __future__ import annotations +from collections.abc import AsyncIterator + import httpx from .logging import get_logger @@ -82,6 +84,94 @@ async def post_json( return response.status_code, body +async def stream_sse( + url: str, + headers: dict[str, str], + json_body: dict, + timeout: float, +) -> AsyncIterator[tuple[str, str]]: + """POST a JSON body and yield Server-Sent Events as ``(event, data)`` pairs. + + The streaming counterpart of :func:`post_json` and the single streaming + network boundary for conclave (issue #7). It reuses the same pooled client + and timeout plumbing, and -- like ``post_json`` -- knows nothing about auth + headers or provider response shapes: it parses the SSE wire format and hands + each event back to the adapter to interpret. + + SSE framing parsed here (the subset every supported vendor uses): + + * Events are separated by a blank line. + * ``event: `` sets the event name for the current event (Anthropic + uses named events; OpenAI/Gemini do not, so ``event`` is ``""`` there). + * ``data: `` lines are accumulated (multiple ``data:`` lines in one + event are joined with ``\\n``, per the SSE spec). + * Comment lines (starting ``:``) and other fields are ignored. + + A non-2xx status on the streaming response is surfaced as a + :class:`TransportError` whose message includes the status and a bounded, + decoded body snippet (the adapter wraps it as a ``ProviderError`` upstream). + The body is read fully only on the error path; on success nothing is + buffered beyond one line at a time. + + Args: + url: Fully-qualified endpoint URL built by the adapter. + headers: Request headers built by the adapter (may carry the API key). + json_body: The request payload to serialize as JSON (already carrying + the provider's stream-enabling flag). + timeout: Per-call timeout in seconds (applied to the whole request). + + Yields: + ``(event_name, data)`` pairs in arrival order. ``event_name`` is ``""`` + when the stream omits ``event:`` lines. ``data`` is the raw payload + string (typically JSON, or the ``[DONE]`` sentinel for OpenAI-style + streams); the adapter decodes it. + + Raises: + TransportError: On any network-level failure (timeout, connection + error) or a non-2xx streaming status. The message names only the + failure kind / HTTP status and never echoes the headers. + """ + client = _get_client() + try: + async with client.stream( + "POST", url, headers=headers, json=json_body, timeout=timeout + ) as response: + if response.status_code < 200 or response.status_code >= 300: + # Drain the error body so we can report a useful, bounded detail. + # aread() is required before the response is consumed/closed. + raw = await response.aread() + detail = raw.decode("utf-8", "replace")[:500] + raise TransportError(f"HTTP {response.status_code}: {detail}") + + event_name = "" + data_lines: list[str] = [] + async for line in response.aiter_lines(): + # A blank line terminates the current event -> dispatch it. + if line == "": + if data_lines: + yield event_name, "\n".join(data_lines) + event_name = "" + data_lines = [] + continue + if line.startswith(":"): + # SSE comment / keep-alive ping; ignore. + continue + if line.startswith("event:"): + event_name = line[len("event:") :].strip() + elif line.startswith("data:"): + data_lines.append(line[len("data:") :].lstrip()) + # Any other field (id:, retry:, ...) is irrelevant here. + + # Flush a final event with no trailing blank line (some servers do + # not emit the terminating newline). + if data_lines: + yield event_name, "\n".join(data_lines) + except httpx.TimeoutException as exc: + raise TransportError(f"request timed out after {timeout:.0f}s") from exc + except httpx.HTTPError as exc: + raise TransportError(f"network error: {type(exc).__name__}") from exc + + async def aclose() -> None: """Close the shared client. Optional; primarily for clean test teardown.""" global _client diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..89e5c23 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,577 @@ +"""Tests for streaming member answers + synthesis (issue #7). + +All tests run offline. Two layers are exercised: + +* **Per-adapter SSE parsing** drives the *real* ``call_model_stream`` -> + ``transport.stream_sse`` -> adapter ``parse_sse_event`` path through an + :class:`httpx.MockTransport` that emits a multi-chunk SSE byte stream for each + adapter family (openai-compat, anthropic, gemini). We assert chunks arrive + incrementally AND that the assembled ``ModelAnswer.answer`` equals the + concatenation -- and matches what the buffered path produces for the same + content. +* **Council / CLI streaming** drives ``Council.ask_stream`` and the CLI + ``--stream`` flag with ``call_model_stream`` patched, covering interleaving, + the terminal ``done`` result, the never-opened-stream guarantee for the + default path, the mid-stream error contract, and the cache interaction. +""" + +from __future__ import annotations + +import json + +import httpx +import pytest +from typer.testing import CliRunner + +from conclave import Council, cli, transport +from conclave.config import ConclaveConfig +from conclave.models import ModelAnswer, StreamEvent +from conclave.providers import call_model, call_model_stream + +runner = CliRunner() + + +# --------------------------------------------------------------------------- # +# MockTransport-backed streaming client (mirrors tests/test_transport.py) +# --------------------------------------------------------------------------- # + + +@pytest.fixture +async def mock_stream_client(): + """Install a MockTransport-backed pooled client; restore the global after. + + Returns an installer ``use(handler)`` where ``handler(request) -> Response``. + For streaming, the handler returns an ``httpx.Response`` whose content is an + iterable of SSE byte chunks, so ``transport.stream_sse`` reads them via + ``aiter_lines`` exactly as it would a real network stream. + """ + saved = transport._client + created: list[httpx.AsyncClient] = [] + + def use(handler): + client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + created.append(client) + transport._client = client + return client + + yield use + + for client in created: + if not client.is_closed: + await client.aclose() + transport._client = saved + + +def _sse(*frames: str) -> bytes: + """Join raw SSE frame blocks into a single body (blank-line separated).""" + return ("".join(f"{frame}\n\n" for frame in frames)).encode("utf-8") + + +async def _collect(name, model_id, **kwargs): + """Run call_model_stream, returning (text_chunks, final_ModelAnswer).""" + chunks: list[str] = [] + final: ModelAnswer | None = None + async for item in call_model_stream( + name, model_id, [{"role": "user", "content": "hi"}], **kwargs + ): + if isinstance(item, ModelAnswer): + final = item + else: + chunks.append(item) + return chunks, final + + +# --------------------------------------------------------------------------- # +# Per-adapter SSE parsing (real adapter + real transport, mocked bytes) +# --------------------------------------------------------------------------- # + + +async def test_openai_compat_stream_assembles_and_increments(monkeypatch, mock_stream_client): + """OpenAI-style deltas stream incrementally; final answer = concatenation.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + captured = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.read()) + body = _sse( + 'data: {"choices":[{"delta":{"role":"assistant"}}]}', + 'data: {"choices":[{"delta":{"content":"Hello"}}]}', + 'data: {"choices":[{"delta":{"content":", "}}]}', + 'data: {"choices":[{"delta":{"content":"world"}}]}', + 'data: {"choices":[],"usage":{"prompt_tokens":3,"completion_tokens":2,' + '"total_tokens":5}}', + "data: [DONE]", + ) + return httpx.Response(200, content=body, headers={"content-type": "text/event-stream"}) + + mock_stream_client(handler) + chunks, final = await _collect("openai", "openai/gpt-4.1") + + # Streamed incrementally as separate text chunks (role-only delta carries no text). + assert chunks == ["Hello", ", ", "world"] + assert final is not None and final.ok + assert final.answer == "Hello, world" + assert final.answer == "".join(chunks) + assert final.usage is not None and final.usage.total_tokens == 5 + # The request actually enabled streaming + usage accounting. + assert captured["body"]["stream"] is True + assert captured["body"]["stream_options"] == {"include_usage": True} + + +async def test_anthropic_stream_assembles_and_increments(monkeypatch, mock_stream_client): + """Anthropic named-event SSE: text_delta -> text; usage from start+delta frames.""" + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + captured = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["body"] = json.loads(request.read()) + body = _sse( + "event: message_start\n" + 'data: {"type":"message_start","message":{"usage":{"input_tokens":10,' + '"output_tokens":1}}}', + "event: content_block_start\n" + 'data: {"type":"content_block_start","index":0,' + '"content_block":{"type":"text","text":""}}', + "event: content_block_delta\n" + 'data: {"type":"content_block_delta","index":0,' + '"delta":{"type":"text_delta","text":"Hel"}}', + "event: content_block_delta\n" + 'data: {"type":"content_block_delta","index":0,' + '"delta":{"type":"text_delta","text":"lo"}}', + 'event: content_block_stop\ndata: {"type":"content_block_stop","index":0}', + "event: message_delta\n" + 'data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},' + '"usage":{"output_tokens":7}}', + 'event: message_stop\ndata: {"type":"message_stop"}', + ) + return httpx.Response(200, content=body, headers={"content-type": "text/event-stream"}) + + mock_stream_client(handler) + chunks, final = await _collect("claude", "anthropic/claude-sonnet-4-6") + + assert chunks == ["Hel", "lo"] + assert final is not None and final.ok + assert final.answer == "Hello" + assert final.answer == "".join(chunks) + # input_tokens from message_start, output_tokens from message_delta, total = sum. + assert final.usage is not None + assert final.usage.prompt_tokens == 10 + assert final.usage.completion_tokens == 7 + assert final.usage.total_tokens == 17 + assert captured["body"]["stream"] is True + + +async def test_gemini_stream_assembles_and_increments(monkeypatch, mock_stream_client): + """Gemini alt=sse: each chunk carries parts[].text; cumulative usageMetadata.""" + monkeypatch.setenv("GEMINI_API_KEY", "AIza-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + captured = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + body = _sse( + 'data: {"candidates":[{"content":{"parts":[{"text":"Foo"}],"role":"model"}}]}', + 'data: {"candidates":[{"content":{"parts":[{"text":"bar"}],"role":"model"}}],' + '"usageMetadata":{"promptTokenCount":4,"candidatesTokenCount":2,' + '"totalTokenCount":6}}', + ) + return httpx.Response(200, content=body, headers={"content-type": "text/event-stream"}) + + mock_stream_client(handler) + chunks, final = await _collect("gemini", "gemini/gemini-2.5-pro") + + assert chunks == ["Foo", "bar"] + assert final is not None and final.ok + assert final.answer == "Foobar" + assert final.answer == "".join(chunks) + assert final.usage is not None and final.usage.total_tokens == 6 + # The streaming URL targets streamGenerateContent with ?alt=sse. + assert ":streamGenerateContent" in captured["url"] + assert "alt=sse" in captured["url"] + + +async def test_stream_final_answer_matches_buffered(monkeypatch, mock_stream_client): + """The assembled streamed answer equals the buffered parse for the same content.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + full = "The quick brown fox" + + def stream_handler(request: httpx.Request) -> httpx.Response: + frames = [f'data: {{"choices":[{{"delta":{{"content":"{w} "}}}}]}}' for w in full.split()] + frames.append("data: [DONE]") + return httpx.Response(200, content=_sse(*frames)) + + mock_stream_client(stream_handler) + _chunks, streamed = await _collect("openai", "openai/gpt-4.1") + + def buffered_handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={"choices": [{"message": {"content": streamed.answer}}]}) + + mock_stream_client(buffered_handler) + buffered = await call_model("openai", "openai/gpt-4.1", [{"role": "user", "content": "hi"}]) + + assert streamed.answer == buffered.answer + + +# --------------------------------------------------------------------------- # +# Mid-stream error: partial text preserved, error set, never raises +# --------------------------------------------------------------------------- # + + +async def test_midstream_malformed_frame_preserves_partial(monkeypatch, mock_stream_client): + """A malformed SSE frame mid-stream -> error set, partial text kept, no raise.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + def handler(request: httpx.Request) -> httpx.Response: + body = _sse( + 'data: {"choices":[{"delta":{"content":"partial "}}]}', + "data: {not valid json", # malformed -> ProviderError in parse_sse_event + "data: [DONE]", + ) + return httpx.Response(200, content=body) + + mock_stream_client(handler) + chunks, final = await _collect("openai", "openai/gpt-4.1") + + # The good chunk was streamed before the failure. + assert chunks == ["partial "] + assert final is not None + assert not final.ok + assert final.error is not None + assert "partial " in (final.answer or "") # partial text preserved on the answer + + +async def test_midstream_connection_drop_preserves_partial(monkeypatch, mock_stream_client): + """A transport-level drop mid-stream -> error set, partial preserved, no raise.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + class DroppingStream(httpx.AsyncByteStream): + """Emits one good SSE frame, then raises a read error mid-stream.""" + + async def __aiter__(self): + yield b'data: {"choices":[{"delta":{"content":"half "}}]}\n\n' + raise httpx.ReadError("connection dropped") + + async def aclose(self): # pragma: no cover - nothing to release + return None + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(200, stream=DroppingStream()) + + mock_stream_client(handler) + chunks, final = await _collect("openai", "openai/gpt-4.1") + + assert chunks == ["half "] + assert final is not None and not final.ok + assert "network error" in final.error + assert (final.answer or "") == "half " + + +async def test_stream_non2xx_status_is_error(monkeypatch, mock_stream_client): + """A non-2xx streaming status becomes a non-raising ModelAnswer.error.""" + monkeypatch.setenv("OPENAI_API_KEY", "sk-test") + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(401, json={"error": {"message": "invalid api key"}}) + + mock_stream_client(handler) + chunks, final = await _collect("openai", "openai/gpt-4.1") + + assert chunks == [] + assert final is not None and not final.ok + assert "401" in final.error + + +async def test_stream_missing_key_is_error(monkeypatch): + """No key set -> a clean ModelAnswer.error naming the env var, no stream opened.""" + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + chunks, final = await _collect("openai", "openai/gpt-4.1") + assert chunks == [] + assert final is not None and not final.ok + assert "OPENAI_API_KEY" in final.error + + +async def test_stream_key_redacted_in_error(monkeypatch, mock_stream_client): + """A key value echoed in a streaming error body is scrubbed via redact().""" + fake_key = "sk-streamleak-0123456789abcdef" + monkeypatch.setenv("OPENAI_API_KEY", fake_key) + monkeypatch.setenv("CONCLAVE_CONFIG", "/nonexistent/conclave.yml") + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(401, json={"error": {"message": f"bad key {fake_key}"}}) + + mock_stream_client(handler) + _chunks, final = await _collect("openai", "openai/gpt-4.1") + assert final is not None and not final.ok + assert fake_key not in final.error + assert "[REDACTED]" in final.error + + +# --------------------------------------------------------------------------- # +# Non-streaming default path must never open a stream +# --------------------------------------------------------------------------- # + + +def _config() -> ConclaveConfig: + return ConclaveConfig( + models={ + "grok": "xai/grok-4.3", + "gemini": "gemini/gemini-2.5-pro", + "claude": "anthropic/claude-sonnet-4-6", + }, + councils={"default": ["grok", "gemini", "claude"]}, + synthesizer="claude", + ) + + +async def test_default_ask_never_opens_a_stream(monkeypatch, patch_call_model): + """The buffered ask() path must not call transport.stream_sse at all.""" + for var in ("XAI_API_KEY", "GEMINI_API_KEY", "ANTHROPIC_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + opened = {"n": 0} + + async def spy_stream_sse(*args, **kwargs): # pragma: no cover - must not run + opened["n"] += 1 + if False: + yield # make it an async generator + + monkeypatch.setattr(transport, "stream_sse", spy_stream_sse) + + def handler(model, messages, **kwargs): + from tests.conftest import make_response + + return make_response(f"ans-{model}") + + patch_call_model(handler) + council = Council(models=["grok", "gemini"], synthesizer="claude", config=_config()) + result = await council.ask("hi", synthesize=False) + assert all(a.ok for a in result.answers) + assert opened["n"] == 0, "buffered ask() must never open a stream" + + +# --------------------------------------------------------------------------- # +# Council.ask_stream interleaving + terminal result +# --------------------------------------------------------------------------- # + + +def _patch_stream(monkeypatch, deltas_by_model): + """Patch streaming.call_model_stream to emit canned deltas + a final answer.""" + import conclave.streaming as streaming_mod + + async def fake_stream(name, model_id, messages, *, temperature=0.7, timeout=120.0, config=None): + text_parts = deltas_by_model.get(model_id, ["x"]) + for part in text_parts: + yield part + yield ModelAnswer(name=name, model_id=model_id, answer="".join(text_parts)) + + monkeypatch.setattr(streaming_mod, "call_model_stream", fake_stream) + + +async def test_ask_stream_yields_member_and_done_events(monkeypatch): + """ask_stream yields member deltas/dones for each member then a done result.""" + for var in ("XAI_API_KEY", "GEMINI_API_KEY", "ANTHROPIC_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + _patch_stream( + monkeypatch, + { + "xai/grok-4.3": ["A", "B"], + "gemini/gemini-2.5-pro": ["C"], + "anthropic/claude-sonnet-4-6": ["SYN"], # synthesizer + }, + ) + + council = Council(models=["grok", "gemini"], synthesizer="claude", config=_config()) + events = [e async for e in council.ask_stream("hi", synthesize=True)] + + types = [e.type for e in events] + assert types[-1] == "done" + assert "member_delta" in types + assert types.count("member_done") == 2 + assert "synthesis_delta" in types + assert "synthesis_done" in types + + done = events[-1] + assert done.result is not None + # Final result shape matches non-streaming: 2 answers ordered by members list. + assert [a.name for a in done.result.answers] == ["grok", "gemini"] + assert done.result.answers[0].answer == "AB" + assert done.result.synthesis == "SYN" + + +async def test_ask_stream_raw_skips_synthesis(monkeypatch): + """raw mode (synthesize=False) yields no synthesis events.""" + for var in ("XAI_API_KEY", "GEMINI_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) + + _patch_stream(monkeypatch, {"xai/grok-4.3": ["a"], "gemini/gemini-2.5-pro": ["b"]}) + council = Council(models=["grok", "gemini"], synthesizer="claude", config=_config()) + events = [e async for e in council.ask_stream("hi", synthesize=False)] + + assert not any(e.type.startswith("synthesis") for e in events) + assert events[-1].type == "done" + assert events[-1].result.synthesis is None + + +async def test_ask_stream_no_members_emits_done(monkeypatch): + """Zero available members -> a single done event, empty answers, no raise.""" + for var in ("XAI_API_KEY", "GEMINI_API_KEY", "ANTHROPIC_API_KEY"): + monkeypatch.delenv(var, raising=False) + + council = Council(models=["grok", "gemini"], synthesizer="claude", config=_config()) + events = [e async for e in council.ask_stream("hi")] + assert len(events) == 1 + assert events[0].type == "done" + assert events[0].result.answers == [] + assert events[0].result.skipped == ["grok", "gemini"] + + +# --------------------------------------------------------------------------- # +# CLI --stream smoke + exit-code contract +# --------------------------------------------------------------------------- # + + +@pytest.fixture +def patch_cli_config(monkeypatch): + monkeypatch.setattr(cli, "load_config", _config) + + +def test_cli_stream_smoke_exits_zero(monkeypatch, patch_cli_config): + """--stream renders live output and exits 0 on a successful mocked stream.""" + import conclave.streaming as streaming_mod + + for var in ("XAI_API_KEY", "GEMINI_API_KEY", "ANTHROPIC_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + async def fake_stream(name, model_id, messages, *, temperature=0.7, timeout=120.0, config=None): + yield f"tok-{name} " + yield ModelAnswer(name=name, model_id=model_id, answer=f"tok-{name} ") + + monkeypatch.setattr(streaming_mod, "call_model_stream", fake_stream) + + result = runner.invoke( + cli.app, ["ask", "hello", "--council", "grok,gemini", "--mode", "raw", "--stream"] + ) + assert result.exit_code == 0 + assert "tok-grok" in result.output + assert "tok-gemini" in result.output + + +def test_cli_stream_zero_usable_exits_one(monkeypatch, patch_cli_config): + """--stream honors the exit-code contract: zero usable answers -> exit 1.""" + import conclave.streaming as streaming_mod + + for var in ("XAI_API_KEY", "GEMINI_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + async def fake_stream(name, model_id, messages, *, temperature=0.7, timeout=120.0, config=None): + yield ModelAnswer(name=name, model_id=model_id, error="provider down") + + monkeypatch.setattr(streaming_mod, "call_model_stream", fake_stream) + + result = runner.invoke( + cli.app, ["ask", "hello", "--council", "grok,gemini", "--mode", "raw", "--stream"] + ) + assert result.exit_code == 1 + assert "No usable council answers" in result.output + + +def test_cli_stream_rejected_for_debate(patch_cli_config): + """--stream is rejected (exit 2) for non synthesize/raw modes.""" + result = runner.invoke( + cli.app, ["ask", "hello", "--council", "grok", "--mode", "debate", "--stream"] + ) + assert result.exit_code == 2 + assert "only supported for synthesize/raw" in result.output + + +# --------------------------------------------------------------------------- # +# --stream + cache: first run streams + populates; second is a one-shot hit +# --------------------------------------------------------------------------- # + + +def test_cli_stream_cache_second_run_is_one_shot_hit(monkeypatch, patch_cli_config, tmp_path): + """--stream --cache: 2nd identical run is a hit rendered in one shot (no calls).""" + import conclave.streaming as streaming_mod + + monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path)) + for var in ("XAI_API_KEY", "GEMINI_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + calls = {"n": 0} + + async def fake_stream(name, model_id, messages, *, temperature=0.7, timeout=120.0, config=None): + calls["n"] += 1 + yield "live " + yield ModelAnswer(name=name, model_id=model_id, answer="live answer") + + monkeypatch.setattr(streaming_mod, "call_model_stream", fake_stream) + + args = ["ask", "2+2?", "--council", "grok", "--mode", "raw", "--stream", "--cache"] + first = runner.invoke(cli.app, args) + assert first.exit_code == 0 + n_after_first = calls["n"] + assert n_after_first > 0 + + second = runner.invoke(cli.app, args) + assert second.exit_code == 0 + # No new provider stream calls -> served from cache, rendered one-shot. + assert calls["n"] == n_after_first + assert "live answer" in second.output + + +async def test_ask_stream_cache_hit_replays_one_shot(monkeypatch, tmp_path): + """A cache hit replays as single-shot deltas with cached result, no live stream.""" + import conclave.streaming as streaming_mod + + monkeypatch.setenv("XDG_CACHE_HOME", str(tmp_path)) + for var in ("XAI_API_KEY", "GEMINI_API_KEY", "ANTHROPIC_API_KEY"): + monkeypatch.setenv(var, "dummy-key") + + live_calls = {"n": 0} + + async def fake_stream(name, model_id, messages, *, temperature=0.7, timeout=120.0, config=None): + live_calls["n"] += 1 + yield "x" + yield ModelAnswer(name=name, model_id=model_id, answer="x") + + monkeypatch.setattr(streaming_mod, "call_model_stream", fake_stream) + + council = Council(models=["grok", "gemini"], synthesizer="claude", config=_config(), cache=True) + # First run populates the cache. + first = [e async for e in council.ask_stream("hi", synthesize=False)] + assert first[-1].result.cached is False + n_after_first = live_calls["n"] + + # Second identical run is a cache hit -> no new live stream calls. + second = [e async for e in council.ask_stream("hi", synthesize=False)] + assert live_calls["n"] == n_after_first + done = second[-1] + assert done.type == "done" + assert done.result.cached is True + # One member_delta per member (one-shot), each followed by a member_done. + member_deltas = [e for e in second if e.type == "member_delta"] + assert len(member_deltas) == 2 + + +def test_stream_event_done_carries_full_result_shape(): + """StreamEvent('done') carries a CouncilResult that serializes secret-free.""" + from conclave.models import CouncilResult + + ev = StreamEvent(type="done", result=CouncilResult(prompt="p")) + dumped = ev.model_dump(mode="json") + assert dumped["type"] == "done" + assert dumped["result"]["prompt"] == "p"