feat: dynamic tasksets (lazy/infinite load_tasks, pull-based env server)#1829
feat: dynamic tasksets (lazy/infinite load_tasks, pull-based env server)#1829mikasenghaas wants to merge 29 commits into
Conversation
Overload Taskset.load_tasks from `-> list[TaskT]` to `-> Iterable[TaskT]` so a taskset can yield tasks from a generator (lazily-built or unbounded), not just return a fixed list. Add `select_tasks`, which draws only the tasks a run needs: without `--shuffle` it islices the first `--num-tasks` straight off the generator, so an eval on a subset never materializes the rest. The in-process eval runner and the validate entrypoint now select through it; the index-addressed env-server still materializes (`list()`) to fix its index range, so it needs a finite taskset. Convert the two built-in generative tasksets: - textarena_v1: yield seeded episodes (deepcopy + reset per seed-specific game) - harbor_v1: parse each task.toml/instruction.md lazily textarena WordLadder (-n 5 of num_tasks=1000): load_tasks 92s -> 1.1s (84.7x), the selected tasks byte-identical to the eager path; a live gpt-4.1-mini Wordle eval builds only the 2 episodes it runs and scores them normally. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A truly non-terminating `load_tasks` would hang the moment something materializes it: `select_tasks` does `list()` to shuffle, and the index-addressed env-server `list()`s unconditionally to fix its index range. Worse, `--num-tasks 5 --shuffle` hangs too — the `list()` runs before the slice. Add a `Taskset.UNBOUNDED` class var (mirrors `NEEDS_CONTAINER`). When set, `select_tasks` refuses `--shuffle` (can't sample without reading the whole stream; moot anyway — the first `-n` seeds are already an arbitrary sample) and requires `--num-tasks` (else there's nothing to bound), and the env-server refuses to serve it — all up front with a clear error instead of hanging. Finite tasksets are unaffected; the two built-ins stay finite (textarena caps at num_tasks, harbor reads a fixed dataset). Documented in GUIDE.md (capability flags + loading tasks). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
wikispeedia's (source, target) pair space is effectively infinite, so model it as one: sample_pairs is now an unbounded generator and the taskset sets UNBOUNDED. Drop the config's num_tasks (the eval's -n bounds the draw) and document `seed` as the knob that reshuffles which pairs a run gets - the reproducible stand-in for --shuffle, which UNBOUNDED refuses. Also trim two over-long doc/comment lines (the UNBOUNDED docstring and harbor's load_tasks comment). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Decouple the env-server from a fixed dataset length so it can serve unbounded tasksets. It no longer materializes `load_tasks` up front: a list (free length) is still materialized and its count reported, but a generator is consumed lazily by index (`_task` extends a cache on demand) and reports no count. `InfoResponse.num_tasks` is now `int | None` (None = lazy/unbounded — the caller drives `task_idx`). run_eval_server adapts: with a reported count it behaves as before; for a lazy taskset it requires -n/--num-tasks and draws the first N indices. The v0 bridge (always a finite dataset) reports its length unchanged. This removes the earlier UNBOUNDED-refusal in the server — an unbounded taskset is now served by index rather than rejected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Move the env-server's index-addressed task resolution off the server into an `IndexedTasks` helper in taskset.py, next to `select_tasks` (its sibling "consume load_tasks" helper): a list is materialized + counted, a generator is consumed lazily and cached by index with count=None. The server just holds one and indexes it, so its rollout/serving paths read plainly. Also fix GUIDE.md: the env-server now *serves* unbounded tasksets (lazily by index, no count) rather than refusing them, and drop the "shuffle is moot" aside. Use wikispeedia_v1 as the UNBOUNDED worked example (textarena_v1 is a lazy *finite* generator). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts: # environments/wikispeedia_v1/wikispeedia_v1/taskset.py # verifiers/v1/tasksets/harbor/taskset.py
An UNBOUNDED taskset still can't be shuffled (it would have to materialize a non-terminating load_tasks), but ignore --shuffle with a warning rather than hard-failing the run. A missing --num-tasks cap still raises (it would hang). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…askset - IndexedTasks materializes any finite taskset (a list OR a finite generator) and reports a real count; only an UNBOUNDED taskset is served lazily by index with num_tasks=None. So a lazy-but-finite generator is no longer mistaken for unbounded (it now gets enumerated/shuffled/epoched correctly), and the "pass -n" error and the lazy-shuffle warning accurately say "unbounded". - IndexedTasks asserts the taskset yields >=1 task (clear error otherwise); the env-server's shared-toolset sample can then drop its empty-taskset fallback. - Consolidate the reproducible shuffle seed into taskset.SHUFFLE_SEED (select_tasks default + the env-server index shuffle both use it; drop runner's _SHUFFLE_SEED). - GUIDE: capability-flag tables (taskset + harness use the same Var/Default/Description shape); document the two UNBOUNDED rules (-n required for eval/validate; --shuffle ignored with a warning, vary the stream via a config seed instead). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The env server no longer addresses tasks by index. `run_rollout()`/`run_group(n)` carry no task_idx; the server holds a cursor and hands out the next task. Task ordering — shuffle + epoch looping — moves out of the caller and into the server: - finite taskset: served as a (re)shuffled permutation that loops over epochs (reshuffled each epoch; seed fixed, not configurable, like eval). A finite eval bounds itself to num_tasks so it never wraps; training pulls forever. - unbounded taskset: pulled lazily from its generator (which owns its order; shuffle is a no-op). `EnvConfig.shuffle` (default True) controls it; `EvalConfig` keeps shuffle=False (deterministic, lazy eval). The served task is identified by `trace.task.idx`. The eval --server path and the v0 legacy bridge both pull via the shared cursor. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…t < num_rollouts - EnvConfig.shuffle defaults to False (deterministic, matching in-process eval), so a direct server start no longer silently shuffles a finite taskset. prime-rl's train envs opt in (shuffle=True); eval stays deterministic. - run_eval_server: a group's num_rollouts run together in one indivisible run_group request, so max_concurrent can't bound below one group — warn instead of silently exceeding the cap when max_concurrent < num_rollouts. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add sample() -> Task (server pulls via its cursor; shuffle/epoch stay server-owned) and restore run_rollout(task) running a given task. A caller that wants concurrency control at the rollout level pulls one task per group with sample(), then issues group_size individual run_rollout(task) requests — so each rollout is one permit, freed on completion (no straggler holding a whole group's permits). run_group(n) stays task-less for group-scored envs (they must run+score the group together). The v0 bridge carries only the idx in the sampled task (it holds the dataset, re-fetches by idx). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
sample() is now the only place the cursor advances — both run_rollout(task) and run_group(task, n) take the sampled task (typed WireTask, not a dict). The eval --server unit samples then run_groups; the v0 bridge reads task.idx. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…quired) The v0 bridge's sample() built WireTask(idx=...) without the required prompt field, so sample() failed validation and the orchestrator got no task (0 inflight). Set prompt=None. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…zes finite Pull is sequential (sample() advances; tasks are echoed back by value), so the server never re-reads a past index — IndexedTasks's cache-by-arbitrary-index was vestigial (and its unbounded cache grew forever). The server now holds a materialized list for a finite taskset (shuffled-permutation random access + count) and streams next() for an unbounded one (a one-task peek for non-empty + serving + task type). Removes the cache leak + the eviction TODO. _next_index is finite-only now (native-finite + the v0 bridge).
…duplicate tasks) Each pool worker is its own EnvServer with its own task cursor (seeded identically), but the broker load-balanced every request — so with >1 worker, sample() hit different workers' cursors, each producing the same shuffled permutation: duplicate tasks served + tasks dropped (eval --server with a static pool always; elastic after scale-up; multi-worker training under load). Pin sample() to worker 0 (the cursor owner; the pool is upscale-only so it's stable; sample counts 0 rollout-slots). run_rollout/run_group carry the task, so they stay load-balanced. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Single peek/non-empty check for both finite and infinite tasksets; branch only on stream-vs-materialize. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replaces three field_serializers that called model_dump() with the idiomatic SerializeAsAny annotation (same as EnvConfig.taskset); a concrete taskset Task dumps its own fields instead of the base WireTask schema. Behavior-identical. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- TasksetConfig.shuffle: ShuffleConfig | None (None = serve in load order). Moved off EnvConfig; the eval/validate -s/--shuffle flag is removed (use --taskset.shuffle). - Taskset.seed exposed (framework-set, offset per env-server) so an INFINITE taskset's load_tasks can seed its generation and a multi-worker pool draws divergent streams. - Each pool worker offsets the seed by its index; the broker load-balances sample when shuffling (divergent streams) and pins it to worker 0 only when not (single cursor). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror the native server's empty-taskset check; an empty v0 dataset previously crashed with ZeroDivisionError (cursor % 0) on the first sample. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Drop the SHUFFLE_SEED module constant (ShuffleConfig.seed defaults to 0 inline). - Taskset no longer sets a .seed attribute; INFINITE generators read self.config.shuffle.seed (may be None), which the env-server offsets per worker via config. - select_tasks moves from a free function to Taskset.select_tasks. - env-server payloads type traces as WireTrace; drop verbose request-field docstrings. - rename the per-worker seed offset param seed_offset -> idx. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…0 pin The pool broker stamps a global cursor index onto each sample request; finite workers share the shuffle seed so their permutations agree, so any worker can serve any index — sample now load-balances like a rollout (no worker-0 pin). A lone server uses its own cursor. INFINITE tasksets still offset the seed per worker (forward streams the broker can't index). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| self._order_epoch = epoch | ||
| return self._order[pos] | ||
|
|
||
| def _next_task(self, index: int | None): |
There was a problem hiding this comment.
🟠 High serve/server.py:190
When self._count is None (infinite taskset), _next_task ignores the broker-supplied index and instead returns next(self._iter) from the worker's private generator. In a multi-worker pool, the broker stamps each sample request with a global cursor so any worker can serve the same logical position, but for infinite tasksets the returned task depends on how many prior requests happened to land on that worker — not on the cursor. Two sample requests with the same index served by different workers return different tasks, and the global ordering the cursor is meant to enforce is lost. If streaming without indexing is intentional for infinite tasksets, consider documenting this limitation at the _next_task level so the broker/pool layer can avoid stamping a meaningless index, or adjust the pool to route infinite tasksets without a global cursor.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @verifiers/v1/serve/server.py around line 190:
When `self._count is None` (infinite taskset), `_next_task` ignores the broker-supplied `index` and instead returns `next(self._iter)` from the worker's private generator. In a multi-worker pool, the broker stamps each `sample` request with a global cursor so any worker can serve the same logical position, but for infinite tasksets the returned task depends on how many prior requests happened to land on that worker — not on the cursor. Two `sample` requests with the same `index` served by different workers return different tasks, and the global ordering the cursor is meant to enforce is lost. If streaming without indexing is intentional for infinite tasksets, consider documenting this limitation at the `_next_task` level so the broker/pool layer can avoid stamping a meaningless index, or adjust the pool to route infinite tasksets without a global cursor.
| random.Random(f"{self._seed}-{epoch}").shuffle(self._order) | ||
| self._order_epoch = epoch |
There was a problem hiding this comment.
🟡 Medium serve/server.py:186
_index_at seeds the first epoch's shuffle with random.Random(f"{self._seed}-0") instead of random.Random(self._seed), so a finite taskset's first-epoch order in server mode differs from the in-process path (e.g. Taskset.select_tasks), which seeds with random.Random(seed). The same shuffle.seed therefore produces different task orderings depending on execution mode, breaking reproducibility across eval paths. Consider seeding epoch 0 with self._seed directly and only appending the epoch for subsequent epochs.
- if self._shuffle:
- random.Random(f"{self._seed}-{epoch}").shuffle(self._order)
+ if self._shuffle:
+ seed = self._seed if epoch == 0 else f"{self._seed}-{epoch}"
+ random.Random(seed).shuffle(self._order)🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @verifiers/v1/serve/server.py around lines 186-187:
`_index_at` seeds the first epoch's shuffle with `random.Random(f"{self._seed}-0")` instead of `random.Random(self._seed)`, so a finite taskset's first-epoch order in server mode differs from the in-process path (e.g. `Taskset.select_tasks`), which seeds with `random.Random(seed)`. The same `shuffle.seed` therefore produces different task orderings depending on execution mode, breaking reproducibility across eval paths. Consider seeding epoch 0 with `self._seed` directly and only appending the epoch for subsequent epochs.
| class RunRolloutRequest(BaseRequest): | ||
| method: ClassVar[str] = "run_rollout" | ||
| task_idx: int | ||
| task: WireTask |
There was a problem hiding this comment.
🟡 Medium serve/types.py:65
RunRolloutRequest.task accepts a full WireTask from the caller, and the server validates and executes it directly (self._task_type.model_validate(req.task.model_dump()) → env.episode(task, ...)). This lets a caller modify prompt, task-specific answer fields, or image/workdir before calling run_rollout, bypassing the server-owned taskset and producing tampered evaluation results. The request should carry only an index so the server resolves the canonical task from its own taskset, not the caller-supplied object.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @verifiers/v1/serve/types.py around line 65:
`RunRolloutRequest.task` accepts a full `WireTask` from the caller, and the server validates and executes it directly (`self._task_type.model_validate(req.task.model_dump())` → `env.episode(task, ...)`). This lets a caller modify `prompt`, task-specific answer fields, or `image`/`workdir` before calling `run_rollout`, bypassing the server-owned taskset and producing tampered evaluation results. The request should carry only an index so the server resolves the canonical task from its own taskset, not the caller-supplied object.
Summary
Overload
Taskset.load_tasksfrom-> list[TaskT]to-> Iterable[TaskT]so a taskset canyieldits tasks from a generator — lazily built, or genuinely infinite — instead of only returning a fixed list. Alistis still a validIterable, so existing tasksets are unchanged.Add
verifiers.v1.taskset.select_tasks(taskset, num_tasks, shuffle), which materializes only the tasks a run needs. Without--shuffleititertools.islices the first--num-tasksstraight offload_tasks, so an infinite generator stops after the subset and the rest is never built. (--shufflehas to see the whole set to sample from it, so it materializes first.) The shuffle seed is a fixed module constant (_SHUFFLE_SEED = 0), not a config field — matching the eval path, which never exposed a seed knob either.Add a
Taskset.INFINITEcapability flag (aClassVar[bool], mirrorsNEEDS_CONTAINER) for aload_tasksthat may never terminate. A barelist()/--shuffleon such a taskset would hang — and--num-tasks N --shufflehangs too, since thelist()runs before the slice. With the flag set,select_tasksrequires--num-tasks(raises if missing — it would hang otherwise) and ignores--shufflewith a warning rather than hard-failing the run. (--shuffleis moot anyway for a generated taskset: the first-nitems are already an arbitrary sample.)Make the env server pull-based — it owns task scheduling. Three RPCs replace the old index-addressed
run_rollout(task_idx)/run_group(task_idx, n):sample() -> Taskadvances the server's cursor and returns the next task (by value, as aWireTask).run_rollout(task) -> Traceruns one rollout of a sampled task.run_group(task, n) -> list[Trace]runsnrollouts of one sampled task (for group-scored envs).The caller samples a task, then runs it — it never chooses which task by index. Shuffle + epoch live on the server (
EnvConfig.shuffle, defaultFalse— deterministic, like in-process eval; prime-rl's train envs override toTrue; seed fixed, not configurable): a finite taskset is materialized once and served as a reshuffled permutation that loops over epochs (_SHUFFLE_SEED + epoch); anINFINITEone is pulled lazily from its generator with a one-task look-ahead (shuffle a no-op, warned at startup).InfoResponse.num_tasksis the count (None⟺INFINITE) and only bounds a finite eval. The eval--serverpath and the v0 legacy bridge both pull via these RPCs.The pool broker owns a global task cursor.
EnvServerPoolstamps a monotonicindexonto eachsamplerequest; finite workers share the shuffle seed, so their permutations agree and any worker can serve any index. Sosampleload-balances like a rollout — no worker pinning — while coverage stays exact (each task once per epoch across the pool). A lone server (max_workers <= 1, no broker) falls back to its own cursor.run_rollout/run_groupcarry the task and route least-busy as before.Shuffle policy moves to
TasksetConfig.shuffle: ShuffleConfig | None(offEnvConfig).None(default) serves in load order (deterministic — what eval wants); aShuffleConfig(aseed) shuffles a finite taskset (reshuffled each epoch) and seeds anINFINITEtaskset's generation. AnINFINITEload_tasksreads the seed viaself.config.shuffle.seed(may beNone); since infinite streams can't be index-addressed by the broker, the env-server offsets that seed by the worker index so a multi-worker pool draws divergent streams. Theeval/validate-s/--shuffleflag is removed; configure via--taskset.shuffle.select_tasksis now aTasksetmethod.Wire it into the eval + validate entrypoints. The in-process eval runner (
run_eval, the defaultvf evalpath) and thevalidateentrypoint select throughselect_tasks(lazyislice), replacing the old "load all → shuffle →[:num_tasks]" slice; the--serverpath pulls task-by-task, bounded by the reported count /-n.Convert the two built-in generative tasksets to the generator form:
textarena— yield one seeded episode per index; for seed-specific games (WordLadder, WordSearch) each episode is adeepcopy(template) + reset(), now built only for the episodes a run touches.harbor— keep the dataset download + dir listing up front (needed to enumerate idxs), but parse eachtask.toml/instruction.mdlazily.Dataset-backed tasksets (gsm8k, reverse_text, alphabet_sort, …) are intentionally left as lists: their dominant setup cost is the eager
load_dataset, so laziness wouldn't help.Update the v1
GUIDE.md"Loading tasks" section with the lazy/infinite pattern, the twoINFINITErules (needs--num-tasks; can't shuffle), and capability-flag tables.Verification
Setup time —
load_tasksfortextarenaatnum_tasks=1000, selecting a subset of 5 (eager "build all then slice" vs. lazyselect_tasks), nltk corpora pre-warmed:-n 5)WordLadder-v0WordSearch-v0Wordle-v0The lazily-drawn subset is byte-identical (
model_dump()) to the first 5 of the full eager build in every case; seed-invariant games (Wordle) correctly show no change, since the prompt is built once regardless.Still runs correctly — a live
gpt-4.1-miniWordle eval through the realrun_eval(num_tasks=1000,-n 2 -r 1) built exactly 2 tasks from the generator (not 1000) and scored both rollouts normally:An
INFINITEtaskset overitertools.count()confirms the guards:select_tasks(num_tasks=5)returns 5 (no hang);--shuffleis ignored with a warning (still returns the first 5); a missing--num-tasksraises a clearValueError; finite tasksets (all / cap / shuffle) are unaffected.Pull protocol — a live
--servereval (i3-logic-v1,deepseek-v4-flash,-n 4) drove rollouts entirely through the pull RPCs: the server reported its count,sample()handed out tasks0,1,2,3in order (evalshuffle=False, deterministic), and eachTracecarried the served idx — 0 errors. A scheduler unit test covers finite shuffle + per-epoch reshuffle (reproducible), finite in-order looping, eval-subset uniqueness (pull ≤ count never wraps), and infinite monotonic pulls.Multi-worker cursor (no pin) — a 2-worker static pool, no shuffle, covered tasks
0–5exactly once (broker-cursor coverage without pinning); the same pool driving a reverse_text training run (4 steps,group_size=16) produced 8 distinct tasks × 16 rollouts per step, shuffled, with no duplication across steps and 0 errors (one coherent global cursor across workers).ruff check/ruff formatclean;tests/v1/test_configs.pyandtests/v1/test_trace.pypass.Breaking
Taskset.load_tasksreturn annotation is nowIterable[TaskT](waslist[TaskT]). Returning aliststill satisfies it, so existing tasksets need no change; only code that consumedload_tasks()and relied on it being a concretelist(indexing,len) must wrap it — the two in-tree consumers (select_tasks, the env-server) already do.InfoResponse.num_tasksis nowint | None(wasint);None⟺ the taskset isINFINITE. Consumers that assumed anintmust handleNone—run_eval_serverdoes; prime-rl's orchestrator gets the matching change in its companion PR.RunRolloutRequest/RunGroupRequest(andEnvClient.run_rollout(task_idx)/run_group(task_idx, n)) are replaced bysample() -> Taskplusrun_rollout(task)/run_group(task, n)(task carried by value). A caller that addressed tasks by index must insteadsample()then run; the served task's idx is on the returnedTrace. (Breaks the index-addressed orchestrator — coordinated with the prime-rl companion PR.)TasksetConfig.shuffle: ShuffleConfig | None(defaultNone= deterministic), notEnvConfig.shuffle: bool. Theeval/validate-s/--shuffleflag is removed — use--taskset.shuffle(e.g.--taskset.shuffle.seed 0). Consumers that want shuffled training set aShuffleConfig. Theselect_tasksfree function is now theTaskset.select_tasksmethod, and theSHUFFLE_SEEDconstant /Taskset.seedattribute are gone (read the seed offconfig.shuffle).