Skip to content

Commit be61454

Browse files
RobotGFclaude
authored andcommitted
[fix] Address PR #83 review feedback for metrics exporter
- Use process RSS memory instead of manual tensor size estimation - Remove redundant _op_counts/_sample_counts (IntervalPerfMonitor already tracks) - Remove TQ_METRICS_ENABLED env var override; metrics controlled by config only - Trim non-essential metrics (partition_fields, allocated_samples, storage_fields, data_memory, op_total, samples_total) to keep only essential ones - Fix thread-safety: iterate over dict snapshots in collect methods - Fix lifecycle docstring (start_metrics, not __init__) - Use storage unit's self-reported ID as canonical Prometheus label - Fix test skip reason to reflect actual dependencies - Remove dashboard panels referencing deleted metrics Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6867804 commit be61454

6 files changed

Lines changed: 19 additions & 268 deletions

File tree

scripts/grafana_dashboard.json

Lines changed: 0 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -209,23 +209,6 @@
209209
],
210210
"type": "timeseries"
211211
},
212-
{
213-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
214-
"fieldConfig": {
215-
"defaults": {
216-
"color": { "mode": "palette-classic" },
217-
"custom": { "fillOpacity": 10, "lineWidth": 2 }
218-
}
219-
},
220-
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 23 },
221-
"id": 21,
222-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
223-
"title": "Allocated Sample Slots per Partition",
224-
"targets": [
225-
{ "expr": "tq_partition_allocated_samples", "legendFormat": "{{ partition_id }}" }
226-
],
227-
"type": "timeseries"
228-
},
229212
{
230213
"datasource": { "type": "prometheus", "uid": "${datasource}" },
231214
"fieldConfig": {
@@ -349,152 +332,6 @@
349332
{ "expr": "tq_storage_memory_rss_bytes", "legendFormat": "{{ storage_unit_id }}" }
350333
],
351334
"type": "timeseries"
352-
},
353-
{
354-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
355-
"fieldConfig": {
356-
"defaults": {
357-
"color": { "mode": "palette-classic" },
358-
"custom": { "fillOpacity": 10, "lineWidth": 2 },
359-
"unit": "bytes"
360-
}
361-
},
362-
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 54 },
363-
"id": 34,
364-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
365-
"title": "Storage Data Memory (Estimated)",
366-
"targets": [
367-
{ "expr": "tq_storage_data_memory_bytes", "legendFormat": "{{ storage_unit_id }}" }
368-
],
369-
"type": "timeseries"
370-
},
371-
{
372-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
373-
"fieldConfig": {
374-
"defaults": {
375-
"color": { "mode": "palette-classic" },
376-
"custom": { "fillOpacity": 20, "lineWidth": 2, "stacking": { "mode": "none" } },
377-
"unit": "ops"
378-
}
379-
},
380-
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 62 },
381-
"id": 35,
382-
"options": { "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
383-
"title": "Storage Put/Get/Clear Rate (per second)",
384-
"targets": [
385-
{ "expr": "rate(tq_storage_op_total{op_type=\"PUT_DATA\"}[$__rate_interval])", "legendFormat": "PUT {{ storage_unit_id }}" },
386-
{ "expr": "rate(tq_storage_op_total{op_type=\"GET_DATA\"}[$__rate_interval])", "legendFormat": "GET {{ storage_unit_id }}" },
387-
{ "expr": "rate(tq_storage_op_total{op_type=\"CLEAR_DATA\"}[$__rate_interval])", "legendFormat": "CLEAR {{ storage_unit_id }}" }
388-
],
389-
"type": "timeseries"
390-
},
391-
{
392-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
393-
"fieldConfig": {
394-
"defaults": {
395-
"color": { "mode": "palette-classic" },
396-
"custom": { "fillOpacity": 10, "lineWidth": 2 }
397-
}
398-
},
399-
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 62 },
400-
"id": 36,
401-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
402-
"title": "Storage Cumulative Operations",
403-
"targets": [
404-
{ "expr": "tq_storage_op_total", "legendFormat": "{{ op_type }} {{ storage_unit_id }}" }
405-
],
406-
"type": "timeseries"
407-
},
408-
{
409-
"collapsed": false,
410-
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 70 },
411-
"id": 104,
412-
"title": "Data Lifecycle (Leak Detection)",
413-
"type": "row"
414-
},
415-
{
416-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
417-
"fieldConfig": {
418-
"defaults": {
419-
"color": { "mode": "palette-classic" },
420-
"custom": { "fillOpacity": 10, "lineWidth": 2 },
421-
"unit": "short"
422-
}
423-
},
424-
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 71 },
425-
"id": 40,
426-
"options": { "legend": { "calcs": ["mean", "max"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
427-
"title": "Sample Put / Get / Clear Rate (per second)",
428-
"targets": [
429-
{ "expr": "sum(rate(tq_storage_samples_total{op_type=\"PUT_DATA\"}[$__rate_interval]))", "legendFormat": "put" },
430-
{ "expr": "sum(rate(tq_storage_samples_total{op_type=\"GET_DATA\"}[$__rate_interval]))", "legendFormat": "get" },
431-
{ "expr": "sum(rate(tq_storage_samples_total{op_type=\"CLEAR_DATA\"}[$__rate_interval]))", "legendFormat": "clear" }
432-
],
433-
"type": "timeseries"
434-
},
435-
{
436-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
437-
"description": "put - cleared across all storage units. A steadily rising line indicates samples are being written but never cleared — potential leak. Should stay bounded in healthy RL training.",
438-
"fieldConfig": {
439-
"defaults": {
440-
"color": { "mode": "thresholds" },
441-
"thresholds": { "steps": [
442-
{ "color": "green", "value": null },
443-
{ "color": "yellow", "value": 5000 },
444-
{ "color": "red", "value": 10000 }
445-
]},
446-
"custom": { "fillOpacity": 15, "lineWidth": 2 },
447-
"unit": "short"
448-
}
449-
},
450-
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 71 },
451-
"id": 41,
452-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
453-
"title": "Samples In-Flight (put - cleared) [Leak Indicator]",
454-
"targets": [
455-
{ "expr": "sum(tq_storage_samples_total{op_type=\"PUT_DATA\"}) - sum(tq_storage_samples_total{op_type=\"CLEAR_DATA\"}) or vector(0)", "legendFormat": "in-flight samples" }
456-
],
457-
"type": "timeseries"
458-
},
459-
{
460-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
461-
"fieldConfig": {
462-
"defaults": {
463-
"color": { "mode": "palette-classic" },
464-
"custom": { "fillOpacity": 10, "lineWidth": 2 },
465-
"unit": "short"
466-
}
467-
},
468-
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 79 },
469-
"id": 42,
470-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
471-
"title": "Cumulative Samples Put vs Cleared (per Storage Unit)",
472-
"targets": [
473-
{ "expr": "tq_storage_samples_total{op_type=\"PUT_DATA\"}", "legendFormat": "put {{ storage_unit_id }}" },
474-
{ "expr": "tq_storage_samples_total{op_type=\"CLEAR_DATA\"}", "legendFormat": "cleared {{ storage_unit_id }}" }
475-
],
476-
"type": "timeseries"
477-
},
478-
{
479-
"datasource": { "type": "prometheus", "uid": "${datasource}" },
480-
"description": "Ratio of cleared samples to put samples. Healthy value approaches 1.0 over time. A value stuck well below 1.0 means samples accumulate without being reclaimed.",
481-
"fieldConfig": {
482-
"defaults": {
483-
"color": { "mode": "continuous-GrYlRd" },
484-
"custom": { "fillOpacity": 15, "lineWidth": 2 },
485-
"unit": "percentunit",
486-
"min": 0,
487-
"max": 1
488-
}
489-
},
490-
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 79 },
491-
"id": 43,
492-
"options": { "legend": { "calcs": ["lastNotNull"], "displayMode": "table", "placement": "bottom" }, "tooltip": { "mode": "multi" } },
493-
"title": "Clear / Put Ratio (per Storage Unit) [Reclaim Health]",
494-
"targets": [
495-
{ "expr": "tq_storage_samples_total{op_type=\"CLEAR_DATA\"} / on(storage_unit_id) clamp_min(tq_storage_samples_total{op_type=\"PUT_DATA\"}, 1)", "legendFormat": "{{ storage_unit_id }}" }
496-
],
497-
"type": "timeseries"
498335
}
499336
],
500337
"refresh": "10s",

