Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions src/slop_code/agent_runner/agents/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
"stream_cli_command",
]

type ParsedCliLine = tuple[float | None, TokenUsage | None, dict]
type CliLineParser = Callable[[str], ParsedCliLine]


@dataclass(slots=True)
class AgentCommandResult:
Expand All @@ -32,16 +35,36 @@ class AgentCommandResult:
error_message: str | None = None


def _parse_buffered_lines(
buffer_parts: list[str],
parser: CliLineParser,
*,
flush: bool = False,
) -> Generator[ParsedCliLine, None, None]:
buffer = "".join(buffer_parts)
if flush:
lines = buffer.split("\n")
buffer_parts.clear()
else:
*lines, tail = buffer.split("\n")
buffer_parts[:] = [tail] if tail else []

for line in lines:
line = line.strip()
if line:
yield parser(line)


def stream_cli_command(
runtime: StreamingRuntime,
command: str,
parser: Callable[[str], tuple[float | None, TokenUsage | None, dict]],
parser: CliLineParser,
env: Mapping[str, str] | None = None,
timeout: float | None = None,
*,
parse_stderr: bool = False,
) -> Generator[
tuple[float | None, TokenUsage | None, dict] | RuntimeResult | None,
ParsedCliLine | RuntimeResult | None,
None,
None,
]:
Expand All @@ -56,56 +79,50 @@ def stream_cli_command(
parse_stderr: If True, also parse stderr lines through the parser
"""
env = dict(env or {})
stdout_buffer = ""
stderr_buffer = ""
stdout = stderr = ""
stdout_buffer_parts: list[str] = []
stderr_buffer_parts: list[str] = []
stdout_parts: list[str] = []
stderr_parts: list[str] = []
start = time.monotonic()
result: RuntimeResult | None = None

for event in runtime.stream(command=command, env=env, timeout=timeout):
if event.kind == "stdout":
stdout += event.text or ""
stdout_buffer += event.text or ""
while "\n" in stdout_buffer:
line, stdout_buffer = stdout_buffer.split("\n", 1)
line = line.strip()
if not line:
continue
yield parser(line)
chunk = event.text or ""
stdout_parts.append(chunk)
stdout_buffer_parts.append(chunk)
if "\n" in chunk:
yield from _parse_buffered_lines(stdout_buffer_parts, parser)
elif event.kind == "stderr":
stderr += event.text or ""
chunk = event.text or ""
stderr_parts.append(chunk)
if parse_stderr:
stderr_buffer += event.text or ""
while "\n" in stderr_buffer:
line, stderr_buffer = stderr_buffer.split("\n", 1)
line = line.strip()
if not line:
continue
yield parser(line)
stderr_buffer_parts.append(chunk)
if "\n" in chunk:
yield from _parse_buffered_lines(
stderr_buffer_parts,
parser,
)
elif event.kind == "finished":
result = event.result
break

# Flush remaining stdout buffer
for line in stdout_buffer.split("\n"):
line = line.strip()
if not line:
continue
yield parser(line)
yield from _parse_buffered_lines(stdout_buffer_parts, parser, flush=True)

# Flush remaining stderr buffer if parsing stderr
if parse_stderr:
for line in stderr_buffer.split("\n"):
line = line.strip()
if not line:
continue
yield parser(line)
yield from _parse_buffered_lines(
stderr_buffer_parts,
parser,
flush=True,
)

if result is None:
result = RuntimeResult(
exit_code=0,
stdout=stdout,
stderr=stderr,
stdout="".join(stdout_parts),
stderr="".join(stderr_parts),
setup_stdout="",
setup_stderr="",
elapsed=time.monotonic() - start,
Expand Down
63 changes: 63 additions & 0 deletions tests/agent_runner/agents/cli_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Tests for shared CLI agent utilities."""

from __future__ import annotations

from collections.abc import Iterator
from typing import cast

from slop_code.agent_runner.agents.cli_utils import stream_cli_command
from slop_code.common.llms import TokenUsage
from slop_code.execution.protocols import StreamingRuntime
from slop_code.execution.runtime import RuntimeEvent
from slop_code.execution.runtime import RuntimeResult


class FakeRuntime:
def __init__(self, events: list[RuntimeEvent]) -> None:
self.events = events

def stream(
self,
command: str,
env: dict[str, str],
timeout: float | None,
) -> Iterator[RuntimeEvent]:
yield from self.events


def _parse_line(line: str) -> tuple[float | None, TokenUsage | None, dict]:
return None, None, {"line": line}


def test_stream_cli_command_buffers_split_output() -> None:
runtime = cast(
"StreamingRuntime",
FakeRuntime(
[
RuntimeEvent(kind="stdout", text='{"step"'),
RuntimeEvent(kind="stdout", text=": 1}\ntrail"),
RuntimeEvent(kind="stdout", text="ing"),
RuntimeEvent(kind="stderr", text='{"err"'),
RuntimeEvent(kind="stderr", text=": true}\n"),
]
),
)

items = list(
stream_cli_command(
runtime=runtime,
command="agent",
parser=_parse_line,
parse_stderr=True,
)
)

assert items[:-1] == [
(None, None, {"line": '{"step": 1}'}),
(None, None, {"line": '{"err": true}'}),
(None, None, {"line": "trailing"}),
]
result = items[-1]
assert isinstance(result, RuntimeResult)
assert result.stdout == '{"step": 1}\ntrailing'
assert result.stderr == '{"err": true}\n'