diff --git a/controllers/__init__.py b/controllers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/floating_controller.py b/controllers/floating_controller.py similarity index 97% rename from floating_controller.py rename to controllers/floating_controller.py index 87b7377..1ed14ef 100644 --- a/floating_controller.py +++ b/controllers/floating_controller.py @@ -1,11 +1,11 @@ # floating_controller.py import mujoco import numpy as np -from config import AppConfig +from utils.config import AppConfig from typing import TYPE_CHECKING if TYPE_CHECKING: - from terrain_manager import TerrainManager + from core.terrain_manager import TerrainManager class FloatingController: """ diff --git a/controllers/hardware_controller.py b/controllers/hardware_controller.py new file mode 100644 index 0000000..7df31a1 --- /dev/null +++ b/controllers/hardware_controller.py @@ -0,0 +1,299 @@ +import serial +import threading +import time +import numpy as np +import struct +import re +from typing import TYPE_CHECKING + +from utils.logger import log +from state import OperatingMode, ControlSubMode + +if TYPE_CHECKING: # 型別提示,避免循環匯入 + from utils.config import AppConfig + from core.policy import PolicyManager + from state import SimulationState + from serial_communicator import SerialCommunicator + +# ------------------------------------------------------------- +# 常數:Teensy 實際傳送 40 個浮點數 + 1 個 CRC 欄位 +# ------------------------------------------------------------- +EXPECTED_FLOATS = 40 # 資料欄位數 (不含 CRC) +EXPECTED_CSV_FIELDS = EXPECTED_FLOATS + 1 # 40 floats + CRC +_CSV_REGEX = re.compile(r"[,\s]+") # 允許逗號或空白分隔 + + +def _crc8(data: bytes) -> int: + """計算簡易 CRC‑8: poly = x^8 + x^2 + x + 1 (0x07)""" + crc = 0 + for b in data: + crc ^= b + for _ in range(8): + crc = (crc << 1) ^ 0x07 if (crc & 0x80) else (crc << 1) + crc &= 0xFF + return crc + + +class HardwareController: + """重構版硬體控制器,已適配 40 維數據流並具有 CRC 驗證。""" + + def __init__(self, config: 'AppConfig', policy: 'PolicyManager', + global_state: 'SimulationState', serial_comm: 'SerialCommunicator'): + self.config = config + self.policy = policy + self.global_state = global_state + self.serial_comm = serial_comm + + self.ser: serial.Serial | None = None + self._is_running = threading.Event() + self._read_thread: threading.Thread | None = None + self._control_thread: threading.Thread | None = None + self._lock = threading.Lock() + self._partial_line: list[str] = [] # 斷包暫存 + + # --- 最新感測資料緩衝 (符合 40 維定義) --- + self._raw_angular_velocity = np.zeros(3) + self._raw_gravity_vector = np.zeros(3) + self._raw_linear_velocity = np.zeros(3) + self._raw_accelerometer = np.zeros(3) + self._raw_joint_positions = np.zeros(12) + self._raw_joint_velocities = np.zeros(12) + self._last_action = np.zeros(12) + self._last_update_time = 0.0 + + log.info("✅ 重構版硬體控制器已初始化 (40-dim stream)。") + + @property + def is_running(self) -> bool: + """公開查詢是否運行中的屬性""" + return self._is_running.is_set() + + # ------------------------------------------------------------- + # lifecycle 生命週期 + # ------------------------------------------------------------- + def attach_serial(self, ser: serial.Serial | None) -> None: + """由 SerialCommunicator 呼叫,將已連線的序列埠交給硬體控制器。""" + self.ser = ser + if ser is not None: + self.serial_comm.attach_serial(ser) + + def start(self) -> bool: + """啟動背景執行緒並接管序列埠。""" + if self._is_running.is_set(): + log.info("硬體控制器已在運行中。") + return False + if not self.serial_comm.is_connected: + log.error("❌ 硬體模式錯誤:請先連接序列埠。") + return False + if not self.ser: + self.ser = self.serial_comm.get_serial_connection() + if not self.ser: + log.error("❌ 無法取得有效序列埠連接。") + return False + self.serial_comm.attach_serial(self.ser) + try: + log.info("-> 正在命令 Teensy 切換至 POLICY_STREAM 模式...") + self.ser.write(b"monitor p\n") + time.sleep(0.1) + self.ser.reset_input_buffer() + except serial.SerialException as e: + log.error(f"❌ 發送模式切換指令失敗: {e}") + self.serial_comm.is_managed_by_hardware_controller = False + return False + self._is_running.set() + self._read_thread = threading.Thread(target=self._read_loop, daemon=True) + self._read_thread.start() + self._control_thread = threading.Thread(target=self._control_loop, daemon=True) + self._control_thread.start() + with self.global_state.lock: + self.global_state.hardware.is_connected = True + log.info("✅ 硬體控制執行緒已啟動。") + return True + + def stop(self) -> None: + if not self._is_running.is_set(): + return + log.info("正在停止硬體控制器...") + self._is_running.clear() + if self.ser and self.ser.is_open: + try: + self.ser.write(b"stop\n") + time.sleep(0.05) + self.ser.write(b"monitor h\n") + except serial.SerialException: + log.warning("發送停止指令失敗。") + if self._control_thread: + self._control_thread.join(timeout=1) + if self._read_thread: + self._read_thread.join(timeout=1) + self.serial_comm.detach_serial() + self.ser = None + with self.global_state.lock: + self.global_state.hardware.is_connected = False + self.global_state.hardware.ai_is_active = False + log.info("硬體控制器已完全停止。") + + # ------------------------------------------------------------- + # internal helpers + # ------------------------------------------------------------- + def _parse_policy_stream(self, line: str) -> None: + """解析來自 Teensy 的 CSV 資料並進行 CRC 驗證""" + parts = _CSV_REGEX.split(line.strip()) + + # 1) 若上輪有殘包,先拼接 + if self._partial_line: + parts = self._partial_line + parts + self._partial_line = [] + + # 2) 欄位不足:暫存待補 + if len(parts) < EXPECTED_CSV_FIELDS: + self._partial_line = parts + return + + # 3) 欄位過多:截斷並記錄 + if len(parts) > EXPECTED_CSV_FIELDS: + with open("bad_lines.log", "a", encoding='utf-8') as f: + f.write(f"LONG_LINE: {line}\n") + parts = parts[:EXPECTED_CSV_FIELDS] + + try: + float_values = [float(p) for p in parts[:-1]] # 先解析所有浮點數 + crc_from_teensy = int(parts[-1]) & 0xFF # 最後一欄為 CRC + except ValueError as e: + with open("bad_lines.log", "a", encoding='utf-8') as f: + f.write(f"PARSE_ERROR: {line} | {e}\n") + with self.global_state.lock: + self.global_state.hardware.mismatch_count += 1 + self.global_state.hardware.status_text = "Parse Error!" + return + + if len(float_values) != EXPECTED_FLOATS: + with open("bad_lines.log", "a", encoding='utf-8') as f: + f.write(f"DIM_MISMATCH: {line}\n") + with self.global_state.lock: + self.global_state.hardware.mismatch_count += 1 + self.global_state.hardware.status_text = "Data dim mismatch!" + return + + float_bytes = struct.pack('<' + 'f'*EXPECTED_FLOATS, *float_values) + calculated_crc = _crc8(float_bytes) + if calculated_crc != crc_from_teensy: + with self.global_state.lock: + self.global_state.hardware.crc_error_count += 1 + self.global_state.hardware.status_text = ( + f"CRC Error! PC:{calculated_crc} != Teensy:{crc_from_teensy}") + with open("bad_lines.log", "a", encoding='utf-8') as f: + f.write( + f"CRC_ERROR: {line} | PC_CRC:{calculated_crc} | TEENSY_CRC:{crc_from_teensy}\n" + ) + return + + data_vec = np.frombuffer(float_bytes, dtype=np.float32) + with self._lock: + self._raw_angular_velocity[:] = data_vec[0:3] + self._raw_gravity_vector[:] = data_vec[3:6] + self._raw_linear_velocity[:] = data_vec[6:9] + self._raw_accelerometer[:] = data_vec[9:12] + self._raw_joint_positions[:] = data_vec[12:24] + self._raw_joint_velocities[:] = data_vec[24:36] + # 36~39 為保留欄位,目前忽略 + self._last_update_time = time.time() # 只有 CRC 成功才更新時間 + with self.global_state.lock: + self.global_state.hardware.status_text = "OK" + + def _read_loop(self) -> None: + while self._is_running.is_set(): + if not self.ser or not self.ser.is_open: + self.stop() + break + try: + line = self.ser.readline().decode('utf-8', 'ignore').strip() + if line: + self._parse_policy_stream(line) + except (serial.SerialException, OSError): + log.error("❌ 序列埠斷開連接,停止硬體控制器。") + self.stop() + break + + def _control_loop(self) -> None: + default_pose = self.global_state.sim.default_pose if self.global_state.sim else np.zeros(12) + while self._is_running.is_set(): + loop_start = time.perf_counter() + + # 先同步最新感測值到全域狀態,並抓取目前的控制子模式 + with self._lock, self.global_state.lock: + state = self.global_state + state.hardware.angular_velocity_radps = self._raw_angular_velocity.copy() + state.hardware.gravity_vector = self._raw_gravity_vector.copy() + state.hardware.linear_velocity = self._raw_linear_velocity.copy() + state.hardware.accelerometer = self._raw_accelerometer.copy() + state.hardware.joint_positions_rad = self._raw_joint_positions.copy() + state.hardware.joint_velocities_radps = self._raw_joint_velocities.copy() + state.hardware.last_update_time = self._last_update_time + sub_mode = state.control_sub_mode + state.hardware.ai_is_active = ( + sub_mode in (ControlSubMode.WALKING, ControlSubMode.FLOATING) + ) + + # --- 無論 AI 是否啟動,都先建構 ONNX 觀察向量 --- + obs_components = { + 'angular_velocity': self._raw_angular_velocity, + 'gravity_vector': self._raw_gravity_vector, + 'linear_velocity': self._raw_linear_velocity, + 'accelerometer': self._raw_accelerometer, + 'joint_positions': self._raw_joint_positions, + 'joint_velocities': self._raw_joint_velocities, + 'last_action': self._last_action, + # commands 仍需納入,以確保維度一致 + 'commands': state.command * self.config.command_scaling_factors, + } + recipe = self.policy.get_active_recipe() + try: + obs_list = [obs_components[key] for key in recipe] + onnx_input = np.concatenate(obs_list).astype(np.float32) + except KeyError as e: + log.error(f"硬體觀察向量構建失敗: 缺少 '{e}'") + onnx_input = np.array([]) + + action_raw = np.zeros(12) + final_cmd = np.zeros(12) + command_to_send = None + + if sub_mode in (ControlSubMode.WALKING, ControlSubMode.FLOATING): + # 只有在 AI 模式下才進行 ONNX 推論 + if onnx_input.size == 0: + time.sleep(0.02) + continue + _, action_raw = self.policy.get_action_for_hardware(onnx_input) + self._last_action[:] = action_raw + final_cmd = default_pose + action_raw * state.tuning_params.action_scale + elif sub_mode == ControlSubMode.JOINT_TEST: + final_cmd = default_pose + state.joint_test_offsets + elif sub_mode == ControlSubMode.MANUAL_CTRL: + final_cmd = state.manual_final_ctrl + elif sub_mode == ControlSubMode.IDLE: + command_to_send = "stop\n" + final_cmd = default_pose + + # 若尚未決定指令字串,則根據 final_cmd 產生 move 指令 + if command_to_send is None: + action_str = ' '.join(f"{v:.4f}" for v in final_cmd) + command_to_send = f"move all {action_str}\n" + + # 實際寫入序列埠 + if self.ser and self.ser.is_open: + try: + self.ser.write(command_to_send.encode('utf-8')) + except serial.SerialException: + self.stop() + + # 更新全域狀態,供 UI 即時顯示 + with self.global_state.lock: + state.hardware.latest_onnx_input = onnx_input.copy() + state.hardware.latest_action_raw = action_raw.copy() + state.hardware.latest_final_ctrl = final_cmd.copy() + + loop_duration = time.perf_counter() - loop_start + sleep_time = (1.0 / self.config.control_freq) - loop_duration + if sleep_time > 0: + time.sleep(sleep_time) diff --git a/controllers/simulation_controller.py b/controllers/simulation_controller.py new file mode 100644 index 0000000..d1fb997 --- /dev/null +++ b/controllers/simulation_controller.py @@ -0,0 +1,265 @@ +"""Run MuJoCo simulation in a background thread. + +以背景執行緒方式運行模擬,並整合新的 OperatingMode 與 ControlSubMode 架構。""" +from __future__ import annotations + +import threading +import time +from typing import TYPE_CHECKING +import numpy as np + +from utils.logger import log +from mock_simulation import MockSimulation +from state import OperatingMode, ControlSubMode + +try: + import mujoco +except ImportError: # 無頭環境可能沒有安裝 + mujoco = None + +if TYPE_CHECKING: + from state import SimulationState + + +class SimulationController: + """在獨立執行緒中運行 MuJoCo 模擬。""" + + def __init__(self, state: SimulationState) -> None: + self.state = state + self.sim = state.sim + self.config = state.config + + # 其他模組參考 + self.policy_manager = state.policy_manager_ref + self.terrain_manager = state.terrain_manager_ref + self.floating_controller = state.floating_controller_ref + self.hardware_controller = state.hardware_controller_ref + + self._running = threading.Event() + self.thread: threading.Thread | None = None + + # 手動模式下懸浮控制器啟動狀態 + self._manual_float_active = False + + # ------------------------------------------------------------------ + def start(self) -> None: + if self.thread and self.thread.is_alive(): + return + self._running.set() + self.thread = threading.Thread(target=self.run, daemon=True) + self.thread.start() + + def stop(self) -> None: + self._running.clear() + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=1) + + # ------------------------------------------------------------------ + def _initialize_simulation_state(self) -> None: + if isinstance(self.sim, MockSimulation): + log.info("[MOCK] Skip simulation state initialization.") + return + if self.terrain_manager.is_functional: + self.terrain_manager.reset() + self.hard_reset() + log.info("\n--- Simulation Started ---") + + # ------------------------------------------------------------------ + def run(self) -> None: + is_headless = isinstance(self.sim, MockSimulation) + if not is_headless: + self.sim.initialize_window_and_context(self.state) + self._initialize_simulation_state() + + while self._running.is_set(): + # --- 檢查關閉請求 --- + with self.state.lock: + shutdown_req = self.state.shutdown_requested + should_close = shutdown_req or (not is_headless and self.sim.should_close()) + if should_close: + if shutdown_req and not is_headless and not self.sim.should_close(): + from glfw import set_window_should_close + set_window_should_close(self.sim.window, 1) + self._running.clear() + from nicegui import app + app.shutdown() + continue + + # --- 處理請求並讀取當前模式 --- + self.process_requests() + with self.state.lock: + op_mode = self.state.operating_mode + sub_mode = self.state.control_sub_mode + single_step = self.state.single_step_mode + execute_one = self.state.execute_one_step + + if single_step and not execute_one: + self.sim.render_from_thread(self.state) + time.sleep(0.01) + continue + if execute_one: + with self.state.lock: + self.state.execute_one_step = False + + # 只有在模擬模式下才進行物理步進 + if not is_headless and op_mode == OperatingMode.SIMULATION: + self._simulation_step() + else: + time.sleep(1.0 / 60.0) + + self.update_derived_states_and_render() + + log.info("模擬執行緒已停止。") + + # ------------------------------------------------------------------ + def process_requests(self) -> None: + """檢查並處理所有待處理的狀態變更請求。""" + with self.state.lock: + hard_reset = self.state.hard_reset_requested + soft_reset = self.state.soft_reset_requested + if hard_reset: + self.state.hard_reset_requested = False + if soft_reset: + self.state.soft_reset_requested = False + + # 啟停硬體控制器 + self.manage_hardware_controller() + + if not isinstance(self.sim, MockSimulation): + if hard_reset: + self.hard_reset() + if soft_reset: + self.soft_reset() + + def manage_hardware_controller(self) -> None: + """根據頂層操作模式決定是否啟動硬體控制器。""" + with self.state.lock: + op_mode = self.state.operating_mode + is_hw_running = self.hardware_controller.is_running + if op_mode == OperatingMode.HARDWARE and not is_hw_running: + log.info("偵測到切換至硬體模式,啟動硬體控制器...") + threading.Thread(target=self.hardware_controller.start, daemon=True).start() + elif op_mode == OperatingMode.SIMULATION and is_hw_running: + log.info("偵測到切換至模擬模式,停止硬體控制器...") + threading.Thread(target=self.hardware_controller.stop, daemon=True).start() + + # ------------------------------------------------------------------ + def update_derived_states_and_render(self) -> None: + with self.state.lock: + op_mode = self.state.operating_mode + sub_mode = self.state.control_sub_mode + manual_float_req = self.state.manual_mode_is_floating + sim_pos = self.state.sim_latest_pos + terrain_mode = self.state.terrain_mode + + if op_mode == OperatingMode.SIMULATION: + is_manual_mode = sub_mode in (ControlSubMode.JOINT_TEST, ControlSubMode.MANUAL_CTRL) + if is_manual_mode and manual_float_req and not self._manual_float_active: + self.floating_controller.enable(sim_pos) + self._manual_float_active = True + elif (not is_manual_mode or not manual_float_req) and self._manual_float_active: + self.floating_controller.disable() + self._manual_float_active = False + + is_headless = isinstance(self.sim, MockSimulation) + if not is_headless: + if self.terrain_manager.is_functional and self.terrain_manager.needs_physics_and_scene_update: + mujoco.mj_forward(self.sim.model, self.sim.data) + mujoco.mjr_uploadHField(self.sim.model, self.sim.context, self.terrain_manager.hfield_id) + self.terrain_manager.needs_physics_and_scene_update = False + log.info("✅ 地形物理與渲染已同步更新。") + + with self.state.lock: + self.state.sim_latest_pos = self.sim.data.body('torso').xpos.copy() + self.state.sim_latest_quat = self.sim.data.body('torso').xquat.copy() + self.state.sim_latest_joint_positions = self.sim.data.qpos[7:].copy() + + if self.terrain_manager.is_functional: + self.terrain_manager.update(sim_pos, terrain_mode) + + self.sim.render_from_thread(self.state) + + # ------------------------------------------------------------------ + def _simulation_step(self) -> None: + with self.state.lock: + command = self.state.command.copy() + sub_mode = self.state.control_sub_mode + tuning = self.state.tuning_params + + onnx_input, action_final = self.policy_manager.get_action(command) + + if sub_mode == ControlSubMode.MANUAL_CTRL: + with self.state.lock: + final_ctrl = self.state.manual_final_ctrl.copy() + elif sub_mode == ControlSubMode.JOINT_TEST: + with self.state.lock: + final_ctrl = self.sim.default_pose + self.state.joint_test_offsets + else: # WALKING or FLOATING + final_ctrl = self.sim.default_pose + action_final * tuning.action_scale + + self.sim.apply_position_control(final_ctrl, tuning) + + with self.state.lock: + self.state.sim_latest_onnx_input = onnx_input.flatten() + self.state.sim_latest_action_raw = action_final + self.state.sim_latest_final_ctrl = final_ctrl + + # 使用 state.control_dt 以支援動態控制頻率 + target_time = self.sim.data.time + self.state.control_dt + while self.sim.data.time < target_time: + if not self._running.is_set(): + break + mujoco.mj_step(self.sim.model, self.sim.data) + + # ------------------------------------------------------------------ + def hard_reset(self) -> None: + log.info("\n--- 正在執行機器人硬重置 ---") + with self.state.lock: + if self.state.operating_mode == OperatingMode.HARDWARE: + log.warning("硬重置請求在硬體模式下被忽略。") + return + terrain_name = self.terrain_manager.get_current_terrain_name_simple(self.state) + start_z_offset = 1.5 if terrain_name in ["Pyramid", "Stepped Pyramid"] else 0.3 + mujoco.mj_resetData(self.sim.model, self.sim.data) + self.sim.data.qpos[0], self.sim.data.qpos[1] = 0, 0 + start_ground_z = self.terrain_manager.get_height_at(0, 0) + self.sim.data.qpos[2] = start_ground_z + start_z_offset + self.sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) + self.sim.data.qpos[7:] = self.sim.default_pose + self.sim.data.qvel[:] = 0 + self.sim.data.ctrl[:] = self.sim.default_pose + # 先步進數個小步,讓姿態與速度回到穩定狀態,避免初始化雜訊 + for _ in range(10): + mujoco.mj_step(self.sim.model, self.sim.data) + self.policy_manager.reset() + with self.state.lock: + if self.state.control_sub_mode == ControlSubMode.FLOATING: + self.state.control_sub_mode = ControlSubMode.WALKING + self.state.command.fill(0.0) + self.state.joint_test_offsets.fill(0.0) + self.state.manual_final_ctrl.fill(0.0) + self.state.manual_mode_is_floating = False + if self._manual_float_active: + self.floating_controller.disable() + self._manual_float_active = False + mujoco.mj_forward(self.sim.model, self.sim.data) + + def soft_reset(self) -> None: + log.info("\n--- 正在執行空中姿態重置 ---") + with self.state.lock: + if self.state.operating_mode == OperatingMode.HARDWARE: + log.warning("軟重置請求在硬體模式下被忽略。") + return + self.sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) + self.sim.data.qpos[7:] = self.sim.default_pose + self.sim.data.qvel[:] = 0 + self.policy_manager.reset() + with self.state.lock: + self.state.command.fill(0.0) + self.state.joint_test_offsets.fill(0.0) + self.state.manual_final_ctrl.fill(0.0) + self.state.manual_mode_is_floating = False + if self._manual_float_active: + self.floating_controller.disable() + self._manual_float_active = False + mujoco.mj_forward(self.sim.model, self.sim.data) diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/observation.py b/core/observation.py similarity index 94% rename from observation.py rename to core/observation.py index e69b9c0..ec7a3c8 100644 --- a/observation.py +++ b/core/observation.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, List, Dict if TYPE_CHECKING: - from config import AppConfig + from utils.config import AppConfig class ObservationBuilder: def __init__(self, data, model, torso_id, default_pose, config: 'AppConfig'): @@ -70,11 +70,15 @@ def get_observation(self, command, last_action) -> np.ndarray: return np.concatenate(obs_list).astype(np.float32) def _get_torso_inverse_rotation(self): + """取得軀幹姿態的逆四元數 (單位化後的共軛)""" torso_quat = self.data.xquat[self.torso_id] - norm = np.sum(np.square(torso_quat)) - if norm < 1e-8: torso_quat = np.array([1., 0, 0, 0]) - torso_quat /= np.sqrt(np.sum(np.square(torso_quat))) - return np.array([torso_quat[0], -torso_quat[1], -torso_quat[2], -torso_quat[3]]) / np.sum(np.square(torso_quat)) + norm_sq = np.sum(torso_quat ** 2) + if norm_sq < 1e-8: + return np.array([1., 0, 0, 0]) + torso_quat /= np.sqrt(norm_sq) + q_inv = torso_quat.copy() + q_inv[1:] *= -1 # 共軛 + return q_inv def _rotate_vec_by_quat_inv(self, v, q_inv): u, s = q_inv[1:], q_inv[0] diff --git a/policy.py b/core/policy.py similarity index 93% rename from policy.py rename to core/policy.py index a3be771..ee9b97a 100644 --- a/policy.py +++ b/core/policy.py @@ -9,8 +9,8 @@ # 為了型別提示,避免循環匯入 if TYPE_CHECKING: - from config import AppConfig - from observation import ObservationBuilder + from utils.config import AppConfig + from core.observation import ObservationBuilder from rendering import DebugOverlay class PolicyManager: @@ -42,6 +42,11 @@ def __init__(self, config: 'AppConfig', obs_builder: 'ObservationBuilder', overl print(f" ⚠️ 警告: 模型 '{name}' 缺少 'path' 或 'observation_recipe',已跳過。") continue + # 若需求配方包含 accelerometer 但模型中不存在,則跳過以避免維度錯誤 + if 'accelerometer' in recipe and getattr(self.obs_builder, 'accelerometer_id', -1) == -1: + print(f" ⚠️ 模型 '{name}' 需要 'accelerometer' 感測器,但在 XML 中找不到,已跳過。") + continue + print(f" - 載入模型 '{name}' 從: {path}") try: # --- ONNX Runtime 優化與載入 --- @@ -153,9 +158,12 @@ def get_action(self, command: np.ndarray) -> tuple[np.ndarray, np.ndarray]: # 將歷史觀察拼接成 ONNX 的最終輸入 onnx_input = np.concatenate(list(self.obs_histories[name])).astype(np.float32).reshape(1, -1) - + # 檢查維度是否匹配,以防萬一 - if onnx_input.shape[1] != session.get_inputs()[0].shape[1]: + expected_dim = session.get_inputs()[0].shape[1] + actual_dim = onnx_input.shape[1] + if actual_dim != expected_dim: + print(f"⚠️ 模型 '{name}' ONNX 輸入維度不符: expected {expected_dim}, got {actual_dim}") action_raw = np.zeros(self.config.num_motors, dtype=np.float32) else: input_name = session.get_inputs()[0].name @@ -206,8 +214,11 @@ def get_action_for_hardware(self, observation: np.ndarray) -> tuple[np.ndarray, self.obs_histories[name].append(observation) onnx_input = np.concatenate(list(self.obs_histories[name])).astype(np.float32).reshape(1, -1) - - if onnx_input.shape[1] != session.get_inputs()[0].shape[1]: + + expected_dim = session.get_inputs()[0].shape[1] + actual_dim = onnx_input.shape[1] + if actual_dim != expected_dim: + print(f"⚠️ 模型 '{name}' ONNX 輸入維度不符: expected {expected_dim}, got {actual_dim}") action_raw = np.zeros(self.config.num_motors, dtype=np.float32) else: input_name = session.get_inputs()[0].name diff --git a/simulation.py b/core/simulation.py similarity index 93% rename from simulation.py rename to core/simulation.py index 560683b..a7b5f59 100644 --- a/simulation.py +++ b/core/simulation.py @@ -6,12 +6,12 @@ import time # 新增: 提供初始化延遲 from typing import TYPE_CHECKING, Optional -from config import AppConfig +from utils.config import AppConfig from state import SimulationState, TuningParams from rendering import DebugOverlay if TYPE_CHECKING: - from keyboard_input_handler import KeyboardInputHandler + from inputs.keyboard_input_handler import KeyboardInputHandler class Simulation: """ @@ -55,7 +55,7 @@ def __init__(self, config: AppConfig): # 視窗與渲染上下文將在模擬執行緒中初始化 - def initialize_window_and_context(self): + def initialize_window_and_context(self, state: SimulationState): """在當前執行緒中初始化 GLFW 與渲染上下文。""" if not glfw.init(): sys.exit("❌ 錯誤: GLFW 初始化失敗。") @@ -73,7 +73,7 @@ def initialize_window_and_context(self): self.cam = mujoco.MjvCamera() self.opt = mujoco.MjvOption() - self.overlay = DebugOverlay() + self.overlay = DebugOverlay(state) mujoco.mjv_defaultCamera(self.cam) mujoco.mjv_defaultOption(self.opt) self.cam.distance, self.cam.elevation, self.cam.azimuth = 2.5, -20, 90 @@ -144,8 +144,12 @@ def apply_position_control(self, target_pos: np.ndarray, params: TuningParams): self.model.actuator_biasprm[:, 1] = -params.kp self.model.actuator_biasprm[:, 2] = -params.kd self.data.ctrl[:] = target_pos - force_bias = np.full(self.config.num_motors, params.bias) - self.data.qfrc_applied[6:] = force_bias + # 若啟用 bias,才施加額外力矩,否則清零 + if params.bias_enabled: + force_bias = np.full(self.config.num_motors, params.bias) + self.data.qfrc_applied[6:] = force_bias + else: + self.data.qfrc_applied[6:] = 0.0 def step(self, state: SimulationState): while self.data.time < state.control_timer: diff --git a/terrain_manager.py b/core/terrain_manager.py similarity index 99% rename from terrain_manager.py rename to core/terrain_manager.py index c24c0da..b255c47 100644 --- a/terrain_manager.py +++ b/core/terrain_manager.py @@ -5,7 +5,7 @@ from typing import Dict, Optional, Callable, Tuple from datetime import datetime from PIL import Image -from logger import log +from utils.logger import log # 為了型別提示,避免循環匯入 from typing import TYPE_CHECKING diff --git a/crc_debug.py b/crc_debug.py new file mode 100644 index 0000000..039a9ad --- /dev/null +++ b/crc_debug.py @@ -0,0 +1,27 @@ +"""簡易 CRC-8 驗證工具 (CRC-8/ATM, poly=0x07) +This script outputs the CRC-8 for a fixed float array so that +Teensy firmware can cross-check its implementation. +""" + +import os +import sys +import struct +import numpy as np + +# 將專案根目錄加入搜尋路徑, 確保可匯入 controllers 模組 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from controllers.hardware_controller import _crc8 + + +def main() -> None: + # 固定測試資料: 1.0 ~ 34.0 (float32) + test_data = np.arange(1.0, 35.0, dtype=np.float32) + # 以 little-endian 打包成 bytes (與硬體端一致) + float_bytes = struct.pack('<' + 'f'*len(test_data), *test_data) + crc = _crc8(float_bytes) + print("PC calculated CRC:", crc) + + +if __name__ == "__main__": + main() diff --git a/hardware_controller.py b/hardware_controller.py deleted file mode 100644 index 9158f54..0000000 --- a/hardware_controller.py +++ /dev/null @@ -1,294 +0,0 @@ -# hardware_controller.py -import serial -import serial.tools.list_ports -import threading -import time -from logger import log -import re -import numpy as np -from scipy.spatial.transform import Rotation -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from config import AppConfig - from policy import PolicyManager - from state import SimulationState - from serial_communicator import SerialCommunicator - -class RobotStateHardware: - """【修改】只儲存AI決策所需的、單位正確的數據。""" - def __init__(self): - # 這些是直接從Teensy的POLICY_STREAM解析來的數據 - self.angular_velocity_radps = np.zeros(3, dtype=np.float32) - self.gravity_vector_norm = np.zeros(3, dtype=np.float32) - self.accelerometer_ms2 = np.zeros(3, dtype=np.float32) - self.pitch_rad = 0.0 - self.joint_positions_rad = np.zeros(12, dtype=np.float32) - self.joint_velocities_radps = np.zeros(12, dtype=np.float32) - - # 這些是PC端自己維護的狀態 - self.last_action = np.zeros(12, dtype=np.float32) - self.command = np.zeros(3, dtype=np.float32) - - self.last_update_time = 0.0 - -class HardwareController: - """【修改版】管理與實體硬體的AI控制迴圈,從SerialCommunicator借用連接。""" - - def __init__(self, config: 'AppConfig', policy: 'PolicyManager', global_state: 'SimulationState', serial_comm: 'SerialCommunicator'): - """【修改】初始化時接收 SerialCommunicator 的參考。""" - self.config = config - self.policy = policy - self.global_state = global_state - self.serial_comm = serial_comm - - self.ser = None - self.is_running = False - self.read_thread = None - self.control_thread = None - - self.hw_state = RobotStateHardware() - self.lock = threading.Lock() - self.ai_control_enabled = threading.Event() - - self.foot_positions_in_body = np.array([ - [-0.0804, -0.1759, -0.1964], - [ 0.0806, -0.1759, -0.1964], - [-0.0804, 0.0239, -0.1964], - [ 0.0806, 0.0239, -0.1964], - ], dtype=np.float32) - log.info("✅ 硬體控制器已初始化。") - - def start_controller_threads(self): - """在啟動背景執行緒前,自動命令Teensy切換到POLICY_STREAM模式。""" - if self.is_running: - log.info("硬體控制器已在運行中。") - return - - # 所有前置檢查邏輯 - if not self.serial_comm.is_connected: - log.error("❌ 硬體模式錯誤:請先連接序列埠。") - self.global_state.set_control_mode("WALKING") - return - - self.ser = self.serial_comm.get_serial_connection() - if not self.ser: - log.error("❌ 硬體模式錯誤:無法取得有效序列埠連接。") - self.global_state.set_control_mode("WALKING") - return - - # 接管序列埠控制權 - log.info(f"✅ 硬體控制器已接管序列埠 {self.ser.port} 的控制權。") - self.serial_comm.is_managed_by_hardware_controller = True - - # 自動切換 Teensy 到 AI 決策流模式 - try: - log.info(" -> 正在命令 Teensy 切換至 POLICY_STREAM 模式...") - self.ser.write(b"monitor p\n") - time.sleep(0.1) # 給Teensy一點時間切換模式並清空緩衝區 - self.ser.reset_input_buffer() - log.info(" -> Teensy 模式切換指令已發送。") - except serial.SerialException as e: - log.error(f"❌ 發送模式切換指令失敗: {e}") - self.stop_controller_threads() # 使用現有的停止函式來清理 - return - - # 後續的執行緒啟動邏輯 - self.ai_control_enabled.clear() - with self.global_state.lock: - self.global_state.hardware_ai_is_active = False - - self.is_running = True - self.read_thread = threading.Thread(target=self._read_from_port, daemon=True) - self.read_thread.start() - self.control_thread = threading.Thread(target=self._control_loop, daemon=True) - self.control_thread.start() - - with self.global_state.lock: - self.global_state.hardware_is_connected = True - log.info("✅ 硬體控制執行緒已啟動。") - - def stop_controller_threads(self): - """【修改】在停止執行緒後,自動命令Teensy恢復到安全的人類友好模式。""" - if not self.is_running: return - - log.info("正在停止硬體控制器...") - self.is_running = False - self.disable_ai() - self.ai_control_enabled.set() - - # 在交還控制權前,命令Teensy恢復安全狀態 - if self.ser and self.ser.is_open: - try: - log.info(" -> 正在命令 Teensy 停止運動並恢復 HUMAN 遙測模式...") - self.ser.write(b"stop\n") - time.sleep(0.05) - self.ser.write(b"monitor h\n") - time.sleep(0.05) - except serial.SerialException: - log.warning(" -> 警告: 發送停止指令失敗,可能連接已斷開。") - - # 所有執行緒清理和交還控制權的邏輯 - if self.control_thread and self.control_thread.is_alive(): - self.control_thread.join(timeout=1) - if self.read_thread and self.read_thread.is_alive(): - self.read_thread.join(timeout=1) - - if self.serial_comm: - self.serial_comm.is_managed_by_hardware_controller = False - log.info("序列埠控制權已交還。") - - self.ser = None - log.info("硬體控制器已完全停止。") - - def enable_ai(self): - if not self.is_running: - log.info("無法啟用 AI:硬體控制器未運行。") - return - log.info("🤖 AI 控制已啟用。") - self.policy.reset() - self.ai_control_enabled.set() - self.global_state.hardware_ai_is_active = True - - def disable_ai(self): - log.info("⏸️ AI 控制已暫停。") - self.ai_control_enabled.clear() - self.global_state.hardware_ai_is_active = False - if self.is_running and self.ser and self.ser.is_open: # 增加 is_running 判斷 - try: self.ser.write(b"stop\n") - except serial.SerialException as e: - log.error(f"發送停止指令失敗: {e}") - - def parse_policy_stream(self, line: str): - """ - 專門解析來自 Teensy 'monitor p' 模式的 34 維數據流。 - """ - try: - parts = line.split(',') - if len(parts) != 34: - return - - data_vec = np.array(parts, dtype=np.float32) - - with self.lock: - self.hw_state.angular_velocity_radps[:] = data_vec[0:3] - self.hw_state.gravity_vector_norm[:] = data_vec[3:6] - self.hw_state.accelerometer_ms2[:] = data_vec[6:9] - self.hw_state.pitch_rad = data_vec[9] - self.hw_state.joint_positions_rad[:] = data_vec[10:22] - self.hw_state.joint_velocities_radps[:] = data_vec[22:34] - self.hw_state.last_update_time = time.time() - - except (ValueError, IndexError) as e: - log.error(f"❌ 解析 POLICY_STREAM 時出錯: {e} | 原始數據長度: {len(parts)}") - - def construct_observation(self) -> np.ndarray: - """ - [全新重構] 從 hw_state 中直接獲取數據,並拼接成最終的 ONNX 輸入向量。 - 不再需要任何客戶端的估算。 - """ - with self.lock: - # 【保留】從全域狀態獲取用戶指令 - self.hw_state.command = self.global_state.command * np.array(self.config.command_scaling_factors) - - # 【修改】建立一個全新的、數據源清晰的字典 - obs_list = { - # --- 來自Teensy的高品質數據 --- - 'angular_velocity': self.hw_state.angular_velocity_radps, - 'gravity_vector': self.hw_state.gravity_vector_norm, - 'accelerometer': self.hw_state.accelerometer_ms2, - 'pitch': np.array([self.hw_state.pitch_rad]), # 確保是1維向量 - 'joint_positions': self.hw_state.joint_positions_rad, - 'joint_velocities': self.hw_state.joint_velocities_radps, - - # --- PC端自行維護的狀態 --- - 'last_action': self.hw_state.last_action, - 'commands': self.hw_state.command, - - # --- 為了兼容性,保留舊的鍵,用零填充 --- - 'linear_velocity': np.zeros(3), - } - - # 【保留】由配方驅動的拼接邏輯 - recipe = self.policy.get_active_recipe() - if not recipe: - log.warning("⚠️ 警告: 無法從策略管理器獲取有效的觀察配方。") - return np.array([]) - - try: - final_obs_list = [obs_list[key] for key in recipe] - return np.concatenate(final_obs_list).astype(np.float32) - except KeyError as e: - log.error(f"❌ 觀察向量構建失敗:配方中需求的 '{e}' 不在 obs_list 中。") - return np.array([]) - - def _read_from_port(self): - log.info("[硬體讀取線程已啟動] 等待來自 Teensy 的 POLICY_STREAM 數據...") - while self.is_running: - if not self.ser or not self.ser.is_open: - # 【修改】確保呼叫正確的停止函式 - self.stop_controller_threads() - break - try: - line = self.ser.readline().decode('utf-8', errors='ignore').strip() - if line: - # 【修改】呼叫新的解析器 - self.parse_policy_stream(line) - except (serial.SerialException, OSError): - log.error("❌ 錯誤:序列埠斷開連接或讀取錯誤。") - self.stop_controller_threads() - break - except Exception as e: - log.error(f"❌ _read_from_port 發生未知錯誤: {e}") - - def _control_loop(self): - """【保留大部分邏輯】此迴圈現在是硬體指令的唯一來源,並能感知 JOINT_TEST 模式。""" - log.info("\n--- 硬體控制執行緒已就緒 ---") - # 【保留】 default_pose_hardware 保持不變 - default_pose_hardware = self.global_state.sim.default_pose - while self.is_running: - loop_start_time = time.perf_counter() - command_to_send = None - - # 【保留】 JOINT_TEST 模式的指令生成邏輯不變 - if self.global_state.control_mode == "JOINT_TEST": - final_command = default_pose_hardware + self.global_state.joint_test_offsets - action_str = ' '.join(f"{a:.4f}" for a in final_command) - command_to_send = f"move all {action_str}\n" - - # 【保留】 HARDWARE_MODE 的指令生成邏輯 - elif self.global_state.control_mode == "HARDWARE_MODE": - # 【保留】等待AI啟用信號 - self.ai_control_enabled.wait() - if not self.is_running: break - - # 【保留】呼叫新的 construct_observation 函式 - # 這個函式現在會使用 RobotStateHardware 中更新過的高品質數據 - observation = self.construct_observation() - if observation.size == 0: - time.sleep(0.02); continue - - # 【保留】運行AI策略獲取動作 - _, action_raw = self.policy.get_action_for_hardware(observation) - with self.lock: - # 【保留】更新 last_action - self.hw_state.last_action[:] = action_raw - - # 【保留】生成發送給Teensy的文字指令 - final_command = default_pose_hardware + action_raw * self.global_state.tuning_params.action_scale - action_str = ' '.join(f"{a:.4f}" for a in final_command) - command_to_send = f"move all {action_str}\n" - - # 【保留】發送指令邏輯 - if command_to_send and self.ser and self.ser.is_open: - try: - self.ser.write(command_to_send.encode('utf-8')) - except serial.SerialException: - # 【修改】確保呼叫一致的停止函式 - self.stop_controller_threads() # <--- 這裡從 self.stop() 改為 self.stop_controller_threads() - - # 【保留】迴圈時間管理 - loop_duration = time.perf_counter() - loop_start_time - sleep_time = (1.0 / self.config.control_freq) - loop_duration - if sleep_time > 0: - time.sleep(sleep_time) diff --git a/inputs/__init__.py b/inputs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/keyboard_input_handler.py b/inputs/keyboard_input_handler.py similarity index 97% rename from keyboard_input_handler.py rename to inputs/keyboard_input_handler.py index 68fc81c..db8a80e 100644 --- a/keyboard_input_handler.py +++ b/inputs/keyboard_input_handler.py @@ -4,7 +4,7 @@ except ImportError: # 無頭環境可能沒有安裝 glfw glfw = None from state import SimulationState -from logger import log +from utils.logger import log class KeyboardInputHandler: """處理所有鍵盤輸入事件,並根據當前模式進行分派。""" @@ -129,8 +129,9 @@ def handle_global_and_default_keys(self, window, key, action): # --- 長按事件 (重複觸發) --- if action in [glfw.PRESS, glfw.REPEAT]: step = self.config.keyboard_velocity_adjust_step + # 指令順序為 [vy, vx, wz] with self.state.lock: - if key == glfw.KEY_Q: + if key == glfw.KEY_Q: self.state.command[2] += step return elif key == glfw.KEY_E: @@ -158,8 +159,8 @@ def handle_global_and_default_keys(self, window, key, action): # 移動指令 step = self.config.keyboard_velocity_adjust_step - if key == glfw.KEY_W: self.state.command[1] += step - elif key == glfw.KEY_S: self.state.command[1] -= step - elif key == glfw.KEY_A: self.state.command[0] += step - elif key == glfw.KEY_D: self.state.command[0] -= step + if key == glfw.KEY_W: self.state.command[1] += step # vx + elif key == glfw.KEY_S: self.state.command[1] -= step # vx + elif key == glfw.KEY_A: self.state.command[0] += step # vy + elif key == glfw.KEY_D: self.state.command[0] -= step # vy diff --git a/xbox_controller.py b/inputs/xbox_controller.py similarity index 100% rename from xbox_controller.py rename to inputs/xbox_controller.py diff --git a/xbox_input_handler.py b/inputs/xbox_input_handler.py similarity index 97% rename from xbox_input_handler.py rename to inputs/xbox_input_handler.py index 6e992ac..721867e 100644 --- a/xbox_input_handler.py +++ b/inputs/xbox_input_handler.py @@ -1,6 +1,6 @@ # xbox_input_handler.py from state import SimulationState -from xbox_controller import XboxController +from .xbox_controller import XboxController import threading import time @@ -46,6 +46,7 @@ def _update_loop(self) -> None: with self.state.lock: if self.state.input_mode == "GAMEPAD": + # 指令順序為 [vy, vx, wz] self.state.command[0] = current_input['left_analog_x'] * self.config.gamepad_sensitivity['vy'] self.state.command[1] = current_input['left_analog_y'] * self.config.gamepad_sensitivity['vx'] * -1 self.state.command[2] = current_input['right_analog_x'] * self.config.gamepad_sensitivity['wz'] diff --git a/main.py b/main.py deleted file mode 100644 index 6bb0377..0000000 --- a/main.py +++ /dev/null @@ -1,181 +0,0 @@ -# main.py -import sys -import numpy as np -import mujoco -import time - -from config import load_config -from state import SimulationState -from simulation import Simulation -from policy import PolicyManager -from observation import ObservationBuilder -from rendering import DebugOverlay -from keyboard_input_handler import KeyboardInputHandler -from xbox_input_handler import XboxInputHandler -from floating_controller import FloatingController -from serial_communicator import SerialCommunicator -from terrain_manager import TerrainManager -from hardware_controller import HardwareController - -def main(): - """主程式入口:初始化所有組件並運行模擬迴圈。""" - from xbox_controller import XboxController - print("\n--- 機器人模擬控制器 (含硬體與多模型模式) ---") - - # --- 1. 初始化核心組件 --- - config = load_config() - state = SimulationState(config) - sim = Simulation(config) - - # --- 2. 【核心修改】將核心物件的參考存入 state,使其成為全域上下文 --- - state.sim = sim - - # --- 3. 按照依賴順序初始化所有管理器 --- - terrain_manager = TerrainManager(sim.model, sim.data) - state.terrain_manager_ref = terrain_manager - - floating_controller = FloatingController(config, sim.model, sim.data, terrain_manager) - state.floating_controller_ref = floating_controller - - serial_comm = SerialCommunicator() - state.serial_communicator_ref = serial_comm # 將 serial_comm 存入 state - - xbox_handler = XboxInputHandler(state) - - obs_builder = ObservationBuilder(sim.data, sim.model, sim.torso_id, sim.default_pose, config) - # 在無 GUI 版本中仍建立 DebugOverlay 以顯示文字資訊 - overlay = DebugOverlay() - - policy_manager = PolicyManager(config, obs_builder, overlay) - state.policy_manager_ref = policy_manager - state.available_policies = policy_manager.model_names - - # 將 serial_comm 傳入 HardwareController 的建構函式 - hw_controller = HardwareController(config, policy_manager, state, serial_comm) - state.hardware_controller_ref = hw_controller - - # KeyboardInputHandler 不再需要直接傳入 serial_comm - keyboard_handler = KeyboardInputHandler(state, xbox_handler, terrain_manager) - # 先註冊回呼,待初始化視窗後會正式生效 - sim.register_callbacks(keyboard_handler) - # 初始化 GLFW 視窗與渲染上下文 - sim.initialize_window_and_context() - - # --- 4. 定義重置函式 --- - def hard_reset(): - print("\n--- 正在執行機器人硬重置 (R Key) ---") - if state.control_mode == "HARDWARE_MODE": return - mujoco.mj_resetData(sim.model, sim.data) - sim.data.qpos[0], sim.data.qpos[1] = 0, 0 - start_ground_z = terrain_manager.get_height_at(0, 0) - robot_height_offset = 0.3 - sim.data.qpos[2] = start_ground_z + robot_height_offset - print(f"機器人重置至原點:地形高度({start_ground_z:.2f}m) + 偏移({robot_height_offset:.2f}m) = 世界Z({sim.data.qpos[2]:.2f}m)") - sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) - sim.data.qpos[7:] = sim.default_pose - sim.data.qvel[:] = 0 - sim.data.ctrl[:] = sim.default_pose - for _ in range(10): mujoco.mj_step(sim.model, sim.data) - policy_manager.reset() - if state.control_mode == "FLOATING": state.set_control_mode("WALKING") - state.reset_control_state(sim.data.time) - state.clear_command() - state.joint_test_offsets.fill(0.0) - state.manual_final_ctrl.fill(0.0) - state.manual_mode_is_floating = False - state.hard_reset_requested = False - mujoco.mj_forward(sim.model, sim.data) - - def soft_reset(): - print("\n--- 正在執行空中姿態重置 (X Key) ---") - if state.control_mode == "HARDWARE_MODE": return - sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) - sim.data.qpos[7:] = sim.default_pose - sim.data.qvel[:] = 0 - policy_manager.reset() - state.clear_command() - state.joint_test_offsets.fill(0.0) - state.manual_final_ctrl.fill(0.0) - state.manual_mode_is_floating = False - mujoco.mj_forward(sim.model, sim.data) - state.soft_reset_requested = False - - # --- 5. 啟動程序 --- - if terrain_manager.is_functional: - terrain_manager.initial_generate() - hard_reset() - - # 【快捷鍵變更】更新啟動時的提示文字 - print("\n--- Simulation Started (SPACE: Pause, N: Step) ---") - print(" (F: Float, G: Joint Test, B: Manual Ctrl, H: Hardware Mode)") - print(" (M: Input Mode, R: Hard Reset, X: Soft Reset)") - print(" (Y: Regen Terrain, P: Save Terrain PNG, 1..: Select Policy)") - print(" (V: Cycle Terrain, K: Toggle HW AI)") - print(" ( ~ : Toggle Serial Console )") # 新增此行,替換舊的 T 鍵提示 - - state.execute_one_step = False - - # --- 6. 主模擬迴圈 --- - while not sim.should_close(): - if state.single_step_mode and not state.execute_one_step: - sim.render(state) - continue - if state.execute_one_step: state.execute_one_step = False - - if state.input_mode == "GAMEPAD": xbox_handler.update_state() - if state.hard_reset_requested: hard_reset() - if state.soft_reset_requested: soft_reset() - - state.latest_pos = sim.data.body('torso').xpos.copy() - state.latest_quat = sim.data.body('torso').xquat.copy() - - if terrain_manager.is_functional: - terrain_manager.update(state.latest_pos, state.terrain_mode) - - if state.control_mode == "HARDWARE_MODE": - if hw_controller.is_running: - with hw_controller.lock: - t_since_update = time.time() - hw_controller.hw_state.last_update_time - conn_status = f"Data Delay: {t_since_update:.2f}s" if t_since_update < 1.0 else "Data Timeout!" - state.hardware_status_text = f"Connection Status: {conn_status}\n" - state.hardware_status_text += f"LinVel: {np.array2string(hw_controller.hw_state.lin_vel_local, precision=2)}\n" - state.hardware_status_text += f"Gyro: {np.array2string(hw_controller.hw_state.imu_gyro_radps, precision=2)}" - else: - state.hardware_status_text = "Hardware controller not running." - - elif state.control_mode == "SERIAL_MODE": - pass - else: # 模擬模式 (WALKING, FLOATING, etc.) - if state.single_step_mode: print("\n" + "="*20 + f" STEP AT TIME {sim.data.time:.4f} " + "="*20) - - onnx_input, action_final = policy_manager.get_action(state.command) - state.latest_onnx_input = onnx_input.flatten() - state.latest_action_raw = action_final - - if state.control_mode == "MANUAL_CTRL": - final_ctrl = state.manual_final_ctrl.copy() - sim.apply_position_control(final_ctrl, state.tuning_params) - elif state.control_mode == "JOINT_TEST": - final_ctrl = sim.default_pose + state.joint_test_offsets - sim.apply_position_control(final_ctrl, state.tuning_params) - else: - final_ctrl = sim.default_pose + action_final * state.tuning_params.action_scale - sim.apply_position_control(final_ctrl, state.tuning_params) - - state.latest_final_ctrl = final_ctrl - - target_time = sim.data.time + config.control_dt - while sim.data.time < target_time: - mujoco.mj_step(sim.model, sim.data) - - sim.render(state) - - # --- 7. 程式結束,清理資源 --- - hw_controller.stop() - sim.close() - xbox_handler.close() - serial_comm.close() - print("\n程式已安全退出。") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/main_nicegui.py b/main_nicegui.py index 03400b8..e616ef4 100644 --- a/main_nicegui.py +++ b/main_nicegui.py @@ -4,26 +4,26 @@ import argparse from nicegui import ui, app -from config import load_config +from utils.config import load_config from state import SimulationState -from policy import PolicyManager -from hardware_controller import HardwareController +from core.policy import PolicyManager +from controllers.hardware_controller import HardwareController from serial_communicator import SerialCommunicator -from xbox_input_handler import XboxInputHandler +from inputs.xbox_input_handler import XboxInputHandler from ui_controller import UIController -from simulation_controller import SimulationController -from keyboard_input_handler import KeyboardInputHandler -from logger import log +from controllers.simulation_controller import SimulationController +from inputs.keyboard_input_handler import KeyboardInputHandler +from utils.logger import log def create_simulation_components(use_sim: bool, config): """根據是否使用模擬,建立對應的模組實例。""" if use_sim: log.info("✅ Simulation mode enabled.") - from simulation import Simulation - from observation import ObservationBuilder - from terrain_manager import TerrainManager - from floating_controller import FloatingController + from core.simulation import Simulation + from core.observation import ObservationBuilder + from core.terrain_manager import TerrainManager + from controllers.floating_controller import FloatingController sim = Simulation(config) terrain = TerrainManager(sim.model, sim.data) @@ -67,22 +67,28 @@ def main() -> None: sim, obs_builder, terrain_manager, floating_controller = create_simulation_components(use_sim, config) - state.sim = sim - state.terrain_manager_ref = terrain_manager - state.floating_controller_ref = floating_controller + # 使用 lock 確保設定參考時的執行緒安全 + with state.lock: + state.sim = sim + state.terrain_manager_ref = terrain_manager + state.floating_controller_ref = floating_controller serial_comm = SerialCommunicator() - state.serial_communicator_ref = serial_comm + with state.lock: + state.serial_communicator_ref = serial_comm xbox_handler = XboxInputHandler(state) - state.xbox_handler_ref = xbox_handler + with state.lock: + state.xbox_handler_ref = xbox_handler policy_manager = PolicyManager(config, obs_builder, None) - state.policy_manager_ref = policy_manager - state.available_policies = policy_manager.model_names + with state.lock: + state.policy_manager_ref = policy_manager + state.available_policies = policy_manager.model_names hw_controller = HardwareController(config, policy_manager, state, serial_comm) - state.hardware_controller_ref = hw_controller + with state.lock: + state.hardware_controller_ref = hw_controller keyboard_handler = KeyboardInputHandler(state, xbox_handler, terrain_manager) sim.register_callbacks(keyboard_handler) @@ -98,7 +104,7 @@ def start_background_threads() -> None: def cleanup_resources() -> None: log.info("NiceGUI 正在關閉,釋放資源...") simulation_controller.stop() - hw_controller.stop_controller_threads() + hw_controller.stop() serial_comm.close() xbox_handler.close() sim.close() diff --git a/mock_simulation.py b/mock_simulation.py index 69fdcfc..43daaf5 100644 --- a/mock_simulation.py +++ b/mock_simulation.py @@ -30,7 +30,7 @@ def __init__(self, config): self.torso_id = 1 self._last_render_time = time.perf_counter() - def initialize_window_and_context(self): + def initialize_window_and_context(self, state=None): print("[MOCK] initialize_window_and_context() called. Doing nothing.") def register_callbacks(self, keyboard_handler): diff --git a/pyserial.py b/pyserial.py deleted file mode 100644 index 5ad666d..0000000 --- a/pyserial.py +++ /dev/null @@ -1,87 +0,0 @@ -# 引入 pyserial 套件以與序列埠通訊 -import serial -# 提供延遲功能,讓硬體有時間初始化 -import time -import sys -import threading # 使用執行緒避免阻塞式讀取 -from serial_utils import select_serial_port - -# 全域旗標,用於在主執行緒與讀取執行緒間傳遞退出訊號 -exit_signal = threading.Event() - -def read_from_port(ser): - """在背景執行緒中持續讀取來自序列埠的資料。""" - print("\n[讀取線程已啟動] 等待來自 Teensy 的消息...") - while not exit_signal.is_set(): - try: - if ser.in_waiting > 0: - response = ser.readline().decode('utf-8', errors='ignore').strip() - if response: - sys.stdout.write(f"\r[Teensy]: {response}\n") - sys.stdout.flush() - except serial.SerialException: - print("\n[讀取線程錯誤] 序列埠已斷開。") - break - except Exception as e: - print(f"\n[讀取線程未知錯誤]: {e}") - break - time.sleep(0.01) - - -def main(): - """主流程:建立連線並處理使用者輸入。""" - # 1. 自動或手動選擇序列埠 - SERIAL_PORT = select_serial_port() - if not SERIAL_PORT: - sys.exit(1) - BAUD_RATE = 115200 # 與 Teensy 端保持一致的鮑率 - ser = None - try: - print(f"\n正在嘗試連接到您選擇的埠 {SERIAL_PORT}...") - # 2. 正式打開序列埠 - ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1) - print("等待 Teensy 初始化 (0.5秒)...") - # 3. 等待硬體初始化完成 - time.sleep(0.5) - # 清空讀寫緩衝區,以免殘留資料影響通訊 - ser.reset_input_buffer() - ser.reset_output_buffer() - print("緩衝區已清空,連接準備就緒。") - # 4. 啟動背景讀取執行緒 - read_thread = threading.Thread(target=read_from_port, args=(ser,)) - read_thread.daemon = True - read_thread.start() - print("\n--- Teensy 控制台已啟動 ---") - print("您可以輸入指令 (例如: 'stop', 'printon'), 然後按 Enter。") - print("輸入 'exit' 來退出程式。") - # 5. 進入主迴圈,等待使用者輸入指令並透過序列埠傳送 - while True: - command = input() - if command.lower() == 'exit': - break - command_to_send = command + '\n' - ser.write(command_to_send.encode('utf-8')) - except serial.SerialException as e: - # 連接或傳輸過程中發生錯誤 - print(f"--- 致命錯誤 ---") - print(f"無法打開或操作序列埠 {SERIAL_PORT}。") - print(f"錯誤詳情: {e}") - except KeyboardInterrupt: - # 使用者按下 Ctrl+C 中斷 - print("\n偵測到 Ctrl+C,正在終止程式...") - except Exception as e: - # 其他未預期的錯誤 - print(f"發生未知錯誤: {e}") - finally: - # 確保背景執行緒與序列埠正確關閉 - exit_signal.set() - if ser and ser.is_open: - ser.close() - print(f"序列埠 {SERIAL_PORT} 已安全關閉。") - if 'read_thread' in locals() and read_thread.is_alive(): - read_thread.join(timeout=1) - print("程式已退出。") - -if __name__ == "__main__": - # 直接執行此檔案時,啟動主流程 - main() diff --git a/rendering.py b/rendering.py index d18652c..4c785cb 100644 --- a/rendering.py +++ b/rendering.py @@ -6,22 +6,21 @@ from typing import TYPE_CHECKING, List, Dict if TYPE_CHECKING: - from simulation import Simulation + from core.simulation import Simulation class DebugOverlay: - """ - 負責在 MuJoCo 視窗上渲染所有文字除錯資訊。 - """ - def __init__(self): + """負責在 MuJoCo 視窗上渲染文字除錯資訊。""" + + def __init__(self, state: SimulationState): self.recipe: List[str] = [] self.component_dims: Dict[str, int] = {} - + self.display_pages_content = [ ['linear_velocity', 'angular_velocity', 'gravity_vector', 'commands', 'accelerometer'], ['joint_positions', 'joint_velocities', 'last_action'], ] - state_class_ref = SimulationState - state_class_ref.num_display_pages = len(self.display_pages_content) + # [修正] 直接設定實例的頁面數,避免僅更新類別屬性造成 1/1 顯示 + state.num_display_pages = len(self.display_pages_content) def set_recipe(self, recipe: List[str]): """動態設定當前要顯示的觀察配方。""" @@ -80,7 +79,7 @@ def render_hardware_overlay(self, viewport, context, state: SimulationState): top_left_rect = mujoco.MjrRect(padding, viewport.height - panel_height - padding, panel_width, panel_height) mujoco.mjr_rectangle(top_left_rect, 0.1, 0.1, 0.1, 0.8) - ai_status = "Enabled" if state.hardware_ai_is_active else "Disabled" + ai_status = "Enabled" if state.hardware.ai_is_active else "Disabled" title = f"--- HARDWARE CONTROL MODE (AI: {ai_status}) ---" help_text = "Press 'H' to exit | 'K': Toggle AI | 'G': Joint Test | 1..: Select Policy" @@ -95,7 +94,7 @@ def render_hardware_overlay(self, viewport, context, state: SimulationState): else: policy_text = f"Active Policy: {pm.primary_policy_name}" - status_text = f"--- Real-time Hardware Status ---\n{state.hardware_status_text}" + status_text = f"--- Real-time Hardware Status ---\n{state.hardware.status_text}" sensor_text = "" hw_ctrl = state.hardware_controller_ref if hw_ctrl and hw_ctrl.is_running: @@ -133,7 +132,7 @@ def render_serial_console(self, viewport, context, state: SimulationState): title = "--- SERIAL CONSOLE MODE (Press ~ to exit) ---" mujoco.mjr_overlay(mujoco.mjtFont.mjFONT_BIG, mujoco.mjtGridPos.mjGRID_TOPLEFT, console_rect, title, " ", context) - from logger import log_queue + from utils.logger import log_queue log_text = "\n".join(list(log_queue)[-50:]) log_rect = mujoco.MjrRect(console_rect.left + 10, console_rect.bottom, console_rect.width - 20, console_rect.height - 50) mujoco.mjr_overlay(mujoco.mjtFont.mjFONT_NORMAL, mujoco.mjtGridPos.mjGRID_TOPLEFT, log_rect, "\n\n" + log_text, " ", context) diff --git a/serial_communicator.py b/serial_communicator.py index f2ac305..5fb4cbd 100644 --- a/serial_communicator.py +++ b/serial_communicator.py @@ -1,78 +1,78 @@ # serial_communicator.py import serial import time -import sys import threading import serial.tools.list_ports -from serial_utils import select_serial_port -from collections import deque -from logger import log +from utils.serial_utils import select_serial_port +from utils.logger import log + class SerialCommunicator: - """ - 【修改版】一個類別,統一管理序列埠的連接與通訊。 - 它作為唯一的連接建立者,可以將已建立的連接「出借」給其他模組(如HardwareController)使用。 - """ - def __init__(self, max_log_lines=50): # 【容量加大】將日誌行數從 15 增加到 50 - """初始化通訊器。""" - self.ser = None - self.read_thread = None + """統一管理序列埠連線的服務""" + + def __init__(self, max_log_lines: int = 50): + self.ser: serial.Serial | None = None + self.read_thread: threading.Thread | None = None self.exit_signal = threading.Event() self.is_connected = False - self.port_name = None + self.port_name: str | None = None self.is_managed_by_hardware_controller = False log.info("序列埠通訊器已初始化 (等待連接指令)。") + # -------------------------------------------------------------- + # connection helpers + # -------------------------------------------------------------- def get_serial_connection(self) -> serial.Serial | None: """返回已建立的 serial.Serial 物件,供 HardwareController 使用。""" - if self.is_connected: # 如果已連接 - return self.ser # 返回序列埠物件 - return None # 否則返回 None + if self.is_connected: + return self.ser + return None def scan_and_connect(self) -> bool: - """掃描、讓使用者選擇並連接序列埠。""" - if self.is_connected: # 如果已連接 + """掃描並連接序列埠。""" + if self.is_connected: log.info("序列埠已連接,無需重新掃描。") return True - - selected_port = self._select_serial_port() # 讓使用者選擇序列埠 - if selected_port: # 如果選擇了 - self.port_name = selected_port # 儲存埠名 - return self.connect() # 執行連接 + selected_port = self._select_serial_port() + if selected_port: + self.port_name = selected_port + return self.connect() return False def _select_serial_port(self): - """掃描並在終端機列出所有可用的序列埠供使用者選擇。""" - return select_serial_port() # 呼叫工具函式 + return select_serial_port() - def connect(self, baud_rate=115200) -> bool: - """連接到指定的序列埠並啟動讀取執行緒。""" - if not self.port_name: return False # 如果沒有埠名,返回失敗 + def connect(self, baud_rate: int = 115200) -> bool: + """連接到指定序列埠並啟動讀取執行緒。""" + if not self.port_name: + return False try: log.info(f"正在連接到 {self.port_name}...") - self.ser = serial.Serial(self.port_name, baud_rate, timeout=0.1) # 建立序列埠物件 - time.sleep(0.5) # 等待硬體初始化 - self.ser.reset_input_buffer() # 清空輸入緩衝區 - self.ser.reset_output_buffer() # 清空輸出緩衝區 - - self.exit_signal.clear() # 重置退出信號 - self.read_thread = threading.Thread(target=self._read_from_port, daemon=True) # 建立讀取執行緒 - self.read_thread.start() # 啟動執行緒 - self.is_connected = True # 設定連接旗標 + self.ser = serial.Serial(self.port_name, baud_rate, timeout=0.1) + time.sleep(0.5) + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + + self.exit_signal.clear() + self.read_thread = threading.Thread(target=self._read_from_port, daemon=True) + self.read_thread.start() + self.is_connected = True log.info(f"✅ 序列埠 {self.port_name} 連接成功。") return True - except serial.SerialException as e: # 捕捉連接錯誤 + except serial.SerialException as e: log.error(f"❌ 序列埠連接失敗: {e}") self.is_connected = False return False - def _read_from_port(self): - """[背景執行緒函式] 持續地從序列埠讀取數據並存入日誌。""" - while not self.exit_signal.is_set(): # 當未收到退出信號時 - if self.is_managed_by_hardware_controller: # 如果控制權已交給硬體控制器 - time.sleep(0.1) # 短暫休眠,避免資源競爭 - continue # 繼續下一輪迴圈 - + # -------------------------------------------------------------- + # background reading + # -------------------------------------------------------------- + def _read_from_port(self) -> None: + """背景執行緒:持續從序列埠讀取資料。""" + while not self.exit_signal.is_set(): + if self.is_managed_by_hardware_controller: + time.sleep(0.1) + continue try: if self.ser and self.ser.is_open and self.ser.in_waiting > 0: response = self.ser.readline().decode('utf-8', 'ignore').strip() @@ -82,27 +82,44 @@ def _read_from_port(self): log.error("[ERROR] Serial port disconnected.") self.is_connected = False break - time.sleep(0.01) # 短暫休眠 + time.sleep(0.01) - def send_command(self, command: str): - """向序列埠發送一個字串指令。""" + # -------------------------------------------------------------- + # send / attach / detach + # -------------------------------------------------------------- + def send_command(self, command: str) -> None: + """向序列埠發送一個字串指令 (未接管時使用)。""" if self.is_connected and command and not self.is_managed_by_hardware_controller: try: - command_to_send = command + '\n' - self.ser.write(command_to_send.encode('utf-8')) + self.ser.write((command + '\n').encode('utf-8')) except serial.SerialException as e: log.error(f"[ERROR] Send failed: {e}") self.is_connected = False + def attach_serial(self, ser: serial.Serial) -> None: + """讓 HardwareController 接管序列埠,本類進入旁路模式""" + self.is_managed_by_hardware_controller = True + self.ser = ser # 重用相同 serial 物件,避免重開 - def close(self): + def detach_serial(self) -> None: + self.is_managed_by_hardware_controller = False + if self.ser: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + + # -------------------------------------------------------------- + # teardown + # -------------------------------------------------------------- + def close(self) -> None: """安全地關閉序列埠和讀取執行緒。""" - if self.is_managed_by_hardware_controller: return # 如果硬體控制器正在管理連接,則本類別不應關閉它 + if self.is_managed_by_hardware_controller: + return - if self.read_thread and self.read_thread.is_alive(): # 如果讀取執行緒在運行 - self.exit_signal.set() # 發送退出信號 - self.read_thread.join(timeout=1) # 等待執行緒結束 - if self.ser and self.ser.is_open: # 如果序列埠已開啟 - self.ser.close() # 關閉序列埠 + if self.read_thread and self.read_thread.is_alive(): + self.exit_signal.set() + self.read_thread.join(timeout=1) + if self.ser and self.ser.is_open: + self.ser.close() log.info(f"序列埠 {self.port_name} 已安全關閉。") - self.is_connected = False # 更新連接狀態 \ No newline at end of file + self.is_connected = False + diff --git a/simulation_controller.py b/simulation_controller.py deleted file mode 100644 index 793b273..0000000 --- a/simulation_controller.py +++ /dev/null @@ -1,307 +0,0 @@ -"""Run MuJoCo simulation in a background thread.""" - -from __future__ import annotations - -import threading -import time -from typing import TYPE_CHECKING -from logger import log - -import numpy as np -from mock_simulation import MockSimulation - -try: - import mujoco -except ImportError: # 無頭環境可能沒有安裝 - mujoco = None - -if TYPE_CHECKING: # pragma: no cover - type hints - from state import SimulationState - - -class SimulationController: - """在獨立執行緒中運行模擬並處理所有狀態變更。""" - - def __init__(self, state: SimulationState) -> None: - self.state = state - self.sim = state.sim - self.config = state.config - - self.policy_manager = state.policy_manager_ref - self.terrain_manager = state.terrain_manager_ref - self.floating_controller = state.floating_controller_ref - self.xbox_handler = state.xbox_handler_ref - self.hardware_controller = state.hardware_controller_ref - - self._running = threading.Event() - self.thread: threading.Thread | None = None - - # 追蹤手動模式下懸浮是否已啟用 - self._manual_float_active = False - - # 初始化將在執行緒啟動後進行 - - # ------------------------------------------------------------------ - def _initialize_simulation_state(self) -> None: - if isinstance(self.sim, MockSimulation): - log.info("[MOCK] Skip simulation state initialization.") - return - - if self.terrain_manager.is_functional: - # 初始啟動時重置地形管理器,以確保中心點與高度場為最新狀態 - self.terrain_manager.reset() - self.hard_reset() - print("\n--- Simulation Started (SPACE: Pause, N: Step) ---") - - # ------------------------------------------------------------------ - def start(self) -> None: - """啟動模擬執行緒。""" - if self.thread and self.thread.is_alive(): - return - self._running.set() - self.thread = threading.Thread(target=self.run, daemon=True) - self.thread.start() - - def run(self) -> None: - """執行緒進入點:負責處理所有請求並運行模擬。""" - is_headless = isinstance(self.sim, MockSimulation) - if not is_headless: - self.sim.initialize_window_and_context() - self._initialize_simulation_state() - else: - print("[MOCK] Headless mode, skip window/context init.") - # 無頭模式不需要初始化真實模擬狀態 - - while self._running.is_set(): - with self.state.lock: - shutdown_req = self.state.shutdown_requested - should_close = shutdown_req - if not is_headless: - should_close = should_close or self.sim.should_close() - if should_close: - if shutdown_req and not is_headless and not self.sim.should_close(): - log.info("偵測到全域關閉請求,正在關閉模擬視窗...") - from glfw import set_window_should_close - set_window_should_close(self.sim.window, 1) - self._running.clear() - from nicegui import app - app.shutdown() - continue - - # 1) 先處理所有待辦請求 - self.process_requests() - - # 2) 讀取必要狀態 - with self.state.lock: - mode = self.state.control_mode - terrain_mode = self.state.terrain_mode - pos = self.state.latest_pos - single_step = self.state.single_step_mode - execute_one = self.state.execute_one_step - manual_float = self.state.manual_mode_is_floating - - if single_step and not execute_one: - self.sim.render_from_thread(self.state) - time.sleep(0.01) - continue - if execute_one: - with self.state.lock: - self.state.execute_one_step = False - - if not is_headless and mode not in ["HARDWARE_MODE", "SERIAL_MODE"]: - self._simulation_step() - - # 根據手動懸浮開關決定是否啟用懸浮控制器 - is_manual_mode = mode in ["JOINT_TEST", "MANUAL_CTRL"] - if is_manual_mode and manual_float and not self._manual_float_active: - self.floating_controller.enable(self.state.latest_pos) - self._manual_float_active = True - elif (not is_manual_mode or not manual_float) and self._manual_float_active: - self.floating_controller.disable() - self._manual_float_active = False - - self.update_derived_states_and_render(pos, terrain_mode) - - print("模擬執行緒已停止。") - - def process_requests(self) -> None: - """檢查並處理所有待處理的狀態變更請求。""" - with self.state.lock: - pending_mode = self.state.control_mode_pending - hard_reset = self.state.hard_reset_requested - soft_reset = self.state.soft_reset_requested - if pending_mode: - self.state.control_mode_pending = None - if hard_reset: - self.state.hard_reset_requested = False - if soft_reset: - self.state.soft_reset_requested = False - - if pending_mode: - self.handle_mode_change(self.state.control_mode, pending_mode) - - # 無頭模式下沒有真實模擬,跳過重置流程 - if not isinstance(self.sim, MockSimulation): - if hard_reset: - self.hard_reset() - if soft_reset: - self.soft_reset() - - def handle_mode_change(self, old_mode: str, new_mode: str) -> None: - """執行模式切換並處理硬體控制執行緒與模擬狀態。""" - self.state.set_control_mode(new_mode) - - is_headless = isinstance(self.sim, MockSimulation) - - if not is_headless: - # 離開舊模式時的物理處理 - if old_mode == "FLOATING": - self.floating_controller.disable() - elif old_mode == "MANUAL_CTRL" and self.state.manual_mode_is_floating: - self.floating_controller.disable() - self.state.manual_mode_is_floating = False - - # 進入新模式時的初始化 - if new_mode == "FLOATING": - self.floating_controller.enable(self.state.latest_pos) - elif new_mode in ["JOINT_TEST", "MANUAL_CTRL"]: - log.info(f"進入 {new_mode} 模式,重置機器人關節與速度") - self.sim.data.qpos[7:] = self.sim.default_pose.copy() - self.sim.data.qvel[6:] = 0 - if mujoco: - mujoco.mj_forward(self.sim.model, self.sim.data) - - if new_mode == "HARDWARE_MODE" and not self.hardware_controller.is_running: - log.info("派生執行緒以啟動硬體控制器...") - threading.Thread(target=self.hardware_controller.start_controller_threads, daemon=True).start() - elif old_mode == "HARDWARE_MODE" and new_mode != "HARDWARE_MODE": - if self.hardware_controller.is_running: - log.info("派生執行緒以停止硬體控制器...") - threading.Thread(target=self.hardware_controller.stop_controller_threads, daemon=True).start() - - def update_derived_states_and_render(self, pos, terrain_mode) -> None: - """更新衍生狀態並渲染場景。""" - is_headless = isinstance(self.sim, MockSimulation) - - if not is_headless and self.terrain_manager.is_functional and self.terrain_manager.needs_physics_and_scene_update: - mujoco.mj_forward(self.sim.model, self.sim.data) - mujoco.mjr_uploadHField(self.sim.model, self.sim.context, self.terrain_manager.hfield_id) - self.terrain_manager.needs_physics_and_scene_update = False - log.info("✅ 地形物理與渲染已同步更新。") - - if not is_headless: - with self.state.lock: - self.state.latest_pos = self.sim.data.body('torso').xpos.copy() - self.state.latest_quat = self.sim.data.body('torso').xquat.copy() - # 將當前關節角度複製到共享狀態,避免 UI 執行緒直接讀取 sim.data - self.state.latest_joint_positions = self.sim.data.qpos[7:].copy() - - if self.terrain_manager.is_functional: - self.terrain_manager.update(pos, terrain_mode) - - self.sim.render_from_thread(self.state) - - # ------------------------------------------------------------------ - def _simulation_step(self) -> None: - with self.state.lock: - command = self.state.command.copy() - control_mode = self.state.control_mode - tuning_params = self.state.tuning_params - - onnx_input, action_final = self.policy_manager.get_action(command) - - if control_mode == "MANUAL_CTRL": - with self.state.lock: - final_ctrl = self.state.manual_final_ctrl.copy() - elif control_mode == "JOINT_TEST": - with self.state.lock: - final_ctrl = self.sim.default_pose + self.state.joint_test_offsets - else: - final_ctrl = self.sim.default_pose + action_final * tuning_params.action_scale - - self.sim.apply_position_control(final_ctrl, tuning_params) - - with self.state.lock: - self.state.latest_onnx_input = onnx_input.flatten() - self.state.latest_action_raw = action_final - self.state.latest_final_ctrl = final_ctrl - - target_time = self.sim.data.time + self.config.control_dt - while self.sim.data.time < target_time: - if not self._running.is_set(): - break - mujoco.mj_step(self.sim.model, self.sim.data) - - # ------------------------------------------------------------------ - def stop(self) -> None: - self._running.clear() - if self.thread and self.thread.is_alive(): - self.thread.join(timeout=1) - - # ------------------------------------------------------------------ - def hard_reset(self) -> None: - """根據目前地形自動決定適當高度並執行硬重置。""" - with self.state.lock: - # 取得當前地形名稱以判斷重置高度 - terrain_name = self.terrain_manager.get_current_terrain_name_simple(self.state) - - difficult = ["Pyramid", "Stepped Pyramid"] - # 困難地形需要更高的初始高度以保證落地安全 - start_z_offset = 1.5 if terrain_name in difficult else 0.3 - - print(f"\n--- 正在執行機器人硬重置 (地形: {terrain_name}, 高度偏移: {start_z_offset}m) ---") - # 【核心修正】硬重置僅重置機器人狀態,不再重置地形 - - with self.state.lock: - if self.state.control_mode == "HARDWARE_MODE": - return - - mujoco.mj_resetData(self.sim.model, self.sim.data) - self.sim.data.qpos[0], self.sim.data.qpos[1] = 0, 0 - # 依照目前地形取得原點的高度,確保重置後不會埋在地底 - start_ground_z = self.terrain_manager.get_height_at(0, 0) - self.sim.data.qpos[2] = start_ground_z + start_z_offset - self.sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) - self.sim.data.qpos[7:] = self.sim.default_pose - self.sim.data.qvel[:] = 0 - self.sim.data.ctrl[:] = self.sim.default_pose - for _ in range(10): - mujoco.mj_step(self.sim.model, self.sim.data) - - self.policy_manager.reset() - if self.state.control_mode == "FLOATING": - self.state.set_control_mode("WALKING") - self.state.reset_control_state(self.sim.data.time) - self.state.clear_command() - self.state.joint_test_offsets.fill(0.0) - self.state.manual_final_ctrl.fill(0.0) - self.state.manual_mode_is_floating = False - if self._manual_float_active: - self.floating_controller.disable() - self._manual_float_active = False - self.state.hard_reset_requested = False - mujoco.mj_forward(self.sim.model, self.sim.data) - - def soft_reset(self) -> None: - print("\n--- 正在執行空中姿態重置 ---") - with self.state.lock: - if self.state.control_mode == "HARDWARE_MODE": - return - - self.sim.data.qpos[3:7] = np.array([1., 0, 0, 0]) - self.sim.data.qpos[7:] = self.sim.default_pose - self.sim.data.qvel[:] = 0 - - self.policy_manager.reset() - self.state.clear_command() - self.state.joint_test_offsets.fill(0.0) - self.state.manual_final_ctrl.fill(0.0) - self.state.manual_mode_is_floating = False - if self._manual_float_active: - self.floating_controller.disable() - self._manual_float_active = False - mujoco.mj_forward(self.sim.model, self.sim.data) - self.state.soft_reset_requested = False - - - diff --git a/state.py b/state.py index 39837ba..67dfa47 100644 --- a/state.py +++ b/state.py @@ -1,154 +1,261 @@ -# state.py from __future__ import annotations +"""Central state module with clear operating modes.""" import numpy as np - from dataclasses import dataclass, field -from config import AppConfig -from logger import log -from typing import TYPE_CHECKING +from enum import Enum, auto import threading +from typing import TYPE_CHECKING, Any + +from utils.config import AppConfig +from utils.logger import log -# 為了型別提示,避免循環匯入 if TYPE_CHECKING: - import mujoco - from floating_controller import FloatingController - from policy import PolicyManager - from hardware_controller import HardwareController - from terrain_manager import TerrainManager - from serial_communicator import SerialCommunicator - from simulation import Simulation - from xbox_input_handler import XboxInputHandler + from core.simulation import Simulation + +# ======================== +# Mode Enumerations 模式列舉 +# ======================== +class OperatingMode(Enum): + """頂層操作模式: 模擬 or 真實硬體""" + SIMULATION = auto() + HARDWARE = auto() +class ControlSubMode(Enum): + """控制子模式, both for sim & hardware""" + WALKING = auto() # AI 走路 + FLOATING = auto() # AI 懸浮 + JOINT_TEST = auto() # 手動關節測試 + MANUAL_CTRL = auto() # 手動姿態控制 + IDLE = auto() # 待機 + +# ======================== +# Hardware State 硬體狀態 +# ======================== +@dataclass +class HardwareState: + """即時硬體相關數據""" + is_connected: bool = False + ai_is_active: bool = False + status_text: str = "Not Connected" + last_update_time: float = 0.0 # 最後一次成功接收資料的時間 + crc_error_count: int = 0 # CRC 錯誤次數 + mismatch_count: int = 0 # 欄位長度錯誤次數 + + # Teensy 端的感測值 + angular_velocity_radps: np.ndarray = field(default_factory=lambda: np.zeros(3)) + gravity_vector: np.ndarray = field(default_factory=lambda: np.zeros(3)) + linear_velocity: np.ndarray = field(default_factory=lambda: np.zeros(3)) + accelerometer: np.ndarray = field(default_factory=lambda: np.zeros(3)) + joint_positions_rad: np.ndarray = field(default_factory=lambda: np.zeros(12)) + joint_velocities_radps: np.ndarray = field(default_factory=lambda: np.zeros(12)) + + # PC 端為硬體準備的資訊 (for UI) + latest_onnx_input: np.ndarray = field(default_factory=lambda: np.array([])) + latest_action_raw: np.ndarray = field(default_factory=lambda: np.zeros(12)) + latest_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) + +# ======================== +# Tuning Params 調校參數 +# ======================== @dataclass class TuningParams: - """用於即時調整機器人控制參數的類別。""" - kp: float # P gain (Proportional gain) - kd: float # D gain (Derivative gain) - action_scale: float # 動作縮放比例 - bias: float # 力矩偏置 + kp: float + kd: float + action_scale: float + bias: float + bias_enabled: bool = False # 是否啟用偏壓力矩, 預設關閉以更安全 +# ======================== +# Simulation State 主狀態容器 +# ======================== @dataclass class SimulationState: - """Central state shared across threads.""" config: AppConfig lock: threading.Lock = field(default_factory=threading.Lock) + + operating_mode: OperatingMode = OperatingMode.SIMULATION + control_sub_mode: ControlSubMode = ControlSubMode.WALKING + + # 專用硬體狀態 + hardware: HardwareState = field(default_factory=HardwareState) + + # 模擬專用最新資料 + sim_latest_onnx_input: np.ndarray = field(default_factory=lambda: np.array([])) + sim_latest_action_raw: np.ndarray = field(default_factory=lambda: np.zeros(12)) + sim_latest_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) + sim_latest_pos: np.ndarray = field(default_factory=lambda: np.zeros(3)) + sim_latest_quat: np.ndarray = field(default_factory=lambda: np.array([1., 0., 0., 0.], dtype=np.float32)) + sim_latest_joint_positions: np.ndarray = field(default_factory=lambda: np.zeros(12)) + + # 使用者命令與調校參數 command: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float32)) tuning_params: TuningParams = field(init=False) - - hard_reset_requested: bool = False # 硬重置請求旗標 - soft_reset_requested: bool = False # 軟重置請求旗標 - - control_timer: float = 0.0 # 控制迴圈的計時器 - - sim_mode_text: str = "Initializing" # 舊的模式文字,可能可以移除 - input_mode: str = "KEYBOARD" # 當前的輸入模式 ("KEYBOARD" 或 "GAMEPAD") - control_mode: str = "WALKING" # 當前的總控制模式 (例如 "WALKING", "HARDWARE_MODE") - # 【新增】UI 執行緒若想切換模式,先將欲切換的模式寫入此處,模擬執行緒會在下一迴圈處理 - control_mode_pending: str | None = None - previous_control_mode: str = "WALKING" # 【新功能】儲存進入 SERIAL_MODE 前的模式,以便能正確返回 - - terrain_mode: str = "INFINITE" # 當前的地形模式 ("INFINITE" 或 "SINGLE") - single_terrain_index: int = 0 # 在 SINGLE 地形模式下,當前選擇的地形索引 - - latest_onnx_input: np.ndarray = field(default_factory=lambda: np.array([])) # 最新一幀的 ONNX 模型輸入向量 - latest_action_raw: np.ndarray = field(default_factory=lambda: np.zeros(12)) # 最新一幀的 ONNX 模型原始輸出 - latest_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) # 最終計算後要傳給致動器的控制指令 - latest_pos: np.ndarray = field(default_factory=lambda: np.zeros(3)) # 機器人軀幹的最新位置 - latest_quat: np.ndarray = field(default_factory=lambda: np.array([1., 0., 0., 0.])) # 機器人軀幹的最新姿態(四元數) - # 新增欄位:安全儲存最新的各關節角度,避免 UI 執行緒直接讀取 sim.data - latest_joint_positions: np.ndarray = field(default_factory=lambda: np.zeros(12)) - display_page: int = 0 # 除錯資訊顯示的當前頁碼 - num_display_pages: int = 2 # 除錯資訊的總頁數 - - # --- 【序列埠控制台模式相關狀態】 --- - - joint_test_index: int = 0 # 在關節測試模式下,當前選中的關節索引 - joint_test_offsets: np.ndarray = field(default_factory=lambda: np.zeros(12)) # 儲存各關節在測試模式下的偏移量 - - manual_ctrl_index: int = 0 # 在手動控制模式下,當前選中的關節索引 - manual_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) # 儲存手動控制模式下的最終控制角度 - manual_mode_is_floating: bool = False # 標記手動控制模式下是否啟用懸浮 - - serial_is_connected: bool = False # 標記序列埠是否已連接 - gamepad_is_connected: bool = False # 標記遊戲搖桿是否已連接 - - tuning_param_index: int = 0 # 當前選中要調整的參數索引 (Kp, Kd, etc.) - - # --- 【核心修改】將所有主要物件的參考儲存在此,使其成為全域上下文 --- - sim: 'Simulation' = None # 模擬環境物件的參考 - floating_controller_ref: 'FloatingController' = None # 懸浮控制器物件的參考 - terrain_manager_ref: 'TerrainManager' = None # 地形管理器物件的參考 - policy_manager_ref: 'PolicyManager' = None # 策略管理器物件的參考 - hardware_controller_ref: 'HardwareController' = None # 硬體控制器物件的參考 - serial_communicator_ref: 'SerialCommunicator' = None # 序列埠通訊器物件的參考 - xbox_handler_ref: 'XboxInputHandler' = None # Xbox 搖桿處理器的參考 - - available_policies: list = field(default_factory=list) # 所有可用的 ONNX 策略名稱列表 - - hardware_is_connected: bool = False # 標記硬體控制器是否已成功啟動 - hardware_ai_is_active: bool = False # 標記硬體模式下的 AI 是否已啟用 - hardware_status_text: str = "Not Connected" # 用於在 UI 上顯示的硬體狀態文字 - - single_step_mode: bool = False # 標記是否處於單步模擬模式 - execute_one_step: bool = False # 在單步模式下,請求執行下一步的旗標 - - shutdown_requested: bool = False # 新增:UI 請求程式結束 - - def __post_init__(self): - """在初始化後,根據設定檔設定初始值。""" - self.tuning_params = TuningParams(**self.config.initial_tuning_params.__dict__) # 從設定檔初始化調校參數 - self.latest_action_raw = np.zeros(self.config.num_motors) # 初始化原始動作向量 - self.latest_final_ctrl = np.zeros(self.config.num_motors) # 初始化最終控制向量 - self.manual_final_ctrl = np.zeros(self.config.num_motors) # 初始化手動控制向量 - self.latest_joint_positions = np.zeros(self.config.num_motors) # 初始化關節位置副本 - log.info("✅ SimulationState 初始化完成 (含執行緒鎖)。") - - def reset_control_state(self, sim_time: float): - """重置控制迴圈的計時器。""" - self.control_timer = sim_time # 將計時器設定為當前的模擬時間 - log.info("✅ 控制狀態已重置。") - - def clear_command(self): - """清除使用者輸入的運動指令。""" - self.command.fill(0.0) # 將指令向量全部設為 0 + input_mode: str = "KEYBOARD" + + # 地形相關設定 Terrain + terrain_mode: str = "INFINITE" # 地形模式: 無限或單一 + single_terrain_index: int = 0 # 單一地形索引 + + # 手動控制相關 + joint_test_offsets: np.ndarray = field(default_factory=lambda: np.zeros(12)) + manual_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) + manual_mode_is_floating: bool = False + joint_test_index: int = 0 # 目前選中的關節索引 + manual_ctrl_index: int = 0 # 手動控制模式下的關節索引 + + # UI 旗標 + hard_reset_requested: bool = False + soft_reset_requested: bool = False + single_step_mode: bool = False + execute_one_step: bool = False + shutdown_requested: bool = False + serial_is_connected: bool = False # 序列埠是否已連接 + gamepad_is_connected: bool = False # 搖桿是否已連接 + serial_command_buffer: str = "" # 序列埠輸入緩衝 + tuning_param_index: int = 0 # 目前調整參數索引 + display_page: int = 0 # 目前顯示頁面 + num_display_pages: int = 1 # 顯示頁面數量 + previous_control_mode: str = "WALKING" # 追蹤前一個模式 (相容舊介面) + control_mode_pending: str | None = None # 待切換模式 (相容舊介面) + + # 內部變數: 控制頻率,可動態調整 + _control_freq: float = field(init=False) + + # 外部模組參考 (References) + sim: Simulation | None = None + policy_manager_ref: Any = None + hardware_controller_ref: Any = None + serial_communicator_ref: Any = None + xbox_handler_ref: Any = None + terrain_manager_ref: Any = None + floating_controller_ref: Any = None + + def __post_init__(self) -> None: + """初始化後設置調校參數與控制頻率""" + self.tuning_params = TuningParams(**self.config.initial_tuning_params.__dict__) + self._control_freq = self.config.control_freq + log.info("✅ 重構版 SimulationState 初始化完成 (含便利方法與動態控制頻率)。") + + # ============= + # Convenience Methods 便利方法 + # ============= + def clear_command(self) -> None: + """清除使用者輸入的運動指令""" + with self.lock: + self.command.fill(0.0) log.info("運動指令已清除。") - def toggle_input_mode(self, new_mode: str, clear_cmd: bool = True): - """切換輸入模式,可選擇是否清除現有指令。""" + def toggle_input_mode(self, new_mode: str, clear_cmd: bool = True) -> None: + """切換輸入模式,可選擇是否清除指令""" with self.lock: if self.input_mode != new_mode: self.input_mode = new_mode if clear_cmd: - self.clear_command() + self.command.fill(0.0) log.info(f"輸入模式已切換至: {self.input_mode}") - - def set_control_mode(self, new_mode: str): - """更新控制模式,此函式僅變更狀態本身。""" + + def request_mode_change(self, new_op: OperatingMode, new_sub: ControlSubMode) -> None: + """統一的模式切換接口""" + with self.lock: + self.operating_mode = new_op + self.control_sub_mode = new_sub + log.info(f"模式已切換至: {new_op.name}/{new_sub.name}") + + def request_sub_mode_change(self, new_sub: ControlSubMode) -> None: + """僅改變控制子模式的便捷函式""" + with self.lock: + self.control_sub_mode = new_sub + log.info(f"控制子模式已請求切換至: {new_sub.name}") + + # -------- 動態控制頻率與週期 -------- + @property + def control_freq(self) -> float: + """取得目前控制頻率 Hz""" with self.lock: - if self.control_mode == new_mode: - return - - old_mode = self.control_mode - - if new_mode == "SERIAL_MODE": - self.previous_control_mode = old_mode - - # 僅變更模式,不做任何實際硬體或模擬操作 - self.control_mode = new_mode - log.info(f"控制模式已設定為: {self.control_mode}") - - # 進入新模式時初始化相關狀態 - if new_mode == "JOINT_TEST": - self.joint_test_offsets.fill(0.0) - elif new_mode == "MANUAL_CTRL": - initial_pose = self.sim.default_pose.copy() if hasattr(self.sim, 'default_pose') else np.zeros(self.config.num_motors) - self.manual_final_ctrl[:] = initial_pose - - # 若從手動模式回到 AI 模式,重置 AI 狀態 - is_entering_ai = new_mode in ["WALKING", "FLOATING"] - is_leaving_manual = old_mode in ["JOINT_TEST", "MANUAL_CTRL", "SERIAL_MODE"] - if is_entering_ai and is_leaving_manual: - log.info("從手動模式返回,重置 AI 狀態...") - if self.policy_manager_ref: - self.policy_manager_ref.reset() - self.clear_command() + return self._control_freq + + @control_freq.setter + def control_freq(self, value: float) -> None: + """更新控制頻率, 會同步改變 control_dt""" + if value <= 0: + return + with self.lock: + self._control_freq = value + log.info(f"控制頻率已更新為 {value} Hz (dt={self.control_dt:.4f}s)") + + @property + def control_dt(self) -> float: + """根據控制頻率計算控制週期秒數""" + with self.lock: + return 1.0 / self._control_freq if self._control_freq > 0 else float('inf') + + # ---- Legacy compatibility methods ---- + def get_control_mode_string(self) -> str: + """以舊字串格式回傳目前模式""" + if self.operating_mode == OperatingMode.HARDWARE: + return "HARDWARE_MODE" + return self.control_sub_mode.name + + def set_control_mode(self, mode: str) -> None: + """相容舊介面的模式設定函式""" + mapping = { + "WALKING": (OperatingMode.SIMULATION, ControlSubMode.WALKING), + "FLOATING": (OperatingMode.SIMULATION, ControlSubMode.FLOATING), + "JOINT_TEST": (OperatingMode.SIMULATION, ControlSubMode.JOINT_TEST), + "MANUAL_CTRL": (OperatingMode.SIMULATION, ControlSubMode.MANUAL_CTRL), + "HARDWARE_MODE": (OperatingMode.HARDWARE, ControlSubMode.IDLE), + } + with self.lock: + self.previous_control_mode = self.get_control_mode_string() + op, sub = mapping.get(mode, (self.operating_mode, self.control_sub_mode)) + self.request_mode_change(op, sub) + + # 提供舊屬性接口,避免舊模組存取失敗 + @property + def control_mode(self) -> str: # 讀取時回傳舊格式字串 + return self.get_control_mode_string() + + @control_mode.setter + def control_mode(self, mode: str) -> None: # 寫入時自動映射到新枚舉 + self.set_control_mode(mode) + + # Legacy alias properties for backward compatibility ----------------- + @property + def latest_pos(self) -> np.ndarray: + return self.sim_latest_pos + + @latest_pos.setter + def latest_pos(self, value: np.ndarray) -> None: + self.sim_latest_pos = value + + @property + def latest_joint_positions(self) -> np.ndarray: + return self.sim_latest_joint_positions + + @latest_joint_positions.setter + def latest_joint_positions(self, value: np.ndarray) -> None: + self.sim_latest_joint_positions = value + + @property + def latest_onnx_input(self) -> np.ndarray: + return self.sim_latest_onnx_input + + @latest_onnx_input.setter + def latest_onnx_input(self, value: np.ndarray) -> None: + self.sim_latest_onnx_input = value + + @property + def latest_action_raw(self) -> np.ndarray: + return self.sim_latest_action_raw + + @latest_action_raw.setter + def latest_action_raw(self, value: np.ndarray) -> None: + self.sim_latest_action_raw = value + + @property + def latest_final_ctrl(self) -> np.ndarray: + return self.sim_latest_final_ctrl + + @latest_final_ctrl.setter + def latest_final_ctrl(self, value: np.ndarray) -> None: + self.sim_latest_final_ctrl = value diff --git a/tennsy.md b/tennsy.md index 9fdd0b9..63a644c 100644 --- a/tennsy.md +++ b/tennsy.md @@ -29,7 +29,7 @@ --- -## 📖 指令參考手冊 (Command Reference - v3.4) +## 📖 指令參考手冊 (Command Reference - v3.5) ### 語法提示 (Syntax Legend) * `` : 表示需要使用者替換的變數 @@ -82,8 +82,10 @@ * `status` * 描述:打印一次性的完整狀態報告。 * `monitor ` - * 描述:設定遙測格式 (h:人類, c:CSV, d:儀表板)。 - * **注意**: `monitor c` 的 CSV 格式為單行寬格式,專為數據分析設計。 + * 描述:設定遙測格式 (h:人類, c:舊CSV, d:儀表板)。 +* `monitor csv42` + * **【推薦】** 啟動 42 欄位 CSV 串流,並在行尾附加 CRC8 校驗碼,供 PC 端 AI 控制使用。 + * **格式**:`角速度(3), 重力向量(3), 線速度(3), 加速度(3), 關節位置(12), 關節速度(12), 保留(4), CRC8` * `monitor freq ` * 描述:設定遙測數據的更新頻率 (Hz)。 * `monitor ` diff --git a/test/test_serial_utils.py b/test/test_serial_utils.py index 0cf0976..a7fe4ab 100644 --- a/test/test_serial_utils.py +++ b/test/test_serial_utils.py @@ -14,7 +14,7 @@ if repo_root not in sys.path: sys.path.insert(0, repo_root) -from serial_utils import select_serial_port, TEENSY_VID, TEENSY_PID +from utils.serial_utils import select_serial_port, TEENSY_VID, TEENSY_PID class DummyPort: def __init__(self, device, description, vid, pid): diff --git a/test/test_teensy_connection.py b/test/test_teensy_connection.py index b89a921..3136eda 100644 --- a/test/test_teensy_connection.py +++ b/test/test_teensy_connection.py @@ -17,7 +17,7 @@ if repo_root not in sys.path: sys.path.insert(0, repo_root) -from serial_utils import TEENSY_VID, TEENSY_PID # 用於辨識 Teensy 的 VID/PID +from utils.serial_utils import TEENSY_VID, TEENSY_PID # 用於辨識 Teensy 的 VID/PID def test_teensy_connection(): diff --git a/test/verify_model_mode.py b/test/verify_model_mode.py index 6d08cf9..7a8acd2 100644 --- a/test/verify_model_mode.py +++ b/test/verify_model_mode.py @@ -6,9 +6,9 @@ from pathlib import Path # --- 導入您專案的模組 --- -from config import load_config -from policy import ONNXPolicy -from observation import ObservationBuilder # 我們將使用您修改後的版本 +from utils.config import load_config +from core.policy import ONNXPolicy +from core.observation import ObservationBuilder # 我們將使用您修改後的版本 # --- 腳本設定 --- SIMULATION_DURATION = 3.0 diff --git a/ui_controller.py b/ui_controller.py index cd347e7..50d99b2 100644 --- a/ui_controller.py +++ b/ui_controller.py @@ -1,12 +1,10 @@ -from nicegui import ui, app +from nicegui import ui import numpy as np -import threading -from typing import TYPE_CHECKING, List +import time -from logger import log, log_queue +from utils.logger import log, log_queue -if TYPE_CHECKING: - from state import SimulationState +from state import SimulationState, OperatingMode, ControlSubMode class UIController: """管理 NiceGUI 介面與互動邏輯。""" @@ -76,12 +74,13 @@ def _create_main_control_panel(self): with ui.card(): ui.label('模式控制 (Control Mode)').classes('text-lg') with ui.row(): - ui.button('走路 (Walking)', on_click=lambda: self._request_mode_change("WALKING")) - ui.button('懸浮 (Floating)', on_click=lambda: self._request_mode_change("FLOATING")) - ui.button('硬體 (Hardware)', on_click=lambda: self._request_mode_change("HARDWARE_MODE")) + # 使用新的子模式切換函式 + ui.button('走路 (Walking)', on_click=lambda: self.state.request_sub_mode_change(ControlSubMode.WALKING)) + ui.button('懸浮 (Floating)', on_click=lambda: self.state.request_sub_mode_change(ControlSubMode.FLOATING)) + ui.button('硬體 (Hardware)', on_click=self._toggle_operating_mode) with ui.row(): - ui.button('關節測試 (Joint Test)', on_click=lambda: self._request_mode_change("JOINT_TEST")) - ui.button('手動控制 (Manual Ctrl)', on_click=lambda: self._request_mode_change("MANUAL_CTRL")) + ui.button('關節測試 (Joint Test)', on_click=lambda: self.state.request_sub_mode_change(ControlSubMode.JOINT_TEST)) + ui.button('手動控制 (Manual Ctrl)', on_click=lambda: self.state.request_sub_mode_change(ControlSubMode.MANUAL_CTRL)) ui.separator() ui.label('模擬播放 (Playback)').classes('text-lg') @@ -134,7 +133,7 @@ def _create_device_panel(self): with ui.card(): ui.label('硬體 AI 控制').classes('text-lg') ui.button('啟用/停用 AI (K)', on_click=self._toggle_hardware_ai).bind_enabled_from( - self.state, 'control_mode', lambda mode: mode == "HARDWARE_MODE") + self.state, 'operating_mode', lambda m: m == OperatingMode.HARDWARE) ui.separator() ui.label('設備連接').classes('text-lg') @@ -159,7 +158,7 @@ def _create_joystick_panel(self): def _create_joint_control_panel(self): """在 JOINT_TEST 或 MANUAL_CTRL 模式下顯示的關節微調面板。""" - with ui.card().bind_visibility_from(self.state, 'control_mode', lambda m: m in ["JOINT_TEST", "MANUAL_CTRL"]).classes('w-full'): + with ui.card().bind_visibility_from(self.state, 'control_sub_mode', lambda m: m in [ControlSubMode.JOINT_TEST, ControlSubMode.MANUAL_CTRL]).classes('w-full'): ui.label('關節微調 (Joint Fine-Tuning)').classes('text-lg') # 懸浮開關,適用於手動相關模式 with ui.row().classes('items-center'): @@ -200,6 +199,8 @@ def _create_status_display(self): self.status_labels['command'] = ui.label('vy: 0.00, vx: 0.00, wz: 0.00') ui.label('機器人狀態 (Robot State)').classes('font-bold') self.status_labels['robot_pos'] = ui.label('位置: [0.0, 0.0, 0.0]') + # 顯示硬體延遲與 CRC 錯誤數 + self.status_badge = ui.badge('Delay -- | CRC 0') self.status_labels['robot_vel'] = ui.label('速度: [0.0, 0.0, 0.0]') def _create_onnx_display(self): @@ -230,14 +231,26 @@ def update_ui_elements(self): """更新所有 UI 元件,先鎖定狀態取得資料,再在鎖外更新。""" # --- 在鎖內快速複製所有需要的狀態值 --- with self.state.lock: - mode = self.state.control_mode + op_mode = self.state.operating_mode + sub_mode = self.state.control_sub_mode input_mode = self.state.input_mode sim_time = self.state.sim.data.time if self.state.sim else None - serial_connected = self.state.serial_is_connected - gamepad_connected = self.state.gamepad_is_connected - hw_ai_active = self.state.hardware_ai_is_active + + # 使用實際物件狀態,避免資訊延遲 + serial_connected = self.serial_comm.is_connected if self.serial_comm else False + gamepad_connected = ( + self.state.xbox_handler_ref.controller.is_connected() + if self.state.xbox_handler_ref else False + ) + + hw_ai_active = self.state.hardware.ai_is_active command = self.state.command.copy() - pos = self.state.latest_pos.copy() + + # 根據操作模式決定位置資料來源 + if op_mode == OperatingMode.SIMULATION: + pos = self.state.sim_latest_pos.copy() + else: # HARDWARE 模式下無全域位置資訊 + pos = np.zeros(3) pm = self.policy_manager transitioning = pm.is_transitioning @@ -249,86 +262,134 @@ def update_ui_elements(self): terrain_name = self.state.terrain_manager_ref.get_current_terrain_name_simple(self.state) joint_info = None - if mode == "JOINT_TEST": - idx = self.state.joint_test_index - offset = self.state.joint_test_offsets[idx] - default_pos = self.state.sim.default_pose[idx] - target_abs = default_pos + offset - actual_abs = self.state.latest_joint_positions[idx] - joint_info = { - "mode": "offset", - "index": idx, - "target_abs": target_abs, - "actual_abs": actual_abs, - "offset": offset, - } - elif mode == "MANUAL_CTRL": - idx = self.state.manual_ctrl_index - target_abs = self.state.manual_final_ctrl[idx] - actual_abs = self.state.latest_joint_positions[idx] - joint_info = { - "mode": "absolute", - "index": idx, - "target_abs": target_abs, - "actual_abs": actual_abs, - } + # 只有在關節測試或手動模式下才需要關節資訊 + if sub_mode in [ControlSubMode.JOINT_TEST, ControlSubMode.MANUAL_CTRL]: + if op_mode == OperatingMode.SIMULATION: + actual_joint_positions = self.state.sim_latest_joint_positions + else: + actual_joint_positions = self.state.hardware.joint_positions_rad + + if sub_mode == ControlSubMode.JOINT_TEST: + idx = self.state.joint_test_index + offset = self.state.joint_test_offsets[idx] + default_pos = self.state.sim.default_pose[idx] + target_abs = default_pos + offset + actual_abs = actual_joint_positions[idx] + joint_info = { + "mode": "offset", + "index": idx, + "target_abs": target_abs, + "actual_abs": actual_abs, + "offset": offset, + } + elif sub_mode == ControlSubMode.MANUAL_CTRL: + idx = self.state.manual_ctrl_index + target_abs = self.state.manual_final_ctrl[idx] + actual_abs = actual_joint_positions[idx] + joint_info = { + "mode": "absolute", + "index": idx, + "target_abs": target_abs, + "actual_abs": actual_abs, + } # --- 在鎖外更新 UI 元件 --- - self.status_labels['mode'].set_text(f"模式: {mode}") + self.status_labels['mode'].set_text(f"模式: {op_mode.name} / {sub_mode.name}") self.status_labels['input_mode'].set_text(f"輸入: {input_mode}") if sim_time is not None: self.status_labels['sim_time'].set_text(f"時間: {sim_time:.2f}s") else: self.status_labels['sim_time'].set_text("時間: N/A") - self.status_labels['serial_status'].set_text('序列埠: Connected' if serial_connected else '序列埠: Disconnected') - self.status_labels['gamepad_status'].set_text('搖桿: Connected' if gamepad_connected else '搖桿: Disconnected') - if mode == 'HARDWARE_MODE': - self.status_labels['hardware_ai'].set_text('硬體AI: Active' if hw_ai_active else '硬體AI: Disabled') + + self.status_labels['serial_status'].set_text( + '序列埠: Connected' if serial_connected else '序列埠: Disconnected' + ) + self.status_labels['gamepad_status'].set_text( + '搖桿: Connected' if gamepad_connected else '搖桿: Disconnected' + ) + + if op_mode == OperatingMode.HARDWARE: + self.status_labels['hardware_ai'].set_text( + '硬體AI: Active' if hw_ai_active else '硬體AI: Disabled' + ) else: self.status_labels['hardware_ai'].set_text('硬體AI: N/A') - self.status_labels['command'].set_text(f"vy: {command[0]:.2f}, vx: {command[1]:.2f}, wz: {command[2]:.2f}") - self.status_labels['robot_pos'].set_text(f"位置: [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]") + + self.status_labels['command'].set_text( + f"vy: {command[0]:.2f}, vx: {command[1]:.2f}, wz: {command[2]:.2f}" + ) + self.status_labels['robot_pos'].set_text( + f"位置: [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]" + ) if transitioning: - policy_text = f"策略: Blending {src_policy} -> {tgt_policy} ({alpha*100:.0f}%)" + policy_text = ( + f"策略: Blending {src_policy} -> {tgt_policy} ({alpha*100:.0f}%)" + ) else: policy_text = f"策略: {primary_policy}" self.status_labels['policy_status'].set_text(policy_text) - self.status_labels['policy_selector'].set_value(primary_policy) + if self.status_labels['policy_selector'].value != primary_policy: + self.status_labels['policy_selector'].set_value(primary_policy) - # 更新地形下拉選單狀態 if self.ui_terrain_selection != terrain_name: - self.ui_terrain_selection = terrain_name + self.terrain_selector.set_value(terrain_name) - # 更新關節控制資訊 - if joint_info and self.joint_control_slider is not None: + # --- 關節控制資訊的安全更新 --- + if joint_info: idx = joint_info['index'] - self.joint_selector.set_value(idx) + if self.joint_selector.value != idx: + self.joint_selector.set_value(idx) target_abs = joint_info['target_abs'] actual_abs = joint_info['actual_abs'] - if abs(self.joint_control_slider.value - target_abs) > 1e-3: + if ( + self.joint_control_slider is not None + and self.joint_control_slider.value is not None + and abs(self.joint_control_slider.value - target_abs) > 1e-3 + ): self.joint_control_slider.set_value(target_abs) error = target_abs - actual_abs if joint_info['mode'] == 'offset': offset = joint_info['offset'] - text = f"模式: 偏移 | Offset={offset:+.2f} | Target={target_abs:+.2f} | Actual={actual_abs:+.2f} | Err={error:+.2f}" + text = ( + f"模式: 偏移 | Offset={offset:+.2f} | Target={target_abs:+.2f} | " + f"Actual={actual_abs:+.2f} | Err={error:+.2f}" + ) else: - text = f"模式: 絕對 | Target={target_abs:+.2f} | Actual={actual_abs:+.2f} | Err={error:+.2f}" + text = ( + f"模式: 絕對 | Target={target_abs:+.2f} | Actual={actual_abs:+.2f} | " + f"Err={error:+.2f}" + ) self.status_labels['joint_info'].set_text(text) + else: + # 非相關模式清空標籤內容 + self.status_labels['joint_info'].set_text('') # 更新 ONNX 標籤與日誌 self._update_onnx_labels() log_content = "\n".join(log_queue) self.log_area.set_value(log_content) + # 顯示 CRC 與延遲資訊 + if self.state.operating_mode == OperatingMode.HARDWARE: + delay = time.time() - self.state.hardware.last_update_time + crc_err = self.state.hardware.crc_error_count + self.status_badge.set_text(f"Delay {delay:.2f}s | CRC {crc_err}") + else: + self.status_badge.set_text('') + def _update_onnx_labels(self): - if self.state.latest_onnx_input.size == 0 or not self.policy_manager.get_active_recipe(): + """依據目前模式更新 ONNX 輸入顯示""" + if self.state.operating_mode == OperatingMode.SIMULATION: + obs_vec = self.state.sim_latest_onnx_input + else: + obs_vec = self.state.hardware.latest_onnx_input + if obs_vec.size == 0 or not self.policy_manager.get_active_recipe(): return recipe = self.policy_manager.get_active_recipe() - obs_vec = self.state.latest_onnx_input current_idx = 0 # 從已註冊的 policy_manager 取得各觀察元件的維度 component_dims = self.policy_manager.obs_builder.component_dims @@ -342,12 +403,6 @@ def _update_onnx_labels(self): self.onnx_input_labels[comp_name].set_text(f'{comp_name}: {vec_str}') current_idx = end_idx - def _request_mode_change(self, mode: str) -> None: - """【新增】僅設定待切換模式,由模擬執行緒在下個循環處理。""" - with self.state.lock: - self.state.control_mode_pending = mode - log.info(f"UI 請求切換模式至 {mode}") - # 【新增】「暫停/播放」按鈕的回呼函式 def _toggle_pause(self): """切換模擬的暫停/播放狀態。""" @@ -363,12 +418,24 @@ def _request_one_step(self): if self.state.single_step_mode: self.state.execute_one_step = True + def _toggle_operating_mode(self) -> None: + """在模擬與硬體模式間切換""" + with self.state.lock: + current_op = self.state.operating_mode + + if current_op == OperatingMode.SIMULATION: + self.state.request_mode_change(OperatingMode.HARDWARE, ControlSubMode.IDLE) + else: + self.state.request_mode_change(OperatingMode.SIMULATION, ControlSubMode.WALKING) + def _toggle_hardware_ai(self): - if self.hardware_controller and self.state.control_mode == 'HARDWARE_MODE': - if self.state.hardware_ai_is_active: - self.hardware_controller.disable_ai() - else: - self.hardware_controller.enable_ai() + """切換硬體端 AI 的啟用狀態""" + if self.state.operating_mode != OperatingMode.HARDWARE: + return + if self.state.hardware.ai_is_active: + self.state.request_sub_mode_change(ControlSubMode.IDLE) + else: + self.state.request_sub_mode_change(ControlSubMode.WALKING) def _request_flag_change(self, flag_name: str): """非阻塞地請求一次性操作,如重置。""" @@ -379,6 +446,8 @@ def _request_flag_change(self, flag_name: str): def _connect_serial(self): if self.serial_comm: is_connected = self.serial_comm.scan_and_connect() + # 只更新連線狀態,暫不將控制權交給硬體控制器 + # keep serial port managed by SerialCommunicator until HW mode with self.state.lock: self.state.serial_is_connected = is_connected @@ -397,33 +466,33 @@ def _request_shutdown(self) -> None: def _set_joint_index(self, index: int): """設定目前選中的關節索引。""" with self.state.lock: - if self.state.control_mode == "JOINT_TEST": + if self.state.control_sub_mode == ControlSubMode.JOINT_TEST: self.state.joint_test_index = index - elif self.state.control_mode == "MANUAL_CTRL": + elif self.state.control_sub_mode == ControlSubMode.MANUAL_CTRL: self.state.manual_ctrl_index = index def _on_joint_slider_change(self, event): """滑桿改變時即時更新目標值。""" value = event.value with self.state.lock: - if self.state.control_mode == "JOINT_TEST": + if self.state.control_sub_mode == ControlSubMode.JOINT_TEST: idx = self.state.joint_test_index # 滑桿給的是絕對角度,轉成偏移量存回 state self.state.joint_test_offsets[idx] = value - self.state.sim.default_pose[idx] - elif self.state.control_mode == "MANUAL_CTRL": + elif self.state.control_sub_mode == ControlSubMode.MANUAL_CTRL: idx = self.state.manual_ctrl_index self.state.manual_final_ctrl[idx] = value def _adjust_joint_value(self, value: float, clear: bool = False): """依目前模式調整關節值或歸零。""" with self.state.lock: - if self.state.control_mode == "JOINT_TEST": + if self.state.control_sub_mode == ControlSubMode.JOINT_TEST: idx = self.state.joint_test_index if clear: self.state.joint_test_offsets[idx] = 0.0 else: self.state.joint_test_offsets[idx] += value - elif self.state.control_mode == "MANUAL_CTRL": + elif self.state.control_sub_mode == ControlSubMode.MANUAL_CTRL: idx = self.state.manual_ctrl_index if clear: self.state.manual_final_ctrl[idx] = 0.0 diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/config.py b/utils/config.py similarity index 82% rename from config.py rename to utils/config.py index ad67eda..a4265e6 100644 --- a/config.py +++ b/utils/config.py @@ -20,6 +20,14 @@ class FloatingControllerConfig: kp_attitude: float kd_attitude: float +# [新增] 級聯控制器設定,用於硬體模式 PD 外層控制 +@dataclass +class CascadeControllerConfig: + pos_kp: float + vel_kp: float + max_target_velocity_rad_s: float + torque_limit: float + @dataclass class AppConfig: """儲存所有應用程式設定的資料類別。""" @@ -31,8 +39,7 @@ class AppConfig: num_motors: int physics_timestep: float - control_freq: float - control_dt: float + control_freq: float # 控制頻率 (Hz) warmup_duration: float command_scaling_factors: List[float] @@ -42,6 +49,7 @@ class AppConfig: initial_tuning_params: TuningParamsConfig floating_controller: FloatingControllerConfig + cascade_controller: CascadeControllerConfig def load_config(path: str = "config.yaml") -> AppConfig: """ @@ -57,6 +65,7 @@ def load_config(path: str = "config.yaml") -> AppConfig: tuning_params = TuningParamsConfig(**config_data['initial_tuning_params']) floating_config = FloatingControllerConfig(**config_data['floating_controller']) + cascade_config = CascadeControllerConfig(**config_data['cascade_controller']) config_obj = AppConfig( mujoco_model_file=config_data['mujoco_model_file'], @@ -67,17 +76,17 @@ def load_config(path: str = "config.yaml") -> AppConfig: num_motors=config_data['num_motors'], physics_timestep=config_data['physics_timestep'], control_freq=config_data['control_freq'], - control_dt=1.0 / config_data['control_freq'], warmup_duration=config_data['warmup_duration'], command_scaling_factors=config_data['command_scaling_factors'], keyboard_velocity_adjust_step=config_data['keyboard_velocity_adjust_step'], gamepad_sensitivity=config_data['gamepad_sensitivity'], param_adjust_steps=config_data['param_adjust_steps'], - + initial_tuning_params=tuning_params, - floating_controller=floating_config + floating_controller=floating_config, + cascade_controller=cascade_config ) - - print("✅ 設定檔載入成功 (包含懸浮控制器設定)。") + + print("✅ 設定檔載入成功 (含 Cascade Controller 設定)。") return config_obj diff --git a/logger.py b/utils/logger.py similarity index 100% rename from logger.py rename to utils/logger.py diff --git a/serial_utils.py b/utils/serial_utils.py similarity index 100% rename from serial_utils.py rename to utils/serial_utils.py