
Add PPO RL controller, scenario library, and evaluation pipeline#12

Open
ZhiruiLiang wants to merge 10 commits into master from feat/ppo-controller

Conversation

@ZhiruiLiang
Collaborator

Library code (openg2g/)

  • New module openg2g/controller/ppo.py — PPOBatchSizeController (single-site) and SharedPPOBatchSizeController (multi-site) that wrap a trained stable-baselines3 PPO policy as a Controller. Sits alongside the existing RuleBasedBatchSizeController and OFOBatchSizeController.
  • New subpackage openg2g/rl/ with env.py — a Gymnasium environment exposing the simulator as an RL training target. Provides structured observations (per-zone / per-bus / system-summary modes), composable rewards (voltage / throughput / latency / switching), and scenario sampling from a pre-built library. This is what train_ppo.py learns against.
  • Modified openg2g/controller/rule_based.py — tightened the default deadband for finer voltage tracking and added a zone_buses argument for zone-local observation (used in multi-DC ieee123 to give each site credit only for its own zone).
  • Modified openg2g/grid/opendss.py — single tiny change: downgrade a multi-bank RegControl message from info to debug to avoid log spam on ieee34.
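The `openg2g/rl/env.py` bullet above can be illustrated with a minimal stand-in that follows the Gymnasium `reset`/`step` contract the environment exposes. Everything here is invented for illustration — the toy voltage model, the observation layout, and the reward weights are not the real `BatchSizeEnv`, which wraps the actual simulator:

```python
class BatchSizeEnvSketch:
    """Minimal stand-in for a Gymnasium-style batch-size environment.

    The real BatchSizeEnv drives the OpenDSS simulator; here a toy
    linear voltage model makes the reset/step contract and the
    composable reward (voltage / throughput / switching terms) visible.
    """

    def __init__(self, reward_weights=None):
        # Composable reward: a weighted sum of named terms (weights hypothetical).
        self.w = reward_weights or {"voltage": 1.0, "throughput": 0.1, "switching": 0.01}
        self._batch = 32
        self._prev_batch = 32

    def _observe(self):
        # Illustrative structured observation: [voltage_pu, batch_frac, time_frac].
        v = 1.0 - 0.002 * (self._batch / 32.0)
        return [v, self._batch / 128.0, 0.0]

    def reset(self, *, seed=None):
        self._batch = self._prev_batch = 32
        return self._observe(), {}

    def step(self, action):
        # Action is a signed batch-size delta, clipped to a toy range.
        self._prev_batch, self._batch = self._batch, max(1, min(128, self._batch + int(action)))
        v = self._observe()[0]
        reward = (
            -self.w["voltage"] * max(0.95 - v, 0.0) ** 2
            + self.w["throughput"] * self._batch / 128.0
            - self.w["switching"] * abs(self._batch - self._prev_batch)
        )
        # Gymnasium convention: (obs, reward, terminated, truncated, info).
        return self._observe(), reward, False, False, {}
```

The five-tuple return and keyword-only `seed` follow the Gymnasium API, which is what lets stable-baselines3 PPO train against the real environment.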

Examples (examples/offline/)

  • New examples/offline/train_ppo.py — PPO training entrypoint. Wraps BatchSizeEnv in VecNormalize, runs stable-baselines3 PPO, and saves the model, VecNormalize stats, per-checkpoint snapshots, and TensorBoard logs.
  • New examples/offline/build_scenario_library.py — generates randomized PV / TVL / inference-ramp scenarios, screens them by running baseline + OFO and rejecting cases with no learning signal, writes a library.pkl for the trainer.
  • New examples/offline/evaluate_controllers.py — held-out scenario eval that runs baseline / OFO / rule-based / PPO on the same scenarios and produces side-by-side voltage and throughput metrics (CSV + plots).
  • Modified examples/offline/systems.py — adds the PPO-side infrastructure layered on top of master's feeder constants: DCSite dataclass that bundles deployments with ReplicaSchedules, hardcoded model spec list (ALL_MODEL_SPECS), randomize_scenario / materialize_scenario helpers, ScenarioOpenDSSGrid for randomized PV/TVL, and with_ramp convenience for experiment factories.
  • Modified examples/offline/sweep_dc_locations.py — extends the existing 1-D and 2-D bus sweeps with a zone-constrained 3-phase sweep for ieee123 (Phase 1 screening per zone, Phase 2 combination, optional Phase 3 refinement). Also migrated to master's new grid.attach_dc(...) and Coordinator(datacenters=[...]) APIs.
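The screening idea in `build_scenario_library.py` — reject scenarios with no learning signal — can be sketched as follows. The metric callables, scenario field names, and `min_gap` threshold are all hypothetical; the real script runs the full baseline and OFO simulations to get its metrics:

```python
import random

def screen_scenarios(n_candidates, run_baseline, run_ofo, min_gap=0.05, seed=0):
    """Keep only scenarios where OFO measurably beats the baseline.

    run_baseline / run_ofo map a scenario dict to a violation metric
    (lower is better). If the two controllers tie, the scenario carries
    no learning signal for PPO and is rejected.
    """
    rng = random.Random(seed)
    kept = []
    for _ in range(n_candidates):
        # Randomized PV / time-varying-load scales (ranges illustrative).
        scenario = {"pv_scale": rng.uniform(0.5, 2.0), "tvl_scale": rng.uniform(0.8, 1.5)}
        gap = run_baseline(scenario) - run_ofo(scenario)
        if gap >= min_gap:
            kept.append(scenario)
    return kept
```

The surviving scenarios would then be serialized into the `library.pkl` that `train_ppo.py` samples from.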

Documentation (docs/)

  • New docs/examples/rl-controller.md — end-to-end walkthrough of the 3-stage PPO workflow: build scenario library → train PPO → evaluate.
  • Modified docs/examples/voltage-regulation-strategies.md — adds PPO as a fourth control strategy alongside baseline / rule-based / OFO, with a cross-link to the new RL doc.
  • Modified _zensical.toml — nav entry for the new RL example doc.


@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a comprehensive Reinforcement Learning (RL) framework for voltage regulation using Proximal Policy Optimization (PPO), including training environments, controllers, and evaluation scripts. Key additions include a Gymnasium-compatible environment, PPO-based controllers, and utilities for generating scenario libraries and benchmarking against model-based and rule-based strategies. Feedback focuses on significant code duplication between the new scripts and core modules, particularly regarding profile generation and grid definitions. Reviewers also highlighted design concerns such as fragile file-searching logic for normalization stats, cross-script dependencies, and a functional limitation in the shared multi-site controller regarding command routing. Refactoring is recommended to centralize shared logic and improve the modularity of the scenario handling and simulation setup.

Comment thread examples/offline/sweep_dc_locations.py Outdated
Comment thread openg2g/controller/ppo.py Outdated
Comment on lines +292 to +298
if len(self._site_model_mapping) > 1:
raise NotImplementedError(
"SharedPPOBatchSizeController multi-DC routing needs migration: "
"the controller must accept a dict[str, DatacenterBackend] in "
"__init__ to route per-site SetBatchSize commands. Single-site "
"shared mode still works."
)


high

The SharedPPOBatchSizeController raises a NotImplementedError for the multi-site case, indicating that the command routing is not yet implemented. The PR description mentions this controller is for multi-site use. While the error prevents incorrect behavior, this seems like a significant limitation for a shared multi-site controller. Is the full multi-site functionality planned for a follow-up PR? If so, it would be good to mention this limitation in the documentation.
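The migration the error message describes — accepting a mapping of site IDs to backends and routing each site's slice of the joint action — could look roughly like this. The class name, the `set_batch_sizes` backend method, and the flat action dict are hypothetical stand-ins for the project's actual command types:

```python
class SharedControllerRoutingSketch:
    """Sketch of dict-based per-site routing for a shared policy.

    Instead of a single datacenter backend, the controller holds
    site_id -> backend and dispatches each site's model labels to the
    matching backend, which is what multi-DC mode needs.
    """

    def __init__(self, backends, site_model_mapping):
        self._backends = backends          # dict[str, backend-like object]
        self._mapping = site_model_mapping  # dict[str, list[str]] of model labels

    def apply_action(self, batch_sizes):
        # batch_sizes: flat dict model_label -> batch size from the shared policy.
        for site_id, labels in self._mapping.items():
            site_cmd = {m: batch_sizes[m] for m in labels if m in batch_sizes}
            self._backends[site_id].set_batch_sizes(site_cmd)
```

With this shape, the single-site case is just a mapping of length one, so the `NotImplementedError` branch could be retired.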

Comment on lines +81 to +406
def run_simulation(
mode: str,
*,
sys: dict,
dc_sites: dict[str, DCSite],
ofo_config,
inference_data: InferenceData,
training_trace: TrainingTrace,
logistic_models,
pv_systems: list[PVSystemSpec] | None = None,
time_varying_loads: list[TimeVaryingLoadSpec] | None = None,
tap_schedule: TapSchedule | None = None,
rule_based_config: RuleBasedConfig | None = None,
rule_zone_local: bool = False,
ppo_model: str = "",
obs_mode: str = "full-voltage",
training_overlay: dict | None = None,
save_dir: Path,
) -> tuple[VoltageStats, object]:
"""Run a simulation with the specified controller mode.

Modes:
'baseline': NoopController (no batch control)
'rule_based': RuleBasedBatchSizeController
'ofo': OFOBatchSizeController
'ppo': PPOBatchSizeController / SharedPPOBatchSizeController

rule_zone_local: when True AND sys defines `zones` AND there is more than
one DC site, each rule-based controller observes only the buses in its
own zone (looked up via the site_id key). Decentralizes credit assignment
in multi-DC topologies (ieee123). No effect for single-site systems.

Returns (VoltageStats, SimulationLog).
"""
pv_systems = pv_systems or []
time_varying_loads = time_varying_loads or []
exclude_buses = tuple(sys["exclude_buses"])
site_ids = list(dc_sites.keys())

training_run = None
if training_overlay is not None:
training_run = TrainingRun(
n_gpus=training_overlay["n_gpus"],
trace=training_trace,
target_peak_W_per_gpu=training_overlay["target_peak_W_per_gpu"],
).at(t_start=training_overlay["t_start"], t_end=training_overlay["t_end"])

datacenters: dict[str, OfflineDatacenter] = {}
controllers: list = []
site_specs_map: dict[str, tuple[InferenceModelSpec, ...]] = {}
primary_bus = ""

for site_id, site in dc_sites.items():
site_specs = tuple(md.spec for md, _ in site.models)
site_specs_map[site_id] = site_specs
site_inference = inference_data.filter_models(site_specs)

replica_schedules: dict[str, ReplicaSchedule] = {
md.spec.model_label: sched for md, sched in site.models
}
initial_batch_sizes = {md.spec.model_label: md.initial_batch_size for md, _ in site.models}

dc_config = DatacenterConfig(
gpus_per_server=8,
base_kw_per_phase=site.base_kw_per_phase,
)
workload_kwargs: dict = {
"inference_data": site_inference,
"replica_schedules": replica_schedules,
"initial_batch_sizes": initial_batch_sizes,
}
if training_run is not None:
workload_kwargs["training"] = training_run
workload = OfflineWorkload(**workload_kwargs)

dc = OfflineDatacenter(
dc_config,
workload,
name=site_id,
dt_s=DT_DC,
seed=site.seed,
power_augmentation=POWER_AUG,
total_gpu_capacity=site.total_gpu_capacity,
)
datacenters[site_id] = dc
if not primary_bus:
primary_bus = site.bus

# Build grid then attach DCs (master's imperative pattern; the old
# dc_loads / dc_bus kwargs are no longer accepted by OpenDSSGrid).
grid = ScenarioOpenDSSGrid(
pv_systems=pv_systems,
time_varying_loads=time_varying_loads,
source_pu=sys["source_pu"],
dss_case_dir=sys["dss_case_dir"],
dss_master_file=sys["dss_master_file"],
dt_s=DT_GRID,
initial_tap_position=sys["initial_taps"],
exclude_buses=exclude_buses,
)
dc_pf = DatacenterConfig(base_kw_per_phase=0).power_factor
for site_id, dc in datacenters.items():
site = dc_sites[site_id]
grid.attach_dc(dc, bus=site.bus, connection_type=site.connection_type, power_factor=dc_pf)

if mode == "baseline":
sched = tap_schedule if tap_schedule is not None else TapSchedule(())
else:
sched = TapSchedule(())
controllers.append(TapScheduleController(schedule=sched, dt_s=DT_CTRL))

if mode == "ofo":
from openg2g.controller.ofo import OFOBatchSizeController

for site_id in site_ids:
site_initial_bs = {
md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models
}
ofo_ctrl = OFOBatchSizeController(
site_specs_map[site_id],
datacenter=datacenters[site_id],
grid=grid,
models=logistic_models,
config=ofo_config,
dt_s=DT_CTRL,
initial_batch_sizes=site_initial_bs,
)
controllers.append(ofo_ctrl)

elif mode == "rule_based" or mode.startswith("rule_based_"):
rb_config = rule_based_config or RuleBasedConfig(v_min=V_MIN, v_max=V_MAX)
zones = sys.get("zones") if rule_zone_local else None
for site_id in site_ids:
site_initial_bs = {
md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models
}
zone_buses = None
if zones is not None and len(site_ids) > 1 and site_id in zones:
zone_buses = tuple(zones[site_id])
rb_ctrl = RuleBasedBatchSizeController(
site_specs_map[site_id],
datacenter=datacenters[site_id],
grid=grid,
config=rb_config,
dt_s=DT_CTRL,
exclude_buses=exclude_buses,
zone_buses=zone_buses,
initial_batch_sizes=site_initial_bs,
)
controllers.append(rb_ctrl)

elif mode == "ppo":
from openg2g.controller.ppo import PPOBatchSizeController, SharedPPOBatchSizeController
from openg2g.rl.env import ObservationConfig

ppo_path = Path(ppo_model).resolve()

def _find_vecnormalize(model_file: Path) -> Path | None:
mf = Path(model_file)
stem = mf.with_suffix("").name
candidates = [
mf.parent / f"{stem}_vecnormalize.pkl",
mf.parent / "vecnormalize.pkl",
]
if stem.startswith("ppo_") and stem.endswith("_steps"):
candidates.append(mf.parent / f"ppo_vecnormalize_{stem[len('ppo_'):]}.pkl")
for parent in [mf.parent, *mf.parent.parents][:4]:
candidates.extend(sorted(parent.glob("ppo_model_*_vecnormalize.pkl")))
for c in candidates:
if c.is_file():
return c
return None

shared_model = None
if ppo_path.is_dir():
cand = ppo_path / "ppo_model_shared.zip"
if cand.exists():
shared_model = cand
elif ppo_path.suffix == ".zip":
looks_shared = (
len(site_ids) > 1
or "shared" in ppo_path.parts
or ppo_path.stem.startswith("ppo_model_shared")
)
if looks_shared and ppo_path.exists():
shared_model = ppo_path
if shared_model is not None and shared_model.exists():
from stable_baselines3 import PPO as SB3PPO

sb3 = SB3PPO.load(str(shared_model.with_suffix("")))
saved_obs_dim = sb3.observation_space.shape[0]
n_models_all = sum(len(dc_sites[sid].models) for sid in site_ids)
zones = sys.get("zones")

if obs_mode == "per-zone-summary":
n_bus_phases = 0
zone_summary = {zname: tuple(zbuses) for zname, zbuses in zones.items()} if zones else None
bus_phase_groups = None
elif obs_mode == "system-summary-only":
n_bus_phases = 0
zone_summary = None
bus_phase_groups = None
elif obs_mode == "per-bus-summary":
from openg2g.rl.env import compute_bus_phase_groups
grid.do_reset()
grid.start()
_v_index = grid.v_index
grid.stop()
bus_phase_groups = compute_bus_phase_groups(_v_index)
n_bus_phases = 2 * len(bus_phase_groups)
zone_summary = {zname: tuple(zbuses) for zname, zbuses in zones.items()} if zones else None
else: # "full-voltage"
n_bus_phases = saved_obs_dim - 3 - 5 * n_models_all
zone_summary = None
bus_phase_groups = None

site_model_mapping = {sid: [md.spec.model_label for md, _ in dc_sites[sid].models] for sid in site_ids}
all_init_bs = {md.spec.model_label: md.initial_batch_size for sid in site_ids for md, _ in dc_sites[sid].models}
obs_config = ObservationConfig.from_multi_site(
site_specs_map,
{sid: {md.spec.model_label: sched.initial for md, sched in dc_sites[sid].models} for sid in site_ids},
n_bus_phases=n_bus_phases,
initial_batch_sizes=all_init_bs,
zone_summary=zone_summary,
bus_phase_groups=bus_phase_groups,
v_min=V_MIN,
v_max=V_MAX,
)
vn_path = _find_vecnormalize(shared_model)
if vn_path is not None:
logger.info("PPO: loading VecNormalize stats from %s", vn_path)
else:
logger.warning(
"PPO: no VecNormalize stats found next to %s — policy will see UNNORMALIZED obs", shared_model
)
ppo_ctrl = SharedPPOBatchSizeController(
datacenter=next(iter(datacenters.values())),
grid=grid,
model_path=str(shared_model),
obs_config=obs_config,
site_model_mapping=site_model_mapping,
dt_s=DT_CTRL,
vecnormalize_path=str(vn_path) if vn_path is not None else None,
)
controllers.append(ppo_ctrl)
else:
zones = sys.get("zones")

for site_id in site_ids:
if ppo_path.is_dir():
site_model = str(ppo_path / f"ppo_model_{site_id}.zip")
elif ppo_path.suffix == ".zip" and len(site_ids) == 1:
site_model = str(ppo_path)
else:
site_model = str(ppo_path.parent / f"ppo_model_{site_id}.zip")

from stable_baselines3 import PPO as SB3PPO

sb3 = SB3PPO.load(str(Path(site_model).with_suffix("")))
saved_obs_dim = sb3.observation_space.shape[0]
n_models_site = len(dc_sites[site_id].models)
n_bus_phases = saved_obs_dim - 3 - 5 * n_models_site

zone_buses = None
if zones is not None and site_id in zones:
zone_buses = tuple(zones[site_id])

specs = site_specs_map[site_id]
replica_counts = {md.spec.model_label: sched.initial for md, sched in dc_sites[site_id].models}
site_init_bs = {md.spec.model_label: md.initial_batch_size for md, _ in dc_sites[site_id].models}
obs_config = ObservationConfig.from_model_specs(
specs, replica_counts, n_bus_phases=n_bus_phases, initial_batch_sizes=site_init_bs, zone_buses=zone_buses, v_min=V_MIN, v_max=V_MAX
)
vn_path = _find_vecnormalize(Path(site_model))
if vn_path is not None:
logger.info("PPO[%s]: loading VecNormalize stats from %s", site_id, vn_path)
else:
logger.warning(
"PPO[%s]: no VecNormalize stats found next to %s — policy will see UNNORMALIZED obs",
site_id,
site_model,
)
ppo_ctrl = PPOBatchSizeController(
specs,
datacenter=datacenters[site_id],
grid=grid,
model_path=site_model,
obs_config=obs_config,
dt_s=DT_CTRL,
vecnormalize_path=str(vn_path) if vn_path is not None else None,
)
controllers.append(ppo_ctrl)

coord = Coordinator(
datacenters=list(datacenters.values()),
grid=grid,
controllers=controllers,
total_duration_s=TOTAL_DURATION_S,
)

from openg2g.controller.ppo import SharedPPOBatchSizeController

for ctrl in controllers:
if isinstance(ctrl, SharedPPOBatchSizeController):
ctrl.attach_datacenters(list(datacenters.values()))

logger.info("Running %s...", mode)
log = coord.run()

vstats = compute_allbus_voltage_stats(
log.grid_states,
v_min=V_MIN,
v_max=V_MAX,
exclude_buses=exclude_buses,
)
logger.info(
" %s: viol=%.1fs integral=%.4f vmin=%.4f vmax=%.4f",
mode,
vstats.violation_time_s,
vstats.integral_violation_pu_s,
vstats.worst_vmin,
vstats.worst_vmax,
)

return vstats, log



medium

The run_simulation function is quite long and complex, handling the setup for all controller types (baseline, rule_based, ofo, ppo). This reduces its readability and maintainability. Consider refactoring this function. For example, you could extract the setup logic for each controller type into its own helper function. This would make run_simulation a dispatcher and easier to follow.
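The suggested refactor might take this shape: one builder per controller mode, with `run_simulation` reduced to a dispatcher. The builder names, the `ctx` dict, and the string stand-ins for controller objects are all illustrative, not the project's actual API:

```python
def _build_baseline(ctx):
    # No batch control; only the tap-schedule controller would be added elsewhere.
    return []

def _build_rule_based(ctx):
    return [f"rule_based[{sid}]" for sid in ctx["site_ids"]]

def _build_ofo(ctx):
    return [f"ofo[{sid}]" for sid in ctx["site_ids"]]

def _build_ppo(ctx):
    return [f"ppo[{sid}]" for sid in ctx["site_ids"]]

_BUILDERS = {
    "baseline": _build_baseline,
    "rule_based": _build_rule_based,
    "ofo": _build_ofo,
    "ppo": _build_ppo,
}

def build_controllers(mode, ctx):
    """Dispatch to the per-mode builder; fail loudly on unknown modes."""
    if mode not in _BUILDERS:
        raise ValueError(f"unknown mode: {mode}")
    return _BUILDERS[mode](ctx)
```

Each `_build_*` helper would absorb the corresponding `if mode == ...` branch of the current function, leaving shared setup (grid, datacenters, coordinator) in one place.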

Comment on lines +238 to +252
def _find_vecnormalize(model_file: Path) -> Path | None:
mf = Path(model_file)
stem = mf.with_suffix("").name
candidates = [
mf.parent / f"{stem}_vecnormalize.pkl",
mf.parent / "vecnormalize.pkl",
]
if stem.startswith("ppo_") and stem.endswith("_steps"):
candidates.append(mf.parent / f"ppo_vecnormalize_{stem[len('ppo_'):]}.pkl")
for parent in [mf.parent, *mf.parent.parents][:4]:
candidates.extend(sorted(parent.glob("ppo_model_*_vecnormalize.pkl")))
for c in candidates:
if c.is_file():
return c
return None


medium

The _find_vecnormalize function has complex logic to locate the vecnormalize.pkl file, searching through multiple parent directories and trying different naming patterns. This can be fragile and hard to debug if file structures change. It would be more robust to have the training script save the path to the vecnormalize.pkl file in a metadata file alongside the model, or to enforce a stricter, simpler directory structure. For example, always save it as vecnormalize.pkl in the same directory as the model.
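The metadata-file alternative suggested here might look like the sketch below. The `.meta.json` naming and the `vecnormalize` key are assumptions, not existing conventions in the repo; the point is that the trainer records the path once and the loader reads it back instead of globbing:

```python
import json
from pathlib import Path

def save_model_metadata(model_path, vecnormalize_path):
    """Trainer side: write <model>.meta.json recording the stats location."""
    meta = Path(model_path).with_suffix(".meta.json")
    meta.write_text(json.dumps({"vecnormalize": str(vecnormalize_path)}))

def find_vecnormalize(model_path):
    """Loader side: read the recorded path; None if absent or stale."""
    meta = Path(model_path).with_suffix(".meta.json")
    if not meta.is_file():
        return None
    p = Path(json.loads(meta.read_text())["vecnormalize"])
    return p if p.is_file() else None
```

This keeps the evaluation script's behavior (warn and proceed unnormalized when stats are missing) while replacing the four-level parent-directory search with a single deterministic lookup.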

Comment on lines +470 to +494
def _per_step_voltage_pen(grid_states, *, v_min: float, v_max: float, exclude_buses: tuple[str, ...]) -> np.ndarray:
"""Compute the per-step voltage penalty (sum of squared violation magnitude).

This matches ``compute_reward`` in ``openg2g/rl/env.py``: at each step the
    penalty is ``sum(max(v_min - v, 0)**2 + max(v - v_max, 0)**2)`` over all
bus-phases (excluding the substation buses), with NaNs treated as
"not in violation".
"""
drop = {b.lower() for b in exclude_buses}
out = np.zeros(len(grid_states), dtype=np.float64)
for i, gs in enumerate(grid_states):
s = 0.0
for bus in gs.voltages.buses():
if bus.lower() in drop:
continue
pv = gs.voltages[bus]
for v in (pv.a, pv.b, pv.c):
if math.isnan(v):
continue
if v < v_min:
s += (v_min - v) ** 2
elif v > v_max:
s += (v - v_max) ** 2
out[i] = s
return out


medium

This function _per_step_voltage_pen seems to duplicate the reward calculation logic from openg2g.rl.env.compute_reward. The docstring even mentions this. To avoid code duplication and potential inconsistencies in the future, consider refactoring this logic into a shared utility function that can be called from both places.
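The shared utility could be as small as the function below; the name and signature are hypothetical, but the NaN handling and squared-excursion penalty mirror the quoted code, so both `_per_step_voltage_pen` and `compute_reward` could call it on a flat iterable of per-phase voltages:

```python
import math

def voltage_violation_penalty(voltages_pu, v_min=0.95, v_max=1.05):
    """Sum of squared excursions outside the [v_min, v_max] band.

    NaN entries (missing phases) count as non-violating, matching the
    behavior both current call sites implement independently.
    """
    s = 0.0
    for v in voltages_pu:
        if math.isnan(v):
            continue
        if v < v_min:
            s += (v_min - v) ** 2
        elif v > v_max:
            s += (v - v_max) ** 2
    return s
```

Centralizing this would guarantee the evaluation metric and the training reward never drift apart.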

import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np

from build_scenario_library import run_simulation


medium

This script imports run_simulation from build_scenario_library.py. This creates a dependency between two example scripts, where one acts as a library for the other. To improve modularity and clarity, utility functions like run_simulation that are shared across multiple scripts should be moved to a common library module within the openg2g package (e.g., openg2g.utils or openg2g.simulation). This would make the code organization cleaner and dependencies more explicit.

Comment thread openg2g/rl/env.py
Comment on lines +489 to +501
# Register a stub module so pickle can resolve
# build_scenario_library.ScenarioRecord without requiring the
# script to be on sys.path.
if "build_scenario_library" not in sys.modules:
    _stub = types.ModuleType("build_scenario_library")

    class _ScenarioRecord:
        pass

    _ScenarioRecord.__module__ = "build_scenario_library"
    _ScenarioRecord.__qualname__ = "ScenarioRecord"
    _stub.ScenarioRecord = _ScenarioRecord
    sys.modules["build_scenario_library"] = _stub


medium

The ScenarioLibrary class includes a mechanism to create a stub module for build_scenario_library to handle unpickling of ScenarioRecord. While this works, it's a bit of a workaround and indicates a cyclic dependency or misplaced class definition. A cleaner design would be to define ScenarioRecord in a shared library module (e.g., openg2g.rl.scenario) that both build_scenario_library.py and this module can import from. This would remove the need for this stub and make the data structures more independent of specific scripts.
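The reviewer's suggestion could be as simple as a plain dataclass in a shared module (the path `openg2g/rl/scenario.py` and the field names are hypothetical). Both the builder script and the library loader would import it from one place, which gives pickle a stable module path and makes the stub unnecessary:

```python
from dataclasses import dataclass, field

# Hypothetical shared definition — in the suggested design this would
# live in e.g. openg2g/rl/scenario.py, imported by both
# build_scenario_library.py and openg2g/rl/env.py.

@dataclass
class ScenarioRecord:
    seed: int
    pv_scale: float
    tvl_scale: float
    metrics: dict = field(default_factory=dict)  # screening results, per-controller
```

Records pickled from this shared location would then unpickle anywhere the package is importable, without registering a fake `build_scenario_library` module first.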
