Skip to content

Commit 932c436

Browse files
authored
Bulk update bucket hits for clustered reports (#128)
1 parent 2d94303 commit 932c436

2 files changed

Lines changed: 66 additions & 7 deletions

File tree

server/reportmanager/management/commands/triage_new_reports.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from itertools import batched
23
from logging import getLogger
34

@@ -9,7 +10,13 @@
910
ClusteringConfig,
1011
ClusterReport,
1112
)
12-
from reportmanager.models import Bucket, ClusteringJob, ClusteringJobType, ReportEntry
13+
from reportmanager.models import (
14+
Bucket,
15+
BucketHit,
16+
ClusteringJob,
17+
ClusteringJobType,
18+
ReportEntry,
19+
)
1320

1421
LOG = getLogger("reportmanager.triage")
1522

@@ -75,10 +82,10 @@ def cluster_unmatched_reports(
7582
def apply_domain_bucketing_fallback(
7683
unmatched_reports: list[ClusterReport],
7784
report_entries: dict[int, ReportEntry],
78-
) -> int:
85+
) -> tuple[int, list[tuple[int, datetime]]]:
7986
"""Add unclustered reports to default domain-based buckets."""
8087
if not unmatched_reports:
81-
return 0
88+
return 0, []
8289

8390
LOG.info(
8491
f"Applying domain-based bucketing to {len(unmatched_reports)} reports that didn't cluster" # noqa
@@ -98,6 +105,7 @@ def apply_domain_bucketing_fallback(
98105
existing_buckets.update({bucket["domain"]: bucket["id"] for bucket in buckets})
99106

100107
entries_to_update = []
108+
bucket_hits = []
101109
buckets_created = 0
102110

103111
for report in unmatched_reports:
@@ -118,14 +126,15 @@ def apply_domain_bucketing_fallback(
118126
existing_buckets[report.domain] = bucket_id
119127

120128
entry = report_entries[report.id]
121-
entry.bucket_id = bucket_id
129+
entry.bucket_id = bucket_id # type: ignore[attr-defined]
122130
entries_to_update.append(entry)
131+
bucket_hits.append((bucket_id, entry.reported_at))
123132

124133
if entries_to_update:
125134
ReportEntry.objects.bulk_update(entries_to_update, ["bucket_id"])
126135

127136
LOG.info(f"Applied domain-based bucketing to {len(entries_to_update)} reports")
128-
return buckets_created
137+
return buckets_created, bucket_hits
129138

130139

131140
def get_cluster_bucket(
@@ -184,6 +193,7 @@ def run_triage(job: ClusteringJob) -> None:
184193
unmatched_reports = []
185194
low_quality_reports = []
186195
entries_to_update = []
196+
bucket_hits = []
187197

188198
for report in unbucketed_reports:
189199
if report.ok_to_cluster:
@@ -194,6 +204,7 @@ def run_triage(job: ClusteringJob) -> None:
194204
entry.cluster_id = cluster_id
195205
entry.bucket_id = bucket_id
196206
entries_to_update.append(entry)
207+
bucket_hits.append((bucket_id, entry.reported_at))
197208
else:
198209
# Track unmatched reports for further clustering
199210
unmatched_reports.append(report)
@@ -219,7 +230,13 @@ def run_triage(job: ClusteringJob) -> None:
219230
# Fall back to domain-based bucketing for reports that still don't have clusters
220231
# and low-quality reports
221232
remaining = still_unmatched + low_quality_reports
222-
fallback_buckets = apply_domain_bucketing_fallback(remaining, report_entries)
233+
fallback_buckets, fallback_bucket_hits = apply_domain_bucketing_fallback(
234+
remaining, report_entries
235+
)
236+
237+
all_bucket_hits = bucket_hits + fallback_bucket_hits
238+
if all_bucket_hits:
239+
BucketHit.bulk_increment_counts(all_bucket_hits)
223240

224241
total_buckets = buckets_created + fallback_buckets
225242
complete_job(job, success=True, buckets_created=total_buckets)

server/reportmanager/models.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44
import json
55
import re
6+
from collections import defaultdict
67
from dataclasses import dataclass
7-
from datetime import timedelta
8+
from datetime import datetime, timedelta
89
from itertools import batched
910
from logging import getLogger
1011
from urllib.parse import urlsplit
@@ -376,6 +377,47 @@ def increment_count(cls, bucket_id, begin):
376377
counter.count += 1
377378
counter.save()
378379

380+
@classmethod
381+
@transaction.atomic
382+
def bulk_increment_counts(cls, bucket_hits: list[tuple[int, datetime]]) -> None:
383+
"""Bulk increment BucketHit counts for multiple reports."""
384+
if not bucket_hits:
385+
return
386+
387+
# Aggregate counts of reports per bucket_id per hour
388+
buckethit_updates: dict[tuple[int, datetime], int] = defaultdict(int)
389+
bucket_ids: set[int] = set()
390+
begins: set[datetime] = set()
391+
392+
for bucket_id, reported_at in bucket_hits:
393+
normalized_time = reported_at.replace(microsecond=0, second=0, minute=0)
394+
buckethit_updates[(bucket_id, normalized_time)] += 1
395+
bucket_ids.add(bucket_id)
396+
begins.add(normalized_time)
397+
398+
existing = {
399+
(h.bucket_id, h.begin): h # type: ignore[attr-defined]
400+
for h in cls.objects.select_for_update().filter(
401+
bucket_id__in=bucket_ids, begin__in=begins
402+
)
403+
}
404+
405+
to_update: list = []
406+
to_create: list = []
407+
408+
for (bucket_id, begin), count in buckethit_updates.items():
409+
if (bucket_id, begin) in existing:
410+
hit = existing[(bucket_id, begin)]
411+
hit.count += count
412+
to_update.append(hit)
413+
else:
414+
to_create.append(cls(bucket_id=bucket_id, begin=begin, count=count))
415+
416+
if to_update:
417+
cls.objects.bulk_update(to_update, ["count"])
418+
if to_create:
419+
cls.objects.bulk_create(to_create)
420+
379421
class Meta(TypedModelMeta):
380422
constraints = (
381423
models.UniqueConstraint(

0 commit comments

Comments
 (0)