Skip to content

Commit 43a9177

Browse files
committed
fix: speed up traces sync by skipping partitions that exist
1 parent 0e42523 commit 43a9177

2 files changed

Lines changed: 24 additions & 0 deletions

File tree

src/biodata_cache/backend.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ def get_versions_index(self) -> list[str]:
7070
"""Return the list of all available version folders from cache_versions.json."""
7171
pass # pragma: no cover
7272

73+
def partition_exists(self, table_name: str) -> bool:
    """Report whether the given partition already holds data.

    This default implementation ignores ``table_name`` and always
    answers False.
    """
    return False
76+
7377

7478
class S3Backend(Backend):
7579
"""Stores and retrieves caches using AWS S3 with parquet files."""
@@ -149,6 +153,16 @@ def clear_partition(self, table_name: str) -> None:
149153
Delete={"Objects": to_delete[i : i + 1000]},
150154
)
151155

156+
def partition_exists(self, table_name: str) -> bool:
    """Return True if at least one object exists under a hive partition prefix.

    ``table_name`` is expected in the form ``"<base>/<value>"``. The prefix
    checked is
    ``{_CACHE_ROOT}/{_VERSION_FOLDER}/{base}/{partition_key}={value}/``,
    where ``partition_key`` is looked up in ``HIVE_PARTITION_KEYS``.

    Returns False, without querying S3, when ``table_name`` contains no
    ``"/"`` or when ``base`` has no entry in ``HIVE_PARTITION_KEYS``.
    """
    base, sep, value = table_name.partition("/")
    if not sep:
        return False
    try:
        partition_key = HIVE_PARTITION_KEYS[base]
    except KeyError:
        # An unregistered base cannot have a hive partition; answer False
        # rather than raising from what is only an existence check.
        return False
    prefix = f"{_CACHE_ROOT}/{_VERSION_FOLDER}/{base}/{partition_key}={value}/"
    # MaxKeys=1: only existence matters, so ask for a single key at most.
    resp = self.s3_client.list_objects_v2(Bucket=self.bucket, Prefix=prefix, MaxKeys=1)
    return resp.get("KeyCount", 0) > 0
165+
152166
def write_chunk(self, table_name: str, data: pd.DataFrame, chunk_idx: int) -> None:
153167
"""Append one numbered parquet chunk to a hive partition."""
154168
base, value = table_name.split("/", 1)
@@ -363,6 +377,11 @@ def get_versions_index(self) -> list[str]:
363377
"""Return the list of all available version folders from the in-memory index."""
364378
return json.loads(self._json_store.get("cache_versions.json", "[]"))
365379

380+
def partition_exists(self, table_name: str) -> bool:
    """Tell whether the in-memory store holds a non-empty frame for the partition."""
    stored = self._store.get(table_name)
    if stored is None:
        # Nothing was ever stored under this name.
        return False
    return not stored.empty
384+
366385
def clear_partition(self, table_name: str) -> None:
367386
"""Remove all chunks stored for a partitioned table."""
368387
self._store.pop(table_name, None)

src/biodata_cache/sync.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,11 @@ def update_all_tables(fast: bool = True, slow: bool = True) -> None:
279279
fib_subject_ids = (
280280
df_basics[fib_mask & (df_basics["data_level"] == "derived")]["subject_id"].dropna().unique()
281281
)
282+
fib_subject_ids = [
283+
subject_id
284+
for subject_id in fib_subject_ids
285+
if not BACKEND.partition_exists(f"{NAMES['fib_traces']}/{subject_id}")
286+
]
282287
if len(fib_subject_ids) > 0:
283288
fib_traces_fn = TABLE_REGISTRY[NAMES["fib_traces"]]
284289
try:

0 commit comments

Comments
 (0)