Skip to content

Commit 8bcb1ea

Browse files
RobotGFclaude
authored andcommitted
[fix] Decouple metrics exporter from controller internals
- Replace direct controller access with snapshot-based collection: controller pushes plain-dict snapshots via update_controller_snapshot(), metrics thread reads dicts instead of live objects/tensors. - Make measure() fault-tolerant: wrap all prometheus calls in try/except so metrics failures never block the controller request loop. - Add dedicated snapshot daemon thread in controller. - Update tests to use snapshot-based API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent be61454 commit 8bcb1ea

3 files changed

Lines changed: 136 additions & 85 deletions

File tree

tests/test_metrics.py

Lines changed: 30 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
"""Unit tests for the Prometheus metrics exporter (transfer_queue.metrics)."""
1717

1818
import time
19-
from types import SimpleNamespace
2019
from unittest.mock import MagicMock
2120

2221
import pytest
@@ -32,54 +31,35 @@
3231

3332

3433
# ---------------------------------------------------------------------------
35-
# Helpers — lightweight fakes that satisfy TQMetricsExporter without Ray
34+
# Helpers — build snapshot dicts that TQMetricsExporter.update_controller_snapshot expects
3635
# ---------------------------------------------------------------------------
3736

3837

39-
def _make_fake_partition(
40-
partition_id: str,
38+
def _make_partition_snapshot(
4139
total_samples: int = 10,
42-
total_fields: int = 2,
43-
allocated_samples: int = 16,
4440
produced_ratio: float = 0.5,
4541
consumption: dict | None = None,
46-
):
47-
"""Return an object that mimics ``DataPartitionStatus.get_statistics()``."""
42+
) -> dict:
43+
"""Return a partition snapshot dict."""
4844
consumption_stats = {}
4945
if consumption:
5046
for task, progress in consumption.items():
51-
consumption_stats[task] = {
52-
"consumed_samples": int(total_samples * progress),
53-
"consumption_progress": progress,
54-
}
47+
consumption_stats[task] = {"consumption_progress": progress}
5548

