diff --git a/README.md b/README.md index 458c39fb..eb5f85bf 100644 --- a/README.md +++ b/README.md @@ -270,6 +270,74 @@ Your configuration will be picked up based on: Check out the Codex docs for more [configuration options](https://developers.openai.com/codex/config-reference). +### Multiple Accounts (`CODEX_HOME`) + +The Codex CLI keeps everything account-specific — `auth.json`, `config.toml`, +session history — inside `CODEX_HOME` (default `~/.codex`). Pointing it at a +different directory gives you a fully independent account profile, which is +useful when one account hits its 5h/weekly usage window and another has idle +capacity (for example, a Business workspace where some seats belong to +teammates who don't use Codex day-to-day). + +#### Setup + +Add one alias per account to your shell profile: + +```bash +# ~/.zshrc / ~/.bashrc +alias codex-main='CODEX_HOME=$HOME/.codex codex' +alias codex-alice='CODEX_HOME=$HOME/.codex-alice codex' +alias codex-bob='CODEX_HOME=$HOME/.codex-bob codex' +``` + +Then: + +```bash +source ~/.zshrc +``` + +Log each account in once, into its own home: + +```bash +codex-main login # default account -> ~/.codex +codex-alice login # second account -> ~/.codex-alice +codex-bob login # third account -> ~/.codex-bob +``` + +From then on, each alias is a fully independent Codex: + +```bash +codex-main +codex-alice +codex-bob +``` + +> **Note:** the aliases are pure convenience for you in interactive shells — +> switching accounts by typing `codex-alice` instead of +> `CODEX_HOME=$HOME/.codex-alice codex`. The only **mandatory** step is logging +> each account in once, into its own home (`codex-alice login`). The plugin +> never uses your aliases: it reads the `CODEX_HOME` environment variable +> directly, as described below. + +#### How the plugin handles it + +The plugin honors the same variable: invoking the companion with a different +`CODEX_HOME` runs that turn on that account. The per-workspace broker is +account-aware — it restarts when the account changes **and the broker is idle**; +if it is mid-task for the previous account, the new call runs on a directly +spawned app server instead (in-flight work is never interrupted) and the +rotation happens on the next idle call. Same-account calls keep reusing the +warm broker. + +Two gotchas worth knowing: + +- **Non-interactive shells** (agents, CI, hooks) don't load your shell aliases — + always use the explicit `CODEX_HOME=... codex ...` form there. +- **Placement in compound commands**: an env assignment only applies to the + command it directly prefixes. `CODEX_HOME=... cd dir && codex ...` sets the + variable for `cd` and silently runs Codex on the default account. Correct: + `cd dir && CODEX_HOME=... codex ...`. + ### Moving The Work Over To Codex Delegated tasks and any [stop gate](#what-does-the-review-gate-do) run can also be directly resumed inside Codex by running `codex resume` either with the specific session ID you received from running `/codex:result` or `/codex:status` or by selecting it from the list. diff --git a/plugins/codex/scripts/app-server-broker.mjs b/plugins/codex/scripts/app-server-broker.mjs index 1954274f..d0507ff3 100644 --- a/plugins/codex/scripts/app-server-broker.mjs +++ b/plugins/codex/scripts/app-server-broker.mjs @@ -158,11 +158,29 @@ async function main() { } if (message.id !== undefined && message.method === "broker/shutdown") { - send(socket, { id: message.id, result: {} }); + // `ifIdle` makes the shutdown atomic with the busy check (single + // event loop): a turn that started after a caller's idle probe makes + // the broker refuse, instead of dropping that in-flight work. + const busyNow = Boolean(activeRequestSocket || activeStreamSocket); + if (message.params?.ifIdle && busyNow) { + send(socket, { id: message.id, result: { shutdown: false, busy: true } }); + continue; + } + send(socket, { id: message.id, result: { shutdown: true } }); await shutdown(server); process.exit(0); } + // Answered before the busy gate so callers can probe safely while a + // turn is in flight (used by account-aware broker rotation). + if (message.id !== undefined && message.method === "broker/status") { + send(socket, { + id: message.id, + result: { busy: Boolean(activeRequestSocket || activeStreamSocket) } + }); + continue; + } + if (message.id === undefined) { continue; } diff --git a/plugins/codex/scripts/lib/broker-lifecycle.mjs b/plugins/codex/scripts/lib/broker-lifecycle.mjs index ef763819..93caa8c5 100644 --- a/plugins/codex/scripts/lib/broker-lifecycle.mjs +++ b/plugins/codex/scripts/lib/broker-lifecycle.mjs @@ -40,19 +40,98 @@ export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) { return false; } -export async function sendBrokerShutdown(endpoint) { - await new Promise((resolve) => { +// Keep in sync with BROKER_BUSY_RPC_CODE in lib/app-server.mjs (importing it +// here would create a circular dependency: app-server.mjs imports this module). +const BROKER_BUSY_RPC_CODE = -32001; + +/** + * Probe whether the broker is currently serving a request or streaming turn. + * Uses the `broker/status` RPC (answered before the busy gate). Brokers from + * older plugin versions don't implement it: when busy their gate rejects the + * probe with BROKER_BUSY_RPC_CODE (busy), and when idle they forward it to the + * app server which rejects the unknown method (idle) — both interpretable. + * Timeouts are treated as busy so an unresponsive broker is never killed + * mid-turn by account rotation. + */ +export async function isBrokerBusy(endpoint, timeoutMs = 1500) { + return await new Promise((resolve) => { const socket = connectToEndpoint(endpoint); + let buffer = ""; + let settled = false; + const finish = (busy) => { + if (!settled) { + settled = true; + clearTimeout(timer); + socket.destroy(); + resolve(busy); + } + }; + const timer = setTimeout(() => finish(true), timeoutMs); socket.setEncoding("utf8"); socket.on("connect", () => { - socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params: {} })}\n`); + socket.write(`${JSON.stringify({ id: 1, method: "broker/status", params: {} })}\n`); }); - socket.on("data", () => { - socket.end(); - resolve(); + socket.on("data", (chunk) => { + buffer += chunk; + const newlineIndex = buffer.indexOf("\n"); + if (newlineIndex === -1) { + return; + } + try { + const message = JSON.parse(buffer.slice(0, newlineIndex)); + if (message.error) { + finish(message.error.code === BROKER_BUSY_RPC_CODE); + return; + } + finish(Boolean(message.result?.busy)); + } catch { + finish(true); + } }); - socket.on("error", resolve); - socket.on("close", resolve); + socket.on("error", () => finish(true)); + socket.on("close", () => finish(true)); + }); +} + +/** + * Ask the broker to shut down. With `ifIdle: true` the broker refuses when a + * request/stream is in flight (atomic with its busy state — see + * app-server-broker.mjs); the promise then resolves `false`. Resolves `true` + * when the broker shut down (or on legacy brokers that ignore the param and + * reply with an empty result). + */ +export async function sendBrokerShutdown(endpoint, { ifIdle = false } = {}) { + return await new Promise((resolve) => { + const socket = connectToEndpoint(endpoint); + let buffer = ""; + let settled = false; + const finish = (didShutdown) => { + if (!settled) { + settled = true; + socket.end(); + resolve(didShutdown); + } + }; + socket.setEncoding("utf8"); + socket.on("connect", () => { + const params = ifIdle ? { ifIdle: true } : {}; + socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params })}\n`); + }); + socket.on("data", (chunk) => { + buffer += chunk; + const newlineIndex = buffer.indexOf("\n"); + if (newlineIndex === -1) { + return; + } + try { + const message = JSON.parse(buffer.slice(0, newlineIndex)); + finish(message.result?.shutdown !== false); + } catch { + finish(true); + } + }); + socket.on("error", () => finish(true)); + socket.on("close", () => finish(true)); }); } @@ -111,12 +190,36 @@ async function isBrokerEndpointReady(endpoint) { } export async function ensureBrokerSession(cwd, options = {}) { + // Account-aware reuse: the broker (and the `codex app-server` it manages) + // inherits CODEX_HOME once, at spawn time, so a live broker started under one + // account would otherwise silently serve every later call in this workspace — + // ignoring the caller's CODEX_HOME and breaking multi-account fallback. + // When the caller's CODEX_HOME differs from the one the session was created + // with, shut the old broker down gracefully and start a fresh one. + const desiredCodexHome = (options.env ?? process.env).CODEX_HOME ?? ""; const existing = loadBrokerSession(cwd); - if (existing && (await isBrokerEndpointReady(existing.endpoint))) { + const existingReady = existing ? await isBrokerEndpointReady(existing.endpoint) : false; + if (existing && existingReady && (existing.codexHome ?? "") === desiredCodexHome) { return existing; } if (existing) { + if (existingReady && (existing.codexHome ?? "") !== desiredCodexHome) { + // Never kill in-flight work on account rotation. The probe also covers + // legacy brokers (their busy gate rejects it with BROKER_BUSY_RPC_CODE); + // the `ifIdle` shutdown closes the probe→shutdown race on current + // brokers (a turn that starts in between makes the broker refuse). + // On either busy signal: return null so this call falls back to a + // directly spawned app server with the caller's env, and the rotation + // happens on the next call that finds the broker idle. + if (await isBrokerBusy(existing.endpoint)) { + return null; + } + const didShutdown = await sendBrokerShutdown(existing.endpoint, { ifIdle: true }); + if (!didShutdown) { + return null; + } + } teardownBrokerSession({ endpoint: existing.endpoint ?? null, pidFile: existing.pidFile ?? null, @@ -164,7 +267,8 @@ export async function ensureBrokerSession(cwd, options = {}) { pidFile, logFile, sessionDir, - pid: child.pid ?? null + pid: child.pid ?? null, + codexHome: desiredCodexHome }; saveBrokerSession(cwd, session); return session; diff --git a/tests/broker-lifecycle.test.mjs b/tests/broker-lifecycle.test.mjs new file mode 100644 index 00000000..932a4f4b --- /dev/null +++ b/tests/broker-lifecycle.test.mjs @@ -0,0 +1,220 @@ +import assert from "node:assert/strict"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import process from "node:process"; +import { fileURLToPath } from "node:url"; +import { test } from "node:test"; + +import { + ensureBrokerSession, + loadBrokerSession, + saveBrokerSession +} from "../plugins/codex/scripts/lib/broker-lifecycle.mjs"; + +const FAKE_BROKER = path.join(path.dirname(fileURLToPath(import.meta.url)), "fake-broker-fixture.mjs"); + +function makeTempDir(prefix = "codex-broker-lifecycle-") { + return fs.mkdtempSync(path.join(os.tmpdir(), prefix)); +} + +function isPidAlive(pid) { + if (!Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function stopBroker(session) { + if (session?.pid) { + try { + process.kill(session.pid); + } catch { + // already gone + } + } +} + +async function waitFor(predicate, { timeoutMs = 5000, intervalMs = 50 } = {}) { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (await predicate()) { + return true; + } + await new Promise((resolve) => setTimeout(resolve, intervalMs)); + } + throw new Error("Timed out waiting for condition."); +} + +async function withIsolatedState(fn) { + const pluginDataDir = makeTempDir(); + const cwd = makeTempDir(); + const previousPluginDataDir = process.env.CLAUDE_PLUGIN_DATA; + process.env.CLAUDE_PLUGIN_DATA = pluginDataDir; + try { + await fn(cwd); + } finally { + if (previousPluginDataDir === undefined) { + delete process.env.CLAUDE_PLUGIN_DATA; + } else { + process.env.CLAUDE_PLUGIN_DATA = previousPluginDataDir; + } + fs.rmSync(pluginDataDir, { recursive: true, force: true }); + fs.rmSync(cwd, { recursive: true, force: true }); + } +} + +function startSession(cwd, { codexHome, recordFile, busy, busyAfterStatus } = {}) { + const env = { ...process.env }; + delete env.CODEX_HOME; + delete env.FAKE_BROKER_BUSY; + if (codexHome !== undefined) { + env.CODEX_HOME = codexHome; + } + if (recordFile) { + env.FAKE_BROKER_RECORD = recordFile; + } + if (busy) { + env.FAKE_BROKER_BUSY = "1"; + } + if (busyAfterStatus) { + env.FAKE_BROKER_BUSY_AFTER_STATUS = "1"; + } + // Generous readiness timeout: under parallel test-runner load a node child + // can take several seconds to start; 5s flaked on busy machines. + return ensureBrokerSession(cwd, { scriptPath: FAKE_BROKER, env, timeoutMs: 20000 }); +} + +test("ensureBrokerSession records CODEX_HOME and reuses the broker for the same account", async () => { + await withIsolatedState(async (cwd) => { + const homeA = makeTempDir("codex-home-a-"); + let session; + try { + session = await startSession(cwd, { codexHome: homeA }); + assert.ok(session, "broker session should start"); + assert.equal(session.codexHome, homeA); + + const reused = await startSession(cwd, { codexHome: homeA }); + assert.ok(reused); + assert.equal(reused.pid, session.pid, "same account must reuse the live broker"); + assert.equal(reused.sessionDir, session.sessionDir); + } finally { + stopBroker(session); + fs.rmSync(homeA, { recursive: true, force: true }); + } + }); +}); + +test("switching CODEX_HOME shuts down the old broker and spawns one with the new env", async () => { + await withIsolatedState(async (cwd) => { + const homeA = makeTempDir("codex-home-a-"); + const homeB = makeTempDir("codex-home-b-"); + const recordB = path.join(makeTempDir("codex-record-"), "spawned-env"); + let first; + let second; + try { + first = await startSession(cwd, { codexHome: homeA }); + assert.ok(first); + + second = await startSession(cwd, { codexHome: homeB, recordFile: recordB }); + assert.ok(second, "broker for the new account should start"); + assert.equal(second.codexHome, homeB); + assert.notEqual(second.sessionDir, first.sessionDir, "a fresh broker must be spawned"); + + // the new broker process really inherited the new CODEX_HOME + assert.equal(fs.readFileSync(recordB, "utf8"), homeB); + + // the old broker received broker/shutdown and exited + await waitFor(() => !isPidAlive(first.pid)); + + // persisted state points at the new account + assert.equal(loadBrokerSession(cwd)?.codexHome, homeB); + } finally { + stopBroker(first); + stopBroker(second); + for (const dir of [homeA, homeB, path.dirname(recordB)]) { + fs.rmSync(dir, { recursive: true, force: true }); + } + } + }); +}); + +test("account switch never kills a busy broker — falls back to null and keeps it serving", async () => { + await withIsolatedState(async (cwd) => { + const homeA = makeTempDir("codex-home-a-"); + const homeB = makeTempDir("codex-home-b-"); + let first; + try { + // Broker for account A that reports itself busy (mid-turn). + first = await startSession(cwd, { codexHome: homeA, busy: true }); + assert.ok(first); + + // A caller with account B must NOT shut it down: null means "run direct". + const second = await startSession(cwd, { codexHome: homeB }); + assert.equal(second, null, "busy broker must not be rotated"); + + // The busy broker is untouched and still owned by account A. + assert.equal(isPidAlive(first.pid), true, "busy broker must stay alive"); + assert.equal(loadBrokerSession(cwd)?.codexHome, homeA); + + // Same-account callers keep reusing it as before. + const sameAccount = await startSession(cwd, { codexHome: homeA }); + assert.equal(sameAccount?.pid, first.pid); + } finally { + stopBroker(first); + for (const dir of [homeA, homeB]) { + fs.rmSync(dir, { recursive: true, force: true }); + } + } + }); +}); + +test("a turn starting between the idle probe and the shutdown makes the broker refuse (ifIdle)", async () => { + await withIsolatedState(async (cwd) => { + const homeA = makeTempDir("codex-home-a-"); + const homeB = makeTempDir("codex-home-b-"); + let first; + try { + // Broker reports idle to the status probe, then becomes busy before the + // shutdown lands — the exact probe->shutdown race. + first = await startSession(cwd, { codexHome: homeA, busyAfterStatus: true }); + assert.ok(first); + + const second = await startSession(cwd, { codexHome: homeB }); + assert.equal(second, null, "refused ifIdle shutdown must fall back to null"); + assert.equal(isPidAlive(first.pid), true, "broker with the late-starting turn must survive"); + assert.equal(loadBrokerSession(cwd)?.codexHome, homeA); + } finally { + stopBroker(first); + for (const dir of [homeA, homeB]) { + fs.rmSync(dir, { recursive: true, force: true }); + } + } + }); +}); + +test("legacy broker.json without codexHome is treated as the default account", async () => { + await withIsolatedState(async (cwd) => { + let session; + try { + session = await startSession(cwd, {}); + assert.ok(session); + assert.equal(session.codexHome, ""); + + // simulate a session persisted by a pre-fix plugin version + const { codexHome: _omitted, ...legacy } = session; + saveBrokerSession(cwd, legacy); + + const reused = await startSession(cwd, {}); + assert.ok(reused); + assert.equal(reused.pid, session.pid, "default-account caller must reuse a legacy session"); + } finally { + stopBroker(session); + } + }); +}); diff --git a/tests/fake-broker-fixture.mjs b/tests/fake-broker-fixture.mjs new file mode 100644 index 00000000..4fda1451 --- /dev/null +++ b/tests/fake-broker-fixture.mjs @@ -0,0 +1,58 @@ +// Minimal stand-in for scripts/app-server-broker.mjs used by broker-lifecycle +// tests. Listens on the unix endpoint, writes the pid file, optionally records +// the CODEX_HOME it was spawned with (FAKE_BROKER_RECORD), and exits cleanly on +// the broker/shutdown RPC — the same surface ensureBrokerSession relies on. +import fs from "node:fs"; +import net from "node:net"; +import process from "node:process"; + +const args = process.argv.slice(2); + +function opt(name) { + const index = args.indexOf(name); + return index >= 0 ? args[index + 1] : undefined; +} + +const endpoint = opt("--endpoint"); +const pidFile = opt("--pid-file"); +if (!endpoint) { + process.exit(1); +} +const socketPath = endpoint.replace(/^unix:/, ""); + +if (process.env.FAKE_BROKER_RECORD) { + fs.writeFileSync(process.env.FAKE_BROKER_RECORD, process.env.CODEX_HOME ?? ""); +} + +const busy = process.env.FAKE_BROKER_BUSY === "1"; +// Simulates the probe->shutdown race: report idle on broker/status, then act +// busy by the time the (ifIdle) shutdown arrives. +const busyAfterStatus = process.env.FAKE_BROKER_BUSY_AFTER_STATUS === "1"; +let statusProbed = false; + +const server = net.createServer((socket) => { + socket.setEncoding("utf8"); + socket.on("data", (chunk) => { + if (chunk.includes("broker/shutdown")) { + const busyNow = busy || (busyAfterStatus && statusProbed); + if (busyNow && chunk.includes("ifIdle")) { + socket.write(`${JSON.stringify({ id: 1, result: { shutdown: false, busy: true } })}\n`); + return; + } + socket.write(`${JSON.stringify({ id: 1, result: { shutdown: true } })}\n`); + socket.end(); + server.close(() => process.exit(0)); + return; + } + if (chunk.includes("broker/status")) { + socket.write(`${JSON.stringify({ id: 1, result: { busy } })}\n`); + statusProbed = true; + } + }); +}); + +server.listen(socketPath, () => { + if (pidFile) { + fs.writeFileSync(pidFile, String(process.pid)); + } +});