33import random
44import time
55from collections .abc import Iterable , Sequence
6- from typing import TYPE_CHECKING
6+ from typing import TYPE_CHECKING , TypedDict
77
88import click
99from sqlalchemy .orm import Session , sessionmaker
1212from enums .cloud_plan import CloudPlan
1313from extensions .ext_database import db
1414from models .workflow import WorkflowRun
15- from repositories .api_workflow_run_repository import APIWorkflowRunRepository
15+ from repositories .api_workflow_run_repository import APIWorkflowRunRepository , RunsWithRelatedCountsDict
1616from repositories .factory import DifyAPIRepositoryFactory
1717from repositories .sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
1818from services .billing_service import BillingService , SubscriptionPlan
2424 from opentelemetry .metrics import Counter , Histogram
2525
2626
27+ class RelatedCountsDict (TypedDict ):
28+ node_executions : int
29+ offloads : int
30+ app_logs : int
31+ trigger_logs : int
32+ pauses : int
33+ pause_reasons : int
34+
35+
2736class WorkflowRunCleanupMetrics :
2837 """
2938 Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
@@ -233,7 +242,7 @@ def run(self) -> None:
233242
234243 total_runs_deleted = 0
235244 total_runs_targeted = 0
236- related_totals = self ._empty_related_counts () if self .dry_run else None
245+ related_totals : RelatedCountsDict | None = self ._empty_related_counts () if self .dry_run else None
237246 batch_index = 0
238247 last_seen : tuple [datetime .datetime , str ] | None = None
239248 status = "success"
@@ -315,8 +324,7 @@ def run(self) -> None:
315324 int ((time .monotonic () - count_start ) * 1000 ),
316325 )
317326 if related_totals is not None :
318- for k in _RELATED_RECORD_KEYS :
319- related_totals [k ] += batch_counts .get (k , 0 ) # type: ignore[literal-required,operator]
327+ self ._accumulate_related_counts (related_totals , batch_counts )
320328 sample_ids = ", " .join (run .id for run in free_runs [:5 ])
321329 click .echo (
322330 click .style (
@@ -515,7 +523,7 @@ def _count_trigger_logs(self, session: Session, run_ids: Sequence[str]) -> int:
515523 return trigger_repo .count_by_run_ids (run_ids )
516524
517525 @staticmethod
518- def _empty_related_counts () -> dict [ str , int ] :
526+ def _empty_related_counts () -> RelatedCountsDict :
519527 return {
520528 "node_executions" : 0 ,
521529 "offloads" : 0 ,
@@ -526,7 +534,7 @@ def _empty_related_counts() -> dict[str, int]:
526534 }
527535
528536 @staticmethod
529- def _format_related_counts (counts : dict [ str , int ] ) -> str :
537+ def _format_related_counts (counts : RelatedCountsDict ) -> str :
530538 return (
531539 f"node_executions { counts ['node_executions' ]} , "
532540 f"offloads { counts ['offloads' ]} , "
@@ -536,6 +544,15 @@ def _format_related_counts(counts: dict[str, int]) -> str:
536544 f"pause_reasons { counts ['pause_reasons' ]} "
537545 )
538546
547+ @staticmethod
548+ def _accumulate_related_counts (totals : RelatedCountsDict , batch : RunsWithRelatedCountsDict ) -> None :
549+ totals ["node_executions" ] += batch .get ("node_executions" , 0 )
550+ totals ["offloads" ] += batch .get ("offloads" , 0 )
551+ totals ["app_logs" ] += batch .get ("app_logs" , 0 )
552+ totals ["trigger_logs" ] += batch .get ("trigger_logs" , 0 )
553+ totals ["pauses" ] += batch .get ("pauses" , 0 )
554+ totals ["pause_reasons" ] += batch .get ("pause_reasons" , 0 )
555+
539556 def _count_node_executions (self , session : Session , runs : Sequence [WorkflowRun ]) -> tuple [int , int ]:
540557 run_ids = [run .id for run in runs ]
541558 repo = DifyAPIRepositoryFactory .create_api_workflow_node_execution_repository (
0 commit comments