-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlaunch.py
More file actions
executable file
·177 lines (143 loc) · 5.32 KB
/
Copy pathlaunch.py
File metadata and controls
executable file
·177 lines (143 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python3
"""Top-level launcher — run applications from any module(s) or scenario.
Usage:
python3 launch.py <module> [apps ...] [-s]
python3 launch.py --scenario <name> [-s]
python3 launch.py --list-scenarios
Examples:
python3 launch.py 01-operating-room Arm ArmController -s
python3 launch.py 02-record-playback RecordingService
python3 launch.py --scenario record -s
python3 launch.py --list-scenarios
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
try:
import argcomplete
except ImportError:
argcomplete = None
# Directory containing this launcher; every other path is derived from it.
PROJECT_ROOT = Path(__file__).resolve().parent

# Put resource/python at the front of sys.path before importing from `scripts`.
sys.path.insert(0, str(PROJECT_ROOT / "resource" / "python"))
from scripts import module_runner

SCENARIOS_PATH = PROJECT_ROOT / "resource" / "config" / "scenarios.json"

# Scenario definitions are read once, at import time.
with open(SCENARIOS_PATH, encoding="utf-8") as f:
    SCENARIOS: dict[str, dict] = json.load(f)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resolve_module(
    module_name: str,
    app_names: list[str] | None,
    security: bool,
) -> tuple[list[list[str]], Path, dict[str, str]]:
    """Build the launch inputs for a single module.

    Args:
        module_name: Key of the module in ``module_runner.discover_modules()``.
        app_names: Apps to launch; ``None`` or an empty list selects every
            app the module's config defines.
        security: Passed to ``module_runner.load_module_config`` as the
            ``"security"`` flag.

    Returns:
        A ``(commands, module_dir, env)`` tuple, where ``commands`` holds one
        entry per selected app, in the order requested.

    Raises:
        ValueError: If a name in ``app_names`` is not an app of the module.
    """
    module_dir = module_runner.discover_modules()[module_name]
    env, all_apps = module_runner.load_module_config(
        module_dir, flags={"security": security}
    )

    # No explicit selection: launch everything the module defines.
    if not app_names:
        return list(all_apps.values()), module_dir, env

    commands = []
    for requested in app_names:
        if requested not in all_apps:
            raise ValueError(
                f"Unknown app '{requested}' in module '{module_name}'. "
                f"Available: {', '.join(all_apps)}"
            )
        commands.append(all_apps[requested])
    return commands, module_dir, env
def _list_scenarios() -> None:
    """Print every scenario in ``SCENARIOS`` to stdout.

    Each scenario is shown on two lines: its name (left-aligned to the width
    of the longest name) followed by its description, then the modules it
    launches with their apps, or ``all`` when a module lists no apps.

    This function only prints; it does not exit the process.
    """
    print("Available scenarios:\n")
    # default=0 keeps an empty scenarios file from raising ValueError in max().
    max_name = max((len(name) for name in SCENARIOS), default=0)
    for name, spec in SCENARIOS.items():
        desc = spec.get("description", "")
        modules_str = ", ".join(
            f"{m} ({', '.join(apps) if apps else 'all'})" for m, apps in spec["modules"]
        )
        print(f" {name:<{max_name}} {desc}")
        # Second line repeats the name column as padding so the arrow aligns.
        print(f" {'':<{max_name}} -> {modules_str}")
    print()
def _complete_apps(prefix, parsed_args, **kwargs):
"""Return app names for the already-selected module (for argcomplete)."""
module_name = getattr(parsed_args, "module", None)
if not module_name:
return []
modules = module_runner.discover_modules()
module_dir = modules.get(module_name)
if not module_dir:
return []
try:
config_path = module_dir / "module.json"
with open(config_path, encoding="utf-8") as f:
raw = json.load(f)
return [a for a in raw.get("apps", {}) if a.startswith(prefix)]
except Exception:
return []
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line and launch the requested module or scenario.

    Exactly one of three modes must be chosen:

    * ``<module> [apps ...]`` launches the named apps of one module (all of
      them when none are named) via ``module_runner.launch``.
    * ``--scenario NAME`` resolves every module listed in that scenario and
      launches them together via ``module_runner.launch_multi``.
    * ``--list-scenarios`` prints the available scenarios and returns.

    ``-s`` / ``--security`` is forwarded to module resolution in both launch
    modes. Invalid input (an unknown app, or a scenario that references an
    unknown module) is reported through ``parser.error``, which prints the
    usage line and exits.
    """
    modules = module_runner.discover_modules()

    parser = argparse.ArgumentParser(
        description="Launch applications from module(s) or a predefined scenario.",
        usage=(
            "launch.py (<module> [apps ...] | --scenario <name> "
            "| --list-scenarios) [-s]"
        ),
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "module",
        nargs="?",
        choices=sorted(modules),
        default=None,
        help="Module to launch (e.g. 01-operating-room).",
    )
    group.add_argument(
        "--scenario",
        choices=sorted(SCENARIOS),
        metavar="NAME",
        help="Launch a predefined scenario.",
    )
    group.add_argument(
        "--list-scenarios",
        action="store_true",
        help="List all available scenarios and exit.",
    )

    # The completer attribute is only consulted by argcomplete, when installed.
    parser.add_argument(
        "apps",
        nargs="*",
        default=None,
        help="Applications to launch (default: all in the module).",
    ).completer = _complete_apps
    parser.add_argument(
        "-s",
        "--security",
        action="store_true",
        help="Launch with Security enabled.",
    )

    if argcomplete:
        argcomplete.autocomplete(parser)

    args = parser.parse_args()

    def resolve(module_name, app_names):
        """Resolve one module, reporting an unknown app as a usage error."""
        try:
            return _resolve_module(module_name, app_names, args.security)
        except ValueError as exc:
            # Only the plain ValueError raised for an unknown app is a user
            # mistake. Subclasses (e.g. JSON or Unicode decode errors) point
            # at a broken config and keep their traceback.
            if type(exc) is not ValueError:
                raise
            parser.error(str(exc))

    if args.list_scenarios:
        _list_scenarios()
        return

    if args.scenario:
        spec = SCENARIOS[args.scenario]
        print(f"Scenario: {args.scenario} — {spec.get('description', '')}")
        specs = []
        for module_name, app_names in spec["modules"]:
            if module_name not in modules:
                parser.error(f"Scenario references unknown module '{module_name}'")
            cmds, mod_dir, env = resolve(module_name, app_names)
            specs.append((cmds, mod_dir, env))
        module_runner.launch_multi(specs)
    elif args.module:
        # argparse yields [] when no apps are given; normalise that to None.
        cmds, mod_dir, env = resolve(args.module, args.apps or None)
        app_label = ", ".join(args.apps) if args.apps else "all"
        print(f"Launching from {args.module}: {app_label}")
        module_runner.launch(cmds, mod_dir, env)
    else:
        # Defensive: the required mutually exclusive group should already
        # have rejected a command line that selects no mode.
        parser.error("Specify a module or --scenario")
# Run the launcher only when this file is executed as a script.
if __name__ == "__main__":
    main()