Skip to content

Commit 7b13cd3

Browse files
authored
Merge pull request #81 from PySport/feature/refetch-validation-failed
Add invalidate_revision for data quality-driven refetch
2 parents fba41de + d4987e6 commit 7b13cd3

6 files changed

Lines changed: 158 additions & 1 deletion

File tree

ingestify/application/dataset_store.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
)
2121

2222
from ingestify.domain.models.dataset.dataset import DatasetState
23-
from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated
23+
from ingestify.domain.models.dataset.events import (
24+
RevisionAdded,
25+
MetadataUpdated,
26+
RevisionInvalidated,
27+
)
2428
from ingestify.domain.models.dataset.file import NotModifiedFile
2529
from ingestify.domain.models.dataset.file_collection import FileCollection
2630
from ingestify.domain.models.dataset.revision import RevisionSource
@@ -491,6 +495,18 @@ def update_dataset(
491495

492496
return revision
493497

498+
def invalidate_revision(self, dataset: Dataset, reason: str = ""):
    """Invalidate the dataset's current revision and announce it.

    Delegates to the dataset repository (documented there as marking the
    current revision VALIDATION_FAILED and resetting last_modified_at, so
    the dataset is refetched on the next run) and then dispatches a
    RevisionInvalidated event carrying the given reason.

    Args:
        dataset: Dataset whose current revision should be invalidated
        reason: Human-readable reason for invalidation
    """
    # Persist first: the event is only dispatched when the repository
    # call returned without raising.
    self.dataset_repository.invalidate_revision(dataset)

    invalidated_event = RevisionInvalidated(dataset=dataset, reason=reason)
    self.dispatch(invalidated_event)
509+
494510
def destroy_dataset(self, dataset: Dataset):
495511
# TODO: remove files. Now we leave some orphaned files around
496512
self.dataset_repository.destroy(dataset)

ingestify/domain/models/dataset/dataset_repository.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ def get_dataset_last_modified_at_map(
3636
dataset+revision+file graph."""
3737
return {}
3838

39+
@abstractmethod
def invalidate_revision(self, dataset: Dataset):
    """Flag the dataset's current revision as VALIDATION_FAILED and
    clear last_modified_at on the dataset.

    Abstract: concrete repositories provide the implementation.
    """
44+
3945
@abstractmethod
4046
def destroy(self, dataset: Dataset):
4147
pass

ingestify/domain/models/dataset/events.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,9 @@ class SelectorSkipped(DomainEvent):
3232
class DatasetSkipped(DomainEvent):
3333
dataset: Dataset
3434
event_type: ClassVar[str] = "dataset_skipped"
35+
36+
37+
class RevisionInvalidated(DomainEvent):
    # Domain event describing the invalidation of a dataset's revision.

    # Dataset whose revision was invalidated.
    dataset: Dataset
    # Human-readable explanation of why the revision was invalidated.
    reason: str
    # Identifier for this kind of event.
    event_type: ClassVar[str] = "revision_invalidated"

ingestify/domain/models/fetch_policy.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import timedelta
22

33
from ingestify.domain import Dataset, Identifier, DatasetResource
4+
from ingestify.domain.models.dataset.revision import RevisionState
45
from ingestify.utils import utcnow
56

67

@@ -22,6 +23,8 @@ def should_refetch(
2223
# TODO: this is weird? Dataset without any data. Fetch error?
2324
return True
2425
elif current_revision:
26+
if current_revision.state == RevisionState.VALIDATION_FAILED:
27+
return True
2528
files_last_modified = {
2629
file.file_id: file.last_modified
2730
for file in dataset_resource.files.values()

ingestify/infra/store/dataset/sqlalchemy/repository.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from sqlalchemy.orm import Session, Query, sessionmaker, scoped_session
2727

2828
from ingestify.domain import File, Revision
29+
from ingestify.domain.models.dataset.revision import RevisionState
2930
from ingestify.domain.models import (
3031
Dataset,
3132
DatasetCollection,
@@ -675,6 +676,30 @@ def _save(self, datasets: list[Dataset]):
675676
else:
676677
connection.commit()
677678

679+
def invalidate_revision(self, dataset: Dataset):
    """Mark the dataset's current revision as VALIDATION_FAILED and clear
    the dataset's last_modified_at, in the database and on the passed-in
    objects.

    Args:
        dataset: Dataset whose current revision should be invalidated.

    Raises:
        ValueError: If the dataset has no current revision to invalidate.
    """
    current_revision = dataset.current_revision
    if current_revision is None:
        # Fail early with a clear message, before a connection is opened,
        # instead of an AttributeError halfway through the update.
        raise ValueError(
            f"Dataset {dataset.dataset_id} has no current revision to invalidate"
        )

    with self.connect() as connection:
        try:
            # Set revision state to VALIDATION_FAILED
            connection.execute(
                self.revision_table.update()
                .where(self.revision_table.c.dataset_id == dataset.dataset_id)
                .where(
                    self.revision_table.c.revision_id
                    == current_revision.revision_id
                )
                .values(state=RevisionState.VALIDATION_FAILED)
            )
            # Reset last_modified_at so the pre-check cache doesn't skip it
            connection.execute(
                self.dataset_table.update()
                .where(self.dataset_table.c.dataset_id == dataset.dataset_id)
                .values(last_modified_at=None)
            )
        except Exception:
            # Keep both updates atomic: never leave the revision flagged
            # while last_modified_at is still set (or vice versa).
            connection.rollback()
            raise
        else:
            connection.commit()

    # Update in-memory state only once the database change is committed
    current_revision.state = RevisionState.VALIDATION_FAILED
    dataset.last_modified_at = None
702+
678703
def destroy(self, dataset: Dataset):
679704
with self.connect() as connection:
680705
try:
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""Tests for invalidating revisions and refetching."""
2+
from datetime import datetime, timezone
3+
4+
from ingestify import Source, DatasetResource
5+
from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector
6+
from ingestify.domain.models.dataset.collection_metadata import (
7+
DatasetCollectionMetadata,
8+
)
9+
from ingestify.domain.models.dataset.revision import RevisionState
10+
from ingestify.domain.models.fetch_policy import FetchPolicy
11+
from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan
12+
13+
# Fixed timestamp so last_modified doesn't change between runs
14+
FIXED_TIME = datetime(2026, 1, 1, tzinfo=timezone.utc)
15+
16+
call_count = 0
17+
18+
19+
def counting_loader(file_resource, current_file, **kwargs):
    """Loader stub: bump the module-level call counter and return a draft
    file whose content embeds the new count."""
    global call_count
    call_count = call_count + 1
    payload = f"data-{call_count}"
    return DraftFile.from_input(payload, data_feed_key="f1")
23+
24+
25+
class SimpleSource(Source):
    """Test source that yields exactly one dataset resource with one file."""

    provider = "test_provider"

    def find_datasets(
        self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
    ):
        """Yield a single resource whose only file is loaded by
        counting_loader."""
        resource = DatasetResource(
            dataset_resource_id={"item_id": 1},
            provider=self.provider,
            dataset_type="test",
            name="item-1",
        )
        # FIXED_TIME keeps last_modified identical on every run.
        resource.add_file(
            last_modified=FIXED_TIME,
            data_feed_key="f1",
            data_spec_version="v1",
            file_loader=counting_loader,
        )
        yield resource
44+
45+
46+
def _setup(engine):
    """Register one ingestion plan for SimpleSource on the given engine."""
    spec_versions = DataSpecVersionCollection.from_dict({"default": {"v1"}})
    source = SimpleSource("s")
    policy = FetchPolicy()
    selector = Selector.build({}, data_spec_versions=spec_versions)
    plan = IngestionPlan(
        source=source,
        fetch_policy=policy,
        dataset_type="test",
        selectors=[selector],
        data_spec_versions=spec_versions,
    )
    engine.add_ingestion_plan(plan)
57+
58+
59+
def test_normal_second_run_skips(engine):
    """A second run with an unchanged last_modified must not refetch."""
    global call_count
    call_count = 0
    _setup(engine)

    # Initial run: the loader is invoked once.
    engine.run()
    assert call_count == 1

    # Repeat run: the counter must stay where it was.
    engine.run()
    assert call_count == 1, "Should NOT refetch when nothing changed"
70+
71+
72+
def test_invalidate_revision_triggers_refetch(engine):
    """An invalidated revision makes the next run fetch the dataset again."""
    global call_count
    call_count = 0
    _setup(engine)

    def stored_datasets():
        # Fresh read from the store on every call.
        collection = engine.store.get_dataset_collection(
            provider="test_provider", dataset_type="test"
        )
        return list(collection)

    # First run: creates the dataset
    engine.run()
    assert call_count == 1

    # Invalidate the current revision
    target = stored_datasets()[0]
    engine.store.invalidate_revision(target, reason="Data quality check failed")

    # Verify the state as seen through a fresh read
    reloaded = stored_datasets()[0]
    assert reloaded.current_revision.state == RevisionState.VALIDATION_FAILED

    # Second run: should refetch
    engine.run()
    assert call_count == 2, "Dataset with invalidated revision should be refetched"

0 commit comments

Comments
 (0)