6 changes: 5 additions & 1 deletion AGENTS.md
@@ -335,7 +335,11 @@ Full schema:
 | `response_field` | no | HTTP / invocations only. Dot-path to extract the response text (default: `text`). |
 | `tool_calls_field` | no | HTTP / invocations only. Dot-path to extract tool calls for agent-workflow evaluators. |
 | `headers` | no | HTTP / invocations only. Static extra HTTP headers. |
-| `auth_header_env` | no | HTTP / invocations only. Environment variable that holds a Bearer token. |
+| `auth_header_env` | no | HTTP / invocations only. Environment variable that holds the auth token. |
+| `auth_header_name` | no | HTTP / invocations only. Header name for the auth token (default: `Authorization`). |
+| `auth_value_template` | no | HTTP / invocations only. Template for the auth header value; `{token}` is replaced by the `auth_header_env` value (default: `Bearer {token}`). |
+| `response_mode` | no | HTTP / invocations only. `json` (default, single `json.loads` of the body), `sse` (parse `data:` lines), or `text` (concatenate raw streamed text). |
+| `stream` | no | HTTP / invocations only. Streaming aggregation block for `response_mode: sse` or `response_mode: text`: `text_field` (dot-path to token text in JSON `data:` lines), `done_marker` (stop token, e.g. `[DONE]`), `strip_leading_token` (drop the first whitespace-delimited token, e.g. a `conversation_id` prefix). |
 | `assert_path` | no | Optional ASSERT policy/results file or directory referenced by Doctor/evidence. AgentOps does not execute ASSERT. |
 | `acs_path` | no | Optional Agent Control Specification contract file or directory referenced by Doctor/evidence. AgentOps does not apply ACS controls. |
 | `redteam_path` | no | Optional red-team plan/results evidence path. AgentOps records metadata and never exposes payload text. |
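
Review note: the three auth rows compose into a single request header. The following is a minimal sketch of that composition, assuming the defaults stated in the table (`Authorization` and `Bearer {token}`). `build_auth_header` is a hypothetical helper name used only for illustration, not an AgentOps function, and the environment variable and its value are made up.

```python
import os
from typing import Dict, Optional


def build_auth_header(
    env_var: str,
    header_name: Optional[str] = None,
    value_template: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: build one auth header from an environment variable."""
    token = os.environ.get(env_var, "")
    if not token:
        raise RuntimeError(f"environment variable {env_var!r} is empty")
    name = header_name or "Authorization"
    template = value_template or "Bearer {token}"
    # str.replace (rather than str.format) leaves any other braces untouched.
    return {name: template.replace("{token}", token)}


os.environ["MY_TOKEN_VAR"] = "s3cret"  # illustrative value only

# Defaults: Authorization: Bearer <token>
default_header = build_auth_header("MY_TOKEN_VAR")

# Shared-secret gate: raw token in a custom header
api_key_header = build_auth_header(
    "MY_TOKEN_VAR", header_name="X-API-KEY", value_template="{token}"
)

print(default_header)  # {'Authorization': 'Bearer s3cret'}
print(api_key_header)  # {'X-API-KEY': 's3cret'}
```

Keeping the secret in the environment and only the variable name in `agentops.yaml` is what lets the config file stay free of credentials.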
21 changes: 21 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,27 @@ This format follows [Keep a Changelog](https://keepachangelog.com/) and adheres

 ## [Unreleased]

+## [0.4.4] - 2026-06-18
+
+### Added
+- **Streaming HTTP targets.** The `http_json` target now understands streaming
+responses so AgentOps can evaluate SSE/streaming agents (such as the
+gpt-rag-orchestrator `/orchestrator` endpoint) directly, without a manual
+adapter. A new `response_mode: json|sse|text` field selects the response
+parser (`json` is the default and preserves the existing single-`json.loads`
+behavior exactly). For `sse`/`text`, an optional `stream` block configures
+aggregation: `text_field` (dotted path to the token text when each SSE
+`data:` line is JSON), `done_marker` (stop token, e.g. `[DONE]`), and
+`strip_leading_token` (drop the leading whitespace-delimited token, e.g. the
+orchestrator's `conversation_id` prefix). The auth header is now configurable
+via `auth_header_name` (default `Authorization`) and `auth_value_template`
+(default `Bearer {token}`, where `{token}` is replaced by the
+`auth_header_env` value), so targets gated by a shared secret such as
+`X-API-KEY` are supported without hardcoding the secret in `agentops.yaml`.
+Streaming uses the same stdlib (`urllib`) transport and 3-try backoff as the
+JSON path. When a JSON parse fails on a `text/event-stream` response, the
+error now suggests setting `response_mode: sse|text`.
+
 ## [0.4.3] - 2026-06-17

 ### Added
9 changes: 8 additions & 1 deletion plugins/agentops/skills/agentops-eval/SKILL.md
@@ -301,4 +301,11 @@ dataset reference.
 ```
 - For HTTP/JSON agents that need auth, set
 `auth_header_env: MY_TOKEN_VAR` and AgentOps adds
-`Authorization: Bearer $MY_TOKEN_VAR`.
+`Authorization: Bearer $MY_TOKEN_VAR`. For a shared-secret gate, override the
+header with `auth_header_name: X-API-KEY` and `auth_value_template: "{token}"`.
+- For streaming HTTP agents (e.g. an SSE `text/event-stream` endpoint), set
+`response_mode: sse` (each `data:` line) or `response_mode: text` (raw
+streamed text). Use the optional `stream:` block to tune aggregation:
+`text_field` (dot-path to the token text when `data:` lines are JSON),
+`done_marker` (e.g. `[DONE]`), and `strip_leading_token: true` (drop a leading
+`conversation_id` prefix). `response_mode: json` (default) is unchanged.
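
Review note: a worked example may help readers see what the `stream:` options do to an SSE body. The sketch below is a simplified stand-in for the aggregation described in the bullet above, not the code added in this PR: it omits `event: error` handling, raises on a non-JSON `data:` line when `text_field` is set, and drops non-string tokens. The names `aggregate_sse` and `dot_path` and both sample bodies are assumptions made for illustration.

```python
import json
from typing import Any, List, Optional


def dot_path(payload: Any, path: str) -> Any:
    """Resolve 'a.b' or 'a.0.b' against nested dicts/lists; None if missing."""
    current = payload
    for part in path.split("."):
        if isinstance(current, dict):
            current = current.get(part)
        elif isinstance(current, list) and part.isdigit() and int(part) < len(current):
            current = current[int(part)]
        else:
            return None
    return current


def aggregate_sse(
    body: str,
    text_field: Optional[str] = None,
    done_marker: Optional[str] = None,
    strip_leading_token: bool = False,
) -> str:
    """Simplified sketch: join the payloads of 'data:' lines into one string."""
    pieces: List[str] = []
    for line in body.splitlines():
        if not line.startswith("data:"):
            continue  # skip blank lines, event:/id:/retry: fields, comments
        data = line[len("data:"):]
        if data.startswith(" "):
            data = data[1:]  # SSE drops a single space after the colon
        if done_marker is not None and data == done_marker:
            break
        if text_field:
            token = dot_path(json.loads(data), text_field)
            if isinstance(token, str):
                pieces.append(token)
        else:
            pieces.append(data)
    text = "".join(pieces)
    if strip_leading_token:
        parts = text.strip().split(None, 1)
        text = parts[1] if len(parts) == 2 else ""
    return text


# JSON data lines, token text under delta.content, terminated by [DONE].
sample = (
    'data: {"delta": {"content": "Hel"}}\n'
    "\n"
    'data: {"delta": {"content": "lo"}}\n'
    "\n"
    "data: [DONE]\n"
)
answer = aggregate_sse(sample, text_field="delta.content", done_marker="[DONE]")
print(answer)  # Hello

# Raw-text data lines with a leading conversation id.
raw = "data: conv-123 \ndata: The answer\ndata:  is 42.\n"
prefixed = aggregate_sse(raw, strip_leading_token=True)
print(prefixed)  # The answer is 42.
```

Note that only one space after `data:` is removed, so a token that itself begins with a space (as in the third raw line) keeps its word boundary.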
98 changes: 95 additions & 3 deletions src/agentops/core/agentops_config.py
@@ -39,6 +39,14 @@
 #: Wire protocol for hosted / HTTP targets.
 Protocol = Literal["responses", "invocations", "http-json"]

+#: How AgentOps reads an ``http-json`` response body.
+#:
+#: - ``json`` (default): parse a single JSON document. Preserves the exact
+#: behavior of every existing ``http_json`` config.
+#: - ``sse``: parse a Server-Sent Events body, concatenating ``data:`` lines.
+#: - ``text``: concatenate a raw streamed text body.
+ResponseMode = Literal["json", "sse", "text"]
+
 #: How thresholds compare against measured metric values.
 Criteria = Literal[">=", ">", "<=", "<", "==", "true", "false"]

@@ -540,6 +548,41 @@ class RedTeamRunConfig(BaseModel):
     model_config = ConfigDict(extra="forbid")


+class StreamConfig(BaseModel):
+    """Streaming aggregation options for ``http-json`` targets.
+
+    Only meaningful when ``response_mode`` is ``"sse"`` or ``"text"``. These
+    fields control how a streamed response body is parsed and reassembled into
+    a single answer string.
+    """
+
+    text_field: Optional[str] = Field(
+        None,
+        description=(
+            "For SSE where each 'data:' line is a JSON object, the dotted path "
+            "to the token text (e.g. 'delta.content'). Omit (None) when each "
+            "'data:' line is already raw text."
+        ),
+    )
+    done_marker: Optional[str] = Field(
+        None,
+        description=(
+            "Optional sentinel (e.g. '[DONE]'); aggregation stops when a "
+            "'data:' line equals this value."
+        ),
+    )
+    strip_leading_token: bool = Field(
+        False,
+        description=(
+            "Drop the first whitespace-delimited token from the aggregated "
+            "text. Use for endpoints that prefix the stream with an id (e.g. "
+            "the gpt-rag orchestrator emits '<conversation_id> ' first)."
+        ),
+    )
+
+    model_config = ConfigDict(extra="forbid")
+
+
 # ---------------------------------------------------------------------------
 # Top-level config
 # ---------------------------------------------------------------------------
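
Review note: `extra="forbid"` means a misspelled key in the `stream` block is rejected rather than silently ignored. The sketch below imitates that behavior with the standard library only, so it runs without pydantic; it is not the `StreamConfig` model itself, and `normalize_stream_block` is a hypothetical name. The defaults mirror the three fields declared above.

```python
from typing import Any, Dict

# Field names and defaults copied from the StreamConfig fields in this hunk.
_STREAM_DEFAULTS: Dict[str, Any] = {
    "text_field": None,
    "done_marker": None,
    "strip_leading_token": False,
}


def normalize_stream_block(block: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in: reject unknown keys, then fill in defaults."""
    unknown = sorted(set(block) - set(_STREAM_DEFAULTS))
    if unknown:
        raise ValueError(f"unknown stream option(s): {', '.join(unknown)}")
    return {**_STREAM_DEFAULTS, **block}


normalized = normalize_stream_block({"text_field": "delta.content"})
print(normalized)
# {'text_field': 'delta.content', 'done_marker': None, 'strip_leading_token': False}

try:
    normalize_stream_block({"textfield": "delta.content"})  # misspelled key
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # True
```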
@@ -600,7 +643,20 @@ class AgentOpsConfig(BaseModel):

     ``headers`` / ``auth_header_env``
         Optional HTTP request configuration for ``http-json`` and
-        ``invocations`` targets.
+        ``invocations`` targets. ``auth_header_env`` names the environment
+        variable that holds the secret.
+
+    ``response_mode`` / ``stream``
+        ``http-json`` only. ``response_mode`` selects how the response body is
+        read: ``"json"`` (default, single JSON document), ``"sse"`` (Server-Sent
+        Events), or ``"text"`` (raw streamed text). The ``stream`` block tunes
+        SSE/text aggregation (``text_field``, ``done_marker``,
+        ``strip_leading_token``).
+
+    ``auth_header_name`` / ``auth_value_template``
+        ``http-json`` only. Decouple the auth header from the default
+        ``Authorization: Bearer {token}``. For a shared-secret endpoint set
+        ``auth_header_name: X-API-KEY`` and ``auth_value_template: "{token}"``.

     ``evaluators``
         Optional escape hatch: explicit list of evaluator names that overrides
@@ -696,6 +752,37 @@ class AgentOpsConfig(BaseModel):
     tool_calls_field: Optional[str] = None
     headers: Dict[str, str] = Field(default_factory=dict)
     auth_header_env: Optional[str] = None
+    response_mode: ResponseMode = Field(
+        "json",
+        description=(
+            "How to read the HTTP/JSON response body. 'json' (default) parses "
+            "a single JSON document and preserves existing behavior. 'sse' "
+            "parses Server-Sent Events 'data:' lines; 'text' concatenates a "
+            "raw streamed text body. Only valid for http-json targets."
+        ),
+    )
+    stream: Optional[StreamConfig] = Field(
+        None,
+        description=(
+            "Streaming aggregation options, used only when response_mode is "
+            "'sse' or 'text'."
+        ),
+    )
+    auth_header_name: Optional[str] = Field(
+        None,
+        description=(
+            "HTTP header that carries the secret read from auth_header_env. "
+            "Defaults to 'Authorization' when unset."
+        ),
+    )
+    auth_value_template: Optional[str] = Field(
+        None,
+        description=(
+            "Template for the auth header value; '{token}' is replaced by the "
+            "auth_header_env value. Defaults to 'Bearer {token}' when unset. "
+            "Use '{token}' alone for a raw shared-secret header like X-API-KEY."
+        ),
+    )

     evaluators: Optional[List[EvaluatorOverride]] = None
     rubrics: List[RubricConfig] = Field(
@@ -850,6 +937,10 @@ def _validate_protocol_compat(self) -> "AgentOpsConfig":
             or self.tool_calls_field
             or self.headers
             or self.auth_header_env
+            or self.response_mode != "json"
+            or self.stream is not None
+            or self.auth_header_name
+            or self.auth_value_template
         ):
             # Foundry hosted (responses/invocations) defines its own wire
             # format. HTTP-only request/response shaping is invalid there.
@@ -859,8 +950,9 @@ def _validate_protocol_compat(self) -> "AgentOpsConfig":
             else:
                 raise ValueError(
                     "request_field / response_field / tool_calls_field / "
-                    "headers / auth_header_env are only valid for HTTP/JSON "
-                    "or Foundry hosted (invocations) targets"
+                    "headers / auth_header_env / response_mode / stream / "
+                    "auth_header_name / auth_value_template are only valid for "
+                    "HTTP/JSON or Foundry hosted (invocations) targets"
                 )
         return self

170 changes: 162 additions & 8 deletions src/agentops/pipeline/invocations.py
@@ -166,6 +166,130 @@ def _http_request_json(
     return json.loads(payload)


+def _http_request_stream(
+    *,
+    method: str,
+    url: str,
+    headers: Dict[str, str],
+    body: Optional[Dict[str, Any]] = None,
+    timeout: float,
+) -> str:
+    """POST and return the full streamed response body as decoded text.
+
+    Uses the same 3-try backoff policy and tenant-mismatch guidance as
+    :func:`_http_request_json`. The response is read to completion (streamed
+    endpoints used for evaluation emit a bounded answer) and returned verbatim
+    so :func:`_aggregate_stream` can reassemble it. stdlib ``urllib`` only.
+    """
+    encoded = json.dumps(body or {}).encode("utf-8") if method != "GET" else None
+    request = urllib.request.Request(
+        url=url, data=encoded, method=method, headers=headers
+    )
+    last_exc: Optional[BaseException] = None
+    for attempt in range(1, 4):
+        try:
+            with urllib.request.urlopen(request, timeout=timeout) as response:  # noqa: S310
+                # HTTPResponse is iterable line-by-line; joining restores the
+                # full body (including newlines) for both sse and text modes.
+                chunks = [line.decode("utf-8", errors="replace") for line in response]
+                return "".join(chunks)
+        except urllib.error.HTTPError as exc:
+            detail = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
+            transient = exc.code >= 500 or exc.code == 429
+            if transient and attempt < 3:
+                time.sleep(2 ** attempt)
+                last_exc = exc
+                continue
+            message = f"HTTP {exc.code} from {url}: {detail or exc.reason}"
+            raise RuntimeError(with_tenant_mismatch_guidance(message)) from exc
+        except urllib.error.URLError as exc:
+            if attempt < 3:
+                time.sleep(2 ** attempt)
+                last_exc = exc
+                continue
+            raise
+    else:  # pragma: no cover - loop exits via return/raise
+        raise RuntimeError(f"HTTP request to {url} failed: {last_exc!r}")
+
+
+def _strip_leading_token(text: str) -> str:
+    """Drop the first whitespace-delimited token from ``text``.
+
+    Used to remove the conversation-id prefix the gpt-rag orchestrator emits
+    as its first streamed chunk (``"<conversation_id> <answer...>"``).
+    """
+    stripped = text.strip()
+    parts = stripped.split(None, 1)
+    return parts[1] if len(parts) == 2 else ""
+
+
+def _aggregate_stream(
+    response_mode: str,
+    body: str,
+    stream_cfg: Optional[Any],
+) -> str:
+    """Reassemble a streamed response body into a single answer string.
+
+    ``text`` mode concatenates the whole body. ``sse`` mode parses ``data:``
+    lines, optionally JSON-decoding each line and extracting ``text_field``,
+    stopping at ``done_marker``, and raising on an ``event: error`` frame.
+    """
+    strip_leading = bool(getattr(stream_cfg, "strip_leading_token", False))
+
+    if response_mode == "text":
+        text = body or ""
+        return _strip_leading_token(text) if strip_leading else text
+
+    # SSE mode.
+    text_field = getattr(stream_cfg, "text_field", None)
+    done_marker = getattr(stream_cfg, "done_marker", None)
+
+    pieces: List[str] = []
+    saw_error = False
+    for raw_line in (body or "").splitlines():
+        line = raw_line.rstrip("\r")
+        stripped = line.strip()
+        if not stripped:
+            # Blank line closes the current SSE frame.
+            saw_error = False
+            continue
+        if stripped.startswith("event:"):
+            if stripped[len("event:"):].strip() == "error":
+                saw_error = True
+            continue
+        if not stripped.startswith("data:"):
+            # Ignore id:/retry:/comment lines.
+            continue
+        data = line.split("data:", 1)[1]
+        if data.startswith(" "):
+            # SSE strips a single leading space after the field colon.
+            data = data[1:]
+        if done_marker is not None and data == done_marker:
+            break
+        if saw_error:
+            raise RuntimeError(
+                f"streaming endpoint returned an error event: {data}"
+            )
+        if text_field:
+            try:
+                parsed = json.loads(data)
+            except json.JSONDecodeError:
+                pieces.append(data)
+                continue
+            token = _dot_path(parsed, text_field)
+            if token is None:
+                continue
+            pieces.append(
+                token if isinstance(token, str)
+                else json.dumps(token, ensure_ascii=False)
+            )
+        else:
+            pieces.append(data)
+
+    text = "".join(pieces)
+    return _strip_leading_token(text) if strip_leading else text
+
+
 def _dot_path(payload: Any, path: str) -> Any:
     """Resolve ``a.b.c`` or ``a.0.b`` against a JSON-like object."""
     current = payload
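
Review note: the retry policy in `_http_request_stream` (three attempts, sleeping `2 ** attempt` seconds before the second and third) can be checked in isolation. The sketch below is an assumption-laden stand-in, not the PR's function: `TransientError` replaces the HTTP 429/5xx and `URLError` cases, `call_with_backoff` is a hypothetical name, and the `sleep` callable is injected so the schedule can be observed without waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for an HTTP 429/5xx response or a connection error."""


def call_with_backoff(
    operation: Callable[[], T],
    sleep: Callable[[float], None] = time.sleep,
    max_attempts: int = 3,
) -> T:
    """Retry `operation` on TransientError, sleeping 2**attempt between tries."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last failure
            sleep(2 ** attempt)
    raise AssertionError("unreachable: the loop returns or raises")


# Fail twice, then succeed; record the requested delays instead of sleeping.
delays: List[float] = []
outcomes = iter([TransientError("503"), TransientError("429"), "ok"])


def flaky() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = call_with_backoff(flaky, sleep=delays.append)
print(result, delays)  # ok [2, 4]
```

With three attempts the worst-case added wait is 6 seconds (2 + 4) on top of the per-request timeouts.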
@@ -468,19 +592,49 @@ def _invoke_http_json(
                 f"auth_header_env {config.auth_header_env!r} is set in config but "
                 "the environment variable is empty"
             )
-        headers["Authorization"] = f"Bearer {token}"
+        # Default to today's behavior (Authorization: Bearer <token>) so
+        # existing configs are byte-for-byte unchanged; allow an arbitrary
+        # header name/value template for shared-secret gates (e.g. X-API-KEY).
+        header_name = config.auth_header_name or "Authorization"
+        value_template = config.auth_value_template or "Bearer {token}"
+        headers[header_name] = value_template.replace("{token}", token)

     request_field = config.request_field or "message"
     body: Dict[str, Any] = {request_field: _row_input(row)}

+    if config.response_mode in ("sse", "text"):
+        started = time.perf_counter()
+        raw_body = _http_request_stream(
+            method="POST",
+            url=target.url,
+            headers=headers,
+            body=body,
+            timeout=timeout,
+        )
+        elapsed = time.perf_counter() - started
+        aggregated = _aggregate_stream(config.response_mode, raw_body, config.stream)
+        return InvocationResult(
+            response=aggregated.strip(),
+            latency_seconds=elapsed,
+            tool_calls=None,
+        )
+
     started = time.perf_counter()
-    payload = _http_request_json(
-        method="POST",
-        url=target.url,
-        headers=headers,
-        body=body,
-        timeout=timeout,
-    )
+    try:
+        payload = _http_request_json(
+            method="POST",
+            url=target.url,
+            headers=headers,
+            body=body,
+            timeout=timeout,
+        )
+    except json.JSONDecodeError as exc:
+        raise RuntimeError(
+            f"HTTP/JSON response from {target.url} was not valid JSON. If this "
+            "endpoint streams Server-Sent Events or raw text (for example "
+            "Content-Type: text/event-stream), set response_mode: sse or "
+            "response_mode: text in agentops.yaml."
+        ) from exc
     elapsed = time.perf_counter() - started

     response_path = config.response_field or "text"
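
Review note: the new `except json.JSONDecodeError` branch is easy to exercise without a network call, because an SSE body is never a valid single JSON document. The sketch below isolates that idea under stated assumptions: `parse_json_body` is a hypothetical helper, the URL is a placeholder, and the message is abbreviated from the one in this hunk.

```python
import json
from typing import Any


def parse_json_body(body: str, url: str) -> Any:
    """Hypothetical helper: parse one JSON document or raise with a config hint."""
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        raise RuntimeError(
            f"response from {url} was not valid JSON; if the endpoint streams "
            "SSE or raw text, set response_mode: sse or response_mode: text"
        ) from exc


url = "https://example.invalid/chat"  # placeholder endpoint

# A plain JSON body parses as before.
parsed = parse_json_body('{"text": "hi"}', url)
print(parsed)  # {'text': 'hi'}

# An SSE body fails JSON parsing and produces the hint instead of a bare traceback.
try:
    parse_json_body('data: {"text": "hi"}\n\n', url)
    hint = ""
except RuntimeError as err:
    hint = str(err)
print("response_mode: sse" in hint)  # True
```

Chaining with `from exc` keeps the original decode error (position and offending character) available in the traceback for debugging.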
9 changes: 8 additions & 1 deletion src/agentops/templates/skills/agentops-eval/SKILL.md
@@ -301,4 +301,11 @@ dataset reference.
 ```
 - For HTTP/JSON agents that need auth, set
 `auth_header_env: MY_TOKEN_VAR` and AgentOps adds
-`Authorization: Bearer $MY_TOKEN_VAR`.
+`Authorization: Bearer $MY_TOKEN_VAR`. For a shared-secret gate, override the
+header with `auth_header_name: X-API-KEY` and `auth_value_template: "{token}"`.
+- For streaming HTTP agents (e.g. an SSE `text/event-stream` endpoint), set
+`response_mode: sse` (each `data:` line) or `response_mode: text` (raw
+streamed text). Use the optional `stream:` block to tune aggregation:
+`text_field` (dot-path to the token text when `data:` lines are JSON),
+`done_marker` (e.g. `[DONE]`), and `strip_leading_token: true` (drop a leading
+`conversation_id` prefix). `response_mode: json` (default) is unchanged.