Skip to content

Commit aef771a

Browse files
committed
Add metrics client
Why these changes are being introduced: * A metrics client class is needed to implement AWS Cloudwatch metrics How this addresses that need: * Add MetricsClient class with methods for publishing individual and batch metrics * Add METRICS_NAMESPACE and METRICS constants to config.py * Add metrics attributes to Workflow class * Add publish_metric method calls to submit_items and finalize_items methods * Update dependencies Side effects of this change: * None Relevant ticket(s): * NA
1 parent cf71ca5 commit aef771a

5 files changed

Lines changed: 455 additions & 212 deletions

File tree

dsc/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@
44

55
import sentry_sdk
66

7+
METRICS_NAMESPACE = "dso"
8+
9+
METRICS = [
10+
"item_submitted", # item submitted to DSS
11+
"submission_error", # error during submission to DSS
12+
"ingested_item", # item ingested successfully into DSpace
13+
"ingest_error", # error during attempted item ingest into DSpace
14+
]
15+
716

817
class Config:
918
REQUIRED_ENV_VARS: Iterable[str] = [

dsc/utils/aws/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
from dsc.utils.aws.metrics import MetricsClient
12
from dsc.utils.aws.s3 import S3Client
23
from dsc.utils.aws.ses import SESClient
34
from dsc.utils.aws.sqs import SQSClient
45

5-
__all__ = ["S3Client", "SESClient", "SQSClient"]
6+
__all__ = ["MetricsClient", "S3Client", "SESClient", "SQSClient"]

dsc/utils/aws/metrics.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
"""AWS CloudWatch metrics client for workflow submission tracking."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
7+
import boto3
8+
9+
from dsc.config import METRICS, METRICS_NAMESPACE
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
UNIT_VALUES = frozenset(
15+
[
16+
"Seconds",
17+
"Microseconds",
18+
"Milliseconds",
19+
"Bytes",
20+
"Kilobytes",
21+
"Megabytes",
22+
"Gigabytes",
23+
"Terabytes",
24+
"Bits",
25+
"Kilobits",
26+
"Megabits",
27+
"Gigabits",
28+
"Terabits",
29+
"Percent",
30+
"Count",
31+
"Bytes/Second",
32+
"Kilobytes/Second",
33+
"Megabytes/Second",
34+
"Gigabytes/Second",
35+
"Terabytes/Second",
36+
"Bits/Second",
37+
"Kilobits/Second",
38+
"Megabits/Second",
39+
"Gigabits/Second",
40+
"Terabits/Second",
41+
"Count/Second",
42+
]
43+
)
44+
45+
46+
class MetricsClient:
47+
"""A simple client to record metrics to AWS CloudWatch."""
48+
49+
def __init__(self) -> None:
50+
"""Initialize the MetricsClient."""
51+
self.cloudwatch = boto3.client("cloudwatch")
52+
self.batch_metrics: list[dict] = []
53+
54+
def publish_single_metric(
55+
self,
56+
metric_name: str,
57+
value: int,
58+
unit: str,
59+
metric_dimensions: dict[str, str] | None = None,
60+
) -> None:
61+
"""Publish a single metric to CloudWatch.
62+
63+
Args:
64+
metric_name: The name of the metric to publish.
65+
value: The value of the metric.
66+
unit: The unit of the metric.
67+
metric_dimensions: Optional dictionary of dimension names and values.
68+
69+
Raises:
70+
ValueError: If unit is invalid.
71+
"""
72+
metric_data = self._validate_and_build_metric_data(
73+
metric_name, value, unit, metric_dimensions
74+
)
75+
self._push_metric_data([metric_data])
76+
77+
def _validate_and_build_metric_data(
78+
self,
79+
metric_name: str,
80+
value: int,
81+
unit: str,
82+
metric_dimensions: dict[str, str] | None = None,
83+
) -> dict:
84+
"""Validate and build a metric data dictionary for CloudWatch.
85+
86+
Args:
87+
metric_name: The name of the metric.
88+
value: The value of the metric.
89+
unit: The unit of the metric.
90+
metric_dimensions: Optional dictionary of dimension names and values.
91+
92+
Returns:
93+
A metric data dictionary formatted for CloudWatch.
94+
"""
95+
self._approved_metric(metric_name)
96+
self._validate_unit(unit)
97+
dimensions = [
98+
{"Name": name, "Value": dim_value}
99+
for name, dim_value in (
100+
metric_dimensions.items() if metric_dimensions else []
101+
)
102+
]
103+
return {
104+
"MetricName": metric_name,
105+
"Value": value,
106+
"Unit": unit,
107+
"Dimensions": dimensions,
108+
}
109+
110+
def _approved_metric(self, metric_name: str) -> bool:
111+
"""Check if a metric name is in the approved list of metrics for the application.
112+
113+
Args:
114+
metric_name: The name of the metric to check.
115+
"""
116+
if metric_name not in METRICS:
117+
raise ValueError(
118+
f"Metric name '{metric_name}' is not in the approved list of metrics: "
119+
f"{', '.join(METRICS)}"
120+
)
121+
return True
122+
123+
def _validate_unit(self, unit: str) -> None:
124+
"""Validate that metric unit is allowed by AWS CloudWatch.
125+
126+
Args:
127+
unit: The unit to validate.
128+
129+
Raises:
130+
ValueError: If unit is not allowed by AWS CloudWatch.
131+
"""
132+
if unit not in UNIT_VALUES:
133+
raise ValueError(
134+
f"Invalid unit '{unit}'. Must be one of: {', '.join(UNIT_VALUES)}"
135+
)
136+
137+
def _push_metric_data(self, metric_data: list[dict]) -> None:
138+
"""Push metric data to CloudWatch.
139+
140+
Args:
141+
metric_data: List of metric dictionaries to push.
142+
"""
143+
try:
144+
self.cloudwatch.put_metric_data(
145+
Namespace=METRICS_NAMESPACE, MetricData=metric_data
146+
)
147+
logger.info(f"Published metric with {metric_data} to CloudWatch.")
148+
except Exception:
149+
logger.exception(
150+
f"Failed to publish metric with {metric_data} to CloudWatch."
151+
)
152+
153+
def add_metric_to_batch(
154+
self,
155+
metric_name: str,
156+
value: int,
157+
unit: str,
158+
metric_dimensions: dict[str, str] | None = None,
159+
) -> None:
160+
"""Add a metric to the batch for later publishing.
161+
162+
Args:
163+
metric_name: The name of the metric.
164+
value: The value of the metric.
165+
unit: The unit of the metric.
166+
metric_dimensions: Optional dictionary of dimension names and values.
167+
168+
Raises:
169+
ValueError: If unit is invalid.
170+
"""
171+
metric_data = self._validate_and_build_metric_data(
172+
metric_name, value, unit, metric_dimensions
173+
)
174+
self.batch_metrics.append(metric_data)
175+
176+
def publish_batch_metrics(self, batch_size: int = 20) -> None:
177+
"""Publish all accumulated batch metrics to CloudWatch.
178+
179+
Raises:
180+
ValueError: If any metric has an invalid unit or missing required fields.
181+
"""
182+
if not self.batch_metrics:
183+
logger.info("No metrics to publish.")
184+
return
185+
186+
# Validate all metrics before publishing
187+
for metric in self.batch_metrics:
188+
if not all(key in metric for key in ["MetricName", "Value", "Unit"]):
189+
raise ValueError(
190+
f"Each metric must contain 'MetricName', 'Value', and 'Unit'. "
191+
f"Invalid metric: {metric}"
192+
)
193+
self._approved_metric(metric["MetricName"])
194+
self._validate_unit(metric["Unit"])
195+
196+
for x in range(0, len(self.batch_metrics), batch_size):
197+
self._push_metric_data(self.batch_metrics[x : x + batch_size])
198+
self.batch_metrics.clear()

dsc/workflows/base/workflow.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020
from dsc.item_submission import ItemSubmission
2121
from dsc.reports import Report
22-
from dsc.utils.aws import SESClient, SQSClient
22+
from dsc.utils.aws import MetricsClient, SESClient, SQSClient
2323
from dsc.utils.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY
2424

2525
if TYPE_CHECKING: # pragma: no cover
@@ -125,6 +125,11 @@ def __init__(self, batch_id: str) -> None:
125125
"skipped": 0,
126126
"errors": 0,
127127
}
128+
self.metrics_client = MetricsClient()
129+
self.metrics_dimensions = {
130+
"application": "dsc",
131+
"workflow_name": self.workflow_name,
132+
}
128133

129134
# cache list of bitstreams
130135
self._batch_bitstream_uris: list[str] | None = None
@@ -323,13 +328,27 @@ def submit_items(self, collection_handle: str) -> list:
323328
item_submission.status_details = None
324329
item_submission.submit_attempts += 1
325330
item_submission.upsert_db()
331+
332+
self.metrics_client.publish_single_metric(
333+
metric_name="item_submitted",
334+
value=1,
335+
unit="Count",
336+
metric_dimensions=self.metrics_dimensions,
337+
)
326338
except Exception as exception: # noqa: BLE001
327339
self.submission_summary["errors"] += 1
328340
item_submission.status = ItemSubmissionStatus.SUBMIT_FAILED
329341
item_submission.status_details = str(exception)
330342
item_submission.submit_attempts += 1
331343
item_submission.upsert_db()
332344

345+
self.metrics_client.publish_single_metric(
346+
metric_name="submission_error",
347+
value=1,
348+
unit="Count",
349+
metric_dimensions=self.metrics_dimensions,
350+
)
351+
333352
logger.info(
334353
f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' "
335354
f"for batch '{self.batch_id}': {json.dumps(self.submission_summary)}"
@@ -399,11 +418,26 @@ def finalize_items(self) -> None:
399418
item_submission.dspace_handle = result_message.dspace_handle
400419
sqs_results_summary["ingest_success"] += 1
401420
logger.debug(f"Record {log_str} was ingested")
421+
422+
self.metrics_client.publish_single_metric(
423+
metric_name="ingested_item",
424+
value=1,
425+
unit="Count",
426+
metric_dimensions=self.metrics_dimensions,
427+
)
402428
elif result_message.result_type == "error":
403429
item_submission.status = ItemSubmissionStatus.INGEST_FAILED
404430
item_submission.status_details = result_message.error_info
405431
sqs_results_summary["ingest_failed"] += 1
406432
logger.debug(f"Record {log_str} failed to ingest")
433+
434+
self.metrics_client.publish_single_metric(
435+
metric_name="ingest_error",
436+
value=1,
437+
unit="Count",
438+
metric_dimensions=self.metrics_dimensions,
439+
)
440+
407441
else:
408442
item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN
409443
sqs_results_summary["ingest_unknown"] += 1

0 commit comments

Comments
 (0)