tests/test_metrics.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
except (ImportError, OSError):
2929
_HAS_DEPS = False
3030

31-
pytestmark = pytest.mark.skipif(not _HAS_DEPS, reason="torch / CUDA dependencies unavailable")
31+
pytestmark = pytest.mark.skipif(not _HAS_DEPS, reason="prometheus_client / psutil / pyzmq dependencies unavailable")
3232

3333

3434
# ---------------------------------------------------------------------------
@@ -98,8 +98,6 @@ def test_all_metrics_are_registered(self):
9898
"tq_controller_memory_rss_bytes",
9999
"tq_partitions_total",
100100
"tq_partition_samples_total",
101-
"tq_partition_fields_total",
102-
"tq_partition_allocated_samples",
103101
"tq_partition_production_progress",
104102
"tq_partition_consumption_progress",
105103
"tq_global_index_allocated_total",
@@ -110,9 +108,7 @@ def test_all_metrics_are_registered(self):
110108
"tq_storage_capacity_total",
111109
"tq_storage_active_keys_total",
112110
"tq_storage_utilization_ratio",
113-
"tq_storage_fields_total",
114111
"tq_storage_memory_rss_bytes",
115-
"tq_storage_data_memory_bytes",
116112
]
117113

118114
registered = {m.name for m in exporter.registry.collect()}
@@ -250,9 +246,7 @@ def test_storage_metrics_populated_on_success(self):
250246
"storage_unit_id": "SU_001",
251247
"capacity": 1000,
252248
"active_keys": 250,
253-
"fields_count": 3,
254249
"process_rss_bytes": 512 * 1024 * 1024,
255-
"data_memory_bytes": 256 * 1024 * 1024,
256250
}
257251
)
258252

