Skip to content

Commit 7c4b5f4

Browse files
committed
Further updates based on discussion in PR #226
* Add _publish_count_metric method and call in Workflow methods
1 parent a8f9f5c commit 7c4b5f4

1 file changed

Lines changed: 27 additions & 54 deletions

File tree

dsc/workflows/base/workflow.py

Lines changed: 27 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -335,20 +335,7 @@ def submit_items(self, collection_handle: str | None = None) -> list:
335335
item_submission.status_details = None
336336
item_submission.submit_attempts += 1
337337
item_submission.upsert_db()
338-
try:
339-
self.metrics_client.publish_metric(
340-
Metric(
341-
name="item_submitted",
342-
value=1,
343-
unit="Count",
344-
dimensions=self.metrics_dimensions,
345-
)
346-
)
347-
except Exception:
348-
logger.exception(
349-
f"Failed to publish item submitted metric for item "
350-
f"{item_identifier}"
351-
)
338+
self._publish_count_metric("item_submitted", f"item {item_identifier}")
352339
except NotImplementedError:
353340
raise
354341
except Exception as exception: # noqa: BLE001
@@ -357,20 +344,9 @@ def submit_items(self, collection_handle: str | None = None) -> list:
357344
item_submission.status_details = str(exception)
358345
item_submission.submit_attempts += 1
359346
item_submission.upsert_db()
360-
try:
361-
self.metrics_client.publish_metric(
362-
Metric(
363-
name="submission_error",
364-
value=1,
365-
unit="Count",
366-
dimensions=self.metrics_dimensions,
367-
)
368-
)
369-
except Exception:
370-
logger.exception(
371-
f"Failed to publish submission error metric for item "
372-
f"{item_submission.item_identifier}"
373-
)
347+
self._publish_count_metric(
348+
"submission_error", f"item {item_submission.item_identifier}"
349+
)
374350

375351
logger.info(
376352
f"Submitted messages to the DSS input queue '{CONFIG.sqs_queue_dss_input}' "
@@ -456,37 +432,13 @@ def finalize_items(self) -> None:
456432
)
457433
sqs_results_summary["ingest_success"] += 1
458434
logger.debug(f"Record {log_str} was ingested")
459-
try:
460-
self.metrics_client.publish_metric(
461-
Metric(
462-
name="ingested_item",
463-
value=1,
464-
unit="Count",
465-
dimensions=self.metrics_dimensions,
466-
)
467-
)
468-
except Exception:
469-
logger.exception(
470-
f"Failed to publish ingested item metric for record {log_str}"
471-
)
435+
self._publish_count_metric("ingested_item", f"record {log_str}")
472436
elif result_message.result_type == "error":
473437
item_submission.status = ItemSubmissionStatus.INGEST_FAILED
474438
item_submission.status_details = result_message.error_info
475439
sqs_results_summary["ingest_failed"] += 1
476440
logger.debug(f"Record {log_str} failed to ingest")
477-
try:
478-
self.metrics_client.publish_metric(
479-
Metric(
480-
name="ingest_error",
481-
value=1,
482-
unit="Count",
483-
dimensions=self.metrics_dimensions,
484-
)
485-
)
486-
except Exception:
487-
logger.exception(
488-
f"Failed to publish ingest error metric for record {log_str}"
489-
)
441+
self._publish_count_metric("ingest_error", f"record {log_str}")
490442

491443
else:
492444
item_submission.status = ItemSubmissionStatus.INGEST_UNKNOWN
@@ -513,6 +465,27 @@ def workflow_specific_processing(self) -> None:
513465
f"No extra processing for batch based on workflow: '{self.workflow_name}' "
514466
)
515467

468+
def _publish_count_metric(self, metric_name: str, log_data: str) -> None:
469+
"""Publish a count metric to CloudWatch.
470+
471+
Any exceptions are caught and logged.
472+
473+
Args:
474+
metric_name: The name of the metric to publish.
475+
log_data: Additional data included in the log message.
476+
"""
477+
try:
478+
self.metrics_client.publish_metric(
479+
Metric(
480+
name=metric_name,
481+
value=1,
482+
unit="Count",
483+
dimensions=self.metrics_dimensions,
484+
)
485+
)
486+
except Exception:
487+
logger.exception(f"Failed to publish '{metric_name}' metric: {log_data}")
488+
516489
def send_report(
517490
self,
518491
step: Literal["create", "submit", "finalize"],

0 commit comments

Comments
 (0)