Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ classifiers = [
"import-linter~=2.10",
"pytest-deadfixtures~=3.1",
"taplo~=0.9.3",
"gymnasium~=1.2",
]
rl = ["gymnasium~=1.2"]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
docs = [
"sphinx~=8.1",
"nvidia-sphinx-theme~=0.0.8",
Expand Down
2 changes: 1 addition & 1 deletion src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
)

if run_error is not None:
raise run_error
raise run_error.with_traceback(run_error.__traceback__)

logging.info("All jobs are complete.")
return err
Expand Down
2 changes: 2 additions & 0 deletions src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .grid_search import GridSearchAgent
from .gymnasium_adapter import GymnasiumAdapter

__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"GymnasiumAdapter",
"TrajectoryEntry",
]
62 changes: 49 additions & 13 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from .base_agent import RewardOverrides
from .base_gym import BaseGym
from .env_params import EnvParams, write_env_params
from .env_params import EnvParams, ObsLeafDescriptor, write_env_params


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -71,6 +71,18 @@ def env_params_record_path(self) -> Path:
"""``env.csv`` lives alongside ``trajectory.csv`` so a plain ``merge`` joins them."""
return self.iteration_dir / "env.csv"

@property
def upcoming_trial(self) -> int:
"""
Index of the next trial ``step`` will run.

``step`` increments ``test_run.step`` before it samples/runs, so at rest the counter
holds the last-run trial and the next one is ``+ 1``. ``reset`` peeks this to report the
regime the first ``step`` will apply; ``step`` advances into it. The ``+ 1`` offset is
defined only here so the two paths cannot disagree.
"""
return self.test_run.step + 1

def define_action_space(self) -> Dict[str, list[Any]]:
return self.test_run.param_space

Expand All @@ -84,9 +96,10 @@ def define_observation_space(self) -> list:
Define the observation space for the environment.

Returns:
list: The observation space.
list: One float slot per agent metric (at least one), giving the correct shape
for adapters that derive ``gymnasium.spaces.Box`` from this output.
"""
return [0.0]
return [0.0] * max(len(self.test_run.test.agent_metrics), 1)
Comment thread
podkidyshev marked this conversation as resolved.

def reset(
self,
Expand All @@ -108,9 +121,10 @@ def reset(
if seed is not None:
lazy.np.random.seed(seed)
self.test_run.current_iteration = 0
observation = [0.0]
info = {}
return observation, info
info: dict[str, Any] = {}
if self.params is not None:
info["env_params"] = self.params.sample(self.upcoming_trial)
return self.define_observation_space(), info

def step(self, action: Any) -> Tuple[list, float, bool, dict]:
"""
Expand All @@ -126,12 +140,15 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
- done (bool): Whether the episode is done.
- info (dict): Additional info for debugging.
"""
trial = self.upcoming_trial
self.test_run.increment_step()
# RNG lives in the env: sample here, then apply action + sample so the run and cache key see them.
sampled_env_params = self.params.sample(self.test_run.step) if self.params else {}
sampled_env_params = self.params.sample(trial) if self.params else {}
info: dict[str, Any] = {"env_params": sampled_env_params} if self.params is not None else {}
self.test_run = self.test_run.apply_params_set(action, env_params=sampled_env_params)

cached_result = self.get_cached_trajectory_result(action, sampled_env_params)

if cached_result is not None:
logging.info(
"Retrieved cached result from trajectory with reward %s (from step %s). Skipping execution.",
Expand All @@ -147,11 +164,12 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
env_params=sampled_env_params,
)
)
return cached_result.observation, cached_result.reward, False, {}

return cached_result.observation, cached_result.reward, False, info

if not self.test_run.test.constraint_check(self.test_run, self.runner.system):
logging.info("Constraint check failed. Skipping step.")
return [-1.0], self.rewards.constraint_failure, True, {}
return [-1.0], self.rewards.constraint_failure, True, info

new_tr = copy.deepcopy(self.test_run)
new_tr.output_path = self.runner.get_job_output_path(new_tr)
Expand All @@ -173,20 +191,20 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
self.test_run.step = new_tr.step
self.test_run.output_path = new_tr.output_path

observation = self.get_observation(action)
reward = self.compute_reward(observation)
metrics = self.get_observation(action)
reward = self.compute_reward(metrics)

self.write_trajectory(
TrajectoryEntry(
step=self.test_run.step,
action=action,
reward=reward,
observation=observation,
observation=metrics,
env_params=sampled_env_params,
)
)

return observation, reward, False, {}
return metrics, reward, False, info

def render(self, mode: str = "human"):
"""
Expand Down Expand Up @@ -241,6 +259,24 @@ def get_observation(self, action: Any) -> list:
observation.append(v)
return observation

