diff --git a/src/slop_code/agent_runner/agents/cli_utils.py b/src/slop_code/agent_runner/agents/cli_utils.py index 3871ba55c..8fb75605d 100644 --- a/src/slop_code/agent_runner/agents/cli_utils.py +++ b/src/slop_code/agent_runner/agents/cli_utils.py @@ -56,37 +56,49 @@ def stream_cli_command( parse_stderr: If True, also parse stderr lines through the parser """ env = dict(env or {}) - stdout_buffer = "" - stderr_buffer = "" - stdout = stderr = "" + stdout_buffer_parts: list[str] = [] + stderr_buffer_parts: list[str] = [] + stdout_parts: list[str] = [] + stderr_parts: list[str] = [] start = time.monotonic() result: RuntimeResult | None = None for event in runtime.stream(command=command, env=env, timeout=timeout): if event.kind == "stdout": - stdout += event.text or "" - stdout_buffer += event.text or "" - while "\n" in stdout_buffer: - line, stdout_buffer = stdout_buffer.split("\n", 1) - line = line.strip() - if not line: - continue - yield parser(line) - elif event.kind == "stderr": - stderr += event.text or "" - if parse_stderr: - stderr_buffer += event.text or "" - while "\n" in stderr_buffer: - line, stderr_buffer = stderr_buffer.split("\n", 1) + chunk = event.text or "" + stdout_parts.append(chunk) + stdout_buffer_parts.append(chunk) + if "\n" in chunk: + stdout_buffer = "".join(stdout_buffer_parts) + while "\n" in stdout_buffer: + line, stdout_buffer = stdout_buffer.split("\n", 1) line = line.strip() if not line: continue yield parser(line) + stdout_buffer_parts = [stdout_buffer] if stdout_buffer else [] + elif event.kind == "stderr": + chunk = event.text or "" + stderr_parts.append(chunk) + if parse_stderr: + stderr_buffer_parts.append(chunk) + if "\n" in chunk: + stderr_buffer = "".join(stderr_buffer_parts) + while "\n" in stderr_buffer: + line, stderr_buffer = stderr_buffer.split("\n", 1) + line = line.strip() + if not line: + continue + yield parser(line) + stderr_buffer_parts = ( + [stderr_buffer] if stderr_buffer else [] + ) elif event.kind == "finished": result = event.result break # Flush remaining stdout buffer + stdout_buffer = "".join(stdout_buffer_parts) for line in stdout_buffer.split("\n"): line = line.strip() if not line: @@ -95,6 +107,7 @@ def stream_cli_command( # Flush remaining stderr buffer if parsing stderr if parse_stderr: + stderr_buffer = "".join(stderr_buffer_parts) for line in stderr_buffer.split("\n"): line = line.strip() if not line: @@ -104,8 +117,8 @@ def stream_cli_command( if result is None: result = RuntimeResult( exit_code=0, - stdout=stdout, - stderr=stderr, + stdout="".join(stdout_parts), + stderr="".join(stderr_parts), setup_stdout="", setup_stderr="", elapsed=time.monotonic() - start, diff --git a/src/slop_code/execution/stream_processor.py b/src/slop_code/execution/stream_processor.py index 6e5497fbb..ef62d31a4 100644 --- a/src/slop_code/execution/stream_processor.py +++ b/src/slop_code/execution/stream_processor.py @@ -98,8 +98,8 @@ def process_stream( tuple[Literal["stdout", "stderr", "finished"], str | None] ] = queue.Queue() thread = start_stream_pump(stream, event_queue, stop_event) - stdout = "" - stderr = "" + stdout_parts: list[str] = [] + stderr_parts: list[str] = [] setup_stdout = "" setup_stderr = "" yielding_stdout = yield_only_after is None @@ -110,34 +110,32 @@ def handle_event( kind: Literal["stdout", "stderr"], payload: str, ) -> Iterator[RuntimeEvent]: - nonlocal stdout, stderr, setup_stdout, setup_stderr + nonlocal stdout_parts, stderr_parts, setup_stdout, setup_stderr nonlocal yielding_stdout, yielding_stderr if kind == "stdout": - stdout += payload - if ( - not yielding_stdout - and yield_only_after - and yield_only_after in stdout - ): - yielding_stdout = True - setup_stdout, stdout = stdout.split(yield_only_after, 1) - payload = stdout + stdout_parts.append(payload) + if not yielding_stdout and yield_only_after: + stdout = "".join(stdout_parts) + if yield_only_after in stdout: + yielding_stdout = True + setup_stdout, stdout = stdout.split(yield_only_after, 1) + stdout_parts = [stdout] + payload = stdout if yielding_stdout and payload.strip(): yield RuntimeEvent(kind="stdout", text=payload) return if kind == "stderr": - stderr += payload - if ( - not yielding_stderr - and yield_only_after - and yield_only_after in stderr - ): - yielding_stderr = True - setup_stderr, stderr = stderr.split(yield_only_after, 1) - payload = stderr + stderr_parts.append(payload) + if not yielding_stderr and yield_only_after: + stderr = "".join(stderr_parts) + if yield_only_after in stderr: + yielding_stderr = True + setup_stderr, stderr = stderr.split(yield_only_after, 1) + stderr_parts = [stderr] + payload = stderr if yielding_stderr and payload.strip(): yield RuntimeEvent(kind="stderr", text=payload) @@ -196,8 +194,8 @@ def handle_event( ) return RuntimeResult( exit_code=exit_code, - stdout=stdout, - stderr=stderr, + stdout="".join(stdout_parts), + stderr="".join(stderr_parts), setup_stdout=setup_stdout, setup_stderr=setup_stderr, elapsed=elapsed, diff --git a/tests/agent_runner/agents/cli_utils_test.py b/tests/agent_runner/agents/cli_utils_test.py new file mode 100644 index 000000000..0ffe74a37 --- /dev/null +++ b/tests/agent_runner/agents/cli_utils_test.py @@ -0,0 +1,72 @@ +"""Tests for shared CLI agent helpers.""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import TYPE_CHECKING, cast + +from slop_code.agent_runner.agents.cli_utils import stream_cli_command +from slop_code.common.llms import TokenUsage +from slop_code.execution.runtime import RuntimeEvent +from slop_code.execution.runtime import RuntimeResult + +if TYPE_CHECKING: + from slop_code.execution.protocols import StreamingRuntime + + +class FakeRuntime: + def __init__(self, events: list[RuntimeEvent]) -> None: + self.events = events + + def stream( + self, + command: str, + env: dict[str, str], + timeout: float | None, + ) -> Iterator[RuntimeEvent]: + yield from self.events + + +def _parse_line(line: str) -> tuple[float | None, TokenUsage | None, dict]: + return None, None, {"line": line} + + +def test_stream_cli_command_parses_lines_across_chunk_boundaries() -> None: + result = RuntimeResult( + exit_code=0, + stdout='{"step": 1}\n{"step": 2}\n', + stderr='{"err": true}\n', + setup_stdout="", + setup_stderr="", + elapsed=0.1, + timed_out=False, + ) + runtime = cast( + "StreamingRuntime", + FakeRuntime( + [ + RuntimeEvent(kind="stdout", text='{"step"'), + RuntimeEvent(kind="stdout", text=': 1}\n{"step":'), + RuntimeEvent(kind="stdout", text=" 2}\n"), + RuntimeEvent(kind="stderr", text='{"err"'), + RuntimeEvent(kind="stderr", text=": true}\n"), + RuntimeEvent(kind="finished", result=result), + ] + ), + ) + + items = list( + stream_cli_command( + runtime=runtime, + command="agent --json", + parser=_parse_line, + parse_stderr=True, + ) + ) + + assert items == [ + (None, None, {"line": '{"step": 1}'}), + (None, None, {"line": '{"step": 2}'}), + (None, None, {"line": '{"err": true}'}), + result, + ] diff --git a/tests/execution/stream_processor_test.py b/tests/execution/stream_processor_test.py index b37e555a1..fac1b3016 100644 --- a/tests/execution/stream_processor_test.py +++ b/tests/execution/stream_processor_test.py @@ -2,7 +2,33 @@ from __future__ import annotations +from collections.abc import Iterator + +from slop_code.execution.runtime import RuntimeEvent +from slop_code.execution.runtime import RuntimeResult +from slop_code.execution.shared import SPLIT_STRING from slop_code.execution.stream_processor import ensure_string +from slop_code.execution.stream_processor import process_stream + + +def _collect_process_stream( + chunks: Iterator[tuple[str, str]], + *, + yield_only_after: str | None = None, +) -> tuple[list[RuntimeEvent], RuntimeResult]: + events: list[RuntimeEvent] = [] + generator = process_stream( + chunks, + timeout=1, + poll_fn=lambda: None, + yield_only_after=yield_only_after, + ) + + while True: + try: + events.append(next(generator)) + except StopIteration as stop: + return events, stop.value def test_ensure_string_preserves_text_around_invalid_utf8_bytes() -> None: @@ -10,3 +36,34 @@ def test_ensure_string_preserves_text_around_invalid_utf8_bytes() -> None: assert '{"type":"message_update","data":"ok"}' in decoded assert decoded.endswith("\n") + + +def test_process_stream_splits_setup_marker_across_chunks() -> None: + marker_start, marker_end = SPLIT_STRING[:10], SPLIT_STRING[10:] + + events, result = _collect_process_stream( + iter( + [ + ("setup stdout\n", ""), + (marker_start, ""), + (marker_end, ""), + ("\ncommand stdout", ""), + (" continued", ""), + ("", "setup stderr\n"), + ("", marker_start), + ("", marker_end), + ("", "\ncommand stderr"), + ] + ), + yield_only_after=SPLIT_STRING, + ) + + assert [(event.kind, event.text) for event in events] == [ + ("stdout", "\ncommand stdout"), + ("stdout", " continued"), + ("stderr", "\ncommand stderr"), + ] + assert result.setup_stdout == "setup stdout\n" + assert result.stdout == "\ncommand stdout continued" + assert result.setup_stderr == "setup stderr\n" + assert result.stderr == "\ncommand stderr"