-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcommand_base.py
More file actions
148 lines (125 loc) · 5.92 KB
/
command_base.py
File metadata and controls
148 lines (125 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""Django management command base class for CollectorRunnable-backed collectors."""
# Design notes (review summary):
# - Template method: handle() -> get_collector(**options) -> phase(run) -> log last_result -> phase(sync_pinecone).
# - ABC: subclasses that omit get_collector() raise TypeError at instantiation, not at import.
# - Each _run_collector_phase uses try/except/finally; finally clears _error_phase even if
# handle_error, classify_failure, or logging raises (double fault cleanup).
# - classify_failure (core.errors) only recognizes errors from a core set of dependencies; many
#   SDK/DB errors stay "unknown"—override handle_error on the collector when you need a specific category.
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any
from django.core.management.base import BaseCommand, CommandError
from core.collectors.base_collector import CollectorRunnable
from core.protocol_dto import TrackerResultDataclass
from core.protocols import TrackerResult
logger = logging.getLogger(__name__)
def _records_collected(result: TrackerResult) -> int:
"""Sum count values, excluding meta keys like ``errors``."""
return sum(
v for k, v in result.counts.items() if k not in ("errors", "failed_count")
)
def _log_collector_result(collector: CollectorRunnable, result: TrackerResult) -> None:
    """Emit one INFO record summarising a finished collector run.

    The collector is identified by its ``name`` attribute when that is a
    non-empty string, and by its class name otherwise. The summary fields are
    written into the message and also passed as ``extra``, so structured log
    handlers can read them as record attributes. When *result* is a
    :class:`TrackerResultDataclass`, its ``repr`` (also used in the message)
    and its ``to_json()`` output are attached as ``result_repr`` and
    ``result_json``.
    """
    name_attr = getattr(collector, "name", None)
    collector_id = (
        name_attr
        if isinstance(name_attr, str) and name_attr
        else collector.__class__.__name__
    )

    records = _records_collected(result)
    succeeded = result.success
    error_count = len(result.errors)
    duration = result.duration_seconds
    counts_snapshot = dict(result.counts)

    extra: dict[str, Any] = {
        "collector": collector_id,
        "success": succeeded,
        "records_collected": records,
        "error_count": error_count,
        "duration_seconds": duration,
        "counts": counts_snapshot,
    }

    # Non-dataclass results are formatted with %s (i.e. str(result)).
    rendered: Any = result
    if isinstance(result, TrackerResultDataclass):
        rendered = repr(result)
        extra["result_repr"] = rendered
        extra["result_json"] = result.to_json()

    logger.info(
        "Collector finished: collector=%s success=%s records_collected=%s "
        "error_count=%s duration_seconds=%s counts=%s result=%s",
        collector_id,
        succeeded,
        records,
        error_count,
        duration,
        counts_snapshot,
        rendered,
        extra=extra,
    )
class BaseCollectorCommand(ABC, BaseCommand):
    """
    Thin Django ``BaseCommand`` adapter using the template-method pattern.

    **Flow:** :meth:`handle` calls ``get_collector(**options)``, runs
    :meth:`_run_collector_phase` with ``collector.run``, logs
    ``collector.last_result`` when it is a :class:`TrackerResult`, then runs
    :meth:`_run_collector_phase` with ``collector.sync_pinecone``.

    **``get_collector`` contract:** Must return a :class:`CollectorRunnable`—any
    object with ``run()``, ``sync_pinecone()``, and ``handle_error(exc)``.
    Typical implementations return an
    :class:`~core.collectors.base_collector.AbstractCollector` instance (or any
    other :class:`CollectorRunnable`). Subclasses that do not implement
    :meth:`get_collector` cannot be instantiated (``TypeError`` from ``abc``),
    which surfaces as soon as the command object is constructed, usually when
    Django loads the command.

    **Errors:** :class:`~django.core.management.base.CommandError` is logged
    with ``failure_category`` set to ``"command"`` and re-raised without
    calling ``handle_error``. Any other :class:`Exception` is passed to
    ``collector.handle_error`` (which classifies via
    :func:`core.errors.classify_failure` and logs), then re-raised. A
    ``finally`` block removes ``collector._error_phase`` after each phase; an
    ``AttributeError`` from that delete is ignored so cleanup never replaces
    the phase's own outcome.
    """

    @abstractmethod
    def get_collector(self, **options: Any) -> CollectorRunnable:
        """
        Build the collector instance from parsed CLI options.

        Args:
            **options: Keyword arguments forwarded from :meth:`handle` (Django-parsed
                command-line options and defaults).

        Returns:
            A :class:`CollectorRunnable` executed by :meth:`handle` (``run`` then
            ``sync_pinecone``).
        """

    def handle(self, *args: Any, **options: Any) -> None:
        """
        Build the collector and run its ``run`` and ``sync_pinecone`` phases.

        Between the two phases, ``collector.last_result`` is logged via
        :func:`_log_collector_result` when it exists and is a
        :class:`TrackerResult`. Positional *args* are accepted for Django's
        ``handle`` signature but are not used.
        """
        collector = self.get_collector(**options)
        self._run_collector_phase(collector, collector.run)
        result = getattr(collector, "last_result", None)
        # NOTE(review): an exception raised while logging the summary
        # (e.g. from ``to_json()``) propagates from here, skipping
        # ``sync_pinecone`` and bypassing ``handle_error`` — confirm that is
        # the intended policy.
        if result is not None and isinstance(result, TrackerResult):
            _log_collector_result(collector, result)
        self._run_collector_phase(collector, collector.sync_pinecone)

    @staticmethod
    def _collector_label(collector: CollectorRunnable) -> str:
        """Return the collector's ``name`` if it is a non-empty str, else its class name.

        Uses the same rule as the collector-result log, so every record for a
        collector carries the same ``collector`` value.
        """
        label = getattr(collector, "name", None)
        if isinstance(label, str) and label:
            return label
        return collector.__class__.__name__

    def _run_collector_phase(
        self,
        collector: CollectorRunnable,
        phase: Any,
    ) -> None:
        """
        Run a single zero-argument callable phase on *collector*.

        Sets ``collector._error_phase`` to the callable's ``__name__`` (for example
        ``"run"`` or ``"sync_pinecone"``) before invoking *phase*, removes it in
        ``finally``, and routes failures per :class:`BaseCollectorCommand` error rules.

        Args:
            collector: Object providing ``handle_error`` for non-command failures.
            phase: Bound method or callable with no arguments (typically
                ``collector.run`` or ``collector.sync_pinecone``).

        Returns:
            None

        Raises:
            CommandError: Re-raised unchanged after being logged.
            Exception: Any other exception, re-raised after ``collector.handle_error``.
        """
        phase_name = getattr(phase, "__name__", str(phase))
        # setattr/delattr keep static type checkers quiet: ``_error_phase`` is
        # not part of the CollectorRunnable interface.
        setattr(collector, "_error_phase", phase_name)
        try:
            phase()
        except CommandError:
            logger.error(
                "Collector raised CommandError during %s",
                phase_name,
                extra={
                    "collector": self._collector_label(collector),
                    "collector_phase": phase_name,
                    "failure_category": "command",
                },
            )
            raise
        except Exception as exc:
            collector.handle_error(exc)
            raise
        finally:
            # EAFP cleanup: a failed delete (attribute already gone, or a
            # property without a deleter) must not mask the phase's exception
            # or fail a phase that succeeded.
            try:
                delattr(collector, "_error_phase")
            except AttributeError:
                pass