def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]:
"""
Per-leaf descriptors for the env_param regime, or ``None`` when none are declared.

The flat observation (metrics) is unchanged; these describe the env_param leaves the
``GymnasiumAdapter`` merges with it into its structured observation ``spaces.Dict``.
"""
return self.params.observation_descriptors() if self.params is not None else None

def encode_env_params(self, env_params: dict[str, Any]) -> Dict[str, Any]:
"""
Encode a queried regime (the env_params behind an observation) into named leaves.

``env_params`` is a ``{name: drawn value}`` regime (the ``info["env_params"]`` that
``reset``/``step`` report). The adapter pairs the result with the flat metrics observation.
"""
return self.params.encode(env_params) if self.params is not None else {}

def write_trajectory(self, entry: TrajectoryEntry):
"""
Append the entry to the in-memory cache and trajectory.csv (plus env.csv when declared).
Expand Down
141 changes: 98 additions & 43 deletions src/cloudai/configurator/env_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,75 @@
from cloudai.models.workload import TestDefinition


class ObsLeafDescriptor(BaseModel):
"""
Shape of one leaf of a structured (named) observation.

Adapters use it to build the matching subspace without guessing: a ``"box"`` leaf is a
continuous vector of width ``dim``; a ``"discrete"`` leaf is a categorical of size ``n``.
Agents that consume the flat observation ignore it.
"""

model_config = ConfigDict(extra="forbid")

kind: Literal["box", "discrete"]
dim: int = 1
n: Optional[int] = None

@model_validator(mode="after")
def _validate(self) -> Self:
if self.dim < 1:
raise ValueError(f"ObsLeafDescriptor dim must be >= 1; got {self.dim}")
if self.kind == "discrete" and (self.n is None or self.n < 1):
raise ValueError(f"ObsLeafDescriptor(kind='discrete') requires n >= 1; got n={self.n}")
return self


class Encoding(Protocol):
"""
Strategy mapping an env_param's drawn value to an observation leaf.

An encoding declares its own :class:`ObsLeafDescriptor` and encodes a drawn value
into a leaf of that shape. A new strategy implements this pair without touching
:class:`EnvParam` or the adapter.
"""

def observation_descriptor(self, candidates: List[Any]) -> ObsLeafDescriptor: ...

def encode(self, value: Any, candidates: List[Any]) -> Any: ...


class CategoricalEncoding(BaseModel):
"""
Default encoding: observe the drawn value as its categorical index into ``candidates``.

Candidates are a discrete set (a ``cmd_args`` list), so the policy sees the per-trial
regime as a ``Discrete(len(candidates))`` index rather than a raw magnitude: an
arbitrary candidate list carries no ordinal meaning to encode continuously.
"""

model_config = ConfigDict(extra="forbid")

type: Literal["categorical"] = "categorical"

def observation_descriptor(self, candidates: List[Any]) -> ObsLeafDescriptor:
return ObsLeafDescriptor(kind="discrete", n=len(candidates))

def encode(self, value: Any, candidates: List[Any]) -> int:
return candidates.index(value)


class EnvParamSpec(BaseModel):
"""
Annotation marking one cmd_args field as env-sampled.

Carries only *how* to sample - the candidate values themselves live in
``cmd_args.<name>`` as a plain list. ``weights`` (optional) are positional,
aligned 1:1 with that candidate list; omit for uniform sampling. The
length match against the candidate list is a cross-field check enforced by
``TestDefinition`` (which can see ``cmd_args``); here we validate only the
weights' intrinsic shape.
aligned 1:1 with that candidate list; omit for uniform sampling. ``encoding``
(optional) selects how the drawn value is exposed to the policy as an
observation leaf, defaulting to a categorical index. The length match against
the candidate list is a cross-field check enforced by ``TestDefinition`` (which
can see ``cmd_args``); here we validate only the weights' intrinsic shape.
"""

model_config = ConfigDict(extra="forbid")
Expand All @@ -62,6 +121,10 @@ class EnvParamSpec(BaseModel):
default=None,
description="Optional probability weights aligned with the cmd_args candidate list; uniform if omitted.",
)
encoding: CategoricalEncoding = Field(
default_factory=CategoricalEncoding,
description="How the drawn value is encoded as an observation leaf (categorical index into the candidates).",
)

@model_validator(mode="after")
def _validate_weights(self) -> Self:
Expand All @@ -76,67 +139,51 @@ def _validate_weights(self) -> Self:
return self


class ObsLeafDescriptor(BaseModel):
"""
Description of one leaf of a structured (named) observation.

