Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import random
import time
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypedDict

import click
from sqlalchemy.orm import Session, sessionmaker
Expand All @@ -12,7 +12,7 @@
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict
from repositories.factory import DifyAPIRepositoryFactory
from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
from services.billing_service import BillingService, SubscriptionPlan
Expand All @@ -24,6 +24,15 @@
from opentelemetry.metrics import Counter, Histogram


# Per-table tallies of records related to a workflow run; used by the cleanup
# job to accumulate and report what a (dry-run) pass would have deleted.
RelatedCountsDict = TypedDict(
    "RelatedCountsDict",
    {
        "node_executions": int,
        "offloads": int,
        "app_logs": int,
        "trigger_logs": int,
        "pauses": int,
        "pause_reasons": int,
    },
)


class WorkflowRunCleanupMetrics:
"""
Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs.
Expand Down Expand Up @@ -233,7 +242,7 @@ def run(self) -> None:

total_runs_deleted = 0
total_runs_targeted = 0
related_totals = self._empty_related_counts() if self.dry_run else None
related_totals: RelatedCountsDict | None = self._empty_related_counts() if self.dry_run else None
batch_index = 0
last_seen: tuple[datetime.datetime, str] | None = None
status = "success"
Expand Down Expand Up @@ -315,8 +324,7 @@ def run(self) -> None:
int((time.monotonic() - count_start) * 1000),
)
if related_totals is not None:
for k in _RELATED_RECORD_KEYS:
related_totals[k] += batch_counts.get(k, 0) # type: ignore[literal-required,operator]
self._accumulate_related_counts(related_totals, batch_counts)
sample_ids = ", ".join(run.id for run in free_runs[:5])
click.echo(
click.style(
Expand Down Expand Up @@ -515,7 +523,7 @@ def _count_trigger_logs(self, session: Session, run_ids: Sequence[str]) -> int:
return trigger_repo.count_by_run_ids(run_ids)

@staticmethod
def _empty_related_counts() -> dict[str, int]:
def _empty_related_counts() -> RelatedCountsDict:
return {
"node_executions": 0,
"offloads": 0,
Expand All @@ -526,7 +534,7 @@ def _empty_related_counts() -> dict[str, int]:
}

@staticmethod
def _format_related_counts(counts: dict[str, int]) -> str:
def _format_related_counts(counts: RelatedCountsDict) -> str:
return (
f"node_executions {counts['node_executions']}, "
f"offloads {counts['offloads']}, "
Expand All @@ -536,6 +544,15 @@ def _format_related_counts(counts: dict[str, int]) -> str:
f"pause_reasons {counts['pause_reasons']}"
)

@staticmethod
def _accumulate_related_counts(totals: RelatedCountsDict, batch: RunsWithRelatedCountsDict) -> None:
    """Fold one batch's per-table counts into the running ``totals``.

    Mutates ``totals`` in place; a key absent from ``batch`` contributes zero.
    Keys are spelled out literally (rather than looped over) so each access
    uses a literal TypedDict key.
    """
    totals["node_executions"] = totals["node_executions"] + batch.get("node_executions", 0)
    totals["offloads"] = totals["offloads"] + batch.get("offloads", 0)
    totals["app_logs"] = totals["app_logs"] + batch.get("app_logs", 0)
    totals["trigger_logs"] = totals["trigger_logs"] + batch.get("trigger_logs", 0)
    totals["pauses"] = totals["pauses"] + batch.get("pauses", 0)
    totals["pause_reasons"] = totals["pause_reasons"] + batch.get("pause_reasons", 0)

def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
run_ids = [run.id for run in runs]
repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
Expand Down
Loading