Skip to content

Commit 6a25464

Browse files
committed
enh: add MetaCache for persistent metadata management
1 parent 0c28b67 commit 6a25464

4 files changed

Lines changed: 529 additions & 4 deletions

File tree

CHANGELOG

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
- fix: error handling when connection to server fails on download (#75)
33
- fix: '_condensed' suffix for file stem randomly missing in downloads
44
- enh: add `SQLiteKeyJSONDatabase` for persistent metadata storage
5+
- enh: add `MetaCache` for persistent metadata management
56
- enh: add `CKANAPI.hostname`
67
- enh: sort search results by creation date instead of score and modified date
78
- enh: convenience kwarg `since_time` for `search_dataset_via_api`

dcoraid/dbmodel/meta_cache.py

Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
import logging
2+
import pathlib
3+
import traceback
4+
from typing import Any
5+
6+
import numpy as np
7+
8+
from .meta_cache_sqlite import SQLiteKeyJSONDatabase
9+
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class MetaCache:
15+
"""Cache dataset dictionaries (metadata)
16+
17+
The implementation uses an SQLite database which is loaded upon
18+
init and edited whenever data changes. For compute-intensive
19+
tasks (searching), the metadata are loaded into memory.
20+
"""
21+
def __init__(self,
22+
directory: str | pathlib.Path,
23+
circle_ids: list[str] = None,
24+
) -> None:
25+
"""
26+
Scan *directory* for ``circle_*.db`` files, load all of them
27+
and fill the numpy backing store.
28+
29+
Parameters
30+
----------
31+
directory : str | pathlib.Path
32+
Path to the folder that will hold the ``circle_<org_id>.db`` files.
33+
The folder is created automatically if it does not exist.
34+
circle_ids : list[str]
35+
List of circle IDs that should be taken into consideration.
36+
If set to None (default), all databases in the `directory`
37+
are loaded.
38+
"""
39+
self.base_dir = pathlib.Path(directory).expanduser().resolve()
40+
self.base_dir.mkdir(parents=True, exist_ok=True)
41+
42+
# The registry is a dictionary with circle IDs and a list of
43+
# dataset IDs as values.
44+
self._registry_org = {}
45+
46+
# List of all dataset dictionaries (only used during init)
47+
datasets = []
48+
# List of search data (only used during init)
49+
rows: list[tuple] = []
50+
51+
# Initialize the databases
52+
if circle_ids is None:
53+
db_paths = list(self.base_dir.glob("circle_*.db"))
54+
else:
55+
db_paths = [self.base_dir / f"circle_{c}.db" for c in circle_ids]
56+
# Dictionary of databases for persistent storage
57+
self._databases = {}
58+
for cp in db_paths:
59+
cid = cp.stem.split("_", 1)[-1]
60+
try:
61+
self._databases[cid] = SQLiteKeyJSONDatabase(cp)
62+
except BaseException:
63+
logger.error(
64+
f"Recreating broken DB '{cp}': {traceback.format_exc()}")
65+
cp.unlink()
66+
self._databases[cid] = SQLiteKeyJSONDatabase(cp)
67+
68+
# populate registry, dataset list, and search array data
69+
# initial blob size for dataset search
70+
blob_size = 256
71+
for db in self._databases.values():
72+
for ds_dict in db:
73+
ds_id: str = ds_dict.get("id", "")
74+
m_created: str = ds_dict.get("metadata_created", "")
75+
76+
# Build a double-space‑separated string of **only values**
77+
value_blob = _create_blob_for_search(ds_dict)
78+
blob_size = max(blob_size, len(value_blob))
79+
80+
rows.append((ds_id, m_created, value_blob))
81+
datasets.append(ds_dict)
82+
self._registry_org.setdefault(ds_dict["owner_org"],
83+
[]).append(ds_dict["id"])
84+
85+
# Convert the Python list of tuples into a NumPy 2‑D object array
86+
# TODO: If there are performance issues, we can create these arrays
87+
# in chunks of e.g. 10000 in the above for-loop.
88+
data_dtype = [
89+
# "4b1f7c53-9d7f-2c53-aeb6-eaa8ecf10ca9"
90+
("id", "<U36"),
91+
# 2025-09-18T08:09:52.947634
92+
("created", "<U26"),
93+
# initialize with maximum size
94+
("blob", f"<U{blob_size}")
95+
]
96+
if rows:
97+
data = np.array(rows, dtype=data_dtype) # shape (n_records, 3)
98+
else:
99+
# initialize emtpy cache
100+
data = np.empty(0, dtype=data_dtype)
101+
102+
# Sort the dataset according to creation date, descending
103+
sort_idx = np.argsort(data["created"])[::-1]
104+
#: Blobs for searching, sorted by creation date descending
105+
self._srt_blobs = data[sort_idx]
106+
107+
#: list of datasets, sorted by creation date descending
108+
self.datasets = [datasets[ii] for ii in sort_idx]
109+
110+
def __enter__(self):
111+
return self
112+
113+
def __exit__(self, exc_type, exc_val, exc_tb):
114+
self.close()
115+
116+
def close(self):
117+
for db in self._databases.values():
118+
db.close()
119+
120+
def search(self,
121+
query: str,
122+
limit: int = None
123+
) -> list[dict[str, Any]]:
124+
"""
125+
Free‑text search across **all** cached datasets.
126+
127+
The search is case‑insensitive and matches if the query appears
128+
anywhere in the JSON blob created from the database.
129+
130+
Parameters
131+
----------
132+
query: str
133+
Text to look for.
134+
limit: int | None, optional
135+
Maximum number of results to return. ``None`` (default) returns all
136+
matches.
137+
138+
Returns
139+
-------
140+
List[dict]
141+
A list of the matching dataset dictionaries
142+
"""
143+
norm_query = query.strip().lower()
144+
if not norm_query:
145+
return [] # empty query -> no results
146+
147+
# `np.strings.find` works element‑wise on an array of strings
148+
# and returns the index of the first occurrence (or -1 if not found).
149+
# `self._srt_blobs[:, 2]` is the column that holds the lower‑cased
150+
# blobs.
151+
match_mask = np.strings.find(self._srt_blobs["blob"], norm_query) != -1
152+
153+
if limit is not None:
154+
# Remove all items that are above a threshold.
155+
# TODO: apply the limit already during search by chunking?
156+
idx_lim = np.where(np.cumsum(match_mask) > limit)[0]
157+
if idx_lim.size:
158+
match_mask[idx_lim.min():] = False
159+
160+
idx_result = np.where(match_mask)[0]
161+
162+
return [self.datasets[idx] for idx in idx_result]
163+
164+
def upsert_dataset(self, ds_dict: dict[str, Any]) -> None:
165+
"""Insert a new dataset or update an existing one
166+
167+
If the dataset `id` is not present in the cache, it is
168+
appended to the persistent SQLite database and to the in‑memory
169+
structures.
170+
171+
If a dataset with the same `id` already exists, the record is
172+
replaced everywhere.
173+
174+
Parameters
175+
----------
176+
ds_dict : dict
177+
A CKAN‑style dataset dictionary.
178+
"""
179+
ds_id = ds_dict.get("id")
180+
org_id = ds_dict.get("owner_org")
181+
182+
# Is this dataset new?
183+
if ds_id not in self._registry_org.setdefault(org_id, []):
184+
# We have a new dataset
185+
self._upsert_dataset_insert(ds_dict)
186+
else:
187+
# We have an existing dataset
188+
if self._databases[org_id][ds_id] != ds_dict:
189+
# We have tp update the dataset
190+
self._upsert_dataset_update(ds_dict)
191+
192+
def _upsert_dataset_insert(self, ds_dict):
193+
"""Insert a new dataset
194+
195+
The search array will be rebuilt fully, because the order of
196+
datasets will change.
197+
"""
198+
ds_id = ds_dict["id"]
199+
org_id = ds_dict["owner_org"]
200+
m_created = ds_dict["metadata_created"]
201+
blob = _create_blob_for_search(ds_dict)
202+
if len(blob) > int(self._srt_blobs.dtype["blob"].str[2:]):
203+
# Increase the search blob size.
204+
new_dtype = [("id", "<U36"),
205+
("created", "<U26"),
206+
("blob", f"<U{len(blob) + 10}")
207+
]
208+
else:
209+
new_dtype = self._srt_blobs.dtype
210+
211+
# registry
212+
self._registry_org.setdefault(org_id, []).append(ds_id)
213+
214+
# search array
215+
dates = np.array(self._srt_blobs["created"])
216+
new_size = dates.size + 1
217+
new_idx = np.searchsorted(dates, m_created)
218+
dates.resize(new_size)
219+
new_blobs = np.empty(new_size, dtype=new_dtype)
220+
new_blobs[:new_idx] = self._srt_blobs[:new_idx]
221+
new_blobs[new_idx] = (ds_id, m_created, blob)
222+
new_blobs[new_idx + 1:] = self._srt_blobs[new_idx:]
223+
self._srt_blobs = new_blobs
224+
225+
# datasets
226+
self.datasets.insert(new_idx, ds_dict)
227+
228+
# persistent database
229+
if org_id not in self._databases:
230+
self._databases[org_id] = SQLiteKeyJSONDatabase(
231+
db_name=self.base_dir / f"circle_{org_id}.db")
232+
self._databases[org_id][ds_id] = ds_dict
233+
234+
def _upsert_dataset_update(self, ds_dict):
235+
"""Update an existing dataset
236+
237+
If the new blob for `ds_dict` is larger than the existing,
238+
a new copy of the search array is created in memory.
239+
"""
240+
ds_id = ds_dict["id"]
241+
org_id = ds_dict["owner_org"]
242+
243+
# registry does not need updating (only contains ds_id)
244+
245+
# Find the index in the database
246+
idx = np.where(self._srt_blobs["id"] == ds_id)[0][0]
247+
248+
# search array
249+
blob = _create_blob_for_search(ds_dict)
250+
if len(blob) > int(self._srt_blobs.dtype["blob"].str[2:]):
251+
# Rewrite the search blobs, because this blob is bigger than any
252+
# of the blobs before.
253+
new_dtype = [("id", "<U36"),
254+
("created", "<U26"),
255+
("blob", f"<U{len(blob) + 10}")
256+
]
257+
self._srt_blobs = np.array(self._srt_blobs, dtype=new_dtype)
258+
self._srt_blobs["blob"][idx] = blob
259+
260+
# cached datasets
261+
self.datasets[idx] = ds_dict
262+
263+
# persistent database
264+
self._databases[org_id][ds_id] = ds_dict
265+
266+
267+
def _create_blob_for_search(ds_dict):
268+
"""Create a string blob from a dataset dictionary for free text search"""
269+
values = _values_only(ds_dict,
270+
only_keys=[
271+
# dataset
272+
"authors",
273+
"creator_user_id",
274+
"doi",
275+
"id",
276+
"name",
277+
"notes",
278+
"title",
279+
# resource
280+
"resources",
281+
"dc:experiment:date",
282+
"dc:experiment:sample",
283+
"dc:setup:chip region",
284+
"dc:setup:identifier",
285+
"dc:setup:module composition",
286+
"description",
287+
# "id", # duplicate
288+
# "name", # duplicate
289+
"organization",
290+
# tags
291+
"tags",
292+
"display_name",
293+
# groups
294+
"groups",
295+
])
296+
value_blob: str = " ".join(values).lower()
297+
return value_blob
298+
299+
300+
def _values_only(obj: Any,
301+
only_keys: list[str],
302+
) -> list[str]:
303+
"""
304+
Recursively walk a JSON‑compatible object and return a flat list of the
305+
string representation of all values whose keys are in `only_keys`.
306+
Lists, tuples and dicts are traversed; other scalar types are converted
307+
with ``str``.
308+
"""
309+
vals = []
310+
311+
if isinstance(obj, dict):
312+
for k, v in obj.items():
313+
if k in only_keys:
314+
vals += _values_only(v, only_keys)
315+
elif isinstance(obj, (list, tuple)):
316+
for v in obj:
317+
vals += _values_only(v, only_keys)
318+
else:
319+
# scalar (str, int, float, bool, None)
320+
if obj not in [None, ""]:
321+
vals.append(str(obj))
322+
return vals

tests/common.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ def make_dataset_dict(hint=""):
6767
return dataset_dict
6868

6969

70-
def make_dataset_dict_full_fake():
71-
org_id = f"{uuid.uuid4()}"
72-
ds_id = f"{uuid.uuid4()}"
70+
def make_dataset_dict_full_fake(org_id=None, ds_id=None, title=None):
71+
org_id = org_id or f"{uuid.uuid4()}"
72+
ds_id = ds_id or f"{uuid.uuid4()}"
7373
user_id = f"{uuid.uuid4()}"
7474
created = datetime.date.fromtimestamp(time.time() - 50).isoformat()
7575
modified = datetime.date.fromtimestamp(time.time()).isoformat()
76+
title = title or "Standard title"
7677

7778
return {"authors": "John Doe",
7879
"creator_user_id": f"{user_id}",
@@ -98,7 +99,8 @@ def make_dataset_dict_full_fake():
9899
"state": "active"},
99100
"owner_org": f"{org_id}",
100101
"private": True, "references": "",
101-
"state": "active", "title": "Dataset Title",
102+
"state": "active",
103+
"title": title,
102104
"type": "dataset", "resources": [
103105
{"cache_last_updated": None, "cache_url": None,
104106
"created": created,

0 commit comments

Comments
 (0)