A structured observation maps each observed name to a self-describing leaf
so adapters can build the matching subspace without guessing: a ``"box"``
leaf becomes a continuous vector of width ``dim`` (e.g. a log-encoded
env_param as ``dim=2``); a ``"discrete"`` leaf becomes a categorical of
size ``n``. Stateless agents that consume the flat observation ignore this.
"""

model_config = ConfigDict(extra="forbid")

kind: Literal["box", "discrete"]
dim: int = 1
n: Optional[int] = None

@model_validator(mode="after")
def _validate(self) -> Self:
if self.dim < 1:
raise ValueError(f"ObsLeafDescriptor dim must be >= 1; got {self.dim}")
if self.kind == "discrete" and (self.n is None or self.n < 1):
raise ValueError(f"ObsLeafDescriptor(kind='discrete') requires n >= 1; got n={self.n}")
return self


@runtime_checkable
class StructuredObservation(Protocol):
class StructuredObservationProducer(Protocol):
"""
Optional env hooks that expose a structured (per-leaf) observation.

An env opts in by returning per-leaf :class:`ObsLeafDescriptor` from
``structured_observation_descriptors`` (``None`` keeps the flat-vector
path) and encoding a raw observation into the matching named leaves via
``encode_observation``. ``GymnasiumAdapter`` consumes these to expose a
``gymnasium.spaces.Dict`` observation; the hooks are duck-typed, so envs
need not subclass this Protocol.
Optional env hooks exposing the env_params behind the (unchanged, flat) observation.

The env keeps returning its flat observation (the metrics) and, when a regime was applied,
delivers it on the Gym ``info`` dict under ``info["env_params"]`` (the key is absent otherwise,
so its presence alone signals a non-empty regime). An env opts in by
declaring per-leaf :class:`ObsLeafDescriptor` for its env_params via
``structured_observation_descriptors`` (``None`` when none are declared) and encoding a regime
into the matching named leaves via ``encode_env_params``. ``GymnasiumAdapter`` merges the flat
metrics with these env_param leaves into its structured observation ``gymnasium.spaces.Dict``.
The hooks are duck-typed, so envs need not subclass this Protocol.
"""

def structured_observation_descriptors(self) -> Optional[Dict[str, ObsLeafDescriptor]]: ...

def encode_observation(self, observation: list) -> Dict[str, Any]: ...
def encode_env_params(self, env_params: dict[str, Any]) -> Dict[str, Any]: ...


@dataclasses.dataclass(frozen=True)
class EnvParam:
"""
One env-sampled knob, resolved from cmd_args: its candidate values and optional weights.
One env-sampled knob, resolved from cmd_args: candidates, optional weights, and encoding.

Weights (when present) are positional, aligned 1:1 with ``candidates``; ``None`` means
uniform sampling. Keeping the two together makes each knob a self-contained draw.
uniform sampling. ``encoding`` owns how a drawn value is exposed to the policy as an
observation leaf. Bundling the three makes each knob a self-contained draw-and-encode.
"""

candidates: List[Any]
weights: Optional[List[float]] = None
encoding: Encoding = dataclasses.field(default_factory=CategoricalEncoding)

def draw(self, rng: random.Random) -> Any:
if self.weights is not None:
return rng.choices(self.candidates, weights=self.weights, k=1)[0]
return rng.choice(self.candidates)

def observation_descriptor(self) -> ObsLeafDescriptor:
return self.encoding.observation_descriptor(self.candidates)

def encode(self, value: Any) -> Any:
return self.encoding.encode(value, self.candidates)


@dataclasses.dataclass(frozen=True)
class EnvParams:
Expand Down Expand Up @@ -167,7 +214,7 @@ def from_test(cls, test: "TestDefinition") -> Optional["EnvParams"]:
value = getattr(test.cmd_args, name, None)
if not isinstance(value, list):
continue
params[name] = EnvParam(candidates=value, weights=spec.weights)
params[name] = EnvParam(candidates=value, weights=spec.weights, encoding=spec.encoding)
if not params:
return None
seed = int((test.agent_config or {}).get("random_seed", 0))
Expand All @@ -183,6 +230,14 @@ def sample(self, trial: int) -> Dict[str, Any]:
"""
return {name: param.draw(random.Random(f"{self.seed}:{name}:{trial}")) for name, param in self.params.items()}

def encode(self, regime: Dict[str, Any]) -> Dict[str, Any]:
"""Encode a drawn regime (``{name: value}``) into one named observation leaf per parameter."""
return {name: param.encode(regime[name]) for name, param in self.params.items()}

def observation_descriptors(self) -> Dict[str, ObsLeafDescriptor]:
"""Per-parameter observation-leaf descriptors, keyed by parameter name."""
return {name: param.observation_descriptor() for name, param in self.params.items()}


def write_env_params(path: Path, step: int, sample: Dict[str, Any]) -> None:
"""
Expand Down
Loading
Loading