The CollectionDatabaseManager class provides a Python API for interacting with the SQLite database that stores all collected research content. This is the primary interface for database operations in the Multi-Modal Academic Research System.
Module: multi_modal_rag.database
Class: CollectionDatabaseManager
File: multi_modal_rag/database/db_manager.py
from multi_modal_rag.database import CollectionDatabaseManagerInitialize the database manager and create database schema if it doesn't exist.
Parameters:
db_path(str, optional): Path to the SQLite database file. Defaults to"data/collections.db"
Returns: CollectionDatabaseManager instance
Side Effects:
- Creates database directory if it doesn't exist
- Initializes all database tables if they don't exist
- Logs initialization message
Example:
from multi_modal_rag.database import CollectionDatabaseManager
# Use default database path
db = CollectionDatabaseManager()
# Use custom database path
db = CollectionDatabaseManager(db_path="custom/path/research.db")Raises:
OSError: If unable to create database directorysqlite3.Error: If database initialization fails
add_collection(content_type: str, title: str, source: str, url: str, metadata: Dict, indexed: bool = False) -> int
Add a new collection item to the database.
Parameters:
content_type(str): Type of content - must be'paper','video', or'podcast'title(str): Title of the contentsource(str): Source platform (e.g.,'arxiv','youtube','podcast_rss')url(str): Original URL of the contentmetadata(Dict): Additional metadata as a dictionary (will be JSON-encoded)indexed(bool, optional): Whether content is indexed in OpenSearch. Defaults toFalse
Returns: int - The auto-generated collection ID
Raises:
Exception: If database insertion fails (propagates after rollback)
Example:
db = CollectionDatabaseManager()
# Add a paper
collection_id = db.add_collection(
content_type='paper',
title='Attention Is All You Need',
source='arxiv',
url='https://arxiv.org/abs/1706.03762',
metadata={
'keywords': ['transformers', 'attention'],
'language': 'en'
},
indexed=False
)
print(f"Created collection with ID: {collection_id}")Notes:
- The
metadatadictionary is automatically JSON-encoded for storage collection_dateis automatically set to current timestampstatusis automatically set to'collected'
Retrieve all collections with pagination support.
Parameters:
limit(int, optional): Maximum number of results to return. Defaults to100offset(int, optional): Number of records to skip (for pagination). Defaults to0
Returns: List[Dict] - List of collection dictionaries, ordered by collection_date DESC
Example:
# Get first 100 collections
collections = db.get_all_collections()
# Get next 100 collections (pagination)
collections_page2 = db.get_all_collections(limit=100, offset=100)
# Get only 10 collections
recent_collections = db.get_all_collections(limit=10)
for collection in collections:
print(f"{collection['id']}: {collection['title']}")
print(f" Type: {collection['content_type']}")
print(f" Source: {collection['source']}")
print(f" Metadata: {collection['metadata']}") # Already parsed from JSONReturn Format:
[
{
'id': 1,
'content_type': 'paper',
'title': 'Attention Is All You Need',
'source': 'arxiv',
'url': 'https://arxiv.org/abs/1706.03762',
'collection_date': '2025-10-01 10:30:00',
'metadata': {'keywords': ['transformers']}, # Parsed from JSON
'status': 'collected',
'indexed': 1
},
...
]Notes:
- The
metadatafield is automatically parsed from JSON to a Python dictionary - Results are ordered by
collection_datein descending order (newest first)
Retrieve collections filtered by content type.
Parameters:
content_type(str): Type to filter by - must be'paper','video', or'podcast'limit(int, optional): Maximum number of results. Defaults to100
Returns: List[Dict] - List of matching collection dictionaries, ordered by collection_date DESC
Example:
# Get all papers
papers = db.get_collections_by_type('paper', limit=50)
# Get all videos
videos = db.get_collections_by_type('video')
# Get all podcasts
podcasts = db.get_collections_by_type('podcast', limit=20)
print(f"Found {len(papers)} papers")
for paper in papers:
print(f" - {paper['title']}")Return Format: Same as get_all_collections()
Retrieve full collection details including type-specific data from related tables.
Parameters:
collection_id(int): The unique ID of the collection
Returns:
Dict- Collection dictionary with additional'details'key containing type-specific dataNone- If collection doesn't exist
Example:
# Get paper with details
collection = db.get_collection_with_details(1)
if collection:
print(f"Title: {collection['title']}")
print(f"Type: {collection['content_type']}")
# Type-specific details
if collection['content_type'] == 'paper':
details = collection['details']
print(f"Authors: {', '.join(details['authors'])}")
print(f"ArXiv ID: {details['arxiv_id']}")
print(f"Abstract: {details['abstract'][:100]}...")
print(f"PDF Path: {details['pdf_path']}")
elif collection['content_type'] == 'video':
details = collection['details']
print(f"Channel: {details['channel']}")
print(f"Video ID: {details['video_id']}")
print(f"Duration: {details['duration']} seconds")
print(f"Views: {details['views']}")
elif collection['content_type'] == 'podcast':
details = collection['details']
print(f"Podcast: {details['podcast_name']}")
print(f"Episode ID: {details['episode_id']}")
print(f"Duration: {details['duration']} seconds")
else:
print("Collection not found")Return Format for Paper:
{
'id': 1,
'content_type': 'paper',
'title': 'Attention Is All You Need',
'source': 'arxiv',
'url': 'https://arxiv.org/abs/1706.03762',
'collection_date': '2025-10-01 10:30:00',
'metadata': {},
'status': 'collected',
'indexed': 1,
'details': {
'id': 1,
'collection_id': 1,
'arxiv_id': '1706.03762',
'pmc_id': None,
'abstract': 'The dominant sequence transduction models...',
'authors': ['Ashish Vaswani', 'Noam Shazeer'], # Parsed from JSON
'published_date': '2017-06-12',
'categories': ['cs.CL', 'cs.AI'], # Parsed from JSON
'pdf_path': 'data/papers/arxiv_1706.03762.pdf'
}
}Notes:
- For papers,
authorsandcategoriesin details are automatically parsed from JSON - If collection exists but has no type-specific record,
'details'key will not be present
Search collections by title or source using substring matching.
Parameters:
query(str): Search query stringlimit(int, optional): Maximum number of results. Defaults to50
Returns: List[Dict] - List of matching collection dictionaries, ordered by collection_date DESC
Example:
# Search for "transformer"
results = db.search_collections('transformer', limit=20)
# Search for "arxiv" (searches in source field too)
arxiv_papers = db.search_collections('arxiv')
# Search for "neural network"
nn_results = db.search_collections('neural network')
print(f"Found {len(results)} results")
for result in results:
print(f" {result['content_type']}: {result['title']}")Search Behavior:
- Case-insensitive substring matching
- Searches both
titleANDsourcefields - Uses SQL
LIKE '%query%'pattern - Returns results ordered by newest first
Return Format: Same as get_all_collections()
Add paper-specific data to the papers table.
Parameters:
collection_id(int): ID of the parent collection recordpaper_data(Dict): Dictionary containing paper details
Paper Data Fields:
arxiv_id(str, optional): ArXiv identifierpmc_id(str, optional): PubMed Central identifierabstract(str, optional): Paper abstractauthors(List[str], optional): List of author namespublished(str, optional): Publication datecategories(List[str], optional): List of category tagslocal_path(str, optional): Path to downloaded PDF
Returns: None
Raises:
Exception: If database insertion fails (propagates after rollback)
Example:
# First, create the collection
collection_id = db.add_collection(
content_type='paper',
title='Attention Is All You Need',
source='arxiv',
url='https://arxiv.org/abs/1706.03762',
metadata={}
)
# Then add paper-specific details
db.add_paper(collection_id, {
'arxiv_id': '1706.03762',
'pmc_id': None,
'abstract': 'The dominant sequence transduction models are based on complex recurrent or convolutional neural networks...',
'authors': ['Ashish Vaswani', 'Noam Shazeer', 'Niki Parmar', 'Jakob Uszkoreit'],
'published': '2017-06-12',
'categories': ['cs.CL', 'cs.AI', 'cs.LG'],
'local_path': 'data/papers/arxiv_1706.03762.pdf'
})Notes:
authorsandcategorieslists are automatically JSON-encoded for storage- All fields are optional (nullable in database)
Add video-specific data to the videos table.
Parameters:
collection_id(int): ID of the parent collection recordvideo_data(Dict): Dictionary containing video details
Video Data Fields:
video_id(str, optional): YouTube video IDauthor(str, optional): Channel name (mapped tochannelfield)length(int, optional): Duration in seconds (mapped todurationfield)views(int, optional): View countthumbnail_url(str, optional): Thumbnail image URLtranscript(str/bool, optional): Transcript text or boolean indicating availability
Returns: None
Raises:
Exception: If database insertion fails (propagates after rollback)
Example:
# Create collection
collection_id = db.add_collection(
content_type='video',
title='Neural Networks Explained',
source='youtube',
url='https://youtube.com/watch?v=abc123',
metadata={}
)
# Add video details
db.add_video(collection_id, {
'video_id': 'abc123',
'author': '3Blue1Brown',
'length': 1194,
'views': 5234891,
'thumbnail_url': 'https://i.ytimg.com/vi/abc123/maxresdefault.jpg',
'transcript': 'Hello and welcome to this video about neural networks...'
})Notes:
authoris mapped to thechanneldatabase fieldlengthis mapped to thedurationdatabase fieldtranscript_availableis set based on whethertranscriptfield is truthy- All fields are optional (nullable in database)
Add podcast-specific data to the podcasts table.
Parameters:
collection_id(int): ID of the parent collection recordpodcast_data(Dict): Dictionary containing podcast details
Podcast Data Fields:
episode_id(str, optional): Unique episode identifierpodcast_name(str, optional): Name of the podcast showaudio_url(str, optional): Direct URL to audio fileduration(int, optional): Episode duration in seconds
Returns: None
Raises:
Exception: If database insertion fails (propagates after rollback)
Example:
# Create collection
collection_id = db.add_collection(
content_type='podcast',
title='AI Safety Discussion - Episode 42',
source='podcast_rss',
url='https://podcast.ai/ep42',
metadata={'language': 'en'}
)
# Add podcast details
db.add_podcast(collection_id, {
'episode_id': 'ep-42',
'podcast_name': 'AI Alignment Podcast',
'audio_url': 'https://podcast.ai/audio/ep42.mp3',
'duration': 3600
})Notes:
- All fields are optional (nullable in database)
Mark a collection item as indexed in OpenSearch.
Parameters:
collection_id(int): The unique ID of the collection
Returns: None
Raises:
Exception: If database update fails (propagates after rollback)
Example:
# After successfully indexing in OpenSearch
collection_id = 42
db.mark_as_indexed(collection_id)
print(f"Collection {collection_id} marked as indexed")SQL Operation:
UPDATE collections SET indexed = 1 WHERE id = ?Retrieve comprehensive database statistics.
Parameters: None
Returns: Dict containing various statistics
Return Format:
{
'by_type': {
'paper': 1523,
'video': 342,
'podcast': 87
},
'indexed': 1845,
'not_indexed': 107,
'recent_7_days': 23,
'collection_history': [
{
'type': 'paper',
'source': 'arxiv',
'total': 1200
},
{
'type': 'paper',
'source': 'pubmed',
'total': 323
},
{
'type': 'video',
'source': 'youtube',
'total': 342
}
]
}Return Fields:
by_type(Dict[str, int]): Count of collections by content typeindexed(int): Number of indexed collectionsnot_indexed(int): Number of not-yet-indexed collectionsrecent_7_days(int): Collections added in the last 7 dayscollection_history(List[Dict]): Aggregated collection stats by type and source
Example:
stats = db.get_statistics()
print("Database Statistics:")
print(f" Total Papers: {stats['by_type'].get('paper', 0)}")
print(f" Total Videos: {stats['by_type'].get('video', 0)}")
print(f" Total Podcasts: {stats['by_type'].get('podcast', 0)}")
print(f" Indexed: {stats['indexed']}")
print(f" Pending Indexing: {stats['not_indexed']}")
print(f" Added Last 7 Days: {stats['recent_7_days']}")
print("\nCollection History:")
for item in stats['collection_history']:
print(f" {item['type']} from {item['source']}: {item['total']} items")Log statistics about a collection operation.
Parameters:
content_type(str): Type of content collectedquery(str): Search query usedresults_count(int): Number of results returned by the APIsource_api(str): API source (e.g.,'arxiv','youtube_api')
Returns: None
Raises:
Exception: If database insertion fails (propagates after rollback)
Example:
# After collecting papers from ArXiv
db.log_collection_stats(
content_type='paper',
query='machine learning',
results_count=25,
source_api='arxiv'
)
# After collecting videos
db.log_collection_stats(
content_type='video',
query='neural networks',
results_count=10,
source_api='youtube_api'
)Usage: This helps track which queries are being used, how many results are typically returned, and which APIs are most productive.
from multi_modal_rag.database import CollectionDatabaseManager
# Initialize database
db = CollectionDatabaseManager()
# === Add a Paper ===
paper_collection_id = db.add_collection(
content_type='paper',
title='Attention Is All You Need',
source='arxiv',
url='https://arxiv.org/abs/1706.03762',
metadata={'keywords': ['transformers', 'attention']},
indexed=False
)
db.add_paper(paper_collection_id, {
'arxiv_id': '1706.03762',
'abstract': 'The dominant sequence transduction models...',
'authors': ['Ashish Vaswani', 'Noam Shazeer'],
'published': '2017-06-12',
'categories': ['cs.CL', 'cs.AI'],
'local_path': 'data/papers/1706.03762.pdf'
})
# Log the collection
db.log_collection_stats('paper', 'transformers', 1, 'arxiv')
# === Add a Video ===
video_collection_id = db.add_collection(
content_type='video',
title='Neural Networks Explained',
source='youtube',
url='https://youtube.com/watch?v=abc123',
metadata={}
)
db.add_video(video_collection_id, {
'video_id': 'abc123',
'author': '3Blue1Brown',
'length': 1194,
'views': 5000000,
'thumbnail_url': 'https://i.ytimg.com/vi/abc123/maxresdefault.jpg',
'transcript': 'Hello and welcome...'
})
# === Query Collections ===
# Get all papers
papers = db.get_collections_by_type('paper', limit=10)
print(f"Found {len(papers)} papers")
# Search
results = db.search_collections('transformer')
print(f"Search found {len(results)} results")
# Get full details
details = db.get_collection_with_details(paper_collection_id)
print(f"Paper title: {details['title']}")
print(f"Authors: {', '.join(details['details']['authors'])}")
# === Mark as Indexed ===
db.mark_as_indexed(paper_collection_id)
# === Get Statistics ===
stats = db.get_statistics()
print(f"Total items: {sum(stats['by_type'].values())}")
print(f"Indexed: {stats['indexed']}, Pending: {stats['not_indexed']}")All methods that modify the database use try/except blocks with rollback:
try:
# Database operation
conn.commit()
except Exception as e:
logger.error(f"Error: {e}")
conn.rollback()
raise # Re-raise exception
finally:
conn.close()Best Practice:
try:
collection_id = db.add_collection(...)
db.add_paper(collection_id, {...})
except Exception as e:
print(f"Failed to add paper: {e}")
# Handle error appropriatelyImportant: SQLite connections are NOT thread-safe by default. Each method creates its own connection and closes it, which is safe for multi-threaded use, but not optimal for performance.
For high-concurrency scenarios, consider:
- Using connection pooling
- Implementing a connection-per-thread pattern
- Using a more robust database (PostgreSQL, MySQL)
For better performance with large datasets:
import sqlite3
conn = sqlite3.connect('data/collections.db')
cursor = conn.cursor()
# Add indexes
cursor.execute("CREATE INDEX IF NOT EXISTS idx_content_type ON collections(content_type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_indexed ON collections(indexed)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_collection_date ON collections(collection_date DESC)")
conn.commit()
conn.close()For inserting many items, use transactions:
import sqlite3
conn = sqlite3.connect('data/collections.db')
cursor = conn.cursor()
try:
# Disable autocommit for batch insert
for paper_data in large_paper_list:
cursor.execute("INSERT INTO collections ...")
# ... more inserts
conn.commit() # Commit all at once
except:
conn.rollback()
raise
finally:
conn.close()- Use
LIMITto avoid loading too many records - Add appropriate indexes on frequently queried fields
- Use
EXPLAIN QUERY PLANto analyze slow queries
import sqlite3
import shutil
from datetime import datetime
# Simple file backup
shutil.copy('data/collections.db', f'data/backup_{datetime.now().strftime("%Y%m%d")}.db')
# SQLite backup API (online backup)
source = sqlite3.connect('data/collections.db')
dest = sqlite3.connect('data/backup.db')
source.backup(dest)
dest.close()
source.close()