33import random
44import time
55from collections .abc import Iterable , Sequence
6- from typing import TYPE_CHECKING
6+ from typing import TYPE_CHECKING , TypedDict
77
88import click
99from sqlalchemy .orm import Session , sessionmaker
2424 from opentelemetry .metrics import Counter , Histogram
2525
2626
27+ class RelatedCountsDict (TypedDict ):
28+ node_executions : int
29+ offloads : int
30+ app_logs : int
31+ trigger_logs : int
32+ pauses : int
33+ pause_reasons : int
34+
35+
2736class WorkflowRunCleanupMetrics :
2837 """
2938 Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
@@ -230,7 +239,7 @@ def run(self) -> None:
230239
231240 total_runs_deleted = 0
232241 total_runs_targeted = 0
233- related_totals = self ._empty_related_counts () if self .dry_run else None
242+ related_totals : RelatedCountsDict | None = self ._empty_related_counts () if self .dry_run else None
234243 batch_index = 0
235244 last_seen : tuple [datetime .datetime , str ] | None = None
236245 status = "success"
@@ -312,8 +321,7 @@ def run(self) -> None:
312321 int ((time .monotonic () - count_start ) * 1000 ),
313322 )
314323 if related_totals is not None :
315- for key in related_totals :
316- related_totals [key ] += batch_counts .get (key , 0 )
324+ self ._accumulate_related_counts (related_totals , batch_counts )
317325 sample_ids = ", " .join (run .id for run in free_runs [:5 ])
318326 click .echo (
319327 click .style (
@@ -506,7 +514,7 @@ def _count_trigger_logs(self, session: Session, run_ids: Sequence[str]) -> int:
506514 return trigger_repo .count_by_run_ids (run_ids )
507515
508516 @staticmethod
509- def _empty_related_counts () -> dict [ str , int ] :
517+ def _empty_related_counts () -> RelatedCountsDict :
510518 return {
511519 "node_executions" : 0 ,
512520 "offloads" : 0 ,
@@ -517,7 +525,7 @@ def _empty_related_counts() -> dict[str, int]:
517525 }
518526
519527 @staticmethod
520- def _format_related_counts (counts : dict [ str , int ] ) -> str :
528+ def _format_related_counts (counts : RelatedCountsDict ) -> str :
521529 return (
522530 f"node_executions { counts ['node_executions' ]} , "
523531 f"offloads { counts ['offloads' ]} , "
@@ -527,6 +535,15 @@ def _format_related_counts(counts: dict[str, int]) -> str:
527535 f"pause_reasons { counts ['pause_reasons' ]} "
528536 )
529537
538+ @staticmethod
539+ def _accumulate_related_counts (totals : RelatedCountsDict , batch : dict [str , int ]) -> None :
540+ totals ["node_executions" ] += batch .get ("node_executions" , 0 )
541+ totals ["offloads" ] += batch .get ("offloads" , 0 )
542+ totals ["app_logs" ] += batch .get ("app_logs" , 0 )
543+ totals ["trigger_logs" ] += batch .get ("trigger_logs" , 0 )
544+ totals ["pauses" ] += batch .get ("pauses" , 0 )
545+ totals ["pause_reasons" ] += batch .get ("pause_reasons" , 0 )
546+
530547 def _count_node_executions (self , session : Session , runs : Sequence [WorkflowRun ]) -> tuple [int , int ]:
531548 run_ids = [run .id for run in runs ]
532549 repo = DifyAPIRepositoryFactory .create_api_workflow_node_execution_repository (
0 commit comments