From a9b21c31f27765e30d56da6fcdd5ca84648279ef Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Thu, 2 Jul 2026 19:40:48 -0400 Subject: [PATCH 1/3] refactor(configurator): name the next-trial index as upcoming_trial step() increments test_run.step before it samples/runs, so the next trial index is step + 1. Extract that offset into a read-only CloudAIGymEnv.upcoming_trial property and use it in step(), so the "+1" is defined in exactly one place. Behavior-preserving. --- src/cloudai/configurator/cloudai_gym.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index 37ae17021..f94be7dfb 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -71,6 +71,17 @@ def env_params_record_path(self) -> Path: """``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them.""" return self.iteration_dir / "env.csv" + @property + def upcoming_trial(self) -> int: + """ + Index of the next trial ``step`` will run. + + ``step`` increments ``test_run.step`` before it samples/runs, so at rest the counter + holds the last-run trial and the next one is ``+ 1``. ``step`` advances into it. The + ``+ 1`` offset is defined only here so callers cannot re-encode it inconsistently. + """ + return self.test_run.step + 1 + def define_action_space(self) -> Dict[str, list[Any]]: return self.test_run.param_space @@ -126,9 +137,10 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: - done (bool): Whether the episode is done. - info (dict): Additional info for debugging. """ + trial = self.upcoming_trial self.test_run.increment_step() # RNG lives in the env: sample here, then apply action + sample so the run and cache key see them. - sampled_env_params = self.params.sample(self.test_run.step) if self.params else {} + sampled_env_params = self.params.sample(trial) if self.params else {} self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params) cached_result = self.get_cached_trajectory_result(action, sampled_env_params) From 4e4d1dc04e084bad30e6834c4437a618137c8264 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Thu, 2 Jul 2026 19:41:31 -0400 Subject: [PATCH 2/3] feat(configurator): structured-obs producer for env_params CloudAIGymEnv keeps its flat (metrics) observation and reports the per-trial env_param regime on the Gym info dict under info["env_params"] (the key is present only when a regime was applied, so its presence alone signals a non-empty, valid regime). reset() peeks upcoming_trial to report the regime step() will apply next. Adds the producer hooks a consumer needs to build a structured observation: encode_env_params() and structured_observation_descriptors(), plus the Encoding/CategoricalEncoding stack and the StructuredObservation protocol. The GymnasiumAdapter (separate PR) composes the flat metrics with the queried regime into a spaces.Dict, so every RL impl gets the structured obs. --- src/cloudai/configurator/cloudai_gym.py | 48 ++++++-- src/cloudai/configurator/env_params.py | 139 +++++++++++++++++------- src/cloudai/core.py | 9 +- tests/test_cloudaigym.py | 84 ++++++++++++-- tests/test_env_params.py | 50 +++++++++ 5 files changed, 268 insertions(+), 62 deletions(-) diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index f94be7dfb..21daf101d 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -26,7 +26,7 @@ from .base_agent import RewardOverrides from .base_gym import BaseGym -from .env_params import EnvParams, write_env_params +from .env_params import EnvParams, ObsLeafDescriptor, write_env_params @dataclasses.dataclass(frozen=True) @@ -77,8 +77,9 @@ def upcoming_trial(self) -> int: Index of the next trial ``step`` will run. ``step`` increments ``test_run.step`` before it samples/runs, so at rest the counter - holds the last-run trial and the next one is ``+ 1``. ``step`` advances into it. The - ``+ 1`` offset is defined only here so callers cannot re-encode it inconsistently. + holds the last-run trial and the next one is ``+ 1``. ``reset`` peeks this to report the + regime the first ``step`` will apply; ``step`` advances into it. The ``+ 1`` offset is + defined only here so the two paths cannot disagree. """ return self.test_run.step + 1 @@ -119,9 +120,11 @@ def reset( if seed is not None: lazy.np.random.seed(seed) self.test_run.current_iteration = 0 - observation = [0.0] - info = {} - return observation, info + + info: dict[str, Any] = {} + if self.params is not None: + info["env_params"] = self.params.sample(self.upcoming_trial) + return self.define_observation_space(), info def step(self, action: Any) -> Tuple[list, float, bool, dict]: """ @@ -141,9 +144,11 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: self.test_run.increment_step() # RNG lives in the env: sample here, then apply action + sample so the run and cache key see them. sampled_env_params = self.params.sample(trial) if self.params else {} + info: dict[str, Any] = {"env_params": sampled_env_params} if self.params is not None else {} self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params) cached_result = self.get_cached_trajectory_result(action, sampled_env_params) + if cached_result is not None: logging.info( "Retrieved cached result from trajectory with reward %s (from step %s). Skipping execution.", @@ -159,11 +164,12 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: env_params=sampled_env_params, ) ) - return cached_result.observation, cached_result.reward, False, {} + + return cached_result.observation, cached_result.reward, False, info if not self.test_run.test.constraint_check(self.test_run, self.runner.system): logging.info("Constraint check failed. Skipping step.") - return [-1.0], self.rewards.constraint_failure, True, {} + return [-1.0], self.rewards.constraint_failure, True, info new_tr = copy.deepcopy(self.test_run) new_tr.output_path = self.runner.get_job_output_path(new_tr) @@ -185,20 +191,20 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]: self.test_run.step = new_tr.step self.test_run.output_path = new_tr.output_path - observation = self.get_observation(action) - reward = self.compute_reward(observation) + metrics = self.get_observation(action) + reward = self.compute_reward(metrics) self.write_trajectory( TrajectoryEntry( step=self.test_run.step, action=action, reward=reward, - observation=observation, + observation=metrics, env_params=sampled_env_params, ) ) - return observation, reward, False, {} + return metrics, reward, False, info def render(self, mode: str = "human"): """ @@ -253,6 +259,24 @@ def get_observation(self, action: Any) -> list: observation.append(v) return observation + def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]: + """ + Per-leaf descriptors for the env_param regime, or ``None`` when none are declared. + + The flat observation (metrics) is unchanged; these describe the extra env_param leaves + the ``GymnasiumAdapter`` merges with the metrics leaf to build its ``spaces.Dict``. + """ + return self.params.observation_descriptors() if self.params is not None else None + + def encode_env_params(self, env_params: dict[str, Any]) -> Dict[str, Any]: + """ + Encode a queried regime (the env_params behind an observation) into named leaves. + + ``env_params`` is a ``{name: drawn value}`` regime (the ``info["env_params"]`` that + ``reset``/``step`` report). The adapter pairs the result with the flat metrics observation. + """ + return self.params.encode(env_params) if self.params is not None else {} + def write_trajectory(self, entry: TrajectoryEntry): """ Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared). diff --git a/src/cloudai/configurator/env_params.py b/src/cloudai/configurator/env_params.py index 36d73bf14..a926b1505 100644 --- a/src/cloudai/configurator/env_params.py +++ b/src/cloudai/configurator/env_params.py @@ -44,16 +44,75 @@ from cloudai.models.workload import TestDefinition +class ObsLeafDescriptor(BaseModel): + """ + Shape of one leaf of a structured (named) observation. + + Adapters use it to build the matching subspace without guessing: a ``"box"`` leaf is a + continuous vector of width ``dim``; a ``"discrete"`` leaf is a categorical of size ``n``. + Agents that consume the flat observation ignore it. + """ + + model_config = ConfigDict(extra="forbid") + + kind: Literal["box", "discrete"] + dim: int = 1 + n: Optional[int] = None + + @model_validator(mode="after") + def _validate(self) -> Self: + if self.dim < 1: + raise ValueError(f"ObsLeafDescriptor dim must be >= 1; got {self.dim}") + if self.kind == "discrete" and (self.n is None or self.n < 1): + raise ValueError(f"ObsLeafDescriptor(kind='discrete') requires n >= 1; got n={self.n}") + return self + + +class Encoding(Protocol): + """ + Strategy mapping an env_param's drawn value to an observation leaf. + + An encoding declares its own :class:`ObsLeafDescriptor` and encodes a drawn value + into a leaf of that shape. A new strategy implements this pair without touching + :class:`EnvParam` or the adapter. + """ + + def observation_descriptor(self, candidates: List[Any]) -> ObsLeafDescriptor: ... + + def encode(self, value: Any, candidates: List[Any]) -> Any: ... + + +class CategoricalEncoding(BaseModel): + """ + Default encoding: observe the drawn value as its categorical index into ``candidates``. + + Candidates are a discrete set (a ``cmd_args`` list), so the policy sees the per-trial + regime as a ``Discrete(len(candidates))`` index rather than a raw magnitude: an + arbitrary candidate list carries no ordinal meaning to encode continuously. + """ + + model_config = ConfigDict(extra="forbid") + + type: Literal["categorical"] = "categorical" + + def observation_descriptor(self, candidates: List[Any]) -> ObsLeafDescriptor: + return ObsLeafDescriptor(kind="discrete", n=len(candidates)) + + def encode(self, value: Any, candidates: List[Any]) -> int: + return candidates.index(value) + + class EnvParamSpec(BaseModel): """ Annotation marking one cmd_args field as env-sampled. Carries only *how* to sample - the candidate values themselves live in ``cmd_args.`` as a plain list. ``weights`` (optional) are positional, - aligned 1:1 with that candidate list; omit for uniform sampling. The - length match against the candidate list is a cross-field check enforced by - ``TestDefinition`` (which can see ``cmd_args``); here we validate only the - weights' intrinsic shape. + aligned 1:1 with that candidate list; omit for uniform sampling. ``encoding`` + (optional) selects how the drawn value is exposed to the policy as an + observation leaf, defaulting to a categorical index. The length match against + the candidate list is a cross-field check enforced by ``TestDefinition`` (which + can see ``cmd_args``); here we validate only the weights' intrinsic shape. """ model_config = ConfigDict(extra="forbid") @@ -62,6 +121,10 @@ class EnvParamSpec(BaseModel): default=None, description="Optional probability weights aligned with the cmd_args candidate list; uniform if omitted.", ) + encoding: CategoricalEncoding = Field( + default_factory=CategoricalEncoding, + description="How the drawn value is encoded as an observation leaf (categorical index into the candidates).", + ) @model_validator(mode="after") def _validate_weights(self) -> Self: @@ -76,67 +139,51 @@ def _validate_weights(self) -> Self: return self -class ObsLeafDescriptor(BaseModel): - """ - Description of one leaf of a structured (named) observation. - - A structured observation maps each observed name to a self-describing leaf - so adapters can build the matching subspace without guessing: a ``"box"`` - leaf becomes a continuous vector of width ``dim`` (e.g. a log-encoded - env_param as ``dim=2``); a ``"discrete"`` leaf becomes a categorical of - size ``n``. Stateless agents that consume the flat observation ignore this. - """ - - model_config = ConfigDict(extra="forbid") - - kind: Literal["box", "discrete"] - dim: int = 1 - n: Optional[int] = None - - @model_validator(mode="after") - def _validate(self) -> Self: - if self.dim < 1: - raise ValueError(f"ObsLeafDescriptor dim must be >= 1; got {self.dim}") - if self.kind == "discrete" and (self.n is None or self.n < 1): - raise ValueError(f"ObsLeafDescriptor(kind='discrete') requires n >= 1; got n={self.n}") - return self - - @runtime_checkable class StructuredObservation(Protocol): """ - Optional env hooks that expose a structured (per-leaf) observation. - - An env opts in by returning per-leaf :class:`ObsLeafDescriptor` from - ``structured_observation_descriptors`` (``None`` keeps the flat-vector - path) and encoding a raw observation into the matching named leaves via - ``encode_observation``. ``GymnasiumAdapter`` consumes these to expose a - ``gymnasium.spaces.Dict`` observation; the hooks are duck-typed, so envs - need not subclass this Protocol. + Optional env hooks exposing the env_params behind the (unchanged, flat) observation. + + The env keeps returning its flat observation (the metrics) and, when a regime was applied, + delivers it on the Gym ``info`` dict under ``info["env_params"]`` (the key is absent otherwise, + so its presence alone signals a non-empty regime). An env opts in by + declaring per-leaf :class:`ObsLeafDescriptor` for its env_params via + ``structured_observation_descriptors`` (``None`` when none are declared) and encoding a regime + into the matching named leaves via ``encode_env_params``. ``GymnasiumAdapter`` merges the flat + metrics with these env leaves into a ``gymnasium.spaces.Dict``. The hooks are duck-typed, so + envs need not subclass this Protocol. """ def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]: ... - def encode_observation(self, observation: list) -> Dict[str, Any]: ... + def encode_env_params(self, env_params: dict[str, Any]) -> Dict[str, Any]: ... @dataclasses.dataclass(frozen=True) class EnvParam: """ - One env-sampled knob, resolved from cmd_args: its candidate values and optional weights. + One env-sampled knob, resolved from cmd_args: candidates, optional weights, and encoding. Weights (when present) are positional, aligned 1:1 with ``candidates``; ``None`` means - uniform sampling. Keeping the two together makes each knob a self-contained draw. + uniform sampling. ``encoding`` owns how a drawn value is exposed to the policy as an + observation leaf. Bundling the three makes each knob a self-contained draw-and-encode. """ candidates: List[Any] weights: Optional[List[float]] = None + encoding: Encoding = dataclasses.field(default_factory=CategoricalEncoding) def draw(self, rng: random.Random) -> Any: if self.weights is not None: return rng.choices(self.candidates, weights=self.weights, k=1)[0] return rng.choice(self.candidates) + def observation_descriptor(self) -> ObsLeafDescriptor: + return self.encoding.observation_descriptor(self.candidates) + + def encode(self, value: Any) -> Any: + return self.encoding.encode(value, self.candidates) + @dataclasses.dataclass(frozen=True) class EnvParams: @@ -167,7 +214,7 @@ def from_test(cls, test: "TestDefinition") -> Optional["EnvParams"]: value = getattr(test.cmd_args, name, None) if not isinstance(value, list): continue - params[name] = EnvParam(candidates=value, weights=spec.weights) + params[name] = EnvParam(candidates=value, weights=spec.weights, encoding=spec.encoding) if not params: return None seed = int((test.agent_config or {}).get("random_seed", 0)) @@ -183,6 +230,14 @@ def sample(self, trial: int) -> Dict[str, Any]: """ return {name: param.draw(random.Random(f"{self.seed}:{name}:{trial}")) for name, param in self.params.items()} + def encode(self, regime: Dict[str, Any]) -> Dict[str, Any]: + """Encode a drawn regime (``{name: value}``) into one named observation leaf per parameter.""" + return {name: param.encode(regime[name]) for name, param in self.params.items()} + + def observation_descriptors(self) -> Dict[str, ObsLeafDescriptor]: + """Per-parameter observation-leaf descriptors, keyed by parameter name.""" + return {name: param.observation_descriptor() for name, param in self.params.items()} + def write_env_params(path: Path, step: int, sample: Dict[str, Any]) -> None: """ diff --git a/src/cloudai/core.py b/src/cloudai/core.py index aefcd0573..3a4c7bcf7 100644 --- a/src/cloudai/core.py +++ b/src/cloudai/core.py @@ -51,7 +51,12 @@ from ._core.test_scenario import METRIC_ERROR, MetricErrorSentinel, MetricValue, TestDependency, TestRun, TestScenario from .configurator.base_agent import BaseAgent, BaseAgentConfig, RewardOverrides from .configurator.cloudai_gym import CloudAIGymEnv -from .configurator.env_params import ObsLeafDescriptor, StructuredObservation +from .configurator.env_params import ( + CategoricalEncoding, + Encoding, + ObsLeafDescriptor, + StructuredObservation, +) from .configurator.grid_search import GridSearchAgent from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition from .parser import Parser @@ -67,10 +72,12 @@ "BaseJob", "BaseRunner", "BaseSystemParser", + "CategoricalEncoding", "CloudAIGymEnv", "CmdArgs", "CommandGenStrategy", "DockerImage", + "Encoding", "File", "GitRepo", "Grader", diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 966b6a459..543d0c8fd 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -21,7 +21,7 @@ import pytest from cloudai.configurator import CloudAIGymEnv, GridSearchAgent, TrajectoryEntry -from cloudai.configurator.env_params import EnvParamSpec +from cloudai.configurator.env_params import EnvParamSpec, ObsLeafDescriptor from cloudai.core import BaseRunner, RewardOverrides, Runner, TestRun, TestScenario from cloudai.systems.slurm import SlurmSystem from cloudai.util import flatten_dict @@ -191,7 +191,7 @@ def test_constraint_failure(nemorun: NeMoRunTestDefinition, rewards: RewardOverr assert obs == [-1.0] assert reward == expected_reward assert done is True - assert info == {} + assert "env_params" not in info, "no env_params declared -> key absent (its presence signals a real regime)" def test_action_space(nemorun: NeMoRunTestDefinition, setup_env: tuple[TestRun, BaseRunner]): @@ -556,14 +556,16 @@ def test_step_reruns_workload_when_env_params_change(tmp_path: Path) -> None: with patch.object(env, "get_observation", side_effect=lambda _action: next(fake_obs)): env.test_run.step = 0 - obs1, _r1, *_ = env.step(action) # samples ball_speed=3 - obs2, _r2, *_ = env.step(action) # samples ball_speed=1 + *_, info1 = env.step(action) # samples ball_speed=3 + *_, info2 = env.step(action) # samples ball_speed=1 assert runner.run.call_count == 2, ( "Different sampled env_params between two env.step() calls with the same action " "must trigger a workload re-run; the cache lookup must miss." ) - assert obs1 != obs2, "fresh workload run should produce a fresh observation" + assert info1["env_params"] != info2["env_params"], ( + "different env_param draws must be reported as different regimes on info" + ) def test_env_csv_is_step_aligned_with_trajectory(tmp_path: Path) -> None: @@ -733,10 +735,14 @@ def test_step_cache_hit_with_declared_env_params_still_writes_env_csv(tmp_path: env.test_run.step = 0 with patch.object(env, "get_observation", side_effect=AssertionError("cache miss path must not run")): - obs, reward, _done, _info = env.step(action) + obs, reward, _done, info = env.step(action) runner.run.assert_not_called() - assert reward == 0.42 and obs == [0.84] + assert reward == 0.42 + assert obs == [0.84], "flat obs stays the cached metrics; the regime is not mixed into it" + assert info["env_params"] == expected_sample, ( + "the per-trial regime behind this observation is reported on info['env_params']" + ) env_csv = env.env_params_record_path assert env_csv.exists(), "cache HIT must NOT skip the observer; env.csv must record the trial" @@ -840,3 +846,67 @@ def test_no_env_csv_when_env_params_not_declared(nemorun: NeMoRunTestDefinition, assert env.params is None, "no env_params declared -> no EnvParams object" assert not env.env_params_record_path.exists() + + +def _dr_env(tmp_path: Path, candidates: list, *, seed: int = 42) -> CloudAIGymEnv: + """A CloudAIGymEnv whose ``ball_speed`` is env-randomised over ``candidates``.""" + tdef = EnvVarTestDefinition( + name="dr", + description="dr", + test_template_name="dr_template", + cmd_args=EnvVarCmdArgs(ball_speed=candidates), + env_params={"ball_speed": EnvParamSpec()}, + agent_metrics=["default"], + agent_config={"random_seed": seed}, + ) + test_run = TestRun(name="dr_tr", test=tdef, num_nodes=1, nodes=[], output_path=tmp_path / "out" / "dr_tr" / "0") + runner = MagicMock(spec=BaseRunner) + runner.scenario_root = tmp_path / "scenario" + runner.system = MagicMock() + return CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + +class TestStructuredObservationProducer: + """CloudAIGymEnv produces the StructuredObservation the adapter consumes for DR runs.""" + + def test_descriptor_is_one_discrete_leaf_per_env_param(self, tmp_path: Path) -> None: + """Each env_param becomes a categorical leaf sized to its candidate list.""" + env = _dr_env(tmp_path, [1, 2, 3]) + + descriptors = env.structured_observation_descriptors() + + assert descriptors == {"ball_speed": ObsLeafDescriptor(kind="discrete", n=3)} + + def test_metrics_only_env_opts_out(self, nemorun: NeMoRunTestDefinition, tmp_path: Path) -> None: + """A workload with no env_params returns ``None`` (adapter keeps the flat Box path).""" + tdef = nemorun.model_copy(deep=True) + tdef.cmd_args.data.global_batch_size = 8 + test_run = TestRun(name="plain_tr", test=tdef, num_nodes=1, nodes=[]) + runner = MagicMock(spec=BaseRunner) + runner.system = MagicMock() + env = CloudAIGymEnv(test_run=test_run, runner=runner, rewards=RewardOverrides()) + + assert env.structured_observation_descriptors() is None + assert env.encode_env_params({}) == {} + + def test_encode_env_params_maps_native_value_to_candidate_index(self, tmp_path: Path) -> None: + """encode_env_params turns a drawn native value into its categorical index.""" + env = _dr_env(tmp_path, [1, 2, 3]) + + assert env.encode_env_params({"ball_speed": 3}) == {"ball_speed": 2} + assert env.encode_env_params({"ball_speed": 1}) == {"ball_speed": 0} + + def test_reset_reports_the_regime_step_will_apply_on_info(self, tmp_path: Path) -> None: + """reset()'s flat obs stays the metrics placeholder; the upcoming trial's regime is + reported on info["env_params"] and matches the value step() draws for that same index.""" + import random as _random + + env = _dr_env(tmp_path, [1, 2, 3]) + env.test_run.step = 0 + + obs, info = env.reset() + upcoming = _random.Random("42:ball_speed:1").choice([1, 2, 3]) + + assert obs == env.define_observation_space(), "reset's flat obs stays the metrics placeholder" + assert info["env_params"] == {"ball_speed": upcoming}, "reset peeks step+1 and reports the regime" + assert env.encode_env_params(info["env_params"]) == {"ball_speed": [1, 2, 3].index(upcoming)} diff --git a/tests/test_env_params.py b/tests/test_env_params.py index a0341d615..433273b95 100644 --- a/tests/test_env_params.py +++ b/tests/test_env_params.py @@ -40,6 +40,7 @@ from pydantic import BaseModel, ValidationError from cloudai.configurator.env_params import ( + CategoricalEncoding, EnvParam, EnvParams, EnvParamSpec, @@ -404,3 +405,52 @@ def test_obs_leaf_descriptor_rejects_bad_dim_and_extra_fields() -> None: ObsLeafDescriptor(kind="box", dim=1, unexpected=1) # type: ignore with pytest.raises(ValidationError): ObsLeafDescriptor(kind="categorical", dim=1) # type: ignore + + +# --- Encoding: pluggable strategy mapping a drawn value to an observation leaf --- + + +def test_env_param_spec_defaults_to_categorical_encoding() -> None: + """An unspecified encoding defaults to categorical (back-compat with bare ``EnvParamSpec()``).""" + assert EnvParamSpec().encoding == CategoricalEncoding() + + +def test_env_param_spec_parses_encoding_from_config() -> None: + """The encoding is config-selectable (TOML-style dict) via its ``type`` field.""" + spec = EnvParamSpec.model_validate({"encoding": {"type": "categorical"}}) + assert isinstance(spec.encoding, CategoricalEncoding) + + +def test_categorical_encoding_descriptor_and_encode() -> None: + """Categorical maps a candidate list to a Discrete leaf and a value to its index.""" + enc = CategoricalEncoding() + assert enc.observation_descriptor([10, 20, 30]) == ObsLeafDescriptor(kind="discrete", n=3) + assert enc.encode(30, [10, 20, 30]) == 2 + + +def test_env_param_delegates_observation_to_its_encoding() -> None: + """EnvParam owns no encoding logic itself; it delegates to the configured strategy.""" + knob = EnvParam(candidates=[10, 20, 30]) + + assert knob.observation_descriptor() == ObsLeafDescriptor(kind="discrete", n=3) + assert knob.encode(20) == 1 + + +def test_custom_encoding_plugs_into_env_param() -> None: + """A new encoding (e.g. a future log encoding) only needs the two-method interface. + + This proves the seam: EnvParam delegates to ``encoding`` without knowing its kind, so + adding strategies never touches EnvParam or the adapter. + """ + + class _BoxEncoding: + def observation_descriptor(self, candidates: list) -> ObsLeafDescriptor: + return ObsLeafDescriptor(kind="box", dim=1) + + def encode(self, value: object, candidates: list) -> list: + return [float(value)] + + knob = EnvParam(candidates=[10, 20, 30], encoding=_BoxEncoding()) + + assert knob.observation_descriptor() == ObsLeafDescriptor(kind="box", dim=1) + assert knob.encode(20) == [20.0] From 3c4e13075beea140259e0e8ce0efd1607a644f33 Mon Sep 17 00:00:00 2001 From: Rutayan Patro Date: Tue, 16 Jun 2026 01:04:25 -0400 Subject: [PATCH 3/3] feat(configurator): add GymnasiumAdapter for CloudAI envs Wrap a CloudAI BaseGym as a gymnasium.Env-shaped object: a spaces.Dict of Discrete (list params) and Box (ContinuousSpace) actions over the tunable params with fixed (single-value) params injected each step; observations as either a flat float32 Box or, when the env opts in via the structured-obs hooks, a spaces.Dict of per-leaf ObsLeafDescriptor subspaces. Continuous dtype="int" params are quantized (rounded/clamped) at decode_action so the trajectory cache key collapses float jitter. The adapter is a pure pass-through over test_run.step (never mutates it), so contextual-bandit rollouts that reset() per trial keep a monotonic trial index. gymnasium is an optional dependency lazy-imported behind the new [rl] extra (also added to dev); CloudAIGymEnv.define_observation_space() now returns one slot per agent metric so adapters get the right Box shape. Exported via cloudai.core. Caller-contract tests pin the step-monotonicity, observation pass-through, continuous-quantization, and structured-obs invariants. --- pyproject.toml | 2 + src/cloudai/cli/handlers.py | 2 +- src/cloudai/configurator/__init__.py | 2 + src/cloudai/configurator/cloudai_gym.py | 10 +- src/cloudai/configurator/env_params.py | 6 +- src/cloudai/configurator/gymnasium_adapter.py | 251 +++++++++++ src/cloudai/core.py | 6 +- src/cloudai/util/lazy_imports.py | 17 +- tests/test_cloudaigym.py | 2 +- tests/test_env_params.py | 6 +- tests/test_gymnasium_adapter_contract.py | 392 ++++++++++++++++++ uv.lock | 43 +- 12 files changed, 722 insertions(+), 17 deletions(-) create mode 100644 src/cloudai/configurator/gymnasium_adapter.py create mode 100644 tests/test_gymnasium_adapter_contract.py diff --git a/pyproject.toml b/pyproject.toml index 94f0bdf83..4507e5048 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,9 @@ classifiers = [ "import-linter~=2.10", "pytest-deadfixtures~=3.1", "taplo~=0.9.3", + "gymnasium~=1.2", ] + rl = ["gymnasium~=1.2"] docs = [ "sphinx~=8.1", "nvidia-sphinx-theme~=0.0.8", diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index 084d48ba1..620fd3009 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -179,7 +179,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: ) if run_error is not None: - raise run_error + raise run_error.with_traceback(run_error.__traceback__) logging.info("All jobs are complete.") return err diff --git a/src/cloudai/configurator/__init__.py b/src/cloudai/configurator/__init__.py index f05b65c5b..a88432c41 100644 --- a/src/cloudai/configurator/__init__.py +++ b/src/cloudai/configurator/__init__.py @@ -18,11 +18,13 @@ from .base_gym import BaseGym from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry from .grid_search import GridSearchAgent +from .gymnasium_adapter import GymnasiumAdapter __all__ = [ "BaseAgent", "BaseGym", "CloudAIGymEnv", "GridSearchAgent", + "GymnasiumAdapter", "TrajectoryEntry", ] diff --git a/src/cloudai/configurator/cloudai_gym.py b/src/cloudai/configurator/cloudai_gym.py index 21daf101d..21b3f82af 100644 --- a/src/cloudai/configurator/cloudai_gym.py +++ b/src/cloudai/configurator/cloudai_gym.py @@ -96,9 +96,10 @@ def define_observation_space(self) -> list: Define the observation space for the environment. Returns: - list: The observation space. + list: One float slot per agent metric (at least one), giving the correct shape + for adapters that derive ``gymnasium.spaces.Box`` from this output. """ - return [0.0] + return [0.0] * max(len(self.test_run.test.agent_metrics), 1) def reset( self, @@ -120,7 +121,6 @@ def reset( if seed is not None: lazy.np.random.seed(seed) self.test_run.current_iteration = 0 - info: dict[str, Any] = {} if self.params is not None: info["env_params"] = self.params.sample(self.upcoming_trial) @@ -263,8 +263,8 @@ def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescri """ Per-leaf descriptors for the env_param regime, or ``None`` when none are declared. - The flat observation (metrics) is unchanged; these describe the extra env_param leaves - the ``GymnasiumAdapter`` merges with the metrics leaf to build its ``spaces.Dict``. + The flat observation (metrics) is unchanged; these describe the env_param leaves the + ``GymnasiumAdapter`` merges with it into its structured observation ``spaces.Dict``. """ return self.params.observation_descriptors() if self.params is not None else None diff --git a/src/cloudai/configurator/env_params.py b/src/cloudai/configurator/env_params.py index a926b1505..a824d6205 100644 --- a/src/cloudai/configurator/env_params.py +++ b/src/cloudai/configurator/env_params.py @@ -140,7 +140,7 @@ def _validate_weights(self) -> Self: @runtime_checkable -class StructuredObservation(Protocol): +class StructuredObservationProducer(Protocol): """ Optional env hooks exposing the env_params behind the (unchanged, flat) observation. @@ -150,8 +150,8 @@ class StructuredObservation(Protocol): declaring per-leaf :class:`ObsLeafDescriptor` for its env_params via ``structured_observation_descriptors`` (``None`` when none are declared) and encoding a regime into the matching named leaves via ``encode_env_params``. ``GymnasiumAdapter`` merges the flat - metrics with these env leaves into a ``gymnasium.spaces.Dict``. The hooks are duck-typed, so - envs need not subclass this Protocol. + metrics with these env_param leaves into its structured observation ``gymnasium.spaces.Dict``. + The hooks are duck-typed, so envs need not subclass this Protocol. """ def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]: ... diff --git a/src/cloudai/configurator/gymnasium_adapter.py b/src/cloudai/configurator/gymnasium_adapter.py new file mode 100644 index 000000000..dcc09c0e2 --- /dev/null +++ b/src/cloudai/configurator/gymnasium_adapter.py @@ -0,0 +1,251 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Gymnasium adapter for CloudAI ``BaseGym`` environments. + +Translates a CloudAI :class:`BaseGym` into the ``gymnasium.Env`` 5-tuple shape +that RLlib-based agents (e.g. PPO / DQN) and external training loops expect. +``gymnasium`` is an optional dependency (the ``[rl]`` extra), so it is imported +lazily and only required when an adapter is actually instantiated. + +Design invariant — adapter is a pure pass-through over ``test_run.step``. +The trial counter is owned by ``TestRun`` and advanced exclusively by +``CloudAIGymEnv.step()``. Adapters that wrote ``test_run.step`` themselves — +mirroring a Gym-protocol episode-local counter — collapsed every +contextual-bandit rollout onto ``step=1`` because RLlib calls ``reset()`` per +trial. This adapter never mutates ``test_run.step``; contract tests pin that +property. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Optional + +from cloudai.util.lazy_imports import lazy + +from .base_gym import BaseGym +from .env_params import StructuredObservationProducer + +if TYPE_CHECKING: + from gymnasium import Env as _GymnasiumEnvBase +else: + try: # ``gymnasium`` is an optional [rl] dependency; fall back to ``object`` when absent. + from gymnasium import Env as _GymnasiumEnvBase + except ImportError: + _GymnasiumEnvBase = object + + +class GymnasiumAdapter(_GymnasiumEnvBase): + """ + Expose a CloudAI :class:`BaseGym` as a ``gymnasium.Env``-shaped object. + + The adapter: + + * Builds a ``gymnasium.spaces.Dict`` of ``Discrete`` action spaces over + the *tunable* parameters (those with more than one candidate value), + and injects the *fixed* parameters (single candidate) automatically on + every step so agents never see them. + * Builds the observation from the env's flat metrics: a ``float32`` ``Box`` for + metrics-only envs, or — when the env declares env_params — a ``spaces.Dict`` + ``{"observation": Box, "context": Dict(...)}`` pairing the metrics ``observation`` + with the per-trial ``context`` the env reports on ``info["env_params"]``, each + param encoded to its leaf. Naming follows the contextual-MDP convention + (``observation`` = inner signal, ``context`` = env regime). + * Returns the gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)`` + from :meth:`step` and :meth:`step_raw`. + + ``gymnasium`` and ``numpy`` are optional dependencies (the ``[rl]`` extra); + instantiating the adapter without them raises ``ImportError``. + """ + + # Overrides gymnasium.Env.metadata (a non-ClassVar instance attribute); matching that + # shape satisfies pyright's override check, so RUF012's ClassVar suggestion is silenced. + metadata: dict[str, Any] = {"render_modes": ["human"]} # noqa: RUF012 + + def __init__(self, env: BaseGym) -> None: + np = self._np = lazy.np + spaces = self._spaces = lazy.gymnasium.spaces + self._env = env + + raw_action_space = env.define_action_space() + + # Two classes of params from cloudai's param_space: + # list with len > 1 -> discrete tunable, mapped to gym.Discrete. + # list with len == 1 -> fixed (collapsed); injected on every step so + # agents never see them. + self._discrete_params: dict[str, list] = { + k: v for k, v in raw_action_space.items() if isinstance(v, list) and len(v) > 1 + } + self._fixed_params: dict[str, Any] = { + k: v[0] for k, v in raw_action_space.items() if isinstance(v, list) and len(v) == 1 + } + + action_space_components: dict[str, Any] = { + name: spaces.Discrete(len(values)) for name, values in self._discrete_params.items() + } + self.action_space = spaces.Dict(action_space_components) + + # Observation space. When the env declares env_params, expose a structured + # spaces.Dict pairing the flat metrics (unchanged) with the per-trial context: + # {"observation": Box(m), "context": Dict({: , ...})} + # so the policy sees named, individually-encoded context leaves (e.g. a log-encoded + # env_param as Box(2)) alongside the metrics; RLlib connectors own normalize + flatten. + # Envs without env_params keep the legacy flat Box. + metrics_shape = (len(env.define_observation_space()),) + metrics_space = spaces.Box(low=-np.inf, high=np.inf, shape=metrics_shape, dtype=np.float32) + self._obs_descriptors: Optional[dict[str, Any]] = self._structured_obs_descriptors(env) + if self._obs_descriptors: + context_space = spaces.Dict( + {name: self._descriptor_to_space(desc) for name, desc in self._obs_descriptors.items()} + ) + self.observation_space = spaces.Dict({"observation": metrics_space, "context": context_space}) + else: + self.observation_space = metrics_space + + @staticmethod + def _structured_obs_descriptors(env: BaseGym) -> Optional[dict[str, Any]]: + """ + Return the env's per-leaf obs descriptors, or ``None`` for the flat-Box path. + + The env owns the opt-in decision via ``structured_observation_descriptors`` + (returns ``None`` unless it declares env_params). Envs that don't satisfy the + ``StructuredObservationProducer`` protocol keep the legacy flat-Box path. + """ + if not isinstance(env, StructuredObservationProducer): + return None + descriptors = env.structured_observation_descriptors() + return descriptors or None + + def _descriptor_to_space(self, descriptor: Any) -> Any: + """Map a framework-agnostic ``ObsLeafDescriptor`` to a gymnasium subspace.""" + if descriptor.kind == "discrete": + return self._spaces.Discrete(descriptor.n) + return self._spaces.Box(low=-self._np.inf, high=self._np.inf, shape=(descriptor.dim,), dtype=self._np.float32) + + def decode_action(self, action: dict[str, Any]) -> dict[str, Any]: + """ + Map raw gym actions back to native parameter values. + + Discrete actions are list indices and resolve to the corresponding list + entry. + + Raises: + ValueError: if ``action`` is missing tunable params, contains + unknown keys, or carries an out-of-range discrete index. + """ + self._assert_keys(action.keys(), set(self._discrete_params), "action") + return {name: self._decode_discrete(name, raw) for name, raw in action.items()} + + def _decode_discrete(self, name: str, raw: Any) -> Any: + values = self._discrete_params[name] + idx = int(raw) + if not 0 <= idx < len(values): + raise ValueError(f"Action index out of range for '{name}': {idx} (expected 0..{len(values) - 1})") + return values[idx] + + def encode_action(self, values: dict[str, Any]) -> dict[str, Any]: + """ + Map native parameter values back to raw gym actions; inverse of :meth:`decode_action`. + + Discrete values resolve to their index in the candidate list. Together + with :meth:`decode_action` this is an invertible pair on native values: + ``decode_action(encode_action(v)) == v`` for any ``v`` drawn from the + tunable params. + + Consumers that need to express known native configs in the policy's + action space — e.g. warm-start / behavioral cloning from a recorded + trajectory — call this instead of reaching into the adapter internals. + + Raises: + ValueError: if ``values`` does not cover exactly the tunable params, + or carries a discrete value absent from its candidate list. + """ + self._assert_keys(values.keys(), set(self._discrete_params), "values") + return {name: self._encode_discrete(name, value) for name, value in values.items()} + + def _encode_discrete(self, name: str, value: Any) -> int: + values = self._discrete_params[name] + try: + return values.index(value) + except ValueError: + raise ValueError(f"Value {value!r} for '{name}' is not a candidate; expected one of {values}") from None + + def reset( + self, + *, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[Any, dict[str, Any]]: + obs, info = self._env.reset(seed=seed, options=options) + return self._as_obs(obs, info), info + + def step(self, action: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + params = {**self._fixed_params, **self.decode_action(action)} + return self._step_with_params(params) + + def step_raw(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + """ + Step the env with an already-decoded parameter dict; bypasses index decoding. + + Raises: + ValueError: if ``params`` does not cover exactly the tunable + + fixed param keys. + """ + expected = set(self._discrete_params) | set(self._fixed_params) + self._assert_keys(params.keys(), expected, "raw params") + return self._step_with_params(params) + + def render(self) -> None: + self._env.render() + + @staticmethod + def _assert_keys(received: Any, expected: set[str], ctx: str) -> None: + received_set = set(received) + if received_set == expected: + return + missing = sorted(expected - received_set) + extra = sorted(received_set - expected) + raise ValueError(f"{ctx} keys mismatch; missing={missing}, extra={extra}") + + def _step_with_params(self, params: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]: + obs, reward, done, info = self._env.step(params) + return self._as_obs(obs, info), float(reward), bool(done), False, info + + def _as_obs(self, obs: Any, info: dict[str, Any]) -> Any: + """ + Build the policy-facing observation from the env's flat obs and its ``info``. + + Flat path: a single ``float32`` ``Box`` array (the metrics), legacy behaviour. + Structured path: a ``spaces.Dict`` pairing the flat metrics with the per-trial + env_param regime the env reports on ``info["env_params"]``, each param encoded to + its declared leaf: ``{"observation": Box(m), "context": {: , ...}}``. + """ + metrics = self._np.asarray(obs, dtype=self._np.float32) + descriptors = self._obs_descriptors + env = self._env + if descriptors is None or not isinstance(env, StructuredObservationProducer): + return metrics + encoded = env.encode_env_params(info["env_params"]) + self._assert_keys(encoded.keys(), set(descriptors), "encoded env_params") + context_leaves = {name: self._leaf_to_value(descriptors[name], encoded[name]) for name in descriptors} + return {"observation": metrics, "context": context_leaves} + + def _leaf_to_value(self, descriptor: Any, leaf: Any) -> Any: + """Coerce one encoded leaf to its gymnasium subspace dtype.""" + if descriptor.kind == "discrete": + return int(leaf) + return self._np.asarray(leaf, dtype=self._np.float32) diff --git a/src/cloudai/core.py b/src/cloudai/core.py index 3a4c7bcf7..5bb739f99 100644 --- a/src/cloudai/core.py +++ b/src/cloudai/core.py @@ -55,9 +55,10 @@ CategoricalEncoding, Encoding, ObsLeafDescriptor, - StructuredObservation, + StructuredObservationProducer, ) from .configurator.grid_search import GridSearchAgent +from .configurator.gymnasium_adapter import GymnasiumAdapter from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition from .parser import Parser from .reporter import PerTestReporter, StatusReporter, TarballReporter @@ -83,6 +84,7 @@ "Grader", "GradingStrategy", "GridSearchAgent", + "GymnasiumAdapter", "HFModel", "InstallStatusResult", "Installable", @@ -105,7 +107,7 @@ "RewardOverrides", "Runner", "StatusReporter", - "StructuredObservation", + "StructuredObservationProducer", "System", "SystemConfigParsingError", "TarballReporter", diff --git a/src/cloudai/util/lazy_imports.py b/src/cloudai/util/lazy_imports.py index 65ba62df3..def790dfc 100644 --- a/src/cloudai/util/lazy_imports.py +++ b/src/cloudai/util/lazy_imports.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,6 +27,7 @@ import bokeh.palettes as bokeh_pallettes import bokeh.plotting as bokeh_plotting import bokeh.transform as bokeh_transform + import gymnasium import kubernetes as k8s import numpy as np import pandas as pd @@ -39,6 +40,7 @@ def __init__(self): self._np: ModuleType | None = None self._pd: ModuleType | None = None self._k8s: ModuleType | None = None + self._gymnasium: ModuleType | None = None self._bokeh: ModuleType | None = None self._bokeh_plotting: ModuleType | None = None self._bokeh_models: ModuleType | None = None @@ -75,6 +77,19 @@ def k8s(self) -> k8s: # type: ignore[no-any-return] return cast("k8s", self._k8s) + @property + def gymnasium(self) -> gymnasium: # type: ignore[no-any-return] + """Lazy import of gymnasium (optional ``cloudai[rl]`` extra).""" + if self._gymnasium is None: + try: + import gymnasium + except ImportError as exc: + raise ImportError( + "gymnasium is required for GymnasiumAdapter. Install it with: pip install 'cloudai[rl]'" + ) from exc + self._gymnasium = gymnasium + return cast("gymnasium", self._gymnasium) + @property def bokeh(self) -> bokeh: # type: ignore[no-any-return] """Lazy import of bokeh.""" diff --git a/tests/test_cloudaigym.py b/tests/test_cloudaigym.py index 543d0c8fd..dfc7ea7ae 100644 --- a/tests/test_cloudaigym.py +++ b/tests/test_cloudaigym.py @@ -867,7 +867,7 @@ def _dr_env(tmp_path: Path, candidates: list, *, seed: int = 42) -> CloudAIGymEn class TestStructuredObservationProducer: - """CloudAIGymEnv produces the StructuredObservation the adapter consumes for DR runs.""" + """CloudAIGymEnv produces the StructuredObservationProducer output the adapter consumes for DR runs.""" def test_descriptor_is_one_discrete_leaf_per_env_param(self, tmp_path: Path) -> None: """Each env_param becomes a categorical leaf sized to its candidate list.""" diff --git a/tests/test_env_params.py b/tests/test_env_params.py index 433273b95..9eb11d2ac 100644 --- a/tests/test_env_params.py +++ b/tests/test_env_params.py @@ -34,7 +34,7 @@ import dataclasses import random from pathlib import Path -from typing import List, Union +from typing import Any, List, Union import pytest from pydantic import BaseModel, ValidationError @@ -444,10 +444,10 @@ def test_custom_encoding_plugs_into_env_param() -> None: """ class _BoxEncoding: - def observation_descriptor(self, candidates: list) -> ObsLeafDescriptor: + def observation_descriptor(self, candidates: List[Any]) -> ObsLeafDescriptor: return ObsLeafDescriptor(kind="box", dim=1) - def encode(self, value: object, candidates: list) -> list: + def encode(self, value: Any, candidates: List[Any]) -> list: return [float(value)] knob = EnvParam(candidates=[10, 20, 30], encoding=_BoxEncoding()) diff --git a/tests/test_gymnasium_adapter_contract.py b/tests/test_gymnasium_adapter_contract.py new file mode 100644 index 000000000..a995fede0 --- /dev/null +++ b/tests/test_gymnasium_adapter_contract.py @@ -0,0 +1,392 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Caller-contract tests for ``GymnasiumAdapter``. + +The single invariant every consumer assumes: + + ``test_run.step`` is a **monotonic trial index** across the entire run. + +Gym's ``reset()`` is an *episode boundary*, not a trial boundary. For the +contextual-bandit configs (``agent_steps=1``), RLlib calls ``reset()`` before +*every* trial. An earlier adapter rewound ``test_run.step`` on reset and +collapsed every trial onto step 1 — silently overwriting output dirs and +producing duplicate-step rows in trajectory.csv / env.csv. + +These tests pin the negative invariant: the adapter must not mutate +``test_run.step``. That counter is owned by ``TestRun`` and advanced +exclusively by ``CloudAIGymEnv.step()``. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any, Optional + +import pytest + +from cloudai.configurator.base_gym import BaseGym +from cloudai.configurator.env_params import ObsLeafDescriptor + +try: + import gymnasium # noqa: F401 + + _HAS_GYMNASIUM = True +except ImportError: + _HAS_GYMNASIUM = False + +pytestmark = pytest.mark.skipif(not _HAS_GYMNASIUM, reason="gymnasium not installed") + +from cloudai.configurator.gymnasium_adapter import GymnasiumAdapter # noqa: E402 + + +class _StubBaseGym(BaseGym): + """Minimal BaseGym with a ``test_run`` attribute mirroring CloudAIGymEnv.""" + + def __init__(self) -> None: + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3], "param_b": [10, 20]} + self._observation_space: list[float] = [0.0, 0.0, 0.0] + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return self._observation_space + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + return [0.0, 0.0, 0.0], {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + self.test_run.step += 1 + return [1.0, 2.0, 3.0], 0.5, False, {} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestStepIsMonotonicTrialIndex: + """``test_run.step`` is a trial index, not an episode-local counter.""" + + def test_step_advances_within_single_episode(self) -> None: + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + adapter.reset() + + seen: list[int] = [] + for _ in range(3): + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3] + + def test_step_is_monotonic_across_episode_boundaries(self) -> None: + """The bug: ``reset()`` rewinds ``_step_count`` to 0, so the next + ``step()`` writes ``test_run.step = 1`` again. With contextual-bandit + RLlib (one step per episode) this means every trial reports step 1. + """ + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + + seen: list[int] = [] + for _ in range(5): + adapter.reset() + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3, 4, 5], ( + f"test_run.step must be a monotonic trial index across episodes; got {seen}. " + "reset() is a Gym episode boundary, not a trial boundary; rewinding the " + "trial counter collapses every contextual-bandit rollout onto step 1." + ) + + def test_mixed_within_and_across_episode_steps_are_monotonic(self) -> None: + gym = _StubBaseGym() + adapter = GymnasiumAdapter(gym) + + seen: list[int] = [] + for episode_len in (2, 1, 3): + adapter.reset() + for _ in range(episode_len): + adapter.step({"param_a": 0, "param_b": 0}) + seen.append(gym.test_run.step) + + assert seen == [1, 2, 3, 4, 5, 6], ( + f"test_run.step must be a monotonic trial index regardless of episode shape; got {seen}" + ) + + +class _ContextualStubBaseGym(BaseGym): + """BaseGym stub that simulates CloudAIGymEnv's contextual-obs contract. + + ``reset()`` returns an observation with a per-trial context value in + slot 1 (mimicking how the upstream env writes a sampled env_param into + the obs vector built at the trial boundary). Each call to ``reset()`` + advances the simulated trial counter so we can assert the adapter + surfaces the *current* context, not a stale one. + """ + + def __init__(self, contexts: list[float]) -> None: + self._contexts = list(contexts) + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3], "param_b": [10, 20]} + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return [0.0, 0.0] + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + ctx = self._contexts[self.test_run.step] + self.test_run.step += 1 + return [0.0, ctx], {} + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + ctx = self._contexts[self.test_run.step - 1] + return [42.0, ctx], 0.5, False, {} + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestAdapterPropagatesContextualObservation: + """The adapter must pass through env-built observations unchanged. + + With the contextual-bandit fix in cloudai, ``CloudAIGymEnv.reset()`` + samples env_params at the trial boundary and bakes them into the obs + vector before returning. RLlib's policy reads obs from + ``adapter.reset()``, so the adapter must propagate that vector verbatim + (modulo numpy-float32 casting). The same propagation invariant applies + on ``adapter.step()``. + """ + + def test_reset_propagates_context_into_observation(self) -> None: + contexts = [0.001, 0.0, 0.01, 0.001] + gym = _ContextualStubBaseGym(contexts) + adapter = GymnasiumAdapter(gym) + + seen: list[float] = [] + for _ in range(len(contexts)): + obs, _info = adapter.reset() + seen.append(float(obs[1])) + + assert seen == pytest.approx(contexts, rel=1e-5), ( + f"adapter.reset() must surface the trial's context value (slot 1) verbatim " + f"(modulo float32 cast); got {seen}, expected {contexts}" + ) + + def test_step_propagates_context_into_observation(self) -> None: + contexts = [0.0, 0.01, 0.001] + gym = _ContextualStubBaseGym(contexts) + adapter = GymnasiumAdapter(gym) + + for ctx in contexts: + adapter.reset() + obs, _r, _term, _trunc, _info = adapter.step({"param_a": 0, "param_b": 0}) + assert float(obs[0]) == pytest.approx(42.0, rel=1e-5), ( + f"adapter.step() must propagate the env's measured-metric slot; got {obs[0]}" + ) + assert float(obs[1]) == pytest.approx(ctx, rel=1e-5), ( + f"adapter.step() must propagate the trial's context value; got {obs[1]}, expected {ctx}" + ) + + +class TestEncodeDecodeAreInverse: + """``encode_action`` is the inverse of ``decode_action`` on native values. + + Consumers (e.g. RLlib warm-start / behavioral cloning) must be able to + express a recorded native config in the policy's action space without + reaching into adapter internals. The public pair guarantees + ``decode_action(encode_action(v)) == v`` for any native ``v``. + """ + + def test_discrete_round_trip_decode_of_encode_is_identity(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + native = {"param_a": 3, "param_b": 10} + assert adapter.decode_action(adapter.encode_action(native)) == native + + def test_discrete_encode_of_decode_is_identity(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + action = {"param_a": 2, "param_b": 1} + assert adapter.encode_action(adapter.decode_action(action)) == action + + def test_encode_maps_value_to_candidate_index(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + assert adapter.encode_action({"param_a": 2, "param_b": 20}) == {"param_a": 1, "param_b": 1} + + def test_encode_rejects_non_candidate_value(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + with pytest.raises(ValueError, match="not a candidate"): + adapter.encode_action({"param_a": 7, "param_b": 10}) + + def test_encode_rejects_key_mismatch(self) -> None: + adapter = GymnasiumAdapter(_StubBaseGym()) + with pytest.raises(ValueError, match="keys mismatch"): + adapter.encode_action({"param_a": 1}) # missing param_b + + +class _StructuredStubBaseGym(BaseGym): + """BaseGym stub that opts in/out of the structured (Dict) obs path. + + Mirrors ``CloudAIGymEnv``'s contract: ``structured_observation_descriptors`` + returns ``None`` for a metrics-only env and a per-leaf descriptor dict otherwise; + the per-trial regime is reported on ``info["env_params"]``; ``encode_env_params`` + turns that regime into the matching encoded leaves. + """ + + def __init__( + self, + descriptors: Optional[dict[str, ObsLeafDescriptor]], + obs_dim: int = 2, + regime: Optional[dict[str, Any]] = None, + ) -> None: + self._descriptors = descriptors + self._obs_dim = obs_dim + if regime is not None: + self._regime = regime + elif descriptors: + self._regime = {name: (0 if d.kind == "discrete" else 0.0) for name, d in descriptors.items()} + else: + self._regime = {} + self._action_space: dict[str, Any] = {"param_a": [1, 2, 3]} + self.test_run = SimpleNamespace(step=0) + super().__init__() + + def define_action_space(self) -> dict[str, Any]: + return self._action_space + + def define_observation_space(self) -> list: + return [0.0] * self._obs_dim + + def structured_observation_descriptors(self) -> Optional[dict[str, ObsLeafDescriptor]]: + return self._descriptors + + def encode_env_params(self, env_params: dict[str, Any]) -> dict[str, Any]: + out: dict[str, Any] = {} + assert self._descriptors is not None + for name, desc in self._descriptors.items(): + raw = env_params[name] + if desc.kind == "discrete": + out[name] = int(raw) + elif desc.dim == 2: + out[name] = [1.0 if raw <= 0.0 else 0.0, float(raw)] + else: + out[name] = [float(raw)] + return out + + def _info(self) -> dict[str, Any]: + return {"env_params": dict(self._regime)} if self._descriptors is not None else {} + + def reset( + self, + seed: Optional[int] = None, + options: Optional[dict[str, Any]] = None, + ) -> tuple[list, dict[str, Any]]: + return [0.0] * self._obs_dim, self._info() + + def step(self, action: Any) -> tuple[list, float, bool, dict]: + return [0.0] * self._obs_dim, 0.5, False, self._info() + + def render(self, mode: str = "human") -> None: + return None + + def seed(self, seed: Optional[int] = None) -> None: + pass + + +class TestStructuredObsGate: + """D1: the structured (Dict) obs space is opt-in; metrics-only stays flat Box.""" + + def test_metrics_only_env_falls_back_to_box(self) -> None: + """An env that opts out (``structured_observation_descriptors`` -> None) keeps a flat Box. + + This is the blast-radius guard: non-DR workloads (BO/GA/MAB on plain + metrics) must NOT silently switch to a Dict obs layout. + """ + import gymnasium + + gym_env = _StructuredStubBaseGym(descriptors=None, obs_dim=3) + adapter = GymnasiumAdapter(gym_env) + + assert isinstance(adapter.observation_space, gymnasium.spaces.Box) + assert adapter.observation_space.shape == (3,) + + def test_env_param_env_uses_dict(self) -> None: + """An env with declared env_params exposes ``Dict({"observation": Box, "context": Dict(...)})``.""" + import gymnasium + + descriptors = { + "bus_bw": ObsLeafDescriptor(kind="box", dim=1), + "drop_rate": ObsLeafDescriptor(kind="box", dim=2), + } + adapter = GymnasiumAdapter(_StructuredStubBaseGym(descriptors=descriptors)) + + observation_space = adapter.observation_space + assert isinstance(observation_space, gymnasium.spaces.Dict) + assert set(observation_space) == {"observation", "context"} + context_space = observation_space["context"] + assert isinstance(context_space, gymnasium.spaces.Dict) + assert set(context_space) == {"bus_bw", "drop_rate"} + assert context_space["drop_rate"].shape == (2,) + + +class TestCategoricalLeafSubspace: + """D3: a categorical (discrete) descriptor maps to ``Discrete(k)`` and decodes to an int.""" + + def test_discrete_descriptor_becomes_discrete_space(self) -> None: + import gymnasium + + descriptors = { + "bus_bw": ObsLeafDescriptor(kind="box", dim=1), + "variant": ObsLeafDescriptor(kind="discrete", dim=1, n=3), + } + adapter = GymnasiumAdapter(_StructuredStubBaseGym(descriptors=descriptors)) + + observation_space = adapter.observation_space + assert isinstance(observation_space, gymnasium.spaces.Dict) + context_space = observation_space["context"] + assert isinstance(context_space, gymnasium.spaces.Dict) + variant_space = context_space["variant"] + assert isinstance(variant_space, gymnasium.spaces.Discrete) + assert int(variant_space.n) == 3 + + def test_discrete_leaf_emitted_as_int_index(self) -> None: + """The emitted obs for a discrete leaf is an ``int`` the Discrete space accepts.""" + descriptors = {"variant": ObsLeafDescriptor(kind="discrete", dim=1, n=3)} + gym_env = _StructuredStubBaseGym(descriptors=descriptors, obs_dim=1, regime={"variant": 2}) + adapter = GymnasiumAdapter(gym_env) + + obs, _ = adapter.reset() + assert isinstance(obs["context"]["variant"], int) + assert adapter.observation_space.contains(obs) diff --git a/uv.lock b/uv.lock index 7087498a5..3e71268af 100644 --- a/uv.lock +++ b/uv.lock @@ -287,6 +287,7 @@ dependencies = [ [package.optional-dependencies] dev = [ { name = "build" }, + { name = "gymnasium" }, { name = "import-linter" }, { name = "pandas-stubs" }, { name = "pre-commit" }, @@ -317,6 +318,9 @@ docs-cms = [ { name = "sphinx-rtd-theme" }, { name = "sphinxcontrib-mermaid" }, ] +rl = [ + { name = "gymnasium" }, +] [package.metadata] requires-dist = [ @@ -325,6 +329,8 @@ requires-dist = [ { name = "bokeh", specifier = "~=3.8" }, { name = "build", marker = "extra == 'dev'", specifier = "~=1.4" }, { name = "click", specifier = "~=8.3" }, + { name = "gymnasium", marker = "extra == 'dev'", specifier = "~=1.2" }, + { name = "gymnasium", marker = "extra == 'rl'", specifier = "~=1.2" }, { name = "huggingface-hub", specifier = "~=1.4" }, { name = "import-linter", marker = "extra == 'dev'", specifier = "~=2.10" }, { name = "jinja2", specifier = "~=3.1.6" }, @@ -357,7 +363,16 @@ requires-dist = [ { name = "vulture", marker = "extra == 'dev'", specifier = "==2.14" }, { name = "websockets", specifier = "~=16.0" }, ] -provides-extras = ["dev", "docs", "docs-cms"] +provides-extras = ["dev", "rl", "docs", "docs-cms"] + +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330, upload-time = "2025-11-03T09:25:26.604Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, +] [[package]] name = "colorama" @@ -683,6 +698,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8a/0e/97c33bf5009bdbac74fd2beace167cab3f978feb69cc36f1ef79360d6c4e/exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598", size = 16740, upload-time = "2025-11-21T23:01:53.443Z" }, ] +[[package]] +name = "farama-notifications" +version = "0.0.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ec/91/14397890dde30adc4bee6462158933806207bc5dd10d7b4d09d5c33845cf/farama_notifications-0.0.6.tar.gz", hash = "sha256:b19acac4bb41d76e59e03394b5dd165f4761c86fa327f56307a35cbee3b60158", size = 2517, upload-time = "2026-04-24T08:43:57.603Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/f0/21f81892e4ed10f4ec3ef2e7cf8635fb76e7c0907c55d0da66be50094760/farama_notifications-0.0.6-py3-none-any.whl", hash = "sha256:f84839188efa1ce5bb361c2a84881b2dc2c0d0d7fb661ff00421820170930935", size = 2897, upload-time = "2026-04-24T08:43:56.785Z" }, +] + [[package]] name = "fastapi" version = "0.136.3" @@ -893,6 +917,23 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/b2/b096ccce418882fbfda4f7496f9357aaa9a5af1896a9a7f60d9f2b275a06/grpcio-1.78.0-cp314-cp314-win_amd64.whl", hash = "sha256:dce09d6116df20a96acfdbf85e4866258c3758180e8c49845d6ba8248b6d0bbb", size = 4929852, upload-time = "2026-02-06T09:56:45.885Z" }, ] +[[package]] +name = "gymnasium" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cloudpickle" }, + { name = "farama-notifications" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.4.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version == '3.11.*'" }, + { name = "numpy", version = "2.5.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4d/ff/14b6880d703dfaca204490979d3254ccd280c99550798993319902873658/gymnasium-1.3.0.tar.gz", hash = "sha256:6939e86e835d6b71b6ba6bfd360487420876deafc79bfb7bacba83a7c446bcf3", size = 830646, upload-time = "2026-04-22T13:47:14.155Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/73/fda6a25f3beeb5e49d74330b44092b9e5a547395ccd478d1103ddcbff1fc/gymnasium-1.3.0-py3-none-any.whl", hash = "sha256:6b8c159a8540dcbcb221722d7efda24d78ebbcbc3bd2ea1c2611aa2a34471fc2", size = 953904, upload-time = "2026-04-22T13:47:12.13Z" }, +] + [[package]] name = "h11" version = "0.16.0"