"""SQLite persistence helpers for the ``faces`` table.

Each row stores one detected face: the image it belongs to, its embedding
vector (serialized as a JSON array), an optional detection confidence, an
optional bounding box (serialized as a JSON object) and an optional cluster id.
"""

import json
import sqlite3
from contextlib import closing
from typing import Optional, List, Dict, Union, TypedDict, Any

import numpy as np

from app.config.settings import DATABASE_PATH

# Type definitions
FaceId = int
ImageId = str
ClusterId = str
FaceEmbedding = np.ndarray  # 512-dim vector
# Keys: 'x', 'y', 'width', 'height'.
BoundingBox = Dict[str, Union[int, float]]

# Kept for backward compatibility with modules that import this alias.
FaceClusterMapping = Dict[FaceId, Optional[ClusterId]]

# Scalar types accepted as a detection confidence. NumPy scalars are listed
# explicitly because np.float32 / np.int64 are not subclasses of float / int.
_CONFIDENCE_TYPES = (int, float, np.integer, np.floating)


class FaceData(TypedDict):
    """Shape of one row of the ``faces`` table as seen by the application."""

    face_id: FaceId
    image_id: ImageId
    embeddings: FaceEmbedding  # numpy array in memory, JSON text in the DB
    confidence: Optional[float]
    bbox: Optional[BoundingBox]
    cluster_id: Optional[ClusterId]


def get_db_conn() -> sqlite3.Connection:
    """Open a connection to ``DATABASE_PATH`` with foreign keys enabled.

    SQLite enforces foreign keys per connection and only when the pragma is
    switched on, so every helper in this module goes through this function.
    The caller owns the returned connection and must close it.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    conn.execute("PRAGMA foreign_keys = ON")
    return conn


def _pick(value: Any, index: int, scalar_types: Any) -> Any:
    """Resolve one face's metadata from a per-face list or a shared scalar.

    Returns ``value[index]`` when ``value`` is a list long enough, ``value``
    itself when it is an instance of ``scalar_types``, and ``None`` otherwise
    (including a list that is too short).
    """
    if isinstance(value, list):
        return value[index] if index < len(value) else None
    if isinstance(value, scalar_types):
        return value
    return None


def _as_float(value: Any) -> Optional[float]:
    """Convert a confidence to a builtin ``float`` (``None`` passes through).

    sqlite3 cannot bind NumPy scalar types, so they must be converted before
    being used as query parameters.
    """
    return None if value is None else float(value)


def _decode_embedding(raw: Optional[str], face_id: Any) -> Optional[np.ndarray]:
    """Decode a stored embedding into a numpy array.

    Returns ``None`` for NULL/empty values and for rows whose JSON is corrupt
    or decodes to ``null``, so callers can skip the row instead of failing
    the whole query.
    """
    if not raw:
        return None
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        print(f"Error decoding JSON for face {face_id}")
        return None
    if decoded is None:
        return None
    return np.array(decoded)


def db_create_faces_table() -> None:
    """Create the ``faces`` table if it doesn't exist.

    Raises:
        sqlite3.Error: re-raised after being reported, since the application
            cannot work without the table.
    """
    try:
        with closing(get_db_conn()) as conn:
            # JSON payloads are declared TEXT: SQLite has no JSON column type
            # and a column declared "JSON" would get NUMERIC affinity.
            # NOTE(review): an earlier schema also declared a foreign key from
            # cluster_id to face_clusters(cluster_id) ON DELETE SET NULL; it is
            # omitted here as in the latest revision - confirm that dangling
            # cluster ids are handled elsewhere.
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS faces (
                    face_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    image_id TEXT NOT NULL,
                    cluster_id TEXT,
                    embeddings TEXT NOT NULL,
                    confidence REAL,
                    bbox TEXT,
                    FOREIGN KEY(image_id) REFERENCES images(id) ON DELETE CASCADE
                )
                """
            )
            conn.commit()
    except sqlite3.Error as e:
        print(f"Error creating faces table: {e}")
        raise


