From 08c6e1f1733843b4b30c5ea712340db5565b02c5 Mon Sep 17 00:00:00 2001
From: Alexey Rybalchenko
Date: Tue, 7 Apr 2026 11:25:05 +0200
Subject: [PATCH 1/2] Add plot_reward_shapes.py

---
Review notes (free text after the separator; ignored when the patch is applied):
  * Adds plot_reward_shapes.py, a script that draws six panels (efficiency,
    price timing, idle penalty, job-age penalty, blackout term, drop penalty)
    by calling RewardCalculator methods on an instance built from a mock
    Prices object.
  * In src/reward_calculation.py the inline drop-penalty expression in
    calculate() is moved into a new method _penalty_drop(); the price_reward
    call is moved below the "# 2." comment (same call, same arguments, now
    made after the blackout term); step comments are renumbered 1..5.
  * NOTE(review): _plot_drop() reads calc.DROP_PENALTY_TAU, which is added to
    RewardCalculator only by PATCH 2/2. With this patch alone the drop panel
    presumably fails with AttributeError - confirm, or apply both patches.
  * NOTE(review): the _penalty_drop() docstring added here says "tanh
    saturation curve bounded in [-1, 0]", but the body in this patch is still
    the linear min(0, PENALTY_DROPPED_JOB * num_dropped). The docstring only
    matches the code after PATCH 2/2.
  * NOTE(review): _plot_drop() samples 400 points with np.linspace(0, 150, 400)
    and truncates each with int(), so the plotted curve is a staircase of
    integer drop counts rather than a smooth line.

 plot_reward_shapes.py     | 199 ++++++++++++++++++++++++++++++++++++++
 src/reward_calculation.py |  20 ++--
 2 files changed, 211 insertions(+), 8 deletions(-)
 create mode 100644 plot_reward_shapes.py

