From 64ee0e3c9484928d8b56937a6b916c6858b6bac1 Mon Sep 17 00:00:00 2001 From: Rohan Gupta Date: Wed, 27 May 2026 15:22:29 -0400 Subject: [PATCH] fix: buffer cli command output --- .../agent_runner/agents/cli_utils.py | 83 +++++++++++-------- tests/agent_runner/agents/cli_utils_test.py | 63 ++++++++++++++ 2 files changed, 113 insertions(+), 33 deletions(-) create mode 100644 tests/agent_runner/agents/cli_utils_test.py diff --git a/src/slop_code/agent_runner/agents/cli_utils.py b/src/slop_code/agent_runner/agents/cli_utils.py index 3871ba55c..6d4d94be3 100644 --- a/src/slop_code/agent_runner/agents/cli_utils.py +++ b/src/slop_code/agent_runner/agents/cli_utils.py @@ -18,6 +18,9 @@ "stream_cli_command", ] +type ParsedCliLine = tuple[float | None, TokenUsage | None, dict] +type CliLineParser = Callable[[str], ParsedCliLine] + @dataclass(slots=True) class AgentCommandResult: @@ -32,16 +35,36 @@ class AgentCommandResult: error_message: str | None = None +def _parse_buffered_lines( + buffer_parts: list[str], + parser: CliLineParser, + *, + flush: bool = False, +) -> Generator[ParsedCliLine, None, None]: + buffer = "".join(buffer_parts) + if flush: + lines = buffer.split("\n") + buffer_parts.clear() + else: + *lines, tail = buffer.split("\n") + buffer_parts[:] = [tail] if tail else [] + + for line in lines: + line = line.strip() + if line: + yield parser(line) + + def stream_cli_command( runtime: StreamingRuntime, command: str, - parser: Callable[[str], tuple[float | None, TokenUsage | None, dict]], + parser: CliLineParser, env: Mapping[str, str] | None = None, timeout: float | None = None, *, parse_stderr: bool = False, ) -> Generator[ - tuple[float | None, TokenUsage | None, dict] | RuntimeResult | None, + ParsedCliLine | RuntimeResult | None, None, None, ]: @@ -56,56 +79,50 @@ def stream_cli_command( parse_stderr: If True, also parse stderr lines through the parser """ env = dict(env or {}) - stdout_buffer = "" - stderr_buffer = "" - stdout = stderr = "" + stdout_buffer_parts: list[str] = [] + stderr_buffer_parts: list[str] = [] + stdout_parts: list[str] = [] + stderr_parts: list[str] = [] start = time.monotonic() result: RuntimeResult | None = None for event in runtime.stream(command=command, env=env, timeout=timeout): if event.kind == "stdout": - stdout += event.text or "" - stdout_buffer += event.text or "" - while "\n" in stdout_buffer: - line, stdout_buffer = stdout_buffer.split("\n", 1) - line = line.strip() - if not line: - continue - yield parser(line) + chunk = event.text or "" + stdout_parts.append(chunk) + stdout_buffer_parts.append(chunk) + if "\n" in chunk: + yield from _parse_buffered_lines(stdout_buffer_parts, parser) elif event.kind == "stderr": - stderr += event.text or "" + chunk = event.text or "" + stderr_parts.append(chunk) if parse_stderr: - stderr_buffer += event.text or "" - while "\n" in stderr_buffer: - line, stderr_buffer = stderr_buffer.split("\n", 1) - line = line.strip() - if not line: - continue - yield parser(line) + stderr_buffer_parts.append(chunk) + if "\n" in chunk: + yield from _parse_buffered_lines( + stderr_buffer_parts, + parser, + ) elif event.kind == "finished": result = event.result break # Flush remaining stdout buffer - for line in stdout_buffer.split("\n"): - line = line.strip() - if not line: - continue - yield parser(line) + yield from _parse_buffered_lines(stdout_buffer_parts, parser, flush=True) # Flush remaining stderr buffer if parsing stderr if parse_stderr: - for line in stderr_buffer.split("\n"): - line = line.strip() - if not line: - continue - yield parser(line) + yield from _parse_buffered_lines( + stderr_buffer_parts, + parser, + flush=True, + ) if result is None: result = RuntimeResult( exit_code=0, - stdout=stdout, - stderr=stderr, + stdout="".join(stdout_parts), + stderr="".join(stderr_parts), setup_stdout="", setup_stderr="", elapsed=time.monotonic() - start, diff --git a/tests/agent_runner/agents/cli_utils_test.py b/tests/agent_runner/agents/cli_utils_test.py new file mode 100644 index 000000000..b181cd6c8 --- /dev/null +++ b/tests/agent_runner/agents/cli_utils_test.py @@ -0,0 +1,63 @@ +"""Tests for shared CLI agent utilities.""" + +from __future__ import annotations + +from collections.abc import Iterator +from typing import cast + +from slop_code.agent_runner.agents.cli_utils import stream_cli_command +from slop_code.common.llms import TokenUsage +from slop_code.execution.protocols import StreamingRuntime +from slop_code.execution.runtime import RuntimeEvent +from slop_code.execution.runtime import RuntimeResult + + +class FakeRuntime: + def __init__(self, events: list[RuntimeEvent]) -> None: + self.events = events + + def stream( + self, + command: str, + env: dict[str, str], + timeout: float | None, + ) -> Iterator[RuntimeEvent]: + yield from self.events + + +def _parse_line(line: str) -> tuple[float | None, TokenUsage | None, dict]: + return None, None, {"line": line} + + +def test_stream_cli_command_buffers_split_output() -> None: + runtime = cast( + "StreamingRuntime", + FakeRuntime( + [ + RuntimeEvent(kind="stdout", text='{"step"'), + RuntimeEvent(kind="stdout", text=": 1}\ntrail"), + RuntimeEvent(kind="stdout", text="ing"), + RuntimeEvent(kind="stderr", text='{"err"'), + RuntimeEvent(kind="stderr", text=": true}\n"), + ] + ), + ) + + items = list( + stream_cli_command( + runtime=runtime, + command="agent", + parser=_parse_line, + parse_stderr=True, + ) + ) + + assert items[:-1] == [ + (None, None, {"line": '{"step": 1}'}), + (None, None, {"line": '{"err": true}'}), + (None, None, {"line": "trailing"}), + ] + result = items[-1] + assert isinstance(result, RuntimeResult) + assert result.stdout == '{"step": 1}\ntrailing' + assert result.stderr == '{"err": true}\n'