Skip to content

Commit 042a1e7

Browse files
authored
Merge pull request #84 from PySport/feature/invalidate-revisions-batch
Add invalidate_revisions for batched revision invalidation
2 parents a1e5725 + 5219914 commit 042a1e7

10 files changed

Lines changed: 155 additions & 42 deletions

File tree

ingestify/application/dataset_store.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -497,15 +497,19 @@ def update_dataset(
497497

498498
def invalidate_revision(self, dataset: Dataset, reason: str = ""):
499499
"""Mark the current revision as VALIDATION_FAILED and reset
500-
last_modified_at so the dataset is refetched on the next run.
501-
502-
Args:
503-
dataset: Dataset whose current revision should be invalidated
504-
reason: Human-readable reason for invalidation
505-
"""
506-
self.dataset_repository.invalidate_revision(dataset)
507-
508-
self.dispatch(RevisionInvalidated(dataset=dataset, reason=reason))
500+
last_modified_at so the dataset is refetched on the next run."""
501+
self.invalidate_revisions([dataset], reason=reason)
502+
503+
def invalidate_revisions(self, datasets: list, reason: str = ""):
504+
"""Batch invalidate revisions. Batches DB updates and event writes
505+
per 1000 datasets for efficiency."""
506+
batch_size = 1000
507+
for i in range(0, len(datasets), batch_size):
508+
batch = datasets[i : i + batch_size]
509+
self.dataset_repository.invalidate_revisions(batch)
510+
self.event_bus.dispatch_many(
511+
[RevisionInvalidated(dataset=ds, reason=reason) for ds in batch]
512+
)
509513

510514
def destroy_dataset(self, dataset: Dataset):
511515
# TODO: remove files. Now we leave some orphaned files around

ingestify/domain/models/dataset/dataset_repository.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@ def get_dataset_last_modified_at_map(
3636
dataset+revision+file graph."""
3737
return {}
3838

39-
@abstractmethod
4039
def invalidate_revision(self, dataset: Dataset):
4140
"""Mark the current revision as VALIDATION_FAILED and reset
4241
last_modified_at on the dataset."""
42+
self.invalidate_revisions([dataset])
43+
44+
@abstractmethod
45+
def invalidate_revisions(self, datasets: list[Dataset]):
46+
"""Batch invalidate: mark current revisions as VALIDATION_FAILED
47+
and reset last_modified_at on the datasets."""
4348
pass
4449

4550
@abstractmethod

ingestify/domain/models/event/dispatcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@
66
class Dispatcher(Protocol):
77
def dispatch(self, event: DomainEvent):
88
pass
9+
10+
def dispatch_many(self, events: list[DomainEvent]):
11+
pass

ingestify/domain/models/event/event_bus.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ def __init__(self, queue):
1414
def dispatch(self, event):
1515
self.queue.put(event)
1616

17+
def dispatch_many(self, events):
18+
for event in events:
19+
self.queue.put(event)
20+
1721

1822
class EventBus:
1923
def __init__(self):
@@ -37,3 +41,11 @@ def dispatch(self, event):
3741
except Exception as e:
3842
logger.exception(f"Failed to handle {event}")
3943
raise Exception(f"Failed to handle {event}") from e
44+
45+
def dispatch_many(self, events):
46+
for dispatcher in self.dispatchers:
47+
try:
48+
dispatcher.dispatch_many(events)
49+
except Exception as e:
50+
logger.exception(f"Failed to handle {len(events)} events")
51+
raise Exception(f"Failed to handle {len(events)} events") from e

ingestify/domain/models/event/publisher.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,14 @@ def dispatch(self, event: DomainEvent):
1919
except Exception:
2020
logger.exception(f"Failed to handle {event} by {subscriber}")
2121

22+
def dispatch_many(self, events: list[DomainEvent]):
23+
for subscriber in self.subscribers:
24+
try:
25+
subscriber.handle_many(events)
26+
except Exception:
27+
logger.exception(
28+
f"Failed to handle {len(events)} events by {subscriber}"
29+
)
30+
2231
def add_subscriber(self, subscriber: Subscriber):
2332
self.subscribers.append(subscriber)

ingestify/domain/models/event/subscriber.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,8 @@ def handle(self, event: DomainEvent):
4444
self.on_revision_added(event)
4545
elif isinstance(event, RevisionInvalidated):
4646
self.on_revision_invalidated(event)
47+
48+
def handle_many(self, events: list[DomainEvent]):
49+
"""Handle a batch of events. Override for efficient bulk writes."""
50+
for event in events:
51+
self.handle(event)

ingestify/infra/event_log/event_log.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,24 @@ def __init__(self, engine, table_prefix: str = ""):
3131
self._table.create(engine, checkfirst=True)
3232

3333
def write(self, event: DomainEvent) -> None:
34+
self.write_many([event])
35+
36+
def write_many(self, events: list[DomainEvent]) -> None:
37+
if not events:
38+
return
39+
now = utcnow()
40+
rows = [
41+
{
42+
"event_type": type(event).event_type,
43+
"payload_json": event.model_dump(mode="json"),
44+
"source": event.dataset.provider,
45+
"dataset_id": event.dataset.dataset_id,
46+
"created_at": now,
47+
}
48+
for event in events
49+
]
3450
with self._engine.connect() as conn:
35-
conn.execute(
36-
self._table.insert().values(
37-
event_type=type(event).event_type,
38-
payload_json=event.model_dump(mode="json"),
39-
source=event.dataset.provider,
40-
dataset_id=event.dataset.dataset_id,
41-
created_at=utcnow(),
42-
)
43-
)
51+
conn.execute(self._table.insert(), rows)
4452
conn.commit()
4553

4654
def fetch_batch(self, last_event_id: int, batch_size: int) -> list:

ingestify/infra/event_log/subscriber.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ def _write(self, event) -> None:
3434
event.dataset.dataset_id,
3535
)
3636

