Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 183 additions & 21 deletions plugins/codex/scripts/codex-companion.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
import {
buildSingleJobSnapshot,
buildStatusSnapshot,
isStreamableProgressLine,
readStoredJob,
resolveCancelableJob,
resolveResultJob,
Expand Down Expand Up @@ -64,8 +65,14 @@ import {

const ROOT_DIR = path.resolve(fileURLToPath(new URL("..", import.meta.url)));
const REVIEW_SCHEMA = path.join(ROOT_DIR, "schemas", "review-output.schema.json");
const DEFAULT_STATUS_WAIT_TIMEOUT_MS = 240000;
const DEFAULT_STATUS_WAIT_TIMEOUT_MS = 1800000;
const DEFAULT_STATUS_POLL_INTERVAL_MS = 2000;
// A foreground `task` run observes a detached worker rather than running the
// Codex turn inline (issue #370). The observer is killed by Claude Code's
// 10-minute Bash ceiling on long turns, so this timeout is only a backstop for
// direct CLI use; the worker survives the ceiling regardless.
const FOREGROUND_OBSERVE_TIMEOUT_MS = 1800000;
const FOREGROUND_OBSERVE_POLL_INTERVAL_MS = 250;
const VALID_REASONING_EFFORTS = new Set(["none", "minimal", "low", "medium", "high", "xhigh"]);
const MODEL_ALIASES = new Map([["spark", "gpt-5.3-codex-spark"]]);
const STOP_REVIEW_TASK_MARKER = "Run a stop-gate review of the previous Claude turn.";
Expand Down Expand Up @@ -651,22 +658,31 @@ function spawnDetachedTaskWorker(cwd, jobId) {
return child;
}

function enqueueBackgroundTask(cwd, job, request) {
function enqueueDetachedTask(cwd, job, request) {
const { logFile } = createTrackedProgress(job);
appendLogLine(logFile, "Queued for background execution.");
appendLogLine(logFile, "Queued for execution.");

const child = spawnDetachedTaskWorker(cwd, job.id);
const queuedRecord = {
...job,
status: "queued",
phase: "queued",
pid: child.pid ?? null,
pid: null,
logFile,
request
};
// Persist the full record (including the request payload) BEFORE spawning so
// the detached worker always finds it when it boots and reads the job file.
writeJobFile(job.workspaceRoot, job.id, queuedRecord);
upsertJob(job.workspaceRoot, queuedRecord);

const child = spawnDetachedTaskWorker(cwd, job.id);
if (child.pid != null) {
// Record the worker pid so /codex:cancel can reach a job that has not
// transitioned to "running" yet. child.pid is the worker's own process.pid,
// so this only patches pid and never clobbers the worker's status writes.
upsertJob(job.workspaceRoot, { id: job.id, pid: child.pid });
}

return {
payload: {
jobId: job.id,
Expand All @@ -679,6 +695,149 @@ function enqueueBackgroundTask(cwd, job, request) {
};
}

// Stringify `text` (null/undefined become the empty string) and guarantee the
// result ends with a newline, appending one only when it is missing.
function ensureTrailingNewline(text) {
  const normalized = String(text ?? "");
  if (normalized.endsWith("\n")) {
    return normalized;
  }
  return normalized + "\n";
}

// Echo newly appended progress lines from a job log to stderr.
//
// Reads the bytes between `fromOffset` and the current end of `logFile`,
// consumes only whole lines, writes the ones accepted by
// isStreamableProgressLine to stderr, and returns the byte offset to resume
// from on the next poll. Any failure (no log file yet, open/read error) is
// treated as "nothing new": `fromOffset` is returned unchanged so the caller
// can simply retry on its next poll.
//
// The resume offset is computed in raw bytes from the buffer itself rather than
// re-derived from the decoded string. Re-encoding a decoded chunk can yield a
// different length than what was read (any byte that is not valid UTF-8 decodes
// to U+FFFD, which is three bytes), which would make the next poll skip or
// stall; a short read would likewise leave unread padding in the buffer.
function streamJobLogTail(logFile, fromOffset) {
  if (!logFile) {
    return fromOffset;
  }
  let fd = null;
  try {
    fd = fs.openSync(logFile, "r");
    // Size the read from the descriptor we actually opened, so the length and
    // the data always describe the same file.
    const pending = fs.fstatSync(fd).size - fromOffset;
    if (pending <= 0) {
      return fromOffset;
    }
    const buffer = Buffer.alloc(pending);
    // readSync may return fewer bytes than requested; keep reading until the
    // buffer is full or the file ends.
    let filled = 0;
    while (filled < pending) {
      const bytesRead = fs.readSync(fd, buffer, filled, pending - filled, fromOffset + filled);
      if (bytesRead === 0) {
        break;
      }
      filled += bytesRead;
    }
    // Only consume complete lines; leave any partial trailing line for the next
    // poll so a line is never split or filtered on incomplete content.
    const lastNewline = buffer.subarray(0, filled).lastIndexOf(0x0a);
    if (lastNewline === -1) {
      return fromOffset;
    }
    const consumedBytes = lastNewline + 1;
    // Echo only progress lines — never the persisted block bodies (assistant
    // message, Final output, reasoning), which are rendered on stdout instead.
    // Streaming them here would duplicate Codex's answer onto stderr (#372).
    const progress = buffer
      .toString("utf8", 0, consumedBytes)
      .split("\n")
      .filter((line) => isStreamableProgressLine(line))
      .map((line) => `${line}\n`)
      .join("");
    if (progress) {
      process.stderr.write(progress);
    }
    return fromOffset + consumedBytes;
  } catch {
    // Best-effort tailing: report no progress and let the next poll retry.
    return fromOffset;
  } finally {
    if (fd != null) {
      try {
        fs.closeSync(fd);
      } catch {
        // Ignore close failures while tailing.
      }
    }
  }
}

// Foreground observer for a task whose Codex turn executes in a detached worker
// (see spawnDetachedTaskWorker). It tails the worker's log to stderr and polls
// the job status until the worker leaves an active state or the observe timeout
// elapses, then writes the synchronous stdout result that the rescue subagent
// and stop hook expect. The worker runs in its own session, so if the harness
// kills this observer at Claude Code's 10-minute Bash ceiling the worker keeps
// running, records completion, and the result stays retrievable via
// `/codex:result <jobId>` instead of dying silently (issue #370).
//
// Returns the last job snapshot. On failure/cancellation it also sets
// process.exitCode to the stored result status when that is a finite non-zero
// number, otherwise to 1.
async function observeDetachedTask(cwd, job, options = {}) {
  const jsonMode = Boolean(options.json);
  const { id: jobId, workspaceRoot } = job;
  const logFile = job.logFile ?? null;

  if (!jsonMode) {
    const banner = [
      `[codex] ${job.title ?? "Codex task"} dispatched as ${jobId}. `,
      "Streaming progress below; if this run is interrupted before it finishes, ",
      `retrieve the result later with \`/codex:result ${jobId}\`.\n`
    ].join("");
    process.stderr.write(banner);
  }

  const deadline = Date.now() + FOREGROUND_OBSERVE_TIMEOUT_MS;

  // In JSON mode the log is never tailed, so the offset simply stays at 0.
  let logOffset = 0;
  const drainLog = () => {
    if (!jsonMode) {
      logOffset = streamJobLogTail(logFile, logOffset);
    }
  };

  drainLog();
  let snapshot = buildSingleJobSnapshot(cwd, jobId);
  for (;;) {
    if (!isActiveJobStatus(snapshot.job.status) || Date.now() >= deadline) {
      break;
    }
    // Never sleep past the deadline.
    const remainingMs = Math.max(0, deadline - Date.now());
    await sleep(Math.min(FOREGROUND_OBSERVE_POLL_INTERVAL_MS, remainingMs));
    drainLog();
    snapshot = buildSingleJobSnapshot(cwd, jobId);
  }
  // Flush whatever the worker logged between the last poll and now.
  drainLog();

  if (isActiveJobStatus(snapshot.job.status)) {
    // The observer gave up waiting, but the detached worker is still running.
    if (jsonMode) {
      outputResult({ jobId, status: snapshot.job.status, waitTimedOut: true, job: snapshot.job }, true);
    } else {
      process.stdout.write(
        `Codex job ${jobId} is still running. Retrieve the result later with \`/codex:result ${jobId}\`.\n`
      );
    }
    return snapshot;
  }

  const status = snapshot.job.status;
  const storedJob = readStoredJob(workspaceRoot, jobId);
  const hasRendered = typeof storedJob?.rendered === "string" && storedJob.rendered.length > 0;
  const renderedText = hasRendered ? storedJob.rendered : null;
  const succeeded = status === "completed";

  if (succeeded || renderedText) {
    // Either the turn completed, or it failed/was cancelled after producing
    // output — in both cases surface the stored result on stdout, exactly as
    // the old inline foreground path did.
    if (jsonMode) {
      outputResult(storedJob?.result ?? { job: snapshot.job }, true);
    } else {
      const text = renderedText ?? renderStoredJobResult(snapshot.job, storedJob);
      process.stdout.write(ensureTrailingNewline(text));
    }
  } else {
    // The turn failed before producing output — mirror the inline `main().catch`
    // behaviour and report the error on stderr with a non-zero exit code.
    const message =
      storedJob?.errorMessage ?? snapshot.job.errorMessage ?? `Codex job ${jobId} ${status} before producing output.`;
    process.stderr.write(`${message}\n`);
  }

  if (succeeded) {
    return snapshot;
  }

  // Failed or cancelled: propagate the stored exit status when it is usable.
  const exitStatus = Number(storedJob?.result?.status);
  const usable = Number.isFinite(exitStatus) && exitStatus !== 0;
  process.exitCode = usable ? exitStatus : 1;
  return snapshot;
}

async function handleReviewCommand(argv, config) {
const { options, positionals } = parseCommandInput(argv, {
valueOptions: ["base", "scope", "model", "cwd"],
Expand Down Expand Up @@ -769,27 +928,30 @@ async function handleTask(argv) {
resumeLast,
jobId: job.id
});
const { payload } = enqueueBackgroundTask(cwd, job, request);
const { payload } = enqueueDetachedTask(cwd, job, request);
outputCommandResult(payload, renderQueuedTaskLaunch(payload), options.json);
return;
}

// Foreground (default): dispatch to the same detached worker the background
// path uses, then observe it. The worker survives Claude Code's 10-minute Bash
// ceiling, so a long turn no longer dies silently mid-run — it finishes in the
// worker and stays retrievable via /codex:result (issue #370).
ensureCodexAvailable(cwd);
requireTaskRequest(prompt, resumeLast);

const job = buildTaskJob(workspaceRoot, taskMetadata, write);
await runForegroundCommand(
job,
(progress) =>
executeTaskRun({
cwd,
model,
effort,
prompt,
write,
resumeLast,
jobId: job.id,
onProgress: progress
}),
{ json: options.json }
);
const request = buildTaskRequest({
cwd,
model,
effort,
prompt,
write,
resumeLast,
jobId: job.id
});
const { logFile } = enqueueDetachedTask(cwd, job, request);
await observeDetachedTask(cwd, { ...job, logFile }, { json: options.json });
Comment on lines +953 to +954

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve subagent labels when detaching foreground tasks

Because foreground tasks now always enter the detached-worker path, tasks that spawn Codex subagents can lose the `thread/started` metadata before the worker's turn id is known; the job log then records raw ids like `thr_2` instead of the subagent nickname (`design-challenger`). I verified this with `npm test -- --test-reporter=spec`: the subagent-log test in `tests/runtime.test.mjs` fails, and users lose named subagent context in foreground task logs/status for this scenario. Please preserve/apply buffered thread metadata when moving foreground tasks through the detached worker.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for flagging this — I tried to reproduce it and couldn't, so I want to share what I found before changing anything.

The subagent-label test passes for me both in isolation and in the full suite (8/8 runs), and also in the delicate path you'd expect to be at risk — a warm shared-broker + detached-worker + subagent run — where labels resolve to design-challenger with zero raw thr_* ids (3/3 runs).

This PR doesn't touch the subagent-label path in `codex.mjs`: the foreground task now runs the same `captureTurn` code the inline path already used (it only moved to a different process), and buffered notifications are drained in arrival order (`captureTurn`), so the subagent's `thread/started` registers the nickname before the `collabAgentToolCall` item reads it via `labelForThread`. So a label would only fall back to the raw id if the app-server emitted the collab item before the subagent's `thread/started` — which would affect the inline path identically and isn't changed here.

If you have the failing run's output or a seed, I'm glad to dig in — but with npm test green here I'd rather not add speculative handling to that path without a reproducer.

}

async function handleTaskWorker(argv) {
Expand Down
20 changes: 20 additions & 0 deletions plugins/codex/scripts/lib/job-control.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ function isProgressBlockTitle(line) {
);
}

// appendLogLine writes each progress entry as `[<ISO-8601>] message`, with the
// timestamp produced by nowIso() (`new Date().toISOString()`). The pattern
// requires that exact prefix rather than any leading `[`, so block-body lines
// that only happen to open with a bracket — a markdown reference such as
// `[1] ...`, a `[P2] ...` finding, or a JSON array in Codex's answer — are not
// treated as progress and streamed (#372).
const LOG_TIMESTAMP_PREFIX = /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z\]\s/;

// Decide whether a raw log line should be echoed as live progress.
// Returns true only for a timestamped entry whose message is non-empty and is
// not a block title. Block bodies (assistant message, final output, reasoning,
// etc.) are continuation lines without the timestamp prefix, so they yield
// false and callers tailing the log do not repeat the full result, which is
// rendered on stdout.
export function isStreamableProgressLine(line) {
  if (typeof line !== "string") {
    return false;
  }
  if (!LOG_TIMESTAMP_PREFIX.test(line)) {
    return false;
  }
  const message = stripLogPrefix(line);
  if (!message) {
    return false;
  }
  return !isProgressBlockTitle(message);
}

export function readJobProgressPreview(logFile, maxLines = DEFAULT_MAX_PROGRESS_LINES) {
if (!logFile || !fs.existsSync(logFile)) {
return [];
Expand Down
41 changes: 41 additions & 0 deletions tests/job-control.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import test from "node:test";
import assert from "node:assert/strict";

import { isStreamableProgressLine } from "../plugins/codex/scripts/lib/job-control.mjs";

// Regression coverage for #372. The foreground task observer tails the job log
// to stderr for live progress; it must pass through progress lines only and
// never the persisted block bodies (assistant message, Final output,
// reasoning), otherwise Codex's answer appears on stderr as well as in the
// rendered stdout result.
test("isStreamableProgressLine keeps progress lines and drops block titles/bodies", () => {
  // Thin wrappers keep each case on its own line, so a failing assertion's
  // stack trace still points at the offending input.
  const expectStreamed = (line) => assert.equal(isStreamableProgressLine(line), true);
  const expectDropped = (line) => assert.equal(isStreamableProgressLine(line), false);
  const stamp = "[2026-06-13T06:15:43.000Z]";

  // Timestamped progress lines are streamable.
  expectStreamed("[2026-06-13T06:15:39.925Z] Starting Codex Task.");
  expectStreamed("[2026-06-13T06:15:42.000Z] Turn completed.");
  expectStreamed(`${stamp} Assistant message captured: OK`);

  // Block title lines (their bodies follow on stdout) are not streamable.
  expectDropped(`${stamp} Final output`);
  expectDropped(`${stamp} Assistant message`);
  expectDropped(`${stamp} Reasoning summary`);
  expectDropped(`${stamp} Review output`);
  expectDropped(`${stamp} Subagent design-challenger message`);
  expectDropped(`${stamp} Subagent design-challenger reasoning summary`);

  // Unprefixed block-body / continuation lines and blanks are not streamable.
  expectDropped("OK");
  expectDropped("the full assistant answer body line");
  expectDropped("");
  expectDropped(null);

  // Block-body lines that merely START with a bracket must NOT be streamed —
  // only a real ISO-8601 timestamp prefix counts as a progress entry (#372).
  expectDropped("[1] https://example.com a markdown reference");
  expectDropped("[P2] a finding Codex wrote in its answer");
  expectDropped('["a", "b", "c"]');
  expectDropped("[TODO] fix the empty-state guard");
  // A non-Z / non-timestamp bracketed prefix is still not a progress entry.
  expectDropped("[2026-06-13] partial date only");
});
Loading