Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
84742cb
feat(configurator): env_params as first-class trial-identity field
rutayan-nv Jun 20, 2026
7d68edf
refactor(configurator): env_params as cmd_args annotation, excluded f…
rutayan-nv Jun 23, 2026
bf2b690
fix(configurator): tighten env_params validation, align env.csv, fix …
rutayan-nv Jun 24, 2026
c341433
fix(workload): reject empty env_params candidate lists at build time
rutayan-nv Jun 24, 2026
7ac83fd
refactor(configurator): model env_params as cohesive EnvParam/EnvPara…
rutayan-nv Jun 24, 2026
7fdae40
refactor(configurator): collapse env_params sink into one concrete class
rutayan-nv Jun 25, 2026
f4c93e3
refactor(configurator): gate env_params sampling on an agent capabili…
rutayan-nv Jun 25, 2026
a7c6587
style(configurator): trim verbose comments toward self-documenting code
rutayan-nv Jun 25, 2026
822e8a3
fix(test_scenario): skip env_params on post-overlay revalidation
rutayan-nv Jun 25, 2026
1ecadee
fix(workload): reject scalar-only env_params annotations at parse time
rutayan-nv Jun 26, 2026
6dbe05e
refactor(env_params): thread per-trial sample as a local; split DR pr…
rutayan-nv Jun 29, 2026
ee55ba4
refactor(test_scenario): hoist Registry import to module level
rutayan-nv Jun 29, 2026
d02769e
fix(workload): reject singleton env_params candidate lists at parse time
rutayan-nv Jun 29, 2026
2437aa7
refactor(configurator): rename agent flag samples_env_params -> suppo…
rutayan-nv Jun 30, 2026
7c6033a
feat(configurator): add ObsLeafDescriptor + structured-observation pr…
rutayan-nv Jun 16, 2026
b6ca6ca
test(env-params): suppress pyright on intentional ObsLeafDescriptor r…
rutayan-nv Jun 16, 2026
03bd768
feat(configurator): add GymnasiumAdapter for CloudAI envs
rutayan-nv Jun 16, 2026
d30ba3b
fix(gymnasium-adapter): accept continuous actions in step() type sign…
rutayan-nv Jun 16, 2026
16696a7
fix(gymnasium-adapter): validate structured-observation key parity; p…
rutayan-nv Jun 16, 2026
1c74992
refactor(configurator): route gymnasium import through LazyImports si…
rutayan-nv Jun 16, 2026
07d1c8d
fix(configurator): make lazy-gymnasium refactor land CI-green
rutayan-nv Jun 16, 2026
610a7d5
feat(configurator): make GymnasiumAdapter inherit gymnasium.Env
rutayan-nv Jun 21, 2026
f34ed3b
test(gym): cast action_space to Dict in adapter contract test
rutayan-nv Jun 21, 2026
ef34aae
feat(configurator): add GymnasiumAdapter.encode_action as public inve…
rutayan-nv Jun 21, 2026
04d067d
style(configurator): ruff format gymnasium_adapter.py
rutayan-nv Jun 24, 2026
49c1bbf
refactor(gymnasium-adapter): defer continuous-action support with Con…
rutayan-nv Jun 25, 2026
39491c8
feat(configurator): add online GymServer mode to CloudAIGymEnv
rutayan-nv Jun 16, 2026
9d3012b
fix(cloudai-gym): validate env_class is a dotted import path before r…
rutayan-nv Jun 16, 2026
ba89fff
fix(configurator): make online trajectory step numbering monotonic
rutayan-nv Jun 21, 2026
b4cee9b
fix(configurator): align env.csv path with trajectory.csv in online mode
rutayan-nv Jun 25, 2026
ec0f182
feat(cli): route live_rl_mode jobs through the DSE handler
rutayan-nv Jun 16, 2026
f399f73
test(configurator): live-RL needs no action space; env_params alone i…
rutayan-nv Jun 16, 2026
627c284
fix(cli): guard live_rl_mode + single_sbatch (incompatible)
rutayan-nv Jun 20, 2026
0bf6e70
test(configurator): update live-RL env_params test to EnvParams model
rutayan-nv Jun 25, 2026
42895dc
docs(env_params): update validate_domain_randomization_active docstri…
rutayan-nv Jun 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ requires-python = ">=3.10"
"import-linter~=2.10",
"pytest-deadfixtures~=3.1",
"taplo~=0.9.3",
"gymnasium~=1.2",
]
rl = ["gymnasium~=1.2"]
docs = [
"sphinx~=8.1",
"nvidia-sphinx-theme~=0.0.8",
Expand Down
57 changes: 53 additions & 4 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import TYPE_CHECKING, Any, List, Optional, Set, Type, TypeAlias, Union

from ..util import flatten_dict
from .registry import Registry
from .system import System

if TYPE_CHECKING:
Expand Down Expand Up @@ -140,6 +141,43 @@ def get_metric_value(self, system: System, metric: str) -> MetricValue:
def is_dse_job(self) -> bool:
return self.test.is_dse_job or isinstance(self.num_nodes, list)

@property
def is_domain_randomization_active(self) -> bool:
"""
Whether this run will actually env-sample (domain-randomize) per trial.

True only when domain randomization is declared (``env_params`` present) and the run drives a
per-trial loop that samples them: a DSE job (so a per-trial loop exists - including a
``num_nodes`` sweep) on an agent that opts into sampling, or an online live-RL run
(``cmd_args.live_rl_mode``), which drives the agent's own loop and samples regardless of agent
kind. An unknown agent is treated as opted-in so the dedicated agent-resolution error surfaces
instead of this one.
"""
if not self.test.is_domain_randomization_enabled:
return False

if self.is_live_rl:
return True

agent = Registry().agents_map.get(self.test.agent)
return self.is_dse_job and (agent is None or agent.supports_variable_environment)

@property
def is_live_rl(self) -> bool:
"""True for online live-RL runs, which opt in via ``cmd_args.live_rl_mode``."""
return bool(getattr(self.test.cmd_args, "live_rl_mode", False))

@property
def is_agent_driven(self) -> bool:
"""
True for runs orchestrated by ``agent.run()`` rather than the grid-unrolled path.

A DSE sweep declares a search space (``is_dse_job``). An online live-RL run carries no sweep
(so ``is_dse_job`` is False) but still drives the agent's own ``run()`` loop; it opts in via
``cmd_args.live_rl_mode``.
"""
return self.is_dse_job or self.is_live_rl

@property
def nnodes(self) -> int:
"""Type safe getter for num_nodes, should only be used on an unrolled DSE job."""
Expand All @@ -156,7 +194,9 @@ def param_space(self) -> dict[str, Any]:
**{
key: value
for key, value in cmd_args_dict.items()
if isinstance(value, list) and not self.test.is_dse_excluded_arg(key)
if isinstance(value, list)
and not self.test.is_dse_excluded_arg(key)
and not self.test.is_env_sampled(key)
},
**{f"extra_env_vars.{key}": value for key, value in extra_env_vars_dict.items() if isinstance(value, list)},
}
Expand Down Expand Up @@ -184,9 +224,13 @@ def all_combinations(self) -> list[dict[str, Any]]:

