Skip to content

Commit 2495b98

Browse files
authored
Add metrics client (#226)
* Add metrics client Why these changes are being introduced: * A metrics client class is needed to implement AWS Cloudwatch metrics How this addresses that need: * Add MetricsClient class with methods for publishing individual and batch metrics * Add METRICS_NAMESPACE and METRICS constants to config.py * Add metrics attributes to Workflow class * Add publish_metric method calls to submit_items and finalize_items methods * Update dependencies Side effects of this change: * None Relevant ticket(s): * NA * Updates based on discussion in PR #226 * Rename Config.METRICS > ALLOWED_METRICS and change from list to set * Add Metric dataclass and update MetricsClient class to use it * Update calls in Workflow class to use Metric dataclass * Update dependencies * More updates based on discussion in PR #226 * Add namespace param to Metric class and MetricsClient.__init__ * Rename publish_single_metric > publish_metric, _push_metric_data > _publish_metrics, add_metric_to_batch > add_metrics_to_batch, and publish_batch_metrics > publish_metrics_batch * Add logic to use Metric.namespace over MetricsClient.namespace * Remove validation from publish_metrics_batch * Update calls in Workflow class * Update MetricsClient's namespace to keyword arg * Updates based on Copilot review * Sort unit values list * Add multiple namespaces check and defensive chunking to _publish_metrics method * Update publish_metrics_batch method to clear only successfully published metrics * Add try/except blocks to publish_metric call in Workflow class * Update dependencies * Update .pre-commit-config.yaml * Update based on further Copilot review * Sort list before raising error containing the list * Further updates based on discussion in PR #226 * Add _publish_count_metric method and call in Workflow methods
1 parent f067eb7 commit 2495b98

7 files changed

Lines changed: 301 additions & 21 deletions

File tree

.pre-commit-config.yaml

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,28 @@
11
default_language_version:
2-
python: python3.12 # set for project python version
2+
python: python3.12
3+
default_stages:
4+
- pre-push
35
repos:
46
- repo: local
57
hooks:
6-
- id: black-apply
7-
name: black-apply
8-
entry: uv run black
8+
- id: ruff-format
9+
name: ruff-format
10+
entry: uv run ruff format --diff
911
language: system
1012
pass_filenames: true
11-
types: ["python"]
13+
types: [ "python" ]
14+
1215
- id: mypy
1316
name: mypy
1417
entry: uv run mypy
1518
language: system
1619
pass_filenames: true
17-
types: ["python"]
18-
exclude: "tests/"
19-
- id: ruff-apply
20-
name: ruff-apply
21-
entry: uv run ruff check --fix
20+
types: [ "python" ]
21+
exclude: "(tests/|output/|migrations/)"
22+
23+
- id: ruff-check
24+
name: ruff-check
25+
entry: uv run ruff check
2226
language: system
2327
pass_filenames: true
24-
types: ["python"]
25-
- id: pip-audit
26-
name: pip-audit
27-
entry: uv run pip-audit
28-
language: system
29-
pass_filenames: false
28+
types: [ "python" ]

dsc/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@
55

66
import sentry_sdk
77

8+
# Namespace string used for this application's metrics.
# NOTE(review): presumably supplied to MetricsClient as its default
# namespace -- confirm against the callers.
METRICS_NAMESPACE = "dso"

# Names of the metrics this application recognizes.
# NOTE(review): presumably passed to MetricsClient as ``allowed_metrics`` so
# that other names are rejected -- confirm against the callers.
ALLOWED_METRICS = {
    "item_submitted",  # item submitted to DSS
    "submission_error",  # error during submission to DSS
    "ingested_item",  # item ingested successfully into DSpace
    "ingest_error",  # error during attempted item ingest into DSpace
}
16+
817

918
class Config:
1019
REQUIRED_ENV_VARS: Iterable[str] = [

dsc/utils/aws/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
from dsc.utils.aws.metrics import Metric, MetricsClient
12
from dsc.utils.aws.s3 import S3Client
23
from dsc.utils.aws.ses import SESClient
34
from dsc.utils.aws.sqs import SQSClient
45

5-
__all__ = ["S3Client", "SESClient", "SQSClient"]
6+
__all__ = ["Metric", "MetricsClient", "S3Client", "SESClient", "SQSClient"]

dsc/utils/aws/metrics.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""AWS CloudWatch metrics client for workflow submission tracking."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from dataclasses import dataclass
7+
8+
import boto3
9+
10+
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest number of metrics sent in one request; longer lists are chunked.
CLOUDWATCH_METRICS_LIMIT = 1000

# Unit strings accepted when a metric's ``unit`` is validated, grouped by
# category for readability (membership is all that matters at runtime).
UNIT_VALUES = frozenset(
    {
        # Time
        "Seconds",
        "Milliseconds",
        "Microseconds",
        # Data size
        "Bits",
        "Kilobits",
        "Megabits",
        "Gigabits",
        "Terabits",
        "Bytes",
        "Kilobytes",
        "Megabytes",
        "Gigabytes",
        "Terabytes",
        # Data rate
        "Bits/Second",
        "Kilobits/Second",
        "Megabits/Second",
        "Gigabits/Second",
        "Terabits/Second",
        "Bytes/Second",
        "Kilobytes/Second",
        "Megabytes/Second",
        "Gigabytes/Second",
        "Terabytes/Second",
        # Counts and ratios
        "Count",
        "Count/Second",
        "Percent",
        # Explicitly unitless
        "None",
    }
)
45+
46+
47+
@dataclass
class Metric:
    """A single data point to be published to CloudWatch.

    Attributes:
        name: Metric name.
        value: Numeric value of the data point. Annotated as ``float`` so that
            fractional values (e.g. for ``Percent`` or ``Seconds`` units) type
            check; ``int`` values remain valid.
        unit: Unit of the value; validated by ``MetricsClient`` against
            ``UNIT_VALUES`` before publishing.
        dimensions: Optional mapping of dimension name to dimension value.
        namespace: Optional namespace for this metric. ``MetricsClient`` uses
            it in preference to its own default namespace.
    """

    name: str
    value: float
    unit: str
    dimensions: dict[str, str] | None = None
    namespace: str | None = None
57+
58+
class MetricsClient:
    """Validate metrics and publish them to AWS CloudWatch.

    Metrics can be published immediately with ``publish_metric`` or queued
    with ``add_metrics_to_batch`` and sent later with
    ``publish_metrics_batch``.

    Attributes:
        namespace: Default namespace for metrics that do not set their own.
        allowed_metrics: Optional set of permitted metric names. When it is
            ``None`` or empty, metric names are not restricted.
        batch_metrics: Validated metrics waiting to be published as a batch.
    """

    def __init__(
        self, namespace: str | None = None, allowed_metrics: set[str] | None = None
    ) -> None:
        """Initialize the MetricsClient.

        Args:
            namespace: Default namespace used when a metric has none.
            allowed_metrics: Optional set of metric names that may be
                published.
        """
        self.namespace = namespace
        self.allowed_metrics: set[str] | None = allowed_metrics
        self._cloudwatch = boto3.client("cloudwatch")
        self.batch_metrics: list[Metric] = []

    def publish_metric(
        self,
        metric: Metric,
    ) -> None:
        """Validate and publish a single metric to CloudWatch.

        Args:
            metric: The metric to publish.

        Raises:
            ValueError: If the metric fails validation or no namespace can be
                resolved for it.
        """
        self._validate_metric(metric)
        self._publish_metrics([metric])

    def _validate_metric(
        self,
        metric: Metric,
    ) -> bool:
        """Validate that a metric has required fields, an allowed name and unit.

        Args:
            metric: The Metric instance to validate.

        Returns:
            True when the metric is valid.

        Raises:
            ValueError: If a required attribute is missing, the name is not
                allowed, or the unit is not in ``UNIT_VALUES``.
        """
        if not all(hasattr(metric, attr) for attr in ["name", "value", "unit"]):
            raise ValueError(
                f"Metric must have 'name', 'value', and 'unit' attributes. Invalid "
                f"metric: {metric}"
            )
        self._allowed_metric(metric.name)
        self._validate_metric_unit(metric.unit)
        return True

    def _allowed_metric(self, metric_name: str) -> bool:
        """Check if a metric name is in the allowed list of metrics for the application.

        No restriction applies when ``allowed_metrics`` is ``None`` or empty.

        Args:
            metric_name: The name of the metric to check.

        Returns:
            True when the name is allowed.

        Raises:
            ValueError: If ``allowed_metrics`` is set and does not contain
                ``metric_name``.
        """
        if self.allowed_metrics and metric_name not in self.allowed_metrics:
            # Sort so the message is deterministic; set iteration order is not.
            allowed_names = ", ".join(sorted(self.allowed_metrics))
            raise ValueError(
                f"Metric name '{metric_name}' is not in the allowed list of metrics: "
                f"{allowed_names}"
            )
        return True

    def _validate_metric_unit(self, unit: str) -> bool:
        """Validate that metric unit is allowed by AWS CloudWatch.

        Args:
            unit: The unit to validate.

        Returns:
            True when the unit is in ``UNIT_VALUES``.

        Raises:
            ValueError: If unit is not allowed by AWS CloudWatch.
        """
        if unit not in UNIT_VALUES:
            allowed_units = ", ".join(sorted(UNIT_VALUES))
            raise ValueError(f"Invalid unit '{unit}'. Must be one of: {allowed_units}")
        return True

    def _publish_metrics(self, metrics: list[Metric]) -> None:
        """Publish metrics to CloudWatch.

        Automatically chunks metrics if the list exceeds CloudWatch's limit
        of 1000 metrics per request. All metrics sent in one request must
        resolve to the same namespace.

        Args:
            metrics: List of metric instances to publish.

        Raises:
            ValueError: If a metric has no resolvable namespace, or the
                metrics resolve to more than one namespace.
            Exception: Any error raised while sending the request is logged
                and re-raised.
        """
        if not metrics:
            logger.info("No metrics to publish.")
            return

        # Defensively chunk metrics if they exceed CloudWatch's limit
        if len(metrics) > CLOUDWATCH_METRICS_LIMIT:
            logger.info(
                f"Splitting {len(metrics)} metrics into chunks of "
                f"{CLOUDWATCH_METRICS_LIMIT} for CloudWatch compliance."
            )
            for i in range(0, len(metrics), CLOUDWATCH_METRICS_LIMIT):
                chunk = metrics[i : i + CLOUDWATCH_METRICS_LIMIT]
                self._publish_metrics(chunk)
            return

        try:
            # Validate all metrics and ensure consistent namespace
            namespaces = set()
            metric_data = []
            for metric in metrics:
                self._validate_namespace(metric)
                # A metric's own namespace wins over the client default.
                selected_namespace = metric.namespace or self.namespace
                namespaces.add(selected_namespace)

                metric_dict = {
                    "MetricName": metric.name,
                    "Value": metric.value,
                    "Unit": metric.unit,
                }
                if metric.dimensions:
                    metric_dict["Dimensions"] = [
                        {"Name": key, "Value": value}
                        for key, value in metric.dimensions.items()
                    ]
                metric_data.append(metric_dict)

            # Ensure all metrics resolve to the same namespace
            if len(namespaces) > 1:
                # Sorted so the message does not depend on set ordering.
                raise ValueError(  # noqa: TRY301
                    f"Cannot publish metrics with different namespaces in a single "
                    f"request. Found: {sorted(namespaces)}"
                )

            selected_namespace = namespaces.pop()
            self._cloudwatch.put_metric_data(
                Namespace=selected_namespace,
                MetricData=metric_data,
            )
            logger.info(
                f"Published {len(metrics)} metric(s) to CloudWatch namespace "
                f"'{selected_namespace}'."
            )
        except Exception:
            logger.exception(
                f"Failed to publish {len(metrics)} metric(s) to CloudWatch: "
            )
            raise

    def _validate_namespace(self, metric: Metric) -> bool:
        """Validate metric has a namespace or the client has a default namespace.

        Args:
            metric: The metric whose namespace is checked.

        Returns:
            True when a namespace can be resolved.

        Raises:
            ValueError: If neither the metric nor the client has a namespace.
        """
        if not metric.namespace and not self.namespace:
            raise ValueError(
                f"Metric '{metric.name}' must have a namespace if no default "
                f"namespace is set for the MetricsClient."
            )
        return True

    def add_metrics_to_batch(self, metrics: list[Metric]) -> None:
        """Add metrics to the batch queue.

        Each metric is validated before it is queued; metrics preceding an
        invalid one stay queued.

        Args:
            metrics: The metrics to add to the batch.

        Raises:
            ValueError: If a metric fails validation.
        """
        for metric in metrics:
            self._validate_metric(metric)
            self.batch_metrics.append(metric)

    def publish_metrics_batch(self, batch_size: int = 20) -> None:
        """Publish the queued metrics to CloudWatch in batches.

        Clears the batch queue after successful publishing.

        Args:
            batch_size: Number of metrics to publish in each batch. Must be at
                least 1; values above CloudWatch's limit of 1000 metrics per
                request are clamped to that limit.

        Raises:
            ValueError: If ``batch_size`` is less than 1 while metrics are
                queued. The queue is left untouched.
            Exception: If publishing fails, the metrics from the failed batch
                onward remain in the batch queue for retry or manual handling.
        """
        if not self.batch_metrics:
            logger.info("No metrics to publish.")
            return

        # A zero step makes range() raise and a negative step yields no
        # batches at all, which would clear the queue without publishing.
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        # One loop step == one request, so a failure maps to exactly one slice.
        step = min(batch_size, CLOUDWATCH_METRICS_LIMIT)

        start = 0
        try:
            for start in range(0, len(self.batch_metrics), step):
                self._publish_metrics(self.batch_metrics[start : start + step])
        except Exception:
            # Keep only the unpublished metrics (starting from the failed batch)
            self.batch_metrics = self.batch_metrics[start:]
            raise

        # Clear only if all batches published successfully
        self.batch_metrics.clear()

0 commit comments

Comments
 (0)