Skip to content

Commit 188e8a4

Browse files
committed
Updates based on discussion in PR #226
* Rename Config.METRICS > ALLOWED_METRICS and change from list to set * Add Metric dataclass and update MetricsClient class to use it * Update calls in Workflow class to use Metric dataclass * Update dependencies
1 parent 2c690db commit 188e8a4

4 files changed

Lines changed: 121 additions & 121 deletions

File tree

dsc/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
METRICS_NAMESPACE = "dso"
99

10-
METRICS = [
10+
ALLOWED_METRICS = {
1111
"item_submitted", # item submitted to DSS
1212
"submission_error", # error during submission to DSS
1313
"ingested_item", # item ingested successfully into DSpace
1414
"ingest_error", # error during attempted item ingest into DSpace
15-
]
15+
}
1616

1717

1818
class Config:

dsc/utils/aws/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from dsc.utils.aws.metrics import MetricsClient
1+
from dsc.utils.aws.metrics import Metric, MetricsClient
22
from dsc.utils.aws.s3 import S3Client
33
from dsc.utils.aws.ses import SESClient
44
from dsc.utils.aws.sqs import SQSClient
55

6-
__all__ = ["MetricsClient", "S3Client", "SESClient", "SQSClient"]
6+
__all__ = ["Metric", "MetricsClient", "S3Client", "SESClient", "SQSClient"]

dsc/utils/aws/metrics.py

Lines changed: 90 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
from __future__ import annotations
44

55
import logging
6+
from dataclasses import dataclass
67

78
import boto3
89

9-
from dsc.config import METRICS, METRICS_NAMESPACE
10+
from dsc.config import METRICS_NAMESPACE
1011

1112
logger = logging.getLogger(__name__)
1213

@@ -43,84 +44,66 @@
4344
)
4445

4546

47+
@dataclass
48+
class Metric:
49+
"""A class representing a single metric to be published to CloudWatch."""
50+
51+
name: str
52+
value: int
53+
unit: str
54+
dimensions: dict[str, str] | None = None
55+
56+
4657
class MetricsClient:
4758
"""A simple client to record metrics to AWS CloudWatch."""
4859

49-
def __init__(self) -> None:
60+
def __init__(self, allowed_metrics: set[str] | None = None) -> None:
5061
"""Initialize the MetricsClient."""
51-
self.cloudwatch = boto3.client("cloudwatch")
52-
self.batch_metrics: list[dict] = []
62+
self.namespace = METRICS_NAMESPACE
63+
self.allowed_metrics: set[str] | None = allowed_metrics
64+
self._cloudwatch = boto3.client("cloudwatch")
65+
self.batch_metrics: list[Metric] = []
5366

5467
def publish_single_metric(
5568
self,
56-
metric_name: str,
57-
value: int,
58-
unit: str,
59-
metric_dimensions: dict[str, str] | None = None,
69+
metric: Metric,
6070
) -> None:
61-
"""Publish a single metric to CloudWatch.
62-
63-
Args:
64-
metric_name: The name of the metric to publish.
65-
value: The value of the metric.
66-
unit: The unit of the metric.
67-
metric_dimensions: Optional dictionary of dimension names and values.
68-
69-
Raises:
70-
ValueError: If unit is invalid.
71-
"""
72-
metric_data = self._validate_and_build_metric_data(
73-
metric_name, value, unit, metric_dimensions
74-
)
75-
self._push_metric_data([metric_data])
71+
"""Publish a single metric to CloudWatch."""
72+
self._validate_metric(metric)
73+
self._push_metric_data([metric])
7674

