From 17590e0bc9e1aad9aa8a1a267fcb10653fefacca Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 9 Jun 2026 12:01:06 -0700 Subject: [PATCH 1/5] feat(protocols): introduce LibraryDocsTrackerResult and CollectBoostLibrariesResult DTOs --- .../run_boost_library_docs_tracker.py | 18 ++- boost_library_docs_tracker/protocol_impl.py | 33 +++++ ..._run_boost_library_docs_tracker_command.py | 10 +- .../commands/collect_boost_libraries.py | 106 ++++++++++------ .../run_boost_github_activity_tracker.py | 15 ++- boost_library_tracker/protocol_impl.py | 37 ++++++ .../test_collect_boost_libraries_command.py | 6 +- .../tests/test_protocol_impl.py | 12 ++ boost_library_usage_dashboard/collectors.py | 6 +- .../protocol_impl.py | 27 ++++ .../run_boost_mailing_list_tracker.py | 31 ++++- boost_mailing_list_tracker/protocol_impl.py | 52 ++++++++ .../tests/test_protocol_impl.py | 18 +++ .../commands/run_boost_usage_tracker.py | 33 ++++- clang_github_tracker/collectors.py | 25 +++- clang_github_tracker/protocol_impl.py | 73 +++++++++++ .../tests/test_protocol_impl.py | 21 ++++ core/collectors/README.md | 6 +- core/collectors/__init__.py | 4 + core/collectors/base_collector.py | 52 +++++++- core/collectors/command_base.py | 34 +++++ core/incremental_state.py | 15 +++ core/management/commands/startcollector.py | 5 +- core/protocols.py | 14 +++ .../protocol_assignment_positive.py | 2 + .../test_collector_protocol_conformance.py | 80 ++++++++++++ core/tests/test_collectors_base.py | 117 ++++++++++++++---- core/tests/test_protocols.py | 18 +++ core/tracker_result.py | 48 +++++++ .../commands/run_cppa_pinecone_sync.py | 23 ++-- cppa_pinecone_sync/protocol_impl.py | 31 +++++ .../tests/test_protocol_impl.py | 15 +++ .../commands/run_cppa_slack_tracker.py | 25 +++- cppa_slack_tracker/protocol_impl.py | 43 +++++++ .../tests/test_protocol_impl.py | 15 +++ .../commands/run_cppa_user_tracker.py | 10 +- .../run_cppa_youtube_script_tracker.py | 12 +- cppa_youtube_script_tracker/protocol_impl.py | 28 +++++ .../backfill_discord_activity_tracker.py | 12 +- .../commands/run_discord_activity_tracker.py | 46 +++++-- discord_activity_tracker/protocol_impl.py | 2 + docs/Core_public_API.md | 17 ++- docs/How_to_add_a_collector.md | 8 +- docs/Tutorial_building_a_collector.md | 12 +- docs/service_api/core_protocols.md | 3 + github_activity_tracker/protocol_impl.py | 18 +++ .../tests/test_protocol_impl.py | 8 ++ wg21_paper_tracker/collectors.py | 18 +-- wg21_paper_tracker/protocol_impl.py | 27 ++++ .../tests/test_protocol_impl.py | 14 +++ 50 files changed, 1166 insertions(+), 139 deletions(-) create mode 100644 boost_library_docs_tracker/protocol_impl.py create mode 100644 boost_library_tracker/protocol_impl.py create mode 100644 boost_library_tracker/tests/test_protocol_impl.py create mode 100644 boost_library_usage_dashboard/protocol_impl.py create mode 100644 boost_mailing_list_tracker/protocol_impl.py create mode 100644 boost_mailing_list_tracker/tests/test_protocol_impl.py create mode 100644 clang_github_tracker/protocol_impl.py create mode 100644 clang_github_tracker/tests/test_protocol_impl.py create mode 100644 core/incremental_state.py create mode 100644 core/tests/test_collector_protocol_conformance.py create mode 100644 core/tracker_result.py create mode 100644 cppa_pinecone_sync/protocol_impl.py create mode 100644 cppa_pinecone_sync/tests/test_protocol_impl.py create mode 100644 cppa_slack_tracker/protocol_impl.py create mode 100644 cppa_slack_tracker/tests/test_protocol_impl.py create mode 100644 cppa_youtube_script_tracker/protocol_impl.py create mode 100644 wg21_paper_tracker/protocol_impl.py create mode 100644 wg21_paper_tracker/tests/test_protocol_impl.py diff --git a/boost_library_docs_tracker/management/commands/run_boost_library_docs_tracker.py b/boost_library_docs_tracker/management/commands/run_boost_library_docs_tracker.py index df71b95c..0a72f814 100644 --- a/boost_library_docs_tracker/management/commands/run_boost_library_docs_tracker.py +++ b/boost_library_docs_tracker/management/commands/run_boost_library_docs_tracker.py @@ -44,6 +44,8 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from boost_library_docs_tracker.protocol_impl import LibraryDocsTrackerResult from boost_library_docs_tracker import fetcher, services, workspace from boost_library_docs_tracker.preprocessor import preprocess_for_pinecone @@ -72,10 +74,10 @@ def validate_config(self) -> None: if max_pages is not None and max_pages < 1: raise CommandError("--max-pages must be at least 1.") - def collect(self) -> None: + def collect(self) -> TrackerResult: o = self.options try: - self.cmd._run( + return self.cmd._run( versions_arg=o["versions"], library_filter=o["library"], dry_run=o["dry_run"], @@ -169,7 +171,7 @@ def _run( max_pages, use_local, cleanup_extract, - ): + ) -> LibraryDocsTrackerResult: versions = self._resolve_versions(versions_arg) self.stdout.write( f"Processing {len(versions)} version(s): {', '.join(versions)}" @@ -177,8 +179,9 @@ def _run( mode = "local-zip" if use_local else "HTTP crawl" self.stdout.write(f"Scrape mode: {mode}") + total_pages = 0 for version in versions: - self._process_version( + total_pages += self._process_version( version=version, library_filter=library_filter, dry_run=dry_run, @@ -191,6 +194,12 @@ def _run( reason = "dry run" if dry_run else "--skip-pinecone set" self.stdout.write(f"Skipping Pinecone sync ({reason}).") + return LibraryDocsTrackerResult.from_run( + versions=len(versions), + pages=total_pages, + dry_run=dry_run, + ) + def _process_version( self, *, version, library_filter, dry_run, max_pages, use_local, cleanup_extract ): @@ -247,6 +256,7 @@ def _process_version( ) self.stdout.write(f"[{version}] Done — {total_pages} pages total.") + return total_pages def _prepare_local_source(self, *, version: str) -> tuple[Path, Path]: """Download and extract the Boost source zip for a version. diff --git a/boost_library_docs_tracker/protocol_impl.py b/boost_library_docs_tracker/protocol_impl.py new file mode 100644 index 00000000..8947bbfb --- /dev/null +++ b/boost_library_docs_tracker/protocol_impl.py @@ -0,0 +1,33 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for library docs tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Mapping + + +@dataclass(frozen=True) +class LibraryDocsTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for docs scrape runs.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_run( + cls, + *, + versions: int, + pages: int = 0, + dry_run: bool = False, + ) -> LibraryDocsTrackerResult: + return cls( + success=True, + counts={ + "versions": versions, + "pages": pages, + "dry_run": int(dry_run), + }, + ) diff --git a/boost_library_docs_tracker/tests/test_run_boost_library_docs_tracker_command.py b/boost_library_docs_tracker/tests/test_run_boost_library_docs_tracker_command.py index f87ead16..c5f1a1cd 100644 --- a/boost_library_docs_tracker/tests/test_run_boost_library_docs_tracker_command.py +++ b/boost_library_docs_tracker/tests/test_run_boost_library_docs_tracker_command.py @@ -443,8 +443,16 @@ def test_call_command_dry_run_skips_pinecone(boost_library_version): ver.version = "boost-1.81.0" ver.save() buf = StringIO() + from boost_library_docs_tracker.protocol_impl import LibraryDocsTrackerResult + with ( - patch.object(Command, "_run") as run_mock, + patch.object( + Command, + "_run", + return_value=LibraryDocsTrackerResult.from_run( + versions=1, pages=0, dry_run=True + ), + ) as run_mock, patch.object(Command, "_sync_pinecone") as sync_mock, ): call_command( diff --git a/boost_library_tracker/management/commands/collect_boost_libraries.py b/boost_library_tracker/management/commands/collect_boost_libraries.py index 8e0ac188..de64d15a 100644 --- a/boost_library_tracker/management/commands/collect_boost_libraries.py +++ b/boost_library_tracker/management/commands/collect_boost_libraries.py @@ -17,8 +17,13 @@ import logging import re +from collections.abc import Sequence -from django.core.management.base import BaseCommand, CommandError +from django.core.management.base import CommandError + +from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from boost_library_tracker.protocol_impl import CollectBoostLibrariesResult from django.db import transaction from boost_library_tracker.models import ( @@ -174,7 +179,7 @@ def _collect_libraries_for_version( boost_version, ref: str, *, - client: GitHubAPIClient = None, + client: GitHubAPIClient | None = None, dry_run: bool = False, ) -> tuple[int, int]: """ @@ -238,7 +243,53 @@ def _collect_libraries_for_version( return created_total, len(lib_submodules) -class Command(BaseCommand): +class CollectBoostLibrariesCollector(AbstractCollector): + """Collect Boost versions and library metadata from boostorg/boost.""" + + def __init__(self, cmd: "Command", options: dict) -> None: + self.cmd = cmd + self.options = options + + @property + def name(self) -> str: + return "collect_boost_libraries" + + def validate_config(self) -> None: + return None + + def collect(self) -> TrackerResult: + dry_run = self.options.get("dry_run", False) + limit = self.options.get("limit") + + try: + boost_versions_list = _parse_boost_version_option( + self.options.get("boost_version") + ) + except CommandError as e: + logger.error("Error parsing --boost-version: %s", e) + raise + + target_releases: Sequence[tuple[str, str | None]] = [] + + if boost_versions_list and "all" == boost_versions_list[0]: + target_releases = all_boost_versions_from_api() or [] + elif boost_versions_list and "new" not in boost_versions_list: + target_releases = [(ref, None) for ref in boost_versions_list] + elif not boost_versions_list or "new" in boost_versions_list: + target_releases = new_boost_versions_from_api() + + if not target_releases: + logger.warning("No releases to process") + return CollectBoostLibrariesResult.empty(dry_run=dry_run) + + if limit: + target_releases = target_releases[:limit] + logger.info("Processing first %s releases", limit) + + return self.cmd._process_refs(target_releases, dry_run=dry_run) + + +class Command(BaseCollectorCommand): """Management command: collect Boost versions and library metadata.""" help = ( @@ -268,44 +319,15 @@ def add_arguments(self, parser): help="Fetch and report what would be done; no DB writes.", ) - def handle(self, *_args, **options): - - dry_run = options.get("dry_run", False) - limit = options.get("limit") - - try: - boost_versions_list = _parse_boost_version_option( - options.get("boost_version") - ) - except CommandError as e: - logger.error("Error parsing --boost-version: %s", e) - return - - target_releases: list[tuple[str, str]] = [] - - if boost_versions_list and "all" == boost_versions_list[0]: - target_releases = all_boost_versions_from_api() - elif boost_versions_list and "new" not in boost_versions_list: - target_releases = [(ref, None) for ref in boost_versions_list] - elif not boost_versions_list or "new" in boost_versions_list: - target_releases = new_boost_versions_from_api() - - if not target_releases: - logger.warning("No releases to process") - return - - if limit: - target_releases = target_releases[:limit] - logger.info("Processing first %s releases", limit) - - self._process_refs(target_releases, dry_run=dry_run) + def get_collector(self, **options) -> AbstractCollector: + return CollectBoostLibrariesCollector(cmd=self, options=dict(options)) def _process_refs( self, - target_releases: list[tuple[str, str | None]], + target_releases: Sequence[tuple[str, str | None]], *, dry_run: bool = False, - ) -> None: + ) -> CollectBoostLibrariesResult: """Process (ref, published_at) pairs; each ref in its own transaction. ``published_at`` is set when refs came from the GitHub releases API; use None @@ -315,14 +337,18 @@ def _process_refs( if dry_run: logger.info("Dry run: no DB writes.") logger.info("Would process %s releases", len(target_releases)) - return + return CollectBoostLibrariesResult.from_totals( + versions_created=0, + library_versions_created=0, + dry_run=True, + ) total_versions_created = 0 total_lib_versions_created = 0 client = get_github_client(use="scraping") if not client: logger.error("Could not create GitHub Client") - return + return CollectBoostLibrariesResult.empty() for tag, sha in target_releases: if not sha: @@ -363,3 +389,7 @@ def _process_refs( total_versions_created, total_lib_versions_created, ) + return CollectBoostLibrariesResult.from_totals( + versions_created=total_versions_created, + library_versions_created=total_lib_versions_created, + ) diff --git a/boost_library_tracker/management/commands/run_boost_github_activity_tracker.py b/boost_library_tracker/management/commands/run_boost_github_activity_tracker.py index c3139658..d6781d4b 100644 --- a/boost_library_tracker/management/commands/run_boost_github_activity_tracker.py +++ b/boost_library_tracker/management/commands/run_boost_github_activity_tracker.py @@ -28,6 +28,8 @@ ensure_repository_owner, get_or_create_repository, ) +from core.protocols import TrackerResult +from github_activity_tracker.protocol_impl import GitHubSyncTrackerResult from github_activity_tracker.sync import sync_github from boost_library_tracker.services import get_or_create_boost_library_repo @@ -374,8 +376,8 @@ def validate_config(self) -> None: except ValueError as e: raise CommandError(str(e)) from e - def collect(self) -> None: - self.cmd._handle_core(self.options) + def collect(self) -> TrackerResult: + return self.cmd._handle_core(self.options) def sync_pinecone(self) -> None: o = self.options @@ -454,7 +456,7 @@ def add_arguments(self, parser): def get_collector(self, **options): return BoostGithubActivityCollector(cmd=self, options=dict(options)) - def _handle_core(self, options): + def _handle_core(self, options) -> GitHubSyncTrackerResult: dry_run = options["dry_run"] skip_github_sync = options["skip_github_sync"] skip_markdown_export = options["skip_markdown_export"] @@ -511,7 +513,7 @@ def _handle_core(self, options): if not skip_pinecone: logger.info("dry-run would run Pinecone upsert for issues and PRs") logger.info("finished successfully") - return + return GitHubSyncTrackerResult(success=True, counts={}) synced_repos: list = [] if not skip_github_sync: @@ -555,6 +557,11 @@ def _handle_core(self, options): logger.info("skipping Pinecone (--skip-pinecone)") logger.info("finished successfully") + repo_results = [ + GitHubSyncTrackerResult.from_sync_dict(sr) + for _own, _repo, _boost_repo, sr in synced_repos + ] + return GitHubSyncTrackerResult.merge(*repo_results) except Exception as e: logger.exception("command failed: %s", e) raise diff --git a/boost_library_tracker/protocol_impl.py b/boost_library_tracker/protocol_impl.py new file mode 100644 index 00000000..48ae5eb6 --- /dev/null +++ b/boost_library_tracker/protocol_impl.py @@ -0,0 +1,37 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for boost library tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Mapping + + +@dataclass(frozen=True) +class CollectBoostLibrariesResult: + """Structured :class:`~core.protocols.TrackerResult` for library metadata collection.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_totals( + cls, + *, + versions_created: int, + library_versions_created: int, + dry_run: bool = False, + ) -> CollectBoostLibrariesResult: + return cls( + success=True, + counts={ + "versions": versions_created, + "library_versions": library_versions_created, + "dry_run": int(dry_run), + }, + ) + + @classmethod + def empty(cls, *, dry_run: bool = False) -> CollectBoostLibrariesResult: + return cls(success=True, counts={"dry_run": int(dry_run)}) diff --git a/boost_library_tracker/tests/test_collect_boost_libraries_command.py b/boost_library_tracker/tests/test_collect_boost_libraries_command.py index c1c44c27..fea06874 100644 --- a/boost_library_tracker/tests/test_collect_boost_libraries_command.py +++ b/boost_library_tracker/tests/test_collect_boost_libraries_command.py @@ -12,7 +12,6 @@ from model_bakery import baker from boost_library_tracker.management.commands.collect_boost_libraries import ( - Command as CollectCmd, _collect_libraries_for_version, _normalize_ref, _parse_boost_version_option, @@ -52,7 +51,10 @@ def test_parse_boost_version_option_errors(): @pytest.mark.django_db def test_collect_handle_parse_error_logs(caplog): caplog.set_level(logging.ERROR) - CollectCmd().handle(boost_version="badtag", dry_run=False) + with pytest.raises(CommandError, match="Invalid release ref"): + call_command( + "collect_boost_libraries", boost_version="badtag", stdout=StringIO() + ) assert any("Error parsing" in r.message for r in caplog.records) diff --git a/boost_library_tracker/tests/test_protocol_impl.py b/boost_library_tracker/tests/test_protocol_impl.py new file mode 100644 index 00000000..f22aa53f --- /dev/null +++ b/boost_library_tracker/tests/test_protocol_impl.py @@ -0,0 +1,12 @@ +"""Tests for :mod:`boost_library_tracker.protocol_impl`.""" + +from core.protocols import TrackerResult + +from boost_library_tracker.protocol_impl import CollectBoostLibrariesResult + + +def test_collect_boost_libraries_result() -> None: + r = CollectBoostLibrariesResult.from_totals( + versions_created=1, library_versions_created=10 + ) + assert isinstance(r, TrackerResult) diff --git a/boost_library_usage_dashboard/collectors.py b/boost_library_usage_dashboard/collectors.py index f7c637f0..3d409f7f 100644 --- a/boost_library_usage_dashboard/collectors.py +++ b/boost_library_usage_dashboard/collectors.py @@ -6,6 +6,8 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector +from core.protocols import TrackerResult +from boost_library_usage_dashboard.protocol_impl import UsageDashboardTrackerResult from boost_library_usage_dashboard.analyzer import BoostUsageDashboardAnalyzer from boost_library_usage_dashboard.publisher import publish_dashboard from boost_library_usage_dashboard.renderer import render_dashboard_html @@ -42,9 +44,10 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> TrackerResult: output_dir = get_workspace_path("boost_library_usage_dashboard").resolve() output_dir.mkdir(parents=True, exist_ok=True) + stats = None if not self.skip_collect: logger.info("Step 1: Collecting dashboard data from PostgreSQL...") @@ -105,3 +108,4 @@ def collect(self) -> None: repo=repo, branch=branch, ) + return UsageDashboardTrackerResult.from_stats(stats) diff --git a/boost_library_usage_dashboard/protocol_impl.py b/boost_library_usage_dashboard/protocol_impl.py new file mode 100644 index 00000000..26ba6c94 --- /dev/null +++ b/boost_library_usage_dashboard/protocol_impl.py @@ -0,0 +1,27 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for usage dashboard.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping + + +@dataclass(frozen=True) +class UsageDashboardTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for dashboard runs.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_stats(cls, stats: Mapping[str, Any] | None) -> UsageDashboardTrackerResult: + if stats is None: + return cls(success=True, counts={}) + repos = int(stats.get("repos_analyzed") or stats.get("total_repos") or 0) + return cls(success=True, counts={"repos_analyzed": repos}) + + @classmethod + def skipped(cls) -> UsageDashboardTrackerResult: + return cls(success=True, counts={}) diff --git a/boost_mailing_list_tracker/management/commands/run_boost_mailing_list_tracker.py b/boost_mailing_list_tracker/management/commands/run_boost_mailing_list_tracker.py index f8d044f6..b3d1d0d7 100644 --- a/boost_mailing_list_tracker/management/commands/run_boost_mailing_list_tracker.py +++ b/boost_mailing_list_tracker/management/commands/run_boost_mailing_list_tracker.py @@ -16,6 +16,11 @@ from django.utils.dateparse import parse_datetime from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import IncrementalState, TrackerResult +from boost_mailing_list_tracker.protocol_impl import ( + MailingListIncrementalState, + MailingListTrackerResult, +) from boost_mailing_list_tracker.email_formatter import format_email from boost_mailing_list_tracker.fetcher import ( @@ -248,7 +253,13 @@ def pre_collect(self) -> None: self._resolved_start_date, ) - def collect(self) -> None: + def load_incremental_state(self) -> IncrementalState | None: + marker = getattr(self, "_resolved_start_date", None) or self.start_date + if marker and str(marker).strip(): + return MailingListIncrementalState.from_start_date(str(marker).strip()) + return None + + def collect(self) -> TrackerResult: end_date = self.end_date start_date = self._resolved_start_date @@ -263,12 +274,22 @@ def collect(self) -> None: self.stdout.write(self.style.WARNING("No emails fetched from API.")) logger.info("run_boost_mailing_list_tracker: no emails fetched") emails = [] + if self.dry_run: + return MailingListTrackerResult.from_run( + fetched=0, created=0, skipped=0, dry_run=True + ) + return MailingListTrackerResult.from_run(fetched=0, created=0, skipped=0) self.stdout.write(f"Fetched {len(emails)} emails from API.") self._fetched_email_count = len(emails) if self.dry_run: - return + return MailingListTrackerResult.from_run( + fetched=self._fetched_email_count, + created=0, + skipped=0, + dry_run=True, + ) for email_data in emails: msg_id = email_data.get("msg_id", "") @@ -314,6 +335,12 @@ def collect(self) -> None: e, ) + return MailingListTrackerResult.from_run( + fetched=self._fetched_email_count, + created=self._created_count, + skipped=self._skipped_count, + ) + def post_collect(self) -> None: if self.dry_run: self.stdout.write( diff --git a/boost_mailing_list_tracker/protocol_impl.py b/boost_mailing_list_tracker/protocol_impl.py new file mode 100644 index 00000000..c43c05b2 --- /dev/null +++ b/boost_mailing_list_tracker/protocol_impl.py @@ -0,0 +1,52 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for mailing list tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping + + +@dataclass(frozen=True) +class MailingListTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for mailing list runs.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_run( + cls, + *, + fetched: int, + created: int, + skipped: int, + dry_run: bool = False, + ) -> MailingListTrackerResult: + return cls( + success=True, + counts={ + "fetched": fetched, + "created": created, + "skipped": skipped, + "dry_run": int(dry_run), + }, + ) + + +@dataclass(frozen=True) +class MailingListIncrementalState: + """Checkpoint between mailing list runs.""" + + checkpoint_token: str | None + human_readable_marker: str | None + extras: Mapping[str, Any] = field(default_factory=dict) + + @classmethod + def from_start_date(cls, start_date: str | None) -> MailingListIncrementalState: + return cls( + checkpoint_token="mailing_list:boost", + human_readable_marker=start_date, + extras={"start_date": start_date}, + ) diff --git a/boost_mailing_list_tracker/tests/test_protocol_impl.py b/boost_mailing_list_tracker/tests/test_protocol_impl.py new file mode 100644 index 00000000..949cfe10 --- /dev/null +++ b/boost_mailing_list_tracker/tests/test_protocol_impl.py @@ -0,0 +1,18 @@ +"""Tests for :mod:`boost_mailing_list_tracker.protocol_impl`.""" + +from core.protocols import IncrementalState, TrackerResult + +from boost_mailing_list_tracker.protocol_impl import ( + MailingListIncrementalState, + MailingListTrackerResult, +) + + +def test_mailing_list_tracker_result() -> None: + r = MailingListTrackerResult.from_run(fetched=5, created=2, skipped=1) + assert isinstance(r, TrackerResult) + + +def test_mailing_list_incremental_state() -> None: + st = MailingListIncrementalState.from_start_date("2024-06-01") + assert isinstance(st, IncrementalState) diff --git a/boost_usage_tracker/management/commands/run_boost_usage_tracker.py b/boost_usage_tracker/management/commands/run_boost_usage_tracker.py index 30bca4c4..36f50c15 100644 --- a/boost_usage_tracker/management/commands/run_boost_usage_tracker.py +++ b/boost_usage_tracker/management/commands/run_boost_usage_tracker.py @@ -20,8 +20,12 @@ from django.utils.dateparse import parse_datetime from django.core.management.base import CommandError -from core.collectors.base_collector import AbstractCollector -from core.collectors.command_base import BaseCollectorCommand +from core.collectors import ( + AbstractCollector, + BaseCollectorCommand, + GenericTrackerResult, +) +from core.protocols import TrackerResult from boost_usage_tracker.models import BoostExternalRepository from boost_usage_tracker.boost_searcher import ( @@ -40,11 +44,24 @@ get_or_create_repository, ) from core.operations.github_ops import get_github_client -from core.operations.github_ops.client import ConnectionException, RateLimitException +from core.operations.github_ops.client import ( + ConnectionException, + GitHubAPIClient, + RateLimitException, +) from core.operations.github_ops.tokens import validate_github_token_for_use logger = logging.getLogger(__name__) + +def _require_github_client() -> GitHubAPIClient: + """Return a scraping GitHub client or raise if credentials are unavailable.""" + client = get_github_client(use="scraping") + if client is None: + raise RuntimeError("GitHub client unavailable for boost_usage_tracker") + return client + + # --------------------------------------------------------------------------- # Ensure GitHubRepository from a search result # --------------------------------------------------------------------------- @@ -196,7 +213,7 @@ def task_monitor_content( until.date(), min_stars, ) - client = get_github_client(use="scraping") + client = _require_github_client() repo_results = search_repos_with_date_splitting( client, @@ -236,7 +253,7 @@ def task_monitor_stars( ) -> None: """Monthly task: find all C++ repos with 10+ stars, process new ones.""" now = datetime.now(timezone.utc) - client = get_github_client(use="scraping") + client = _require_github_client() # Load all already-tracked repos with their current star counts. # full_name is "owner/repo_name"; map to (repo_pk, current_stars) so we can @@ -344,22 +361,26 @@ def validate_config(self) -> None: except ValueError as e: raise CommandError(str(e)) from e - def collect(self) -> None: + def collect(self) -> TrackerResult: logger.info( "run_boost_usage_tracker: starting (task=%s, dry_run=%s)", self.task_filter or "all", self.dry_run, ) try: + tasks_run = 0 if not self.task_filter or self.task_filter == "monitor_content": task_monitor_content( self.since, self.until, self.min_stars, self.dry_run ) + tasks_run += 1 if not self.task_filter or self.task_filter == "monitor_stars": task_monitor_stars(self.min_stars, self.dry_run) + tasks_run += 1 logger.info("run_boost_usage_tracker: finished successfully") + return GenericTrackerResult.ok(tasks=tasks_run, dry_run=int(self.dry_run)) except (ConnectionException, RateLimitException) as e: logger.exception( "run_boost_usage_tracker failed (rate limit / connection): %s", diff --git a/clang_github_tracker/collectors.py b/clang_github_tracker/collectors.py index dca59c9d..f7b41a7c 100644 --- a/clang_github_tracker/collectors.py +++ b/clang_github_tracker/collectors.py @@ -10,6 +10,11 @@ from django.core.management.base import CommandError from core.collectors.base_collector import AbstractCollector +from core.protocols import IncrementalState, TrackerResult +from clang_github_tracker.protocol_impl import ( + ClangGithubIncrementalState, + ClangGithubTrackerResult, +) from core.utils.datetime_parsing import parse_iso_datetime from clang_github_tracker import state_manager as clang_state from clang_github_tracker.sync_raw import sync_clang_github_activity @@ -92,7 +97,16 @@ def validate_config(self) -> None: except ValueError as e: raise CommandError(str(e)) from e - def collect(self) -> None: + def load_incremental_state(self) -> IncrementalState | None: + start_commit, start_item, _end = clang_state.resolve_start_end_dates( + self._since_dt, self._until_dt + ) + return ClangGithubIncrementalState.from_watermarks( + start_commit=start_commit, + start_item=start_item, + ) + + def collect(self) -> TrackerResult: start_commit, start_item, end_date = clang_state.resolve_start_end_dates( self._since_dt, self._until_dt ) @@ -115,10 +129,11 @@ def collect(self) -> None: if not self.skip_pinecone: logger.info("dry-run: would run Pinecone upsert for issues and PRs") logger.info("dry-run finished") - return + return ClangGithubTrackerResult.dry_run() issue_numbers: list[int] = [] pr_numbers: list[int] = [] + commits_saved = 0 if not self.skip_github_sync: commits_saved, issue_numbers, pr_numbers = sync_clang_github_activity( @@ -176,6 +191,12 @@ def collect(self) -> None: logger.info("skipping remote push (--skip-remote-push)") logger.info("run_clang_github_tracker finished successfully") + return ClangGithubTrackerResult.from_sync( + commits_saved=commits_saved, + issue_count=len(issue_numbers), + pr_count=len(pr_numbers), + md_files=len(new_files), + ) def sync_pinecone(self) -> None: if self.dry_run or self.skip_pinecone: diff --git a/clang_github_tracker/protocol_impl.py b/clang_github_tracker/protocol_impl.py new file mode 100644 index 00000000..380d8c83 --- /dev/null +++ b/clang_github_tracker/protocol_impl.py @@ -0,0 +1,73 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for Clang GitHub tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Mapping + + +@dataclass(frozen=True) +class ClangGithubTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for Clang sync outcomes.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_sync( + cls, + *, + commits_saved: int, + issue_count: int, + pr_count: int, + md_files: int = 0, + ) -> ClangGithubTrackerResult: + return cls( + success=True, + counts={ + "commits": commits_saved, + "issues": issue_count, + "pull_requests": pr_count, + "md_files": md_files, + }, + ) + + @classmethod + def dry_run(cls) -> ClangGithubTrackerResult: + return cls(success=True, counts={}) + + +@dataclass(frozen=True) +class ClangGithubIncrementalState: + """Checkpoint between Clang GitHub runs.""" + + checkpoint_token: str | None + human_readable_marker: str | None + extras: Mapping[str, Any] = field(default_factory=dict) + + @classmethod + def from_watermarks( + cls, + *, + start_commit: datetime | str | None, + start_item: datetime | str | None, + ) -> ClangGithubIncrementalState: + def _marker_value(value: datetime | str | None) -> str | None: + if value is None: + return None + if isinstance(value, datetime): + return value.isoformat() + return value + + commit_marker = _marker_value(start_commit) + item_marker = _marker_value(start_item) + marker_parts = [p for p in (commit_marker, item_marker) if p is not None] + marker = "|".join(marker_parts) if marker_parts else None + return cls( + checkpoint_token="clang:llvm/llvm-project", + human_readable_marker=marker, + extras={"start_commit": commit_marker, "start_item": item_marker}, + ) diff --git a/clang_github_tracker/tests/test_protocol_impl.py b/clang_github_tracker/tests/test_protocol_impl.py new file mode 100644 index 00000000..1e14a9cb --- /dev/null +++ b/clang_github_tracker/tests/test_protocol_impl.py @@ -0,0 +1,21 @@ +"""Tests for :mod:`clang_github_tracker.protocol_impl`.""" + +from core.protocols import IncrementalState, TrackerResult + +from clang_github_tracker.protocol_impl import ( + ClangGithubIncrementalState, + ClangGithubTrackerResult, +) + + +def test_clang_tracker_result_from_sync() -> None: + r = ClangGithubTrackerResult.from_sync( + commits_saved=1, issue_count=2, pr_count=3, md_files=4 + ) + assert isinstance(r, TrackerResult) + assert r.counts["commits"] == 1 + + +def test_clang_incremental_state() -> None: + st = ClangGithubIncrementalState.from_watermarks(start_commit="c", start_item="i") + assert isinstance(st, IncrementalState) diff --git a/core/collectors/README.md b/core/collectors/README.md index d39e2f1c..4a165de7 100644 --- a/core/collectors/README.md +++ b/core/collectors/README.md @@ -6,8 +6,10 @@ Collector orchestration shared by every `run_*` management command. | Module | Role | | --- | --- | -| [`base_collector.py`](base_collector.py) | `AbstractCollector` (`validate_config`, `collect`), `CollectorRunnable` protocol, lifecycle mixin (`sync_pinecone`, `handle_error`). | -| [`command_base.py`](command_base.py) | `BaseCollectorCommand` — Django template: `get_collector` → `run` → `sync_pinecone`. | +| [`base_collector.py`](base_collector.py) | `AbstractCollector` (`validate_config`, `collect() -> TrackerResult`), `CollectorRunnable` protocol, lifecycle mixin (`load_incremental_state`, `sync_pinecone`, `handle_error`). | +| [`command_base.py`](command_base.py) | `BaseCollectorCommand` — Django template: `get_collector` → `run` (logs `TrackerResult`) → `sync_pinecone`. | +| [`../tracker_result.py`](../tracker_result.py) | `GenericTrackerResult` — default frozen `TrackerResult` DTO. | +| [`../incremental_state.py`](../incremental_state.py) | `GenericIncrementalState` — default frozen `IncrementalState` DTO. | ## Usage diff --git a/core/collectors/__init__.py b/core/collectors/__init__.py index 14aa2351..aa8079d9 100644 --- a/core/collectors/__init__.py +++ b/core/collectors/__init__.py @@ -6,11 +6,15 @@ ) from core.collectors.command_base import BaseCollectorCommand from core.errors import CollectorFailureCategory, classify_failure +from core.incremental_state import GenericIncrementalState +from core.tracker_result import GenericTrackerResult __all__ = [ "AbstractCollector", "BaseCollectorCommand", "CollectorFailureCategory", "CollectorRunnable", + "GenericIncrementalState", + "GenericTrackerResult", "classify_failure", ] diff --git a/core/collectors/base_collector.py b/core/collectors/base_collector.py index ca40c60c..0c30b62c 100644 --- a/core/collectors/base_collector.py +++ b/core/collectors/base_collector.py @@ -10,10 +10,13 @@ from __future__ import annotations import logging +import time from abc import ABC, abstractmethod from typing import Protocol, runtime_checkable from core.errors import classify_failure +from core.protocols import IncrementalState, TrackerResult, require_tracker_result +from core.tracker_result import with_duration_if_missing logger = logging.getLogger(__name__) @@ -28,7 +31,7 @@ class CollectorRunnable(Protocol): routes failures through :meth:`handle_error` (except :class:`~django.core.management.base.CommandError`). """ - def run(self) -> None: + def run(self) -> TrackerResult | None: """Main collection phase; see :class:`AbstractCollector`.""" ... @@ -40,6 +43,11 @@ def handle_error(self, exc: BaseException) -> None: """Log *exc* with structured ``failure_category``; must not swallow *exc*.""" ... + @property + def last_result(self) -> TrackerResult | None: + """Outcome of the most recent successful :meth:`run`, if any.""" + ... + class _CollectorLifecycleMixin: """ @@ -58,6 +66,10 @@ class _CollectorLifecycleMixin: :meth:`handle_error` when you need a different category or extra context. """ + _last_result: TrackerResult | None + _incremental_state_in: IncrementalState | None + _incremental_state_out: IncrementalState | None + def pre_collect(self) -> None: """ Optional setup before :meth:`~AbstractCollector.validate_config`. @@ -74,11 +86,22 @@ def post_collect(self) -> None: """ Optional teardown or reporting after :meth:`~AbstractCollector.collect` succeeds. - Default is no-op. + Default persists :attr:`_incremental_state_out` when set. Returns: None """ + state_out = getattr(self, "_incremental_state_out", None) + if state_out is not None: + self.persist_incremental_state(state_out) + return None + + def load_incremental_state(self) -> IncrementalState | None: + """Load checkpoint from DB/workspace; override in incremental collectors.""" + return None + + def persist_incremental_state(self, state: IncrementalState) -> None: + """Persist checkpoint after a successful run; override in incremental collectors.""" return None def on_error(self, exc: BaseException) -> None: @@ -132,6 +155,11 @@ def sync_pinecone(self) -> None: """ return None + @property + def last_result(self) -> TrackerResult | None: + """Outcome of the most recent successful :meth:`run`, if any.""" + return getattr(self, "_last_result", None) + class AbstractCollector(_CollectorLifecycleMixin, ABC): """ @@ -155,6 +183,9 @@ class AbstractCollector(_CollectorLifecycleMixin, ABC): :func:`classify_failure` does not map your domain errors cleanly. """ + def __init_subclass__(cls, **kwargs: object) -> None: + super().__init_subclass__(**kwargs) + @property @abstractmethod def name(self) -> str: @@ -180,12 +211,12 @@ def validate_config(self) -> None: """ @abstractmethod - def collect(self) -> None: + def collect(self) -> TrackerResult: """ Main collection work (fetch, transform, persist). Returns: - None + A :class:`~core.protocols.TrackerResult` describing the run outcome. Raises: Exception: Domain failures; propagate after logging when run under @@ -196,12 +227,21 @@ def collect(self) -> None: conventions. """ - def run(self) -> None: + def run(self) -> TrackerResult: + self._incremental_state_in = None + self._incremental_state_out = None + started = time.monotonic() try: self.pre_collect() self.validate_config() - self.collect() + self._incremental_state_in = self.load_incremental_state() + raw_result = self.collect() + result = require_tracker_result(raw_result) + elapsed = time.monotonic() - started + result = with_duration_if_missing(result, elapsed) + self._last_result = result self.post_collect() + return result except Exception as exc: try: self.on_error(exc) diff --git a/core/collectors/command_base.py b/core/collectors/command_base.py index 6d322d22..de853dc5 100644 --- a/core/collectors/command_base.py +++ b/core/collectors/command_base.py @@ -17,10 +17,41 @@ from django.core.management.base import BaseCommand, CommandError from core.collectors.base_collector import CollectorRunnable +from core.protocols import TrackerResult logger = logging.getLogger(__name__) +def _records_collected(result: TrackerResult) -> int: + """Sum count values, excluding meta keys like ``errors``.""" + return sum(v for k, v in result.counts.items() if k != "errors") + + +def _log_collector_result(collector: CollectorRunnable, result: TrackerResult) -> None: + collector_id = getattr(collector, "name", None) + if not isinstance(collector_id, str) or not collector_id: + collector_id = collector.__class__.__name__ + records = _records_collected(result) + logger.info( + "Collector finished: collector=%s success=%s records_collected=%s " + "error_count=%s duration_seconds=%s counts=%s", + collector_id, + result.success, + records, + len(result.errors), + result.duration_seconds, + dict(result.counts), + extra={ + "collector": collector_id, + "success": result.success, + "records_collected": records, + "error_count": len(result.errors), + "duration_seconds": result.duration_seconds, + "counts": dict(result.counts), + }, + ) + + class BaseCollectorCommand(ABC, BaseCommand): """ Thin Django ``BaseCommand`` adapter using the template-method pattern. @@ -61,6 +92,9 @@ def get_collector(self, **options: Any) -> CollectorRunnable: def handle(self, *args: Any, **options: Any) -> None: collector = self.get_collector(**options) self._run_collector_phase(collector, collector.run) + result = getattr(collector, "last_result", None) + if result is not None and isinstance(result, TrackerResult): + _log_collector_result(collector, result) self._run_collector_phase(collector, collector.sync_pinecone) def _run_collector_phase( diff --git a/core/incremental_state.py b/core/incremental_state.py new file mode 100644 index 00000000..564ce939 --- /dev/null +++ b/core/incremental_state.py @@ -0,0 +1,15 @@ +"""Shared :class:`~core.protocols.IncrementalState` implementation for collectors.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping + + +@dataclass(frozen=True) +class GenericIncrementalState: + """Default frozen DTO satisfying :class:`~core.protocols.IncrementalState`.""" + + checkpoint_token: str | None + human_readable_marker: str | None + extras: Mapping[str, Any] = field(default_factory=dict) diff --git a/core/management/commands/startcollector.py b/core/management/commands/startcollector.py index c12802a9..8ad91bbf 100644 --- a/core/management/commands/startcollector.py +++ b/core/management/commands/startcollector.py @@ -172,7 +172,7 @@ def _run_command_py(app_label: str, pascal: str) -> str: import logging from typing import Any - from core.collectors import AbstractCollector, BaseCollectorCommand + from core.collectors import AbstractCollector, BaseCollectorCommand, GenericTrackerResult import {app_label}.services as services @@ -195,7 +195,7 @@ def validate_config(self) -> None: if not self.source_key or not self.source_key.strip(): raise ValueError("source_key must not be empty") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: logger.info("run_{app_label}: starting") _, created = services.record_run(source_key=self.source_key.strip()) self.stdout.write( @@ -204,6 +204,7 @@ def collect(self) -> None: ) ) logger.info("run_{app_label}: finished successfully") + return GenericTrackerResult.ok(runs=1) class Command(BaseCollectorCommand): diff --git a/core/protocols.py b/core/protocols.py index e2c58b0b..b0ed8ad8 100644 --- a/core/protocols.py +++ b/core/protocols.py @@ -17,6 +17,7 @@ from __future__ import annotations +from collections.abc import Sequence from datetime import datetime from typing import Any, Mapping, Protocol, runtime_checkable @@ -33,6 +34,12 @@ def success(self) -> bool: ... @property def counts(self) -> Mapping[str, int]: ... + @property + def errors(self) -> Sequence[str]: ... + + @property + def duration_seconds(self) -> float | None: ... + @runtime_checkable class ActivityRecord(Protocol): @@ -86,3 +93,10 @@ def require_activity_record(obj: object) -> ActivityRecord: if not isinstance(obj, ActivityRecord): raise TypeError(f"expected ActivityRecord, got {type(obj).__name__!r}") return obj + + +def require_incremental_state(obj: object) -> IncrementalState: + """Return *obj* if it satisfies :class:`IncrementalState`; else raise ``TypeError``.""" + if not isinstance(obj, IncrementalState): + raise TypeError(f"expected IncrementalState, got {type(obj).__name__!r}") + return obj diff --git a/core/pyright_samples/protocol_assignment_positive.py b/core/pyright_samples/protocol_assignment_positive.py index bd968ca7..0de6ac90 100644 --- a/core/pyright_samples/protocol_assignment_positive.py +++ b/core/pyright_samples/protocol_assignment_positive.py @@ -12,6 +12,8 @@ class _LocalTrackerResult: success: bool counts: Mapping[str, int] + errors: tuple[str, ...] = () + duration_seconds: float | None = None def sample_tracker_result() -> TrackerResult: diff --git a/core/tests/test_collector_protocol_conformance.py b/core/tests/test_collector_protocol_conformance.py new file mode 100644 index 00000000..5aff0bb9 --- /dev/null +++ b/core/tests/test_collector_protocol_conformance.py @@ -0,0 +1,80 @@ +"""Runtime protocol conformance for collector DTO implementations.""" + +from __future__ import annotations + +import pytest + +from core.incremental_state import GenericIncrementalState +from core.protocols import IncrementalState, TrackerResult +from core.tracker_result import GenericTrackerResult + +from boost_library_docs_tracker.protocol_impl import LibraryDocsTrackerResult +from boost_library_tracker.protocol_impl import CollectBoostLibrariesResult +from boost_library_usage_dashboard.protocol_impl import UsageDashboardTrackerResult +from boost_mailing_list_tracker.protocol_impl import ( + MailingListIncrementalState, + MailingListTrackerResult, +) +from clang_github_tracker.protocol_impl import ( + ClangGithubIncrementalState, + ClangGithubTrackerResult, +) +from cppa_pinecone_sync.protocol_impl import PineconeSyncTrackerResult +from cppa_slack_tracker.protocol_impl import SlackIncrementalState, SlackTrackerResult +from cppa_youtube_script_tracker.protocol_impl import YoutubeScriptTrackerResult +from discord_activity_tracker.protocol_impl import ( + DiscordCollectionTrackerResult, + DiscordIncrementalState, +) +from github_activity_tracker.protocol_impl import ( + GitHubIncrementalState, + GitHubSyncTrackerResult, +) +from wg21_paper_tracker.protocol_impl import Wg21PaperTrackerResult +from wg21_reflector_collector.protocol_impl import ( + Wg21ReflectorIncrementalState, + Wg21ReflectorTrackerResult, +) + + +@pytest.mark.parametrize( + "result", + [ + GenericTrackerResult.ok(), + GitHubSyncTrackerResult(success=True, counts={"issues": 1}), + DiscordCollectionTrackerResult(success=True, counts={"messages": 2}), + PineconeSyncTrackerResult.from_sync_dict( + {"upserted": 1, "total": 1, "failed_count": 0} + ), + UsageDashboardTrackerResult.from_stats({"repos_analyzed": 3}), + Wg21PaperTrackerResult.dry_run(), + ClangGithubTrackerResult.dry_run(), + MailingListTrackerResult.from_run(fetched=1, created=1, skipped=0), + SlackTrackerResult.dry_run(), + YoutubeScriptTrackerResult.from_run(videos=1), + Wg21ReflectorTrackerResult.dry_run(), + LibraryDocsTrackerResult.from_run(versions=1, pages=5), + CollectBoostLibrariesResult.empty(), + ], +) +def test_tracker_result_isinstance(result: TrackerResult) -> None: + assert isinstance(result, TrackerResult) + assert result.errors == () or isinstance(result.errors, tuple) + + +@pytest.mark.parametrize( + "state", + [ + GenericIncrementalState(checkpoint_token="t", human_readable_marker="m"), + GitHubIncrementalState.from_repo_watermark(repo_id=1, marker="2024"), + DiscordIncrementalState.from_after_date(after=None), + MailingListIncrementalState.from_start_date("2024-01-01"), + SlackIncrementalState.from_team(team_id="T1", start_date="2024-01-01"), + ClangGithubIncrementalState.from_watermarks( + start_commit="abc", start_item="2024" + ), + Wg21ReflectorIncrementalState.from_since("2024-01-01"), + ], +) +def test_incremental_state_isinstance(state: IncrementalState) -> None: + assert isinstance(state, IncrementalState) diff --git a/core/tests/test_collectors_base.py b/core/tests/test_collectors_base.py index 57b4f2fe..9df73b58 100644 --- a/core/tests/test_collectors_base.py +++ b/core/tests/test_collectors_base.py @@ -9,6 +9,9 @@ import core.collectors.base_collector as collector_lifecycle from core.collectors.base_collector import AbstractCollector from core.collectors.command_base import BaseCollectorCommand +from core.tracker_result import GenericTrackerResult + +_OK = GenericTrackerResult.ok() class _CallCommandCollector(AbstractCollector): @@ -26,10 +29,11 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: from django.core.management import call_command as _call_command _call_command(self._command_name) + return _OK def test_call_command_collector_collect_invokes_call_command(): @@ -55,8 +59,9 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: phases.append("run") + return _OK def sync_pinecone(self) -> None: phases.append("sync") @@ -80,7 +85,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise CommandError("planned", returncode=3) class Cmd(BaseCollectorCommand): @@ -102,8 +107,8 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: - return None + def collect(self) -> GenericTrackerResult: + return _OK collector = PhaseCollector() collector._error_phase = "fetch" @@ -124,7 +129,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise RuntimeError("boom") class Cmd(BaseCollectorCommand): @@ -157,7 +162,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise ValueError("bad input") class Cmd(BaseCollectorCommand): @@ -182,7 +187,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise RuntimeError("primary") held: dict[str, AbstractCollector] = {} @@ -220,8 +225,9 @@ def pre_collect(self) -> None: def validate_config(self) -> None: order.append("validate") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: order.append("collect") + return _OK def post_collect(self) -> None: order.append("post_collect") @@ -239,8 +245,8 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: - return None + def collect(self) -> GenericTrackerResult: + return _OK Minimal().run() @@ -260,8 +266,9 @@ def pre_collect(self) -> None: def validate_config(self) -> None: calls.append("validate") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: calls.append("collect") + return _OK def post_collect(self) -> None: calls.append("post_collect") @@ -287,8 +294,9 @@ def validate_config(self) -> None: calls.append("validate") raise ValueError("bad config") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: calls.append("collect") + return _OK def post_collect(self) -> None: calls.append("post_collect") @@ -312,7 +320,7 @@ def name(self) -> str: def validate_config(self) -> None: calls.append("validate") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: calls.append("collect") raise RuntimeError("collect failed") @@ -338,8 +346,9 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: calls.append("collect") + return _OK def post_collect(self) -> None: raise RuntimeError("post failed") @@ -361,7 +370,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise RuntimeError("primary") def on_error(self, exc: BaseException) -> None: @@ -380,7 +389,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise RuntimeError("primary") def on_error(self, exc: BaseException) -> None: @@ -401,7 +410,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise RuntimeError("boom") def on_error(self, exc: BaseException) -> None: @@ -432,7 +441,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: raise CommandError("planned", returncode=3) def on_error(self, exc: BaseException) -> None: @@ -461,8 +470,8 @@ def name(self) -> str: def validate_config(self) -> None: pass - def collect(self) -> None: - pass + def collect(self) -> GenericTrackerResult: + return _OK c = Named() c._error_phase = "collect" @@ -470,3 +479,69 @@ def collect(self) -> None: c.handle_error(RuntimeError("x")) mock_exc.assert_called_once() assert "named_slug" in str(mock_exc.call_args) + + +def test_abstract_collector_run_rejects_non_protocol_return(): + class BadReturn(AbstractCollector): + @property + def name(self) -> str: + return "bad_return" + + def validate_config(self) -> None: + return None + + def collect(self): + return {"success": True, "counts": {}} + + with pytest.raises(TypeError, match="TrackerResult"): + BadReturn().run() + + +def test_abstract_collector_run_sets_duration_and_last_result(): + class Counting(AbstractCollector): + @property + def name(self) -> str: + return "counting" + + def validate_config(self) -> None: + return None + + def collect(self) -> GenericTrackerResult: + return GenericTrackerResult.ok(items=3) + + collector = Counting() + result = collector.run() + assert result.counts["items"] == 3 + assert result.duration_seconds is not None + assert collector.last_result is result + + +def test_base_collector_command_logs_tracker_result_fields(): + class OkCollector(AbstractCollector): + @property + def name(self) -> str: + return "logged_collector" + + def validate_config(self) -> None: + return None + + def collect(self) -> GenericTrackerResult: + return GenericTrackerResult.ok(messages=2) + + class Cmd(BaseCollectorCommand): + help = "test" + + def get_collector(self, **options): + return OkCollector() + + import core.collectors.command_base as cmd_mod + + with patch.object(cmd_mod.logger, "info") as mock_info: + Cmd(stdout=StringIO(), stderr=StringIO()).handle() + finished = [ + c + for c in mock_info.call_args_list + if c.args and "Collector finished" in str(c.args[0]) + ] + assert finished + assert finished[0].kwargs["extra"]["records_collected"] == 2 diff --git a/core/tests/test_protocols.py b/core/tests/test_protocols.py index b45c2bb2..adae3652 100644 --- a/core/tests/test_protocols.py +++ b/core/tests/test_protocols.py @@ -20,8 +20,10 @@ IncrementalState, TrackerResult, require_activity_record, + require_incremental_state, require_tracker_result, ) +from core.tracker_result import GenericTrackerResult from discord_activity_tracker.protocol_impl import ( DiscordActivityRecord, DiscordCollectionTrackerResult, @@ -40,6 +42,22 @@ def test_tracker_result_isinstance_github_dataclass() -> None: r = GitHubSyncTrackerResult(success=True, counts={"issues": 2, "pull_requests": 1}) assert isinstance(r, TrackerResult) assert r.counts["issues"] == 2 + assert r.errors == () + assert r.duration_seconds is None + + +def test_tracker_result_isinstance_generic_dataclass() -> None: + r = GenericTrackerResult.ok(items=1) + assert isinstance(r, TrackerResult) + assert r.errors == () + + +def test_require_incremental_state_raises_type_error_on_bad_object() -> None: + class NotState: + checkpoint_token = "x" + + with pytest.raises(TypeError, match="IncrementalState"): + require_incremental_state(NotState()) def test_activity_record_isinstance_discord_dataclass() -> None: diff --git a/core/tracker_result.py b/core/tracker_result.py new file mode 100644 index 00000000..91d77475 --- /dev/null +++ b/core/tracker_result.py @@ -0,0 +1,48 @@ +"""Shared :class:`~core.protocols.TrackerResult` implementation for collectors.""" + +from __future__ import annotations + +from dataclasses import dataclass, field, is_dataclass, replace +from typing import TYPE_CHECKING, Mapping + +if TYPE_CHECKING: + from core.protocols import TrackerResult + + +@dataclass(frozen=True) +class GenericTrackerResult: + """Default frozen DTO satisfying :class:`~core.protocols.TrackerResult`.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def ok(cls, **counts: int) -> GenericTrackerResult: + """Build a successful result with the given count fields.""" + return cls(success=True, counts=dict(counts)) + + @classmethod + def failed(cls, *errors: str, **counts: int) -> GenericTrackerResult: + """Build a failed result with optional error messages and counts.""" + return cls(success=False, counts=dict(counts), errors=tuple(errors)) + + def with_duration(self, duration_seconds: float) -> GenericTrackerResult: + """Return a copy with ``duration_seconds`` set (for framework backfill).""" + if self.duration_seconds is not None: + return self + return replace(self, duration_seconds=duration_seconds) + + +def with_duration_if_missing( + result: TrackerResult, duration_seconds: float +) -> TrackerResult: + """Return *result* unchanged if duration is set; else a copy with duration backfilled.""" + if result.duration_seconds is not None: + return result + if isinstance(result, GenericTrackerResult): + return result.with_duration(duration_seconds) + if is_dataclass(result) and not isinstance(result, type): + return replace(result, duration_seconds=duration_seconds) # type: ignore[return-value] + return result diff --git a/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py b/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py index 813175fb..28fa46b3 100644 --- a/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py +++ b/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py @@ -18,7 +18,9 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from cppa_pinecone_sync.protocol_impl import PineconeSyncTrackerResult from cppa_pinecone_sync.sync import sync_to_pinecone from cppa_pinecone_sync.types import PineconeInstance @@ -68,7 +70,7 @@ def validate_config(self) -> None: except (ValueError, ImportError) as e: raise CommandError(str(e)) from e - def collect(self) -> None: + def collect(self) -> TrackerResult: logger.info( "run_cppa_pinecone_sync: starting app_type=%s namespace=%s preprocessor=%s", self.app_type, @@ -76,22 +78,23 @@ def collect(self) -> None: self.preprocessor_path, ) - result = sync_to_pinecone( + raw = sync_to_pinecone( self.app_type, self.namespace, self._preprocess_fn, instance=self.instance, ) + result = PineconeSyncTrackerResult.from_sync_dict(raw) logger.info( "CPPA Pinecone Sync completed: upserted=%s, total=%s, failed_count=%s", - result["upserted"], - result["total"], - result["failed_count"], + result.counts.get("upserted", 0), + result.counts.get("total", 0), + result.counts.get("errors", 0), ) - if result.get("errors"): - for err in result["errors"]: - logger.warning("Sync error: %s", err) + for err in result.errors: + logger.warning("Sync error: %s", err) logger.info("run_cppa_pinecone_sync: finished successfully") + return result class Command(BaseCollectorCommand): @@ -153,7 +156,7 @@ def get_collector(self, **options: Any) -> AbstractCollector: return CppaPineconeSyncCollector( app_type=app_type, - namespace=namespace, - preprocessor_path=preprocessor_path, + namespace=namespace or "", + preprocessor_path=preprocessor_path or "", instance=instance, ) diff --git a/cppa_pinecone_sync/protocol_impl.py b/cppa_pinecone_sync/protocol_impl.py new file mode 100644 index 00000000..f7a0b10f --- /dev/null +++ b/cppa_pinecone_sync/protocol_impl.py @@ -0,0 +1,31 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for Pinecone sync.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping + + +@dataclass(frozen=True) +class PineconeSyncTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for ``sync_to_pinecone`` outcomes.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_sync_dict(cls, d: Mapping[str, Any]) -> PineconeSyncTrackerResult: + raw_errors = d.get("errors") or [] + errors = tuple(str(e) for e in raw_errors) + failed = int(d.get("failed_count") or 0) + return cls( + success=failed == 0 and not errors, + counts={ + "upserted": int(d.get("upserted") or 0), + "total": int(d.get("total") or 0), + "errors": failed, + }, + errors=errors, + ) diff --git a/cppa_pinecone_sync/tests/test_protocol_impl.py b/cppa_pinecone_sync/tests/test_protocol_impl.py new file mode 100644 index 00000000..fc844988 --- /dev/null +++ b/cppa_pinecone_sync/tests/test_protocol_impl.py @@ -0,0 +1,15 @@ +"""Tests for :mod:`cppa_pinecone_sync.protocol_impl`.""" + +from core.protocols import TrackerResult + +from cppa_pinecone_sync.protocol_impl import PineconeSyncTrackerResult + + +def test_pinecone_sync_tracker_result_from_sync_dict() -> None: + r = PineconeSyncTrackerResult.from_sync_dict( + {"upserted": 2, "total": 3, "failed_count": 1, "errors": ["x"]} + ) + assert isinstance(r, TrackerResult) + assert r.counts["upserted"] == 2 + assert r.errors == ("x",) + assert r.success is False diff --git a/cppa_slack_tracker/management/commands/run_cppa_slack_tracker.py b/cppa_slack_tracker/management/commands/run_cppa_slack_tracker.py index 3680fca0..612ebbaa 100644 --- a/cppa_slack_tracker/management/commands/run_cppa_slack_tracker.py +++ b/cppa_slack_tracker/management/commands/run_cppa_slack_tracker.py @@ -20,6 +20,8 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import IncrementalState, TrackerResult +from cppa_slack_tracker.protocol_impl import SlackIncrementalState, SlackTrackerResult from cppa_slack_tracker.models import SlackTeam from cppa_slack_tracker.services import save_slack_message @@ -63,6 +65,7 @@ def __init__( self.team_id = team_id self.options = options self._team: SlackTeam | None = None + self._counts: dict[str, int] = {} @property def name(self) -> str: @@ -71,13 +74,18 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def load_incremental_state(self) -> IncrementalState | None: + start = (self.options.get("start_date") or "").strip() or None + return SlackIncrementalState.from_team(team_id=self.team_id, start_date=start) + + def collect(self) -> TrackerResult: dry_run = self.options.get("dry_run", False) if dry_run: self._print_dry_run() - return + return SlackTrackerResult.dry_run() self._team = sync_team(self.team_id) + self._counts = {} if self.options.get("sync_users"): self._sync_users(self._team) @@ -98,6 +106,8 @@ def collect(self) -> None: self._sync_channels(self._team) self._sync_messages(self._team) + return SlackTrackerResult.from_counts(**self._counts) + def sync_pinecone(self) -> None: if self.options.get("dry_run"): return @@ -179,6 +189,8 @@ def _sync_users(self, team: SlackTeam) -> None: team_id=team.team_id, include_bots=True, ) + self._counts["users"] = self._counts.get("users", 0) + success_count + self._counts["user_errors"] = self._counts.get("user_errors", 0) + error_count logger.info( "Synced %s users, %s errors", success_count, @@ -194,6 +206,10 @@ def _sync_channels(self, team: SlackTeam) -> None: channel_id=channel_id, team_id=team.team_id, ) + self._counts["channels"] = self._counts.get("channels", 0) + success_count + self._counts["channel_errors"] = ( + self._counts.get("channel_errors", 0) + error_count + ) logger.info( "Synced %s channels, %s errors", success_count, @@ -208,6 +224,9 @@ def _sync_channel_users(self, team: SlackTeam) -> None: team, channel_id=channel_id, ) + self._counts["channel_memberships"] = ( + self._counts.get("channel_memberships", 0) + success_count + ) logger.info( "Synced %s channel member lists, %s errors", success_count, @@ -315,6 +334,8 @@ def _sync_messages(self, team: SlackTeam) -> None: logger.info("Syncing messages per channel...") for channel in channels: s, e = sync_messages(channel, start_date=start_d, end_date=end_d) + self._counts["messages"] = self._counts.get("messages", 0) + s + self._counts["message_errors"] = self._counts.get("message_errors", 0) + e logger.info( " #%s: %s saved, %s errors", channel.channel_name, diff --git a/cppa_slack_tracker/protocol_impl.py b/cppa_slack_tracker/protocol_impl.py new file mode 100644 index 00000000..45911f84 --- /dev/null +++ b/cppa_slack_tracker/protocol_impl.py @@ -0,0 +1,43 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for CPPA Slack tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Mapping + + +@dataclass(frozen=True) +class SlackTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for Slack sync runs.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_counts(cls, **counts: int) -> SlackTrackerResult: + return cls(success=True, counts=dict(counts)) + + @classmethod + def dry_run(cls) -> SlackTrackerResult: + return cls(success=True, counts={}) + + +@dataclass(frozen=True) +class SlackIncrementalState: + """Checkpoint between Slack message sync runs.""" + + checkpoint_token: str | None + human_readable_marker: str | None + extras: Mapping[str, Any] = field(default_factory=dict) + + @classmethod + def from_team( + cls, *, team_id: str, start_date: str | None + ) -> SlackIncrementalState: + return cls( + checkpoint_token=f"slack:team:{team_id}", + human_readable_marker=start_date, + extras={"team_id": team_id, "start_date": start_date}, + ) diff --git a/cppa_slack_tracker/tests/test_protocol_impl.py b/cppa_slack_tracker/tests/test_protocol_impl.py new file mode 100644 index 00000000..f6971f4d --- /dev/null +++ b/cppa_slack_tracker/tests/test_protocol_impl.py @@ -0,0 +1,15 @@ +"""Tests for :mod:`cppa_slack_tracker.protocol_impl`.""" + +from core.protocols import IncrementalState, TrackerResult + +from cppa_slack_tracker.protocol_impl import SlackIncrementalState, SlackTrackerResult + + +def test_slack_tracker_result() -> None: + r = SlackTrackerResult.from_counts(messages=10, users=5) + assert isinstance(r, TrackerResult) + + +def test_slack_incremental_state() -> None: + st = SlackIncrementalState.from_team(team_id="T", start_date="2024-01-01") + assert isinstance(st, IncrementalState) diff --git a/cppa_user_tracker/management/commands/run_cppa_user_tracker.py b/cppa_user_tracker/management/commands/run_cppa_user_tracker.py index 1d78c120..33cbf3b1 100644 --- a/cppa_user_tracker/management/commands/run_cppa_user_tracker.py +++ b/cppa_user_tracker/management/commands/run_cppa_user_tracker.py @@ -9,7 +9,12 @@ import logging from typing import Any -from core.collectors import AbstractCollector, BaseCollectorCommand +from core.collectors import ( + AbstractCollector, + BaseCollectorCommand, + GenericTrackerResult, +) +from core.protocols import TrackerResult logger = logging.getLogger(__name__) @@ -28,11 +33,12 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> TrackerResult: logger.info("run_cppa_user_tracker: starting") # Stub: add logic (stage relations, merge into Identity/BaseProfile, etc.) self.stdout.write(self.style.SUCCESS("CPPA User Tracker completed (stub).")) logger.info("run_cppa_user_tracker: finished successfully") + return GenericTrackerResult.ok() class Command(BaseCollectorCommand): diff --git a/cppa_youtube_script_tracker/management/commands/run_cppa_youtube_script_tracker.py b/cppa_youtube_script_tracker/management/commands/run_cppa_youtube_script_tracker.py index 0fc7ef73..ffd0bd6f 100644 --- a/cppa_youtube_script_tracker/management/commands/run_cppa_youtube_script_tracker.py +++ b/cppa_youtube_script_tracker/management/commands/run_cppa_youtube_script_tracker.py @@ -27,6 +27,8 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from cppa_youtube_script_tracker.protocol_impl import YoutubeScriptTrackerResult from django.utils.dateparse import parse_datetime from cppa_user_tracker.services import get_or_create_youtube_speaker @@ -461,7 +463,7 @@ def validate_config(self) -> None: "(when --end-time is omitted, the end is the current UTC time)." ) - def collect(self) -> None: + def collect(self) -> TrackerResult: o = self.options start_time_arg = (o.get("start_time") or "").strip() end_time_arg = (o.get("end_time") or "").strip() @@ -495,10 +497,11 @@ def collect(self) -> None: f"{end_time.isoformat()}. No API calls or DB writes." ) ) - return + return YoutubeScriptTrackerResult.from_run(dry_run=True) - self.cmd._phase_2(start_time, end_time, channel_title) + videos = self.cmd._phase_2(start_time, end_time, channel_title) self.cmd._phase_3(skip_transcript) + return YoutubeScriptTrackerResult.from_run(videos=videos or 0) except Exception: logger.exception("run_cppa_youtube_script_tracker: unhandled error") @@ -583,7 +586,7 @@ def _phase_1(self, dry_run: bool) -> None: def _phase_2( self, start_time: datetime, end_time: datetime, channel_title: str - ) -> None: + ) -> int: created_count, skipped_count = _run_phase_2(start_time, end_time, channel_title) if created_count == 0 and skipped_count == 0: self.stdout.write(self.style.WARNING("Phase 2: no new videos fetched.")) @@ -599,6 +602,7 @@ def _phase_2( created_count, skipped_count, ) + return created_count def _phase_3(self, skip_transcript: bool) -> None: if skip_transcript: diff --git a/cppa_youtube_script_tracker/protocol_impl.py b/cppa_youtube_script_tracker/protocol_impl.py new file mode 100644 index 00000000..58a42c09 --- /dev/null +++ b/cppa_youtube_script_tracker/protocol_impl.py @@ -0,0 +1,28 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for YouTube script tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Mapping + + +@dataclass(frozen=True) +class YoutubeScriptTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for YouTube runs.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_run( + cls, + *, + videos: int = 0, + dry_run: bool = False, + ) -> YoutubeScriptTrackerResult: + return cls( + success=True, + counts={"videos": videos, "dry_run": int(dry_run)}, + ) diff --git a/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py b/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py index cefe9e1a..99f99906 100644 --- a/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py +++ b/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py @@ -33,6 +33,8 @@ from asgiref.sync import sync_to_async from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from discord_activity_tracker.protocol_impl import DiscordCollectionTrackerResult from discord_activity_tracker.pinecone_runner import task_discord_pinecone_sync from discord_activity_tracker.services import ( get_or_create_discord_channel, @@ -89,7 +91,7 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: + def collect(self) -> TrackerResult: import_dir = get_cpp_discussion_import_dir() json_files = sorted( filter_discord_export_json_paths(import_dir.rglob("*.json")) @@ -105,7 +107,9 @@ def collect(self) -> None: f" (dry-run) would import {_json_display_path(import_dir, p)}" ) self.stdout.write(self.style.WARNING("DRY RUN — no writes or deletes")) - return + return DiscordCollectionTrackerResult( + success=True, counts={"files": len(json_files), "dry_run": 1} + ) processed_total = 0 for i, json_path in enumerate(json_files, 1): @@ -140,6 +144,10 @@ def collect(self) -> None: f"{len(json_files)} file(s)" ) ) + return DiscordCollectionTrackerResult( + success=True, + counts={"messages": processed_total, "files": len(json_files)}, + ) async def _persist_channel( self, diff --git a/discord_activity_tracker/management/commands/run_discord_activity_tracker.py b/discord_activity_tracker/management/commands/run_discord_activity_tracker.py index f96d520a..432b009c 100644 --- a/discord_activity_tracker/management/commands/run_discord_activity_tracker.py +++ b/discord_activity_tracker/management/commands/run_discord_activity_tracker.py @@ -44,6 +44,11 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import IncrementalState, TrackerResult +from discord_activity_tracker.protocol_impl import ( + DiscordCollectionTrackerResult, + DiscordIncrementalState, +) from core.utils.datetime_parsing import parse_iso_datetime from discord_activity_tracker.models import DiscordServer from discord_activity_tracker.pinecone_runner import task_discord_pinecone_sync @@ -172,17 +177,17 @@ def task_discord_sync( after_date: datetime | None, before_date: datetime | None, collector: "DiscordActivityCollector", -) -> None: +) -> int: """DiscordChatExporter → parse → db_sync → archive JSON per channel.""" if skip_discord_sync: logger.info("skipping Discord fetch / DB / raw (--skip-discord-sync)") - return + return 0 if dry_run: logger.info( "dry-run would run DiscordChatExporter and persist messages + raw JSON" ) - return + return 0 raw_root = get_raw_dir() staging = get_exporter_staging_dir() @@ -269,6 +274,7 @@ def task_discord_sync( ) ) logger.debug("raw archive root: %s", raw_root) + return processed_total def task_markdown_export_and_push( @@ -365,8 +371,19 @@ def name(self) -> str: def validate_config(self) -> None: return None - def collect(self) -> None: - self.cmd._handle_core(self.options, collector=self) + def load_incremental_state(self) -> IncrementalState | None: + guild_id: int | None = getattr(settings, "DISCORD_SERVER_ID", None) + if not guild_id: + return None + after_date, _before = _resolve_exporter_date_bounds( + self.options, + guild_snowflake=guild_id, + channel_ids=self.channel_ids, + ) + return DiscordIncrementalState.from_after_date(after=after_date) + + def collect(self) -> TrackerResult: + return self.cmd._handle_core(self.options, collector=self) def sync_pinecone(self) -> None: if self.options.get("dry_run") or self.options.get("skip_pinecone"): @@ -524,7 +541,9 @@ def get_collector(self, **options: Any) -> AbstractCollector: opts["skip_pinecone"] = False return DiscordActivityCollector(cmd=self, options=opts) - def _handle_core(self, options: dict, collector: DiscordActivityCollector) -> None: + def _handle_core( + self, options: dict, collector: DiscordActivityCollector + ) -> DiscordCollectionTrackerResult: dry_run = options["dry_run"] skip_discord_sync = options["skip_discord_sync"] skip_markdown_export = options["skip_markdown_export"] @@ -615,9 +634,11 @@ def _handle_core(self, options: dict, collector: DiscordActivityCollector) -> No " Upper bound (--before): none (through present)" ) logger.info("finished successfully (dry-run)") - return + return DiscordCollectionTrackerResult( + success=True, counts={"dry_run": 1} + ) - task_discord_sync( + messages_synced = task_discord_sync( dry_run=False, skip_discord_sync=skip_discord_sync, user_token=user_token, @@ -640,6 +661,15 @@ def _handle_core(self, options: dict, collector: DiscordActivityCollector) -> No logger.info("skipping Pinecone (--skip-pinecone)") logger.info("finished successfully") + return DiscordCollectionTrackerResult( + success=True, + counts={ + "messages": messages_synced, + "channels": ( + len(collector.channel_ids) if collector.channel_ids else 0 + ), + }, + ) except Exception as e: logger.exception("command failed: %s", e) raise diff --git a/discord_activity_tracker/protocol_impl.py b/discord_activity_tracker/protocol_impl.py index 3dc73918..ec6b3ca5 100644 --- a/discord_activity_tracker/protocol_impl.py +++ b/discord_activity_tracker/protocol_impl.py @@ -23,6 +23,8 @@ class DiscordCollectionTrackerResult: success: bool counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None @dataclass(frozen=True) diff --git a/docs/Core_public_API.md b/docs/Core_public_API.md index 0c0acad0..8f4c2a9d 100644 --- a/docs/Core_public_API.md +++ b/docs/Core_public_API.md @@ -8,9 +8,11 @@ The `core` Django app holds shared infrastructure. Treat the following as the ** | Import | Purpose | |--------|---------| -| `core.collectors.AbstractCollector` | Collector contract: `name`, `validate_config()`, `collect()`; concrete `run()` runs validate then collect; optional `sync_pinecone()`, `handle_error()` with structured logging. | -| `core.collectors.CollectorRunnable` | `Protocol` for objects returned from `get_collector()` (`run`, `sync_pinecone`, `handle_error`). | -| `core.collectors.BaseCollectorCommand` | Thin `BaseCommand` adapter: runs `get_collector(**opts).run()` then `sync_pinecone()`. | +| `core.collectors.AbstractCollector` | Collector contract: `name`, `validate_config()`, `collect() -> TrackerResult`; concrete `run()` runs validate → `load_incremental_state()` → collect (validates result, backfills `duration_seconds`) → `post_collect()`; optional `sync_pinecone()`, `handle_error()` with structured logging. | +| `core.collectors.CollectorRunnable` | `Protocol` for objects returned from `get_collector()` (`run() -> TrackerResult`, `sync_pinecone`, `handle_error`, `last_result`). | +| `core.collectors.BaseCollectorCommand` | Thin `BaseCommand` adapter: runs `get_collector(**opts).run()`, logs structured `TrackerResult` fields, then `sync_pinecone()`. | +| `core.collectors.GenericTrackerResult` | Default frozen `TrackerResult` DTO (`ok()`, `failed()`); used by stubs and simple collectors. | +| `core.collectors.GenericIncrementalState` | Default frozen `IncrementalState` DTO for checkpoint hooks. | ### Application collectors @@ -20,6 +22,7 @@ All **application** collectors listed below subclass **`AbstractCollector`** (`n |--------|-----------------|----------------| | `run_boost_usage_tracker` | `BoostUsageTrackerCollector` | `boost_usage_tracker.management.commands.run_boost_usage_tracker` | | `run_boost_github_activity_tracker` | `BoostGithubActivityCollector` | `boost_library_tracker.management.commands.run_boost_github_activity_tracker` | +| `collect_boost_libraries` | `CollectBoostLibrariesCollector` | `boost_library_tracker.management.commands.collect_boost_libraries` | | `run_clang_github_tracker` | `ClangGithubTrackerCollector` | `clang_github_tracker.collectors` | | `run_boost_library_usage_dashboard` | `BoostLibraryUsageDashboardCollector` | `boost_library_usage_dashboard.collectors` | | `run_boost_library_docs_tracker` | `BoostLibraryDocsTrackerCollector` | `boost_library_docs_tracker.management.commands.run_boost_library_docs_tracker` | @@ -47,13 +50,15 @@ Structural contracts for **data** that crosses tracker layers (sync outcomes, ac | Import | Purpose | |--------|---------| -| `core.protocols.TrackerResult` | `@runtime_checkable` protocol: `success`, `counts` (`Mapping[str, int]`). | +| `core.protocols.TrackerResult` | `@runtime_checkable` protocol: `success`, `counts` (`Mapping[str, int]`), `errors` (`Sequence[str]`), `duration_seconds` (`float \| None`). | | `core.protocols.ActivityRecord` | `@runtime_checkable` protocol: portable activity row (`source_system`, `external_id`, `occurred_at`, …). | | `core.activity_types` | Typed `ActivityRecord` fields: `SourceSystem`, `ActivityType`, `ActorExternalId`, UTC `occurred_at` helpers, and `migrate_legacy_activity_fields` / `activity_record_to_legacy_dict` for string payloads. | | `core.protocols.IncrementalState` | `@runtime_checkable` protocol: `checkpoint_token`, `human_readable_marker`, `extras`. | -| `core.protocols.require_tracker_result` / `require_activity_record` | Runtime guards raising `TypeError` when an object does not satisfy the protocol. | +| `core.protocols.require_tracker_result` / `require_activity_record` / `require_incremental_state` | Runtime guards raising `TypeError` when an object does not satisfy the protocol. | -Implementations are frozen dataclasses in each tracker app (for example `github_activity_tracker.protocol_impl`, `discord_activity_tracker.protocol_impl`). Prefer dataclasses over plain `dict` for reliable `isinstance` checks with `@runtime_checkable`. +Implementations are frozen dataclasses in each tracker app's `protocol_impl.py` (for example `github_activity_tracker.protocol_impl`, `discord_activity_tracker.protocol_impl`, `boost_library_tracker.protocol_impl`). Simple collectors may return `GenericTrackerResult` directly. Prefer dataclasses over plain `dict` for reliable `isinstance` checks with `@runtime_checkable`. + +`AbstractCollector.collect()` must return a `TrackerResult`. Override `load_incremental_state()` / `persist_incremental_state()` when a collector needs checkpoint read/write between runs (default hooks are no-ops). **Local static check:** with dev dependencies installed (`requirements-dev.lock`), from the repo root run **`uv run pyright`** (same as the **`pyright`** job in [`.github/workflows/actions.yml`](../.github/workflows/actions.yml)). Root **`pyrightconfig.json`** scopes analysis to `core`, `github_activity_tracker`, and `discord_activity_tracker` and excludes **`core/pyright_samples/**`** from that run; **`core/tests/test_protocols.py`** still exercises positive/negative protocol assignment snippets via subprocess. diff --git a/docs/How_to_add_a_collector.md b/docs/How_to_add_a_collector.md index 52c75ad2..fd5d7e04 100644 --- a/docs/How_to_add_a_collector.md +++ b/docs/How_to_add_a_collector.md @@ -21,7 +21,7 @@ Add a task under the right group in `config/boost_collector_schedule.yaml` (see Stable imports live under **`core.collectors`** (re-exported in [`core/collectors/__init__.py`](../core/collectors/__init__.py)); see the **Collectors** table in [Core_public_API.md](Core_public_API.md#collectors) for `AbstractCollector`, `CollectorRunnable`, and `BaseCollectorCommand`. -- Subclass **`AbstractCollector`** and implement a stable `name` property, `validate_config()`, and `collect()`. The base provides concrete `run()` as `validate_config()` then `collect()`, plus `handle_error()` / `sync_pinecone()` from the shared lifecycle mixin. Use **`BaseCollectorCommand`** so the management command stays thin (`get_collector()` returns any **`CollectorRunnable`**: `run`, `sync_pinecone`, `handle_error`). +- Subclass **`AbstractCollector`** and implement a stable `name` property, `validate_config()`, and `collect() -> TrackerResult`. Return a frozen dataclass from your app's `protocol_impl.py`, or **`GenericTrackerResult`** for simple collectors. The base provides concrete `run()` (validate → `load_incremental_state()` → collect → `post_collect()`), plus `handle_error()` / `sync_pinecone()` from the shared lifecycle mixin. Use **`BaseCollectorCommand`** so the management command stays thin (`get_collector()` returns any **`CollectorRunnable`**: `run`, `sync_pinecone`, `handle_error`, `last_result`). ### Collector contracts (source of truth) @@ -51,6 +51,7 @@ my_skeleton_tracker/ __init__.py apps.py collectors.py + protocol_impl.py # optional at first; add when you outgrow GenericTrackerResult models.py services.py management/ @@ -135,7 +136,7 @@ from __future__ import annotations import logging -from core.collectors import AbstractCollector +from core.collectors import AbstractCollector, GenericTrackerResult from my_skeleton_tracker.services import record_skeleton_run # If you override handle_error, you can log or map errors explicitly, e.g.: @@ -162,7 +163,7 @@ class MySkeletonCollector(AbstractCollector): if not self.source_key or not self.source_key.strip(): raise ValueError("source_key must not be empty") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: obj, created = record_skeleton_run(source_key=self.source_key.strip()) logger.info( "skeleton run recorded source_key=%s run_count=%s created=%s", @@ -170,6 +171,7 @@ class MySkeletonCollector(AbstractCollector): obj.run_count, created, ) + return GenericTrackerResult.ok(runs=1, created=int(created)) # STANDARD: omit sync_pinecone unless you post-process (e.g. Pinecone); default is no-op. ``` diff --git a/docs/Tutorial_building_a_collector.md b/docs/Tutorial_building_a_collector.md index df88d62d..108bb53a 100644 --- a/docs/Tutorial_building_a_collector.md +++ b/docs/Tutorial_building_a_collector.md @@ -115,6 +115,8 @@ flowchart TB The generated collector follows this shape (abbreviated): ```python +from core.collectors import AbstractCollector, GenericTrackerResult + class HeartbeatDemoCollector(AbstractCollector): @property def name(self) -> str: @@ -124,9 +126,9 @@ class HeartbeatDemoCollector(AbstractCollector): if not self.source_key or not self.source_key.strip(): raise ValueError("source_key must not be empty") - def collect(self) -> None: + def collect(self) -> GenericTrackerResult: _, created = services.record_run(source_key=self.source_key.strip()) - # ... + return GenericTrackerResult.ok(runs=1, created=int(created)) class Command(BaseCollectorCommand): def get_collector(self, **_options: Any) -> AbstractCollector: @@ -156,7 +158,9 @@ sequenceDiagram Cmd->>Col: get_collector(options) Cmd->>Col: run() Col->>Col: validate_config() - Col->>Col: collect() + Col->>Col: load_incremental_state() + Col->>Col: collect() -> TrackerResult + Cmd->>Cmd: log TrackerResult counts/errors Cmd->>Col: sync_pinecone() ``` @@ -170,7 +174,7 @@ Implementation: [core/collectors/command_base.py](../core/collectors/command_bas |------|----------------|----------------|------------------| | `name` | `handle_error` logging | Stable slug for metrics/alerts | `"heartbeat_demo"` | | `validate_config()` | `run()` before I/O | Fast checks: env, CLI, empty keys | Reject empty `source_key` | -| `collect()` | `run()` | Orchestration; delegate DB to `services.py` | `services.record_run(...)` | +| `collect()` | `run()` | Orchestration; delegate DB to `services.py`; return `TrackerResult` | `GenericTrackerResult.ok(...)` after `services.record_run(...)` | | `run()` | Command | Template: validate → collect | Do not override | | `handle_error(exc)` | Command on non-`CommandError` | Log with `classify_failure` | Default is enough for most apps | | `sync_pinecone()` | Command after `run` | Post-run vector sync; default no-op | See §2.4 | diff --git a/docs/service_api/core_protocols.md b/docs/service_api/core_protocols.md index 6a16d508..e8bf9b48 100644 --- a/docs/service_api/core_protocols.md +++ b/docs/service_api/core_protocols.md @@ -40,12 +40,15 @@ Outcome of one logical collection or sync cycle. | --- | --- | | `success` | bool | | `counts` | Mapping[str, int] | +| `errors` | Sequence[str] | +| `duration_seconds` | float \| None | ## Module functions (generated) | Function | Parameters | Return type | Summary | | --- | --- | --- | --- | | `require_activity_record` | obj: object | ActivityRecord | Return *obj* if it satisfies :class:`ActivityRecord`; else raise ``TypeError``. | +| `require_incremental_state` | obj: object | IncrementalState | Return *obj* if it satisfies :class:`IncrementalState`; else raise ``TypeError``. | | `require_tracker_result` | obj: object | TrackerResult | Return *obj* if it satisfies :class:`TrackerResult`; else raise ``TypeError``. | diff --git a/github_activity_tracker/protocol_impl.py b/github_activity_tracker/protocol_impl.py index 48ba8641..87f17a8e 100644 --- a/github_activity_tracker/protocol_impl.py +++ b/github_activity_tracker/protocol_impl.py @@ -24,6 +24,8 @@ class GitHubSyncTrackerResult: success: bool counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None @classmethod def from_sync_dict(cls, d: dict[str, list[int]]) -> GitHubSyncTrackerResult: @@ -34,6 +36,22 @@ def from_sync_dict(cls, d: dict[str, list[int]]) -> GitHubSyncTrackerResult: counts={"issues": len(issues), "pull_requests": len(prs)}, ) + @classmethod + def merge(cls, *results: GitHubSyncTrackerResult) -> GitHubSyncTrackerResult: + """Combine per-repo results into one aggregate.""" + if not results: + return cls(success=True, counts={}) + counts: dict[str, int] = {} + errors: list[str] = [] + success = True + for r in results: + if not r.success: + success = False + for k, v in r.counts.items(): + counts[k] = counts.get(k, 0) + v + errors.extend(r.errors) + return cls(success=success, counts=counts, errors=tuple(errors)) + def sync_github_tracker_result( repo: Any, diff --git a/github_activity_tracker/tests/test_protocol_impl.py b/github_activity_tracker/tests/test_protocol_impl.py index 68e8382c..e61217bd 100644 --- a/github_activity_tracker/tests/test_protocol_impl.py +++ b/github_activity_tracker/tests/test_protocol_impl.py @@ -30,3 +30,11 @@ def test_github_sync_tracker_result_from_sync_dict(): r = GitHubSyncTrackerResult.from_sync_dict({"issues": [], "pull_requests": [1]}) assert r.counts["issues"] == 0 assert r.counts["pull_requests"] == 1 + + +def test_github_sync_tracker_result_merge(): + a = GitHubSyncTrackerResult.from_sync_dict({"issues": [1], "pull_requests": []}) + b = GitHubSyncTrackerResult.from_sync_dict({"issues": [2], "pull_requests": [3]}) + merged = GitHubSyncTrackerResult.merge(a, b) + assert isinstance(merged, TrackerResult) + assert merged.counts == {"issues": 2, "pull_requests": 1} diff --git a/wg21_paper_tracker/collectors.py b/wg21_paper_tracker/collectors.py index 6ad12608..f2268f36 100644 --- a/wg21_paper_tracker/collectors.py +++ b/wg21_paper_tracker/collectors.py @@ -9,6 +9,8 @@ from django.core.management.base import CommandError from core.collectors import AbstractCollector +from core.protocols import TrackerResult +from wg21_paper_tracker.protocol_impl import Wg21PaperTrackerResult from wg21_paper_tracker.pipeline import ( _normalize_mailing_date_label, run_tracker_pipeline, @@ -107,7 +109,7 @@ def _validated_bound( f"--to-date {self.to_date!r}." ) - def collect(self) -> None: + def collect(self) -> TrackerResult: if self.dry_run: if self.from_date or self.to_date: logger.info( @@ -118,21 +120,22 @@ def collect(self) -> None: ) else: logger.info("Dry run: skipping pipeline and GitHub dispatch.") - return + return Wg21PaperTrackerResult.dry_run() logger.info("Starting WG21 Paper Tracker...") try: - result = run_tracker_pipeline( + pipeline_result = run_tracker_pipeline( from_mailing_date=self.from_date, to_mailing_date=self.to_date, ) - n = result.new_paper_count + tracker_result = Wg21PaperTrackerResult.from_pipeline(pipeline_result) + n = pipeline_result.new_paper_count logger.info("Recorded %d new paper(s); %d URL(s) for dispatch.", n, n) if not n: logger.info("No new papers in this run. Skipping GitHub dispatch.") - return + return tracker_result repo = getattr(settings, "WG21_GITHUB_DISPATCH_REPO", "") or "" token = getattr(settings, "WG21_GITHUB_DISPATCH_TOKEN", "") or "" @@ -149,14 +152,15 @@ def collect(self) -> None: "and configure WG21_GITHUB_DISPATCH_REPO and " "WG21_GITHUB_DISPATCH_TOKEN." ) - return + return tracker_result trigger_github_repository_dispatch( repo, event_type, token, - list(result.new_paper_urls), + list(pipeline_result.new_paper_urls), ) logger.info("repository_dispatch sent successfully.") + return tracker_result except ValueError as e: raise CommandError(str(e)) from e diff --git a/wg21_paper_tracker/protocol_impl.py b/wg21_paper_tracker/protocol_impl.py new file mode 100644 index 00000000..e7b819c8 --- /dev/null +++ b/wg21_paper_tracker/protocol_impl.py @@ -0,0 +1,27 @@ +"""Frozen DTOs implementing :mod:`core.protocols` for WG21 paper tracker.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Mapping + +from wg21_paper_tracker.pipeline import TrackerPipelineResult + + +@dataclass(frozen=True) +class Wg21PaperTrackerResult: + """Structured :class:`~core.protocols.TrackerResult` for pipeline outcomes.""" + + success: bool + counts: Mapping[str, int] + errors: tuple[str, ...] = field(default_factory=tuple) + duration_seconds: float | None = None + + @classmethod + def from_pipeline(cls, result: TrackerPipelineResult) -> Wg21PaperTrackerResult: + n = result.new_paper_count + return cls(success=True, counts={"new_papers": n}) + + @classmethod + def dry_run(cls) -> Wg21PaperTrackerResult: + return cls(success=True, counts={}) diff --git a/wg21_paper_tracker/tests/test_protocol_impl.py b/wg21_paper_tracker/tests/test_protocol_impl.py new file mode 100644 index 00000000..fee17d2d --- /dev/null +++ b/wg21_paper_tracker/tests/test_protocol_impl.py @@ -0,0 +1,14 @@ +"""Tests for :mod:`wg21_paper_tracker.protocol_impl`.""" + +from core.protocols import TrackerResult + +from wg21_paper_tracker.pipeline import TrackerPipelineResult +from wg21_paper_tracker.protocol_impl import Wg21PaperTrackerResult + + +def test_wg21_paper_tracker_result_from_pipeline() -> None: + r = Wg21PaperTrackerResult.from_pipeline( + TrackerPipelineResult(new_paper_urls=("https://x",)) + ) + assert isinstance(r, TrackerResult) + assert r.counts["new_papers"] == 1 From efbbfbb4578c3220acf2b6b9c8d19082083dbf22 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 9 Jun 2026 12:16:22 -0700 Subject: [PATCH 2/5] refactor(tests): remove unused Wg21Reflector classes from test_collector_protocol_conformance.py --- core/tests/test_collector_protocol_conformance.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/tests/test_collector_protocol_conformance.py b/core/tests/test_collector_protocol_conformance.py index 5aff0bb9..0ea10160 100644 --- a/core/tests/test_collector_protocol_conformance.py +++ b/core/tests/test_collector_protocol_conformance.py @@ -31,10 +31,6 @@ GitHubSyncTrackerResult, ) from wg21_paper_tracker.protocol_impl import Wg21PaperTrackerResult -from wg21_reflector_collector.protocol_impl import ( - Wg21ReflectorIncrementalState, - Wg21ReflectorTrackerResult, -) @pytest.mark.parametrize( @@ -52,7 +48,6 @@ MailingListTrackerResult.from_run(fetched=1, created=1, skipped=0), SlackTrackerResult.dry_run(), YoutubeScriptTrackerResult.from_run(videos=1), - Wg21ReflectorTrackerResult.dry_run(), LibraryDocsTrackerResult.from_run(versions=1, pages=5), CollectBoostLibrariesResult.empty(), ], @@ -73,7 +68,6 @@ def test_tracker_result_isinstance(result: TrackerResult) -> None: ClangGithubIncrementalState.from_watermarks( start_commit="abc", start_item="2024" ), - Wg21ReflectorIncrementalState.from_since("2024-01-01"), ], ) def test_incremental_state_isinstance(state: IncrementalState) -> None: From dc1233f49c3d72487eceaeca1016c891d1010e6c Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 9 Jun 2026 12:42:40 -0700 Subject: [PATCH 3/5] feat(protocols): add MappingProxyType for counts and extras in various tracker result classes --- boost_library_docs_tracker/protocol_impl.py | 4 ++++ boost_library_tracker/protocol_impl.py | 4 ++++ core/collectors/base_collector.py | 18 ++++++++++++++---- core/collectors/command_base.py | 4 +++- core/incremental_state.py | 4 ++++ core/tracker_result.py | 15 +++++++++++++-- .../commands/run_cppa_pinecone_sync.py | 2 +- cppa_pinecone_sync/protocol_impl.py | 2 +- cppa_pinecone_sync/tests/test_protocol_impl.py | 1 + cppa_slack_tracker/protocol_impl.py | 7 +++++++ cppa_youtube_script_tracker/protocol_impl.py | 4 ++++ wg21_paper_tracker/protocol_impl.py | 4 ++++ 12 files changed, 60 insertions(+), 9 deletions(-) diff --git a/boost_library_docs_tracker/protocol_impl.py b/boost_library_docs_tracker/protocol_impl.py index 8947bbfb..64e99048 100644 --- a/boost_library_docs_tracker/protocol_impl.py +++ b/boost_library_docs_tracker/protocol_impl.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Mapping @@ -15,6 +16,9 @@ class LibraryDocsTrackerResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def from_run( cls, diff --git a/boost_library_tracker/protocol_impl.py b/boost_library_tracker/protocol_impl.py index 48ae5eb6..73c72c7c 100644 --- a/boost_library_tracker/protocol_impl.py +++ b/boost_library_tracker/protocol_impl.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Mapping @@ -15,6 +16,9 @@ class CollectBoostLibrariesResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def from_totals( cls, diff --git a/core/collectors/base_collector.py b/core/collectors/base_collector.py index 0c30b62c..718a92cf 100644 --- a/core/collectors/base_collector.py +++ b/core/collectors/base_collector.py @@ -15,7 +15,12 @@ from typing import Protocol, runtime_checkable from core.errors import classify_failure -from core.protocols import IncrementalState, TrackerResult, require_tracker_result +from core.protocols import ( + IncrementalState, + TrackerResult, + require_incremental_state, + require_tracker_result, +) from core.tracker_result import with_duration_if_missing logger = logging.getLogger(__name__) @@ -31,7 +36,7 @@ class CollectorRunnable(Protocol): routes failures through :meth:`handle_error` (except :class:`~django.core.management.base.CommandError`). """ - def run(self) -> TrackerResult | None: + def run(self) -> TrackerResult: """Main collection phase; see :class:`AbstractCollector`.""" ... @@ -93,7 +98,7 @@ def post_collect(self) -> None: """ state_out = getattr(self, "_incremental_state_out", None) if state_out is not None: - self.persist_incremental_state(state_out) + self.persist_incremental_state(require_incremental_state(state_out)) return None def load_incremental_state(self) -> IncrementalState | None: @@ -234,7 +239,12 @@ def run(self) -> TrackerResult: try: self.pre_collect() self.validate_config() - self._incremental_state_in = self.load_incremental_state() + loaded_state = self.load_incremental_state() + self._incremental_state_in = ( + require_incremental_state(loaded_state) + if loaded_state is not None + else None + ) raw_result = self.collect() result = require_tracker_result(raw_result) elapsed = time.monotonic() - started diff --git a/core/collectors/command_base.py b/core/collectors/command_base.py index de853dc5..0577be47 100644 --- a/core/collectors/command_base.py +++ b/core/collectors/command_base.py @@ -24,7 +24,9 @@ def _records_collected(result: TrackerResult) -> int: """Sum count values, excluding meta keys like ``errors``.""" - return sum(v for k, v in result.counts.items() if k != "errors") + return sum( + v for k, v in result.counts.items() if k not in ("errors", "failed_count") + ) def _log_collector_result(collector: CollectorRunnable, result: TrackerResult) -> None: diff --git a/core/incremental_state.py b/core/incremental_state.py index 564ce939..28bbbfcb 100644 --- a/core/incremental_state.py +++ b/core/incremental_state.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Any, Mapping @@ -13,3 +14,6 @@ class GenericIncrementalState: checkpoint_token: str | None human_readable_marker: str | None extras: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + object.__setattr__(self, "extras", MappingProxyType(dict(self.extras))) diff --git a/core/tracker_result.py b/core/tracker_result.py index 91d77475..45de67f7 100644 --- a/core/tracker_result.py +++ b/core/tracker_result.py @@ -2,7 +2,8 @@ from __future__ import annotations -from dataclasses import dataclass, field, is_dataclass, replace +from dataclasses import dataclass, field, fields, is_dataclass, replace +from types import MappingProxyType from typing import TYPE_CHECKING, Mapping if TYPE_CHECKING: @@ -18,6 +19,9 @@ class GenericTrackerResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def ok(cls, **counts: int) -> GenericTrackerResult: """Build a successful result with the given count fields.""" @@ -44,5 +48,12 @@ def with_duration_if_missing( if isinstance(result, GenericTrackerResult): return result.with_duration(duration_seconds) if is_dataclass(result) and not isinstance(result, type): - return replace(result, duration_seconds=duration_seconds) # type: ignore[return-value] + duration_field = next( + (f for f in fields(result) if f.name == "duration_seconds"), None + ) + if duration_field is not None and duration_field.init: + try: + return replace(result, duration_seconds=duration_seconds) # type: ignore[return-value] + except (TypeError, ValueError): + return result return result diff --git a/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py b/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py index 28fa46b3..65fb4254 100644 --- a/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py +++ b/cppa_pinecone_sync/management/commands/run_cppa_pinecone_sync.py @@ -89,7 +89,7 @@ def collect(self) -> TrackerResult: "CPPA Pinecone Sync completed: upserted=%s, total=%s, failed_count=%s", result.counts.get("upserted", 0), result.counts.get("total", 0), - result.counts.get("errors", 0), + result.counts.get("failed_count", 0), ) for err in result.errors: logger.warning("Sync error: %s", err) diff --git a/cppa_pinecone_sync/protocol_impl.py b/cppa_pinecone_sync/protocol_impl.py index f7a0b10f..2fe4feeb 100644 --- a/cppa_pinecone_sync/protocol_impl.py +++ b/cppa_pinecone_sync/protocol_impl.py @@ -25,7 +25,7 @@ def from_sync_dict(cls, d: Mapping[str, Any]) -> PineconeSyncTrackerResult: counts={ "upserted": int(d.get("upserted") or 0), "total": int(d.get("total") or 0), - "errors": failed, + "failed_count": failed, }, errors=errors, ) diff --git a/cppa_pinecone_sync/tests/test_protocol_impl.py b/cppa_pinecone_sync/tests/test_protocol_impl.py index fc844988..1f855806 100644 --- a/cppa_pinecone_sync/tests/test_protocol_impl.py +++ b/cppa_pinecone_sync/tests/test_protocol_impl.py @@ -11,5 +11,6 @@ def test_pinecone_sync_tracker_result_from_sync_dict() -> None: ) assert isinstance(r, TrackerResult) assert r.counts["upserted"] == 2 + assert r.counts["failed_count"] == 1 assert r.errors == ("x",) assert r.success is False diff --git a/cppa_slack_tracker/protocol_impl.py b/cppa_slack_tracker/protocol_impl.py index 45911f84..1b975e2f 100644 --- a/cppa_slack_tracker/protocol_impl.py +++ b/cppa_slack_tracker/protocol_impl.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Any, Mapping @@ -15,6 +16,9 @@ class SlackTrackerResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def from_counts(cls, **counts: int) -> SlackTrackerResult: return cls(success=True, counts=dict(counts)) @@ -32,6 +36,9 @@ class SlackIncrementalState: human_readable_marker: str | None extras: Mapping[str, Any] = field(default_factory=dict) + def __post_init__(self) -> None: + object.__setattr__(self, "extras", MappingProxyType(dict(self.extras))) + @classmethod def from_team( cls, *, team_id: str, start_date: str | None diff --git a/cppa_youtube_script_tracker/protocol_impl.py b/cppa_youtube_script_tracker/protocol_impl.py index 58a42c09..89ccc08e 100644 --- a/cppa_youtube_script_tracker/protocol_impl.py +++ b/cppa_youtube_script_tracker/protocol_impl.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Mapping @@ -15,6 +16,9 @@ class YoutubeScriptTrackerResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def from_run( cls, diff --git a/wg21_paper_tracker/protocol_impl.py b/wg21_paper_tracker/protocol_impl.py index e7b819c8..d959d403 100644 --- a/wg21_paper_tracker/protocol_impl.py +++ b/wg21_paper_tracker/protocol_impl.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field +from types import MappingProxyType from typing import Mapping from wg21_paper_tracker.pipeline import TrackerPipelineResult @@ -17,6 +18,9 @@ class Wg21PaperTrackerResult: errors: tuple[str, ...] = field(default_factory=tuple) duration_seconds: float | None = None + def __post_init__(self) -> None: + object.__setattr__(self, "counts", MappingProxyType(dict(self.counts))) + @classmethod def from_pipeline(cls, result: TrackerPipelineResult) -> Wg21PaperTrackerResult: n = result.new_paper_count From 44cfb1889783a2072213163f3ac99207cf1f923c Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 10 Jun 2026 11:50:00 -0700 Subject: [PATCH 4/5] feat(collectors): enhance collector behavior and error reporting --- CHANGELOG.md | 2 ++ STABILITY.md | 1 + core/collectors/base_collector.py | 2 +- core/tests/test_collectors_base.py | 31 ++++++++++++++++++- .../backfill_discord_activity_tracker.py | 31 +++++++++++++------ .../tests/test_backfill_command_extra.py | 1 + ...ckfill_discord_activity_tracker_command.py | 29 +++++++++++++++++ 7 files changed, 86 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11880761..a25f9ac1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **core.collectors:** `AbstractCollector.last_result` is set only after `post_collect()` completes successfully (including default incremental checkpoint persistence), matching the documented “most recent successful run” semantics. +- **discord_activity_tracker:** `backfill_discord_activity_tracker` reports per-file import failures on `DiscordCollectionTrackerResult` (`success=False`, `errors`, `failed_files` count) instead of always returning `success=True`. - **core.protocols / ActivityRecord:** `occurred_at` is timezone-aware UTC `datetime | None`; `source_system` is `SourceSystem` (`StrEnum`); `activity_type` is branded `ActivityType`; `actor_external_id` is `ActorExternalId` (`NewType`). Legacy string payloads use `core.activity_types.migrate_legacy_activity_fields` and `activity_record_to_legacy_dict` on GitHub/Discord `protocol_impl` dataclasses. - **Celery schedule:** Added `discord` group to `config/boost_collector_schedule.yaml` (`run_discord_activity_tracker` daily at 16:40 UTC). - **core.collectors:** Removed deprecated `CollectorBase` and `DjangoCommandCollector`; the supported collector contract is **`AbstractCollector`** + **`BaseCollectorCommand`** (see docs). diff --git a/STABILITY.md b/STABILITY.md index 719da41a..c886ce16 100644 --- a/STABILITY.md +++ b/STABILITY.md @@ -139,6 +139,7 @@ Supported in production with **forward migrations** and **CHANGELOG** notes. Not | --- | --- | | **PostgreSQL schema** | Changed only via Django migrations; every deploy runs `python manage.py migrate` | | **`services.py` functions** | Per-app write API; signatures may change in minor `0.x` releases when [docs/service_api/](docs/service_api/) and all callers are updated together. Cross-app reads should use **`services`** or **`sync_api`**, not foreign models (see [CONTRIBUTING.md](CONTRIBUTING.md)) | +| **Collector run outcomes** | `TrackerResult.success` and `errors` must reflect the real outcome (e.g. batch backfills must not report `success=True` when individual files fail). `AbstractCollector.last_result` is the most recent **fully** successful `run()` — after `collect()` **and** `post_collect()` (including checkpoint persistence) complete without error. | ### Tier C — Unstable diff --git a/core/collectors/base_collector.py b/core/collectors/base_collector.py index 718a92cf..8dd15e5b 100644 --- a/core/collectors/base_collector.py +++ b/core/collectors/base_collector.py @@ -249,8 +249,8 @@ def run(self) -> TrackerResult: result = require_tracker_result(raw_result) elapsed = time.monotonic() - started result = with_duration_if_missing(result, elapsed) - self._last_result = result self.post_collect() + self._last_result = result return result except Exception as exc: try: diff --git a/core/tests/test_collectors_base.py b/core/tests/test_collectors_base.py index 9df73b58..3a5f7c7d 100644 --- a/core/tests/test_collectors_base.py +++ b/core/tests/test_collectors_base.py @@ -356,9 +356,38 @@ def post_collect(self) -> None: def on_error(self, exc: BaseException) -> None: calls.append("on_error") + collector = AC() with pytest.raises(RuntimeError, match="post failed"): - AC().run() + collector.run() assert calls == ["collect", "on_error"] + assert collector.last_result is None + + +def test_abstract_collector_run_failure_in_persist_incremental_state_does_not_set_last_result(): + from core.incremental_state import GenericIncrementalState + + state_out = GenericIncrementalState(checkpoint_token="t", human_readable_marker="m") + + class AC(AbstractCollector): + @property + def name(self) -> str: + return "ac" + + def validate_config(self) -> None: + return None + + def collect(self) -> GenericTrackerResult: + self._incremental_state_out = state_out + return _OK + + def persist_incremental_state(self, state) -> None: + raise RuntimeError("persist failed") + + collector = AC() + with pytest.raises(RuntimeError, match="persist failed"): + collector.run() + + assert collector.last_result is None def test_abstract_collector_run_on_error_does_not_swallow_exception(): diff --git a/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py b/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py index 99f99906..23a1438a 100644 --- a/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py +++ b/discord_activity_tracker/management/commands/backfill_discord_activity_tracker.py @@ -112,10 +112,12 @@ def collect(self) -> TrackerResult: ) processed_total = 0 + failed_files = 0 + errors: list[str] = [] for i, json_path in enumerate(json_files, 1): + rel = _json_display_path(import_dir, json_path) try: data = parse_exported_json(json_path) - rel = _json_display_path(import_dir, json_path) envelope = validate_envelope(data, source=rel) guild_info = envelope.guild.model_dump(by_alias=True) channel_info = envelope.channel.model_dump(by_alias=True) @@ -134,19 +136,30 @@ def collect(self) -> TrackerResult: self.style.SUCCESS(f" Imported {count}; removed {rel}") ) except Exception as exc: - rel = _json_display_path(import_dir, json_path) + failed_files += 1 + err_msg = f"{rel}: {exc}" + errors.append(err_msg) logger.error("Failed to process %s: %s", rel, exc) self.stdout.write(self.style.ERROR(f" Failed {rel}: {exc}")) - self.stdout.write( - self.style.SUCCESS( - f"Import complete: {processed_total} messages from " - f"{len(json_files)} file(s)" - ) + summary = ( + f"Import complete: {processed_total} messages from " + f"{len(json_files)} file(s)" ) + if failed_files: + summary += f" ({failed_files} failed)" + self.stdout.write(self.style.WARNING(summary)) + else: + self.stdout.write(self.style.SUCCESS(summary)) + return DiscordCollectionTrackerResult( - success=True, - counts={"messages": processed_total, "files": len(json_files)}, + success=failed_files == 0, + counts={ + "messages": processed_total, + "files": len(json_files), + "failed_files": failed_files, + }, + errors=tuple(errors), ) async def _persist_channel( diff --git a/discord_activity_tracker/tests/test_backfill_command_extra.py b/discord_activity_tracker/tests/test_backfill_command_extra.py index d4c28876..cdaded74 100644 --- a/discord_activity_tracker/tests/test_backfill_command_extra.py +++ b/discord_activity_tracker/tests/test_backfill_command_extra.py @@ -82,6 +82,7 @@ def test_backfill_run_handles_bad_json(tmp_path, settings): assert "bad.json" in output assert "Failed bad.json:" in output assert "Import complete: 0 messages from 1 file(s)" in output + assert "(1 failed)" in output @pytest.mark.django_db diff --git a/discord_activity_tracker/tests/test_backfill_discord_activity_tracker_command.py b/discord_activity_tracker/tests/test_backfill_discord_activity_tracker_command.py index fafe179f..2780893b 100644 --- a/discord_activity_tracker/tests/test_backfill_discord_activity_tracker_command.py +++ b/discord_activity_tracker/tests/test_backfill_discord_activity_tracker_command.py @@ -104,6 +104,35 @@ def test_run_keeps_file_on_invalid_json(monkeypatch, tmp_path, settings): c.run() assert bad.exists() + result = c.last_result + assert result is not None + assert result.success is False + assert result.counts["failed_files"] == 1 + assert len(result.errors) == 1 + assert "bad.json" in result.errors[0] + + +def test_run_result_success_when_all_files_import(monkeypatch, tmp_path, settings): + monkeypatch.setattr(settings, "WORKSPACE_DIR", str(tmp_path)) + drop = tmp_path / "discord_activity_tracker" / "Discussion - c-cpp-discussion" + drop.mkdir(parents=True) + j = drop / "batch.json" + j.write_text(json.dumps(_minimal_export_payload()), encoding="utf-8") + + c = _collector(skip_pinecone=True) + with patch.object( + DiscordBackfillCollector, + "_persist_channel", + new_callable=AsyncMock, + return_value=1, + ): + c.run() + + result = c.last_result + assert result is not None + assert result.success is True + assert result.counts["failed_files"] == 0 + assert result.errors == () def test_dry_run_lists_files_no_delete(monkeypatch, tmp_path, settings): From aaf5ff707cbfec6989238e7eb2e0af1cd7593816 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 10 Jun 2026 11:59:23 -0700 Subject: [PATCH 5/5] feat(collectors): update collect method to return TrackerResult --- .../management/commands/run_reddit_activity_tracker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/reddit_activity_tracker/management/commands/run_reddit_activity_tracker.py b/reddit_activity_tracker/management/commands/run_reddit_activity_tracker.py index f1a1d769..e8c43de2 100644 --- a/reddit_activity_tracker/management/commands/run_reddit_activity_tracker.py +++ b/reddit_activity_tracker/management/commands/run_reddit_activity_tracker.py @@ -6,6 +6,8 @@ from typing import Any from core.collectors import AbstractCollector, BaseCollectorCommand +from core.protocols import TrackerResult +from core.tracker_result import GenericTrackerResult from reddit_activity_tracker.fetcher import build_session @@ -26,12 +28,13 @@ def name(self) -> str: def validate_config(self) -> None: build_session() - def collect(self) -> None: + def collect(self) -> TrackerResult: logger.info("run_reddit_activity_tracker: stub — fetch/upsert in PR2") self.stdout.write( self.style.SUCCESS("reddit_activity_tracker completed (stub)") ) logger.info("run_reddit_activity_tracker: finished successfully") + return GenericTrackerResult.ok() class Command(BaseCollectorCommand):