diff --git a/plugins/codex/scripts/codex-companion.mjs b/plugins/codex/scripts/codex-companion.mjs index 35222fd5..ee72dd67 100644 --- a/plugins/codex/scripts/codex-companion.mjs +++ b/plugins/codex/scripts/codex-companion.mjs @@ -35,6 +35,7 @@ import { import { buildSingleJobSnapshot, buildStatusSnapshot, + isStreamableProgressLine, readStoredJob, resolveCancelableJob, resolveResultJob, @@ -64,8 +65,14 @@ import { const ROOT_DIR = path.resolve(fileURLToPath(new URL("..", import.meta.url))); const REVIEW_SCHEMA = path.join(ROOT_DIR, "schemas", "review-output.schema.json"); -const DEFAULT_STATUS_WAIT_TIMEOUT_MS = 240000; +const DEFAULT_STATUS_WAIT_TIMEOUT_MS = 1800000; const DEFAULT_STATUS_POLL_INTERVAL_MS = 2000; +// A foreground `task` run observes a detached worker rather than running the +// Codex turn inline (issue #370). The observer is killed by Claude Code's +// 10-minute Bash ceiling on long turns, so this timeout is only a backstop for +// direct CLI use; the worker survives the ceiling regardless. +const FOREGROUND_OBSERVE_TIMEOUT_MS = 1800000; +const FOREGROUND_OBSERVE_POLL_INTERVAL_MS = 250; const VALID_REASONING_EFFORTS = new Set(["none", "minimal", "low", "medium", "high", "xhigh"]); const MODEL_ALIASES = new Map([["spark", "gpt-5.3-codex-spark"]]); const STOP_REVIEW_TASK_MARKER = "Run a stop-gate review of the previous Claude turn."; @@ -651,22 +658,31 @@ function spawnDetachedTaskWorker(cwd, jobId) { return child; } -function enqueueBackgroundTask(cwd, job, request) { +function enqueueDetachedTask(cwd, job, request) { const { logFile } = createTrackedProgress(job); - appendLogLine(logFile, "Queued for background execution."); + appendLogLine(logFile, "Queued for execution."); - const child = spawnDetachedTaskWorker(cwd, job.id); const queuedRecord = { ...job, status: "queued", phase: "queued", - pid: child.pid ?? null, + pid: null, logFile, request }; + // Persist the full record (including the request payload) BEFORE spawning so + // the detached worker always finds it when it boots and reads the job file. writeJobFile(job.workspaceRoot, job.id, queuedRecord); upsertJob(job.workspaceRoot, queuedRecord); + const child = spawnDetachedTaskWorker(cwd, job.id); + if (child.pid != null) { + // Record the worker pid so /codex:cancel can reach a job that has not + // transitioned to "running" yet. child.pid is the worker's own process.pid, + // so this only patches pid and never clobbers the worker's status writes. + upsertJob(job.workspaceRoot, { id: job.id, pid: child.pid }); + } + return { payload: { jobId: job.id, @@ -679,6 +695,149 @@ function enqueueBackgroundTask(cwd, job, request) { }; } +function ensureTrailingNewline(text) { + const value = String(text ?? ""); + return value.endsWith("\n") ? value : `${value}\n`; +} + +function streamJobLogTail(logFile, fromOffset) { + if (!logFile || !fs.existsSync(logFile)) { + return fromOffset; + } + let stat; + try { + stat = fs.statSync(logFile); + } catch { + return fromOffset; + } + if (stat.size <= fromOffset) { + return fromOffset; + } + let fd = null; + try { + fd = fs.openSync(logFile, "r"); + const length = stat.size - fromOffset; + const buffer = Buffer.alloc(length); + fs.readSync(fd, buffer, 0, length, fromOffset); + const chunk = buffer.toString("utf8"); + // Only consume complete lines; leave any partial trailing line for the next + // poll so a line is never split or filtered on incomplete content. + const lastNewline = chunk.lastIndexOf("\n"); + if (lastNewline === -1) { + return fromOffset; + } + const consumed = chunk.slice(0, lastNewline + 1); + // Echo only progress lines — never the persisted block bodies (assistant + // message, Final output, reasoning), which are rendered on stdout instead. + // Streaming them here would duplicate Codex's answer onto stderr (#372). + const progress = consumed + .split("\n") + .filter((line) => isStreamableProgressLine(line)) + .map((line) => `${line}\n`) + .join(""); + if (progress) { + process.stderr.write(progress); + } + return fromOffset + Buffer.byteLength(consumed, "utf8"); + } catch { + return fromOffset; + } finally { + if (fd != null) { + try { + fs.closeSync(fd); + } catch { + // Ignore close failures while tailing. + } + } + } +} + +// Observe a task whose Codex turn runs in a detached worker (spawnDetachedTaskWorker). +// The worker runs in its own session, so it survives Claude Code's 10-minute Bash +// ceiling; this foreground observer streams the worker's log and waits for it to +// finish, preserving live output and the synchronous stdout the rescue subagent and +// stop hook expect. If the harness kills this observer at the ceiling, the worker +// keeps running, records completion, and the result stays retrievable via +// `/codex:result ` instead of dying silently (issue #370). +async function observeDetachedTask(cwd, job, options = {}) { + const asJson = Boolean(options.json); + const jobId = job.id; + const workspaceRoot = job.workspaceRoot; + const logFile = job.logFile ?? null; + + if (!asJson) { + process.stderr.write( + `[codex] ${job.title ?? "Codex task"} dispatched as ${jobId}. ` + + "Streaming progress below; if this run is interrupted before it finishes, " + + `retrieve the result later with \`/codex:result ${jobId}\`.\n` + ); + } + + const deadline = Date.now() + FOREGROUND_OBSERVE_TIMEOUT_MS; + let logOffset = asJson ? 0 : streamJobLogTail(logFile, 0); + let snapshot = buildSingleJobSnapshot(cwd, jobId); + + while (isActiveJobStatus(snapshot.job.status) && Date.now() < deadline) { + await sleep(Math.min(FOREGROUND_OBSERVE_POLL_INTERVAL_MS, Math.max(0, deadline - Date.now()))); + if (!asJson) { + logOffset = streamJobLogTail(logFile, logOffset); + } + snapshot = buildSingleJobSnapshot(cwd, jobId); + } + if (!asJson) { + streamJobLogTail(logFile, logOffset); + } + + if (isActiveJobStatus(snapshot.job.status)) { + // The observer gave up waiting, but the detached worker is still running. + if (asJson) { + outputResult({ jobId, status: snapshot.job.status, waitTimedOut: true, job: snapshot.job }, true); + } else { + process.stdout.write( + `Codex job ${jobId} is still running. Retrieve the result later with \`/codex:result ${jobId}\`.\n` + ); + } + return snapshot; + } + + const status = snapshot.job.status; + const storedJob = readStoredJob(workspaceRoot, jobId); + const renderedText = + storedJob && typeof storedJob.rendered === "string" && storedJob.rendered.length > 0 + ? storedJob.rendered + : null; + + if (status === "completed") { + if (asJson) { + outputResult(storedJob?.result ?? { job: snapshot.job }, true); + } else { + process.stdout.write(ensureTrailingNewline(renderedText ?? renderStoredJobResult(snapshot.job, storedJob))); + } + return snapshot; + } + + // Failed or cancelled. + if (renderedText) { + // Codex produced output but reported a non-zero status — surface it exactly + // as the old inline foreground path did. + if (asJson) { + outputResult(storedJob?.result ?? { job: snapshot.job }, true); + } else { + process.stdout.write(ensureTrailingNewline(renderedText)); + } + } else { + // The turn failed before producing output — mirror the inline `main().catch` + // behaviour and report the error on stderr with a non-zero exit code. + const message = + storedJob?.errorMessage ?? snapshot.job.errorMessage ?? `Codex job ${jobId} ${status} before producing output.`; + process.stderr.write(`${message}\n`); + } + + const exitStatus = Number(storedJob?.result?.status); + process.exitCode = Number.isFinite(exitStatus) && exitStatus !== 0 ? exitStatus : 1; + return snapshot; +} + async function handleReviewCommand(argv, config) { const { options, positionals } = parseCommandInput(argv, { valueOptions: ["base", "scope", "model", "cwd"], @@ -769,27 +928,30 @@ async function handleTask(argv) { resumeLast, jobId: job.id }); - const { payload } = enqueueBackgroundTask(cwd, job, request); + const { payload } = enqueueDetachedTask(cwd, job, request); outputCommandResult(payload, renderQueuedTaskLaunch(payload), options.json); return; } + // Foreground (default): dispatch to the same detached worker the background + // path uses, then observe it. The worker survives Claude Code's 10-minute Bash + // ceiling, so a long turn no longer dies silently mid-run — it finishes in the + // worker and stays retrievable via /codex:result (issue #370). + ensureCodexAvailable(cwd); + requireTaskRequest(prompt, resumeLast); + const job = buildTaskJob(workspaceRoot, taskMetadata, write); - await runForegroundCommand( - job, - (progress) => - executeTaskRun({ - cwd, - model, - effort, - prompt, - write, - resumeLast, - jobId: job.id, - onProgress: progress - }), - { json: options.json } - ); + const request = buildTaskRequest({ + cwd, + model, + effort, + prompt, + write, + resumeLast, + jobId: job.id + }); + const { logFile } = enqueueDetachedTask(cwd, job, request); + await observeDetachedTask(cwd, { ...job, logFile }, { json: options.json }); } async function handleTaskWorker(argv) { diff --git a/plugins/codex/scripts/lib/job-control.mjs b/plugins/codex/scripts/lib/job-control.mjs index ad152c15..c4b626aa 100644 --- a/plugins/codex/scripts/lib/job-control.mjs +++ b/plugins/codex/scripts/lib/job-control.mjs @@ -58,6 +58,26 @@ function isProgressBlockTitle(line) { ); } +// A progress entry is written by appendLogLine as `[] message`, where +// the timestamp comes from nowIso() (`new Date().toISOString()`). Match that +// exact prefix — not a bare `[` — so block bodies that merely *start* with a +// bracket (a markdown reference like `[1] ...`, a `[P2] ...` line, or a JSON +// array in Codex's answer) are not mistaken for progress and streamed (#372). +const LOG_TIMESTAMP_PREFIX = /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z\]\s/; + +// True for a raw log line that is a streamable progress message — i.e. a +// timestamped entry that is not a block title. Block bodies (the assistant +// message, the final output, reasoning, etc.) are written as continuation lines +// without the timestamp prefix and return false, so callers tailing the log for +// live progress do not echo the full result (which is rendered on stdout). +export function isStreamableProgressLine(line) { + if (typeof line !== "string" || !LOG_TIMESTAMP_PREFIX.test(line)) { + return false; + } + const stripped = stripLogPrefix(line); + return Boolean(stripped) && !isProgressBlockTitle(stripped); +} + export function readJobProgressPreview(logFile, maxLines = DEFAULT_MAX_PROGRESS_LINES) { if (!logFile || !fs.existsSync(logFile)) { return []; diff --git a/tests/job-control.test.mjs b/tests/job-control.test.mjs new file mode 100644 index 00000000..639ca178 --- /dev/null +++ b/tests/job-control.test.mjs @@ -0,0 +1,41 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { isStreamableProgressLine } from "../plugins/codex/scripts/lib/job-control.mjs"; + +// Regression for #372: the foreground task observer tails the job log to stderr +// for live progress. It must echo only progress lines, never the persisted +// block bodies (assistant message, Final output, reasoning), or it duplicates +// Codex's answer onto stderr alongside the rendered stdout result. +test("isStreamableProgressLine keeps progress lines and drops block titles/bodies", () => { + // Timestamped progress lines are streamable. + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:39.925Z] Starting Codex Task."), true); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:42.000Z] Turn completed."), true); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Assistant message captured: OK"), true); + + // Block title lines (their bodies follow on stdout) are not streamable. + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Final output"), false); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Assistant message"), false); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Reasoning summary"), false); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Review output"), false); + assert.equal(isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Subagent design-challenger message"), false); + assert.equal( + isStreamableProgressLine("[2026-06-13T06:15:43.000Z] Subagent design-challenger reasoning summary"), + false + ); + + // Unprefixed block-body / continuation lines and blanks are not streamable. + assert.equal(isStreamableProgressLine("OK"), false); + assert.equal(isStreamableProgressLine("the full assistant answer body line"), false); + assert.equal(isStreamableProgressLine(""), false); + assert.equal(isStreamableProgressLine(null), false); + + // Block-body lines that merely START with a bracket must NOT be streamed — + // only a real ISO-8601 timestamp prefix counts as a progress entry (#372). + assert.equal(isStreamableProgressLine("[1] https://example.com a markdown reference"), false); + assert.equal(isStreamableProgressLine("[P2] a finding Codex wrote in its answer"), false); + assert.equal(isStreamableProgressLine('["a", "b", "c"]'), false); + assert.equal(isStreamableProgressLine("[TODO] fix the empty-state guard"), false); + // A non-Z / non-timestamp bracketed prefix is still not a progress entry. + assert.equal(isStreamableProgressLine("[2026-06-13] partial date only"), false); +}); diff --git a/tests/runtime.test.mjs b/tests/runtime.test.mjs index 90408372..31862875 100644 --- a/tests/runtime.test.mjs +++ b/tests/runtime.test.mjs @@ -833,6 +833,77 @@ test("task --background enqueues a detached worker and exposes per-job status", assert.match(resultPayload.storedJob.rendered, /Handled the requested task/); }); +test("foreground task survives its observer being killed mid-run and stays retrievable (#370)", async (t) => { + const repo = makeTempDir(); + const binDir = makeTempDir(); + // The turn stays open for ~5s, giving a deterministic window to kill the + // observer while the detached worker is still running its turn. + installFakeCodex(binDir, "interruptible-slow-task"); + initGitRepo(repo); + fs.writeFileSync(path.join(repo, "README.md"), "hello\n"); + run("git", ["add", "README.md"], { cwd: repo }); + run("git", ["commit", "-m", "init"], { cwd: repo }); + + const env = buildEnv(binDir); + + // Launch a FOREGROUND task as its own process-group leader so the test can + // kill the observer (and its group) the way Claude Code's 10-minute Bash + // ceiling SIGTERMs the foreground process tree. The Codex turn runs in a + // detached worker that setsid's into its own session, so it is not in this + // group and must survive. + const observer = spawn(process.execPath, [SCRIPT, "task", "--json", "investigate the slow worker timeout"], { + cwd: repo, + env, + detached: true, + stdio: "ignore" + }); + observer.unref(); + t.after(() => { + try { + process.kill(-observer.pid, "SIGKILL"); + } catch { + // Already gone. + } + }); + + // Wait until the worker has the turn underway (running with a thread + turn id). + const jobId = await waitFor(() => { + const status = run("node", [SCRIPT, "status", "--json"], { cwd: repo, env }); + if (status.status !== 0) { + return null; + } + const running = JSON.parse(status.stdout).running ?? []; + const job = running.find((candidate) => candidate.jobClass === "task" && candidate.threadId && candidate.turnId); + return job ? job.id : null; + }, { timeoutMs: 15000 }); + + // Kill the observer's whole process group. The detached worker is in its own + // session, so this does not reach it. + process.kill(-observer.pid, "SIGKILL"); + + // The worker keeps running without the observer and drives the job to done. + const waited = run( + "node", + [SCRIPT, "status", jobId, "--wait", "--timeout-ms", "15000", "--json"], + { cwd: repo, env } + ); + assert.equal(waited.status, 0, waited.stderr); + assert.equal(JSON.parse(waited.stdout).job.status, "completed"); + + // The result is retrievable even though the observer died mid-run, instead of + // the job being stranded in "running" forever. + const resultPayload = await waitFor(() => { + const result = run("node", [SCRIPT, "result", jobId, "--json"], { cwd: repo, env }); + if (result.status !== 0) { + return null; + } + return JSON.parse(result.stdout); + }); + assert.equal(resultPayload.job.id, jobId); + assert.equal(resultPayload.job.status, "completed"); + assert.match(resultPayload.storedJob.rendered, /Handled the requested task/); +}); + test("review rejects focus text because it is native-review only", () => { const repo = makeTempDir(); const binDir = makeTempDir();