@@ -149,6 +149,129 @@ def setup_training(ip: str, username: str = "ubuntu"):
149149 logger .info ("Setup complete!" )
150150
151151
def prepare_training_data(ip: str, group_size: int = 8, username: str = "ubuntu"):
    """Generate the train/val parquet files that verl-agent expects.

    Even for environment-driven training, verl-agent needs parquet data
    files; they determine the modality (text vs visual) and the batch
    sizing.

    Args:
        ip: Address of the training VM to run the preprocessing on.
        group_size: Passed to the preprocessing script as ``--train_data_size``.
        username: SSH user on the training VM.

    Raises:
        RuntimeError: If the remote preprocessing command exits non-zero.
    """
    logger.info("Preparing training data (parquet files)...")

    # Two remote steps chained with `&&`: enter the checkout, then run the
    # preprocessing module inside the verl-agent conda environment.
    preprocess_invocation = " ".join(
        [
            "conda run -n verl-agent python3 -m examples.data_preprocess.prepare",
            "--mode visual",
            f"--train_data_size {group_size}",
            "--val_data_size 128",
        ]
    )
    remote_command = " && ".join(["cd ~/verl-agent", preprocess_invocation])

    outcome = _ssh_run(ip, remote_command, username=username, stream=True)
    succeeded = outcome.returncode == 0
    if not succeeded:
        raise RuntimeError("Data preparation failed")
167+
168+
# Script executed on the training VM to patch verl-agent's env_manager.py.
#
# This is a raw, non-f-string template: braces and backslashes below are
# literal text of the remote script. The two @@...@@ placeholders are replaced
# with Python string literals (via repr) before the script is shipped, and the
# remote script in turn re-escapes them with repr() when splicing them into
# the code it writes, so unusual characters in the values cannot break either
# layer of generated source.
#
# NOTE(review): the 4-space `elif`/`else` and 8-space body indentation is
# assumed to match make_envs() in env_manager.py. If the target file is
# indented differently the `old` pattern will not match and the script exits 1.
_ENV_MANAGER_PATCH_SCRIPT = r'''
import os, sys

DEFAULT_SERVER_URL = @@DEFAULT_SERVER_URL@@
DEFAULT_TASK_ID = @@DEFAULT_TASK_ID@@

env_manager_path = os.path.expanduser(
    "~/verl-agent/agent_system/environments/env_manager.py"
)

with open(env_manager_path, "r") as f:
    content = f.read()

# Check if already patched
if "waa" in content.lower() and "WAADesktopEnv" in content:
    print("env_manager.py already patched for WAA")
    sys.exit(0)

# Branch to insert before the final else of the env dispatch chain.
# The wrapper class is described with a comment rather than a docstring:
# a triple-double-quoted docstring here would terminate this literal early.
patch = """
    elif "waa" in config.env.env_name.lower():
        # WAA Desktop Automation Environment (openadapt-evals)
        from openadapt_evals.adapters.verl_env import WAADesktopEnv
        from functools import partial
        import asyncio

        server_url = getattr(config.env, "waa", {}).get("server_url", __SERVER_URL_LITERAL__)
        task_id = getattr(config.env, "waa", {}).get("task_id", __TASK_ID_LITERAL__)
        max_steps = config.env.max_steps

        env_config = {
            "server_url": server_url,
            "task_id": task_id,
            "max_steps": max_steps,
            "evaluate_at_done": True,
            "action_type": "fractional",
        }

        # Build vectorized environments using Ray
        class WAAEnvWrapper:
            # Sync wrapper for the async interface of WAADesktopEnv.
            def __init__(self, config):
                self.env = WAADesktopEnv(config)
                self._loop = None

            def _get_loop(self):
                if self._loop is None or self._loop.is_closed():
                    self._loop = asyncio.new_event_loop()
                return self._loop

            def reset(self, seed=0):
                return self._get_loop().run_until_complete(self.env.reset(seed))

            def step(self, action):
                return self._get_loop().run_until_complete(self.env.step(action))

            def close(self):
                if self._loop and not self._loop.is_closed():
                    self._loop.run_until_complete(self.env.close())
                    self._loop.close()

        # For now, use a simple non-vectorized approach
        # Full Ray vectorization can be added once basic training works
        print(f"WAA environment: server={server_url}, task={task_id}, max_steps={max_steps}")
        print("NOTE: WAA env integration is experimental. See openadapt-evals docs.")

        # Create minimal env manager compatible with verl-agent's expected interface
        env_wrapper = WAAEnvWrapper(env_config)
        # Return a placeholder - the actual integration requires implementing
        # EnvironmentManagerBase, which we'll do as a next step
        raise NotImplementedError(
            "WAA environment manager integration is in progress. "
            "The env dispatch is patched but EnvironmentManagerBase "
            "adapter is needed. See openadapt-evals PR #87."
        )
"""
patch = patch.replace("__SERVER_URL_LITERAL__", repr(DEFAULT_SERVER_URL))
patch = patch.replace("__TASK_ID_LITERAL__", repr(DEFAULT_TASK_ID))

# Insert before the else branch
old = '    else:\n        print("Environment not supported")'
if old in content:
    content = content.replace(old, patch + old)
    with open(env_manager_path, "w") as f:
        f.write(content)
    print("env_manager.py patched successfully")
else:
    print("WARNING: Could not find expected else branch in env_manager.py")
    print("Manual patching may be required")
    sys.exit(1)
'''


