From 9633a9e0430d0e1dc4d755c40a0aacf13f6cd8bf Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:30:07 +0100 Subject: [PATCH 01/11] feat(supervisor): parse apiserver pod-object count from metrics scrape --- .../k8sPodCountSignalSource.test.ts | 29 +++++++++++++++++++ .../backpressure/k8sPodCountSignalSource.ts | 14 +++++++++ 2 files changed, 43 insertions(+) create mode 100644 apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts create mode 100644 apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts new file mode 100644 index 00000000000..8c0497a3488 --- /dev/null +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts @@ -0,0 +1,29 @@ +import { describe, it, expect } from "vitest"; +import { parsePodCount } from "./k8sPodCountSignalSource.js"; + +describe("parsePodCount", () => { + it("reads the pods object count", () => { + const text = [ + "# HELP apiserver_storage_objects Number of stored objects", + "# TYPE apiserver_storage_objects gauge", + 'apiserver_storage_objects{resource="pods"} 8421', + 'apiserver_storage_objects{resource="configmaps"} 17', + ].join("\n"); + expect(parsePodCount(text)).toBe(8421); + }); + + it("is tolerant of extra labels in any order", () => { + const text = 'apiserver_storage_objects{group="",resource="pods",extra="x"} 12'; + expect(parsePodCount(text)).toBe(12); + }); + + it("parses scientific notation", () => { + const text = 'apiserver_storage_objects{resource="pods"} 1.2e+04'; + expect(parsePodCount(text)).toBe(12000); + }); + + it("throws when the pods metric is absent", () => { + const text = 'apiserver_storage_objects{resource="configmaps"} 17'; + expect(() => parsePodCount(text)).toThrow(); + }); +}); diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts 
b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts new file mode 100644 index 00000000000..947897cea5e --- /dev/null +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts @@ -0,0 +1,14 @@ +// Reads the apiserver's stored-pod-object count from a Prometheus /metrics scrape. +const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+-]+)/m; + +export function parsePodCount(metricsText: string): number { + const match = metricsText.match(POD_COUNT_RE); + if (!match) { + throw new Error('apiserver_storage_objects{resource="pods"} not found in metrics'); + } + const value = Number(match[1]); + if (!Number.isFinite(value)) { + throw new Error(`unparseable pod count: ${match[1]}`); + } + return value; +} From d76fed4dfcdb7fd0cd4010a7ae9571140d6f2a42 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:32:44 +0100 Subject: [PATCH 02/11] feat(supervisor): pod-count backpressure source with hysteresis --- .../k8sPodCountSignalSource.test.ts | 58 ++++++++++++++++++- .../backpressure/k8sPodCountSignalSource.ts | 32 ++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts index 8c0497a3488..fb26eb3c364 100644 --- a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { parsePodCount } from "./k8sPodCountSignalSource.js"; +import { parsePodCount, K8sPodCountSignalSource } from "./k8sPodCountSignalSource.js"; describe("parsePodCount", () => { it("reads the pods object count", () => { @@ -27,3 +27,59 @@ describe("parsePodCount", () => { expect(() => parsePodCount(text)).toThrow(); }); }); + +function metrics(count: number): string { + return 
`apiserver_storage_objects{resource="pods"} ${count}`; +} + +describe("K8sPodCountSignalSource", () => { + it("engages at the engage threshold and reports the count", async () => { + const counts: number[] = []; + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(10000), + engageThreshold: 10000, + releaseThreshold: 5000, + reportPodCount: (c) => counts.push(c), + }); + const verdict = await source.read(); + expect(verdict.engaged).toBe(true); + expect(typeof verdict.ts).toBe("number"); + expect(counts).toEqual([10000]); + }); + + it("does not engage below the engage threshold", async () => { + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(9999), + engageThreshold: 10000, + releaseThreshold: 5000, + }); + expect((await source.read()).engaged).toBe(false); + }); + + it("stays engaged in the hysteresis band, releases only below release threshold", async () => { + let count = 10000; + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => metrics(count), + engageThreshold: 10000, + releaseThreshold: 5000, + }); + expect((await source.read()).engaged).toBe(true); // engage + count = 7000; + expect((await source.read()).engaged).toBe(true); // band -> still engaged + count = 4999; + expect((await source.read()).engaged).toBe(false); // below release -> off + count = 7000; + expect((await source.read()).engaged).toBe(false); // band again -> stays off + }); + + it("propagates scrape failures (monitor fails open on throw)", async () => { + const source = new K8sPodCountSignalSource({ + fetchMetrics: async () => { + throw new Error("connection refused"); + }, + engageThreshold: 10000, + releaseThreshold: 5000, + }); + await expect(source.read()).rejects.toThrow("connection refused"); + }); +}); diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts index 947897cea5e..a9a7fbd2c86 100644 --- 
a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts @@ -1,3 +1,5 @@ +import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js"; + // Reads the apiserver's stored-pod-object count from a Prometheus /metrics scrape. const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+-]+)/m; @@ -12,3 +14,33 @@ export function parsePodCount(metricsText: string): number { } return value; } + +export type K8sPodCountSignalSourceOptions = { + fetchMetrics: () => Promise; + engageThreshold: number; + releaseThreshold: number; + reportPodCount?: (count: number) => void; +}; + +// Engage/release with hysteresis so a count hovering near the line doesn't flap. +export class K8sPodCountSignalSource implements BackpressureSignalSource { + private engaged = false; + + constructor(private readonly opts: K8sPodCountSignalSourceOptions) {} + + async read(): Promise { + const text = await this.opts.fetchMetrics(); + const count = parsePodCount(text); + this.opts.reportPodCount?.(count); + + if (this.engaged) { + if (count < this.opts.releaseThreshold) { + this.engaged = false; + } + } else if (count >= this.opts.engageThreshold) { + this.engaged = true; + } + + return { engaged: this.engaged, ts: Date.now() }; + } +} From d96449a0facfe40d6985d4fad7d292a115e29545 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:35:53 +0100 Subject: [PATCH 03/11] feat(supervisor): apiserver metrics fetcher for cluster pod count --- apps/supervisor/src/clients/kubernetes.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index f66e57e4353..ad9ad85015d 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -53,3 +53,25 @@ function getKubeConfig() { } 
export { k8s }; + +/** + * Builds a function that scrapes the apiserver's Prometheus /metrics endpoint. + * One lightweight aggregate read - not a pod listing. Requires the service + * account to be granted GET on the /metrics non-resource URL. + */ +export function createApiserverMetricsFetcher(): () => Promise { + const kubeConfig = getKubeConfig(); + + return async () => { + const cluster = kubeConfig.getCurrentCluster(); + if (!cluster) { + throw new Error("no current cluster in kubeconfig"); + } + const requestInit = await kubeConfig.applyToFetchOptions({ method: "GET" }); + const response = await fetch(`${cluster.server}/metrics`, requestInit as unknown as RequestInit); + if (!response.ok) { + throw new Error(`apiserver /metrics scrape failed: ${response.status}`); + } + return response.text(); + }; +} From 8bb439ff2c0d8873c789bff41d71708519726d16 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:38:58 +0100 Subject: [PATCH 04/11] feat(supervisor): config for pod-count backpressure source --- apps/supervisor/src/env.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 99d440820a7..d3afdc1d102 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -73,6 +73,12 @@ const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false), + // Backpressure signal source. "redis" reads a verdict from a Redis key; + // "k8s-pod-count" scrapes the cluster apiserver's total pod-object count and + // engages above ENGAGE, releasing below RELEASE (hysteresis). 
+ TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: z.enum(["redis", "k8s-pod-count"]).default("redis"), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), From 0e12327bd703e89d160554c7a4747d1071960e0b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:52:06 +0100 Subject: [PATCH 05/11] feat(supervisor): select pod-count backpressure source, expose cluster pod-count gauge --- apps/supervisor/src/index.ts | 86 +++++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 26 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 647549da7d1..d079013cc17 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -20,8 +20,8 @@ import { CheckpointClient, isKubernetesEnvironment, } from "@trigger.dev/core/v3/serverOnly"; -import { createK8sApi } from "./clients/kubernetes.js"; -import { collectDefaultMetrics, Histogram } from "prom-client"; +import { createK8sApi, createApiserverMetricsFetcher } from "./clients/kubernetes.js"; +import { collectDefaultMetrics, Gauge, Histogram } from "prom-client"; import { register } from "./metrics.js"; import { PodCleaner } from "./services/podCleaner.js"; import { FailedPodHandler } from "./services/failedPodHandler.js"; @@ -33,9 +33,13 @@ import { } from "./services/warmStartVerificationService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; import { Redis } from "ioredis"; -import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; +import { + BackpressureMonitor, + type BackpressureSignalSource, +} from "./backpressure/backpressureMonitor.js"; import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; import { 
BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; +import { K8sPodCountSignalSource } from "./backpressure/k8sPodCountSignalSource.js"; import { fromContext, recordPhaseSince, @@ -214,24 +218,62 @@ class ManagedSupervisor { } if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { - this.backpressureRedis = new Redis({ - host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, - port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, - username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, - password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, - ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), - maxRetriesPerRequest: null, - }); - this.backpressureRedis.on("error", (error) => - this.logger.error("Backpressure redis error", { error: error.message }) - ); + let source: BackpressureSignalSource; + + if (env.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "k8s-pod-count") { + if ( + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE + ) { + throw new Error( + "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE" + ); + } + const podCountGauge = new Gauge({ + name: "supervisor_cluster_pod_count", + help: "Total pod objects stored in the cluster, scraped for backpressure", + registers: [register], + }); + source = new K8sPodCountSignalSource({ + fetchMetrics: createApiserverMetricsFetcher(), + engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + reportPodCount: (count) => podCountGauge.set(count), + }); + this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", { + engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + }); + } else { + 
this.backpressureRedis = new Redis({ + host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, + port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, + username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, + password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, + ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), + maxRetriesPerRequest: null, + }); + this.backpressureRedis.on("error", (error) => + this.logger.error("Backpressure redis error", { error: error.message }) + ); + source = new RedisBackpressureSignalSource( + this.backpressureRedis, + env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY + ); + this.logger.log("🛑 Dequeue backpressure enabled (redis source)", { + key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + }); + } this.backpressureMonitor = new BackpressureMonitor({ enabled: true, - source: new RedisBackpressureSignalSource( - this.backpressureRedis, - env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY - ), + source, refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, @@ -239,14 +281,6 @@ class ManagedSupervisor { logger: this.logger, metrics: new BackpressureMetrics({ register }), }); - - this.logger.log("🛑 Dequeue backpressure enabled", { - key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, - refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, - maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, - rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, - dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, - }); } this.workerSession = new SupervisorSession({ From f7b3e7fb90d654ccf4e38410e505d597e1c4bdcd Mon Sep 17 00:00:00 2001 From: nicktrn 
<55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 09:42:42 +0100 Subject: [PATCH 06/11] docs(supervisor): server-changes note for pod-count backpressure --- .server-changes/supervisor-pod-count-backpressure.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/supervisor-pod-count-backpressure.md diff --git a/.server-changes/supervisor-pod-count-backpressure.md b/.server-changes/supervisor-pod-count-backpressure.md new file mode 100644 index 00000000000..f45411b04a5 --- /dev/null +++ b/.server-changes/supervisor-pod-count-backpressure.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +The supervisor can pause dequeuing when the Kubernetes cluster is saturated, based on the cluster's total pod count. Opt-in and off by default. From e2eefd8502b8a8bb69e9425d84da39c62c1b5921 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 09:51:11 +0100 Subject: [PATCH 07/11] fix(supervisor): source-aware backpressure config, 5s scrape interval, parser hardening --- .../k8sPodCountSignalSource.test.ts | 10 +++++ .../backpressure/k8sPodCountSignalSource.ts | 2 +- apps/supervisor/src/clients/kubernetes.ts | 1 + apps/supervisor/src/env.test.ts | 44 +++++++++++++++++++ apps/supervisor/src/env.ts | 9 +++- apps/supervisor/src/index.ts | 6 ++- 6 files changed, 67 insertions(+), 5 deletions(-) create mode 100644 apps/supervisor/src/env.test.ts diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts index fb26eb3c364..8857372a042 100644 --- a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.test.ts @@ -24,6 +24,16 @@ describe("parsePodCount", () => { it("throws when the pods metric is absent", () => { const text = 'apiserver_storage_objects{resource="configmaps"} 17'; + expect(() => parsePodCount(text)).toThrow(/not 
found/); + }); + + it("throws on a non-finite value (e.g. 1e999)", () => { + const text = 'apiserver_storage_objects{resource="pods"} 1e999'; + expect(() => parsePodCount(text)).toThrow(); + }); + + it("throws on a negative value", () => { + const text = 'apiserver_storage_objects{resource="pods"} -5'; expect(() => parsePodCount(text)).toThrow(); }); }); diff --git a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts index a9a7fbd2c86..e1fa0b78b11 100644 --- a/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts +++ b/apps/supervisor/src/backpressure/k8sPodCountSignalSource.ts @@ -1,7 +1,7 @@ import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js"; // Reads the apiserver's stored-pod-object count from a Prometheus /metrics scrape. -const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+-]+)/m; +const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+]+)/m; export function parsePodCount(metricsText: string): number { const match = metricsText.match(POD_COUNT_RE); diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index ad9ad85015d..45d5a09b005 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -68,6 +68,7 @@ export function createApiserverMetricsFetcher(): () => Promise { throw new Error("no current cluster in kubeconfig"); } const requestInit = await kubeConfig.applyToFetchOptions({ method: "GET" }); + // node-fetch vs DOM RequestInit: structurally compatible, declaration-only mismatch const response = await fetch(`${cluster.server}/metrics`, requestInit as unknown as RequestInit); if (!response.ok) { throw new Error(`apiserver /metrics scrape failed: ${response.status}`); diff --git a/apps/supervisor/src/env.test.ts b/apps/supervisor/src/env.test.ts new file mode 100644 index 
00000000000..67435364fdd --- /dev/null +++ b/apps/supervisor/src/env.test.ts @@ -0,0 +1,44 @@ +import { describe, it, expect, vi } from "vitest"; + +// Mock std-env before importing env.ts so the module-level `Env.parse(stdEnv)` +// doesn't fail in a test environment that lacks required vars. +vi.mock("std-env", () => ({ + env: { + TRIGGER_API_URL: "http://localhost:3030", + TRIGGER_WORKER_TOKEN: "test-token", + MANAGED_WORKER_SECRET: "test-secret", + OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318", + }, +})); + +const { Env } = await import("./env.js"); + +// Minimal env that satisfies all required fields; everything else has defaults. +const base = { + TRIGGER_API_URL: "http://localhost:3030", + TRIGGER_WORKER_TOKEN: "test-token", + MANAGED_WORKER_SECRET: "test-secret", + OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318", +}; + +describe("Env superRefine - backpressure source awareness", () => { + it("accepts k8s-pod-count source without a Redis host", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "k8s-pod-count", + }) + ).not.toThrow(); + }); + + it("rejects redis source when Redis host is absent", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "redis", + }) + ).toThrow(); + }); +}); diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index d3afdc1d102..b7db7f18280 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,7 +3,7 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z +export const Env = z .object({ // This will come from `spec.nodeName` in k8s TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), @@ -79,6 +79,7 @@ const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: z.enum(["redis", "k8s-pod-count"]).default("redis"), 
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), @@ -332,7 +333,11 @@ const Env = z path: ["TRIGGER_WORKLOAD_API_DOMAIN"], }); } - if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) { + if ( + data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && + data.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "redis" && + !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST + ) { ctx.addIssue({ code: z.ZodIssueCode.custom, message: diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index d079013cc17..400cd26e3bd 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -219,8 +219,10 @@ class ManagedSupervisor { if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { let source: BackpressureSignalSource; + let refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS; if (env.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "k8s-pod-count") { + refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS; if ( env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE @@ -243,7 +245,7 @@ class ManagedSupervisor { this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", { engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, - refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + refreshIntervalMs, dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, }); } else { @@ -274,7 +276,7 @@ class ManagedSupervisor { this.backpressureMonitor = new BackpressureMonitor({ enabled: true, source, - refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + 
refreshIntervalMs, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, From 31799181e278e3651ac81d1d1d770e0502f73cb9 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 11:46:53 +0100 Subject: [PATCH 08/11] feat(supervisor): evaluate backpressure sources independently and OR them --- apps/supervisor/src/env.test.ts | 8 +- apps/supervisor/src/env.ts | 12 +-- apps/supervisor/src/index.ts | 143 +++++++++++++++++--------------- 3 files changed, 80 insertions(+), 83 deletions(-) diff --git a/apps/supervisor/src/env.test.ts b/apps/supervisor/src/env.test.ts index 67435364fdd..fa92a34cd54 100644 --- a/apps/supervisor/src/env.test.ts +++ b/apps/supervisor/src/env.test.ts @@ -22,22 +22,20 @@ const base = { }; describe("Env superRefine - backpressure source awareness", () => { - it("accepts k8s-pod-count source without a Redis host", () => { + it("pod-count source can be enabled without a Redis host", () => { expect(() => Env.parse({ ...base, - TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", - TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "k8s-pod-count", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", }) ).not.toThrow(); }); - it("rejects redis source when Redis host is absent", () => { + it("redis source requires a Redis host", () => { expect(() => Env.parse({ ...base, TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", - TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: "redis", }) ).toThrow(); }); diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index b7db7f18280..3a6cf691519 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -73,10 +73,8 @@ export const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false), - // 
Backpressure signal source. "redis" reads a verdict from a Redis key; - // "k8s-pod-count" scrapes the cluster apiserver's total pod-object count and - // engages above ENGAGE, releasing below RELEASE (hysteresis). - TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE: z.enum(["redis", "k8s-pod-count"]).default("redis"), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: BoolEnv.default(false), + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN: BoolEnv.default(true), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000), @@ -333,11 +331,7 @@ export const Env = z path: ["TRIGGER_WORKLOAD_API_DOMAIN"], }); } - if ( - data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && - data.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "redis" && - !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST - ) { + if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) { ctx.addIssue({ code: z.ZodIssueCode.custom, message: diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 400cd26e3bd..19109140be9 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -33,10 +33,7 @@ import { } from "./services/warmStartVerificationService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; import { Redis } from "ioredis"; -import { - BackpressureMonitor, - type BackpressureSignalSource, -} from "./backpressure/backpressureMonitor.js"; +import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; import { K8sPodCountSignalSource } from "./backpressure/k8sPodCountSignalSource.js"; @@ -76,7 +73,7 @@ class 
ManagedSupervisor { private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; private readonly tracing?: OtlpTraceService; - private readonly backpressureMonitor?: BackpressureMonitor; + private readonly backpressureMonitors: BackpressureMonitor[] = []; private readonly backpressureRedis?: Redis; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); @@ -217,71 +214,79 @@ class ManagedSupervisor { ); } + // Redis-verdict source (external aggregator). Keeps existing metric names. if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { - let source: BackpressureSignalSource; - let refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS; - - if (env.TRIGGER_DEQUEUE_BACKPRESSURE_SOURCE === "k8s-pod-count") { - refreshIntervalMs = env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS; - if ( - env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= - env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE - ) { - throw new Error( - "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE" - ); - } - const podCountGauge = new Gauge({ - name: "supervisor_cluster_pod_count", - help: "Total pod objects stored in the cluster, scraped for backpressure", - registers: [register], - }); - source = new K8sPodCountSignalSource({ - fetchMetrics: createApiserverMetricsFetcher(), - engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, - releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, - reportPodCount: (count) => podCountGauge.set(count), - }); - this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", { - engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, - release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, - refreshIntervalMs, - dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, - }); - } else { - this.backpressureRedis = new Redis({ - host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, - port: 
env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, - username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, - password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, - ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), - maxRetriesPerRequest: null, - }); - this.backpressureRedis.on("error", (error) => - this.logger.error("Backpressure redis error", { error: error.message }) - ); - source = new RedisBackpressureSignalSource( - this.backpressureRedis, - env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY - ); - this.logger.log("🛑 Dequeue backpressure enabled (redis source)", { - key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, + this.backpressureRedis = new Redis({ + host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, + port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, + username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, + password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, + ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), + maxRetriesPerRequest: null, + }); + this.backpressureRedis.on("error", (error) => + this.logger.error("Backpressure redis error", { error: error.message }) + ); + this.backpressureMonitors.push( + new BackpressureMonitor({ + enabled: true, + source: new RedisBackpressureSignalSource( + this.backpressureRedis, + env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY + ), refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, - }); - } - - this.backpressureMonitor = new BackpressureMonitor({ - enabled: true, - source, - refreshIntervalMs, - maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, - rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + logger: this.logger, + metrics: new BackpressureMetrics({ register }), + }) + ); + this.logger.log("🛑 Dequeue backpressure enabled (redis source)", { + key: 
env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, - logger: this.logger, - metrics: new BackpressureMetrics({ register }), + }); + } + + // Pod-count source (in-process apiserver scrape). Namespaced metrics so the + // redis source's metric names are preserved. + if (env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED) { + if ( + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE + ) { + throw new Error( + "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE" + ); + } + const podCountGauge = new Gauge({ + name: "supervisor_cluster_pod_count", + help: "Total pod objects stored in the cluster, scraped for backpressure", + registers: [register], + }); + this.backpressureMonitors.push( + new BackpressureMonitor({ + enabled: true, + source: new K8sPodCountSignalSource({ + fetchMetrics: createApiserverMetricsFetcher(), + engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + reportPodCount: (count) => podCountGauge.set(count), + }), + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN, + logger: this.logger, + metrics: new BackpressureMetrics({ register, prefix: "supervisor_backpressure_pod_count" }), + }) + ); + this.logger.log("🛑 Dequeue backpressure enabled (pod-count source)", { + engage: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, + release: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN, }); } @@ -308,14 
+313,14 @@ class ManagedSupervisor { dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR, // Freeze scale-up while backpressure is hard-engaged (not during the resume // ramp). Undefined when backpressure is disabled → no effect on scaling. - shouldPauseScaling: () => this.backpressureMonitor?.isEngaged() ?? false, + shouldPauseScaling: () => this.backpressureMonitors.some((m) => m.isEngaged()), }, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS, preDequeue: async () => { - // Synchronous, hot-path-safe cached read; undefined when backpressure is disabled. - const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false; + // Synchronous, hot-path-safe cached read; false when no monitors are active. + const skipForBackpressure = this.backpressureMonitors.some((m) => m.shouldSkipDequeue()); if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) { // Resource monitor is not used in k8s; backpressure is the only gate there. 
@@ -710,7 +715,7 @@ class ManagedSupervisor { this.logger.log("Starting up"); // Optional services - this.backpressureMonitor?.start(); + this.backpressureMonitors.forEach((m) => m.start()); await this.podCleaner?.start(); await this.failedPodHandler?.start(); await this.metricsServer?.start(); @@ -738,7 +743,7 @@ class ManagedSupervisor { await this.workerSession.stop(); // Optional services - this.backpressureMonitor?.stop(); + this.backpressureMonitors.forEach((m) => m.stop()); await this.backpressureRedis?.quit(); await this.podCleaner?.stop(); await this.failedPodHandler?.stop(); From a3e578be27735fba88d1958db0a4a5706bc0ecac Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 11:49:05 +0100 Subject: [PATCH 09/11] test(supervisor): cover both backpressure sources enabled together --- apps/supervisor/src/env.test.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/supervisor/src/env.test.ts b/apps/supervisor/src/env.test.ts index fa92a34cd54..10d1aaf4cb2 100644 --- a/apps/supervisor/src/env.test.ts +++ b/apps/supervisor/src/env.test.ts @@ -39,4 +39,15 @@ describe("Env superRefine - backpressure source awareness", () => { }) ).toThrow(); }); + + it("both sources can be enabled together (with a Redis host)", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: "localhost", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", + }) + ).not.toThrow(); + }); }); From 0160156b8a7c48ec99512f6b404febdf80d6c45a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 12:04:40 +0100 Subject: [PATCH 10/11] fix(supervisor): scrape apiserver /metrics over https so TLS verifies against the cluster CA --- apps/supervisor/src/clients/kubernetes.ts | 41 +++++++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git 
a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index 45d5a09b005..5951d89b93f 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -1,3 +1,4 @@ +import * as https from "node:https"; import * as k8s from "@kubernetes/client-node"; import { Informer } from "@kubernetes/client-node"; import { ListPromise } from "@kubernetes/client-node"; @@ -67,12 +68,38 @@ export function createApiserverMetricsFetcher(): () => Promise<string> { if (!cluster) { throw new Error("no current cluster in kubeconfig"); } - const requestInit = await kubeConfig.applyToFetchOptions({ method: "GET" }); - // node-fetch vs DOM RequestInit: structurally compatible, declaration-only mismatch - const response = await fetch(`${cluster.server}/metrics`, requestInit as unknown as RequestInit); - if (!response.ok) { - throw new Error(`apiserver /metrics scrape failed: ${response.status}`); - } - return response.text(); + const url = new URL(`${cluster.server}/metrics`); + const opts: https.RequestOptions = { + method: "GET", + protocol: url.protocol, + hostname: url.hostname, + port: url.port, + path: url.pathname, + }; + // applyToHTTPSOptions sets the cluster CA, client cert/key, and auth headers + // (incl. exec plugins) on the request - so TLS verifies against the cluster + // CA, not the system store. The fetch-options path attaches the CA as an + // https.Agent, which global fetch (undici) ignores. + await kubeConfig.applyToHTTPSOptions(opts); + + return new Promise((resolve, reject) => { + const req = https.request(opts, (res) => { + const status = res.statusCode ?? 
0; + let body = ""; + res.setEncoding("utf8"); + res.on("data", (chunk) => { + body += chunk; + }); + res.on("end", () => { + if (status >= 200 && status < 300) { + resolve(body); + } else { + reject(new Error(`apiserver /metrics scrape failed: ${status}`)); + } + }); + }); + req.on("error", reject); + req.end(); + }); }; } From 33929da3034780a79a7074f862787210e2d544d3 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 24 Jun 2026 15:20:51 +0100 Subject: [PATCH 11/11] fix(supervisor): scrape timeout + schema-level pod-count hysteresis guard --- apps/supervisor/src/clients/kubernetes.ts | 7 ++++++- apps/supervisor/src/env.test.ts | 11 +++++++++++ apps/supervisor/src/env.ts | 19 +++++++++++++++++++ apps/supervisor/src/index.ts | 13 ++++--------- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index 5951d89b93f..129ff32b6ec 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -60,7 +60,7 @@ export { k8s }; * One lightweight aggregate read - not a pod listing. Requires the service * account to be granted GET on the /metrics non-resource URL. */ -export function createApiserverMetricsFetcher(): () => Promise<string> { +export function createApiserverMetricsFetcher(timeoutMs: number): () => Promise<string> { const kubeConfig = getKubeConfig(); return async () => { @@ -98,6 +98,11 @@ export function createApiserverMetricsFetcher(): () => Promise<string> { } }); }); + // Without this a hung connect/TLS/read never settles, and the monitor's + // refreshInFlight guard would freeze the source (silent fail-open). 
+ req.setTimeout(timeoutMs, () => { + req.destroy(new Error(`apiserver /metrics scrape timed out after ${timeoutMs}ms`)); + }); req.on("error", reject); req.end(); }); diff --git a/apps/supervisor/src/env.test.ts b/apps/supervisor/src/env.test.ts index 10d1aaf4cb2..b02d117513c 100644 --- a/apps/supervisor/src/env.test.ts +++ b/apps/supervisor/src/env.test.ts @@ -50,4 +50,15 @@ describe("Env superRefine - backpressure source awareness", () => { }) ).not.toThrow(); }); + + it("rejects pod-count release >= engage when the source is enabled", () => { + expect(() => + Env.parse({ + ...base, + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: "100", + TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: "100", + }) + ).toThrow(); + }); }); diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 3a6cf691519..10e8f9b62b3 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -78,6 +78,13 @@ export const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000), TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000), + // Hard timeout on the apiserver /metrics scrape. A hung request would otherwise + // never settle and freeze the monitor's refresh loop (fail-open silently). 
+ TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_SCRAPE_TIMEOUT_MS: z.coerce + .number() + .int() + .positive() + .default(10_000), // Optional services TRIGGER_WARM_START_URL: z.string().optional(), @@ -317,6 +324,18 @@ export const Env = z TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false), }) .superRefine((data, ctx) => { + if ( + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED && + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= + data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE + ) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE", + path: ["TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE"], + }); + } if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { ctx.addIssue({ code: z.ZodIssueCode.custom, diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 19109140be9..d2bc82c13b9 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -252,14 +252,7 @@ class ManagedSupervisor { // Pod-count source (in-process apiserver scrape). Namespaced metrics so the // redis source's metric names are preserved. if (env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED) { - if ( - env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >= - env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE - ) { - throw new Error( - "TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE" - ); - } + // RELEASE < ENGAGE is enforced in env.ts (superRefine), so it's valid here. 
const podCountGauge = new Gauge({ name: "supervisor_cluster_pod_count", help: "Total pod objects stored in the cluster, scraped for backpressure", @@ -269,7 +262,9 @@ class ManagedSupervisor { new BackpressureMonitor({ enabled: true, source: new K8sPodCountSignalSource({ - fetchMetrics: createApiserverMetricsFetcher(), + fetchMetrics: createApiserverMetricsFetcher( + env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_SCRAPE_TIMEOUT_MS + ), engageThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE, releaseThreshold: env.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE, reportPodCount: (count) => podCountGauge.set(count),