diff --git a/plot_reward_shapes.py b/plot_reward_shapes.py
new file mode 100644
index 0000000..d1e56db
--- /dev/null
+++ b/plot_reward_shapes.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+"""
+Visualize reward component shapes from the live RewardCalculator code.
+
+All shapes are computed by calling actual methods in src/reward_calculation.py,
+so any change to constants or formulas is immediately reflected here.
+
+Usage:
+    python plot_reward_shapes.py # interactive window
+    python plot_reward_shapes.py -o shapes.png # save to file
+    python plot_reward_shapes.py --context-avg 120 # different price baseline
+"""
+
+import argparse
+
+import matplotlib.pyplot as plt
+import numpy as np
+
+from src.config import CORES_PER_NODE, MAX_NODES, WEEK_HOURS
+from src.reward_calculation import RewardCalculator
+
+class _MockPrices:
+    """Minimal Prices stand-in — no real data needed for shape visualization."""
+    MIN_PRICE: float = -5.24  # €/MWh
+    MAX_PRICE: float = 207.98  # €/MWh
+
+    def get_price_context(self):
+        # (None, ...) makes _price_context_average fall back to average_future_price param
+        return None, 0.0
+
+
+def _make_calc() -> RewardCalculator:
+    return RewardCalculator(_MockPrices())  # type: ignore[arg-type]
+
+
+# ---------------------------------------------------------------------------
+# Individual panel functions — each calls the real RewardCalculator method
+# ---------------------------------------------------------------------------
+
+def _plot_efficiency(ax: plt.Axes, calc: RewardCalculator) -> None:
+    """Efficiency reward vs core utilization % (node count cancels out analytically)."""
+    util_pct = np.linspace(0.0, 1.0, 300)
+    N = 100  # representative count; result is invariant to N for any fixed util%
+    rewards = [
+        calc._reward_energy_efficiency_utilization_normalized(N, int(u * N * CORES_PER_NODE))
+        for u in util_pct
+    ]
+    all_off = calc._reward_energy_efficiency_utilization_normalized(0, 0)
+
+    ax.plot(util_pct * 100, rewards, lw=2)
+    ax.scatter([0], [all_off], color='red', zorder=5, label=f'all-off → {all_off:.2f}')
+    ax.axhline(0, color='k', lw=0.5)
+    ax.axvline(calc.EFFICIENCY_TARGET_RATIO * 100, color='gray', ls='--', lw=1,
+               label=f'target = {calc.EFFICIENCY_TARGET_RATIO:.0%}')
+    ax.set_xlabel('Core utilization %')
+    ax.set_ylabel('Reward')
+    ax.set_title('Efficiency')
+    ax.set_xlim(0, 100)
+    ax.set_ylim(-1.1, 1.1)
+    ax.legend(fontsize=8)
+    ax.grid(True, alpha=0.3)
+
+
+def _plot_price(ax: plt.Axes, calc: RewardCalculator, context_avg: float) -> None:
+    """Price-timing reward vs current price for several equivalent-node loads."""
+    prices = np.linspace(_MockPrices.MIN_PRICE, _MockPrices.MAX_PRICE, 400)
+    eq_nodes_list = [5, 15, 30, 100, MAX_NODES]
+    colors = plt.cm.viridis(np.linspace(0.1, 0.9, len(eq_nodes_list)))  # type: ignore[attr-defined]
+
+    for eq_nodes, color in zip(eq_nodes_list, colors):
+        used_cores = int(eq_nodes * CORES_PER_NODE)
+        rewards = [calc._reward_price_utilization(float(p), context_avg, used_cores) for p in prices]
+        ax.plot(prices, rewards, color=color, lw=1.5, label=f'{eq_nodes} eq.nodes')
+
+    ax.axhline(0, color='k', lw=0.5)
+    ax.axvline(0, color='k', lw=0.5, ls=':')
+    ax.axvline(context_avg, color='gray', ls='--', lw=1, label=f'ctx avg = {context_avg:.0f}')
+    ax.set_xlabel('Current price (€/MWh)')
+    ax.set_ylabel('Reward')
+    ax.set_title('Price timing')
+    ax.set_xlim(_MockPrices.MIN_PRICE, _MockPrices.MAX_PRICE)
+    ax.legend(fontsize=7)
+    ax.grid(True, alpha=0.3)
+
+
+def _plot_idle(ax: plt.Axes, calc: RewardCalculator) -> None:
+    """Idle penalty vs number of idle nodes."""
+    idle_counts = np.arange(0, MAX_NODES + 1)
+    penalties = [calc._penalty_idle_normalized(int(n)) for n in idle_counts]
+
+    ax.plot(idle_counts, penalties, lw=2)
+    ax.axhline(0, color='k', lw=0.5)
+    ax.set_xlabel('Idle nodes')
+    ax.set_ylabel('Penalty')
+    ax.set_title('Idle penalty')
+    ax.set_xlim(0, MAX_NODES)
+    ax.set_ylim(-1.1, 0.1)
+    ax.grid(True, alpha=0.3)
+
+
+def _plot_job_age(ax: plt.Axes, calc: RewardCalculator) -> None:
+    """Job age penalty vs maximum job age in the queue."""
+    max_ages = np.linspace(0, 2 * WEEK_HOURS, 300)
+    penalties = []
+    for age in max_ages:
+        fake_queue = np.array([[1.0, float(age)]])  # columns: [duration, age]
+        penalties.append(calc._penalty_job_age_normalized(num_off_nodes=1, job_queue_2d=fake_queue))
+
+    tau_h = WEEK_HOURS / 2.0
+    ax.plot(max_ages, penalties, lw=2)
+    ax.axhline(0, color='k', lw=0.5)
+    ax.axvline(tau_h, color='gray', ls=':', lw=1, label=f'τ = {tau_h:.0f}h')
+    ax.axvline(WEEK_HOURS, color='gray', ls='--', lw=1, label=f'1 week = {WEEK_HOURS}h')
+    ax.set_xlabel('Max job age (hours)')
+    ax.set_ylabel('Penalty')
+    ax.set_title('Job age penalty')
+    ax.set_xlim(0, 2 * WEEK_HOURS)
+    ax.set_ylim(-1.1, 0.1)
+    ax.legend(fontsize=8)
+    ax.grid(True, alpha=0.3)
+
+
+def _plot_blackout(ax: plt.Axes, calc: RewardCalculator) -> None:
+    """Blackout term vs unprocessed job count (all nodes off)."""
+    job_counts = np.arange(0, 600)
+    penalties = [calc._blackout_term(0, 0, int(n)) for n in job_counts]
+
+    ax.plot(job_counts, penalties, lw=2)
+    ax.axhline(0, color='k', lw=0.5)
+    ax.set_xlabel('Unprocessed jobs (all nodes off)')
+    ax.set_ylabel('Reward')
+    ax.set_title('Blackout term')
+    ax.set_xlim(0, 600)
+    ax.grid(True, alpha=0.3)
+
+
+def _plot_drop(ax: plt.Axes, calc: RewardCalculator) -> None:
+    """Drop penalty vs jobs dropped this step."""
+    counts = np.linspace(0, 150, 400)
+    penalties = [calc._penalty_drop(int(n)) for n in counts]
+
+    for n in [1, 10, 20, 50]:
+        y = calc._penalty_drop(n)
+        ax.annotate(f'{n}→{y:.2f}', (n, y), fontsize=7,
+                    textcoords='offset points', xytext=(4, 6))
+
+    ax.plot(counts, penalties, lw=2)
+    ax.axhline(0, color='k', lw=0.5)
+    ax.axvline(calc.DROP_PENALTY_TAU, color='gray', ls='--', lw=1,
+               label=f'TAU = {calc.DROP_PENALTY_TAU:.0f}')
+    ax.set_xlabel('Jobs dropped this step')
+    ax.set_ylabel('Penalty')
+    ax.set_title('Drop penalty')
+    ax.set_xlim(0, 150)
+    ax.set_ylim(-1.1, 0.1)
+    ax.legend(fontsize=8)
+    ax.grid(True, alpha=0.3)
+
+
+# ---------------------------------------------------------------------------
+# Main entry point
+# ---------------------------------------------------------------------------
+
+def plot_reward_shapes(output: str | None = None, context_avg: float = 80.0) -> None:
+    """Generate 3×2 grid of reward component shape plots."""
+    calc = _make_calc()
+
+    fig, axes = plt.subplots(3, 2, figsize=(12, 12))
+    fig.suptitle('Reward component shapes (src/reward_calculation.py)', fontsize=13)
+
+    _plot_efficiency(axes[0, 0], calc)
+    _plot_price(axes[0, 1], calc, context_avg=context_avg)
+    _plot_idle(axes[1, 0], calc)
+    _plot_job_age(axes[1, 1], calc)
+    _plot_blackout(axes[2, 0], calc)
+    _plot_drop(axes[2, 1], calc)
+
+    plt.tight_layout()
+
+    if output:
+        plt.savefig(output, dpi=150, bbox_inches='tight')
+        print(f'Saved → {output}')
+    else:
+        plt.show()
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(description=__doc__,
+                                     formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument('-o', '--output', metavar='PATH',
+                        help='Save to file instead of showing interactively (e.g. shapes.png)')
+    parser.add_argument('--context-avg', type=float, default=80.0, metavar='€/MWh',
+                        help='Context price average used as baseline in the price panel (default: 80)')
+    args = parser.parse_args()
+    plot_reward_shapes(output=args.output, context_avg=args.context_avg)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/reward_calculation.py b/src/reward_calculation.py
index 2e4e06f..32d7f24 100644
--- a/src/reward_calculation.py
+++ b/src/reward_calculation.py
@@ -315,6 +315,10 @@ def _blackout_term(self, num_used_nodes: int, num_idle_nodes: int, num_unprocess
         penalty = np.exp(-ratio * SATURATION_FACTOR) - 1.0
         return float(np.clip(penalty, -1.0, 0.0))
 
+    def _penalty_drop(self, num_dropped: int) -> float:
+        """Drop penalty: tanh saturation curve bounded in [-1, 0]."""
+        return min(0, PENALTY_DROPPED_JOB * num_dropped)
+
     def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: float, average_future_price: float,
                   num_off_nodes: int, job_queue_2d: np.ndarray,
                   num_unprocessed_jobs: int, weights: Weights, num_dropped_this_step: int,
@@ -339,30 +343,30 @@ def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: flo
             total_used_cores: Total cores in use across all powered nodes
 
         Returns:
-            Tuple of (total reward, total cost, eff_reward_norm, price_reward,
-                idle_penalty_norm, job_age_penalty_norm)
+            Tuple of (total reward, total cost, eff_reward_norm, price_reward, idle_penalty_norm, job_age_penalty_norm)
         """
-        # 0. Energy efficiency. Reward calculation based on Workload (used nodes) (W) / Cost (C)
+        # 1. Energy efficiency. Reward calculation based on Workload (used nodes) (W) / Cost (C)
         total_cost = power_cost(num_on_nodes, total_used_cores, current_price)
         efficiency_reward_norm = self._reward_energy_efficiency_utilization_normalized(num_on_nodes, total_used_cores)
-        price_reward = self._reward_price_utilization(current_price, average_future_price, total_used_cores)
-
+        # legacy: efficiency_reward_norm = self._reward_energy_efficiency_normalized(num_used_nodes, num_idle_nodes)
         efficiency_reward_norm += self._blackout_term(num_used_nodes, num_idle_nodes, num_unprocessed_jobs)
         efficiency_reward_weighted = weights.efficiency_weight * efficiency_reward_norm
 
         # 2. Increase reward if current price is favorable and currently useful work is high.
+        price_reward = self._reward_price_utilization(current_price, average_future_price, total_used_cores)
+        # legacy: price_reward = self._reward_price_normalized_legacy(current_price, average_future_price, total_used_cores)
         price_reward_weighted = weights.price_weight * price_reward
 
         # 3. penalize delayed jobs, more if they are older. but only if there are turned off nodes
         job_age_penalty_norm = self._penalty_job_age_normalized(num_off_nodes, job_queue_2d)
         job_age_penalty_weighted = weights.job_age_weight * job_age_penalty_norm
 
-        # 5. penalty for idling nodes
+        # 4. penalty for idling nodes
         idle_penalty_norm = self._penalty_idle_normalized(num_idle_nodes)
         idle_penalty_weighted = weights.idle_weight * idle_penalty_norm
 
-        # 6. penalty for lost jobs (aged out or rejected because queue/backlog was full)
-        drop_penalty = min(0, PENALTY_DROPPED_JOB * num_dropped_this_step)
+        # 5. penalty for lost jobs (aged out or rejected because queue/backlog was full)
+        drop_penalty = self._penalty_drop(num_dropped_this_step)
         drop_penalty_weighted = weights.drop_weight * drop_penalty
 
         reward = (

From 8f4bc0d456b0a7bfbc1600f72604b4f940a63dfd Mon Sep 17 00:00:00 2001
From: Alexey Rybalchenko
Date: Tue, 7 Apr 2026 11:54:05 +0200
Subject: [PATCH 2/2] smoother drop penalty

---
Review notes (free text after the separator; ignored when the patch is applied):
  * Replaces the linear drop penalty min(0, PENALTY_DROPPED_JOB * num_dropped)
    with -tanh(num_dropped / DROP_PENALTY_TAU), adds the class constant
    DROP_PENALTY_TAU = 20.0, and drops PENALTY_DROPPED_JOB from this module's
    src.config import list (the constant itself is not touched by this patch).
  * Values for TAU = 20: tanh(1/20) ~ 0.050, tanh(10/20) ~ 0.462,
    tanh(50/20) ~ 0.987. The in-code comment's "50 drops≈-1.0" is therefore
    about -0.99; the curve approaches -1 but does not reach it.
  * NOTE(review): the old min(0, ...) form could never return a positive
    value; -tanh(n / TAU) is positive for negative n. Presumably num_dropped
    is never negative - confirm against the caller of calculate().

 src/reward_calculation.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/reward_calculation.py b/src/reward_calculation.py
index 32d7f24..2a36188 100644
--- a/src/reward_calculation.py
+++ b/src/reward_calculation.py
@@ -6,7 +6,7 @@
 
 from src.config import (
     COST_IDLE_MW, COST_USED_MW, PENALTY_IDLE_NODE,
-    PENALTY_DROPPED_JOB, MAX_NODES, MAX_NEW_JOBS_PER_HOUR, WEEK_HOURS,
+    MAX_NODES, MAX_NEW_JOBS_PER_HOUR, WEEK_HOURS,
     CORES_PER_NODE,
 )
 from src.prices import Prices
@@ -68,6 +68,8 @@ class RewardCalculator:
     NEGATIVE_PRICE_OVERDRIVE_FLOOR = 0.35
     NEGATIVE_PRICE_OVERDRIVE_ALLOW_ABOVE_ONE = True
     NEGATIVE_PRICE_OVERDRIVE_MAX_REWARD = 1.5
+    # Drop penalty: tanh saturation curve. TAU=20: 1 drop≈-0.05, 10 drops≈-0.46, 50 drops≈-1.0.
+    DROP_PENALTY_TAU = 20.0
 
     def __init__(self, prices: Prices) -> None:
         """
@@ -317,7 +319,7 @@ def _blackout_term(self, num_used_nodes: int, num_idle_nodes: int, num_unprocess
 
     def _penalty_drop(self, num_dropped: int) -> float:
         """Drop penalty: tanh saturation curve bounded in [-1, 0]."""
-        return min(0, PENALTY_DROPPED_JOB * num_dropped)
+        return -float(np.tanh(num_dropped / self.DROP_PENALTY_TAU))
 
     def calculate(self, num_used_nodes: int, num_idle_nodes: int, current_price: float, average_future_price: float,
                   num_off_nodes: int, job_queue_2d: np.ndarray,