Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions plugins/codex/scripts/lib/broker-lifecycle.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ export const PID_FILE_ENV = "CODEX_COMPANION_APP_SERVER_PID_FILE";
export const LOG_FILE_ENV = "CODEX_COMPANION_APP_SERVER_LOG_FILE";
const BROKER_STATE_FILE = "broker.json";

// APT local patch (re: openai/codex-plugin-cc broker readiness race).
// Upstream waits a fixed 2000ms for a freshly-spawned `codex app-server` to bind
// its endpoint, then gives up and lets the caller spawn a duplicate. On Windows a
// cold app-server (Defender on-open scan + MCP fleet init) routinely needs longer,
// so the fixed timer loses the race intermittently. Make both windows env-tunable
// and raise the win32 defaults; ensureBrokerSession also now waits on child
// liveness so a healthy-but-slow broker is never torn down prematurely.
// How long to wait for a freshly spawned broker to bind its endpoint.
const DEFAULT_BROKER_READY_TIMEOUT_MS = resolveBrokerTimingMs(
  "CODEX_COMPANION_BROKER_TIMEOUT_MS",
  process.platform === "win32" ? 15000 : 4000
);
// How long to probe a previously persisted broker endpoint before giving up on reuse.
const DEFAULT_BROKER_REUSE_PROBE_MS = resolveBrokerTimingMs(
  "CODEX_COMPANION_BROKER_REUSE_PROBE_MS",
  process.platform === "win32" ? 750 : 250
);

/**
 * Read a millisecond duration override from the environment.
 *
 * Only positive, finite numbers are accepted. Unset, empty, non-numeric, zero,
 * negative, or infinite values all fall back to `fallbackMs`; a negative value
 * would otherwise make the wait loop exit immediately, and an infinite one
 * would make it wait forever.
 *
 * @param {string} envName - Name of the environment variable to read.
 * @param {number} fallbackMs - Value to use when the override is absent or invalid.
 * @returns {number} The validated override, or `fallbackMs`.
 */
