Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions src/slop_code/agent_runner/agents/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,49 @@ def stream_cli_command(
parse_stderr: If True, also parse stderr lines through the parser
"""
env = dict(env or {})
stdout_buffer = ""
stderr_buffer = ""
stdout = stderr = ""
stdout_buffer_parts: list[str] = []
stderr_buffer_parts: list[str] = []
stdout_parts: list[str] = []
stderr_parts: list[str] = []
start = time.monotonic()
result: RuntimeResult | None = None

for event in runtime.stream(command=command, env=env, timeout=timeout):
if event.kind == "stdout":
stdout += event.text or ""
stdout_buffer += event.text or ""
while "\n" in stdout_buffer:
line, stdout_buffer = stdout_buffer.split("\n", 1)
line = line.strip()
if not line:
continue
yield parser(line)
elif event.kind == "stderr":
stderr += event.text or ""
if parse_stderr:
stderr_buffer += event.text or ""
while "\n" in stderr_buffer:
line, stderr_buffer = stderr_buffer.split("\n", 1)
chunk = event.text or ""
stdout_parts.append(chunk)
stdout_buffer_parts.append(chunk)
if "\n" in chunk:
stdout_buffer = "".join(stdout_buffer_parts)
while "\n" in stdout_buffer:
line, stdout_buffer = stdout_buffer.split("\n", 1)
line = line.strip()
if not line:
continue
yield parser(line)
stdout_buffer_parts = [stdout_buffer] if stdout_buffer else []
elif event.kind == "stderr":
chunk = event.text or ""
stderr_parts.append(chunk)
if parse_stderr:
stderr_buffer_parts.append(chunk)
if "\n" in chunk:
stderr_buffer = "".join(stderr_buffer_parts)
while "\n" in stderr_buffer:
line, stderr_buffer = stderr_buffer.split("\n", 1)
line = line.strip()
if not line:
continue
yield parser(line)
stderr_buffer_parts = (
[stderr_buffer] if stderr_buffer else []
)
elif event.kind == "finished":
result = event.result
break

# Flush remaining stdout buffer
stdout_buffer = "".join(stdout_buffer_parts)
for line in stdout_buffer.split("\n"):
line = line.strip()
if not line:
Expand All @@ -95,6 +107,7 @@ def stream_cli_command(

# Flush remaining stderr buffer if parsing stderr
if parse_stderr:
stderr_buffer = "".join(stderr_buffer_parts)
for line in stderr_buffer.split("\n"):
line = line.strip()
if not line:
Expand All @@ -104,8 +117,8 @@ def stream_cli_command(
if result is None:
result = RuntimeResult(
exit_code=0,
stdout=stdout,
stderr=stderr,
stdout="".join(stdout_parts),
stderr="".join(stderr_parts),
setup_stdout="",
setup_stderr="",
elapsed=time.monotonic() - start,
Expand Down
44 changes: 21 additions & 23 deletions src/slop_code/execution/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def process_stream(
tuple[Literal["stdout", "stderr", "finished"], str | None]
] = queue.Queue()
thread = start_stream_pump(stream, event_queue, stop_event)
stdout = ""
stderr = ""
stdout_parts: list[str] = []
stderr_parts: list[str] = []
setup_stdout = ""
setup_stderr = ""
yielding_stdout = yield_only_after is None
Expand All @@ -110,34 +110,32 @@ def handle_event(
kind: Literal["stdout", "stderr"],
payload: str,
) -> Iterator[RuntimeEvent]:
nonlocal stdout, stderr, setup_stdout, setup_stderr
nonlocal stdout_parts, stderr_parts, setup_stdout, setup_stderr
nonlocal yielding_stdout, yielding_stderr

if kind == "stdout":
stdout += payload
if (
not yielding_stdout
and yield_only_after
and yield_only_after in stdout
):
yielding_stdout = True
setup_stdout, stdout = stdout.split(yield_only_after, 1)
payload = stdout
stdout_parts.append(payload)
if not yielding_stdout and yield_only_after:
stdout = "".join(stdout_parts)
if yield_only_after in stdout:
yielding_stdout = True
setup_stdout, stdout = stdout.split(yield_only_after, 1)
stdout_parts = [stdout]
payload = stdout

if yielding_stdout and payload.strip():
yield RuntimeEvent(kind="stdout", text=payload)
return

if kind == "stderr":
stderr += payload
if (
not yielding_stderr
and yield_only_after
and yield_only_after in stderr
):
yielding_stderr = True
setup_stderr, stderr = stderr.split(yield_only_after, 1)
payload = stderr
stderr_parts.append(payload)
if not yielding_stderr and yield_only_after:
stderr = "".join(stderr_parts)
if yield_only_after in stderr:
yielding_stderr = True
setup_stderr, stderr = stderr.split(yield_only_after, 1)
stderr_parts = [stderr]
payload = stderr

if yielding_stderr and payload.strip():
yield RuntimeEvent(kind="stderr", text=payload)
Expand Down Expand Up @@ -196,8 +194,8 @@ def handle_event(
)
return RuntimeResult(
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
stdout="".join(stdout_parts),
stderr="".join(stderr_parts),
setup_stdout=setup_stdout,
setup_stderr=setup_stderr,
elapsed=elapsed,
Expand Down
72 changes: 72 additions & 0 deletions tests/agent_runner/agents/cli_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Tests for shared CLI agent helpers."""

from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING, cast

from slop_code.agent_runner.agents.cli_utils import stream_cli_command
from slop_code.common.llms import TokenUsage
from slop_code.execution.runtime import RuntimeEvent
from slop_code.execution.runtime import RuntimeResult

if TYPE_CHECKING:
from slop_code.execution.protocols import StreamingRuntime


class FakeRuntime:
def __init__(self, events: list[RuntimeEvent]) -> None:
self.events = events

def stream(
self,
command: str,
env: dict[str, str],
timeout: float | None,
) -> Iterator[RuntimeEvent]:
yield from self.events


def _parse_line(line: str) -> tuple[float | None, TokenUsage | None, dict]:
return None, None, {"line": line}


def test_stream_cli_command_parses_lines_across_chunk_boundaries() -> None:
result = RuntimeResult(
exit_code=0,
stdout='{"step": 1}\n{"step": 2}\n',
stderr='{"err": true}\n',
setup_stdout="",
setup_stderr="",
elapsed=0.1,
timed_out=False,
)
runtime = cast(
"StreamingRuntime",
FakeRuntime(
[
RuntimeEvent(kind="stdout", text='{"step"'),
RuntimeEvent(kind="stdout", text=': 1}\n{"step":'),
RuntimeEvent(kind="stdout", text=" 2}\n"),
RuntimeEvent(kind="stderr", text='{"err"'),
RuntimeEvent(kind="stderr", text=": true}\n"),
RuntimeEvent(kind="finished", result=result),
]
),
)

items = list(
stream_cli_command(
runtime=runtime,
command="agent --json",
parser=_parse_line,
parse_stderr=True,
)
)

assert items == [
(None, None, {"line": '{"step": 1}'}),
(None, None, {"line": '{"step": 2}'}),
(None, None, {"line": '{"err": true}'}),
result,
]
57 changes: 57 additions & 0 deletions tests/execution/stream_processor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,68 @@

from __future__ import annotations

from collections.abc import Iterator

from slop_code.execution.runtime import RuntimeEvent
from slop_code.execution.runtime import RuntimeResult
from slop_code.execution.shared import SPLIT_STRING
from slop_code.execution.stream_processor import ensure_string
from slop_code.execution.stream_processor import process_stream


def _collect_process_stream(
chunks: Iterator[tuple[str, str]],
*,
yield_only_after: str | None = None,
) -> tuple[list[RuntimeEvent], RuntimeResult]:
events: list[RuntimeEvent] = []
generator = process_stream(
chunks,
timeout=1,
poll_fn=lambda: None,
yield_only_after=yield_only_after,
)

while True:
try:
events.append(next(generator))
except StopIteration as stop:
return events, stop.value


def test_ensure_string_preserves_text_around_invalid_utf8_bytes() -> None:
decoded = ensure_string(b'{"type":"message_update","data":"ok"}\xff\n')

assert '{"type":"message_update","data":"ok"}' in decoded
assert decoded.endswith("\n")


def test_process_stream_splits_setup_marker_across_chunks() -> None:
marker_start, marker_end = SPLIT_STRING[:10], SPLIT_STRING[10:]

events, result = _collect_process_stream(
iter(
[
("setup stdout\n", ""),
(marker_start, ""),
(marker_end, ""),
("\ncommand stdout", ""),
(" continued", ""),
("", "setup stderr\n"),
("", marker_start),
("", marker_end),
("", "\ncommand stderr"),
]
),
yield_only_after=SPLIT_STRING,
)

assert [(event.kind, event.text) for event in events] == [
("stdout", "\ncommand stdout"),
("stdout", " continued"),
("stderr", "\ncommand stderr"),
]
assert result.setup_stdout == "setup stdout\n"
assert result.stdout == "\ncommand stdout continued"
assert result.setup_stderr == "setup stderr\n"
assert result.stderr == "\ncommand stderr"