
feat(streaming): stream member answers + synthesis (closes #7) #40

Merged
ernestprovo23 merged 1 commit into main from feat/streaming
Jun 9, 2026

@ernestprovo23

Implements GitHub issue #7 — streaming member answers / synthesis (PDD §9 #5). Scope is deliberately the synthesize/raw path only; debate/adversarial are untouched.

What this adds

  • Library streaming API: Council.ask_stream(prompt, synthesize=True) is an async generator yielding typed StreamEvents (member_delta/member_done, synthesis_delta/synthesis_done, and a terminal done carrying the full CouncilResult). Council.stream_sync(prompt, on_event, ...) drives it for non-async callers. An async generator was chosen over a callback hook because the codebase is already async-iterator-first (httpx aiter_lines): it composes with async for, and the terminal done event delivers a CouncilResult whose shape is identical to ask(), so downstream consumers and the cache are unaffected.
  • CLI --stream flag — live token output for synthesize/raw. Ignored under --json; rejected (exit 2) for debate/adversarial. Honors the existing exit-code contract (1 on zero usable answers).
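
As a rough illustration of how a caller might consume the event stream described above, here is a minimal offline sketch. The StreamEvent fields (kind, source, text, result) and the fake_ask_stream stand-in are assumptions made for this example; the PR does not show the real dataclass or generator body.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class StreamEvent:
    # Hypothetical shape: the real StreamEvent fields are not shown in this PR.
    kind: str                      # e.g. "member_delta", "synthesis_delta", "done"
    source: Optional[str] = None   # member label; None for synthesis/terminal events
    text: str = ""
    result: Optional[dict] = None  # stand-in for the CouncilResult on "done"


async def fake_ask_stream(prompt: str) -> AsyncIterator[StreamEvent]:
    """Stand-in for Council.ask_stream so the sketch runs without a network."""
    yield StreamEvent("member_delta", "model-a", "Hel")
    yield StreamEvent("member_delta", "model-b", "Hi")
    yield StreamEvent("member_delta", "model-a", "lo")
    yield StreamEvent("member_done", "model-a")
    yield StreamEvent("member_done", "model-b")
    yield StreamEvent("synthesis_delta", None, "Greetings")
    yield StreamEvent("synthesis_done")
    yield StreamEvent("done", result={"synthesis": "Greetings"})


async def consume(prompt: str) -> tuple[dict[str, str], str, Optional[dict]]:
    """Accumulate interleaved deltas per member, plus the synthesis text."""
    members: dict[str, str] = {}
    synthesis = ""
    final = None
    async for ev in fake_ask_stream(prompt):
        if ev.kind == "member_delta":
            members[ev.source] = members.get(ev.source, "") + ev.text
        elif ev.kind == "synthesis_delta":
            synthesis += ev.text
        elif ev.kind == "done":
            final = ev.result  # terminal event carries the full result
    return members, synthesis, final
```

Running asyncio.run(consume("hi")) reassembles each member's text from interleaved deltas, which is the property that lets the terminal done event stay shape-compatible with the buffered path.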

Transport + per-adapter SSE (verified against current vendor docs)

  • transport.stream_sse — single streaming httpx boundary (client.stream(...)), reuses the pooled client + timeout + redact() plumbing; parses SSE framing (event:/data: lines, blank-line separation, comment/ping skip).
  • OpenAI-compatible (openai/xai/perplexity/groq/deepseek/mistral/together): stream:true + stream_options.include_usage:true; deltas at choices[0].delta.content; data: [DONE] sentinel; final empty-choices chunk carries top-level usage.
  • Anthropic /v1/messages stream:true: content_block_delta → delta.text (text_delta); input_tokens from message_start, cumulative output_tokens from message_delta; message_stop ends the stream.
  • Gemini streamGenerateContent?alt=sse: candidates[0].content.parts[].text per chunk; cumulative usageMetadata; stream ends by EOF (no sentinel).
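
The framing and delta shapes listed above can be sketched roughly as follows. This is not the PR's transport.stream_sse or adapter code: the function names iter_sse_events, assemble_openai and assemble_anthropic are invented for the example, and it works on an in-memory list of lines rather than a live httpx stream.

```python
import json
from typing import Iterable, Iterator, Optional


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[Optional[str], str]]:
    """Yield (event_name, data) pairs from already-split SSE lines."""
    event: Optional[str] = None
    data: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":                      # blank line closes one event
            if data:
                yield event, "\n".join(data)
            event, data = None, []
        elif line.startswith(":"):          # comment / keep-alive ping
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].removeprefix(" "))
    if data:                                # lenient flush for streams that end at EOF
        yield event, "\n".join(data)


def assemble_openai(lines: Iterable[str]) -> tuple[str, Optional[dict]]:
    """OpenAI-compatible shape: choices[0].delta.content, then a usage chunk."""
    parts: list[str] = []
    usage: Optional[dict] = None
    for _event, data in iter_sse_events(lines):
        if data == "[DONE]":                # sentinel ends the stream
            break
        chunk = json.loads(data)
        if chunk.get("usage"):              # final empty-choices chunk
            usage = chunk["usage"]
        choices = chunk.get("choices") or []
        if choices:
            piece = (choices[0].get("delta") or {}).get("content")
            if piece:
                parts.append(piece)
    return "".join(parts), usage


def assemble_anthropic(lines: Iterable[str]) -> tuple[str, dict]:
    """Anthropic shape: named events, usage split across two of them."""
    parts: list[str] = []
    usage = {"input_tokens": 0, "output_tokens": 0}
    for event, data in iter_sse_events(lines):
        if event == "message_stop":
            break
        payload = json.loads(data)
        if event == "message_start":
            usage["input_tokens"] = payload["message"]["usage"]["input_tokens"]
        elif event == "content_block_delta" and payload["delta"].get("type") == "text_delta":
            parts.append(payload["delta"]["text"])
        elif event == "message_delta":      # output_tokens is cumulative
            usage["output_tokens"] = payload["usage"]["output_tokens"]
    return "".join(parts), usage
```

Keeping the framing parser separate from the per-vendor delta extraction mirrors the split the PR describes between the transport and the adapters.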

Never-raises + partial text

providers.call_model_stream mirrors call_model's contract exactly: a non-streamable model falls back to the buffered call emitted as one chunk; a missing key / unknown provider / mid-stream malformed frame / connection drop / non-2xx all become a ModelAnswer.error with any partial text preserved, and never raise out of the call path. Error text is redact()-scrubbed; keys are read by env-var name only. The final assembled ModelAnswer (text + merged usage) matches the buffered parse_response result, so synthesis/CouncilResult are unchanged.
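
One way to picture the never-raises contract with partial text preserved is the sketch below. ModelAnswer is reduced to two fields, redact() is a placeholder that masks a single fake key, and call_stream_safely and flaky_stream are hypothetical names; none of this is the actual providers.call_model_stream code.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, Optional


@dataclass
class ModelAnswer:
    # Hypothetical minimal shape; the real ModelAnswer carries more (e.g. usage).
    text: str = ""
    error: Optional[str] = None


def redact(message: str) -> str:
    """Placeholder for the project's redact(); masks one fake key only."""
    return message.replace("sk-secret", "[REDACTED]")


async def flaky_stream() -> AsyncIterator[str]:
    """Simulated provider stream that drops the connection mid-answer."""
    yield "partial "
    yield "answer"
    raise ConnectionError("connection dropped (key sk-secret)")


async def call_stream_safely(
    chunks: AsyncIterator[str], on_delta: Callable[[str], None]
) -> ModelAnswer:
    parts: list[str] = []
    try:
        async for chunk in chunks:
            parts.append(chunk)
            on_delta(chunk)  # forward the live token to the caller
    except Exception as exc:  # broad on purpose: nothing escapes the call path
        return ModelAnswer(
            text="".join(parts),  # keep whatever arrived before the failure
            error=redact(f"{type(exc).__name__}: {exc}"),
        )
    return ModelAnswer(text="".join(parts))
```

The failure becomes data on the returned answer rather than an exception, so a caller fanning out to several members can still use the ones that succeeded.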

Cache interaction

A --stream run still populates/serves the cache. A cache hit has no live tokens → the cached final text is rendered in one shot (single member_delta/synthesis_delta per source, then *_done, then done with cached=True); providers are not called. A miss streams live and stores the assembled result on completion.
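
The hit/miss behaviour described above might look roughly like this. The dict-backed cache, the (kind, text) tuple events, and the names stream_with_cache, live_tokens and collect are simplifications assumed for the sketch, not the project's cache or StreamEvent types.

```python
import asyncio
from typing import AsyncIterator


async def live_tokens(prompt: str) -> AsyncIterator[str]:
    """Stand-in for a live provider stream."""
    for tok in ("live ", "answer"):
        yield tok


async def stream_with_cache(
    prompt: str, cache: dict[str, str], provider_calls: list[str]
) -> AsyncIterator[tuple[str, str]]:
    if prompt in cache:
        # Hit: no live tokens exist, so replay the final text as a single delta.
        yield ("member_delta", cache[prompt])
        yield ("member_done", "")
        yield ("done", "cached=True")
        return
    provider_calls.append(prompt)  # record that a provider was actually called
    parts: list[str] = []
    async for tok in live_tokens(prompt):
        parts.append(tok)
        yield ("member_delta", tok)
    cache[prompt] = "".join(parts)  # store the assembled result on completion
    yield ("member_done", "")
    yield ("done", "cached=False")


async def collect(prompt: str, cache: dict[str, str], calls: list[str]) -> list[tuple[str, str]]:
    return [ev async for ev in stream_with_cache(prompt, cache, calls)]
```

Calling collect twice with the same prompt and cache yields a multi-delta stream the first time and a single-delta replay the second, with the provider invoked only once.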

Placement

SSE parsing → transport.py; per-adapter request/delta → adapter stream_request/parse_sse_event (matching the existing build_request/parse_response split); per-model orchestration → providers.call_model_stream; council fan-out streaming → new focused streaming.py (keeps council.py from inflating).
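
The commit message mentions that the council fan-out interleaves concurrent members via asyncio.Queue. A minimal sketch of that pattern, assuming tuple events and hypothetical names (member, fan_out, pump) rather than the contents of streaming.py, could be:

```python
import asyncio
from typing import AsyncIterator


async def member(tokens: list[str], delay: float) -> AsyncIterator[str]:
    """Simulated member stream emitting tokens at its own pace."""
    for tok in tokens:
        await asyncio.sleep(delay)
        yield tok


async def fan_out(
    members: dict[str, AsyncIterator[str]],
) -> AsyncIterator[tuple[str, str, str]]:
    """Merge several member streams into one event stream, in arrival order."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(name: str, stream: AsyncIterator[str]) -> None:
        try:
            async for tok in stream:
                await queue.put(("member_delta", name, tok))
        finally:
            # Always signal completion so the consumer loop can terminate.
            await queue.put(("member_done", name, ""))

    tasks = [asyncio.create_task(pump(n, s)) for n, s in members.items()]
    remaining = len(tasks)
    while remaining:
        event = await queue.get()
        if event[0] == "member_done":
            remaining -= 1
        yield event
    # Collect task outcomes without letting a member failure propagate.
    await asyncio.gather(*tasks, return_exceptions=True)


async def run_demo() -> list[tuple[str, str, str]]:
    streams = {"a": member(["a1", "a2"], 0.01), "b": member(["b1"], 0.005)}
    return [ev async for ev in fan_out(streams)]
```

The exact interleaving depends on timing, but each member's deltas stay in order and each member produces exactly one member_done, which is what a renderer needs.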

Tests

+19 (172 → 191), all offline / transport-mocked. Per-adapter multi-chunk SSE (incremental arrival + assembled == concatenation == buffered result), mid-stream malformed-frame / connection-drop / non-2xx (error + partial preserved, no raise), default ask() never opens a stream, CLI --stream smoke + exit-code + debate-rejection, --stream+cache one-shot replay. ruff check + ruff format --check clean.

Do not merge — orchestrator reviews and merges.

Add streaming for the synthesize/raw path: a library async-generator API
(Council.ask_stream / stream_sync) yielding StreamEvents, and a CLI --stream
flag that renders live token output. Non-streaming remains the default and is
unchanged.

Transport: new stream_sse — the single streaming httpx boundary
(client.stream(...)), reusing the pooled client + timeout + redact plumbing and
parsing the SSE wire format (event/data lines, blank-line framing).

Adapters: each builds its own streaming request (stream_request) and parses its
own SSE delta shape (parse_sse_event), verified against current vendor docs:
- openai-compat: stream:true + stream_options.include_usage, deltas at
  choices[].delta.content, data:[DONE] sentinel, final empty-choices usage
  chunk
- anthropic: stream:true, content_block_delta/text_delta text, input_tokens on
  message_start + cumulative output_tokens on message_delta, message_stop
- gemini: streamGenerateContent?alt=sse, candidates[0].content.parts[].text per
  chunk, cumulative usageMetadata

providers.call_model_stream mirrors call_model's never-raises contract: a
non-streamable model, missing key, or any mid-stream error degrades to a
ModelAnswer.error with partial text preserved and never raises; the assembled
ModelAnswer (text + merged usage) matches the buffered result shape.

streaming.py holds council-level orchestration (concurrent member interleaving
via asyncio.Queue, optional synthesizer streaming, terminal done event) so
council.py stays focused. --stream + cache: a hit renders the cached final text
in one shot (no fake token stream). debate/adversarial streaming is out of scope.

Tests: +19 (172 -> 191) covering per-adapter SSE assembly, mid-stream
error/partial-text, non-streaming-never-opens-a-stream, CLI --stream smoke +
exit-code contract, and --stream+cache one-shot replay. ruff clean.

Docs: README (CLI + library streaming), system context diagram, PDD §9 #5 marked
LANDED + §6 module table, Documentation_Index.
@ernestprovo23 ernestprovo23 merged commit 611a97f into main Jun 9, 2026
5 checks passed
@ernestprovo23 ernestprovo23 deleted the feat/streaming branch June 10, 2026 00:08