diff --git a/config.yaml b/config.yaml index 49345e4..ace9bb5 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,11 @@ +# ================================== +# === 開發與除錯設定 (Dev & Debug Settings) +# ================================== +# 如果設為 true,硬體模式將不會連接真實的USB序列埠, +# 而是啟動一個與MuJoCo模擬器連結的「虛擬Teensy」。 +# 這允許在沒有實體硬體的情況下,完整地測試和除錯 HardwareController。 +use_virtual_teensy: false # 預設為 false,要測試時手動改為 true + # ================================== # === 模擬與模型設定 (Simulation & Model Settings) # ================================== diff --git a/main.py b/main.py index f6eb442..e602d2f 100644 --- a/main.py +++ b/main.py @@ -170,8 +170,10 @@ def soft_reset(): t_since_update = time.time() - hw_controller.hw_state_data.last_update_time conn_status = f"Data Delay: {t_since_update:.2f}s" if t_since_update < 1.0 else "Data Timeout!" state.hardware_status_text = f"Connection Status: {conn_status}\n" - state.hardware_status_text += f"LinVel: {np.array2string(hw_controller.hw_state_data.lin_vel_local, precision=2)}\n" - state.hardware_status_text += f"Gyro: {np.array2string(hw_controller.hw_state_data.imu_gyro_radps, precision=2)}" + # 顯示角速度(Gyro)資訊 + state.hardware_status_text += f"AngVel: {np.array2string(hw_controller.hw_state_data.angular_velocity_radps, precision=2)}\n" + # 顯示加速度資訊 + state.hardware_status_text += f"Accel: {np.array2string(hw_controller.hw_state_data.accelerometer_ms2, precision=2)}" else: state.hardware_status_text = "Hardware controller not running." @@ -210,4 +212,4 @@ def soft_reset(): print("\n程式已安全退出。") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/main_nicegui.py b/main_nicegui.py index 0d337cf..8a0006e 100644 --- a/main_nicegui.py +++ b/main_nicegui.py @@ -32,6 +32,7 @@ from src.controllers.simulation_controller import SimulationController from src.input_handlers.keyboard_input_handler import KeyboardInputHandler from src.core.logger import log +from src.utils.gamepad_presence_guard import start_gamepad_presence_guard # 【v4.3.2 新增】 導入新的 ObservationManager from src.simulation.observation_manager import ObservationManager @@ -88,6 +89,10 @@ def main() -> None: try: config = load_config() state = SimulationState(config) + if config.use_virtual_teensy: + # 🔌 虛擬Teensy模式:不需要實體序列埠也能進入硬體模式 + state.serial_is_connected = True + log.info("虛擬Teensy模式啟用,跳過序列埠連線檢查。") except Exception as exc: sys.exit(f"failed to initialise: {exc}") @@ -108,6 +113,8 @@ def main() -> None: xbox_handler = XboxInputHandler(state) state.xbox_handler_ref = xbox_handler + # 啟動搖桿存在守門員,只用於 UI 顯示,不影響 handler + start_gamepad_presence_guard(state) # 【v4.3.2 修改】 將 observation_manager 傳入 PolicyManager policy_manager = PolicyManager(config, observation_manager, None) # 在 NiceGUI 模式下,overlay 設為 None @@ -150,4 +157,4 @@ def cleanup_resources() -> None: if __name__ in {"__main__", "__mp_main__"}: - main() \ No newline at end of file + main() diff --git a/readme.md b/readme.md index 6d2e184..caeb21a 100644 --- a/readme.md +++ b/readme.md @@ -8,70 +8,4 @@ pip install pyserial pip install Pillow pip install scipy //運動學 hardware 需用 -pip install numpy onnxruntime PyYAML nicegui pygame pyserial Pillow scipy - -### 專案目錄結構與模組化原則 -``` -simulation_test_recoil/ -├── assets/ # 靜態資源 (Static Assets) -├── models/ # AI 模型文件 (AI Model Files) -├── pdf/ # 參考文檔 (Reference Documents) -├── src/ # 核心原始碼 (Core Source Code) -│ ├── core/ # 核心通用模組 (Core Common Modules) -│ ├── controllers/ # 主要控制器與邏輯協調 (Main Controllers & Logic Orchestration) -│ ├── simulation/ # 模擬環境相關 (Simulation Environment Specific) -│ ├── hardware/ # 硬體交互與底層AI推理 (Hardware Interaction & Low-level AI Inference) -│ ├── input_handlers/ # 用戶輸入處理 (User Input Handling) -│ ├── utils/ # 通用工具函式 (General Utility Functions) -│ └── mock/ # 模擬/測試用模組 (Mock/Test Modules) -├── test/ # 測試與輔助腳本 (Tests & Auxiliary Scripts) -├── output/ # 生成的輸出文件 (Generated Output Files) -├── .gitignore # Git 忽略文件 (Git Ignore File) -├── config.yaml # 應用程式主配置 (Main Application Configuration) -├── project_overview_config.yaml # 專案概覽工具配置 (Project Overview Tool Configuration) -├── main.py # CLI 主入口 (CLI Main Entry Point) -├── main_nicegui.py # NiceGUI UI 主入口 (NiceGUI UI Main Entry Point) -├── readme.md # 專案說明 (Project Readme) -└── tennsy.md # Teensy 韌體文檔 (Teensy Firmware Documentation) -``` - -**目錄說明與分類原則:** - -* **`assets/`**: - * **原則**: 存放模擬器使用的所有靜態資源文件,例如 MuJoCo 模型所需的 3D 網格 (`.stl`)、紋理圖檔 (`.png`) 和場景定義文件 (`.xml`)。這些文件通常在運行時被載入,但不包含任何可執行邏輯。 -* **`models/`**: - * **原則**: 專門用於存放預訓練的 AI 模型文件,例如 ONNX 格式的策略模型 (`.onnx`, `.ort`)。這些是應用程式的數據資產,而非程式碼。 -* **`pdf/`**: - * **原則**: 存放專案相關的參考文檔,如 MuJoCo 的官方文檔等。 -* **`src/`**: - * **原則**: 本專案所有核心 Python 原始碼的根目錄。所有可執行邏輯的模組都應置於此,並在此目錄內進一步細分。這確保了源碼與其他類型的文件清晰分離。 - * **`src/core/`**: - * **原則**: 存放應用程式最底層、最通用、被多個高層模組共同依賴且變動頻率較低的模組。它們是整個系統的基石。 - * **內容**: 全局配置 (`config.py`)、中央狀態管理 (`state.py`)、異步事件匯流排 (`event_system.py`)、日誌系統 (`logger.py`)。 - * **`src/controllers/`**: - * **原則**: 存放協調應用程式高層邏輯和業務流程的「指揮官」模組。它們負責監聽事件、更新中央狀態、調用低層服務,並確保各子系統的同步運行。 - * **內容**: 模擬主控制器 (`simulation_controller.py`)、硬體主控制器 (`hardware_controller.py`)、UI 邏輯控制器 (`ui_controller.py`)。 - * **`src/simulation/`**: - * **原則**: 存放與 MuJoCo 物理模擬環境直接相關的模組。這些模組通常需要直接訪問 MuJoCo 的 API 或模型數據。 - * **內容**: MuJoCo 模擬器接口 (`simulation.py`)、3D 渲染與除錯疊層 (`rendering.py`)、地形管理 (`terrain_manager.py`)、懸浮控制器 (`floating_controller.py`)、以及將被新版取代的舊觀察器 (`observation.py`)。 - * **`src/hardware/`**: - * **原則**: 存放與外部硬體設備(如 Teensy、Xbox 搖桿)進行低層通信和 AI 推理邏輯的模組。它們負責將物理世界或感測器的數據轉換為程式碼可處理的格式,或將程式碼指令轉換為硬體可執行的命令。 - * **內容**: AI 策略管理 (`policy.py`)、序列埠通信 (`serial_communicator.py`)、Xbox 搖桿接口 (`xbox_controller.py`)。 - * **`src/input_handlers/`**: - * **原則**: 專門處理用戶輸入(鍵盤、搖桿)的模組。它們的職責是將原始輸入事件翻譯為應用程式內部的標準命令或請求事件。 - * **內容**: 鍵盤輸入處理 (`keyboard_input_handler.py`)、Xbox 搖桿輸入處理 (`xbox_input_handler.py`)。 - * **`src/utils/`**: - * **原則**: 存放跨多個模塊使用的通用輔助函數或小型工具類。這些模塊通常不包含複雜的業務邏輯,而是提供可重用的功能。 - * **內容**: 序列埠選擇工具 (`serial_utils.py`)。 - * **`src/mock/`**: - * **原則**: 存放用於測試或無頭模式下使用的「模擬」或「假」的模組實現。它們提供與真實模組相同的接口,但內部邏輯是簡化或模擬的。 - * **內容**: 模擬 MuJoCo 環境和各類控制器 (`mock_simulation.py`)。 -* **`test/`**: - * **原則**: 存放所有自動化測試腳本 (`pytest` 測試) 和獨立的輔助工具/演示腳本。這些文件不屬於應用程式的核心運行邏輯,但對開發和驗證至關重要。 - * **內容**: 專案概覽生成工具 (`project_overview.py`)、序列埠控制台工具 (`test_pyserial_console.py`)、以及各種測試用例。 -* **`output/`**: - * **原則**: 專門存放腳本在運行時生成的各種輸出文件。這些文件通常不應被版本控制,因此被添加到 `.gitignore` 中。 - * **內容**: 專案概覽報告 (`project_overview_*.md/.txt`)、地形快照 (`terrain_snapshot_*.png`)。 -* **根目錄文件**: - * **原則**: 存放專案的頂層配置、主要入口點和通用文檔。力求保持根目錄的簡潔和高層次概覽。 - * **內容**: Git 忽略文件 (`.gitignore`)、應用程式主要配置 (`config.yaml`)、專案概覽工具配置 (`project_overview_config.yaml`)、主入口腳本 (`main.py`, `main_nicegui.py`)、專案總說明 (`readme.md`)、Teensy 韌體文檔 (`tennsy.md`)。 \ No newline at end of file +pip install numpy onnxruntime PyYAML nicegui pygame pyserial Pillow scipy \ No newline at end of file diff --git a/src/controllers/hardware_controller.py b/src/controllers/hardware_controller.py index 79de9f6..1985108 100644 --- a/src/controllers/hardware_controller.py +++ b/src/controllers/hardware_controller.py @@ -7,11 +7,43 @@ from src.core.logger import log import numpy as np from typing import TYPE_CHECKING -from queue import Queue, Empty -from enum import Enum, auto +from queue import Queue, Empty # 引入佇列,用於實現執行緒安全的命令傳遞 +from enum import Enum, auto # 引入枚舉,用於定義清晰的狀態和命令 +# 導入專案內部模組 +from src.core.logger import log from src.core.event_system import event_bus, EVENT_HARDWARE_AI_TOGGLE_REQUESTED +# 每 LOG_EVERY_N 筆資料列印一次,避免終端被大量輸出淹沒 +LOG_EVERY_N = 50 + + +def construct_observation_51(state, hw): + """將 34 維硬體資料 + 內部狀態組裝成 51 維觀測。""" + # linear_velocity(3):實體端無此量,虛擬模式可從模擬器取得 + if getattr(state.config, "use_virtual_teensy", False) and hasattr(state.sim, "linear_velocity_local"): + lin_vel = np.asarray(state.sim.linear_velocity_local(), dtype=np.float32) + else: + lin_vel = np.zeros(3, dtype=np.float32) + + ang_vel = np.asarray(hw.angular_velocity_radps, dtype=np.float32) + g_vec = np.asarray(hw.gravity_vector_norm, dtype=np.float32) + accel = np.asarray(hw.accelerometer_ms2, dtype=np.float32) + qpos = np.asarray(hw.joint_positions_rad, dtype=np.float32) + qvel = np.asarray(hw.joint_velocities_radps, dtype=np.float32) + + last_action = np.asarray(getattr(state, "last_action", np.zeros(12, np.float32)), dtype=np.float32) + cmd = np.asarray(getattr(state, "command", np.zeros(3, np.float32)), dtype=np.float32) + + scale = getattr(getattr(state, "tuning_params", None), "command_scale", [1.0, 1.0, 1.0]) + if isinstance(scale, (list, tuple, np.ndarray)) and len(scale) == 3: + cmd = cmd * np.asarray(scale, dtype=np.float32) + + obs = np.concatenate([lin_vel, ang_vel, g_vec, accel, qpos, qvel, last_action, cmd]).astype(np.float32) + assert obs.shape[0] == 51, f"觀測維度應為 51,實得 {obs.shape[0]}" + return obs + +# 類型檢查區塊,僅在靜態分析時執行,避免循環導入 if TYPE_CHECKING: from src.core.config import AppConfig from src.hardware.policy import PolicyManager @@ -22,18 +54,25 @@ # 這個類別的所有狀態都已遷移到 SimulationState 的 raw_... 屬性中, # 以實現統一的數據流。 + class HWCommand(Enum): - START = auto() - STOP = auto() - TOGGLE_AI = auto() + """定義可以發送給硬體控制器的命令類型。""" + + START = auto() # 啟動命令 + STOP = auto() # 停止命令 + TOGGLE_AI = auto() # 切換AI開/關命令 + class HWState(Enum): + """定義硬體控制器內部的狀態機狀態。""" + STOPPED = auto() STARTING = auto() RUNNING = auto() STOPPING = auto() FAILED = auto() + class HardwareController: """【v4.3.2 修改】管理硬體AI控制迴圈,作為原始數據提供者。""" @@ -62,151 +101,162 @@ def __init__(self, config: 'AppConfig', policy: 'PolicyManager', state: 'Simulat log.info("✅ 硬體控制器 (v4.3.2 數據流統一版) 已初始化。") def _subscribe_to_events(self): - event_bus.subscribe(EVENT_HARDWARE_AI_TOGGLE_REQUESTED, - lambda: self.command_queue.put(HWCommand.TOGGLE_AI)) - log.info(" -> HardwareController 已訂閱 AI 切換請求事件 (將發送至內部隊列)。") + """訂閱來自事件系統的外部事件,並將其轉換為內部命令放入隊列。""" + # 當收到外部的AI切換請求時,將一個TOGGLE_AI命令放入隊列,由控制迴圈處理 + event_bus.subscribe( + EVENT_HARDWARE_AI_TOGGLE_REQUESTED, + lambda: self.command_queue.put(HWCommand.TOGGLE_AI), + ) + log.info(" -> HardwareController 已訂閱 AI 切換請求事件。") def request_start(self) -> None: """(外部API, 非阻塞) 請求啟動硬體控制器。""" + # 只有在控制器處於可以啟動的狀態時才接受請求 if self.internal_state in [HWState.STOPPED, HWState.FAILED]: log.info("收到啟動請求,向控制執行緒發送 START 命令。") - self._start_threads_if_not_alive() - self.command_queue.put(HWCommand.START) + self._start_threads_if_not_alive() # 確保執行緒已啟動 + self.command_queue.put(HWCommand.START) # 發送命令 else: log.warning(f"當前狀態為 {self.internal_state.name},忽略啟動請求。") def request_stop(self) -> None: """(外部API, 非阻塞) 請求停止硬體控制器。""" + # 只有在控制器正在運行時才接受停止請求 if self.internal_state == HWState.RUNNING: log.info("收到停止請求,向控制執行緒發送 STOP 命令。") self.command_queue.put(HWCommand.STOP) else: log.warning(f"當前狀態為 {self.internal_state.name},忽略停止請求。") - + def shutdown(self): """(外部API, 阻塞) 應用程式關閉時的強制清理。""" - self._is_running_event.clear() + self._is_running_event.clear() # 通知所有執行緒退出迴圈 if self.control_thread and self.control_thread.is_alive(): - self.control_thread.join(timeout=1) + self.control_thread.join(timeout=1) # 等待執行緒結束 if self.read_thread and self.read_thread.is_alive(): self.read_thread.join(timeout=1) log.info("硬體控制器所有執行緒已關閉。") def _start_threads_if_not_alive(self): - """(內部) 確保背景執行緒被創建並啟動。""" - self._is_running_event.set() + """(內部) 確保背景執行緒只被創建和啟動一次。""" + self._is_running_event.set() # 設定運行信號 if not self.control_thread or not self.control_thread.is_alive(): - self.control_thread = threading.Thread(target=self._control_loop, daemon=True) + self.control_thread = threading.Thread( + target=self._control_loop, daemon=True + ) self.control_thread.start() log.info("硬體控制執行緒已啟動。") - + if not self.read_thread or not self.read_thread.is_alive(): - self.read_thread = threading.Thread(target=self._read_from_port, daemon=True) + self.read_thread = threading.Thread( + target=self._read_from_port, daemon=True + ) self.read_thread.start() log.info("硬體讀取執行緒已啟動。") - def _read_from_port(self): - """(執行緒) 在背景持續讀取序列埠數據。""" - log.info("[硬體讀取執行緒已啟動] 等待數據...") - while self._is_running_event.is_set(): - if self.internal_state != HWState.RUNNING or not self.ser or not self.ser.is_open: - time.sleep(0.1) - continue - - try: - if self.ser.in_waiting > 0: - line = self.ser.readline().decode('utf-8', errors='ignore').strip() - if line: - self.parse_policy_stream(line) - except (serial.SerialException, OSError): - log.error("❌ 讀取時序列埠斷開或出錯。將狀態設置為 FAILED。") - self._set_internal_state(HWState.FAILED) - break - except Exception as e: - log.error(f"❌ _read_from_port 發生未知錯誤: {e}", exc_info=True) - self._set_internal_state(HWState.FAILED) - break - time.sleep(0.01) - def _set_internal_state(self, new_state: HWState): """(內部) 安全地切換狀態機並同步到全局 State。""" if self.internal_state != new_state: log.info(f"硬體控制器狀態: {self.internal_state.name} -> {new_state.name}") self.internal_state = new_state - self.last_state_change_time = time.time() + # 將運行狀態同步到全局 state,供UI等模組讀取 with self.state.lock: - self.state.hardware_is_running = (new_state == HWState.RUNNING) + self.state.hardware_is_running = new_state == HWState.RUNNING def _control_loop(self): """(執行緒) 狀態機驅動者和命令派發中心。""" log.info("--- 硬體控制執行緒已就緒,等待命令 ---") while self._is_running_event.is_set(): + # 檢查命令隊列 try: command: HWCommand = self.command_queue.get_nowait() - if command == HWCommand.START and self.internal_state in [HWState.STOPPED, HWState.FAILED]: + if command == HWCommand.START and self.internal_state in [ + HWState.STOPPED, + HWState.FAILED, + ]: self._execute_start() - elif command == HWCommand.STOP and self.internal_state == HWState.RUNNING: + elif ( + command == HWCommand.STOP and self.internal_state == HWState.RUNNING + ): self._execute_stop() - elif command == HWCommand.TOGGLE_AI and self.internal_state == HWState.RUNNING: + elif ( + command == HWCommand.TOGGLE_AI + and self.internal_state == HWState.RUNNING + ): self._execute_toggle_ai() except Empty: - pass + pass # 隊列為空,無事發生 + # 如果處於運行狀態且AI已啟用,則執行AI決策 if self.internal_state == HWState.RUNNING and self.ai_control_active: self._perform_ai_step() - + + # 維持固定的迴圈頻率 time.sleep(1.0 / self.config.control_freq) def _execute_start(self): - """(內部, 可能阻塞) 執行啟動流程。""" + """(內部) 執行啟動流程,包含真實/虛擬模式的選擇。""" self._set_internal_state(HWState.STARTING) - if not self.serial_comm.is_connected: - log.error("❌ 硬體啟動失敗:序列埠未連接。") - self._set_internal_state(HWState.FAILED) - return - - self.ser = self.serial_comm.get_serial_connection() # 取得序列連線實體 - if not self.ser: - log.error("❌ 硬體啟動失敗:無法獲取有效連接。") - self._set_internal_state(HWState.FAILED) - return - # --- 接管與初始化 --- - log.info(f"✅ 硬體控制器已接管序列埠 {self.ser.port} 的控制權。") - self.serial_comm.is_managed_by_hardware_controller = True # 告知 serial_comm 不再管理 serial + if self.config.use_virtual_teensy: + log.info("🚀 正在啟用【虛擬 Teensy】模式...") + # 避免在模擬器缺失時發生導入錯誤,於此處動態導入 + from src.hardware.virtual_teensy import VirtualTeensy + self.ser = VirtualTeensy(self.state, rate_hz=50.0) + # 虛擬模式也視為已接管序列埠,避免 SerialCommunicator 介入 + self.serial_comm.is_managed_by_hardware_controller = True + else: + log.info("🔌 正在啟用【真實硬體】模式...") + if not self.serial_comm.is_connected: + log.error("❌ 硬體啟動失敗:序列埠未連接。") + self._set_internal_state(HWState.FAILED) + return + self.ser = self.serial_comm.get_serial_connection() + if not self.ser: + log.error("❌ 硬體啟動失敗:無法獲取有效連接。") + self._set_internal_state(HWState.FAILED) + return + log.info(f"✅ 硬體控制器已接管序列埠 {self.ser.port} 的控制權。") + self.serial_comm.is_managed_by_hardware_controller = True + # 後續初始化流程對真實/虛擬Teensy一視同仁 try: log.info(" -> 命令 Teensy 切換至 POLICY_STREAM 模式...") self.ser.write(b"monitor p\n") - time.sleep(0.1) + time.sleep(0.1) self.ser.reset_input_buffer() log.info(" -> Teensy 模式指令已發送。") self._set_internal_state(HWState.RUNNING) - self.ai_control_active = False - with self.state.lock: self.state.hardware_ai_is_active = False - except serial.SerialException as e: + self.ai_control_active = False # 啟動後AI預設關閉 + with self.state.lock: + self.state.hardware_ai_is_active = False + except (serial.SerialException, AttributeError) as e: log.error(f"❌ 發送模式指令失敗: {e}") - self.serial_comm.is_managed_by_hardware_controller = False + if not self.config.use_virtual_teensy: + self.serial_comm.is_managed_by_hardware_controller = False self._set_internal_state(HWState.FAILED) def _execute_stop(self): - """(內部, 可能阻塞) 執行停止流程。""" + """(內部) 執行停止流程。""" self._set_internal_state(HWState.STOPPING) self.ai_control_active = False - with self.state.lock: self.state.hardware_ai_is_active = False + with self.state.lock: + self.state.hardware_ai_is_active = False if self.ser and self.ser.is_open: try: log.info(" -> 命令 Teensy 停止並恢復 HUMAN 模式...") - self.ser.write(b"stop\n"); time.sleep(0.05) - self.ser.write(b"monitor h\n"); time.sleep(0.05) - except serial.SerialException as e: + self.ser.write(b"stop\n") + time.sleep(0.05) + self.ser.write(b"monitor h\n") + time.sleep(0.05) + except (serial.SerialException, AttributeError) as e: log.warning(f" -> 警告: 發送停止指令失敗: {e}") - - if self.serial_comm: - self.serial_comm.is_managed_by_hardware_controller = False - log.info(" -> 序列埠控制權已交還。") - + + # 真實與虛擬模式皆需要釋放控制權 + self.serial_comm.is_managed_by_hardware_controller = False + log.info(" -> 序列埠控制權已交還。") + self.ser = None self._set_internal_state(HWState.STOPPED) @@ -215,14 +265,16 @@ def _execute_toggle_ai(self): self.ai_control_active = not self.ai_control_active with self.state.lock: self.state.hardware_ai_is_active = self.ai_control_active - + log.info(f"🤖 硬體 AI 控制已 {'啟用' if self.ai_control_active else '暫停'}.") - + if self.ai_control_active: - self.policy.reset() + self.policy.reset() # 啟用時重置策略歷史 elif self.ser and self.ser.is_open: - try: self.ser.write(b"stop\n") - except serial.SerialException as e: log.error(f"發送停止指令失敗: {e}") + try: + self.ser.write(b"stop\n") # 暫停時發送停止指令 + except (serial.SerialException, AttributeError) as e: + log.error(f"發送停止指令失敗: {e}") # 【v4.3.2 修改】 _perform_ai_step 方法 def _perform_ai_step(self): @@ -247,8 +299,9 @@ def _perform_ai_step(self): command_to_send = f"move all {action_str}\n" if self.ser and self.ser.is_open: - try: self.ser.write(command_to_send.encode('utf-8')) - except serial.SerialException: + try: + self.ser.write(command_to_send.encode("utf-8")) + except (serial.SerialException, AttributeError): log.error("AI 步驟中發送指令失敗,連接可能已斷開。") self._set_internal_state(HWState.FAILED) @@ -259,8 +312,9 @@ def parse_policy_stream(self, line: str): 解析來自Teensy的數據流,並將原始數據直接寫入中央的SimulationState。 """ try: - parts = line.split(',') - if len(parts) != 34: return + parts = line.split(",") + if len(parts) != 34: + return data_vec = np.array(parts, dtype=np.float32) # 使用全局 state.lock 保護對 SimulationState 的寫入 @@ -278,7 +332,7 @@ def parse_policy_stream(self, line: str): # (例如,在硬體模式下,linear_velocity 可能為零) except (ValueError, IndexError): - pass # 在高頻率流中,忽略解析錯誤比打印日誌更好 + pass # 在高頻率流中,靜默忽略單次的解析錯誤 # 【v4.3.2 刪除】 construct_observation 方法 # 此方法的職責已完全轉移給 ObservationManager。 \ No newline at end of file diff --git a/src/controllers/simulation_controller.py b/src/controllers/simulation_controller.py index 462dc9b..295cf3b 100644 --- a/src/controllers/simulation_controller.py +++ b/src/controllers/simulation_controller.py @@ -6,7 +6,6 @@ from src.core.logger import log import numpy as np -from src.mock.mock_simulation import MockSimulation from src.core.event_system import ( event_bus, @@ -88,7 +87,7 @@ def _subscribe_to_events(self): # ------------------------------------------------------------------ def _initialize_simulation_state(self) -> None: - if isinstance(self.sim, MockSimulation): + if self.sim.__class__.__name__ == "MockSimulation": log.info("[MOCK] Skip simulation state initialization.") return @@ -112,8 +111,9 @@ def run(self) -> None: """ 【v4.0.2 修改版】執行緒主迴圈。 """ - is_headless = isinstance(self.sim, MockSimulation) - if not is_headless: + # 以類名判斷是否為 MockSimulation,避免直接匯入造成 NameError + is_headless = self.sim.__class__.__name__ == "MockSimulation" + if not is_headless and hasattr(self.sim, "initialize_window_and_context"): self.sim.initialize_window_and_context() self._initialize_simulation_state() @@ -165,15 +165,17 @@ def run(self) -> None: if execute_one: with self.state.lock: self.state.execute_one_step = False - # 【v4.0.2 修正】UX 優化 - is_simulation_active = not is_headless and mode not in ["HARDWARE_MODE", "SERIAL_MODE"] - + # 【v4.0.3 修正】虛擬Teensy下的硬體模式仍須推進模擬 + is_hw_mode = mode in ["HARDWARE_MODE", "SERIAL_MODE"] + use_virtual = getattr(self.config, "use_virtual_teensy", False) + is_simulation_active = (not is_headless) and (not is_hw_mode or use_virtual) + if is_simulation_active: # 【模擬活動模式】: 執行物理計算,然後更新狀態並渲染完整畫面 self._simulation_step() self.update_derived_states_and_render() - elif not is_headless: - # 【模擬非活動模式 (硬體/序列埠)】: + elif not is_headless and hasattr(self.sim, "poll_window_events"): + # 【模擬非活動模式 (真實硬體/序列埠)】: # 不執行任何物理或渲染計算,只處理視窗事件以保持響應。 self.sim.poll_window_events() # 加入一個非常短的休眠,以防止此迴圈在空閒時吃掉100%的CPU核心。 @@ -218,7 +220,7 @@ def _handle_mode_change(self, new_mode: str): # 【v4.0.2 新增】在完成模式切換後,如果進入了非模擬模式, # 我們主動渲染一次“凍結幀”,以確保UI上顯示的是正確的遮罩和文字。 if new_mode in ["HARDWARE_MODE", "SERIAL_MODE"]: - if not isinstance(self.sim, MockSimulation): + if self.sim.__class__.__name__ != "MockSimulation": log.info(f"渲染 '{new_mode}' 的凍結畫面...") self.sim.render_from_thread(self.state) @@ -338,6 +340,12 @@ def on_device_connect_requested(self, device: str): """處理設備連接請求。""" log.info(f"接收到連接 '{device}' 的請求...") if device == "serial" and self.serial_comm: + if self.config.use_virtual_teensy: + # 虛擬Teensy模式下,序列埠連線純屬虛構 + log.info("使用虛擬Teensy,跳過序列埠掃描與連線。") + with self.state.lock: + self.state.serial_is_connected = True + return is_connected = self.serial_comm.scan_and_connect() with self.state.lock: self.state.serial_is_connected = is_connected @@ -459,7 +467,7 @@ def update_derived_states_and_render(self) -> None: 【v4.0】更新所有依賴於核心物理狀態的衍生狀態(如地形),並渲染場景。 此方法現在自給自足,直接從 self.state 獲取所需數據。 """ - is_headless = isinstance(self.sim, MockSimulation) + is_headless = self.sim.__class__.__name__ == "MockSimulation" # 步驟 1: 在函式內部,從 state 中讀取本幀需要的所有數據 with self.state.lock: @@ -494,38 +502,17 @@ def _simulation_step(self) -> None: control_mode = self.state.control_mode tuning_params = self.state.tuning_params - onnx_input, action_final = self.policy_manager.get_action(command) - - # [保留] 根據模式計算最終控制指令的邏輯不變 - if control_mode == "MANUAL_CTRL": - with self.state.lock: - final_ctrl = self.state.manual_final_ctrl.copy() - elif control_mode == "JOINT_TEST": - with self.state.lock: - final_ctrl = self.sim.default_pose + self.state.joint_test_offsets - else: - final_ctrl = self.sim.default_pose + action_final * tuning_params.action_scale - - # [保留] 應用 PD 控制的邏輯不變 - self.sim.apply_position_control(final_ctrl, tuning_params) - - # [保留] 更新 UI 顯示用的數據的邏輯不變 - with self.state.lock: - self.state.latest_onnx_input = onnx_input.flatten() - self.state.latest_action_raw = action_final - self.state.latest_final_ctrl = final_ctrl - - # [保留] 執行物理模擬的迴圈不變 + # 1. 首先,推進 MuJoCo 物理模擬到目標時間。 + # 這會更新 sim.data 中的所有物理狀態。 target_time = self.sim.data.time + self.config.control_dt while self.sim.data.time < target_time: if not self._running.is_set(): break - # 這是物理引擎的核心步驟 mujoco.mj_step(self.sim.model, self.sim.data) - # 【v4.3.1 新增】 - 將原始物理數據寫入 State - # 在 mj_step 之後,sim.data 中包含了最新的物理狀態,我們將其寫入 state.raw_... - # 作為 ObservationManager 的數據源。 + # 2. 【v4.3.1 新增】 - 將原始物理數據寫入 State + # 在 mj_step 之後,sim.data 中包含了最新的物理狀態,我們將其寫入 state.raw_... + # 作為 ObservationManager 的數據源。 with self.state.lock: # 讀取軀幹的姿態四元數 self.state.raw_torso_quat = self.sim.data.body('torso').xquat.copy() @@ -544,6 +531,39 @@ def _simulation_step(self) -> None: # 如果感測器不存在,用零填充 self.state.raw_accelerometer.fill(0.0) + # 在這裡也更新 last_action,因為它是 ObservationManager 讀取的一部分 + # PolicyManager 的 get_action 會將 action_final 賦值給 self.last_action + # 但在這裡,我們確保 raw_last_action 作為 ObservationManager 的數據源被更新 + # 考慮到 PolicyManager.get_action 會更新其自身的 last_action, + # 並且 SimulationState 的 on_tick_update 已經負責將 PolicyManager 傳入的 action_raw 賦值給 state.raw_last_action + # 這裡就不重複賦值了,讓 PolicyManager 在其執行完成後,透過 EVENT_SIMULATION_TICK 事件 + # 將其計算出的 `action_final` 傳遞給 `SimulationState.on_tick_update` 並更新 `state.raw_last_action`。 + # 所以,這裡只需要確保 `raw_` 物理數據是最新即可。 + + # 3. 獲取 AI 動作。現在 ObservationManager 可以從已更新的 state.raw_... 中讀取數據了。 + onnx_input, action_final = self.policy_manager.get_action(command) + + # [保留] 根據模式計算最終控制指令的邏輯不變 + if control_mode == "MANUAL_CTRL": + with self.state.lock: + final_ctrl = self.state.manual_final_ctrl.copy() + elif control_mode == "JOINT_TEST": + with self.state.lock: + final_ctrl = self.sim.default_pose + self.state.joint_test_offsets + else: + final_ctrl = self.sim.default_pose + action_final * tuning_params.action_scale + + # [保留] 應用 PD 控制的邏輯不變 + self.sim.apply_position_control(final_ctrl, tuning_params) + + # [保留] 更新 UI 顯示用的數據的邏輯不變 + with self.state.lock: + self.state.latest_onnx_input = onnx_input.flatten() + self.state.latest_action_raw = action_final + self.state.latest_final_ctrl = final_ctrl + # 【v4.3.1 修正】確保 PolicyManager 的原始輸出也傳遞給 state.raw_last_action + self.state.raw_last_action = action_final # 現在這裡更新 raw_last_action 是正確的,因為 action_final 是 PolicyManager 的原始輸出 + # ------------------------------------------------------------------ diff --git a/src/controllers/ui_controller.py b/src/controllers/ui_controller.py index b85274a..ee360b4 100644 --- a/src/controllers/ui_controller.py +++ b/src/controllers/ui_controller.py @@ -3,6 +3,7 @@ import threading from typing import TYPE_CHECKING, List from src.core.logger import log, log_queue +from src.controllers.hardware_controller import HWState # 導入硬體狀態枚舉 # [新增] 導入我們新創建的事件系統模組和所有UI會用到的事件名稱 # 解釋: @@ -263,7 +264,12 @@ def _create_status_display(self): self.status_labels['input_mode'] = ui.label('輸入: KEYBOARD') self.status_labels['sim_time'] = ui.label('時間: 0.00s') self.status_labels['serial_status'] = ui.label('序列埠: Disconnected') - self.status_labels['gamepad_status'] = ui.label('搖桿: Disconnected') + # 透過 Gamepad Presence Guard 綁定 UI 顯示 + self.status_labels['gamepad_status'] = ui.label().bind_text_from( + self.state, + 'ui_gamepad_connected', + lambda v: '搖桿: Connected' if v else '搖桿: Disconnected', + ) self.status_labels['hardware_ai'] = ui.label('硬體AI: N/A') self.status_labels['policy_status'] = ui.label(f'策略: {self.policy_manager.primary_policy_name}') ui.separator() @@ -321,13 +327,15 @@ def update_ui_elements(self): input_mode = self.state.input_mode sim_time = self.state.sim.data.time if self.state.sim else None serial_connected = self.state.serial_is_connected - gamepad_connected = self.state.gamepad_is_connected hw_running = self.state.hardware_is_running hw_ai_active = self.state.hardware_ai_is_active + hw_internal = self.hardware_controller.internal_state if self.hardware_controller else None command = self.state.command.copy() pos = self.state.latest_pos.copy() + # 取出最新的線速度資料(世界座標),若無資料則以零向量表示 + vel = getattr(self.state, 'raw_torso_linear_velocity_world', np.zeros(3)).copy() # --- AI 策略狀態 --- pm = self.policy_manager @@ -367,11 +375,14 @@ def update_ui_elements(self): # 直接從 hardware_controller 讀取其內部狀態來更新 UI hw_mode_active = self.state.control_mode == 'HARDWARE_MODE' ai_status_text = '硬體AI: N/A' - if hw_running and hw_mode_active: - ai_status_text = '硬體AI: Active' if hw_ai_active else '硬體AI: Disabled' - elif hw_mode_active and not hw_running: - ai_status_text = '硬體AI: Starting...' # 或 'Failed' - + if hw_mode_active: + if hw_running: + ai_status_text = '硬體AI: Active' if hw_ai_active else '硬體AI: Disabled' + elif hw_internal == HWState.FAILED: + ai_status_text = '硬體AI: Failed' + else: + ai_status_text = '硬體AI: Starting...' + self.status_labels['hardware_ai'].set_text(ai_status_text) @@ -382,10 +393,10 @@ def update_ui_elements(self): self.status_labels['input_mode'].set_text(f"輸入: {input_mode}") self.status_labels['sim_time'].set_text(f"時間: {sim_time:.2f}s" if sim_time is not None else "時間: N/A") self.status_labels['serial_status'].set_text('序列埠: Connected' if serial_connected else '序列埠: Disconnected') - self.status_labels['gamepad_status'].set_text('搖桿: Connected' if gamepad_connected else '搖桿: Connected') - self.status_labels['hardware_ai'].set_text('硬體AI: Active' if hw_ai_active else '硬體AI: Disabled' if mode == 'HARDWARE_MODE' else '硬體AI: N/A') + # hardware_ai 的文字已在上方統一處理,這裡不再覆寫 self.status_labels['command'].set_text(f"vy: {command[0]:.2f}, vx: {command[1]:.2f}, wz: {command[2]:.2f}") self.status_labels['robot_pos'].set_text(f"位置: [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]") + self.status_labels['robot_vel'].set_text(f"速度: [{vel[0]:.2f}, {vel[1]:.2f}, {vel[2]:.2f}]") # --- 更新 AI 策略相關 UI --- policy_text = f"策略: Blending {src_policy} -> {tgt_policy} ({alpha*100:.0f}%)" if transitioning else f"策略: {primary_policy}" diff --git a/src/core/config.py b/src/core/config.py index ad67eda..760a008 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -1,83 +1,125 @@ -# config.py -import yaml -from dataclasses import dataclass, field -from typing import Dict, List, Any +# src/core/config.py + +import yaml # 引入 PyYAML 函式庫,用於解析 YAML 格式的設定檔 +from dataclasses import dataclass, field # 引入 dataclasses,方便快速建立資料類別 +from typing import Dict, List, Any # 引入類型提示,增強程式碼可讀性與健壯性 + @dataclass class TuningParamsConfig: """從設定檔載入的初始調校參數資料類別。""" - kp: float - kd: float - action_scale: float - bias: float + + # 此類別對應 config.yaml 中的 initial_tuning_params 區塊 + kp: float # PD 控制器中的比例增益 (Proportional gain),代表剛度 + kd: float # PD 控制器中的微分增益 (Derivative gain),代表阻尼 + action_scale: float # AI 模型輸出的動作縮放比例 + bias: float # 施加到馬達的額外力矩偏置 + @dataclass class FloatingControllerConfig: """懸浮控制器的設定。""" - target_height: float - kp_vertical: float - kd_vertical: float - kp_attitude: float - kd_attitude: float + + # 此類別對應 config.yaml 中的 floating_controller 區塊 + target_height: float # 機器人懸浮時,身體距離地面的目標高度 (單位:公尺) + kp_vertical: float # 垂直方向的 PD 控制器 - P 增益 + kd_vertical: float # 垂直方向的 PD 控制器 - D 增益 + kp_attitude: float # 姿態(滾轉/俯仰)的 PD 控制器 - P 增益 + kd_attitude: float # 姿態(滾轉/俯仰)的 PD 控制器 - D 增益 + @dataclass class AppConfig: - """儲存所有應用程式設定的資料類別。""" - mujoco_model_file: str - + """ + 儲存所有應用程式設定的資料類別。 + 這個類別作為一個集中的設定容器,將從 YAML 檔案讀取的設定值結構化。 + """ + + # --- 開發與除錯設定 --- + use_virtual_teensy: bool # 是否啟用虛擬Teensy,用於無硬體測試 + + # --- 檔案路徑 --- + mujoco_model_file: str # MuJoCo 主要場景的 XML 檔案路徑 + # 【修改】使用新的 onnx_models 結構,可以包含路徑和配方 + # 一個字典,儲存所有可用的 ONNX 模型資訊,鍵為模型暱稱 onnx_models: Dict[str, Dict[str, Any]] - policy_transition_duration: float - - num_motors: int - physics_timestep: float - control_freq: float - control_dt: float - warmup_duration: float - command_scaling_factors: List[float] - - keyboard_velocity_adjust_step: float - gamepad_sensitivity: Dict[str, float] - param_adjust_steps: Dict[str, float] - - initial_tuning_params: TuningParamsConfig - floating_controller: FloatingControllerConfig + + # --- 模擬與控制參數 --- + policy_transition_duration: float # AI 策略模型之間平滑切換所需的秒數 + num_motors: int # 機器人的馬達(致動器)數量 + physics_timestep: float # MuJoCo 物理引擎的模擬時間步長 (dt) + control_freq: float # 控制迴圈的頻率 (Hz),即每秒執行控制邏輯的次數 + control_dt: float # 控制迴圈的時間間隔 (秒),由 1.0 / control_freq 計算而來 + warmup_duration: float # (已棄用) 預留的暖機時間 + command_scaling_factors: List[ + float + ] # 將使用者輸入指令 (如搖桿) 縮放到 ONNX 模型期望範圍的係數 + + # --- 輸入處理參數 --- + keyboard_velocity_adjust_step: float # 使用鍵盤控制時,每按一下增加/減少的速度值 + gamepad_sensitivity: Dict[str, float] # 遊戲搖桿各個軸的靈敏度設定 + param_adjust_steps: Dict[str, float] # 在 UI 或透過搖桿調整參數時,每按一下的步進值 + + # --- 結構化設定 --- + initial_tuning_params: TuningParamsConfig # 初始 PD 控制參數的實例 + floating_controller: FloatingControllerConfig # 懸浮控制器設定的實例 + def load_config(path: str = "config.yaml") -> AppConfig: """ 從 YAML 檔案載入設定並回傳一個 AppConfig 物件。 + + Args: + path (str): YAML 設定檔的路徑。預設為 "config.yaml"。 + + Returns: + AppConfig: 一個包含所有應用程式設定的實例。 + + Raises: + FileNotFoundError: 如果設定檔不存在。 + IOError: 如果讀取或解析檔案時發生錯誤。 """ try: - with open(path, 'r', encoding='utf-8') as f: + # 使用 'with' 陳述式確保檔案會被正確關閉 + with open(path, "r", encoding="utf-8") as f: + # yaml.safe_load 可以安全地解析 YAML 內容,避免執行任意程式碼 config_data = yaml.safe_load(f) except FileNotFoundError: + # 如果檔案找不到,拋出一個更具描述性的錯誤 raise FileNotFoundError(f"設定檔 '{path}' 不存在。請確保檔案路徑正確。") except Exception as e: + # 捕捉其他可能的錯誤,如權限問題或 YAML 格式錯誤 raise IOError(f"讀取或解析設定檔 '{path}' 時發生錯誤: {e}") - tuning_params = TuningParamsConfig(**config_data['initial_tuning_params']) - floating_config = FloatingControllerConfig(**config_data['floating_controller']) - + # 將 YAML 中讀取的字典資料,實例化為對應的 dataclass 物件 + # 這提供了型別檢查和自動完成的好處 + tuning_params = TuningParamsConfig(**config_data["initial_tuning_params"]) + floating_config = FloatingControllerConfig(**config_data["floating_controller"]) + + # 建立最終的 AppConfig 物件,將所有解析後的設定組合起來 config_obj = AppConfig( - mujoco_model_file=config_data['mujoco_model_file'], - - onnx_models=config_data['onnx_models'], - policy_transition_duration=config_data.get('policy_transition_duration', 0.5), - - num_motors=config_data['num_motors'], - physics_timestep=config_data['physics_timestep'], - control_freq=config_data['control_freq'], - control_dt=1.0 / config_data['control_freq'], - warmup_duration=config_data['warmup_duration'], - command_scaling_factors=config_data['command_scaling_factors'], - - keyboard_velocity_adjust_step=config_data['keyboard_velocity_adjust_step'], - gamepad_sensitivity=config_data['gamepad_sensitivity'], - param_adjust_steps=config_data['param_adjust_steps'], - + # 使用 .get() 方法安全地讀取可選的設定值,如果不存在則使用預設值 + use_virtual_teensy=config_data.get("use_virtual_teensy", False), + mujoco_model_file=config_data["mujoco_model_file"], + onnx_models=config_data["onnx_models"], + policy_transition_duration=config_data.get("policy_transition_duration", 0.5), + num_motors=config_data["num_motors"], + physics_timestep=config_data["physics_timestep"], + control_freq=config_data["control_freq"], + # 在載入時直接計算出 control_dt,方便後續使用 + control_dt=1.0 / config_data["control_freq"], + warmup_duration=config_data["warmup_duration"], + command_scaling_factors=config_data["command_scaling_factors"], + keyboard_velocity_adjust_step=config_data["keyboard_velocity_adjust_step"], + gamepad_sensitivity=config_data["gamepad_sensitivity"], + param_adjust_steps=config_data["param_adjust_steps"], + # 賦予先前建立的結構化設定物件 initial_tuning_params=tuning_params, - floating_controller=floating_config + floating_controller=floating_config, ) - + + # 在控制台打印成功訊息,方便除錯 print("✅ 設定檔載入成功 (包含懸浮控制器設定)。") + # 回傳完整的設定物件 return config_obj diff --git a/src/core/state.py b/src/core/state.py index 3a318a5..429eb4d 100644 --- a/src/core/state.py +++ b/src/core/state.py @@ -82,21 +82,16 @@ class SimulationState: latest_onnx_input: np.ndarray = field(default_factory=lambda: np.array([])) latest_action_raw: np.ndarray = field(default_factory=lambda: np.zeros(12)) latest_final_ctrl: np.ndarray = field(default_factory=lambda: np.zeros(12)) + # 硬體輸入解析後的最新 51 維觀測向量 + latest_observation_51: np.ndarray = field(default_factory=lambda: np.zeros(51)) + # 上一次送出的動作 (供觀測使用) + last_action: np.ndarray = field(default_factory=lambda: np.zeros(12)) # --- 物理狀態 (由主驅動者直接更新) --- # 這些是物理世界的最直接反映,由擁有 sim.data 的模組 (SimulationController) 直接更新 latest_pos: np.ndarray = field(default_factory=lambda: np.zeros(3)) latest_quat: np.ndarray = field(default_factory=lambda: np.array([1., 0., 0., 0.])) latest_joint_positions: np.ndarray = field(default_factory=lambda: np.zeros(12)) - # [新增] v4.3.1 原始感測器數據 (由數據源更新,供 ObservationManager 使用) - # 這是原始數據的暫存區,由 SimulationController 或 HardwareController 填充。 - raw_torso_quat: np.ndarray = field(default_factory=lambda: np.array([1., 0., 0., 0.])) - raw_torso_linear_velocity_world: np.ndarray = field(default_factory=lambda: np.zeros(3)) - raw_torso_angular_velocity_world: np.ndarray = field(default_factory=lambda: np.zeros(3)) - raw_joint_positions: np.ndarray = field(default_factory=lambda: np.zeros(12)) - raw_joint_velocities: np.ndarray = field(default_factory=lambda: np.zeros(12)) - raw_accelerometer: np.ndarray = field(default_factory=lambda: np.zeros(3)) - raw_last_action: np.ndarray = field(default_factory=lambda: np.zeros(12)) # --- 請求旗標 (由 UI/輸入 發起,由主驅動者處理) --- hard_reset_requested: bool = False @@ -125,6 +120,8 @@ class SimulationState: # --- 設備連接與狀態 --- serial_is_connected: bool = False gamepad_is_connected: bool = False + # 由 Gamepad Presence Guard 更新,僅用於 UI 顯示 + ui_gamepad_connected: bool = False hardware_is_running: bool = False hardware_ai_is_active: bool = False hardware_status_text: str = "Not Connected" @@ -256,4 +253,4 @@ def toggle_input_mode(self, new_mode: str, clear_cmd: bool = True): self.input_mode = new_mode if clear_cmd: self.clear_command() - log.info(f"輸入模式已切換至: {self.input_mode}") \ No newline at end of file + log.info(f"輸入模式已切換至: {self.input_mode}") diff --git a/src/hardware/virtual_teensy.py b/src/hardware/virtual_teensy.py new file mode 100644 index 0000000..7a70d95 --- /dev/null +++ b/src/hardware/virtual_teensy.py @@ -0,0 +1,104 @@ +import time +import numpy as np + + +class VirtualTeensy: + """ + 模擬 Teensy 的序列埠介面 (serial-like interface): + - write(): 處理 monitor/move 等命令 + - readline(): 以固定頻率輸出一行 34 維資料 + """ + + def __init__(self, state, rate_hz: float = 50.0): + self.state = state + self.is_open = True + self.mode = "HUMAN" # or "POLICY_STREAM" + self._last_read_time = 0.0 + # 以 50Hz 為基準的最小間隔 + self._min_interval = 1.0 / float(rate_hz) + + # ---- 指令處理 ---- + def write(self, data: bytes) -> int: + try: + cmd = data.decode("utf-8").strip() + except Exception: + return len(data) + + if cmd.startswith("monitor"): + # monitor p -> 開始串流;monitor h -> 停止串流 + self.mode = "POLICY_STREAM" if "p" in cmd else "HUMAN" + + elif cmd == "stop": + sim = getattr(self.state, "sim", None) + if sim is not None and hasattr(sim, "data"): + # 將控制目標鎖在目前關節角度 + sim.data.ctrl[:] = sim.data.qpos[7:] + + elif cmd.startswith("move all"): + # move all <12個角度> + parts = cmd.split() + vals = parts[2:] if len(parts) >= 14 else [] + if len(vals) == 12: + tgt = np.array([float(v) for v in vals], dtype=np.float32) + scale = float(getattr(self.state, "action_scale", 1.0)) + sim = getattr(self.state, "sim", None) + if sim is not None and hasattr(sim, "data"): + base = getattr(sim, "default_pose", np.zeros(12, dtype=np.float32)) + sim.data.ctrl[:] = base + tgt * scale + + return len(data) + + def reset_input_buffer(self) -> None: + pass + + def close(self) -> None: + self.is_open = False + + # ---- 串流輸出(50 Hz)---- + def readline(self) -> bytes: + if not self.is_open or self.mode != "POLICY_STREAM": + return b"" + + now = time.perf_counter() + if now - self._last_read_time < self._min_interval: + return b"" + self._last_read_time = now + + sim = getattr(self.state, "sim", None) + + # 角速度(3)、重力向量(3)、加速度計(3)、俯仰(1)、關節角(12)、關節角速(12) = 34 + ang_vel = self._safe_call(sim, "imu_gyro_local", 3) # rad/s + g_vec = self._safe_call(sim, "gravity_local", 3, default=[0, 0, 1.0]) # 單位向量 + acc = self._safe_call(sim, "imu_accel_local", 3) # m/s^2 + pitch = float(getattr(sim, "pitch_rad", 0.0)) + qpos = self._safe_call(sim, "joint_positions", 12) # rad + qvel = self._safe_call(sim, "joint_velocities", 12) # rad/s + + vec = np.concatenate([ + ang_vel, + g_vec, + acc, + np.array([pitch], dtype=np.float32), + qpos, + qvel, + ]).astype(np.float32) + + if vec.shape[0] != 34: + return b"" + + line = ",".join(f"{x:.6f}" for x in vec) + "\n" + return line.encode("utf-8") + + @staticmethod + def _safe_call(sim, name: str, n: int, default=None): + """安全地呼叫模擬器方法,若失敗則回傳預設值。""" + if default is None: + default = [0.0] * n + if sim is None: + return np.array(default, dtype=np.float32) + fn = getattr(sim, name, None) + if callable(fn): + out = np.asarray(fn(), dtype=np.float32) + if out.shape[0] == n: + return out + return np.array(default, dtype=np.float32) diff --git a/src/utils/gamepad_presence_guard.py b/src/utils/gamepad_presence_guard.py new file mode 100644 index 0000000..b6c9da1 --- /dev/null +++ b/src/utils/gamepad_presence_guard.py @@ -0,0 +1,26 @@ +import threading +import time + + +def start_gamepad_presence_guard(state, interval_sec: float = 0.5): + """啟動搖桿存在偵測守門員,定期更新 state.ui_gamepad_connected。""" + try: + import pygame + except Exception: + return None # 沒有 pygame 模組就直接略過 + + state.ui_gamepad_connected = False + + def _loop(): + pygame.joystick.init() + while True: + try: + n = pygame.joystick.get_count() + state.ui_gamepad_connected = n > 0 + except Exception: + state.ui_gamepad_connected = False + time.sleep(interval_sec) + + t = threading.Thread(target=_loop, daemon=True) + t.start() + return t diff --git a/tools/validate_virtual_teensy_50hz.py b/tools/validate_virtual_teensy_50hz.py new file mode 100644 index 0000000..3e54358 --- /dev/null +++ b/tools/validate_virtual_teensy_50hz.py @@ -0,0 +1,108 @@ +import os +import sys +import time +import numpy as np + +# 為了能從工具資料夾直接執行,將專案根目錄加入模組搜尋路徑 +# Append project root to module search path for direct execution +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) + +from src.hardware.virtual_teensy import VirtualTeensy +from src.core.state import SimulationState +from src.controllers.hardware_controller import construct_observation_51 +from src.core.config import TuningParamsConfig # 導入調校參數資料類別 + + +class _DummySim: + """極簡模擬器,用於驗證虛擬Teensy輸出格式。""" + + def __init__(self): + self._t = 0.0 + + def step(self): + self._t += 0.02 + + def imu_gyro_local(self): + return np.array([0.0, 0.0, np.sin(self._t)], dtype=np.float32) + + def gravity_local(self): + return np.array([0.0, 0.0, 1.0], dtype=np.float32) + + def imu_accel_local(self): + return np.array([0.0, 0.0, -9.81], dtype=np.float32) + + @property + def pitch_rad(self): + return float(0.1 * np.sin(self._t)) + + def joint_positions(self): + return np.linspace(-0.5, 0.5, 12).astype(np.float32) + + def joint_velocities(self): + return np.zeros(12, dtype=np.float32) + + +class _DummyConfig: + """ + 簡化版設定,提供最少必要欄位給 SimulationState 使用。 + Minimal config for SimulationState so the validation tool can run. + """ + + use_virtual_teensy = True + num_motors = 12 # 模擬用的馬達數 + + # 使用正式的 TuningParamsConfig,確保 __dict__ 含有所需欄位 + # Use real TuningParamsConfig so __dict__ has required fields + initial_tuning_params = TuningParamsConfig( + kp=0.0, # 比例增益 (Proportional gain) + kd=0.0, # 微分增益 (Derivative gain) + action_scale=1.0, # 動作縮放係數 + bias=0.0, # 力矩偏置 + ) + + +def main(): + state = SimulationState(_DummyConfig()) + state.sim = _DummySim() + + vt = VirtualTeensy(state, rate_hz=50.0) + vt.write(b"monitor p\n") + + hw = type("HW", (), {})() + last_print = time.time() + got = 0 + + while got < 250: # 約 5 秒 + b = vt.readline() + if not b: + time.sleep(0.001) + continue + parts = b.decode("utf-8").strip().split(",") + if len(parts) != 34: + continue + v = np.asarray([float(x) for x in parts], dtype=np.float32) + + hw.angular_velocity_radps = v[0:3] + hw.gravity_vector_norm = v[3:6] + hw.accelerometer_ms2 = v[6:9] + hw.pitch_rad = float(v[9]) + hw.joint_positions_rad = v[10:22] + hw.joint_velocities_radps = v[22:34] + + obs = construct_observation_51(state, hw) + got += 1 + + now = time.time() + if now - last_print > 1.0: + print( + f"[sample] 34D gyro={hw.angular_velocity_radps.round(3)}, " + f"q0={hw.joint_positions_rad[0]:.3f}; 51D shape={obs.shape}" + ) + last_print = now + + vt.close() + print("OK.") + + +if __name__ == "__main__": + main()