return all_combinations

def apply_params_set(self, action: dict[str, Any]) -> "TestRun":
def apply_params_set(self, action: dict[str, Any], env_params: dict[str, Any] | None = None) -> "TestRun":
tdef = self.test.model_copy(deep=True)
for key, value in action.items():

# RNG runs in the env before this call; applying only concrete values keeps this deterministic.
# action and env_params target disjoint keys, so a plain merge applies both in one pass.
full_action = action | (env_params or {})
for key, value in full_action.items():
if key.startswith("extra_env_vars."):
tdef.extra_env_vars[key[len("extra_env_vars.") :]] = value
else:
Expand All @@ -199,7 +243,12 @@ def apply_params_set(self, action: dict[str, Any]) -> "TestRun":
else:
setattr(obj, attrs[-1], value)

type(tdef)(**tdef.model_dump()) # trigger validation
# env_params is validated at parse time; after the overlay its target cmd_args fields hold
# concrete scalar draws, so re-validating it here would reject weighted specs. Drop it for
# this validation-only pass, which exists to validate the applied action values.
validation_args = tdef.model_dump()
validation_args.pop("env_params", None)
type(tdef)(**validation_args) # trigger validation

new_tr = copy.deepcopy(self)
new_tr.test = tdef
Expand Down
39 changes: 31 additions & 8 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import toml
import yaml

from cloudai.configurator.env_params import validate_domain_randomization_active
from cloudai.core import (
BaseInstaller,
CloudAIGymEnv,
Expand All @@ -39,6 +40,7 @@
System,
TestParser,
TestScenario,
TestScenarioParsingError,
)
from cloudai.models.scenario import ReportConfig
from cloudai.models.workload import TestDefinition
Expand Down Expand Up @@ -133,8 +135,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
return 1

err = 0
# Recoverable failures return a non-zero rc and are accumulated here; an unexpected exception
# (a bug) is a hard-fail. We capture it so reports still generate, then re-raise below.
# Capture an unexpected error so reports still generate, then re-raise below.
run_error: Exception | None = None
try:
for tr in runner.runner.test_scenario.test_runs:
Expand Down Expand Up @@ -177,7 +178,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
)

if run_error is not None:
raise run_error
raise run_error.with_traceback(run_error.__traceback__)

logging.info("All jobs are complete.")
return err
Expand Down Expand Up @@ -303,6 +304,12 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int:
return 1
system, test_scenario, tests = setup_result

try:
validate_domain_randomization_active(test_scenario)
except TestScenarioParsingError as e:
logging.error(str(e))
return 1

if not _handle_single_sbatch(args, system):
return 1