function resolveBrokerTimingMs(envName, fallbackMs) {
  const parsed = Number(process.env[envName]);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

/**
 * Create a new, uniquely named directory inside the OS temp directory.
 *
 * @param {string} [prefix="cxc-"] - Leading part of the directory name; a
 *   unique suffix is appended by `fs.mkdtempSync`.
 * @returns {string} Path of the directory that was created.
 */
export function createBrokerSessionDir(prefix = "cxc-") {
  const template = path.join(os.tmpdir(), prefix);
  const sessionDir = fs.mkdtempSync(template);
  return sessionDir;
}
Expand All @@ -21,7 +35,7 @@ function connectToEndpoint(endpoint) {
return net.createConnection({ path: target.path });
}

export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) {
export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000, child = null) {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
const ready = await new Promise((resolve) => {
Expand All @@ -35,6 +49,12 @@ export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) {
if (ready) {
return true;
}
// APT local patch: if the spawned broker has already exited there is nothing
// left to bind the endpoint, so stop waiting immediately instead of burning
// the remainder of the (now longer) timeout.
if (child && child.exitCode !== null) {
return false;
}
await new Promise((resolve) => setTimeout(resolve, 50));
}
return false;
Expand Down Expand Up @@ -104,7 +124,7 @@ async function isBrokerEndpointReady(endpoint) {
return false;
}
try {
return await waitForBrokerEndpoint(endpoint, 150);
return await waitForBrokerEndpoint(endpoint, DEFAULT_BROKER_REUSE_PROBE_MS);
} catch {
return false;
}
Expand Down Expand Up @@ -146,7 +166,7 @@ export async function ensureBrokerSession(cwd, options = {}) {
env: options.env ?? process.env
});

const ready = await waitForBrokerEndpoint(endpoint, options.timeoutMs ?? 2000);
const ready = await waitForBrokerEndpoint(endpoint, options.timeoutMs ?? DEFAULT_BROKER_READY_TIMEOUT_MS, child);
if (!ready) {
teardownBrokerSession({
endpoint,
Expand Down
95 changes: 94 additions & 1 deletion plugins/codex/scripts/session-lifecycle-hook.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ import {
sendBrokerShutdown,
teardownBrokerSession
} from "./lib/broker-lifecycle.mjs";
import { loadState, resolveStateFile, saveState } from "./lib/state.mjs";
import {
loadState,
readJobFile,
resolveJobFile,
resolveStateFile,
saveState,
writeJobFile
} from "./lib/state.mjs";
import { TRANSCRIPT_PATH_ENV } from "./lib/claude-session-transfer.mjs";
import { resolveWorkspaceRoot } from "./lib/workspace.mjs";

Expand Down Expand Up @@ -74,10 +81,96 @@ function cleanupSessionJobs(cwd, sessionId) {
});
}

// SessionEnd only cleans the *current* session, and on Windows that hook
// frequently never fires (abrupt terminal close / crash / window reload). So
// dead-pid brokers and ghost "running"/"queued" job records from prior sessions
// accumulate, making `/codex:status` report phantom jobs and (if `activeTask`
// ever points at one) blocking new dispatches. SessionStart already runs, so use
// it to self-heal prior-session orphans.
/**
 * Report whether a process with the given pid currently exists.
 *
 * @param {*} pid - Candidate process id; anything that is not a finite number
 *   greater than zero is reported as not alive.
 * @returns {boolean} `true` when signal 0 can be delivered, or when delivery
 *   fails with `EPERM`; `false` for every other failure.
 */
function isPidAlive(pid) {
  const isUsablePid = Number.isFinite(pid) && pid > 0;
  if (!isUsablePid) {
    return false;
  }

  let alive = true;
  try {
    // Signal 0 performs the existence/permission check without sending a signal.
    process.kill(pid, 0);
  } catch (error) {
    // EPERM: the process is there, we just are not allowed to signal it.
    alive = error?.code === "EPERM";
  }
  return alive;
}

/**
 * Mark "queued"/"running" jobs whose recorded pid is no longer alive as failed.
 *
 * Resolves the workspace root for `cwd`, loads its state, and for every active
 * job with a dead pid sets `status` to "failed" and `pid` to null, both in the
 * state entry and in the per-job file when that file exists. State is saved
 * only if at least one job was changed.
 *
 * @param {string} cwd - Directory used to resolve the workspace root.
 * @returns {void}
 */
function reapStaleJobs(cwd) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const stateFile = resolveStateFile(workspaceRoot);
// No state file on disk: nothing to reap.
if (!fs.existsSync(stateFile)) {
return;
}

const state = loadState(workspaceRoot);
// Tracks whether any job entry was modified, so state is only rewritten when needed.
let changed = false;
for (const job of state.jobs) {
const active = job.status === "queued" || job.status === "running";
// Leave finished jobs, and active jobs whose process still exists, untouched.
if (!active || isPidAlive(job.pid)) {
continue;
}
job.status = "failed";
job.pid = null;
Comment on lines +117 to +118

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Terminate stale job process groups before marking failed

When a detached task-worker exits but leaves its Codex child running in the same detached process group, isPidAlive(job.pid) is false here, so SessionStart only flips the record to failed and never calls the existing terminateProcessTree(job.pid) cleanup path. Since spawnDetachedTaskWorker creates detached workers and terminateProcessTree is what kills their process groups on Unix, this still leaks the orphaned app-server/task subprocesses while making the job look reaped.

Useful? React with 👍 / 👎.

changed = true;
// Keep the per-job record consistent with the neutralized state entry.
try {
const jobFile = resolveJobFile(workspaceRoot, job.id);
if (fs.existsSync(jobFile)) {
const record = readJobFile(jobFile);
record.status = "failed";
record.pid = null;
writeJobFile(workspaceRoot, job.id, record);
}
} catch {
// Ignore per-job file write failures during self-heal.
}
}

// Persist once, after all jobs have been examined.
if (changed) {
saveState(workspaceRoot, state);
}
}

/**
 * Tear down and forget a persisted broker session whose pid is no longer alive.
 *
 * Does nothing when no broker session is stored for `cwd`, or when the stored
 * pid still refers to a live process.
 *
 * @param {string} cwd - Directory whose persisted broker session is inspected.
 * @returns {void}
 */
function reapOrphanBroker(cwd) {
  const session = loadBrokerSession(cwd);
  if (!session) {
    return;
  }
  if (isPidAlive(session.pid)) {
    return;
  }

  // The recorded broker process is gone: remove what it left behind so the
  // next task starts a fresh broker rather than probing a dead endpoint.
  const staleSession = {
    endpoint: session.endpoint ?? null,
    pidFile: session.pidFile ?? null,
    logFile: session.logFile ?? null,
    sessionDir: session.sessionDir ?? null,
    pid: session.pid ?? null,
    killProcess: terminateProcessTree
  };
  teardownBrokerSession(staleSession);
  clearBrokerSession(cwd);
}

/**
 * SessionStart hook: export session environment variables, then run the
 * best-effort cleanup of leftovers from earlier sessions.
 *
 * @param {object} input - Hook payload; `session_id`, `transcript_path` and
 *   `cwd` are read from it.
 * @returns {void}
 */
function handleSessionStart(input) {
  appendEnvVar(SESSION_ID_ENV, input.session_id);
  appendEnvVar(TRANSCRIPT_PATH_ENV, input.transcript_path);
  appendEnvVar(PLUGIN_DATA_ENV, process.env[PLUGIN_DATA_ENV]);

  // Each reaper runs in isolation: a failure in one must neither stop the
  // other nor block session startup.
  const cwd = input.cwd || process.cwd();
  for (const reap of [reapStaleJobs, reapOrphanBroker]) {
    try {
      reap(cwd);
    } catch {
      // Ignore self-heal failures.
    }
  }
}

async function handleSessionEnd(input) {
Expand Down
39 changes: 39 additions & 0 deletions tests/broker-lifecycle.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import assert from "node:assert/strict";
import test from "node:test";

import { waitForBrokerEndpoint } from "../plugins/codex/scripts/lib/broker-lifecycle.mjs";

// An endpoint nothing is listening on, in the form used on the current platform.
const UNREACHABLE_ENDPOINT =
  process.platform === "win32"
    ? "pipe:\\\\.\\pipe\\codex-broker-lifecycle-test-does-not-exist"
    : "unix:/tmp/codex-broker-lifecycle-test-does-not-exist.sock";

/**
 * Wait on the unreachable endpoint and measure how long the call took.
 *
 * @param {...*} waitArgs - Arguments forwarded after the endpoint (timeout, optional child).
 * @returns {Promise<{ ready: boolean, elapsed: number }>} Result and wall-clock duration in ms.
 */
async function probeUnreachable(...waitArgs) {
  const startedAt = Date.now();
  const ready = await waitForBrokerEndpoint(UNREACHABLE_ENDPOINT, ...waitArgs);
  return { ready, elapsed: Date.now() - startedAt };
}

test("waitForBrokerEndpoint returns false for an unreachable endpoint within the timeout", async () => {
  const { ready, elapsed } = await probeUnreachable(300);
  assert.strictEqual(ready, false);
  // The wait should use about the whole window (probe + 50ms backoff loop) without hanging.
  const withinWindow = elapsed >= 250 && elapsed < 2000;
  assert.ok(withinWindow, `unexpected elapsed ${elapsed}ms`);
});

test("waitForBrokerEndpoint bails immediately once the spawned broker has exited", async () => {
  // With exitCode already set, nothing can bind the endpoint any more, so the
  // wait has to give up early rather than run out the full timeout.
  const exitedChild = { exitCode: 1 };
  const { ready, elapsed } = await probeUnreachable(10000, exitedChild);
  assert.strictEqual(ready, false);
  assert.ok(elapsed < 1000, `expected early bail, took ${elapsed}ms`);
});

test("waitForBrokerEndpoint keeps waiting while the spawned broker is still alive", async () => {
  // A null exitCode means the process has not exited; the wait runs to the timeout.
  const liveChild = { exitCode: null };
  const { ready, elapsed } = await probeUnreachable(300, liveChild);
  assert.strictEqual(ready, false);
  assert.ok(elapsed >= 250, `expected full wait, took ${elapsed}ms`);
});