Skip to content

Commit d10ccf0

Browse files
committed
ref: perform search in sqlite, saves a lot of memory
1 parent 1cbc195 commit d10ccf0

3 files changed

Lines changed: 83 additions & 177 deletions

File tree

dcoraid/dbmodel/meta_cache.py

Lines changed: 42 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from itertools import chain
2+
import json
23
import logging
34
import pathlib
45
import threading
@@ -92,23 +93,18 @@ def __init__(self,
9293
#: Dict of dataset ID indices, mapping dataset ID to organization
9394
self._map_id_org = {}
9495

96+
#: List of dates
97+
self._dates = []
98+
9599
#: List of booleans indicating whether dataset was created by the user
96100
self.datasets_user_owned = []
97101

98-
# Search blob array
99-
self._srt_blobs = None
100-
101102
self._lock = threading.Lock()
102103

103104
with self._lock:
104105
self._initialize(org_ids)
105106

106107
def _initialize(self, org_ids=None):
107-
# List of all dataset dictionaries (only used during init)
108-
datasets: list[dict] = []
109-
# List of search data (only used during init)
110-
rows: list[tuple] = []
111-
112108
# Initialize the databases
113109
if org_ids is None:
114110
db_paths = list(self.base_dir.glob("org_*.db"))
@@ -127,59 +123,36 @@ def _initialize(self, org_ids=None):
127123

128124
# populate registry, dataset list, and search array data
129125
# initial blob size for dataset search
130-
blob_size = 256
126+
data_created = []
127+
data_ids = []
128+
data_user = []
129+
#: Dict of dataset ID organizations
130+
self._map_id_org = {}
131+
#: List of dates
131132
for db in self._databases.values():
132133
for ds_dict in db:
133-
ds_id: str = ds_dict.get("id", "")
134-
m_created: str = ds_dict.get("metadata_created", "")
135-
136-
# Build a double-space‑separated string of **only values**
137-
value_blob = _create_blob_for_search(ds_dict)
138-
blob_size = max(blob_size, len(value_blob))
139-
140-
rows.append((ds_id, m_created, value_blob))
141-
datasets.append(ds_dict)
134+
data_created.append(ds_dict["metadata_created"])
135+
data_ids.append(ds_dict["id"])
136+
data_user.append(ds_dict["creator_user_id"] == self.user_id)
137+
self._map_id_org[ds_dict["id"]] = ds_dict["owner_org"]
142138
self._registry_org.setdefault(ds_dict["owner_org"],
143139
[]).append(ds_dict["id"])
144140

145-
# Convert the Python list of tuples into a NumPy 2‑D object array
146-
# TODO: If there are performance issues, we can create these arrays
147-
# in chunks of e.g. 10000 in the above for-loop.
148-
data_dtype = [
149-
# "4b1f7c53-9d7f-2c53-aeb6-eaa8ecf10ca9"
150-
("id", "<U36"),
151-
# 2025-09-18T08:09:52.947634
152-
("created", "<U26"),
153-
# initialize with maximum size
154-
("blob", f"<U{blob_size}")
155-
]
156-
if rows:
157-
data = np.array(rows, dtype=data_dtype) # shape (n_records, 3)
158-
else:
159-
# initialize emtpy cache
160-
data = np.empty(0, dtype=data_dtype)
161-
162141
# Sort the dataset according to creation date, descending
163-
sort_idx = np.argsort(data["created"])[::-1]
164-
#: Blobs for searching, sorted by creation date descending
165-
self._srt_blobs = data[sort_idx]
142+
sort_idx = np.argsort(data_created)[::-1]
143+
144+
self._dates = [data_created[ii] for ii in sort_idx]
166145

167146
#: List of booleans indicating whether dataset was created by the user
168-
self.datasets_user_owned = [
169-
datasets[ii]["creator_user_id"] == self.user_id
170-
for ii in sort_idx
171-
]
147+
self.datasets_user_owned = [data_user[ii] for ii in sort_idx]
172148

173149
#: List of dataset IDs
174-
self._map_index_id = [datasets[idx]["id"] for idx in sort_idx]
150+
self._map_index_id = [data_ids[idx] for idx in sort_idx]
175151

176152
#: Dict of dataset ID indices
177153
self._map_id_index = {
178154
ds_id: ii for (ii, ds_id) in enumerate(self._map_index_id)}
179155

180-
#: Dict of dataset ID organizations
181-
self._map_id_org = {ds["id"]: ds["owner_org"] for ds in datasets}
182-
183156
#: List of dataset dictionaries
184157
self.datasets = SQLiteBackedROListOfDicts(
185158
sqlite_dbs=self._databases,
@@ -243,21 +216,18 @@ def search(self,
243216
if not norm_query:
244217
return [] # empty query -> no results
245218

246-
# `np.strings.find` works element‑wise on an array of strings
247-
# and returns the index of the first occurrence (or -1 if not found).
248-
# `self._srt_blobs[:, 2]` is the column that holds the lower‑cased
249-
# blobs.
250-
match_mask = np.strings.find(self._srt_blobs["blob"], norm_query) != -1
219+
data_ids = []
220+
for db in self._databases.values():
221+
data_ids += db.search(norm_query)
251222

252-
if limit is not None:
253-
# Remove all items that are above a threshold.
254-
# TODO: apply the limit already during search by chunking?
255-
idx_lim = np.where(np.cumsum(match_mask) > limit)[0]
256-
if idx_lim.size:
257-
match_mask[idx_lim.min():] = False
223+
if limit:
224+
# sort the data_ids according to their creation date
225+
data_dates = [
226+
self._dates[self._map_id_index[ds_id]] for ds_id in data_ids]
227+
sorter = np.argsort(data_dates)[::-1]
228+
data_ids = [data_ids[idx] for idx in sorter[:limit]]
258229

259-
idx_result = np.where(match_mask)[0]
260-
return [self.datasets[idx] for idx in idx_result]
230+
return [self[ds_id] for ds_id in data_ids]
261231

262232
def upsert_dataset(self, ds_dict: dict[str, Any]) -> None:
263233
"""Insert a new dataset or update an existing one
@@ -300,35 +270,19 @@ def _upsert_dataset_insert(self, ds_dict):
300270
m_created = ds_dict["metadata_created"]
301271

302272
blob = _create_blob_for_search(ds_dict)
303-
if len(blob) > int(self._srt_blobs.dtype["blob"].str[2:]):
304-
# Increase the search blob size.
305-
new_dtype = [("id", "<U36"),
306-
("created", "<U26"),
307-
("blob", f"<U{len(blob) + 10}")
308-
]
309-
else:
310-
new_dtype = self._srt_blobs.dtype
311273

312274
# registry
313275
self._registry_org.setdefault(org_id, []).append(ds_id)
314276

315277
# search array
316-
dates = np.array(self._srt_blobs["created"], copy=True)
317-
new_size = dates.size + 1
318-
new_idx = np.searchsorted(dates, m_created)
319-
# we can set refcheck to False, since we created a copy above
320-
dates.resize(new_size, refcheck=False)
321-
new_blobs = np.empty(new_size, dtype=new_dtype)
322-
new_blobs[:new_idx] = self._srt_blobs[:new_idx]
323-
new_blobs[new_idx] = (ds_id, m_created, blob)
324-
new_blobs[new_idx + 1:] = self._srt_blobs[new_idx:]
325-
self._srt_blobs = new_blobs
278+
new_idx = np.searchsorted(self._dates, m_created)
279+
self._dates.insert(new_idx, m_created)
326280

327281
# persistent database
328282
if org_id not in self._databases:
329283
self._databases[org_id] = SQLiteKeyJSONDatabase(
330284
db_name=self.base_dir / f"org_{org_id}.db")
331-
self._databases[org_id][ds_id] = ds_dict
285+
self._databases[org_id][ds_id] = (ds_dict, blob)
332286

333287
# user's dataset list
334288
self.datasets_user_owned.insert(
@@ -340,7 +294,7 @@ def _upsert_dataset_insert(self, ds_dict):
340294
self._map_index_id.insert(new_idx, ds_id)
341295

342296
# update the ds_id to index dictionary
343-
for idx in range(new_idx, new_size):
297+
for idx in range(new_idx, len(self._dates)):
344298
self._map_id_index[self._map_index_id[idx]] = idx
345299

346300
# insert the organization
@@ -357,23 +311,11 @@ def _upsert_dataset_update(self, ds_dict):
357311

358312
# registry does not need updating (only contains ds_id)
359313

360-
# Find the index in the database
361-
idx = np.where(self._srt_blobs["id"] == ds_id)[0][0]
362-
363314
# search array
364315
blob = _create_blob_for_search(ds_dict)
365-
if len(blob) > int(self._srt_blobs.dtype["blob"].str[2:]):
366-
# Rewrite the search blobs, because this blob is bigger than any
367-
# of the blobs before.
368-
new_dtype = [("id", "<U36"),
369-
("created", "<U26"),
370-
("blob", f"<U{len(blob) + 10}")
371-
]
372-
self._srt_blobs = np.array(self._srt_blobs, dtype=new_dtype)
373-
self._srt_blobs["blob"][idx] = blob
374316

375317
# persistent database
376-
self._databases[org_id][ds_id] = ds_dict
318+
self._databases[org_id][ds_id] = (ds_dict, blob)
377319

378320
def upsert_many(self, dataset_dicts, org_id=None):
379321
"""Insert or update multiple datasets at once
@@ -427,44 +369,28 @@ def _upsert_many_insert(self, org_id, dataset_dicts):
427369
"""
428370
ds_ids = [ds_dict["id"] for ds_dict in dataset_dicts]
429371
ms_created = [ds_dict["metadata_created"] for ds_dict in dataset_dicts]
430-
blobs_new = [
431-
_create_blob_for_search(ds_dict) for ds_dict in dataset_dicts]
432-
blob_max_len = max(len(b) for b in blobs_new)
433-
434-
if blob_max_len > int(self._srt_blobs.dtype["blob"].str[2:]):
435-
# Increase the search blob size.
436-
new_dtype = [("id", "<U36"),
437-
("created", "<U26"),
438-
("blob", f"<U{blob_max_len + 10}")
439-
]
440-
else:
441-
new_dtype = self._srt_blobs.dtype
442372

443373
# registry
444374
self._registry_org.setdefault(org_id, []).__add__(ds_ids)
445375

446376
# search array
447-
dates_cur = np.array(self._srt_blobs["created"], copy=True)
448-
size_old = dates_cur.size
377+
dates_cur = np.array(self._dates, copy=True)
449378
dates_new = np.array(ms_created)
450379
dates_comb = np.concatenate((dates_cur, dates_new))
451380
# sort according to dates descending
452381
sorter = np.argsort(dates_comb)[::-1]
453-
454-
new_blobs = np.empty(dates_comb.size, dtype=new_dtype)
455-
new_blobs[:size_old] = self._srt_blobs["blob"]
456-
new_blobs["blob"][size_old:] = blobs_new
457-
new_blobs["created"][size_old:] = dates_new
458-
new_blobs["id"][size_old:] = ds_ids
459-
new_blobs = new_blobs[sorter]
460-
461-
self._srt_blobs = new_blobs
382+
self._dates = [dates_comb[ii] for ii in sorter]
462383

463384
# persistent database
464385
if org_id not in self._databases:
465386
self._databases[org_id] = SQLiteKeyJSONDatabase(
466387
db_name=self.base_dir / f"org_{org_id}.db")
467-
self._databases[org_id].insert_many(dataset_dicts)
388+
# TODO: Do this in batches of 1000 to save memory?
389+
insert_data = [
390+
(ds_dict["id"],
391+
json.dumps(ds_dict),
392+
_create_blob_for_search(ds_dict)) for ds_dict in dataset_dicts]
393+
self._databases[org_id].insert_many(insert_data)
468394

469395
# user's dataset list
470396
datasets_user_owned_unsrt = (

dcoraid/dbmodel/meta_cache_sqlite.py

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ def __init__(self, db_name: str | pathlib.Path):
1818
self.conn = sqlite3.connect(db_name, check_same_thread=False)
1919
self.cursor = self.conn.cursor()
2020
self.cursor.execute("""
21-
CREATE TABLE IF NOT EXISTS key_value_store
22-
(key TEXT PRIMARY KEY, value TEXT)
21+
CREATE TABLE IF NOT EXISTS kv_store
22+
(key TEXT PRIMARY KEY, json_data TEXT, blob_data TEXT)
2323
""")
2424

2525
def __iter__(self):
26-
self.cursor.execute("SELECT value FROM key_value_store")
26+
self.cursor.execute("SELECT json_data FROM kv_store")
2727
while True:
2828
row = self.cursor.fetchone()
2929
if row is None:
@@ -39,71 +39,79 @@ def __exit__(self, exc_type, exc_val, exc_tb):
3939
def __getitem__(self, key):
4040
return self.read(key)
4141

42-
def __setitem__(self, key, value):
43-
self.create(key, value)
42+
def __setitem__(self, key, data):
43+
ddict, blob = data
44+
self.create(key, ddict, blob)
4445

45-
def create(self, key, value):
46+
def create(self, key: str, ddict: dict, blob: str):
4647
"""
47-
Create a new key-value pair in the database.
48+
Create a new key-value-blob triple in the database.
4849
49-
Args:
50-
key (str): The key for the new value.
51-
value (str): The value associated with the key.
50+
Parameters
51+
----------
52+
key
53+
The key for the new value.
54+
ddict
55+
Data dictionary that is encoded in JSON
56+
blob
57+
Text blob to store alongside data for searching
5258
53-
Returns:
54-
None
5559
"""
5660
self.cursor.execute(
57-
'INSERT OR REPLACE INTO key_value_store VALUES (?, ?)',
58-
(key, json.dumps(value)))
61+
'INSERT OR REPLACE INTO kv_store VALUES (?, ?, ?)',
62+
(key, json.dumps(ddict), blob))
5963
self.conn.commit()
6064

61-
def insert_many(self, dataset_dicts):
65+
def insert_many(self, db_insert):
6266
"""Insert multiple datasets at once to this database"""
63-
db_insert = [(ds["id"], json.dumps(ds)) for ds in dataset_dicts]
64-
self.cursor.executemany('INSERT INTO key_value_store VALUES (?, ?)',
67+
self.cursor.executemany('INSERT INTO kv_store VALUES (?, ?, ?)',
6568
db_insert)
6669
self.conn.commit()
6770

6871
def read(self, key):
6972
"""
7073
Read the value associated with a given key from the database.
7174
72-
Args:
73-
key (str): The key to read the value for.
75+
Parameters
76+
----------
77+
key
78+
The key to read the dictionary for.
7479
75-
Returns:
76-
str: The value associated with the key, or None
77-
if the key does not exist.
80+
Returns
81+
-------
82+
ddict
83+
The corresponding dictionary extracted from `json_data`
7884
"""
7985
self.cursor.execute(
80-
"SELECT value FROM key_value_store WHERE key = ?",
86+
"SELECT json_data FROM kv_store WHERE key = ?",
8187
(key,))
8288
result = self.cursor.fetchone()
8389
if result is None:
8490
raise KeyError(f"Key '{key}' does not exist.")
8591
return json.loads(result[0])
8692

93+
def search(self, query):
94+
"""Search for a string in the blob_data"""
95+
self.cursor.execute(
96+
'SELECT key FROM kv_store WHERE blob_data LIKE ?', (f'%{query}%',))
97+
return [item[0] for item in self.cursor.fetchall()]
98+
8799
def pop(self, key):
88100
"""
89-
Delete a key-value pair from the database.
90-
91-
Args:
92-
key (str): The key to delete.
101+
Delete a key from the database.
93102
94-
Returns:
95-
None
103+
Parameters
104+
----------
105+
key
106+
The key to delete.
96107
"""
97108
self.cursor.execute(
98-
'DELETE FROM key_value_store WHERE key = ?',
109+
'DELETE FROM kv_store WHERE key = ?',
99110
(key,))
100111
self.conn.commit()
101112

102113
def close(self):
103114
"""
104115
Close the database connection.
105-
106-
Returns:
107-
None
108116
"""
109117
self.conn.close()

0 commit comments

Comments
 (0)