Skip to content

Commit 7e06594

Browse files
committed
reg: several fixes for MetaCache, more tests, and a new wrapper method for adding many datasets
1 parent 7a10aac commit 7e06594

2 files changed

Lines changed: 199 additions & 78 deletions

File tree

dcoraid/dbmodel/meta_cache.py

Lines changed: 112 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class MetaCache:
4747
The implementation uses an SQLite database which is loaded upon
4848
init and edited whenever data changes. For compute-intensive
4949
tasks (searching), the metadata are loaded into memory.
50+
51+
Datasets are sorted according to "metadata_created", descending.
5052
"""
5153
def __init__(self,
5254
directory: str | pathlib.Path,
@@ -191,73 +193,6 @@ def close(self):
191193
for db in self._databases.values():
192194
db.close()
193195

194-
def insert_many(self, org_id, dataset_dicts):
195-
"""Insert multiple datasets at once for one organization
196-
197-
This is essentially the vectorization of
198-
:meth:`MetaCache._upsert_dataset_insert`.
199-
This method MUST NOT be used for "updating" datasets.
200-
"""
201-
ds_ids = [ds_dict["id"] for ds_dict in dataset_dicts]
202-
ms_created = [ds_dict["metadata_created"] for ds_dict in dataset_dicts]
203-
blobs_new = [
204-
_create_blob_for_search(ds_dict) for ds_dict in dataset_dicts]
205-
blob_max_len = max(len(b) for b in blobs_new)
206-
207-
if blob_max_len > int(self._srt_blobs.dtype["blob"].str[2:]):
208-
# Increase the search blob size.
209-
new_dtype = [("id", "<U36"),
210-
("created", "<U26"),
211-
("blob", f"<U{blob_max_len + 10}")
212-
]
213-
else:
214-
new_dtype = self._srt_blobs.dtype
215-
216-
# registry
217-
self._registry_org.setdefault(org_id, []).__add__(ds_ids)
218-
219-
# search array
220-
dates_cur = np.array(self._srt_blobs["created"], copy=True)
221-
size_old = dates_cur.size
222-
dates_new = np.array(ms_created)
223-
dates_comb = np.concatenate((dates_cur, dates_new))
224-
sorter = np.argsort(dates_comb)
225-
sorter_cur = sorter[:size_old]
226-
sorter_new = sorter[size_old:]
227-
228-
new_blobs = np.empty(dates_comb.size, dtype=new_dtype)
229-
new_blobs[sorter_cur] = self._srt_blobs
230-
new_blobs["blob"][sorter_new] = blobs_new
231-
new_blobs["created"][sorter_new] = dates_new
232-
new_blobs["id"][sorter_new] = ds_ids
233-
234-
self._srt_blobs = new_blobs
235-
236-
# datasets
237-
datasets_unsrt = self.datasets + dataset_dicts
238-
self.datasets = [datasets_unsrt[ii] for ii in sorter]
239-
240-
# persistent database
241-
if org_id not in self._databases:
242-
self._databases[org_id] = SQLiteKeyJSONDatabase(
243-
db_name=self.base_dir / f"org_{org_id}.db")
244-
self._databases[org_id].insert_many(dataset_dicts)
245-
246-
# user's dataset list
247-
datasets_user_owned_unsrt = (
248-
self.datasets_user_owned
249-
+ [ds["creator_user_id"] == self.user_id for ds in dataset_dicts]
250-
)
251-
self.datasets_user_owned = [
252-
datasets_user_owned_unsrt[ii] for ii in sorter]
253-
254-
# dataset IDs
255-
dataset_ids_unsr = self._dataset_ids + ds_ids
256-
self._dataset_ids = [dataset_ids_unsr[ii] for ii in sorter]
257-
258-
for (idx, ds_id) in enumerate(self._dataset_ids):
259-
self._dataset_index_dict[ds_id] = idx
260-
261196
def reset(self):
262197
"""Reset the entire database"""
263198
with self._lock:
@@ -435,6 +370,116 @@ def _upsert_dataset_update(self, ds_dict):
435370
# persistent database
436371
self._databases[org_id][ds_id] = ds_dict
437372

373+
def upsert_many(self, dataset_dicts, org_id=None):
374+
"""Insert or update multiple datasets at once
375+
376+
The implementation is faster when the organization ID `org_id`
377+
is specified. The recommended workflow is to query the DCOR
378+
server by organization.
379+
"""
380+
if self._registry_org.get(org_id):
381+
# Separate the datasets into new and existing datasets.
382+
ds_list_update = [ds_dict for ds_dict in dataset_dicts
383+
if ds_dict["id"] in self._dataset_ids]
384+
else:
385+
# We have not seen this organization before
386+
ds_list_update = []
387+
388+
if not ds_list_update:
389+
# Nothing needs to be updated, and all datasets are inserted.
390+
ds_list_insert = dataset_dicts
391+
else:
392+
ds_list_insert = [ds_dict for ds_dict in dataset_dicts
393+
if ds_dict["id"] not in self._dataset_ids]
394+
395+
# Update datasets
396+
for ds_dict in ds_list_update:
397+
if self._databases[org_id][ds_dict["id"]] != ds_dict:
398+
self._upsert_dataset_update(ds_dict)
399+
400+
if ds_list_insert:
401+
# Insert datasets by organization
402+
if org_id is not None:
403+
# All datasets belong to one organization
404+
self._upsert_many_insert(org_id=org_id,
405+
dataset_dicts=ds_list_insert)
406+
else:
407+
# Iterate over all organization IDs in dataset_dicts
408+
org_dict = {}
409+
for ds_dict in ds_list_insert:
410+
org_dict.setdefault(ds_dict["owner_org"],
411+
[]).append(ds_dict)
412+
for org_id, ds_list in org_dict.items():
413+
self._upsert_many_insert(org_id=org_id,
414+
dataset_dicts=ds_list)
415+
416+
def _upsert_many_insert(self, org_id, dataset_dicts):
417+
"""Insert multiple datasets at once for one organization
418+
419+
This is essentially the vectorization of
420+
:meth:`MetaCache._upsert_dataset_insert`.
421+
This method MUST NOT be used for "updating" datasets.
422+
"""
423+
ds_ids = [ds_dict["id"] for ds_dict in dataset_dicts]
424+
ms_created = [ds_dict["metadata_created"] for ds_dict in dataset_dicts]
425+
blobs_new = [
426+
_create_blob_for_search(ds_dict) for ds_dict in dataset_dicts]
427+
blob_max_len = max(len(b) for b in blobs_new)
428+
429+
if blob_max_len > int(self._srt_blobs.dtype["blob"].str[2:]):
430+
# Increase the search blob size.
431+
new_dtype = [("id", "<U36"),
432+
("created", "<U26"),
433+
("blob", f"<U{blob_max_len + 10}")
434+
]
435+
else:
436+
new_dtype = self._srt_blobs.dtype
437+
438+
# registry
439+
self._registry_org.setdefault(org_id, []).__add__(ds_ids)
440+
441+
# search array
442+
dates_cur = np.array(self._srt_blobs["created"], copy=True)
443+
size_old = dates_cur.size
444+
dates_new = np.array(ms_created)
445+
dates_comb = np.concatenate((dates_cur, dates_new))
446+
# sort according to dates descending
447+
sorter = np.argsort(dates_comb)[::-1]
448+
449+
new_blobs = np.empty(dates_comb.size, dtype=new_dtype)
450+
new_blobs[:size_old] = self._srt_blobs["blob"]
451+
new_blobs["blob"][size_old:] = blobs_new
452+
new_blobs["created"][size_old:] = dates_new
453+
new_blobs["id"][size_old:] = ds_ids
454+
new_blobs = new_blobs[sorter]
455+
456+
self._srt_blobs = new_blobs
457+
458+
# datasets
459+
datasets_unsrt = self.datasets + dataset_dicts
460+
self.datasets = [datasets_unsrt[ii] for ii in sorter]
461+
462+
# persistent database
463+
if org_id not in self._databases:
464+
self._databases[org_id] = SQLiteKeyJSONDatabase(
465+
db_name=self.base_dir / f"org_{org_id}.db")
466+
self._databases[org_id].insert_many(dataset_dicts)
467+
468+
# user's dataset list
469+
datasets_user_owned_unsrt = (
470+
self.datasets_user_owned
471+
+ [ds["creator_user_id"] == self.user_id for ds in dataset_dicts]
472+
)
473+
self.datasets_user_owned = [
474+
datasets_user_owned_unsrt[ii] for ii in sorter]
475+
476+
# dataset IDs
477+
dataset_ids_unsr = self._dataset_ids + ds_ids
478+
self._dataset_ids = [dataset_ids_unsr[ii] for ii in sorter]
479+
480+
for (idx, ds_id) in enumerate(self._dataset_ids):
481+
self._dataset_index_dict[ds_id] = idx
482+
438483

439484
def _create_blob_for_search(ds_dict: dict) -> str:
440485
"""Create a string blob from a dataset dictionary for free text search"""

tests/test_dbmodel_meta_cache.py

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def test_cache_dataset_index_dict_2(tmp_path):
9090
ds_list.append(make_dataset_dict_full_fake(time_created=t0 + ii))
9191

9292
# sort dataset list according to ID
93-
ds_list_edit = sorted(ds_list, key = lambda x: x["id"])
93+
ds_list_edit = sorted(ds_list, key=lambda x: x["id"])
9494
assert ds_list_edit != ds_list
9595

9696
with meta_cache.MetaCache(tmp_path) as mc:
@@ -109,15 +109,15 @@ def test_cache_dataset_index_dict_2_multi(tmp_path):
109109

110110
ds_list = []
111111
for ii in range(200):
112-
ds_list.append(make_dataset_dict_full_fake(time_created=t0 + ii,
112+
ds_list.append(make_dataset_dict_full_fake(time_created=t0 - ii,
113113
org_id=org_id))
114114

115115
# sort dataset list according to ID
116-
ds_list_edit = sorted(ds_list, key = lambda x: x["id"])
116+
ds_list_edit = sorted(ds_list, key=lambda x: x["id"])
117117
assert ds_list_edit != ds_list
118118

119119
with meta_cache.MetaCache(tmp_path) as mc:
120-
mc.insert_many(org_id, ds_list_edit)
120+
mc._upsert_many_insert(org_id, ds_list_edit)
121121

122122
for ii in range(200):
123123
assert mc._dataset_index_dict[ds_list[ii]["id"]] == ii
@@ -151,7 +151,6 @@ def test_cache_datasets_user_owned(tmp_path):
151151
assert mc.datasets[idx] == ds
152152
assert not mc.datasets_user_owned[idx]
153153

154-
155154
with meta_cache.MetaCache(tmp_path, user_id=user_id) as mc:
156155
# Make sure that the user dictionaries are recovered correctly
157156
for ds in ds_user:
@@ -187,12 +186,12 @@ def test_cache_datasets_user_owned_full(tmp_path):
187186
for _ in range(10)
188187
]
189188
ds_user += ds_dicts_a
190-
mc.insert_many(org_id, ds_dicts_a)
189+
mc._upsert_many_insert(org_id, ds_dicts_a)
191190
ds_dicts_b = [
192191
make_dataset_dict_full_fake(org_id=org_id)
193192
for _ in range(10)
194193
]
195-
mc.insert_many(org_id, ds_dicts_b)
194+
mc._upsert_many_insert(org_id, ds_dicts_b)
196195
ds_other += ds_dicts_b
197196

198197
# Make sure that the user dictionaries are stored correctly
@@ -206,7 +205,6 @@ def test_cache_datasets_user_owned_full(tmp_path):
206205
assert mc.datasets[idx] == ds
207206
assert not mc.datasets_user_owned[idx]
208207

209-
210208
with meta_cache.MetaCache(tmp_path, user_id=user_id) as mc:
211209
# Make sure that the user dictionaries are recovered correctly
212210
for ds in ds_user:
@@ -220,9 +218,86 @@ def test_cache_datasets_user_owned_full(tmp_path):
220218
assert not mc.datasets_user_owned[idx]
221219

222220

221+
def test_cache_upsert_many(tmp_path):
222+
"""Test the `upsert_many` wrapper"""
223+
ds_list = []
224+
225+
org_id = str(uuid.uuid4())
226+
227+
# create datasets in ascending order (order must be reversed in MetaCache)
228+
for ii in range(15):
229+
ds_list.append(make_dataset_dict_full_fake(
230+
org_id=org_id,
231+
time_created=time.time() - 100 + ii,
232+
))
233+
234+
with meta_cache.MetaCache(tmp_path) as mc:
235+
mc.upsert_many(ds_list, org_id=org_id)
236+
237+
# make sure datasets are in descending order
238+
assert mc.datasets == ds_list[::-1]
239+
240+
with meta_cache.MetaCache(tmp_path) as mc:
241+
assert mc.datasets == ds_list[::-1]
242+
243+
ds_list[0]["title"] = "Peter Pan"
244+
ds_list[1]["title"] = "Peter Pan 3"
245+
ds_list[-1]["title"] = "Peter Pan 2"
246+
247+
mc.upsert_many(ds_list, org_id=org_id)
248+
249+
assert mc.datasets == ds_list[::-1]
250+
# descending order again
251+
assert mc.datasets[-1]["title"] == "Peter Pan"
252+
assert mc.datasets[0]["title"] == "Peter Pan 2"
253+
assert mc.datasets[-2]["title"] == "Peter Pan 3"
254+
255+
256+
def test_cache_upsert_many_search(tmp_path):
257+
"""Test the `upsert_many` wrapper, searching data afterwards"""
258+
ds_list = []
259+
260+
org_id = str(uuid.uuid4())
261+
262+
# create datasets in ascending order (order must be reversed in MetaCache)
263+
for ii in range(15):
264+
ds_list.append(make_dataset_dict_full_fake(
265+
org_id=org_id,
266+
time_created=time.time() - 100 + ii,
267+
))
268+
269+
ds_list.append(make_dataset_dict_full_fake(title="Mordor",
270+
org_id=org_id,
271+
time_created=time.time() - 98,
272+
))
273+
274+
org_id_2 = str(uuid.uuid4())
275+
276+
ds_list.append(make_dataset_dict_full_fake(title="Frodo",
277+
org_id=org_id_2,
278+
time_created=time.time() - 98,
279+
))
280+
281+
for ii in range(18):
282+
ds_list.append(make_dataset_dict_full_fake(
283+
org_id=org_id_2,
284+
time_created=time.time() - 100 + ii,
285+
))
286+
287+
with meta_cache.MetaCache(tmp_path) as mc:
288+
mc.upsert_many(ds_list, org_id=org_id)
289+
# search for them in freshly-modified cache
290+
assert mc.search(query="Frodo")[0]["title"] == "Frodo"
291+
assert mc.search(query="Mordor")[0]["title"] == "Mordor"
292+
293+
with meta_cache.MetaCache(tmp_path) as mc:
294+
# search for them in loaded cache
295+
assert mc.search(query="Frodo")[0]["title"] == "Frodo"
296+
assert mc.search(query="Mordor")[0]["title"] == "Mordor"
297+
223298

224299
@pytest.mark.parametrize("previous_datasets", [0, 10, 100])
225-
def test_cache_insert_many(tmp_path, previous_datasets):
300+
def test_cache_upsert_many_insert(tmp_path, previous_datasets):
226301
# generate datasets
227302
all_ds_dicts = []
228303
org_ids = [str(uuid.uuid4()), str(uuid.uuid4())]
@@ -241,7 +316,7 @@ def test_cache_insert_many(tmp_path, previous_datasets):
241316
mc.upsert_dataset(make_dataset_dict_full_fake())
242317

243318
for (org_id, ds_dicts) in zip(org_ids, all_ds_dicts):
244-
mc.insert_many(org_id, ds_dicts)
319+
mc._upsert_many_insert(org_id, ds_dicts)
245320

246321
# Check whether the databases exist
247322
for org_id in org_ids:
@@ -406,7 +481,8 @@ def test_cache_search_updated_dataset_case(tmp_path):
406481
[[2, {"other-key": "dop"}, [["sinn"]]], ["2", "sinn"]],
407482
])
408483
def test_values_only(input, output):
409-
assert list(meta_cache._values_only(input, only_keys=["peter", "rin"])) == output
484+
assert list(meta_cache._values_only(input,
485+
only_keys=["peter", "rin"])) == output
410486

411487

412488
def test_create_blob_for_search():

0 commit comments

Comments
 (0)