Add PPO RL controller, scenario library, and evaluation pipeline #12
ZhiruiLiang wants to merge 10 commits into master
Conversation
Code Review
This pull request introduces a comprehensive Reinforcement Learning (RL) framework for voltage regulation using Proximal Policy Optimization (PPO), including training environments, controllers, and evaluation scripts. Key additions include a Gymnasium-compatible environment, PPO-based controllers, and utilities for generating scenario libraries and benchmarking against model-based and rule-based strategies. Feedback focuses on significant code duplication between the new scripts and core modules, particularly regarding profile generation and grid definitions. Reviewers also highlighted design concerns such as fragile file-searching logic for normalization stats, cross-script dependencies, and a functional limitation in the shared multi-site controller regarding command routing. Refactoring is recommended to centralize shared logic and improve the modularity of the scenario handling and simulation setup.
```python
if len(self._site_model_mapping) > 1:
    raise NotImplementedError(
        "SharedPPOBatchSizeController multi-DC routing needs migration: "
        "the controller must accept a dict[str, DatacenterBackend] in "
        "__init__ to route per-site SetBatchSize commands. Single-site "
        "shared mode still works."
    )
```
The SharedPPOBatchSizeController raises a NotImplementedError for the multi-site case, indicating that the command routing is not yet implemented. The PR description mentions this controller is for multi-site use. While the error prevents incorrect behavior, this seems like a significant limitation for a shared multi-site controller. Is the full multi-site functionality planned for a follow-up PR? If so, it would be good to mention this limitation in the documentation.
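If the follow-up lands, the routing that the error message describes could look roughly like the sketch below: invert the site-to-models mapping once, then dispatch each batch-size command to the backend of the site that owns the model. All names here (`SetBatchSize`, `FakeBackend`, `SharedControllerRouter`) are illustrative stand-ins, not the actual openg2g API.

```python
from dataclasses import dataclass


@dataclass
class SetBatchSize:
    """Illustrative command; the real openg2g command type may differ."""
    model_label: str
    batch_size: int


class FakeBackend:
    """Stand-in for a per-site DatacenterBackend."""

    def __init__(self) -> None:
        self.received: list[SetBatchSize] = []

    def apply(self, cmd: SetBatchSize) -> None:
        self.received.append(cmd)


class SharedControllerRouter:
    """Routes each command to the backend of the site that owns the model."""

    def __init__(
        self,
        backends: dict[str, FakeBackend],
        site_model_mapping: dict[str, list[str]],
    ) -> None:
        self._backends = backends
        # Invert site -> models into model -> site for O(1) routing.
        self._model_to_site = {
            label: site
            for site, labels in site_model_mapping.items()
            for label in labels
        }

    def dispatch(self, commands: list[SetBatchSize]) -> None:
        for cmd in commands:
            site = self._model_to_site[cmd.model_label]
            self._backends[site].apply(cmd)


backends = {"dc_a": FakeBackend(), "dc_b": FakeBackend()}
router = SharedControllerRouter(
    backends, {"dc_a": ["llama_8b"], "dc_b": ["llama_70b"]}
)
router.dispatch([SetBatchSize("llama_70b", 16), SetBatchSize("llama_8b", 32)])
print(len(backends["dc_b"].received))  # → 1
```

The inversion assumes model labels are globally unique across sites, which the existing per-site `site_model_mapping` already implies.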
```python
def run_simulation(
    mode: str,
    *,
    sys: dict,
    dc_sites: dict[str, DCSite],
    ofo_config,
    inference_data: InferenceData,
    training_trace: TrainingTrace,
    logistic_models,
    pv_systems: list[PVSystemSpec] | None = None,
    time_varying_loads: list[TimeVaryingLoadSpec] | None = None,
    tap_schedule: TapSchedule | None = None,
    rule_based_config: RuleBasedConfig | None = None,
    rule_zone_local: bool = False,
    ppo_model: str = "",
    obs_mode: str = "full-voltage",
    training_overlay: dict | None = None,
    save_dir: Path,
) -> tuple[VoltageStats, object]:
    """Run a simulation with the specified controller mode.

    Modes:
        'baseline': NoopController (no batch control)
        'rule_based': RuleBasedBatchSizeController
        'ofo': OFOBatchSizeController
        'ppo': PPOBatchSizeController / SharedPPOBatchSizeController

    rule_zone_local: when True AND sys defines `zones` AND there is more than
    one DC site, each rule-based controller observes only the buses in its
    own zone (looked up via the site_id key). Decentralizes credit assignment
    in multi-DC topologies (ieee123). No effect for single-site systems.

    Returns (VoltageStats, SimulationLog).
    """
    pv_systems = pv_systems or []
    time_varying_loads = time_varying_loads or []
    exclude_buses = tuple(sys["exclude_buses"])
    site_ids = list(dc_sites.keys())

    training_run = None
    if training_overlay is not None:
        training_run = TrainingRun(
            n_gpus=training_overlay["n_gpus"],
            trace=training_trace,
            target_peak_W_per_gpu=training_overlay["target_peak_W_per_gpu"],
        ).at(t_start=training_overlay["t_start"], t_end=training_overlay["t_end"])

    datacenters: dict[str, OfflineDatacenter] = {}
    controllers: list = []
    site_specs_map: dict[str, tuple[InferenceModelSpec, ...]] = {}
    primary_bus = ""

    for site_id, site in dc_sites.items():
        site_specs = tuple(md.spec for md, _ in site.models)
        site_specs_map[site_id] = site_specs
        site_inference = inference_data.filter_models(site_specs)

        replica_schedules: dict[str, ReplicaSchedule] = {
            md.spec.model_label: sched for md, sched in site.models
        }
        initial_batch_sizes = {md.spec.model_label: md.initial_batch_size for md, _ in site.models}

        dc_config = DatacenterConfig(
            gpus_per_server=8,
            base_kw_per_phase=site.base_kw_per_phase,
        )
        workload_kwargs: dict = {
            "inference_data": site_inference,
            "replica_schedules": replica_schedules,
            "initial_batch_sizes": initial_batch_sizes,
        }
        if training_run is not None:
            workload_kwargs["training"] = training_run
        workload = OfflineWorkload(**workload_kwargs)

        dc = OfflineDatacenter(
            dc_config,
            workload,
            name=site_id,
            dt_s=DT_DC,
            seed=site.seed,
            power_augmentation=POWER_AUG,
            total_gpu_capacity=site.total_gpu_capacity,
        )
        datacenters[site_id] = dc
        if not primary_bus:
            primary_bus = site.bus

    # Build grid then attach DCs (master's imperative pattern; the old
    # dc_loads / dc_bus kwargs are no longer accepted by OpenDSSGrid).
    grid = ScenarioOpenDSSGrid(
        pv_systems=pv_systems,
        time_varying_loads=time_varying_loads,
        source_pu=sys["source_pu"],
        dss_case_dir=sys["dss_case_dir"],
        dss_master_file=sys["dss_master_file"],
        dt_s=DT_GRID,
        initial_tap_position=sys["initial_taps"],
        exclude_buses=exclude_buses,
    )
    dc_pf = DatacenterConfig(base_kw_per_phase=0).power_factor
    for site_id, dc in datacenters.items():
        site = dc_sites[site_id]
        grid.attach_dc(dc, bus=site.bus, connection_type=site.connection_type, power_factor=dc_pf)

    if mode == "baseline":
        sched = tap_schedule if tap_schedule is not None else TapSchedule(())
    else:
        sched = TapSchedule(())
    controllers.append(TapScheduleController(schedule=sched, dt_s=DT_CTRL))

    if mode == "ofo":
        from openg2g.controller.ofo import OFOBatchSizeController

        for site_id in site_ids:
            site_initial_bs = {
                md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models
            }
            ofo_ctrl = OFOBatchSizeController(
                site_specs_map[site_id],
                datacenter=datacenters[site_id],
                grid=grid,
                models=logistic_models,
                config=ofo_config,
                dt_s=DT_CTRL,
                initial_batch_sizes=site_initial_bs,
            )
            controllers.append(ofo_ctrl)

    elif mode == "rule_based" or mode.startswith("rule_based_"):
        rb_config = rule_based_config or RuleBasedConfig(v_min=V_MIN, v_max=V_MAX)
        zones = sys.get("zones") if rule_zone_local else None
        for site_id in site_ids:
            site_initial_bs = {
                md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models
            }
            zone_buses = None
            if zones is not None and len(site_ids) > 1 and site_id in zones:
                zone_buses = tuple(zones[site_id])
            rb_ctrl = RuleBasedBatchSizeController(
                site_specs_map[site_id],
                datacenter=datacenters[site_id],
                grid=grid,
                config=rb_config,
                dt_s=DT_CTRL,
                exclude_buses=exclude_buses,
                zone_buses=zone_buses,
                initial_batch_sizes=site_initial_bs,
            )
            controllers.append(rb_ctrl)

    elif mode == "ppo":
        from openg2g.controller.ppo import PPOBatchSizeController, SharedPPOBatchSizeController
        from openg2g.rl.env import ObservationConfig

        ppo_path = Path(ppo_model).resolve()

        def _find_vecnormalize(model_file: Path) -> Path | None:
            mf = Path(model_file)
            stem = mf.with_suffix("").name
            candidates = [
                mf.parent / f"{stem}_vecnormalize.pkl",
                mf.parent / "vecnormalize.pkl",
            ]
            if stem.startswith("ppo_") and stem.endswith("_steps"):
                candidates.append(mf.parent / f"ppo_vecnormalize_{stem[len('ppo_'):]}.pkl")
            for parent in [mf.parent, *mf.parent.parents][:4]:
                candidates.extend(sorted(parent.glob("ppo_model_*_vecnormalize.pkl")))
            for c in candidates:
                if c.is_file():
                    return c
            return None

        shared_model = None
        if ppo_path.is_dir():
            cand = ppo_path / "ppo_model_shared.zip"
            if cand.exists():
                shared_model = cand
        elif ppo_path.suffix == ".zip":
            looks_shared = (
                len(site_ids) > 1
                or "shared" in ppo_path.parts
                or ppo_path.stem.startswith("ppo_model_shared")
            )
            if looks_shared and ppo_path.exists():
                shared_model = ppo_path
        if shared_model is not None and shared_model.exists():
            from stable_baselines3 import PPO as SB3PPO

            sb3 = SB3PPO.load(str(shared_model.with_suffix("")))
            saved_obs_dim = sb3.observation_space.shape[0]
            n_models_all = sum(len(dc_sites[sid].models) for sid in site_ids)
            zones = sys.get("zones")

            if obs_mode == "per-zone-summary":
                n_bus_phases = 0
                zone_summary = {zname: tuple(zbuses) for zname, zbuses in zones.items()} if zones else None
                bus_phase_groups = None
            elif obs_mode == "system-summary-only":
                n_bus_phases = 0
                zone_summary = None
                bus_phase_groups = None
            elif obs_mode == "per-bus-summary":
                from openg2g.rl.env import compute_bus_phase_groups

                grid.do_reset()
                grid.start()
                _v_index = grid.v_index
                grid.stop()
                bus_phase_groups = compute_bus_phase_groups(_v_index)
                n_bus_phases = 2 * len(bus_phase_groups)
                zone_summary = {zname: tuple(zbuses) for zname, zbuses in zones.items()} if zones else None
            else:  # "full-voltage"
                n_bus_phases = saved_obs_dim - 3 - 5 * n_models_all
                zone_summary = None
                bus_phase_groups = None

            site_model_mapping = {sid: [md.spec.model_label for md, _ in dc_sites[sid].models] for sid in site_ids}
            all_init_bs = {md.spec.model_label: md.initial_batch_size for sid in site_ids for md, _ in dc_sites[sid].models}
            obs_config = ObservationConfig.from_multi_site(
                site_specs_map,
                {sid: {md.spec.model_label: sched.initial for md, sched in dc_sites[sid].models} for sid in site_ids},
                n_bus_phases=n_bus_phases,
                initial_batch_sizes=all_init_bs,
                zone_summary=zone_summary,
                bus_phase_groups=bus_phase_groups,
                v_min=V_MIN,
                v_max=V_MAX,
            )
            vn_path = _find_vecnormalize(shared_model)
            if vn_path is not None:
                logger.info("PPO: loading VecNormalize stats from %s", vn_path)
            else:
                logger.warning(
                    "PPO: no VecNormalize stats found next to %s — policy will see UNNORMALIZED obs", shared_model
                )
            ppo_ctrl = SharedPPOBatchSizeController(
                datacenter=next(iter(datacenters.values())),
                grid=grid,
                model_path=str(shared_model),
                obs_config=obs_config,
                site_model_mapping=site_model_mapping,
                dt_s=DT_CTRL,
                vecnormalize_path=str(vn_path) if vn_path is not None else None,
            )
            controllers.append(ppo_ctrl)
        else:
            zones = sys.get("zones")

            for site_id in site_ids:
                if ppo_path.is_dir():
                    site_model = str(ppo_path / f"ppo_model_{site_id}.zip")
                elif ppo_path.suffix == ".zip" and len(site_ids) == 1:
                    site_model = str(ppo_path)
                else:
                    site_model = str(ppo_path.parent / f"ppo_model_{site_id}.zip")

                from stable_baselines3 import PPO as SB3PPO

                sb3 = SB3PPO.load(str(Path(site_model).with_suffix("")))
                saved_obs_dim = sb3.observation_space.shape[0]
                n_models_site = len(dc_sites[site_id].models)
                n_bus_phases = saved_obs_dim - 3 - 5 * n_models_site

                zone_buses = None
                if zones is not None and site_id in zones:
                    zone_buses = tuple(zones[site_id])

                specs = site_specs_map[site_id]
                replica_counts = {md.spec.model_label: sched.initial for md, sched in dc_sites[site_id].models}
                site_init_bs = {md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models}
                obs_config = ObservationConfig.from_model_specs(
                    specs,
                    replica_counts,
                    n_bus_phases=n_bus_phases,
                    initial_batch_sizes=site_init_bs,
                    zone_buses=zone_buses,
                    v_min=V_MIN,
                    v_max=V_MAX,
                )
                vn_path = _find_vecnormalize(Path(site_model))
                if vn_path is not None:
                    logger.info("PPO[%s]: loading VecNormalize stats from %s", site_id, vn_path)
                else:
                    logger.warning(
                        "PPO[%s]: no VecNormalize stats found next to %s — policy will see UNNORMALIZED obs",
                        site_id,
                        site_model,
                    )
                ppo_ctrl = PPOBatchSizeController(
                    specs,
                    datacenter=datacenters[site_id],
                    grid=grid,
                    model_path=site_model,
                    obs_config=obs_config,
                    dt_s=DT_CTRL,
                    vecnormalize_path=str(vn_path) if vn_path is not None else None,
                )
                controllers.append(ppo_ctrl)

    coord = Coordinator(
        datacenters=list(datacenters.values()),
        grid=grid,
        controllers=controllers,
        total_duration_s=TOTAL_DURATION_S,
    )

    from openg2g.controller.ppo import SharedPPOBatchSizeController

    for ctrl in controllers:
        if isinstance(ctrl, SharedPPOBatchSizeController):
            ctrl.attach_datacenters(list(datacenters.values()))

    logger.info("Running %s...", mode)
    log = coord.run()

    vstats = compute_allbus_voltage_stats(
        log.grid_states,
        v_min=V_MIN,
        v_max=V_MAX,
        exclude_buses=exclude_buses,
    )
    logger.info(
        "  %s: viol=%.1fs integral=%.4f vmin=%.4f vmax=%.4f",
        mode,
        vstats.violation_time_s,
        vstats.integral_violation_pu_s,
        vstats.worst_vmin,
        vstats.worst_vmax,
    )

    return vstats, log
```
The run_simulation function is quite long and complex, handling the setup for all controller types (baseline, rule_based, ofo, ppo). This reduces its readability and maintainability. Consider refactoring this function. For example, you could extract the setup logic for each controller type into its own helper function. This would make run_simulation a dispatcher and easier to follow.
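The suggested refactor could follow a simple dispatch-table pattern: one builder function per mode, with `run_simulation` reduced to shared setup plus a lookup. The sketch below is schematic (builder names and the `ctx` dict are hypothetical; the real builders would receive the grid, datacenters, and configs instead of returning placeholder strings):

```python
from typing import Callable


def _build_baseline(ctx: dict) -> list:
    # Baseline adds no batch-size controllers beyond the tap controller.
    return []


def _build_rule_based(ctx: dict) -> list:
    # One rule-based controller per site, in the real code.
    return [f"rule_based[{site}]" for site in ctx["site_ids"]]


def _build_ofo(ctx: dict) -> list:
    # One OFO controller per site, in the real code.
    return [f"ofo[{site}]" for site in ctx["site_ids"]]


_BUILDERS: dict[str, Callable[[dict], list]] = {
    "baseline": _build_baseline,
    "rule_based": _build_rule_based,
    "ofo": _build_ofo,
}


def build_controllers(mode: str, ctx: dict) -> list:
    """Dispatch to the per-mode builder; fail loudly on unknown modes."""
    try:
        return _BUILDERS[mode](ctx)
    except KeyError:
        raise ValueError(f"unknown mode: {mode!r}") from None


print(build_controllers("ofo", {"site_ids": ["dc1", "dc2"]}))  # → ['ofo[dc1]', 'ofo[dc2]']
```

This also gives unknown modes an explicit error path, which the current if/elif chain silently falls through.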
```python
def _find_vecnormalize(model_file: Path) -> Path | None:
    mf = Path(model_file)
    stem = mf.with_suffix("").name
    candidates = [
        mf.parent / f"{stem}_vecnormalize.pkl",
        mf.parent / "vecnormalize.pkl",
    ]
    if stem.startswith("ppo_") and stem.endswith("_steps"):
        candidates.append(mf.parent / f"ppo_vecnormalize_{stem[len('ppo_'):]}.pkl")
    for parent in [mf.parent, *mf.parent.parents][:4]:
        candidates.extend(sorted(parent.glob("ppo_model_*_vecnormalize.pkl")))
    for c in candidates:
        if c.is_file():
            return c
    return None
```
The _find_vecnormalize function has complex logic to locate the vecnormalize.pkl file, searching through multiple parent directories and trying different naming patterns. This can be fragile and hard to debug if file structures change. It would be more robust to have the training script save the path to the vecnormalize.pkl file in a metadata file alongside the model, or to enforce a stricter, simpler directory structure. For example, always save it as vecnormalize.pkl in the same directory as the model.
```python
def _per_step_voltage_pen(grid_states, *, v_min: float, v_max: float, exclude_buses: tuple[str, ...]) -> np.ndarray:
    """Compute the per-step voltage penalty (sum of squared violation magnitude).

    This matches ``compute_reward`` in ``openg2g/rl/env.py``: at each step the
    penalty is ``sum_max(v_min - v, 0)^2 + sum_max(v - v_max, 0)^2`` over all
    bus-phases (excluding the substation buses), with NaNs treated as
    "not in violation".
    """
    drop = {b.lower() for b in exclude_buses}
    out = np.zeros(len(grid_states), dtype=np.float64)
    for i, gs in enumerate(grid_states):
        s = 0.0
        for bus in gs.voltages.buses():
            if bus.lower() in drop:
                continue
            pv = gs.voltages[bus]
            for v in (pv.a, pv.b, pv.c):
                if math.isnan(v):
                    continue
                if v < v_min:
                    s += (v_min - v) ** 2
                elif v > v_max:
                    s += (v - v_max) ** 2
        out[i] = s
    return out
```
This function _per_step_voltage_pen seems to duplicate the reward calculation logic from openg2g.rl.env.compute_reward. The docstring even mentions this. To avoid code duplication and potential inconsistencies in the future, consider refactoring this logic into a shared utility function that can be called from both places.
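One way to de-duplicate it is a small shared helper that both the env reward and the evaluation script call, operating on a flat array of per-phase voltages (NaN meaning "phase absent", treated as no violation). This is a sketch; the function name and the flat-array interface are assumptions, not existing openg2g API:

```python
import numpy as np


def voltage_violation_penalty(v: np.ndarray, *, v_min: float, v_max: float) -> float:
    """Sum of squared out-of-band magnitudes over all bus-phase voltages (p.u.)."""
    v = np.asarray(v, dtype=np.float64)
    v = v[~np.isnan(v)]  # NaN phases are "not in violation"
    under = np.clip(v_min - v, 0.0, None)  # undervoltage magnitudes
    over = np.clip(v - v_max, 0.0, None)   # overvoltage magnitudes
    return float(np.sum(under**2) + np.sum(over**2))


# 0.94 is 0.01 below the band and 1.06 is 0.01 above it:
# penalty = 0.01^2 + 0.01^2 = 0.0002.
pen = voltage_violation_penalty(
    np.array([0.94, 1.0, 1.06, np.nan]), v_min=0.95, v_max=1.05
)
print(round(pen, 6))  # → 0.0002
```

The caller (env or evaluation script) would keep its own bus filtering and iteration, passing in just the voltages that survive `exclude_buses`.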
```python
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
from build_scenario_library import run_simulation
```
This script imports run_simulation from build_scenario_library.py. This creates a dependency between two example scripts, where one acts as a library for the other. To improve modularity and clarity, utility functions like run_simulation that are shared across multiple scripts should be moved to a common library module within the openg2g package (e.g., openg2g.utils or openg2g.simulation). This would make the code organization cleaner and dependencies more explicit.
```python
# Register a stub module so pickle can resolve
# build_scenario_library.ScenarioRecord without requiring the
# script to be on sys.path.
if "build_scenario_library" not in sys.modules:
    _stub = types.ModuleType("build_scenario_library")

    class _ScenarioRecord:
        pass

    _ScenarioRecord.__module__ = "build_scenario_library"
    _ScenarioRecord.__qualname__ = "ScenarioRecord"
    _stub.ScenarioRecord = _ScenarioRecord
    sys.modules["build_scenario_library"] = _stub
```
The ScenarioLibrary class includes a mechanism to create a stub module for build_scenario_library to handle unpickling of ScenarioRecord. While this works, it's a bit of a workaround and indicates a cyclic dependency or misplaced class definition. A cleaner design would be to define ScenarioRecord in a shared library module (e.g., openg2g.rl.scenario) that both build_scenario_library.py and this module can import from. This would remove the need for this stub and make the data structures more independent of specific scripts.
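Concretely, defining the record as a plain dataclass in a shared module means pickle resolves it by its stable import path and the stub becomes unnecessary. A minimal sketch (the field names and the `openg2g.rl.scenario` module path mentioned in the comment are illustrative):

```python
import pickle
from dataclasses import dataclass


# In a shared module (e.g. openg2g/rl/scenario.py), so that pickle's
# module.qualname lookup finds it regardless of which script wrote the file.
@dataclass(frozen=True)
class ScenarioRecord:
    scenario_id: str
    seed: int
    system: str


rec = ScenarioRecord("cloudy_day_01", seed=7, system="ieee123")
restored = pickle.loads(pickle.dumps(rec))
print(restored == rec)  # → True
```

Because pickle stores only `__module__` and `__qualname__` plus the instance state, round-tripping works as long as the class stays importable from the same location, which is exactly what moving it out of the example script guarantees.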
- Library code (openg2g/)
- Examples (examples/offline/)
- Documentation (docs/)