Expand Down Expand Up @@ -335,15 +342,30 @@ def handle_dry_run_and_run(args: argparse.Namespace) -> int:
register_signal_handlers(runner.cancel_on_signal)
logging.info(f"Scenario results will be stored at: {runner.runner.scenario_root}")

has_dse = any(tr.is_dse_job for tr in test_scenario.test_runs)
if args.single_sbatch or not has_dse: # in this mode cases are unrolled using grid search
return _dispatch_agent_driven_run(args, runner, test_scenario)


def _dispatch_agent_driven_run(args: argparse.Namespace, runner: Runner, test_scenario: TestScenario) -> int:
"""Route a parsed scenario to the DSE, grid-unroll, or error path based on its test runs."""
agent_driven = [tr.is_agent_driven for tr in test_scenario.test_runs]

# Stopgap guard: single_sbatch grid-unrolls DSE param spaces inside one allocation, but
# SingleSbatchRunner has no live-RL path, so a live_rl_mode job would silently run once as a
# static job instead of driving the in-process GymServer loop. Reject the combination here.
# TODO(https://github.com/NVIDIA/cloudai/issues/937): replace with a proper single_sbatch vs
# agent-driven routing rework (mixed-job guard ordering + per-agent-type semantics).
if args.single_sbatch and any(tr.is_live_rl for tr in test_scenario.test_runs):
logging.error("Single sbatch is not supported for live-RL (live_rl_mode) jobs.")
return 1

if args.single_sbatch or not any(agent_driven): # in this mode cases are unrolled using grid search
handle_non_dse_job(runner, args)
return 0

if all(tr.is_dse_job for tr in test_scenario.test_runs):
if all(agent_driven):
return handle_dse_job(runner, args)

logging.error("Mixing DSE and non-DSE jobs is not allowed.")
logging.error("Mixing agent-driven (DSE / live-RL) and plain jobs is not allowed.")
return 1


Expand Down Expand Up @@ -491,7 +513,8 @@ def verify_test_scenarios(
tests = Parser.parse_tests(test_tomls, system)
hook_tests = Parser.parse_tests(hook_test_tomls, system)
hooks = Parser.parse_hooks(hook_tomls, system, {t.name: t for t in hook_tests})
Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
scenario = Parser.parse_test_scenario(scenario_file, system, {t.name: t for t in tests}, hooks)
validate_domain_randomization_active(scenario)
except Exception:
nfailed += 1

Expand Down
5 changes: 4 additions & 1 deletion src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

from .base_agent import BaseAgent
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .cloudai_gym import CloudAIGymEnv, GymServer, TrajectoryEntry
from .grid_search import GridSearchAgent
from .gymnasium_adapter import GymnasiumAdapter

__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"GymServer",
"GymnasiumAdapter",
"TrajectoryEntry",
]
17 changes: 11 additions & 6 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ class BaseAgent(ABC):
Provides a unified interface and parameter management for action spaces.
"""

# Opt-in: agents that operate over a variable environment - one that changes per trial, whether
# by env_params sampling (domain randomization) or a curriculum schedule - set this True. Default
# False so env_params declared for an agent that cannot handle a varying env are rejected rather
# than silently ignored.
supports_variable_environment: bool = False

def __init__(self, env: BaseGym, config: BaseAgentConfig):
"""
Initialize the agent with the environment.
Expand Down Expand Up @@ -94,9 +100,8 @@ def select_action(self, observation: list[float] | None = None) -> tuple[int, di

Args:
observation: Latest observation produced by the environment (``env.reset()`` on the
first call, then the result of the prior ``env.step()``). Stateless agents such
as grid search or Bayesian optimization may ignore this; observation-conditioned
agents (RL, contextual bandits) should use it.
first call, then the result of the prior ``env.step()``). Stateless agents may
ignore this; observation-conditioned agents should use it.

Returns:
Tuple[int, Dict[str, Any]] | None: The current step index and a dictionary mapping action keys
Expand All @@ -120,8 +125,7 @@ def run(self) -> int:

Default: a step loop driven by the dispatcher (``select_action`` →
``env.step`` → ``update_policy`` per trial). Agents that drive their
own training loop (e.g. RLlib-based agents calling ``algo.train()``)
override this method.
own training loop override this method.

Failure contract (``handle_dse_job`` consumes the result via
``err |= agent.run()``):
Expand All @@ -131,7 +135,8 @@ def run(self) -> int:
accumulated and the next ``TestRun`` still executes. Workload-level
failures are already surfaced this way: ``CloudAIGymEnv.step`` maps a
failed metric to ``rewards.metric_failure`` rather than raising, and
``rllib_run`` catches training errors and returns ``rc=1``.
agents with their own training loop should likewise catch training
errors and return a non-zero code.
- Raise for *unexpected* failures (framework/agent bugs). Exceptions
propagate out of ``handle_dse_job`` and hard-fail the job so the bug
is surfaced instead of masked as a penalizing reward.
Expand Down
Loading
Loading