diff --git a/scripts/use_q_ide.py b/scripts/use_q_ide.py index 32dd7f6..ba42d92 100644 --- a/scripts/use_q_ide.py +++ b/scripts/use_q_ide.py @@ -6,7 +6,6 @@ """ import logging -from dataclasses import dataclass from pathlib import Path from typing import Any @@ -14,7 +13,7 @@ from crane_controller.envs.controlled_crane_pendulum import AntiPendulumConfig, AntiPendulumEnv from crane_controller.envs.simple_test_env import SimpleTestEnv from crane_controller.experiment_config import RewardConfig -from crane_controller.q_agent import QLearningAgent +from crane_controller.q_agent import QLearningAgent, QLearningConfig logging.basicConfig(level=logging.INFO, format="%(message)s") LOGGER = logging.getLogger(__name__) @@ -22,97 +21,70 @@ USE_DISCRETE2 = 2 -@dataclass(kw_only=True, frozen=True, slots=True) -class Config: - """Data for experiments performed in this module. - - Args: - v0: start speed of load in x-direction. 0: Pendulum mode, >/< 0 same/random start at every episode - randomize_start: Optionally randomize the start speed within +/- v0. Default: False - render: render mode of environment - file: Optional definition of model-save file - use_file: How 'file' is used (if exists): 'r', 'w', 'rw' - episodes: nnumber of episodes run in the training - steps: number of steps per episodes (if not terminated or truncated) - dt: step-size per time step - r_fac: optional weight factors (RewardConfig) for reward - r_limit: optional reward limit - disc: discount rate of acceleration history to include in observation - lr: optionally change the learning rate - seed: optionally change the start seed - - """ - - v0: float = 1.0 - randomize_start: bool = False - render: str = "none" - discretization: str = "energy" - file: str | None = None - use_file: str = "r" - episodes: int = 10000 - steps: int = 1000 - dt: float = 1.0 - rc: RewardConfig | None = None - r_limit: float | None = None - discount: float = 0.8 - seed: int = 1 - strategy: str = "default" - lr: float = 0.1 - eps: float = 1e-10 - if rc is None: - rc = RewardConfig(energy=1.0, positional=1.0, crane_velocity=0.5) - - -def do_use(conf: Config | dict[str, Any] | None = None) -> None: +def do_use(conf: dict[str, Any]) -> None: """Perform training on the (Anti-)Pendulum environment using q-learning. Args: conf: Configuration data set. See Config class for all definitions. """ - _conf = Config() if conf is None else (Config(**conf) if isinstance(conf, dict) else conf) - env = AntiPendulumEnv( - build_crane, - conf=AntiPendulumConfig( - start_speed=_conf.v0, - randomize_start=_conf.randomize_start, - seed=_conf.seed, - dt=_conf.dt, - render_mode=_conf.render, - discrete=_conf.discretization, - reward_fac=_conf.rc, - reward_limit=_conf.r_limit, - discount=_conf.discount, - ), + _e_conf = AntiPendulumConfig() # default values + e_conf = AntiPendulumConfig( + acc=conf.get("acc", _e_conf.acc), + start_speed=conf.get("start_speed", _e_conf.start_speed), + randomize_start=conf.get("randomize_start", _e_conf.randomize_start), + render_mode=conf.get("render_mode", _e_conf.render_mode), + rail_limit=conf.get("rail_limit", _e_conf.rail_limit), + seed=conf.get("seed", _e_conf.seed), + reward_limit=conf.get("reward_limit", _e_conf.reward_limit), + dt=conf.get("dt", _e_conf.dt), + discrete=conf.get("discrete", _e_conf.discrete), + reward_fac=conf.get("reward_fac", _e_conf.reward_fac), + continuous_actions=conf.get("continuous_actions", _e_conf.continuous_actions), + length=conf.get("length", _e_conf.length), + q_factor=conf.get("q_factor", _e_conf.q_factor), ) - - filename = Path(_conf.file) if _conf.file is not None else None + env = AntiPendulumEnv(build_crane, conf=e_conf) + _a_conf = QLearningConfig() # default values + a_conf = QLearningConfig( + learning_rate=conf.get("learning_rate", _a_conf.learning_rate), + epsilon_decay=conf.get("epsilon_decay", _a_conf.epsilon_decay), + final_epsilon=conf.get("final_epsilon", _a_conf.final_epsilon), + discount_factor=conf.get("discount_factor", _a_conf.discount_factor), + ) + filename = conf.get("file") if filename is not None: - filename.parent.mkdir(parents=True, exist_ok=True) - agent = QLearningAgent(env, filename=filename, use_file=_conf.use_file, strategy=_conf.strategy) + Path(filename).parent.mkdir(parents=True, exist_ok=True) + agent = QLearningAgent( + env, + conf=a_conf, + filename=filename, + use_file=conf.get("use_file", "w"), + strategy=conf.get("strategy", "default"), + ) LOGGER.info(f"DISCRETE: {agent.env.discrete}") - agent.do_episodes(n_episodes=_conf.episodes, max_steps=_conf.steps, show=0) + agent.do_episodes(n_episodes=conf.get("episodes", 10), max_steps=conf.get("steps", 1000), show=0) if filename is not None and "w" in agent.use_file: LOGGER.info(f"Model saved to {filename}") -def simple_env(episodes: int, render: str, file: str, use: str, r_limit: float | None, steps: int) -> None: +def simple_env(episodes: int, render_mode: str, file: str, use: str, reward_limit: float | None, steps: int) -> None: """Define a SimpleTest environment. Args: episodes: number of episodes - render: render mode + render_mode: render_mode mode file: Optional definition of model-save file use: How 'file' is used (if exists): 'r', 'w', 'rw' - r_limit: optional reward limit + reward_limit: optional reward limit steps: number of steps per episodes (if not terminated or truncated) """ env = SimpleTestEnv( reward_fac=(1.0, 1.0), - reward_limit=r_limit, + reward_limit=reward_limit, dt=1.0, - render_mode=render, + render_mode=render_mode, ) - agent = QLearningAgent(env, filename=Path(file), use_file=use) + agent = QLearningAgent(env, filename=file, use_file=use) agent.do_episodes(n_episodes=episodes, max_steps=steps) @@ -125,24 +97,33 @@ def update_conf(conf: dict["str", Any], updates: dict["str", Any]) -> dict["str" if __name__ == "__main__": # ruff: disable[ERA001] ## we intentionally work with commenting out lines here - # do_use( v0, render, file, use_file, episodes, steps, rc, reward, s, seed, ) + # do_use( start_speed, render_mode, file, use_file, episodes, steps, reward_fac, reward, s, seed, ) ## Anti-pendulum training and results: conf1 = { - "discretization": "phase", - "v0": 2.0, - "render": "data", - "file": MODELS / "q_anti-pendulum_2.json", + "discrete": "phase", + "start_speed": 2.0, + "randomize_start": False, + "render_mode": "data", + "file": MODELS / "q_anti-pendulum1.json", "use_file": "rw", - "episodes": 3000, - "r_limit": -0.1, + "steps": 1000, + "episodes": 50000, + "reward_fac": RewardConfig(energy=1.0, positional=1.0, crane_velocity=0.5), + "reward_limit": -0.001, "seed": 43, + "q_factor": 500, } + _conf1 = update_conf(conf1, {"use_file": "r", "episodes": 10, "render_mode": "plot"}) # do_use(conf1) - do_use(update_conf(conf1, {"use_file": "r", "episodes": 10, "render": "plot"})) + do_use(_conf1) + # do_use(update_conf(conf1, {"use_file": "r", "episodes": 10, "render_mode": "plot"})) + # conf2 = update_conf(conf1, {"file": MODELS / "q_anti-pendulum2.json", "randomize_start": True}) + # do_use(conf2) + ## Pendulum training and results: - # conf0 = update_conf(conf1, {'v0':0.0,'file':MODELS / "q_pendulum.json",'r_limit':1000.0}) # start a pendulum - # do_use( update_conf( conf0, {'use_file':"r", 'episodes':10,'render':'plot'})) + # conf0 = update_conf(conf1, {'start_speed':0.0,'file':MODELS / "q_pendulum.json",'reward_limit':1000.0}) + # do_use( update_conf( conf0, {'use_file':"r", 'episodes':10,'render_mode':'plot'})) # do_use(conf0) - # simple_env(episodes=50000, render="none", file=models/"q_simple.json", use="w", r_limit=29.4, steps=200) - # simple_env(episodes=10, render="plot", file=models/"q_simple.json", use="r", r_limit=29.7, steps=20) + # simple_env(episodes=50000, render_mode="none", file=models/"q_simple.json", use="w", reward_limit=29.4, steps=200) + # simple_env(episodes=10, render_mode="plot", file=models/"q_simple.json", use="r", reward_limit=29.7, steps=20) # ruff: enable[ERA001] diff --git a/src/crane_controller/envs/controlled_crane_pendulum.py b/src/crane_controller/envs/controlled_crane_pendulum.py index f3c68d5..62c51ff 100644 --- a/src/crane_controller/envs/controlled_crane_pendulum.py +++ b/src/crane_controller/envs/controlled_crane_pendulum.py @@ -53,6 +53,8 @@ class AntiPendulumConfig: continuous_actions: If True, the action space is ``Box([-1], [1])`` and an action value in ``[-1, 1]`` is scaled by ``acc`` to produce the crane acceleration. If False, the action space is ``Discrete(3)`` with mapping``0=-acc, 1=0, 2=+acc`` (Q-agent compatible). + length: the length of the crane wire (and the pedestal) + q_factor: the damping factor of the pendulum action """ acc: float = 0.1 @@ -67,6 +69,8 @@ class AntiPendulumConfig: reward_fac: RewardConfig | None = None continuous_actions: bool = False discount: float = 0.8 + length: float = 10.0 + q_factor: float = 50.0 class AntiPendulumEnv(gym.Env[tuple[int, ...] | np.ndarray, int]): @@ -117,7 +121,7 @@ def __init__(self, crane: Callable[..., Crane], conf: AntiPendulumConfig | None self.crane_maker = crane self.conf = AntiPendulumConfig() if conf is None else conf self.render_mode: str | None = self.conf.render_mode # gymnasium convention: expose as direct attribute - self.crane: Crane = crane() + self.crane: Crane = crane(length=self.conf.length, q_factor=self.conf.q_factor) self.wire: Wire = self.crane.boom_by_name("wire") # type: ignore[assignment] # Wire is a sub-class of Boom assert isinstance(self.wire, Wire), "Need a crane wire!" assert self.conf.render_mode in AntiPendulumEnv.metadata["render_modes"], ( # type: ignore[operator] # metadata values are typed as object @@ -383,14 +387,17 @@ def _get_obs(self, acc: float = 0.0) -> tuple[np.ndarray | tuple[int, ...], floa position = -abs(self.crane.position[0]) acc_penalty = -abs(acc) rc = self.reward_fac - self.reward = ( - rc.energy * energy - + rc.positional * positional - + rc.time * (-self.time) - + rc.position * position - + rc.acceleration * acc_penalty - + rc.crane_velocity * self.crane.velocity[0] ** 2 - ) + self.reward = rc.energy * energy + for rc_fac, rc_base in { + rc.positional: positional, + rc.time: (-self.time), + rc.position: position, + rc.acceleration: acc_penalty, + rc.crane_velocity: self.crane.velocity[0] ** 2, + rc.t_min_crane: self._t_min_crane(), + }.items(): + if rc_fac != 0.0: + self.reward += rc_fac * rc_base if len(self.discrete): self.obs, truncate = self._get_discrete_obs(energy, acc)