@@ -261,9 +255,7 @@ def test_storage_metrics_populated_on_success(self):
261255
assert exporter.storage_capacity.labels(storage_unit_id="SU_001")._value.get() == 1000
262256
assert exporter.storage_active_keys.labels(storage_unit_id="SU_001")._value.get() == 250
263257
assert exporter.storage_utilization.labels(storage_unit_id="SU_001")._value.get() == 0.25
264-
assert exporter.storage_fields.labels(storage_unit_id="SU_001")._value.get() == 3
265258
assert exporter.storage_memory_rss.labels(storage_unit_id="SU_001")._value.get() == 512 * 1024 * 1024
266-
assert exporter.storage_data_memory.labels(storage_unit_id="SU_001")._value.get() == 256 * 1024 * 1024
267259

268260
def test_storage_metrics_handles_query_failure(self):
269261
"""If a storage unit query fails, other units should still be collected."""
@@ -289,7 +281,6 @@ def mock_query(su_info, su_id):
289281
"active_keys": 100,
290282
"fields_count": 2,
291283
"process_rss_bytes": 100 * 1024 * 1024,
292-
"data_memory_bytes": 50 * 1024 * 1024,
293284
}
294285

295286
exporter._query_storage_unit = mock_query

transfer_queue/config.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# and use transfer_queue.init(conf) to overwrite the config entries.
33

44
# Prometheus metrics exporter.
5-
# Can also be enabled via the TQ_METRICS_ENABLED=true environment variable.
65
metrics:
76
enabled: false
87

transfer_queue/interface.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,6 @@ def init(conf: Optional[DictConfig] = None) -> Optional[DictConfig]:
431431

432432
# start Prometheus metrics exporter if enabled
433433
metrics_enabled = final_conf.get("metrics", {}).get("enabled", False)
434-
if not metrics_enabled:
435-
# Also check environment variable as a convenience override
436-
metrics_enabled = os.environ.get("TQ_METRICS_ENABLED", "false").lower() == "true"
437434
if metrics_enabled:
438435
metrics_endpoint = ray.get(_TRANSFER_QUEUE_CONTROLLER.start_metrics.remote())
439436
final_conf.metrics.endpoint = metrics_endpoint

transfer_queue/metrics.py

Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TQMetricsExporter:
5757
SimpleStorageUnit instances (via ZMQ ``GET_METRICS`` requests).
5858
5959
Lifecycle:
60-
1. Created by ``TransferQueueController.__init__`` when metrics are enabled.
60+
1. Created by ``TransferQueueController.start_metrics()`` when metrics are enabled.
6161
2. ``start()`` launches the HTTP server and a background collection thread.
6262
3. The collection thread calls ``collect_controller_metrics`` and
6363
``collect_storage_metrics`` every ``TQ_METRICS_COLLECT_INTERVAL`` seconds.
@@ -95,15 +95,6 @@ def _define_metrics(self) -> None:
9595
self.partition_samples = Gauge(
9696
"tq_partition_samples_total", "Number of active samples in a partition", ["partition_id"], registry=r
9797
)
98-
self.partition_fields = Gauge(
99-
"tq_partition_fields_total", "Number of fields in a partition", ["partition_id"], registry=r
100-
)
101-
self.partition_allocated_samples = Gauge(
102-
"tq_partition_allocated_samples",
103-
"Number of allocated sample slots in a partition",
104-
["partition_id"],
105-
registry=r,
106-
)
10798
self.partition_production_progress = Gauge(
10899
"tq_partition_production_progress",
109100
"Production progress ratio (0.0-1.0)",
@@ -159,30 +150,9 @@ def _define_metrics(self) -> None:
159150
["storage_unit_id"],
160151
registry=r,
161152
)
162-
self.storage_fields = Gauge(
163-
"tq_storage_fields_total", "Number of fields in storage unit", ["storage_unit_id"], registry=r
164-
)
165153
self.storage_memory_rss = Gauge(
166154
"tq_storage_memory_rss_bytes", "Storage unit process RSS memory", ["storage_unit_id"], registry=r
167155
)
168-
self.storage_data_memory = Gauge(
169-
"tq_storage_data_memory_bytes",
170-
"Estimated data memory in storage unit",
171-
["storage_unit_id"],
172-
registry=r,
173-
)
174-
self.storage_op_total = Gauge(
175-
"tq_storage_op_total",
176-
"Cumulative operation count on storage unit",
177-
["storage_unit_id", "op_type"],
178-
registry=r,
179-
)
180-
self.storage_samples_total = Gauge(
181-
"tq_storage_samples_total",
182-
"Cumulative number of samples processed by storage unit",
183-
["storage_unit_id", "op_type"],
184-
registry=r,
185-
)
186156

