Skip to content

Commit 8c18397

Browse files
authored
Fix non-determinism in DAgger (fixes #643) (#649)
* Fix non-determinism in DAgger reported by #643 The issue was that DAgger demonstrations were loaded from disk for training in a different order each run. This was because the filenames for the saved demonstrations changed each run and that changed the order in which os.listdir returned the filenames. The filenames changed each run, because they included a timestamp and the first 6 characters of a UUID generated without fixing a random seed. This PR fixes the non-determinism by making the filenames the same each run as long as the same random seed is used. It does so by removing the timestamp from the filename and fixing the seed of the UUID. Because the timestamp is removed, the PR introduces a trajectory index in the filename, so that a user can tell the order in which trajectories were created. It also includes the entire UUID instead of just the first 6 characters. Finally, it sorts the filenames returned by os.listdir. listdir returns filenames in an arbitrary order that depends on the file system implementation (https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic). We sort the filenames to ensure the order is consistent across file systems. Why include a UUID in the filename at all? If we removed the UUID from the filename, then the DAgger trainers would not overwrite filenames, because they take care to write to a new directory each round. However, if the InteractiveTrajectoryCollector is used independently of those trainers, then it can end up overwriting filenames without the UUID. Do we need to shuffle the filenames returned by os.listdir after sorting? We could, but the demonstrations loaded from the files are passed to a DataLoader, which shuffles them. That seems like the right place to handle the shuffling rather than making it the responsibility of the utility function that returns the filenames. * Assert that the DAgger demonstration file does not already exist before saving. * Minor clean-up: Shorten list comprehension * Make the reproducibility tests more thorough This PR makes the test_trainer_reproducible and test_traj_collector_reproducible more thorough. For test_trainer_reproducible, it tests that the trajectories from rolling out the trained policy are the same each run (instead of just testing that the rewards achieved by the trained policy are the same). For test_traj_collector_reproducible, it tests that the filenames for the files storing DAgger demonstrations are the same each run and that each file in the first run stores the same trajectory as the file with the same filename in the second run (instead of just testing that the observations from the trajectories are the same). * Reduce the number of training iterations in test_trainer_reproducible This PR reduces the number of training iterations in test_trainer_reproducible, because the previous number of iterations used was for testing that the policy improved with training, but that's not needed to test reproducibility.
1 parent 681cb72 commit 8c18397

2 files changed

Lines changed: 139 additions & 8 deletions

File tree

src/imitation/algorithms/dagger.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import logging
1111
import os
1212
import pathlib
13+
import uuid
1314
from typing import Any, Callable, List, Mapping, Optional, Sequence, Tuple, Union
1415

1516
import numpy as np
@@ -21,7 +22,6 @@
2122
from imitation.algorithms import base, bc
2223
from imitation.data import rollout, types
2324
from imitation.util import logger as imit_logger
24-
from imitation.util import util
2525

2626

2727
class BetaSchedule(abc.ABC):
@@ -99,16 +99,21 @@ def reconstruct_trainer(
9999

100100
def _save_dagger_demo(
101101
trajectory: types.Trajectory,
102+
trajectory_index: int,
102103
save_dir: types.AnyPath,
104+
rng: np.random.Generator,
103105
prefix: str = "",
104106
) -> None:
105107
save_dir = types.parse_path(save_dir)
106108
assert isinstance(trajectory, types.Trajectory)
107109
actual_prefix = f"{prefix}-" if prefix else ""
108-
timestamp = util.make_unique_timestamp()
109-
filename = f"{actual_prefix}dagger-demo-{timestamp}.npz"
110-
110+
randbits = int.from_bytes(rng.bytes(16), "big")
111+
random_uuid = uuid.UUID(int=randbits, version=4).hex
112+
filename = f"{actual_prefix}dagger-demo-{trajectory_index}-{random_uuid}.npz"
111113
npz_path = save_dir / filename
114+
assert (
115+
not npz_path.exists()
116+
), "The following DAgger demonstration path already exists: {0}".format(npz_path)
112117
types.save(npz_path, [trajectory])
113118
logging.info(f"Saved demo at '{npz_path}'")
114119

@@ -246,8 +251,8 @@ def step_wait(self) -> VecEnvStepReturn:
246251
infos=infos,
247252
dones=dones,
248253
)
249-
for traj in fresh_demos:
250-
_save_dagger_demo(traj, self.save_dir)
254+
for traj_index, traj in enumerate(fresh_demos):
255+
_save_dagger_demo(traj, traj_index, self.save_dir, self.rng)
251256

252257
return next_obs, rews, dones, infos
253258

@@ -372,7 +377,13 @@ def _load_all_demos(self) -> Tuple[types.Transitions, List[int]]:
372377
return demo_transitions, num_demos_by_round
373378

374379
def _get_demo_paths(self, round_dir: pathlib.Path) -> List[pathlib.Path]:
375-
return [round_dir / p for p in os.listdir(round_dir) if p.endswith(".npz")]
380+
# listdir returns filenames in an arbitrary order that depends on the
381+
# file system implementation:
382+
# https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic
383+
# To ensure the order is consistent across file systems,
384+
# we sort by the filename.
385+
filenames = sorted(os.listdir(round_dir))
386+
return [round_dir / f for f in filenames if f.endswith(".npz")]
376387

377388
def _demo_dir_path_for_round(self, round_num: Optional[int] = None) -> pathlib.Path:
378389
if round_num is None:
@@ -570,10 +581,12 @@ def __init__(
570581
if expert_trajs is not None:
571582
# Save each initial expert trajectory into the "round 0" demonstration
572583
# data directory.
573-
for traj in expert_trajs:
584+
for traj_index, traj in enumerate(expert_trajs):
574585
_save_dagger_demo(
575586
traj,
587+
traj_index,
576588
self._demo_dir_path_for_round(),
589+
self.rng,
577590
prefix="initial_data",
578591
)
579592

tests/algorithms/test_dagger.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,63 @@ def get_random_acts(obs):
105105
assert nonzero_acts == 0
106106

107107

108+
def test_traj_collector_reproducible(tmpdir, pendulum_venv):
109+
# We run the collector twice with the same random seeds and
110+
# check that the following 2 properties hold:
111+
# 1) The files written in the first run have the same filenames
112+
# as the files written in the second run.
113+
# 2) Each file in the first run stores the same trajectory as
114+
# the file with the same filename in the second run.
115+
filename_to_traj_dicts = []
116+
with torch.random.fork_rng():
117+
for run_idx in range(2):
118+
# Reset the random seeds.
119+
save_dir = os.path.join(tmpdir, "run{0}".format(run_idx))
120+
rng = np.random.default_rng(12345)
121+
pendulum_venv.seed(12345)
122+
pendulum_venv.action_space.seed(12345)
123+
124+
# Run the collector.
125+
collector = dagger.InteractiveTrajectoryCollector(
126+
venv=pendulum_venv,
127+
get_robot_acts=lambda o: [
128+
pendulum_venv.action_space.sample() for _ in range(len(o))
129+
],
130+
beta=0.5,
131+
save_dir=save_dir,
132+
rng=rng,
133+
)
134+
collector.reset()
135+
zero_acts = np.zeros(
136+
(pendulum_venv.num_envs,) + pendulum_venv.action_space.shape,
137+
dtype=pendulum_venv.action_space.dtype,
138+
)
139+
for i in range(1000):
140+
_, _, dones, _ = collector.step(zero_acts)
141+
142+
# Get the observations from all the collected trajectories.
143+
file_paths = glob.glob(os.path.join(save_dir, "dagger-demo-*.npz"))
144+
filename_to_traj_dict = {}
145+
for fp in file_paths:
146+
traj = types.load_with_rewards(fp)[0]
147+
# For the purposes of testing, we remove `infos` from the
148+
# trajectory, because `infos` contains the time that it
149+
# takes to complete an episode, which we expect to differ
150+
# slightly between runs.
151+
traj_without_infos = types.TrajectoryWithRew(
152+
obs=traj.obs,
153+
acts=traj.acts,
154+
infos=None,
155+
terminal=traj.terminal,
156+
rews=traj.rews,
157+
)
158+
filename = os.path.basename(fp)
159+
filename_to_traj_dict[filename] = traj_without_infos
160+
filename_to_traj_dicts.append(filename_to_traj_dict)
161+
162+
assert filename_to_traj_dicts[0] == filename_to_traj_dicts[1]
163+
164+
108165
def _build_dagger_trainer(
109166
tmpdir,
110167
venv,
@@ -325,6 +382,67 @@ def test_trainer_makes_progress(init_trainer_fn, pendulum_venv, pendulum_expert_
325382
)
326383

327384

385+
@pytest.mark.parametrize(
386+
"init_trainer_fn",
387+
[_build_dagger_trainer, _build_simple_dagger_trainer],
388+
)
389+
def test_trainer_reproducible(
390+
init_trainer_fn,
391+
tmpdir,
392+
pendulum_venv,
393+
pendulum_expert_policy,
394+
custom_logger,
395+
):
396+
# Check that we get the same results if we run the trainer
397+
# twice with the same random seeds.
398+
# In particular, check that the trajectories from rolling out
399+
# the trained policy are the same in each run.
400+
run_trajs = []
401+
with torch.random.fork_rng():
402+
for run_idx in range(2):
403+
# Reset the random seeds.
404+
run_dir = os.path.join(tmpdir, "run{0}".format(run_idx))
405+
torch.random.manual_seed(12345)
406+
rng = np.random.default_rng(12345)
407+
pendulum_venv.seed(12345)
408+
pendulum_venv.action_space.seed(12345)
409+
410+
beta_schedule = None
411+
maybe_pendulum_expert_trajectories = None
412+
trainer = init_trainer_fn(
413+
run_dir,
414+
pendulum_venv,
415+
beta_schedule,
416+
pendulum_expert_policy,
417+
maybe_pendulum_expert_trajectories,
418+
custom_logger,
419+
rng,
420+
)
421+
422+
for i in range(2):
423+
collector = trainer.create_trajectory_collector()
424+
obs = collector.reset()
425+
dones = [False] * pendulum_venv.num_envs
426+
while not np.any(dones):
427+
expert_actions, _ = pendulum_expert_policy.predict(
428+
obs,
429+
deterministic=True,
430+
)
431+
obs, _, dones, _ = collector.step(expert_actions)
432+
trainer.extend_and_update(dict(n_epochs=1))
433+
434+
trajs = rollout.rollout(
435+
trainer.policy,
436+
pendulum_venv,
437+
rollout.make_sample_until(min_episodes=2),
438+
rng,
439+
)
440+
run_trajs.append(trajs)
441+
442+
assert len(run_trajs) == 2
443+
assert run_trajs[0] == run_trajs[1]
444+
445+
328446
def test_trainer_save_reload(tmpdir, init_trainer_fn, pendulum_venv):
329447
trainer = init_trainer_fn()
330448
trainer.round_num = 3

0 commit comments

Comments
 (0)