def patch_env_manager(ip: str, waa_server: str, task_id: str, max_steps: int = 15, username: str = "ubuntu"):
    """Patch verl-agent's env_manager.py to support WAADesktopEnv.

    verl-agent uses a hardcoded if/elif chain in make_envs() to dispatch
    environments by name. This runs a script on the training VM that adds a
    'waa' branch ahead of the final ``else``. The inserted branch currently
    ends in ``NotImplementedError``: the dispatch is wired up, but the
    EnvironmentManagerBase adapter is not implemented yet.

    The remote script is a no-op (exit 0) when the file already mentions
    WAADesktopEnv, and exits 1 when the expected ``else`` branch is not found.

    Args:
        ip: Address of the training VM.
        waa_server: Default WAA server URL baked into the inserted branch;
            used when ``config.env.waa.server_url`` is not set.
        task_id: Default task id baked into the inserted branch; used when
            ``config.env.waa.task_id`` is not set.
        max_steps: Currently unused. The inserted branch reads
            ``config.env.max_steps`` at runtime instead. Kept for interface
            compatibility.
        username: SSH user on the training VM.

    Raises:
        RuntimeError: If the remote patch script exits non-zero.
    """
    # Function-scope imports keep this block self-contained.
    import base64
    import shlex

    logger.info("Patching verl-agent env_manager for WAA support...")

    # repr() yields a valid Python literal, so the values are embedded safely
    # regardless of quotes or backslashes they may contain.
    patch_script = _ENV_MANAGER_PATCH_SCRIPT.replace(
        "@@DEFAULT_SERVER_URL@@", repr(str(waa_server))
    ).replace("@@DEFAULT_TASK_ID@@", repr(str(task_id)))

    # The script contains newlines and both quote styles, so it cannot be
    # wrapped in shell quotes directly. Ship it base64-encoded inside a
    # one-line bootstrap; the base64 alphabet has no shell metacharacters.
    encoded = base64.b64encode(patch_script.encode("utf-8")).decode("ascii")
    bootstrap = f'import base64; exec(base64.b64decode("{encoded}").decode("utf-8"))'

    result = _ssh_run(
        ip,
        f"conda run -n verl-agent python3 -c {shlex.quote(bootstrap)}",
        username=username,
        stream=True,
    )
    # Surface a failed patch here instead of letting training start against
    # an unpatched env_manager.
    if result.returncode != 0:
        raise RuntimeError("env_manager patching failed")
273+
274+
152275def launch_training (
153276 ip : str ,
154277 waa_server : str ,
@@ -165,29 +288,52 @@ def launch_training(
165288
166289 The training connects to the WAA server via HTTP for environment
167290 interaction (reset, step, evaluate).
291+
292+ NOTE: verl-agent uses a hardcoded env dispatch in make_envs(). This
293+ function patches it to support our WAADesktopEnv before launching.
294+ The full EnvironmentManagerBase adapter is still TODO — this will
295+ raise NotImplementedError on the first training attempt. See the
296+ decision doc for the integration roadmap.
168297 """
169- # Build the verl-agent training command using Hydra-style overrides
298+ # Step 1: Prepare parquet data files (required by verl-agent)
299+ prepare_training_data (ip , group_size = group_size , username = username )
300+
301+ # Step 2: Patch env_manager to recognize 'waa' env name
302+ patch_env_manager (ip , waa_server , task_id , max_steps = max_turns , username = username )
303+
304+ # Step 3: Build the training command with validated Hydra overrides
305+ # Config paths validated against verl-agent's ppo_trainer.yaml schema.
306+ # See docs/verl_agent_decision.md for the validation report.
170307 train_cmd = f"""
171308cd ~/verl-agent && \\
172- conda activate verl-agent && \\
173- python3 -m verl.trainer.main_ppo \\
309+ conda run -n verl-agent python3 -m verl.trainer.main_ppo \\
174310 algorithm.adv_estimator={ algorithm } \\
311+ algorithm.gamma=0.95 \\
175312 actor_rollout_ref.model.path={ model } \\
176313 actor_rollout_ref.rollout.name=vllm \\
177314 actor_rollout_ref.rollout.tensor_model_parallel_size={ n_gpus } \\
178- env.env_name=openadapt_evals.adapters.verl_env.WAADesktopEnv \\
179- env.env_kwargs.server_url={ waa_server } \\
180- env.env_kwargs.task_id={ task_id } \\
181- env.env_kwargs.max_steps={ max_turns } \\
315+ actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \\
316+ actor_rollout_ref.rollout.enable_chunked_prefill=False \\
317+ actor_rollout_ref.actor.ppo_mini_batch_size=64 \\
318+ actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=8 \\
319+ env.env_name=waa_desktop \\
182320 env.max_steps={ max_turns } \\
183321 env.rollout.n={ group_size } \\
322+ env.waa.server_url={ waa_server } \\
323+ env.waa.task_id={ task_id } \\
324+ data.train_files=$HOME/data/verl-agent/visual/train.parquet \\
325+ data.val_files=$HOME/data/verl-agent/visual/test.parquet \\
184326 data.train_batch_size={ group_size } \\
327+ data.val_batch_size=128 \\
185328 data.max_prompt_length=2048 \\
186329 data.max_response_length=512 \\
187330 data.return_raw_chat=True \\
331+ data.filter_overlong_prompts=True \\
188332 trainer.n_gpus_per_node={ n_gpus } \\
189333 trainer.nnodes=1 \\
190334 trainer.total_epochs={ epochs } \\
335+ trainer.test_freq=5 \\
336+ trainer.experiment_name={ algorithm } _waa_desktop \\
191337 trainer.logger=['console','wandb'] \\
192338 trainer.project_name=openadapt-waa-rl
193339"""
0 commit comments