feat(streaming): stream member answers + synthesis (closes #7)#40
Merged
Conversation
Add streaming for the synthesize/raw path: a library async-generator API (Council.ask_stream / stream_sync) yielding StreamEvents, and a CLI --stream flag that renders live token output. Non-streaming remains the default and is unchanged. Transport: new stream_sse — the single streaming httpx boundary (client.stream(...)), reusing the pooled client + timeout + redact plumbing and parsing the SSE wire format (event/data lines, blank-line framing). Adapters: each builds its own streaming request (stream_request) and parses its own SSE delta shape (parse_sse_event), verified against current vendor docs: - openai-compat: stream:true + stream_options.include_usage, choices[].delta .content deltas, data:[DONE] sentinel, final empty-choices usage chunk - anthropic: stream:true, content_block_delta/text_delta text, input_tokens on message_start + cumulative output_tokens on message_delta, message_stop - gemini: streamGenerateContent?alt=sse, candidates[0].content.parts[].text per chunk, cumulative usageMetadata providers.call_model_stream mirrors call_model's never-raises contract: a non-streamable model, missing key, or any mid-stream error degrades to a ModelAnswer.error with partial text preserved and never raises; the assembled ModelAnswer (text + merged usage) matches the buffered result shape. streaming.py holds council-level orchestration (concurrent member interleaving via asyncio.Queue, optional synthesizer streaming, terminal done event) so council.py stays focused. --stream + cache: a hit renders the cached final text in one shot (no fake token stream). debate/adversarial streaming is out of scope. Tests: +19 (172 -> 191) covering per-adapter SSE assembly, mid-stream error/partial-text, non-streaming-never-opens-a-stream, CLI --stream smoke + exit-code contract, and --stream+cache one-shot replay. ruff clean. Docs: README (CLI + library streaming), system context diagram, PDD §9 #5 marked LANDED + §6 module table, Documentation_Index.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implements GitHub issue #7 — streaming member answers / synthesis (PDD §9 #5). Scope is deliberately the synthesize/raw path only;
debate/adversarialare untouched.What this adds
Council.ask_stream(prompt, synthesize=True), an async generator yielding typedStreamEvents (member_delta/member_done,synthesis_delta/synthesis_done, terminaldonecarrying the fullCouncilResult).Council.stream_sync(prompt, on_event, ...)drives it for non-async callers. Chosen over a callback hook because the codebase is already async-iterator-first (httpxaiter_lines); an async generator composes withasync forand lets the terminaldoneevent deliver aCouncilResultwhose shape is identical toask(), so downstream consumers and the cache are unaffected.--streamflag — live token output forsynthesize/raw. Ignored under--json; rejected (exit 2) fordebate/adversarial. Honors the existing exit-code contract (1 on zero usable answers).Transport + per-adapter SSE (verified against current vendor docs)
transport.stream_sse— single streaming httpx boundary (client.stream(...)), reuses the pooled client + timeout +redact()plumbing; parses SSE framing (event:/data:lines, blank-line separation, comment/ping skip).stream:true+stream_options.include_usage:true; deltas atchoices[0].delta.content;data: [DONE]sentinel; final empty-choiceschunk carries top-levelusage./v1/messagesstream:true:content_block_delta→delta.text(text_delta);input_tokensfrommessage_start, cumulativeoutput_tokensfrommessage_delta;message_stopends the stream.streamGenerateContent?alt=sse:candidates[0].content.parts[].textper chunk; cumulativeusageMetadata; stream ends by EOF (no sentinel).Never-raises + partial text
providers.call_model_streammirrorscall_model's contract exactly: a non-streamable model falls back to the buffered call emitted as one chunk; a missing key / unknown provider / mid-stream malformed frame / connection drop / non-2xx all become aModelAnswer.errorwith any partial text preserved, and never raise out of the call path. Error text isredact()-scrubbed; keys are read by env-var name only. The final assembledModelAnswer(text + merged usage) matches the bufferedparse_responseresult, so synthesis/CouncilResultare unchanged.Cache interaction
A
--streamrun still populates/serves the cache. A cache hit has no live tokens → the cached final text is rendered in one shot (singlemember_delta/synthesis_deltaper source, then*_done, thendonewithcached=True); providers are not called. A miss streams live and stores the assembled result on completion.Placement
SSE parsing →
transport.py; per-adapter request/delta → adapterstream_request/parse_sse_event(matching the existingbuild_request/parse_responsesplit); per-model orchestration →providers.call_model_stream; council fan-out streaming → new focusedstreaming.py(keepscouncil.pyfrom inflating).Tests
+19 (172 → 191), all offline / transport-mocked. Per-adapter multi-chunk SSE (incremental arrival + assembled == concatenation == buffered result), mid-stream malformed-frame / connection-drop / non-2xx (error + partial preserved, no raise), default
ask()never opens a stream, CLI--streamsmoke + exit-code + debate-rejection,--stream+cache one-shot replay.ruff check+ruff format --checkclean.Do not merge — orchestrator reviews and merges.