187157
@contextmanager
188158
def measure(self, op_type: str):
@@ -220,16 +190,16 @@ def collect_controller_metrics(self) -> None:
220190
except Exception:
221191
pass
222192

223-
# Partition-level
224-
current_pids = set(ctrl.partitions.keys())
193+
# Partition-level — iterate over a snapshot to avoid RuntimeError
194+
# if partitions dict is mutated concurrently.
195+
partitions_snapshot = list(ctrl.partitions.items())
196+
current_pids = {pid for pid, _ in partitions_snapshot}
225197
current_consumption_labels: set[tuple[str, str]] = set()
226198
self.partitions_total.set(len(current_pids))
227199

228-
for pid, partition in ctrl.partitions.items():
200+
for pid, partition in partitions_snapshot:
229201
stats = partition.get_statistics()
230202
self.partition_samples.labels(partition_id=pid).set(stats["total_samples_num"])
231-
self.partition_fields.labels(partition_id=pid).set(stats["total_fields_num"])
232-
self.partition_allocated_samples.labels(partition_id=pid).set(stats["allocated_samples_num"])
233203
self.partition_production_progress.labels(partition_id=pid).set(stats.get("production_progress", 0))
234204

235205
for task_name, cstats in stats.get("consumption_statistics", {}).items():
@@ -242,8 +212,6 @@ def collect_controller_metrics(self) -> None:
242212
for stale_pid in self._known_partition_ids - current_pids:
243213
for metric in (
244214
self.partition_samples,
245-
self.partition_fields,
246-
self.partition_allocated_samples,
247215
self.partition_production_progress,
248216
):
249217
try:
@@ -267,25 +235,24 @@ def collect_storage_metrics(self) -> None:
267235
if not self._storage_unit_infos:
268236
return
269237

270-
for su_id, su_info in self._storage_unit_infos.items():
238+
# Iterate over a snapshot to avoid RuntimeError from concurrent mutation.
239+
storage_snapshot = list(self._storage_unit_infos.items())
240+
for su_id, su_info in storage_snapshot:
271241
try:
272242
metrics = self._query_storage_unit(su_info, su_id)
273243
if metrics is None:
274244
continue
245+
# Use the storage unit's own ID from the response as the
246+
# canonical label to keep dashboard labels consistent with logs.
247+
label = metrics.get("storage_unit_id", su_id)
275248
capacity = metrics.get("capacity", 0)
276249
active = metrics.get("active_keys", 0)
277-
self.storage_capacity.labels(storage_unit_id=su_id).set(capacity)
278-
self.storage_active_keys.labels(storage_unit_id=su_id).set(active)
279-
self.storage_utilization.labels(storage_unit_id=su_id).set(
250+
self.storage_capacity.labels(storage_unit_id=label).set(capacity)
251+
self.storage_active_keys.labels(storage_unit_id=label).set(active)
252+
self.storage_utilization.labels(storage_unit_id=label).set(
280253
active / capacity if capacity > 0 else 0.0
281254
)
282-
self.storage_fields.labels(storage_unit_id=su_id).set(metrics.get("fields_count", 0))
283-
self.storage_memory_rss.labels(storage_unit_id=su_id).set(metrics.get("process_rss_bytes", 0))
284-
self.storage_data_memory.labels(storage_unit_id=su_id).set(metrics.get("data_memory_bytes", 0))
285-
for op_type, count in metrics.get("op_counts", {}).items():
286-
self.storage_op_total.labels(storage_unit_id=su_id, op_type=op_type).set(count)
287-
for op_type, count in metrics.get("sample_counts", {}).items():
288-
self.storage_samples_total.labels(storage_unit_id=su_id, op_type=op_type).set(count)
255+
self.storage_memory_rss.labels(storage_unit_id=label).set(metrics.get("process_rss_bytes", 0))
289256
except Exception as e:
290257
logger.warning(f"Failed to collect metrics from storage unit {su_id}: {e}")
291258

0 commit comments

Comments
 (0)