37+
def _write_many(self, events) -> None:
38+
try:
39+
self._event_log.write_many(events)
40+
except Exception:
41+
logger.exception(
42+
"EventLogSubscriber: failed to write %d events",
43+
len(events),
44+
)
45+
3746
def on_dataset_created(self, event) -> None:
3847
self._write(event)
3948

@@ -45,3 +54,6 @@ def on_revision_added(self, event) -> None:
4554

4655
def on_revision_invalidated(self, event) -> None:
4756
self._write(event)
57+
58+
def handle_many(self, events) -> None:
59+
self._write_many(events)

ingestify/infra/store/dataset/sqlalchemy/repository.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -677,28 +677,34 @@ def _save(self, datasets: list[Dataset]):
677677
connection.commit()
678678

679679
def invalidate_revision(self, dataset: Dataset):
680-
current_revision = dataset.current_revision
680+
self.invalidate_revisions([dataset])
681+
682+
def invalidate_revisions(self, datasets: list[Dataset]):
683+
if not datasets:
684+
return
685+
686+
dataset_ids = [d.dataset_id for d in datasets]
687+
681688
with self.connect() as connection:
682-
# Set revision state to VALIDATION_FAILED
689+
# Batch update revision state
683690
connection.execute(
684691
self.revision_table.update()
685-
.where(self.revision_table.c.dataset_id == dataset.dataset_id)
686-
.where(
687-
self.revision_table.c.revision_id == current_revision.revision_id
688-
)
692+
.where(self.revision_table.c.dataset_id.in_(dataset_ids))
689693
.values(state=RevisionState.VALIDATION_FAILED)
690694
)
691-
# Reset last_modified_at so the pre-check cache doesn't skip it
695+
# Batch reset last_modified_at
692696
connection.execute(
693697
self.dataset_table.update()
694-
.where(self.dataset_table.c.dataset_id == dataset.dataset_id)
698+
.where(self.dataset_table.c.dataset_id.in_(dataset_ids))
695699
.values(last_modified_at=None)
696700
)
697701
connection.commit()
698702

699703
# Update in-memory state
700-
current_revision.state = RevisionState.VALIDATION_FAILED
701-
dataset.last_modified_at = None
704+
for dataset in datasets:
705+
if dataset.current_revision:
706+
dataset.current_revision.state = RevisionState.VALIDATION_FAILED
707+
dataset.last_modified_at = None
702708

703709
def destroy(self, dataset: Dataset):
704710
with self.connect() as connection:

ingestify/tests/test_refetch_validation_failed.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,27 @@ def counting_loader(file_resource, current_file, **kwargs):
2525
class SimpleSource(Source):
2626
provider = "test_provider"
2727

28+
def __init__(self, name, n_datasets=1):
29+
super().__init__(name)
30+
self.n_datasets = n_datasets
31+
2832
def find_datasets(
2933
self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
3034
):
31-
r = DatasetResource(
32-
dataset_resource_id={"item_id": 1},
33-
provider=self.provider,
34-
dataset_type="test",
35-
name="item-1",
36-
)
37-
r.add_file(
38-
last_modified=FIXED_TIME,
39-
data_feed_key="f1",
40-
data_spec_version="v1",
41-
file_loader=counting_loader,
42-
)
43-
yield r
35+
for i in range(self.n_datasets):
36+
r = DatasetResource(
37+
dataset_resource_id={"item_id": i},
38+
provider=self.provider,
39+
dataset_type="test",
40+
name=f"item-{i}",
41+
)
42+
r.add_file(
43+
last_modified=FIXED_TIME,
44+
data_feed_key="f1",
45+
data_spec_version="v1",
46+
file_loader=counting_loader,
47+
)
48+
yield r
4449

4550

4651
def _setup(engine):
@@ -99,3 +104,47 @@ def test_invalidate_revision_triggers_refetch(engine):
99104
# Second run: should refetch
100105
engine.run()
101106
assert call_count == 2, "Dataset with invalidated revision should be refetched"
107+
108+
109+
def test_invalidate_revisions_batch(engine):
110+
"""invalidate_revisions works on multiple datasets at once."""
111+
global call_count
112+
call_count = 0
113+
114+
dsv = DataSpecVersionCollection.from_dict({"default": {"v1"}})
115+
engine.add_ingestion_plan(
116+
IngestionPlan(
117+
source=SimpleSource("s", n_datasets=5),
118+
fetch_policy=FetchPolicy(),
119+
dataset_type="test",
120+
selectors=[Selector.build({}, data_spec_versions=dsv)],
121+
data_spec_versions=dsv,
122+
)
123+
)
124+
125+
# First run: creates 5 datasets
126+
engine.run()
127+
assert call_count == 5
128+
129+
# Batch invalidate all 5
130+
datasets = list(
131+
engine.store.get_dataset_collection(
132+
provider="test_provider", dataset_type="test"
133+
)
134+
)
135+
assert len(datasets) == 5
136+
engine.store.invalidate_revisions(datasets, reason="Batch test")
137+
138+
# Verify all invalidated
139+
datasets = list(
140+
engine.store.get_dataset_collection(
141+
provider="test_provider", dataset_type="test"
142+
)
143+
)
144+
for ds in datasets:
145+
assert ds.current_revision.state == RevisionState.VALIDATION_FAILED
146+
assert ds.last_modified_at is None
147+
148+
# Second run: should refetch all 5
149+
engine.run()
150+
assert call_count == 10, "All 5 invalidated datasets should be refetched"

0 commit comments

Comments
 (0)