-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix: per-workspace broker ignores CODEX_HOME — make it account-aware so multi-account fallback works #369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: per-workspace broker ignores CODEX_HOME — make it account-aware so multi-account fallback works #369
Changes from all commits
912bd18
bc1258f
75dec55
bcc82b8
cd7fb09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,19 +40,98 @@ export async function waitForBrokerEndpoint(endpoint, timeoutMs = 2000) { | |
| return false; | ||
| } | ||
|
|
||
| export async function sendBrokerShutdown(endpoint) { | ||
| await new Promise((resolve) => { | ||
| // Keep in sync with BROKER_BUSY_RPC_CODE in lib/app-server.mjs (importing it | ||
| // here would create a circular dependency: app-server.mjs imports this module). | ||
| const BROKER_BUSY_RPC_CODE = -32001; | ||
|
|
||
| /** | ||
| * Probe whether the broker is currently serving a request or streaming turn. | ||
| * Uses the `broker/status` RPC (answered before the busy gate). Brokers from | ||
| * older plugin versions don't implement it: when busy their gate rejects the | ||
| * probe with BROKER_BUSY_RPC_CODE (busy), and when idle they forward it to the | ||
| * app server which rejects the unknown method (idle) — both interpretable. | ||
| * Timeouts are treated as busy so an unresponsive broker is never killed | ||
| * mid-turn by account rotation. | ||
| */ | ||
| export async function isBrokerBusy(endpoint, timeoutMs = 1500) { | ||
| return await new Promise((resolve) => { | ||
| const socket = connectToEndpoint(endpoint); | ||
| let buffer = ""; | ||
| let settled = false; | ||
| const finish = (busy) => { | ||
| if (!settled) { | ||
| settled = true; | ||
| clearTimeout(timer); | ||
| socket.destroy(); | ||
| resolve(busy); | ||
| } | ||
| }; | ||
| const timer = setTimeout(() => finish(true), timeoutMs); | ||
| socket.setEncoding("utf8"); | ||
| socket.on("connect", () => { | ||
| socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params: {} })}\n`); | ||
| socket.write(`${JSON.stringify({ id: 1, method: "broker/status", params: {} })}\n`); | ||
| }); | ||
| socket.on("data", () => { | ||
| socket.end(); | ||
| resolve(); | ||
| socket.on("data", (chunk) => { | ||
| buffer += chunk; | ||
| const newlineIndex = buffer.indexOf("\n"); | ||
| if (newlineIndex === -1) { | ||
| return; | ||
| } | ||
| try { | ||
| const message = JSON.parse(buffer.slice(0, newlineIndex)); | ||
| if (message.error) { | ||
| finish(message.error.code === BROKER_BUSY_RPC_CODE); | ||
| return; | ||
| } | ||
| finish(Boolean(message.result?.busy)); | ||
| } catch { | ||
| finish(true); | ||
| } | ||
| }); | ||
| socket.on("error", resolve); | ||
| socket.on("close", resolve); | ||
| socket.on("error", () => finish(true)); | ||
| socket.on("close", () => finish(true)); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Ask the broker to shut down. With `ifIdle: true` the broker refuses when a | ||
| * request/stream is in flight (atomic with its busy state — see | ||
| * app-server-broker.mjs); the promise then resolves `false`. Resolves `true` | ||
| * when the broker shut down (or on legacy brokers that ignore the param and | ||
| * reply with an empty result). | ||
| */ | ||
| export async function sendBrokerShutdown(endpoint, { ifIdle = false } = {}) { | ||
| return await new Promise((resolve) => { | ||
| const socket = connectToEndpoint(endpoint); | ||
| let buffer = ""; | ||
| let settled = false; | ||
| const finish = (didShutdown) => { | ||
| if (!settled) { | ||
| settled = true; | ||
| socket.end(); | ||
| resolve(didShutdown); | ||
| } | ||
| }; | ||
| socket.setEncoding("utf8"); | ||
| socket.on("connect", () => { | ||
| const params = ifIdle ? { ifIdle: true } : {}; | ||
| socket.write(`${JSON.stringify({ id: 1, method: "broker/shutdown", params })}\n`); | ||
| }); | ||
| socket.on("data", (chunk) => { | ||
| buffer += chunk; | ||
| const newlineIndex = buffer.indexOf("\n"); | ||
| if (newlineIndex === -1) { | ||
| return; | ||
| } | ||
| try { | ||
| const message = JSON.parse(buffer.slice(0, newlineIndex)); | ||
| finish(message.result?.shutdown !== false); | ||
| } catch { | ||
| finish(true); | ||
| } | ||
| }); | ||
| socket.on("error", () => finish(true)); | ||
| socket.on("close", () => finish(true)); | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -111,12 +190,36 @@ async function isBrokerEndpointReady(endpoint) { | |
| } | ||
|
|
||
| export async function ensureBrokerSession(cwd, options = {}) { | ||
| // Account-aware reuse: the broker (and the `codex app-server` it manages) | ||
| // inherits CODEX_HOME once, at spawn time, so a live broker started under one | ||
| // account would otherwise silently serve every later call in this workspace — | ||
| // ignoring the caller's CODEX_HOME and breaking multi-account fallback. | ||
| // When the caller's CODEX_HOME differs from the one the session was created | ||
| // with, shut the old broker down gracefully and start a fresh one. | ||
| const desiredCodexHome = (options.env ?? process.env).CODEX_HOME ?? ""; | ||
| const existing = loadBrokerSession(cwd); | ||
| if (existing && (await isBrokerEndpointReady(existing.endpoint))) { | ||
| const existingReady = existing ? await isBrokerEndpointReady(existing.endpoint) : false; | ||
| if (existing && existingReady && (existing.codexHome ?? "") === desiredCodexHome) { | ||
| return existing; | ||
| } | ||
|
|
||
| if (existing) { | ||
| if (existingReady && (existing.codexHome ?? "") !== desiredCodexHome) { | ||
| // Never kill in-flight work on account rotation. The probe also covers | ||
| // legacy brokers (their busy gate rejects it with BROKER_BUSY_RPC_CODE); | ||
| // the `ifIdle` shutdown closes the probe→shutdown race on current | ||
| // brokers (a turn that starts in between makes the broker refuse). | ||
| // On either busy signal: return null so this call falls back to a | ||
| // directly spawned app server with the caller's env, and the rotation | ||
| // happens on the next call that finds the broker idle. | ||
| if (await isBrokerBusy(existing.endpoint)) { | ||
| return null; | ||
| } | ||
| const didShutdown = await sendBrokerShutdown(existing.endpoint, { ifIdle: true }); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the saved session belongs to a pre-fix broker (no Useful? React with 👍 / 👎. |
||
| if (!didShutdown) { | ||
| return null; | ||
| } | ||
| } | ||
| teardownBrokerSession({ | ||
| endpoint: existing.endpoint ?? null, | ||
| pidFile: existing.pidFile ?? null, | ||
|
|
@@ -164,7 +267,8 @@ export async function ensureBrokerSession(cwd, options = {}) { | |
| pidFile, | ||
| logFile, | ||
| sessionDir, | ||
| pid: child.pid ?? null | ||
| pid: child.pid ?? null, | ||
| codexHome: desiredCodexHome | ||
| }; | ||
| saveBrokerSession(cwd, session); | ||
| return session; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This account match only applies to callers that go through
ensureBrokerSession; callers usingreuseExistingBrokerstill readloadBrokerSession(cwd)?.endpointdirectly inCodexAppServerClient.connect. In practice, after account A has started a shared broker, running/codex:setupor another auth/status path withCODEX_HOMEset to account B will connect to A's broker and report A'saccount/read/config/readresults, so setup can say the wrong account is logged in even though task/review calls would rotate or fall back for B.Useful? React with 👍 / 👎.