56-
stats = {
57-
"partition_id": partition_id,
58-
"created_at": time.time(),
49+
return {
5950
"total_samples_num": total_samples,
60-
"total_fields_num": total_fields,
61-
"allocated_samples_num": allocated_samples,
62-
"allocated_fields_num": total_fields,
63-
"registered_tasks": list((consumption or {}).keys()),
64-
"produced_samples": int(total_samples * produced_ratio),
6551
"production_progress": produced_ratio,
66-
"field_statistics": {},
6752
"consumption_statistics": consumption_stats,
6853
}
6954

70-
partition = MagicMock()
71-
partition.get_statistics.return_value = stats
72-
return partition
73-
7455

75-
def _make_fake_controller(partitions=None, allocated=10, reusable=2):
76-
"""Return a lightweight stand-in for ``TransferQueueController``."""
77-
ctrl = SimpleNamespace()
78-
ctrl.partitions = partitions or {}
79-
ctrl.index_manager = SimpleNamespace()
80-
ctrl.index_manager.allocated_indexes = set(range(allocated))
81-
ctrl.index_manager.reusable_indexes = list(range(reusable))
82-
return ctrl
56+
def _make_snapshot(partitions=None, allocated=10, reusable=2) -> dict:
57+
"""Return a controller metrics snapshot dict."""
58+
return {
59+
"partitions": partitions or {},
60+
"global_index_allocated": allocated,
61+
"global_index_reusable": reusable,
62+
}
8363

8464

8565
# ---------------------------------------------------------------------------
@@ -90,8 +70,7 @@ def _make_fake_controller(partitions=None, allocated=10, reusable=2):
9070
class TestMetricDefinitions:
9171
def test_all_metrics_are_registered(self):
9272
"""Verify that all expected metric families exist in the exporter's registry."""
93-
ctrl = _make_fake_controller()
94-
exporter = TQMetricsExporter(ctrl)
73+
exporter = TQMetricsExporter()
9574

9675
expected_prefixes = [
9776
"tq_controller_uptime_seconds",
@@ -123,9 +102,9 @@ def test_all_metrics_are_registered(self):
123102

124103
class TestControllerMetricsCollection:
125104
def test_collect_empty_controller(self):
126-
"""Collect metrics from a controller with no partitions — should not raise."""
127-
ctrl = _make_fake_controller(partitions={}, allocated=0, reusable=0)
128-
exporter = TQMetricsExporter(ctrl)
105+
"""Collect metrics from an empty snapshot — should not raise."""
106+
exporter = TQMetricsExporter()
107+
exporter.update_controller_snapshot(_make_snapshot(partitions={}, allocated=0, reusable=0))
129108
exporter.collect_controller_metrics()
130109

131110
assert exporter.partitions_total._value.get() == 0
@@ -134,15 +113,12 @@ def test_collect_empty_controller(self):
134113

135114
def test_collect_with_partitions(self):
136115
"""Partition-level metrics are populated correctly."""
137-
p1 = _make_fake_partition(
138-
"train_0", total_samples=20, produced_ratio=0.8, consumption={"gen": 0.5}
139-
)
140-
p2 = _make_fake_partition(
141-
"train_1", total_samples=10, produced_ratio=1.0, consumption={"gen": 1.0, "train": 0.3}
142-
)
143-
ctrl = _make_fake_controller(partitions={"train_0": p1, "train_1": p2}, allocated=30, reusable=5)
116+
p1 = _make_partition_snapshot(total_samples=20, produced_ratio=0.8, consumption={"gen": 0.5})
117+
p2 = _make_partition_snapshot(total_samples=10, produced_ratio=1.0, consumption={"gen": 1.0, "train": 0.3})
118+
snapshot = _make_snapshot(partitions={"train_0": p1, "train_1": p2}, allocated=30, reusable=5)
144119

145-
exporter = TQMetricsExporter(ctrl)
120+
exporter = TQMetricsExporter()
121+
exporter.update_controller_snapshot(snapshot)
146122
exporter.collect_controller_metrics()
147123

148124
assert exporter.partitions_total._value.get() == 2
@@ -163,8 +139,8 @@ def test_collect_with_partitions(self):
163139

164140
def test_uptime_increases(self):
165141
"""Controller uptime should be positive after collection."""
166-
ctrl = _make_fake_controller()
167-
exporter = TQMetricsExporter(ctrl)
142+
exporter = TQMetricsExporter()
143+
exporter.update_controller_snapshot(_make_snapshot())
168144
time.sleep(0.05)
169145
exporter.collect_controller_metrics()
170146
assert exporter.controller_uptime._value.get() > 0
@@ -177,8 +153,7 @@ def test_uptime_increases(self):
177153

178154
class TestMeasureContextManager:
179155
def test_measure_records_count_and_duration(self):
180-
ctrl = _make_fake_controller()
181-
exporter = TQMetricsExporter(ctrl)
156+
exporter = TQMetricsExporter()
182157

183158
with exporter.measure("GET_META"):
184159
time.sleep(0.01)
@@ -192,8 +167,7 @@ def test_measure_records_count_and_duration(self):
192167
assert hist._sum.get() > 0
193168

194169
def test_measure_records_errors(self):
195-
ctrl = _make_fake_controller()
196-
exporter = TQMetricsExporter(ctrl)
170+
exporter = TQMetricsExporter()
197171

198172
with pytest.raises(ValueError):
199173
with exporter.measure("BAD_OP"):
@@ -204,8 +178,7 @@ def test_measure_records_errors(self):
204178
assert exporter.request_total.labels(op_type="BAD_OP")._value.get() == 1.0
205179

206180
def test_multiple_ops_tracked_independently(self):
207-
ctrl = _make_fake_controller()
208-
exporter = TQMetricsExporter(ctrl)
181+
exporter = TQMetricsExporter()
209182

210183
for _ in range(3):
211184
with exporter.measure("GET_META"):
@@ -226,15 +199,13 @@ def test_multiple_ops_tracked_independently(self):
226199
class TestStorageMetricsCollection:
227200
def test_collect_with_no_storage_units(self):
228201
"""No storage units registered — collect should be a no-op."""
229-
ctrl = _make_fake_controller()
230-
exporter = TQMetricsExporter(ctrl)
202+
exporter = TQMetricsExporter()
231203
# Should not raise
232204
exporter.collect_storage_metrics()
233205

234206
def test_storage_metrics_populated_on_success(self):
235207
"""Verify storage gauges are set when _query_storage_unit returns data."""
236-
ctrl = _make_fake_controller()
237-
exporter = TQMetricsExporter(ctrl)
208+
exporter = TQMetricsExporter()
238209

239210
fake_su_info = MagicMock()
240211
fake_su_info.id = "SU_001"
@@ -259,8 +230,7 @@ def test_storage_metrics_populated_on_success(self):
259230

260231
def test_storage_metrics_handles_query_failure(self):
261232
"""If a storage unit query fails, other units should still be collected."""
262-
ctrl = _make_fake_controller()
263-
exporter = TQMetricsExporter(ctrl)
233+
exporter = TQMetricsExporter()
264234

265235
su1 = MagicMock()
266236
su1.id = "SU_001"

transfer_queue/controller.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,43 @@ def register_sampler(
21462146

21472147
# ==================== Metrics API ====================
21482148

2149+
def _build_metrics_snapshot(self) -> dict:
2150+
"""Build a plain-dict snapshot of controller state for the metrics exporter.
2151+
2152+
The snapshot contains only primitive / dict values — no references to
2153+
live controller objects — so the metrics thread can read it safely.
2154+
"""
2155+
partitions_data: dict = {}
2156+
for pid, partition in list(self.partitions.items()):
2157+
try:
2158+
stats = partition.get_statistics()
2159+
partitions_data[pid] = {
2160+
"total_samples_num": stats["total_samples_num"],
2161+
"production_progress": stats.get("production_progress", 0),
2162+
"consumption_statistics": {
2163+
task_name: {"consumption_progress": cstats.get("consumption_progress", 0)}
2164+
for task_name, cstats in stats.get("consumption_statistics", {}).items()
2165+
},
2166+
}
2167+
except Exception:
2168+
pass
2169+
2170+
return {
2171+
"partitions": partitions_data,
2172+
"global_index_allocated": len(self.index_manager.allocated_indexes),
2173+
"global_index_reusable": len(self.index_manager.reusable_indexes),
2174+
}
2175+
2176+
def _push_metrics_snapshot(self) -> None:
2177+
"""Push a fresh metrics snapshot to the exporter (called from controller threads)."""
2178+
if self._metrics is None:
2179+
return
2180+
try:
2181+
snapshot = self._build_metrics_snapshot()
2182+
self._metrics.update_controller_snapshot(snapshot)
2183+
except Exception as e:
2184+
logger.debug(f"[{self.controller_id}]: Failed to push metrics snapshot: {e}")
2185+
21492186
def start_metrics(self) -> str:
21502187
"""Initialize and start the Prometheus metrics exporter.
21512188
@@ -2160,11 +2197,27 @@ def start_metrics(self) -> str:
21602197
return self._metrics_endpoint
21612198
from transfer_queue.metrics import TQMetricsExporter
21622199

2163-
self._metrics = TQMetricsExporter(self)
2200+
self._metrics = TQMetricsExporter()
21642201
self._metrics_endpoint = self._metrics.start(node_ip=self._node_ip)
2202+
# Launch a daemon thread that periodically pushes controller state
2203+
# snapshots to the exporter, keeping them process-isolated.
2204+
self._metrics_snapshot_thread = Thread(
2205+
target=self._metrics_snapshot_loop,
2206+
name="TQMetricsSnapshotThread",
2207+
daemon=True,
2208+
)
2209+
self._metrics_snapshot_thread.start()
21652210
logger.info(f"[{self.controller_id}]: Prometheus metrics exporter started on {self._metrics_endpoint}")
21662211
return self._metrics_endpoint
21672212

2213+
def _metrics_snapshot_loop(self) -> None:
2214+
"""Periodically push a metrics snapshot to the exporter."""
2215+
from transfer_queue.metrics import TQ_METRICS_COLLECT_INTERVAL
2216+
2217+
while True:
2218+
self._push_metrics_snapshot()
2219+
time.sleep(TQ_METRICS_COLLECT_INTERVAL)
2220+
21682221
def register_storage_units_for_metrics(self, storage_unit_infos: dict) -> None:
21692222
"""Register storage unit ZMQ endpoints so the metrics collector can query them.
21702223

0 commit comments

Comments
 (0)