diff --git a/rock-conf/rock-local.yml b/rock-conf/rock-local.yml index a4f364d5f0..1999a885f5 100644 --- a/rock-conf/rock-local.yml +++ b/rock-conf/rock-local.yml @@ -7,3 +7,7 @@ ray: warmup: images: - "python:3.11" + +monitor: + enabled: true + via_rocklet: true diff --git a/rock/config.py b/rock/config.py index 7b829bf4b0..cad0587a91 100644 --- a/rock/config.py +++ b/rock/config.py @@ -76,6 +76,12 @@ class ProxyServiceConfig: aes_encrypt_key: str | None = None +@dataclass +class MonitorConfig: + enabled: bool = False + via_rocklet: bool = False + + @dataclass class DatabaseConfig: # Supported URL formats: @@ -204,6 +210,7 @@ class RockConfig: proxy_service: ProxyServiceConfig = field(default_factory=ProxyServiceConfig) scheduler: SchedulerConfig = field(default_factory=SchedulerConfig) database: DatabaseConfig = field(default_factory=DatabaseConfig) + monitor: MonitorConfig = field(default_factory=MonitorConfig) nacos_provider: NacosConfigProvider | None = None @classmethod @@ -247,6 +254,8 @@ def from_env(cls, config_path: str | None = None): kwargs["scheduler"] = SchedulerConfig(**config["scheduler"]) if "database" in config: kwargs["database"] = DatabaseConfig(**config["database"]) + if "monitor" in config: + kwargs["monitor"] = MonitorConfig(**config["monitor"]) return cls(**kwargs) diff --git a/rock/deployments/config.py b/rock/deployments/config.py index 35ab4fe4ab..8db3467a06 100644 --- a/rock/deployments/config.py +++ b/rock/deployments/config.py @@ -133,6 +133,12 @@ class DockerDeploymentConfig(DeploymentConfig): extended_params: dict[str, str] = Field(default_factory=dict) """Generic extension field for storing custom string key-value pairs.""" + monitor_enabled: bool = False + """Enable rocklet metrics monitor in container.""" + + monitor_via_rocklet: bool = False + """Route metrics monitoring through rocklet instead of Ray actor.""" + @model_validator(mode="before") def validate_platform_args(cls, data: dict) -> dict: """Validate and extract platform arguments from docker_args. diff --git a/rock/deployments/docker.py b/rock/deployments/docker.py index 67246ddeeb..f539c20da3 100644 --- a/rock/deployments/docker.py +++ b/rock/deployments/docker.py @@ -395,6 +395,10 @@ async def start(self): env_arg.extend(["-e", f"ROCK_TIME_ZONE={env_vars.ROCK_TIME_ZONE}"]) + # Rocklet metrics monitor + if self._config.monitor_enabled and self._config.monitor_via_rocklet: + env_arg.extend(["-e", "ROCK_MONITOR_ENABLE=true", "-e", "ROCK_MONITOR_VIA_ROCKLET=true"]) + # Kata DinD: prepare disk image and add volume mount + env var if self._config.use_kata_runtime: self._prepare_kata_disk() diff --git a/rock/deployments/manager.py b/rock/deployments/manager.py index 3805de5c3c..755534c5ea 100644 --- a/rock/deployments/manager.py +++ b/rock/deployments/manager.py @@ -37,6 +37,9 @@ async def init_config(self, config: DeploymentConfig) -> DockerDeploymentConfig: docker_deployment_config.enable_auto_clear = self._enable_runtime_auto_clear docker_deployment_config.runtime_config = self.rock_config.runtime + docker_deployment_config.monitor_enabled = self.rock_config.monitor.enabled + docker_deployment_config.monitor_via_rocklet = self.rock_config.monitor.via_rocklet + await self.rock_config.update() docker_deployment_config.actor_resource = self.rock_config.sandbox_config.actor_resource docker_deployment_config.actor_resource_num = self.rock_config.sandbox_config.actor_resource_num diff --git a/rock/env_vars.py b/rock/env_vars.py index be1f98899a..fc8e39f782 100644 --- a/rock/env_vars.py +++ b/rock/env_vars.py @@ -37,6 +37,7 @@ ROCK_PIP_INDEX_URL: str | None = "https://mirrors.aliyun.com/pypi/simple/" ROCK_MONITOR_ENABLE: bool = False + ROCK_MONITOR_VIA_ROCKLET: bool = False ROCK_PROJECT_ROOT: str | None = None ROCK_WORKER_ENV_TYPE: str | None = "local" ROCK_PYTHON_ENV_PATH: str | None = None @@ -126,6 +127,7 @@ "ROCK_DOCUUM_INSTALL_URL": lambda: os.getenv( "ROCK_DOCUUM_INSTALL_URL", "https://raw.githubusercontent.com/stepchowfun/docuum/main/install.sh" ), + "ROCK_MONITOR_VIA_ROCKLET": lambda: os.getenv("ROCK_MONITOR_VIA_ROCKLET", "false").lower() == "true", } diff --git a/rock/rocklet/monitor.py b/rock/rocklet/monitor.py new file mode 100644 index 0000000000..ac15c1d31e --- /dev/null +++ b/rock/rocklet/monitor.py @@ -0,0 +1,295 @@ +""" +Rocklet metrics monitor module. + +This module extracts the monitoring logic from BaseActor so that it can run +inside the rocklet process (or as a subprocess spawned by rocklet). When the +feature switch ROCK_MONITOR_VIA_ROCKLET is enabled, the rocklet is responsible +for collecting sandbox resource metrics and reporting them via OTLP, instead of +the Ray actor. +""" + +import asyncio +import multiprocessing +import os +import signal +import time + +import httpx +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +from rock.logger import init_logger +from rock.utils.system import get_instance_id, get_uniagent_endpoint + +logger = init_logger(__name__) + +DEFAULT_REPORT_INTERVAL = 10 +DEFAULT_EXPORT_INTERVAL_MILLIS = 10000 +DEFAULT_ROCKLET_PORT = 8000 + + +class RockletMetricsMonitor: + """Metrics monitor that runs inside the rocklet process. + + It periodically fetches sandbox resource statistics from the local rocklet + HTTP endpoint and reports them via OpenTelemetry OTLP exporter. + """ + + def __init__( + self, + sandbox_id: str, + rocklet_port: int = DEFAULT_ROCKLET_PORT, + report_interval: int = DEFAULT_REPORT_INTERVAL, + export_interval_millis: int = DEFAULT_EXPORT_INTERVAL_MILLIS, + env: str = "dev", + role: str = "test", + user_id: str = "default", + experiment_id: str = "default", + namespace: str = "default", + metrics_endpoint: str = "", + user_defined_tags: dict | None = None, + ): + self._sandbox_id = sandbox_id + self._rocklet_port = rocklet_port + self._report_interval = report_interval + self._export_interval_millis = export_interval_millis + self._env = env + self._role = role + self._user_id = user_id + self._experiment_id = experiment_id + self._namespace = namespace + self._metrics_endpoint = metrics_endpoint + self._user_defined_tags = user_defined_tags or {} + self._ip = get_instance_id() + self._host: str | None = None + self._gauges: dict = {} + self._scheduler: AsyncIOScheduler | None = None + self._http_client: httpx.AsyncClient | None = None + self._startup_time = time.time() + + def _init_otel(self): + """Initialize the OpenTelemetry metrics pipeline.""" + host, port = get_uniagent_endpoint() + self._host = host + logger.info( + f"RockletMetricsMonitor initializing OTLP with host={host}, port={port}, " + f"env={self._env}, role={self._role}" + ) + endpoint = self._metrics_endpoint or f"http://{host}:{port}/v1/metrics" + otlp_exporter = OTLPMetricExporter(endpoint=endpoint) + metric_reader = PeriodicExportingMetricReader( + otlp_exporter, + export_interval_millis=self._export_interval_millis, + ) + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + meter = metrics.get_meter("XRL_GATEWAY_CONFIG") + + self._gauges["cpu"] = meter.create_gauge(name="xrl_gateway.system.cpu", description="CPU Usage", unit="1") + self._gauges["mem"] = meter.create_gauge(name="xrl_gateway.system.memory", description="Memory Usage", unit="1") + self._gauges["disk"] = meter.create_gauge(name="xrl_gateway.system.disk", description="Disk Usage", unit="1") + self._gauges["net"] = meter.create_gauge( + name="xrl_gateway.system.network", description="Network Usage", unit="1" + ) + self._gauges["rt"] = meter.create_gauge( + name="xrl_gateway.system.lifespan_rt", description="Life Span Rt", unit="1" + ) + + async def _fetch_statistics(self) -> dict | None: + """Fetch sandbox statistics from the local rocklet HTTP endpoint.""" + url = f"http://localhost:{self._rocklet_port}/get_statistics" + try: + response = await self._http_client.get(url, timeout=3) + if response.status_code == 200: + return response.json() + logger.warning(f"Unexpected status {response.status_code} from {url}") + except Exception as e: + logger.error(f"Failed to fetch statistics from rocklet: {e}") + return None + + async def _collect_and_report_metrics(self): + """Collect metrics from rocklet and report via OTLP gauges.""" + start = time.perf_counter() + total_timeout = self._report_interval - 1 + try: + await asyncio.wait_for(self._report_single_sandbox(), timeout=total_timeout) + except asyncio.TimeoutError: + duration = time.perf_counter() - start + logger.error(f"Metrics collection timed out after {duration:.2f}s (limit: {total_timeout}s)") + + async def _report_single_sandbox(self): + """Fetch and report metrics for the sandbox.""" + start = time.perf_counter() + try: + stats = await self._fetch_statistics() + if stats is None or stats.get("cpu") is None: + logger.warning(f"No metrics returned for sandbox: {self._sandbox_id}") + return + + logger.debug(f"sandbox [{self._sandbox_id}] metrics = {stats}") + + attributes = { + "sandbox_id": self._sandbox_id, + "env": self._env, + "role": self._role, + "host": self._host or "", + "ip": self._ip, + "user_id": self._user_id, + "experiment_id": self._experiment_id, + "namespace": self._namespace, + } + if self._user_defined_tags: + attributes.update(self._user_defined_tags) + + self._gauges["cpu"].set(stats["cpu"], attributes=attributes) + self._gauges["mem"].set(stats["mem"], attributes=attributes) + self._gauges["disk"].set(stats["disk"], attributes=attributes) + self._gauges["net"].set(stats["net"], attributes=attributes) + + lifespan_rt = time.time() - self._startup_time + self._gauges["rt"].set(lifespan_rt, attributes=attributes) + + logger.debug(f"Successfully reported metrics for sandbox: {self._sandbox_id}") + report_rt = time.perf_counter() - start + logger.debug(f"Single sandbox report rt: {report_rt:.4f}s") + except Exception as e: + logger.error(f"Error collecting metrics for sandbox {self._sandbox_id}: {e}") + + async def start(self): + """Start the metrics collection scheduler.""" + self._init_otel() + self._http_client = httpx.AsyncClient() + self._scheduler = AsyncIOScheduler( + timezone="UTC", + job_defaults={"coalesce": True, "max_instances": 1, "misfire_grace_time": 30}, + ) + self._scheduler.add_job( + func=self._collect_and_report_metrics, + trigger=IntervalTrigger(seconds=self._report_interval), + id="rocklet_metrics_collection", + name="Rocklet Sandbox Resource Metrics Collection", + ) + self._scheduler.start() + logger.info( + f"RockletMetricsMonitor started for sandbox={self._sandbox_id}, " f"interval={self._report_interval}s" + ) + + async def stop(self): + """Stop the metrics collection scheduler and clean up resources.""" + if self._scheduler and self._scheduler.running: + logger.info("Stopping RockletMetricsMonitor scheduler...") + self._scheduler.shutdown(wait=True) + logger.info("RockletMetricsMonitor scheduler stopped") + self._scheduler = None + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + +def _run_monitor_async( + sandbox_id: str, + rocklet_port: int, + report_interval: int, + export_interval_millis: int, + env: str, + role: str, + user_id: str, + experiment_id: str, + namespace: str, + metrics_endpoint: str, + user_defined_tags: dict, +): + """Entry point for the monitor subprocess — runs the async event loop.""" + monitor = RockletMetricsMonitor( + sandbox_id=sandbox_id, + rocklet_port=rocklet_port, + report_interval=report_interval, + export_interval_millis=export_interval_millis, + env=env, + role=role, + user_id=user_id, + experiment_id=experiment_id, + namespace=namespace, + metrics_endpoint=metrics_endpoint, + user_defined_tags=user_defined_tags, + ) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + def _handle_signal(signum, frame): + logger.info(f"Monitor subprocess received signal {signum}, shutting down...") + loop.call_soon_threadsafe(loop.stop) + + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + try: + loop.run_until_complete(monitor.start()) + loop.run_forever() + finally: + loop.run_until_complete(monitor.stop()) + loop.close() + logger.info("Monitor subprocess exited") + + +def start_monitor_process( + sandbox_id: str = "", + rocklet_port: int = DEFAULT_ROCKLET_PORT, + report_interval: int = DEFAULT_REPORT_INTERVAL, + export_interval_millis: int = DEFAULT_EXPORT_INTERVAL_MILLIS, + env: str = "dev", + role: str = "test", + user_id: str = "default", + experiment_id: str = "default", + namespace: str = "default", + metrics_endpoint: str = "", + user_defined_tags: dict | None = None, +) -> multiprocessing.Process: + """Spawn a child process that runs the metrics monitor. + + Returns the ``multiprocessing.Process`` handle so the caller can manage its + lifecycle (e.g. terminate on shutdown). + """ + if not sandbox_id: + sandbox_id = os.getenv("SANDBOX_ID", os.getenv("HOSTNAME", "unknown")) + + process = multiprocessing.Process( + target=_run_monitor_async, + kwargs={ + "sandbox_id": sandbox_id, + "rocklet_port": rocklet_port, + "report_interval": report_interval, + "export_interval_millis": export_interval_millis, + "env": env, + "role": role, + "user_id": user_id, + "experiment_id": experiment_id, + "namespace": namespace, + "metrics_endpoint": metrics_endpoint, + "user_defined_tags": user_defined_tags or {}, + }, + daemon=True, + name="rocklet-metrics-monitor", + ) + process.start() + logger.info(f"Monitor subprocess started with pid={process.pid}") + return process + + +def stop_monitor_process(process: multiprocessing.Process | None): + """Gracefully stop the monitor subprocess.""" + if process is None or not process.is_alive(): + return + logger.info(f"Terminating monitor subprocess pid={process.pid}") + process.terminate() + process.join(timeout=5) + if process.is_alive(): + logger.warning(f"Monitor subprocess pid={process.pid} did not exit, killing") + process.kill() + process.join(timeout=2) + logger.info("Monitor subprocess stopped") diff --git a/rock/rocklet/server.py b/rock/rocklet/server.py index 8e1d9ef0e8..be4e746313 100755 --- a/rock/rocklet/server.py +++ b/rock/rocklet/server.py @@ -13,15 +13,19 @@ from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.status import HTTP_504_GATEWAY_TIMEOUT +from rock import env_vars from rock.actions import _ExceptionTransfer from rock.logger import init_logger from rock.rocklet import __version__ from rock.rocklet.local_api import local_router +from rock.rocklet.monitor import start_monitor_process, stop_monitor_process from rock.utils import EAGLE_EYE_TRACE_ID, REQUEST_TIMEOUT_SECONDS, sandbox_id_ctx_var, trace_id_ctx_var logger = init_logger("rocklet.server") app = FastAPI() +_monitor_process = None + app.include_router(local_router, tags=["local"]) @@ -97,6 +101,23 @@ async def exception_handler(request: Request, exc: Exception): return JSONResponse(status_code=511, content={"rockletexception": _exc.model_dump()}) +@app.on_event("startup") +async def startup_event(): + global _monitor_process + if env_vars.ROCK_MONITOR_ENABLE and env_vars.ROCK_MONITOR_VIA_ROCKLET: + logger.info("Starting rocklet metrics monitor subprocess...") + _monitor_process = start_monitor_process() + + +@app.on_event("shutdown") +async def shutdown_event(): + global _monitor_process + if _monitor_process is not None: + logger.info("Stopping rocklet metrics monitor subprocess...") + stop_monitor_process(_monitor_process) + _monitor_process = None + + @app.get("/") async def root(): return {"message": "hello world"} diff --git a/rock/sandbox/base_actor.py b/rock/sandbox/base_actor.py index 058f7dee17..a996b87f10 100644 --- a/rock/sandbox/base_actor.py +++ b/rock/sandbox/base_actor.py @@ -114,6 +114,9 @@ def _init_monitor(self): async def _setup_monitor(self): if not env_vars.ROCK_MONITOR_ENABLE: return + if env_vars.ROCK_MONITOR_VIA_ROCKLET: + logger.info("ROCK_MONITOR_VIA_ROCKLET is enabled, skipping actor-side monitoring setup") + return self._init_monitor() self._report_interval = 10 self._metrics_report_scheduler = AsyncIOScheduler( @@ -128,6 +131,8 @@ async def _setup_monitor(self): self._metrics_report_scheduler.start() def stop_monitoring(self): + if env_vars.ROCK_MONITOR_VIA_ROCKLET: + return if env_vars.ROCK_MONITOR_ENABLE and self._metrics_report_scheduler and self._metrics_report_scheduler.running: logger.info("Stopping APScheduler...") self._metrics_report_scheduler.shutdown(wait=True) diff --git a/tests/unit/test_rocklet_monitor.py b/tests/unit/test_rocklet_monitor.py new file mode 100644 index 0000000000..86bbf40017 --- /dev/null +++ b/tests/unit/test_rocklet_monitor.py @@ -0,0 +1,551 @@ +import asyncio +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from rock.rocklet.monitor import ( + RockletMetricsMonitor, + start_monitor_process, + stop_monitor_process, +) + + +class TestRockletMetricsMonitor: + """Tests for the RockletMetricsMonitor class.""" + + def _make_monitor(self, **kwargs): + defaults = { + "sandbox_id": "test-sandbox-001", + "rocklet_port": 18000, + "report_interval": 5, + "export_interval_millis": 5000, + "env": "test", + "role": "write", + "user_id": "user-123", + "experiment_id": "exp-456", + "namespace": "test-ns", + "metrics_endpoint": "http://localhost:4318/v1/metrics", + "user_defined_tags": {"service": "rock-test"}, + } + defaults.update(kwargs) + return RockletMetricsMonitor(**defaults) + + def test_init_default_values(self): + """Test monitor initializes with correct default values.""" + monitor = RockletMetricsMonitor(sandbox_id="sandbox-1") + assert monitor._sandbox_id == "sandbox-1" + assert monitor._rocklet_port == 8000 + assert monitor._report_interval == 10 + assert monitor._export_interval_millis == 10000 + assert monitor._env == "dev" + assert monitor._role == "test" + assert monitor._user_id == "default" + assert monitor._experiment_id == "default" + assert monitor._namespace == "default" + assert monitor._metrics_endpoint == "" + assert monitor._user_defined_tags == {} + assert monitor._scheduler is None + assert monitor._http_client is None + + def test_init_custom_values(self): + """Test monitor initializes with custom values.""" + monitor = self._make_monitor() + assert monitor._sandbox_id == "test-sandbox-001" + assert monitor._rocklet_port == 18000 + assert monitor._report_interval == 5 + assert monitor._export_interval_millis == 5000 + assert monitor._env == "test" + assert monitor._role == "write" + assert monitor._user_id == "user-123" + assert monitor._experiment_id == "exp-456" + assert monitor._namespace == "test-ns" + assert monitor._metrics_endpoint == "http://localhost:4318/v1/metrics" + assert monitor._user_defined_tags == {"service": "rock-test"} + + @patch("rock.rocklet.monitor.get_uniagent_endpoint", return_value=("10.0.0.1", "4318")) + @patch("rock.rocklet.monitor.OTLPMetricExporter") + @patch("rock.rocklet.monitor.PeriodicExportingMetricReader") + @patch("rock.rocklet.monitor.MeterProvider") + @patch("rock.rocklet.monitor.metrics") + def test_init_otel_with_custom_endpoint( + self, mock_metrics, mock_meter_provider, mock_reader, mock_exporter, mock_get_endpoint + ): + """Test OTEL initialization uses custom metrics_endpoint when provided.""" + monitor = self._make_monitor(metrics_endpoint="http://custom:9090/v1/metrics") + monitor._init_otel() + + mock_exporter.assert_called_once_with(endpoint="http://custom:9090/v1/metrics") + assert monitor._host == "10.0.0.1" + + @patch("rock.rocklet.monitor.get_uniagent_endpoint", return_value=("10.0.0.1", "4318")) + @patch("rock.rocklet.monitor.OTLPMetricExporter") + @patch("rock.rocklet.monitor.PeriodicExportingMetricReader") + @patch("rock.rocklet.monitor.MeterProvider") + @patch("rock.rocklet.monitor.metrics") + def test_init_otel_with_default_endpoint( + self, mock_metrics, mock_meter_provider, mock_reader, mock_exporter, mock_get_endpoint + ): + """Test OTEL initialization falls back to uniagent endpoint when no custom endpoint.""" + monitor = self._make_monitor(metrics_endpoint="") + monitor._init_otel() + + mock_exporter.assert_called_once_with(endpoint="http://10.0.0.1:4318/v1/metrics") + + @patch("rock.rocklet.monitor.get_uniagent_endpoint", return_value=("10.0.0.1", "4318")) + @patch("rock.rocklet.monitor.OTLPMetricExporter") + @patch("rock.rocklet.monitor.PeriodicExportingMetricReader") + @patch("rock.rocklet.monitor.MeterProvider") + @patch("rock.rocklet.monitor.metrics") + def test_init_otel_registers_gauges( + self, mock_metrics, mock_meter_provider, mock_reader, mock_exporter, mock_get_endpoint + ): + """Test OTEL initialization registers all five gauge metrics.""" + mock_meter = MagicMock() + mock_metrics.get_meter.return_value = mock_meter + + monitor = self._make_monitor() + monitor._init_otel() + + assert mock_meter.create_gauge.call_count == 5 + gauge_names = [call.kwargs["name"] for call in mock_meter.create_gauge.call_args_list] + assert "xrl_gateway.system.cpu" in gauge_names + assert "xrl_gateway.system.memory" in gauge_names + assert "xrl_gateway.system.disk" in gauge_names + assert "xrl_gateway.system.network" in gauge_names + assert "xrl_gateway.system.lifespan_rt" in gauge_names + + +class TestRockletMetricsMonitorAsync: + """Async tests for the RockletMetricsMonitor class.""" + + def _make_monitor(self, **kwargs): + defaults = { + "sandbox_id": "test-sandbox-001", + "rocklet_port": 18000, + "report_interval": 5, + "export_interval_millis": 5000, + "env": "test", + "role": "write", + "user_id": "user-123", + "experiment_id": "exp-456", + "namespace": "test-ns", + "metrics_endpoint": "http://localhost:4318/v1/metrics", + "user_defined_tags": {"service": "rock-test"}, + } + defaults.update(kwargs) + return RockletMetricsMonitor(**defaults) + + @pytest.mark.asyncio + async def test_fetch_statistics_success(self): + """Test successful statistics fetch from rocklet endpoint.""" + monitor = self._make_monitor() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"cpu": 25.0, "mem": 60.0, "disk": 40.0, "net": 1024} + + mock_client = AsyncMock() + mock_client.get.return_value = mock_response + monitor._http_client = mock_client + + result = await monitor._fetch_statistics() + assert result == {"cpu": 25.0, "mem": 60.0, "disk": 40.0, "net": 1024} + mock_client.get.assert_called_once_with("http://localhost:18000/get_statistics", timeout=3) + + @pytest.mark.asyncio + async def test_fetch_statistics_http_error(self): + """Test statistics fetch handles HTTP errors gracefully.""" + monitor = self._make_monitor() + mock_response = MagicMock() + mock_response.status_code = 500 + + mock_client = AsyncMock() + mock_client.get.return_value = mock_response + monitor._http_client = mock_client + + result = await monitor._fetch_statistics() + assert result is None + + @pytest.mark.asyncio + async def test_fetch_statistics_connection_error(self): + """Test statistics fetch handles connection errors gracefully.""" + monitor = self._make_monitor() + mock_client = AsyncMock() + mock_client.get.side_effect = ConnectionError("Connection refused") + monitor._http_client = mock_client + + result = await monitor._fetch_statistics() + assert result is None + + @pytest.mark.asyncio + async def test_report_single_sandbox_sets_gauges(self): + """Test that report_single_sandbox correctly sets all gauge values.""" + monitor = self._make_monitor() + monitor._host = "10.0.0.1" + + mock_cpu_gauge = MagicMock() + mock_mem_gauge = MagicMock() + mock_disk_gauge = MagicMock() + mock_net_gauge = MagicMock() + monitor._gauges = { + "cpu": mock_cpu_gauge, + "mem": mock_mem_gauge, + "disk": mock_disk_gauge, + "net": mock_net_gauge, + } + + stats = {"cpu": 25.0, "mem": 60.0, "disk": 40.0, "net": 1024} + with patch.object(monitor, "_fetch_statistics", new_callable=AsyncMock, return_value=stats): + await monitor._report_single_sandbox() + + expected_attributes = { + "sandbox_id": "test-sandbox-001", + "env": "test", + "role": "write", + "host": "10.0.0.1", + "ip": monitor._ip, + "user_id": "user-123", + "experiment_id": "exp-456", + "namespace": "test-ns", + "service": "rock-test", + } + mock_cpu_gauge.set.assert_called_once_with(25.0, attributes=expected_attributes) + mock_mem_gauge.set.assert_called_once_with(60.0, attributes=expected_attributes) + mock_disk_gauge.set.assert_called_once_with(40.0, attributes=expected_attributes) + mock_net_gauge.set.assert_called_once_with(1024, attributes=expected_attributes) + + @pytest.mark.asyncio + async def test_report_single_sandbox_no_metrics(self): + """Test that report_single_sandbox handles None metrics gracefully.""" + monitor = self._make_monitor() + monitor._gauges = { + "cpu": MagicMock(), + "mem": MagicMock(), + "disk": MagicMock(), + "net": MagicMock(), + } + + with patch.object(monitor, "_fetch_statistics", new_callable=AsyncMock, return_value=None): + await monitor._report_single_sandbox() + + for gauge in monitor._gauges.values(): + gauge.set.assert_not_called() + + @pytest.mark.asyncio + async def test_report_single_sandbox_cpu_none(self): + """Test that report_single_sandbox skips when cpu is None.""" + monitor = self._make_monitor() + monitor._gauges = { + "cpu": MagicMock(), + "mem": MagicMock(), + "disk": MagicMock(), + "net": MagicMock(), + } + + stats = {"cpu": None, "mem": 60.0, "disk": 40.0, "net": 1024} + with patch.object(monitor, "_fetch_statistics", new_callable=AsyncMock, return_value=stats): + await monitor._report_single_sandbox() + + for gauge in monitor._gauges.values(): + gauge.set.assert_not_called() + + @pytest.mark.asyncio + async def test_collect_and_report_metrics_timeout(self): + """Test that _collect_and_report_metrics handles timeout.""" + monitor = self._make_monitor(report_interval=2) + + async def slow_report(): + await asyncio.sleep(10) + + with patch.object(monitor, "_report_single_sandbox", side_effect=slow_report): + # Should not raise, just log the timeout + await monitor._collect_and_report_metrics() + + @pytest.mark.asyncio + @patch("rock.rocklet.monitor.get_uniagent_endpoint", return_value=("localhost", "4318")) + @patch("rock.rocklet.monitor.OTLPMetricExporter") + @patch("rock.rocklet.monitor.PeriodicExportingMetricReader") + @patch("rock.rocklet.monitor.MeterProvider") + @patch("rock.rocklet.monitor.metrics") + async def test_start_and_stop( + self, mock_metrics, mock_meter_provider, mock_reader, mock_exporter, mock_get_endpoint + ): + """Test monitor start and stop lifecycle.""" + mock_meter = MagicMock() + mock_metrics.get_meter.return_value = mock_meter + + monitor = self._make_monitor() + await monitor.start() + + assert monitor._scheduler is not None + assert monitor._scheduler.running + assert monitor._http_client is not None + + await monitor.stop() + assert monitor._scheduler is None + assert monitor._http_client is None + + @pytest.mark.asyncio + async def test_stop_without_start(self): + """Test that stop is safe to call without start.""" + monitor = self._make_monitor() + # Should not raise + await monitor.stop() + + +class TestMonitorProcessManagement: + """Tests for start_monitor_process and stop_monitor_process functions.""" + + @patch("rock.rocklet.monitor.multiprocessing.Process") + def test_start_monitor_process_with_explicit_sandbox_id(self, mock_process_cls): + """Test starting monitor process with explicit sandbox_id.""" + mock_process = MagicMock() + mock_process_cls.return_value = mock_process + + result = start_monitor_process(sandbox_id="my-sandbox", rocklet_port=9000) + + assert result is mock_process + mock_process.start.assert_called_once() + call_kwargs = mock_process_cls.call_args + assert call_kwargs.kwargs["kwargs"]["sandbox_id"] == "my-sandbox" + assert call_kwargs.kwargs["kwargs"]["rocklet_port"] == 9000 + assert call_kwargs.kwargs["daemon"] is True + assert call_kwargs.kwargs["name"] == "rocklet-metrics-monitor" + + @patch("rock.rocklet.monitor.multiprocessing.Process") + def test_start_monitor_process_sandbox_id_from_env(self, mock_process_cls): + """Test starting monitor process with sandbox_id from environment.""" + mock_process = MagicMock() + mock_process_cls.return_value = mock_process + + with patch.dict(os.environ, {"SANDBOX_ID": "env-sandbox-123"}): + start_monitor_process() + + call_kwargs = mock_process_cls.call_args + assert call_kwargs.kwargs["kwargs"]["sandbox_id"] == "env-sandbox-123" + + @patch("rock.rocklet.monitor.multiprocessing.Process") + def test_start_monitor_process_sandbox_id_from_hostname(self, mock_process_cls): + """Test starting monitor process with sandbox_id from HOSTNAME env.""" + mock_process = MagicMock() + mock_process_cls.return_value = mock_process + + env_without_sandbox_id = {k: v for k, v in os.environ.items() if k != "SANDBOX_ID"} + env_without_sandbox_id["HOSTNAME"] = "hostname-sandbox" + with patch.dict(os.environ, env_without_sandbox_id, clear=True): + start_monitor_process() + + call_kwargs = mock_process_cls.call_args + assert call_kwargs.kwargs["kwargs"]["sandbox_id"] == "hostname-sandbox" + + @patch("rock.rocklet.monitor.multiprocessing.Process") + def test_start_monitor_process_default_params(self, mock_process_cls): + """Test starting monitor process with default parameters.""" + mock_process = MagicMock() + mock_process_cls.return_value = mock_process + + start_monitor_process(sandbox_id="test") + + call_kwargs = mock_process_cls.call_args.kwargs["kwargs"] + assert call_kwargs["rocklet_port"] == 8000 + assert call_kwargs["report_interval"] == 10 + assert call_kwargs["export_interval_millis"] == 10000 + assert call_kwargs["env"] == "dev" + assert call_kwargs["role"] == "test" + assert call_kwargs["user_id"] == "default" + assert call_kwargs["experiment_id"] == "default" + assert call_kwargs["namespace"] == "default" + assert call_kwargs["metrics_endpoint"] == "" + assert call_kwargs["user_defined_tags"] == {} + + def test_stop_monitor_process_none(self): + """Test stop_monitor_process with None process.""" + # Should not raise + stop_monitor_process(None) + + def test_stop_monitor_process_not_alive(self): + """Test stop_monitor_process with a process that is not alive.""" + mock_process = MagicMock() + mock_process.is_alive.return_value = False + # Should not raise and should not call terminate + stop_monitor_process(mock_process) + mock_process.terminate.assert_not_called() + + def test_stop_monitor_process_alive_graceful(self): + """Test stop_monitor_process gracefully terminates a running process.""" + mock_process = MagicMock() + mock_process.is_alive.side_effect = [True, False] + mock_process.pid = 12345 + + stop_monitor_process(mock_process) + + mock_process.terminate.assert_called_once() + mock_process.join.assert_called_once_with(timeout=5) + mock_process.kill.assert_not_called() + + def test_stop_monitor_process_alive_force_kill(self): + """Test stop_monitor_process force-kills a process that won't terminate.""" + mock_process = MagicMock() + mock_process.is_alive.side_effect = [True, True] + mock_process.pid = 12345 + + stop_monitor_process(mock_process) + + mock_process.terminate.assert_called_once() + mock_process.kill.assert_called_once() + assert mock_process.join.call_count == 2 + + +class TestBaseActorMonitorSwitch: + """Tests for the ROCK_MONITOR_VIA_ROCKLET switch in BaseActor.""" + + @pytest.mark.asyncio + @patch("rock.sandbox.base_actor.env_vars") + async def test_setup_monitor_skipped_when_monitor_disabled(self, mock_env_vars): + """Test _setup_monitor does nothing when ROCK_MONITOR_ENABLE is False.""" + mock_env_vars.ROCK_MONITOR_ENABLE = False + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = False + + from rock.sandbox.base_actor import BaseActor + + mock_config = MagicMock() + mock_config.auto_clear_time = None + # mock_deployment = MagicMock() + + with patch.object(BaseActor, "__init__", lambda self, *a, **kw: None): + actor = BaseActor.__new__(BaseActor) + actor._metrics_report_scheduler = None + + await actor._setup_monitor() + # _init_monitor should not have been called — no scheduler created + assert actor._metrics_report_scheduler is None + + @pytest.mark.asyncio + @patch("rock.sandbox.base_actor.env_vars") + async def test_setup_monitor_skipped_when_via_rocklet_enabled(self, mock_env_vars): + """Test _setup_monitor skips actor monitoring when ROCK_MONITOR_VIA_ROCKLET is True.""" + mock_env_vars.ROCK_MONITOR_ENABLE = True + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = True + + from rock.sandbox.base_actor import BaseActor + + with patch.object(BaseActor, "__init__", lambda self, *a, **kw: None): + actor = BaseActor.__new__(BaseActor) + actor._metrics_report_scheduler = None + + await actor._setup_monitor() + # _init_monitor should not have been called — no scheduler created + assert actor._metrics_report_scheduler is None + + @patch("rock.sandbox.base_actor.env_vars") + def test_stop_monitoring_noop_when_via_rocklet(self, mock_env_vars): + """Test stop_monitoring does nothing when ROCK_MONITOR_VIA_ROCKLET is True.""" + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = True + + from rock.sandbox.base_actor import BaseActor + + with patch.object(BaseActor, "__init__", lambda self, *a, **kw: None): + actor = BaseActor.__new__(BaseActor) + mock_scheduler = MagicMock() + mock_scheduler.running = True + actor._metrics_report_scheduler = mock_scheduler + + actor.stop_monitoring() + mock_scheduler.shutdown.assert_not_called() + + @patch("rock.sandbox.base_actor.env_vars") + def test_stop_monitoring_works_when_via_rocklet_disabled(self, mock_env_vars): + """Test stop_monitoring works normally when ROCK_MONITOR_VIA_ROCKLET is False.""" + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = False + mock_env_vars.ROCK_MONITOR_ENABLE = True + + from rock.sandbox.base_actor import BaseActor + + with patch.object(BaseActor, "__init__", lambda self, *a, **kw: None): + actor = BaseActor.__new__(BaseActor) + mock_scheduler = MagicMock() + mock_scheduler.running = True + actor._metrics_report_scheduler = mock_scheduler + + actor.stop_monitoring() + mock_scheduler.shutdown.assert_called_once_with(wait=True) + + +class TestServerMonitorIntegration: + """Tests for the rocklet server startup/shutdown monitor integration.""" + + @pytest.mark.asyncio + @patch("rock.rocklet.server.env_vars") + @patch("rock.rocklet.server.start_monitor_process") + async def test_startup_starts_monitor_when_both_flags_enabled(self, mock_start, mock_env_vars): + """Test that startup event starts monitor when both flags are enabled.""" + mock_env_vars.ROCK_MONITOR_ENABLE = True + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = True + mock_process = MagicMock() + mock_start.return_value = mock_process + + import rock.rocklet.server as server_module + + server_module._monitor_process = None + await server_module.startup_event() + + mock_start.assert_called_once() + assert server_module._monitor_process is mock_process + + @pytest.mark.asyncio + @patch("rock.rocklet.server.env_vars") + @patch("rock.rocklet.server.start_monitor_process") + async def test_startup_skips_monitor_when_monitor_disabled(self, mock_start, mock_env_vars): + """Test that startup event skips monitor when ROCK_MONITOR_ENABLE is False.""" + mock_env_vars.ROCK_MONITOR_ENABLE = False + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = True + + import rock.rocklet.server as server_module + + server_module._monitor_process = None + await server_module.startup_event() + + mock_start.assert_not_called() + assert server_module._monitor_process is None + + @pytest.mark.asyncio + @patch("rock.rocklet.server.env_vars") + @patch("rock.rocklet.server.start_monitor_process") + async def test_startup_skips_monitor_when_via_rocklet_disabled(self, mock_start, mock_env_vars): + """Test that startup event skips monitor when ROCK_MONITOR_VIA_ROCKLET is False.""" + mock_env_vars.ROCK_MONITOR_ENABLE = True + mock_env_vars.ROCK_MONITOR_VIA_ROCKLET = False + + import rock.rocklet.server as server_module + + server_module._monitor_process = None + await server_module.startup_event() + + mock_start.assert_not_called() + assert server_module._monitor_process is None + + @pytest.mark.asyncio + @patch("rock.rocklet.server.stop_monitor_process") + async def test_shutdown_stops_monitor(self, mock_stop): + """Test that shutdown event stops the monitor process.""" + import rock.rocklet.server as server_module + + mock_process = MagicMock() + server_module._monitor_process = mock_process + + await server_module.shutdown_event() + + mock_stop.assert_called_once_with(mock_process) + assert server_module._monitor_process is None + + @pytest.mark.asyncio + @patch("rock.rocklet.server.stop_monitor_process") + async def test_shutdown_noop_when_no_monitor(self, mock_stop): + """Test that shutdown event is safe when no monitor process exists.""" + import rock.rocklet.server as server_module + + server_module._monitor_process = None + + await server_module.shutdown_event() + + mock_stop.assert_not_called()