77-
def _validate_and_build_metric_data(
75+
def _validate_metric(
7876
self,
79-
metric_name: str,
80-
value: int,
81-
unit: str,
82-
metric_dimensions: dict[str, str] | None = None,
83-
) -> dict:
84-
"""Validate and build a metric data dictionary for CloudWatch.
77+
metric: Metric,
78+
) -> bool:
79+
"""Validate that a metric has required fields and allowed unit.
8580
8681
Args:
87-
metric_name: The name of the metric.
88-
value: The value of the metric.
89-
unit: The unit of the metric.
90-
metric_dimensions: Optional dictionary of dimension names and values.
91-
92-
Returns:
93-
A metric data dictionary formatted for CloudWatch.
82+
metric: The Metric instance to validate.
9483
"""
95-
self._approved_metric(metric_name)
96-
self._validate_unit(unit)
97-
dimensions = [
98-
{"Name": name, "Value": dim_value}
99-
for name, dim_value in (
100-
metric_dimensions.items() if metric_dimensions else []
84+
if not all(hasattr(metric, attr) for attr in ["name", "value", "unit"]):
85+
raise ValueError(
86+
f"Metric must have 'name', 'value', and 'unit' attributes. Invalid "
87+
f"metric: {metric}"
10188
)
102-
]
103-
return {
104-
"MetricName": metric_name,
105-
"Value": value,
106-
"Unit": unit,
107-
"Dimensions": dimensions,
108-
}
89+
self._allowed_metric(metric.name)
90+
self._validate_metric_unit(metric.unit)
91+
return True
10992

110-
def _approved_metric(self, metric_name: str) -> bool:
111-
"""Check if a metric name is in the approved list of metrics for the application.
93+
def _allowed_metric(self, metric_name: str) -> bool:
94+
"""Check if a metric name is in the allowed list of metrics for the application.
11295
11396
Args:
11497
metric_name: The name of the metric to check.
11598
"""
116-
if metric_name not in METRICS:
99+
if self.allowed_metrics and metric_name not in self.allowed_metrics:
117100
raise ValueError(
118-
f"Metric name '{metric_name}' is not in the approved list of metrics: "
119-
f"{', '.join(METRICS)}"
101+
f"Metric name '{metric_name}' is not in the allowed list of metrics: "
102+
f"{', '.join(self.allowed_metrics)}"
120103
)
121104
return True
122105

123-
def _validate_unit(self, unit: str) -> None:
106+
def _validate_metric_unit(self, unit: str) -> bool:
124107
"""Validate that metric unit is allowed by AWS CloudWatch.
125108
126109
Args:
@@ -133,66 +116,75 @@ def _validate_unit(self, unit: str) -> None:
133116
raise ValueError(
134117
f"Invalid unit '{unit}'. Must be one of: {', '.join(UNIT_VALUES)}"
135118
)
119+
return True
136120

137-
def _push_metric_data(self, metric_data: list[dict]) -> None:
138-
"""Push metric data to CloudWatch.
121+
def _push_metric_data(self, metrics: list[Metric]) -> None:
122+
"""Push metrics to CloudWatch.
139123
140124
Args:
141-
metric_data: List of metric dictionaries to push.
125+
metrics: List of metric instances to push.
142126
"""
127+
if not metrics:
128+
logger.info("No metrics to publish.")
129+
return
130+
143131
try:
144-
self.cloudwatch.put_metric_data(
145-
Namespace=METRICS_NAMESPACE, MetricData=metric_data
132+
metric_data = []
133+
for metric in metrics:
134+
metric_dict = {
135+
"MetricName": metric.name,
136+
"Value": metric.value,
137+
"Unit": metric.unit,
138+
}
139+
if metric.dimensions:
140+
metric_dict["Dimensions"] = [
141+
{"Name": key, "Value": value}
142+
for key, value in metric.dimensions.items()
143+
]
144+
metric_data.append(metric_dict)
145+
146+
self._cloudwatch.put_metric_data(
147+
Namespace=self.namespace,
148+
MetricData=metric_data,
149+
)
150+
logger.info(
151+
f"Published {len(metrics)} metric(s) to CloudWatch namespace "
152+
f"'{self.namespace}'."
146153
)
147-
logger.info(f"Published metric with {metric_data} to CloudWatch.")
148154
except Exception:
149155
logger.exception(
150-
f"Failed to publish metric with {metric_data} to CloudWatch."
156+
f"Failed to publish {len(metrics)} metric(s) to CloudWatch: "
151157
)
158+
raise
152159

153-
def add_metric_to_batch(
154-
self,
155-
metric_name: str,
156-
value: int,
157-
unit: str,
158-
metric_dimensions: dict[str, str] | None = None,
159-
) -> None:
160-
"""Add a metric to the batch for later publishing.
160+
def add_metric_to_batch(self, metric: Metric) -> None:
161+
"""Add a metric to the batch queue.
161162
162163
Args:
163-
metric_name: The name of the metric.
164-
value: The value of the metric.
165-
unit: The unit of the metric.
166-
metric_dimensions: Optional dictionary of dimension names and values.
167-
168-
Raises:
169-
ValueError: If unit is invalid.
164+
metric: The metric to add to the batch.
170165
"""
171-
metric_data = self._validate_and_build_metric_data(
172-
metric_name, value, unit, metric_dimensions
173-
)
174-
self.batch_metrics.append(metric_data)
166+
self._validate_metric(metric)
167+
self.batch_metrics.append(metric)
175168

176169
def publish_batch_metrics(self, batch_size: int = 20) -> None:
177-
"""Publish all accumulated batch metrics to CloudWatch.
170+
"""Publish a batch of metrics to CloudWatch.
178171
179-
Raises:
180-
ValueError: If any metric has an invalid unit or missing required fields.
172+
Clears the batch queue after publishing.
173+
174+
Args:
175+
batch_size: Number of metrics to publish in each batch.
181176
"""
182177
if not self.batch_metrics:
183178
logger.info("No metrics to publish.")
184179
return
185180

186-
# Validate all metrics before publishing
187-
for metric in self.batch_metrics:
188-
if not all(key in metric for key in ["MetricName", "Value", "Unit"]):
189-
raise ValueError(
190-
f"Each metric must contain 'MetricName', 'Value', and 'Unit'. "
191-
f"Invalid metric: {metric}"
192-
)
193-
self._approved_metric(metric["MetricName"])
194-
self._validate_unit(metric["Unit"])
195-
196-
for x in range(0, len(self.batch_metrics), batch_size):
197-
self._push_metric_data(self.batch_metrics[x : x + batch_size])
198-
self.batch_metrics.clear()
181+
try:
182+
# Re-validate all metrics before publishing to CloudWatch
183+
for metric in self.batch_metrics:
184+
self._validate_metric(metric)
185+
186+
for x in range(0, len(self.batch_metrics), batch_size):
187+
batch = self.batch_metrics[x : x + batch_size]
188+
self._push_metric_data(batch)
189+
finally:
190+
self.batch_metrics.clear()

dsc/workflows/base/workflow.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import jsonschema
1111
import jsonschema.exceptions
1212

13-
from dsc.config import Config
13+
from dsc.config import ALLOWED_METRICS, Config
1414
from dsc.db.models import ItemSubmissionStatus
1515
from dsc.exceptions import (
1616
BatchCreationFailedError,
@@ -19,7 +19,7 @@
1919
)
2020
from dsc.item_submission import ItemSubmission
2121
from dsc.reports import CreateReport, FinalizeReport, SubmitReport
22-
from dsc.utils.aws import MetricsClient, SESClient, SQSClient
22+
from dsc.utils.aws import Metric, MetricsClient, SESClient, SQSClient
2323
from dsc.utils.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY
2424

2525
if TYPE_CHECKING: # pragma: no cover
@@ -130,7 +130,7 @@ def __init__(self, batch_id: str) -> None:
130130
"skipped": 0,
131131
"errors": 0,
132132
}
133-
self.metrics_client = MetricsClient()
133+
self.metrics_client = MetricsClient(allowed_metrics=ALLOWED_METRICS)
134134
self.metrics_dimensions = {
135135
"application": "dsc",
136136
"workflow_name": self.workflow_name,
@@ -334,10 +334,12 @@ def submit_items(self, collection_handle: str | None = None) -> list:
334334
item_submission.submit_attempts += 1
335335
item_submission.upsert_db()
336336
self.metrics_client.publish_single_metric(
337-
metric_name="item_submitted",
338-
value=1,
339-
unit="Count",
340-
metric_dimensions=self.metrics_dimensions,
337+
Metric(
338+
name="item_submitted",
339+
value=1,
340+
unit="Count",
341+
dimensions=self.metrics_dimensions,
342+
)
341343
)
342344
except NotImplementedError:
343345
raise
@@ -349,10 +351,12 @@ def submit_items(self, collection_handle: str | None = None) -> list:
349351
item_submission.upsert_db()
350352

351353
self.metrics_client.publish_single_metric(
352-
metric_name="submission_error",
353-
value=1,
354-
unit="Count",
355-
metric_dimensions=self.metrics_dimensions,
354+
Metric(
355+
name="submission_error",
356+
value=1,
357+
unit="Count",
358+
dimensions=self.metrics_dimensions,
359+
)
356360
)
357361

358362
logger.info(
@@ -441,10 +445,12 @@ def finalize_items(self) -> None:
441445
logger.debug(f"Record {log_str} was ingested")
442446

443447
self.metrics_client.publish_single_metric(
444-
metric_name="ingested_item",
445-
value=1,
446-
unit="Count",
447-
metric_dimensions=self.metrics_dimensions,
448+
Metric(
449+
name="ingested_item",
450+
value=1,
451+
unit="Count",
452+
dimensions=self.metrics_dimensions,
453+
)
448454
)
449455
elif result_message.result_type == "error":
450456
item_submission.status = ItemSubmissionStatus.INGEST_FAILED
@@ -453,10 +459,12 @@ def finalize_items(self) -> None:
453459
logger.debug(f"Record {log_str} failed to ingest")
454460

455461
self.metrics_client.publish_single_metric(
456-
metric_name="ingest_error",
457-
value=1,
458-
unit="Count",
459-
metric_dimensions=self.metrics_dimensions,
462+
Metric(
463+
name="ingest_error",
464+
value=1,
465+
unit="Count",
466+
dimensions=self.metrics_dimensions,
467+
)
460468
)
461469

462470
else:

0 commit comments

Comments
 (0)