Skip to content

Commit fe2f98d

Browse files
authored
Fix: Ensure atomic DB operations and consistency in global reclustering (#562)
1 parent 8980d72 commit fe2f98d

6 files changed

Lines changed: 186 additions & 122 deletions

File tree

backend/app/database/albums.py

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,7 @@
11
import sqlite3
22
import bcrypt
3-
import time
4-
from contextlib import contextmanager
53
from app.config.settings import DATABASE_PATH
6-
7-
8-
@contextmanager
9-
def get_db_connection():
10-
"""Context manager for database connections with proper error handling and retries"""
11-
max_retries = 3
12-
retry_delay = 0.1
13-
14-
for attempt in range(max_retries):
15-
try:
16-
conn = sqlite3.connect(DATABASE_PATH, timeout=30.0)
17-
# Enable WAL mode for better concurrency
18-
conn.execute("PRAGMA journal_mode=WAL")
19-
# Set busy timeout
20-
conn.execute("PRAGMA busy_timeout=30000")
21-
# Enable foreign keys
22-
conn.execute("PRAGMA foreign_keys=ON")
23-
24-
try:
25-
yield conn
26-
conn.commit()
27-
except Exception as e:
28-
conn.rollback()
29-
raise e
30-
finally:
31-
conn.close()
32-
break
33-
34-
except sqlite3.OperationalError as e:
35-
if "database is locked" in str(e).lower() and attempt < max_retries - 1:
36-
time.sleep(retry_delay * (2**attempt)) # Exponential backoff
37-
continue
38-
else:
39-
raise e
4+
from app.database.connection import get_db_connection
405

416

427
def db_create_albums_table() -> None:

backend/app/database/connection.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import sqlite3
2+
from contextlib import contextmanager
3+
from typing import Generator
4+
from app.config.settings import DATABASE_PATH
5+
6+
7+
@contextmanager
8+
def get_db_connection() -> Generator[sqlite3.Connection, None, None]:
9+
"""
10+
SQLite connection context manager with all integrity constraints enforced.
11+
12+
- Enables all major relational integrity PRAGMAs
13+
- Works for both single and multi-step transactions
14+
- Automatically commits on success or rolls back on failure
15+
"""
16+
conn = sqlite3.connect(DATABASE_PATH)
17+
18+
# --- Strict enforcement of all relational and logical rules ---
19+
conn.execute("PRAGMA foreign_keys = ON;") # Enforce FK constraints
20+
conn.execute("PRAGMA ignore_check_constraints = OFF;") # Enforce CHECK constraints
21+
conn.execute("PRAGMA recursive_triggers = ON;") # Allow nested triggers
22+
conn.execute("PRAGMA defer_foreign_keys = OFF;") # Immediate FK checking
23+
conn.execute("PRAGMA case_sensitive_like = ON;") # Make LIKE case-sensitive
24+
25+
try:
26+
yield conn
27+
conn.commit()
28+
except Exception:
29+
conn.rollback()
30+
raise
31+
finally:
32+
conn.close()

backend/app/database/face_clusters.py

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,57 @@ def db_create_clusters_table() -> None:
3939
conn.close()
4040

4141

42-
def db_insert_clusters_batch(clusters: List[ClusterData]) -> List[ClusterId]:
42+
def db_delete_all_clusters(cursor: Optional[sqlite3.Cursor] = None) -> int:
43+
"""
44+
Delete all clusters from the database.
45+
46+
Args:
47+
cursor: Optional existing database cursor. If None, creates a new connection.
48+
49+
Returns:
50+
Number of deleted clusters
51+
"""
52+
own_connection = cursor is None
53+
if own_connection:
54+
conn = sqlite3.connect(DATABASE_PATH)
55+
cursor = conn.cursor()
56+
57+
try:
58+
cursor.execute("DELETE FROM face_clusters")
59+
deleted_count = cursor.rowcount
60+
if own_connection:
61+
conn.commit()
62+
return deleted_count
63+
except Exception:
64+
if own_connection:
65+
conn.rollback()
66+
print("Error deleting all clusters.")
67+
raise
68+
finally:
69+
if own_connection:
70+
conn.close()
71+
72+
73+
def db_insert_clusters_batch(
74+
clusters: List[ClusterData], cursor: Optional[sqlite3.Cursor] = None
75+
) -> List[ClusterId]:
4376
"""
4477
Insert multiple clusters into the database in batch.
4578
4679
Args:
4780
clusters: List of ClusterData objects containing cluster information.
48-
cluster_id: should be provided as UUID strings.
81+
cursor: Optional existing database cursor. If None, creates a new connection.
4982
5083
Returns:
5184
List of cluster IDs of the newly created clusters
5285
"""
5386
if not clusters:
5487
return []
5588

56-
conn = sqlite3.connect(DATABASE_PATH)
57-
cursor = conn.cursor()
89+
own_connection = cursor is None
90+
if own_connection:
91+
conn = sqlite3.connect(DATABASE_PATH)
92+
cursor = conn.cursor()
5893

5994
try:
6095
cluster_ids = []
@@ -72,14 +107,20 @@ def db_insert_clusters_batch(clusters: List[ClusterData]) -> List[ClusterId]:
72107
"""
73108
INSERT INTO face_clusters (cluster_id, cluster_name, face_image_base64)
74109
VALUES (?, ?, ?)
75-
""",
110+
""",
76111
insert_data,
77112
)
78113

79-
conn.commit()
114+
if own_connection:
115+
conn.commit()
80116
return cluster_ids
117+
except Exception:
118+
if own_connection:
119+
conn.rollback()
120+
raise
81121
finally:
82-
conn.close()
122+
if own_connection:
123+
conn.close()
83124

84125

85126
def db_get_cluster_by_id(cluster_id: ClusterId) -> Optional[ClusterData]:
@@ -145,18 +186,24 @@ def db_get_all_clusters() -> List[ClusterData]:
145186
def db_update_cluster(
146187
cluster_id: ClusterId,
147188
cluster_name: Optional[ClusterName] = None,
189+
conn: Optional[sqlite3.Connection] = None,
148190
) -> bool:
149191
"""
150192
Update an existing cluster.
151193
152194
Args:
153195
cluster_id: The ID of the cluster to update
154196
cluster_name: New cluster name (optional)
197+
conn: Optional existing database connection. If None, creates a new connection.
155198
156199
Returns:
157200
True if the cluster was updated, False if not found
158201
"""
159-
conn = sqlite3.connect(DATABASE_PATH)
202+
# Use provided connection or create a new one
203+
own_connection = conn is None
204+
if own_connection:
205+
conn = sqlite3.connect(DATABASE_PATH)
206+
160207
cursor = conn.cursor()
161208

162209
try:
@@ -185,26 +232,6 @@ def db_update_cluster(
185232
conn.close()
186233

187234

188-
def db_delete_all_clusters() -> int:
189-
"""
190-
Delete all entries from the face_clusters table.
191-
192-
Returns:
193-
Number of clusters deleted
194-
"""
195-
conn = sqlite3.connect(DATABASE_PATH)
196-
cursor = conn.cursor()
197-
198-
try:
199-
cursor.execute("DELETE FROM face_clusters")
200-
201-
deleted_count = cursor.rowcount
202-
conn.commit()
203-
return deleted_count
204-
finally:
205-
conn.close()
206-
207-
208235
def db_get_all_clusters_with_face_counts() -> (
209236
List[Dict[str, Union[str, Optional[str], int]]]
210237
):

backend/app/database/faces.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,13 +287,15 @@ def db_get_all_faces_with_cluster_names() -> (
287287

288288
def db_update_face_cluster_ids_batch(
289289
face_cluster_mapping: List[Dict[str, Union[FaceId, ClusterId]]],
290+
cursor: Optional[sqlite3.Cursor] = None,
290291
) -> None:
291292
"""
292293
Update cluster IDs for multiple faces in batch.
293294
294295
Args:
295296
face_cluster_mapping: List of dictionaries containing face_id and cluster_id pairs
296297
Each dict should have keys: 'face_id' and 'cluster_id'
298+
cursor: Optional existing database cursor. If None, creates a new connection.
297299
298300
Example:
299301
face_cluster_mapping = [
@@ -305,8 +307,10 @@ def db_update_face_cluster_ids_batch(
305307
if not face_cluster_mapping:
306308
return
307309

308-
conn = sqlite3.connect(DATABASE_PATH)
309-
cursor = conn.cursor()
310+
own_connection = cursor is None
311+
if own_connection:
312+
conn = sqlite3.connect(DATABASE_PATH)
313+
cursor = conn.cursor()
310314

311315
try:
312316
# Prepare update data as tuples (cluster_id, face_id)
@@ -325,9 +329,16 @@ def db_update_face_cluster_ids_batch(
325329
update_data,
326330
)
327331

328-
conn.commit()
332+
if own_connection:
333+
conn.commit()
334+
except Exception:
335+
if own_connection:
336+
conn.rollback()
337+
print("Error updating face cluster IDs in batch.")
338+
raise
329339
finally:
330-
conn.close()
340+
if own_connection:
341+
conn.close()
331342

332343

333344
def db_get_cluster_mean_embeddings() -> List[Dict[str, Union[str, FaceEmbedding]]]:

backend/app/database/metadata.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,23 @@ def db_get_metadata() -> Optional[Dict[str, Any]]:
5555
conn.close()
5656

5757

58-
def db_update_metadata(metadata: Dict[str, Any]) -> bool:
58+
def db_update_metadata(
59+
metadata: Dict[str, Any], cursor: Optional[sqlite3.Cursor] = None
60+
) -> bool:
5961
"""
6062
Update the metadata in the database.
6163
6264
Args:
6365
metadata: Dictionary containing metadata to store
66+
cursor: Optional existing database cursor. If None, creates a new connection.
6467
6568
Returns:
6669
True if the metadata was updated, False otherwise
6770
"""
68-
conn = sqlite3.connect(DATABASE_PATH)
69-
cursor = conn.cursor()
71+
own_connection = cursor is None
72+
if own_connection:
73+
conn = sqlite3.connect(DATABASE_PATH)
74+
cursor = conn.cursor()
7075

7176
try:
7277
metadata_json = json.dumps(metadata)
@@ -75,8 +80,16 @@ def db_update_metadata(metadata: Dict[str, Any]) -> bool:
7580
cursor.execute("DELETE FROM metadata")
7681
cursor.execute("INSERT INTO metadata (metadata) VALUES (?)", (metadata_json,))
7782

78-
updated = cursor.rowcount > 0
79-
conn.commit()
80-
return updated
83+
success = cursor.rowcount > 0
84+
if own_connection:
85+
conn.commit()
86+
return success
87+
except Exception as e:
88+
if own_connection:
89+
conn.rollback()
90+
91+
print(f"Error updating metadata: {e}")
92+
raise
8193
finally:
82-
conn.close()
94+
if own_connection:
95+
conn.close()

0 commit comments

Comments
 (0)