-
Notifications
You must be signed in to change notification settings - Fork 157
Expand file tree
/
Copy path9_cfg_task.py
More file actions
140 lines (114 loc) · 3.48 KB
/
9_cfg_task.py
File metadata and controls
140 lines (114 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""Initialize a task from a config file."""
from __future__ import annotations
try:
import isaacgym # noqa: F401
except ImportError:
pass
from typing import Literal
import rootutils
import tyro
from loguru import logger as log
from rich.logging import RichHandler
rootutils.setup_root(__file__, pythonpath=True)
log.configure(handlers=[{"sink": RichHandler(), "format": "{message}"}])
import torch
from gymnasium import make_vec
import metasim
metasim.register_gym_envs()
from metasim.scenario.cameras import PinholeCameraCfg
from metasim.utils import configclass
from metasim.utils.obs_utils import ObsSaver
@configclass
class Args:
"""Arguments for the static scene."""
task: str = "stack_cube"
robot: str = "franka"
## Handlers
sim: Literal[
"isaacsim",
"isaacgym",
"genesis",
"pybullet",
"sapien2",
"sapien3",
"mujoco",
"mjx",
] = "mujoco"
## Others
num_envs: int = 1
headless: bool = False
device: str = "cuda"
save_video: bool = True
def __post_init__(self):
"""Post-initialization configuration."""
log.info(f"Args: {self}")
def main():
args = tyro.cli(Args)
# Add camera for video recording if needed
camera = (
PinholeCameraCfg(
name="main_camera",
pos=[4.0, 4.0, 3.0],
look_at=[0.0, 0.0, 1.0],
width=640,
height=480,
data_types=["rgb"],
)
if args.save_video
else None
)
env_id = f"RoboVerse/{args.task}"
env = make_vec(
env_id,
num_envs=args.num_envs,
# vectorization_mode=None,
simulator=args.sim,
headless=args.headless,
cameras=[camera] if args.save_video else [],
device=args.device,
)
obs, info = env.reset()
# Initialize video saver
obs_saver = None
if args.save_video:
import os
os.makedirs("get_started/output", exist_ok=True)
video_path = f"get_started/output/9_cfg_task_{args.sim}.mp4"
obs_saver = ObsSaver(video_path=video_path)
log.info(f"Will save video to: {video_path}")
robot = env.scenario.robots[0]
for step_i in range(100):
# batch actions: (num_envs, act_dim)
actions = [
{
robot.name: {
"dof_pos_target": {
joint_name: (
torch.rand(1).item()
* (robot.joint_limits[joint_name][1] - robot.joint_limits[joint_name][0])
+ robot.joint_limits[joint_name][0]
)
for joint_name in robot.joint_limits.keys()
}
}
}
for _ in range(args.num_envs)
]
obs, reward, terminated, truncated, info = env.step(actions)
# Save observations for video
if obs_saver is not None:
try:
raw_states = env.task_env.handler.get_states(mode="tensor") # Access the underlying simulator
obs_saver.add(raw_states)
except Exception as e:
log.debug(f"Could not get camera data: {e}")
# Save video at the end
if obs_saver is not None:
obs_saver.save()
log.info("Video saved successfully!")
try:
env.close()
except NotImplementedError:
log.debug("env.close() not implemented, ignoring")
if __name__ == "__main__":
main()