def db_insert_face_embeddings(
    image_id: ImageId,
    embeddings: FaceEmbedding,
    confidence: Optional[float] = None,
    bbox: Optional[BoundingBox] = None,
    cluster_id: Optional[ClusterId] = None,
) -> Optional[FaceId]:
    """Insert one face row.

    Args:
        image_id: ID of the image this face belongs to.
        embeddings: Face embedding vector (numpy array).
        confidence: Detection confidence (optional).
        bbox: Bounding box with keys x, y, width, height (optional).
        cluster_id: ID of the cluster this face belongs to (optional).

    Returns:
        The new row's ``face_id``, or ``None`` when the database rejected the
        insert (the error is printed, nothing is committed).
    """
    # Serialize before touching the database so bad input never opens a
    # connection.
    embeddings_json = json.dumps(embeddings.tolist())
    bbox_json = json.dumps(bbox) if bbox is not None else None

    try:
        with closing(get_db_conn()) as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO faces (image_id, cluster_id, embeddings, confidence, bbox)
                VALUES (?, ?, ?, ?, ?)
                """,
                (image_id, cluster_id, embeddings_json, confidence, bbox_json),
            )
            face_id = cursor.lastrowid
            conn.commit()
            return face_id
    except sqlite3.Error as e:
        # Closing the connection without commit discards the pending insert.
        print(f"Error inserting face embeddings: {e}")
        return None


def db_insert_face_embeddings_by_image_id(
    image_id: ImageId,
    embeddings: Union[FaceEmbedding, List[FaceEmbedding]],
    confidence: Optional[Union[float, List[Optional[float]]]] = None,
    bbox: Optional[Union[BoundingBox, List[Optional[BoundingBox]]]] = None,
    cluster_id: Optional[Union[ClusterId, List[Optional[ClusterId]]]] = None,
) -> Union[Optional[FaceId], List[Optional[FaceId]]]:
    """Insert one or many faces for an image.

    ``embeddings`` may be a single 1-D array, a list of arrays, or a 2-D
    array whose rows are individual embeddings. ``confidence``, ``bbox`` and
    ``cluster_id`` may each be a single value shared by every face or a list
    indexed per face; missing list entries become ``None``.

    Returns:
        A list of face ids (``None`` for failed inserts) when several
        embeddings were supplied - an empty list for empty input - otherwise
        the single face id or ``None``.
    """
    # A (N, d) array is N embeddings; split it into rows first so an empty
    # array is caught by the empty-input check below.
    if isinstance(embeddings, np.ndarray) and embeddings.ndim == 2:
        embeddings = list(embeddings)

    if isinstance(embeddings, list) and not embeddings:
        return []

    if isinstance(embeddings, list) and isinstance(embeddings[0], np.ndarray):
        return [
            db_insert_face_embeddings(
                image_id,
                emb,
                _as_float(_pick(confidence, i, _CONFIDENCE_TYPES)),
                _pick(bbox, i, dict),
                _pick(cluster_id, i, str),
            )
            for i, emb in enumerate(embeddings)
        ]

    # Single face: take the first entry of any per-face list.
    return db_insert_face_embeddings(
        image_id,
        embeddings,
        _as_float(_pick(confidence, 0, _CONFIDENCE_TYPES)),
        _pick(bbox, 0, dict),
        _pick(cluster_id, 0, str),
    )


def get_all_face_embeddings() -> List[Dict[str, Any]]:
    """Return every face together with its image's data and tags.

    One dictionary is produced per face (not per image), ordered by image
    path and then face id. Rows whose embeddings are missing or whose JSON
    cannot be decoded are skipped. ``tags`` is ``None`` for untagged images.
    Returns an empty list when the database query fails.
    """
    # Imported lazily, as in the original code (presumably to avoid a
    # circular import - verify before moving it to module level).
    from app.utils.images import image_util_parse_metadata

    try:
        with closing(get_db_conn()) as conn:
            cursor = conn.cursor()

            # Faces joined with the image they were detected in.
            cursor.execute(
                """
                SELECT
                    f.face_id,
                    f.embeddings,
                    f.bbox,
                    i.id,
                    i.path,
                    i.folder_id,
                    i.thumbnailPath,
                    i.metadata,
                    i.isTagged
                FROM faces f
                JOIN images i ON f.image_id = i.id
                ORDER BY i.path, f.face_id
                """
            )
            face_rows = cursor.fetchall()

            # Tags are fetched separately so an image with several tags does
            # not multiply its face rows.
            cursor.execute(
                """
                SELECT DISTINCT i.id, m.name as tag_name
                FROM faces f
                JOIN images i ON f.image_id = i.id
                LEFT JOIN image_classes ic ON i.id = ic.image_id
                LEFT JOIN mappings m ON ic.class_id = m.class_id
                WHERE m.name IS NOT NULL
                """
            )
            tag_rows = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error getting face embeddings: {e}")
        return []

    # image_id -> tag names; DISTINCT already guarantees unique pairs.
    image_tags: Dict[str, List[str]] = {}
    for tagged_image_id, tag_name in tag_rows:
        image_tags.setdefault(tagged_image_id, []).append(tag_name)

    faces: List[Dict[str, Any]] = []
    for (
        face_id,
        embeddings,
        bbox,
        image_id,
        path,
        folder_id,
        thumbnail_path,
        metadata,
        is_tagged,
    ) in face_rows:
        # Missing embeddings would break downstream consumers; skip the row.
        if not embeddings:
            continue
        try:
            embeddings_json = json.loads(embeddings)
            bbox_json = json.loads(bbox) if bbox else None
        except json.JSONDecodeError:
            print(f"Error decoding JSON for face {face_id}")
            continue
        if embeddings_json is None:
            continue

        faces.append(
            {
                "face_id": face_id,
                "embeddings": embeddings_json,
                "bbox": bbox_json,
                # Image metadata
                "id": image_id,
                "path": path,
                "folder_id": folder_id,
                "thumbnailPath": thumbnail_path,
                "metadata": image_util_parse_metadata(metadata),
                "isTagged": bool(is_tagged),
                "tags": image_tags.get(image_id),
            }
        )

    return faces


def db_get_faces_unassigned_clusters() -> List[Dict[str, Union[FaceId, FaceEmbedding]]]:
    """Get faces that haven't been assigned to a cluster yet.

    Returns:
        Dictionaries with ``face_id`` and ``embeddings`` (numpy array). Rows
        with missing or undecodable embeddings are skipped; an empty list is
        returned when the query fails.
    """
    try:
        with closing(get_db_conn()) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT face_id, embeddings FROM faces WHERE cluster_id IS NULL"
            )
            rows = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error getting unassigned faces: {e}")
        return []

    faces: List[Dict[str, Union[FaceId, FaceEmbedding]]] = []
    for face_id, embeddings_json in rows:
        embeddings = _decode_embedding(embeddings_json, face_id)
        if embeddings is None:
            continue
        faces.append({"face_id": face_id, "embeddings": embeddings})
    return faces


def db_get_all_faces_with_cluster_names() -> (
    List[Dict[str, Union[FaceId, FaceEmbedding, Optional[str]]]]
):
    """Get all faces with their cluster names.

    Returns:
        Dictionaries with ``face_id``, ``embeddings`` (numpy array) and
        ``cluster_name`` (``None`` for unclustered faces), ordered by face
        id. Rows with missing or undecodable embeddings are skipped; an empty
        list is returned when the query fails.
    """
    try:
        with closing(get_db_conn()) as conn:
            cursor = conn.cursor()
            # ORDER BY keeps the result order stable between calls.
            cursor.execute(
                """
                SELECT f.face_id, f.embeddings, fc.cluster_name
                FROM faces f
                LEFT JOIN face_clusters fc ON f.cluster_id = fc.cluster_id
                ORDER BY f.face_id
                """
            )
            rows = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error getting faces with cluster names: {e}")
        return []

    faces: List[Dict[str, Union[FaceId, FaceEmbedding, Optional[str]]]] = []
    for face_id, embeddings_json, cluster_name in rows:
        embeddings = _decode_embedding(embeddings_json, face_id)
        if embeddings is None:
            continue
        faces.append(
            {
                "face_id": face_id,
                "embeddings": embeddings,
                "cluster_name": cluster_name,
            }
        )
    return faces


def db_update_face_cluster_ids_batch(
    face_cluster_mapping: List[Dict[str, Union[FaceId, ClusterId, None]]],
    cursor: Optional[sqlite3.Cursor] = None,
) -> None:
    """Update cluster IDs for multiple faces in batch.

    Args:
        face_cluster_mapping: Dictionaries with keys ``face_id`` and
            ``cluster_id``; a ``cluster_id`` of ``None`` unassigns the face.
        cursor: Optional existing cursor. When given, the statement runs on
            it and committing is left to the caller; otherwise a connection
            is opened, committed and closed here.

    Raises:
        sqlite3.Error: re-raised after being reported (and rolled back when
            this function owns the connection).

    Example:
        face_cluster_mapping = [
            {'face_id': 1, 'cluster_id': 'uuid-cluster-1'},
            {'face_id': 2, 'cluster_id': None},  # unassign
        ]
    """
    if not face_cluster_mapping:
        return

    # Parameter order matches the placeholders: (cluster_id, face_id).
    update_data: List[tuple] = [
        (mapping.get("cluster_id"), mapping.get("face_id"))
        for mapping in face_cluster_mapping
    ]
    statement = """
        UPDATE faces
        SET cluster_id = ?
        WHERE face_id = ?
    """

    if cursor is not None:
        # Caller-owned transaction: report and propagate, never commit here.
        try:
            cursor.executemany(statement, update_data)
        except sqlite3.Error as e:
            print(f"Error updating face cluster IDs in batch: {e}")
            raise
        return

    conn = get_db_conn()
    try:
        conn.cursor().executemany(statement, update_data)
        conn.commit()
    except sqlite3.Error as e:
        conn.rollback()
        print(f"Error updating face cluster IDs in batch: {e}")
        raise
    finally:
        conn.close()


def db_get_cluster_mean_embeddings() -> List[Dict[str, Union[ClusterId, FaceEmbedding]]]:
    """Get mean embeddings for each cluster.

    Returns:
        Dictionaries with ``cluster_id`` and ``mean_embedding`` (numpy
        array), one per cluster that has at least one decodable face,
        ordered by cluster id. An empty list is returned when there are no
        clustered faces or the query fails.
    """
    try:
        with closing(get_db_conn()) as conn:
            cursor = conn.cursor()
            # ORDER BY keeps the result order stable between calls.
            cursor.execute(
                """
                SELECT f.cluster_id, f.embeddings
                FROM faces f
                WHERE f.cluster_id IS NOT NULL
                ORDER BY f.cluster_id
                """
            )
            rows = cursor.fetchall()
    except sqlite3.Error as e:
        print(f"Error getting cluster mean embeddings: {e}")
        return []

    # Group the decoded vectors by cluster (dicts preserve insertion order).
    cluster_embeddings: Dict[ClusterId, List[np.ndarray]] = {}
    for cluster_id, embeddings_json in rows:
        embeddings = _decode_embedding(embeddings_json, f"in cluster {cluster_id}")
        if embeddings is None:
            continue
        cluster_embeddings.setdefault(cluster_id, []).append(embeddings)

    return [
        {
            "cluster_id": cluster_id,
            "mean_embedding": np.mean(np.stack(vectors), axis=0),
        }
        for cluster_id, vectors in cluster_embeddings.items()
    ]