The Indexing module manages the OpenSearch integration used for search. It is designed for hybrid retrieval over multi-modal academic content (papers, videos, podcasts), combining keyword search (BM25) with semantic vector search over sentence embeddings.
multi_modal_rag/indexing/
└── opensearch_manager.py # OpenSearch client and search logic
File: multi_modal_rag/indexing/opensearch_manager.py
Manages all OpenSearch operations including index creation, document indexing, and hybrid search. Uses sentence-transformers for embedding generation and OpenSearch's kNN capabilities for semantic search.
from multi_modal_rag.indexing import OpenSearchManager
manager = OpenSearchManager(
host='localhost',
port=9200,
use_ssl=True,
username='admin',
password='MyStrongPassword@2024!'
)
Parameters:
- host (str, optional): OpenSearch host address. Default: 'localhost'
- port (int, optional): OpenSearch port. Default: 9200
- use_ssl (bool, optional): Use an SSL/TLS connection. Default: True
- username (str, optional): Authentication username. Default: 'admin'
- password (str, optional): Authentication password. Default: 'MyStrongPassword@2024!'
Connection Testing:
- Automatically tests connection on initialization
- Sets self.connected = True if successful
- Logs an error and continues with limited functionality if the connection fails
Embedding Model:
- Uses SentenceTransformer('all-MiniLM-L6-v2')
- Generates 384-dimensional embeddings
- Lightweight and fast (suitable for free-tier deployment)
Example:
manager = OpenSearchManager(
host='localhost',
port=9200
)
if manager.connected:
    print("✅ Connected to OpenSearch")
else:
    print("⚠️ OpenSearch not available - limited functionality")
Creates an OpenSearch index with mappings optimized for multi-modal academic content.
Parameters:
- index_name (str): Name of the index to create
Returns: True if successful, False otherwise
Index Configuration:
{
'settings': {
'index': {
'number_of_shards': 2,
'number_of_replicas': 1,
'knn': True # Enable k-NN for vector search
}
},
'mappings': {
'properties': {
'content_type': {'type': 'keyword'},
'title': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword'}}
},
'abstract': {'type': 'text'},
'content': {'type': 'text'},
'authors': {'type': 'keyword'},
'publication_date': {'type': 'date'},
'url': {'type': 'keyword'},
'transcript': {'type': 'text'},
'diagram_descriptions': {'type': 'text'},
'key_concepts': {'type': 'keyword'},
'citations': {
'type': 'nested',
'properties': {
'text': {'type': 'text'},
'source': {'type': 'keyword'}
}
},
'embedding': {
'type': 'knn_vector',
'dimension': 384
},
'metadata': {
'type': 'object',
'enabled': True
}
}
}
}
Example:
manager = OpenSearchManager()
success = manager.create_index("research_assistant")
if success:
    print("Index created successfully")
else:
    print("Failed to create index")
Behavior:
- Checks if index already exists before creating
- Skips creation if index exists (doesn't overwrite)
- Returns False if not connected to OpenSearch
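The behavior listed above can be sketched as follows. This is a minimal illustration, not the module's actual code: `FakeIndices` and `FakeClient` are hypothetical in-memory stand-ins that mimic the `indices.exists` and `indices.create` calls of opensearch-py so the snippet runs without a server, and returning `True` for an index that already exists is an assumption.

```python
class FakeIndices:
    """Hypothetical in-memory stand-in for client.indices (not opensearch-py)."""

    def __init__(self):
        self.created = {}

    def exists(self, index):
        return index in self.created

    def create(self, index, body=None):
        self.created[index] = body or {}
        return {"acknowledged": True, "index": index}


class FakeClient:
    """Hypothetical client exposing only the .indices attribute."""

    def __init__(self):
        self.indices = FakeIndices()


def create_index_if_missing(client, connected, index_name, body):
    """Return False when disconnected; never overwrite an existing index."""
    if not connected:
        return False
    if client.indices.exists(index=index_name):
        # Assumption: an existing index counts as success and is left untouched.
        return True
    client.indices.create(index=index_name, body=body)
    return True


client = FakeClient()
body = {"settings": {"index": {"knn": True}}}
print(create_index_if_missing(client, True, "research_assistant", body))  # True (created)
print(create_index_if_missing(client, True, "research_assistant", {}))    # True (skipped)
print(create_index_if_missing(client, False, "other_index", body))        # False (not connected)
```

The second call leaves the original settings in place, which is the "doesn't overwrite" guarantee described above.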
Indexes a single document with automatic embedding generation.
Parameters:
- index_name (str): Target index name
- document (Dict): Document to index
Returns: OpenSearch response dict, or None on error
Document Structure:
document = {
'content_type': str, # 'paper', 'video', or 'podcast'
'title': str,
'abstract': str, # For papers
'content': str, # Main text content
'authors': List[str],
'publication_date': str, # ISO format date
'url': str,
'transcript': str, # For videos/podcasts
'diagram_descriptions': str,# For papers with diagrams
'key_concepts': List[str],
'metadata': Dict # Additional metadata
}
Automatic Processing:
- Combines title + abstract + content (first 1000 chars) into searchable text
- Generates 384-dim embedding using SentenceTransformer
- Adds embedding to document
- Indexes document in OpenSearch
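Before a document reaches `index_document()`, it can be useful to check it against the structure listed above. The helper below is a hypothetical sketch, not part of `OpenSearchManager`: the name `validate_document`, the rule that `title` is required, and the restriction to date-only strings are all assumptions made for illustration.

```python
from datetime import date

ALLOWED_CONTENT_TYPES = {"paper", "video", "podcast"}


def validate_document(document):
    """Return a list of problems; an empty list means the document looks indexable."""
    problems = []
    if document.get("content_type") not in ALLOWED_CONTENT_TYPES:
        problems.append("content_type must be 'paper', 'video', or 'podcast'")
    if not document.get("title"):
        # Assumption: a title is treated as required in this sketch.
        problems.append("title is missing or empty")
    pub_date = document.get("publication_date")
    if pub_date is not None:
        try:
            date.fromisoformat(pub_date)  # e.g. '2017-06-12'
        except (TypeError, ValueError):
            problems.append("publication_date is not an ISO date (YYYY-MM-DD)")
    return problems


good = {"content_type": "paper", "title": "Attention Is All You Need",
        "publication_date": "2017-06-12"}
bad = {"content_type": "blog", "title": "", "publication_date": "June 2017"}
print(validate_document(good))
print(validate_document(bad))
```

Returning a list of messages instead of raising lets a caller log every problem for a document in one pass and skip only the invalid ones.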
Example:
manager = OpenSearchManager()
paper_doc = {
'content_type': 'paper',
'title': 'Attention Is All You Need',
'abstract': 'The dominant sequence transduction models...',
'content': 'We propose a new simple network architecture...',
'authors': ['Ashish Vaswani', 'Noam Shazeer'],
'publication_date': '2017-06-12',
'url': 'https://arxiv.org/abs/1706.03762',
'key_concepts': ['transformer', 'attention', 'neural networks']
}
response = manager.index_document("research_assistant", paper_doc)
if response:
    print(f"Indexed document with ID: {response['_id']}")
Embedding Generation:
# Internally performed by index_document()
searchable_text = f"{document.get('title', '')} {document.get('abstract', '')} {document.get('content', '')[:1000]}"
embedding = self.embedding_model.encode(searchable_text).tolist()
document['embedding'] = embedding  # 384-dimensional vector
Bulk indexes multiple documents efficiently.
Parameters:
- index_name (str): Target index name
- documents (List[Dict]): List of documents to index
Returns: Number of successfully indexed documents, or None on error
Example:
manager = OpenSearchManager()
papers = [
{
'content_type': 'paper',
'title': 'Paper 1',
'content': 'Content 1...',
# ... other fields
},
{
'content_type': 'paper',
'title': 'Paper 2',
'content': 'Content 2...',
# ... other fields
},
# ... more papers
]
success_count = manager.bulk_index("research_assistant", papers)
print(f"Successfully indexed {success_count} documents")
Performance:
- Uses OpenSearch bulk API for efficiency
- Processes embeddings for all documents before indexing
- Much faster than individual index_document() calls
- Recommended for batches > 10 documents
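To make the "embed everything first, then send one bulk request" flow concrete, here is a minimal sketch of how the bulk actions might be assembled. It is not the module's implementation: `build_bulk_actions` and `fake_embed` are hypothetical names, and `fake_embed` merely stands in for `SentenceTransformer.encode(text).tolist()` so the snippet runs without the model.

```python
def build_bulk_actions(index_name, documents, embed):
    """Build one action dict per document, in the shape opensearchpy.helpers.bulk accepts."""
    actions = []
    for doc in documents:
        text = f"{doc.get('title', '')} {doc.get('abstract', '')} {doc.get('content', '')[:1000]}"
        source = dict(doc)                 # copy so the caller's dict is not mutated
        source["embedding"] = embed(text)  # embed() must return a list of floats
        actions.append({"_index": index_name, "_source": source})
    return actions


def fake_embed(text):
    """Hypothetical stand-in for the real embedding model."""
    return [float(len(text)), 0.0, 0.0]


papers = [
    {"content_type": "paper", "title": "Paper 1", "content": "Content 1..."},
    {"content_type": "paper", "title": "Paper 2", "content": "Content 2..."},
]
actions = build_bulk_actions("research_assistant", papers, fake_embed)
print(len(actions), actions[0]["_index"])
```

With a real client, the resulting list can be passed to `opensearchpy.helpers.bulk(manager.client, actions)`, which returns a tuple of the success count and a list of errors.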
Progress Logging:
INFO - Starting bulk indexing of 50 documents to 'research_assistant'
DEBUG - Processing document 1/50 for bulk index: Attention Is All You Need
DEBUG - Processing document 2/50 for bulk index: BERT: Pre-training...
...
DEBUG - Executing bulk index operation...
INFO - ✅ Bulk indexed 50 documents successfully to 'research_assistant'
Performs hybrid search combining keyword matching with semantic similarity.
Parameters:
- index_name (str): Index to search
- query (str): Search query
- k (int, optional): Number of results to return. Default: 10
Returns: List of result dictionaries:
[
{
'score': float, # Relevance score
'source': Dict # Source document
},
# ... more results
]
Search Algorithm:
The current implementation uses text-based multi-match search with field boosting:
{
'size': k,
'query': {
'multi_match': {
'query': query,
'fields': [
'title^3', # 3x weight
'abstract^2', # 2x weight
'content', # 1x weight
'transcript', # 1x weight
'key_concepts^2' # 2x weight
],
'type': 'best_fields',
'fuzziness': 'AUTO'
}
}
}
Field Boosting Explained:
- title^3: Title matches weighted 3x (most important)
- abstract^2: Abstract matches weighted 2x
- key_concepts^2: Concept matches weighted 2x
- content, transcript: Standard 1x weight
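Fuzziness: AUTO tolerates typos by scaling the allowed edit distance with term length: terms of 1-2 characters must match exactly, terms of 3-5 characters allow one edit, and longer terms allow two edits.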
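How `AUTO` maps term length to an allowed edit distance can be written out directly. This sketch mirrors the documented default thresholds (`AUTO:3,6`); the function name is made up for illustration and nothing here calls OpenSearch.

```python
def auto_fuzziness_edits(term):
    """Maximum edit distance allowed for a term under the default AUTO setting."""
    length = len(term)
    if length <= 2:
        return 0  # very short terms must match exactly
    if length <= 5:
        return 1  # 3-5 characters: one edit allowed
    return 2      # 6 or more characters: two edits allowed


for term in ["ai", "bert", "transformer"]:
    print(term, auto_fuzziness_edits(term))
```

In practice this means a typo in a short acronym is not forgiven, while a long word such as "transformer" still matches with two wrong characters.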
Example:
manager = OpenSearchManager()
results = manager.hybrid_search(
index_name="research_assistant",
query="transformer architecture",
k=5
)
for result in results:
    print(f"Score: {result['score']:.2f}")
    print(f"Title: {result['source']['title']}")
    print(f"Type: {result['source']['content_type']}")
    print("---")
Output:
Score: 15.42
Title: Attention Is All You Need
Type: paper
---
Score: 12.18
Title: BERT: Pre-training of Deep Bidirectional Transformers
Type: paper
---
Score: 8.94
Title: Illustrated Transformer
Type: video
---
Vector Search (Disabled):
Previous versions used kNN vector search, but it's currently disabled for OpenSearch 3.x compatibility:
# Original hybrid search (commented out)
{
'query': {
'bool': {
'should': [
# Keyword search
{'multi_match': {...}},
# Semantic search
{
'knn': {
'embedding': {
'vector': query_embedding,
'k': k
}
}
}
]
}
}
}
To re-enable vector search, modify the query structure and generate query embeddings.
'content_type': {'type': 'keyword'}
- Purpose: Identify document type (paper, video, podcast)
- Type: keyword (exact match, not analyzed)
- Usage: Filtering by content type in queries
Example Query:
{
'query': {
'bool': {
'must': [{'match': {'content': 'neural networks'}}],
'filter': [{'term': {'content_type': 'paper'}}]
}
}
}
'title': {
'type': 'text',
'fields': {
'keyword': {'type': 'keyword'}
}
}
- text field: Full-text search, analyzed (tokenized, lowercased)
- keyword subfield: Exact match, sorting, aggregations
- Usage: Primary search field with 3x boost
Example:
# Text search (matches "attention mechanism")
{'match': {'title': 'attention'}}
# Exact match (must match entire title)
{'term': {'title.keyword': 'Attention Is All You Need'}}
# Sorting
{'sort': [{'title.keyword': 'asc'}]}
'authors': {'type': 'keyword'}
- Type: keyword array (exact match)
- Purpose: Filter by specific authors, aggregations
- Usage: Author filtering, co-author analysis
Example:
# Find all papers by author
{'term': {'authors': 'Geoffrey Hinton'}}
# Aggregation: top authors
{
'aggs': {
'top_authors': {
'terms': {'field': 'authors', 'size': 10}
}
}
}
'embedding': {
'type': 'knn_vector',
'dimension': 384
}
- Type: k-NN vector for semantic search
- Dimension: 384 (from all-MiniLM-L6-v2)
- Purpose: Semantic similarity matching
- Usage: Vector search for conceptual matching
Example (when enabled):
query_embedding = embedding_model.encode("neural networks").tolist()
{
'query': {
'knn': {
'embedding': {
'vector': query_embedding,
'k': 10
}
}
}
}
'citations': {
'type': 'nested',
'properties': {
'text': {'type': 'text'},
'source': {'type': 'keyword'}
}
}
- Type: nested (allows querying within citation objects)
- Purpose: Store and search extracted citations
- Usage: Citation analysis, reference tracking
Example:
# Find documents citing specific source
{
'query': {
'nested': {
'path': 'citations',
'query': {
'term': {'citations.source': 'Vaswani et al., 2017'}
}
}
}
}
Model: all-MiniLM-L6-v2
Characteristics:
- Size: 80MB (lightweight)
- Dimension: 384
- Speed: ~1000 sentences/second (CPU)
- Quality: Good for general semantic similarity
Why This Model?:
- Free: No API costs
- Fast: Suitable for real-time indexing
- Accurate: 0.68 Spearman correlation on STS benchmark
- Lightweight: Runs on CPU without GPU
# 1. Prepare searchable text
searchable_text = f"{title} {abstract} {content[:1000]}"
# 2. Generate embedding
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
embedding = model.encode(searchable_text) # numpy array (384,)
# 3. Convert to list for JSON serialization
embedding_list = embedding.tolist() # List[float] (384 elements)
# 4. Store in document
document['embedding'] = embedding_list
Query: "deep learning models for language understanding"
Traditional Keyword Search would miss:
- "neural architectures for NLP"
- "transformer networks for text comprehension"
Semantic Search (using embeddings) finds:
- Documents about BERT, GPT (even without exact keywords)
- Papers on attention mechanisms (related concept)
- Content about language models (semantic similarity)
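The idea behind semantic matching can be illustrated with cosine similarity on toy vectors. The 3-dimensional vectors below are invented purely for illustration (real embeddings from all-MiniLM-L6-v2 have 384 dimensions), and `cosine_similarity` is a plain-Python sketch, not a function of the module.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Made-up vectors: the first axis loosely stands for "language modeling".
query_vec = [0.9, 0.1, 0.0]
doc_vectors = {
    "transformer networks for text comprehension": [0.8, 0.2, 0.1],
    "soil chemistry field survey": [0.0, 0.1, 0.9],
}

ranked = sorted(
    doc_vectors,
    key=lambda title: cosine_similarity(query_vec, doc_vectors[title]),
    reverse=True,
)
print(ranked[0])
```

The first document ranks highest because its vector points in nearly the same direction as the query vector, even though the two texts need not share any keywords.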
Combines strengths of both approaches:
| Search Type | Strengths | Weaknesses |
|---|---|---|
| Keyword (BM25) | Exact matches; fast; handles rare terms well | Misses synonyms; no semantic understanding |
| Vector (kNN) | Semantic similarity; finds related concepts; handles paraphrasing | May miss exact terms; slower; requires embeddings |
| Hybrid | Best of both worlds; balanced precision/recall | More complex; requires tuning |
The system currently uses multi-match with field boosting:
Advantages:
- Simple and fast
- No vector computation at query time
- Works well for exact and fuzzy matches
Limitations:
- No semantic similarity
- Relies on keyword overlap
- May miss conceptually similar content
To re-enable full hybrid search:
def hybrid_search(self, index_name: str, query: str, k: int = 10):
    # 1. Generate query embedding
    query_embedding = self.embedding_model.encode(query).tolist()
    # 2. Construct hybrid query; scores of matching 'should' clauses are summed
    search_query = {
        'size': k,
        'query': {
            'bool': {
                'should': [
                    # Keyword search (BM25)
                    {
                        'multi_match': {
                            'query': query,
                            'fields': ['title^3', 'abstract^2', 'content'],
                            'type': 'best_fields'
                        }
                    },
                    # Vector search: exact (brute-force) cosine scoring over every document
                    {
                        'script_score': {
                            'query': {'match_all': {}},
                            'script': {
                                # OpenSearch's Painless function takes the field as doc['embedding']
                                'source': "cosineSimilarity(params.query_vector, doc['embedding']) + 1.0",
                                'params': {'query_vector': query_embedding}
                            }
                        }
                    }
                ]
            }
        }
    }
    # 3. Return hits in the documented {'score', 'source'} shape
    response = self.client.search(index=index_name, body=search_query)
    return [{'score': hit['_score'], 'source': hit['_source']} for hit in response['hits']['hits']]
Score Combination:
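- BM25 score: non-negative and unbounded (the sample output above reaches 15.42), reflecting keyword relevance
- Cosine similarity: 0-2 (semantic similarity, +1 offset)
- Combined: Sum of both (higher = more relevant). Because the two scales differ, the BM25 term tends to dominate unless the scores are normalized or weighted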
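One way to keep either score from dominating the sum is to rescale both before combining them. The sketch below is not what `hybrid_search` does; it is one possible post-processing step, with `combine_scores`, the min-max scaling, and the 50/50 default weight all chosen as assumptions for illustration. The BM25 values reuse the sample output shown earlier, and the cosine values are invented.

```python
def combine_scores(bm25_scores, cosine_scores, keyword_weight=0.5):
    """Blend per-document BM25 and cosine scores after scaling both to [0, 1]."""
    lo, hi = min(bm25_scores.values()), max(bm25_scores.values())
    span = (hi - lo) or 1.0  # avoid division by zero when all scores are equal
    combined = {}
    for doc_id, bm25 in bm25_scores.items():
        keyword = (bm25 - lo) / span
        # Raw cosine lies in [-1, 1]; documents without a vector score get the minimum.
        semantic = (cosine_scores.get(doc_id, -1.0) + 1.0) / 2.0
        combined[doc_id] = keyword_weight * keyword + (1 - keyword_weight) * semantic
    return combined


bm25 = {"a": 15.42, "b": 12.18, "c": 8.94}
cosine = {"a": 0.60, "b": 0.90, "c": 0.20}  # invented values
combined = combine_scores(bm25, cosine)
for doc_id, score in sorted(combined.items(), key=lambda item: item[1], reverse=True):
    print(doc_id, round(score, 3))
```

Min-max scaling is relative to the current result set, so the combined values are only comparable within one query, not across queries.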
manager = OpenSearchManager()
results = manager.hybrid_search("research_assistant", "machine learning")
# Direct OpenSearch query (bypass hybrid_search)
query = {
'query': {
'bool': {
'must': [
{'match': {'content': 'neural networks'}}
],
'filter': [
{'term': {'content_type': 'video'}}
]
}
}
}
response = manager.client.search(index="research_assistant", body=query)
query = {
'query': {
'bool': {
'must': [
{'match': {'title': 'transformers'}}
],
'filter': [
{
'range': {
'publication_date': {
'gte': '2020-01-01',
'lte': '2024-12-31'
}
}
}
]
}
}
}
query = {
'query': {
'bool': {
'must': [
{'match': {'content': 'attention mechanism'}}
],
'filter': [
{'term': {'authors': 'Yoshua Bengio'}}
]
}
}
}
query = {
'size': 0, # Don't return documents
'aggs': {
'papers_per_year': {
'date_histogram': {
'field': 'publication_date',
'calendar_interval': 'year'
}
},
'top_concepts': {
'terms': {
'field': 'key_concepts',
'size': 20
}
}
}
}
response = manager.client.search(index="research_assistant", body=query)
print(response['aggregations'])
Single Document:
- Embedding generation: ~10-50ms
- Index operation: ~50-100ms
- Total: ~60-150ms per document
Bulk Indexing (100 documents):
- Embedding generation: ~1-5 seconds
- Bulk index operation: ~500ms-1s
- Total: ~1.5-6 seconds (15-60ms per doc)
Optimization Tips:
- Use Bulk Indexing:
    # Bad: 100 individual calls
    for doc in documents:
        manager.index_document(index, doc)
    # Good: 1 bulk call
    manager.bulk_index(index, documents)
- Batch Embedding Generation:
    # Encode all texts at once (faster)
    texts = [f"{d['title']} {d['content']}" for d in documents]
    embeddings = model.encode(texts)  # Batch processing
    for doc, emb in zip(documents, embeddings):
        doc['embedding'] = emb.tolist()
- Increase Shard Count (for large indices):
    'settings': {
        'number_of_shards': 5,  # More shards = more parallelism
        'number_of_replicas': 1
    }
Text Search:
- Query time: ~10-50ms (10K documents)
- Query time: ~50-200ms (1M documents)
Vector Search (when enabled):
- Query time: ~50-100ms (10K documents)
- Query time: ~200-500ms (1M documents)
Optimization Tips:
- Limit Result Size:
    results = manager.hybrid_search(index, query, k=10)  # Not k=1000
- Use Filters (before scoring):
    # Filters don't contribute to score (faster)
    {'filter': [{'term': {'content_type': 'paper'}}]}
- Field Selection (return only needed fields):
    {
        'query': {...},
        '_source': ['title', 'authors', 'url']  # Don't return large 'content' field
    }
- Enable Caching:
    {
        'query': {
            'bool': {
                'filter': [
                    {'term': {'content_type': 'paper'}}  # Cached
                ]
            }
        }
    }
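The query-side tips above can be combined in a single request body. The following is a sketch under assumptions: `build_search_body` is a hypothetical helper, not a method of `OpenSearchManager`, and it simply reuses the boosted multi-match fields documented earlier while adding a non-scoring filter and `_source` filtering.

```python
def build_search_body(query, k=10, content_type=None, source_fields=None):
    """Build a search body with a small result size, a filter, and _source filtering."""
    bool_query = {
        "must": [{
            "multi_match": {
                "query": query,
                "fields": ["title^3", "abstract^2", "content", "transcript", "key_concepts^2"],
                "type": "best_fields",
                "fuzziness": "AUTO",
            }
        }]
    }
    if content_type is not None:
        # Filter clauses do not affect the score and can be cached by OpenSearch.
        bool_query["filter"] = [{"term": {"content_type": content_type}}]
    body = {"size": k, "query": {"bool": bool_query}}
    if source_fields:
        body["_source"] = list(source_fields)  # skip large fields such as 'content'
    return body


body = build_search_body(
    "transformer architecture",
    k=5,
    content_type="paper",
    source_fields=["title", "authors", "url"],
)
print(body["size"], body["_source"])
```

A body built this way can be sent with `manager.client.search(index="research_assistant", body=body)`, the same direct-query call used elsewhere in this document.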
manager = OpenSearchManager(host='localhost', port=9200)
if not manager.connected:
    print("OpenSearch unavailable - using fallback search")
    # Implement fallback logic (e.g., SQLite FTS)
success = manager.create_index("research_assistant")
if not success:
    # Check if index exists
    if manager.client.indices.exists(index="research_assistant"):
        print("Index already exists - using existing index")
    else:
        print("Failed to create index - check permissions")
response = manager.index_document(index, document)
if response is None:
    # Log error and continue
    logger.error(f"Failed to index: {document.get('title')}")
else:
    logger.info(f"Indexed: {response['_id']}")
try:
    results = manager.hybrid_search(index, query)
except Exception as e:
    logger.error(f"Search failed: {e}")
    results = []  # Return empty results
from opensearchpy import OpenSearch, helpers
from sentence_transformers import SentenceTransformer
Installation:
pip install opensearch-py sentence-transformers
OpenSearch Server:
# Docker (recommended for development)
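# OpenSearch 2.12+ refuses to start without an initial admin password
docker run -p 9200:9200 -e "discovery.type=single-node" -e 'OPENSEARCH_INITIAL_ADMIN_PASSWORD=MyStrongPassword@2024!' opensearchproject/opensearch:latest
# Verify connection
curl -X GET "https://localhost:9200" -u 'admin:MyStrongPassword@2024!' -k
Error: ConnectionError: Connection refused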
Solution:
- Check OpenSearch is running: docker ps
- Verify port: curl http://localhost:9200
- Check firewall settings
Error: SSLError: certificate verify failed
Solution: Set verify_certs=False in initialization:
manager = OpenSearchManager(
use_ssl=True,
verify_certs=False # For self-signed certs
)
Error: OSError: Can't load model
Solution: Pre-download model:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
# Downloads to ~/.cache/torch/sentence_transformers/
Cause: Index already exists
Solution: Delete and recreate, or use existing index:
# Delete existing index
manager.client.indices.delete(index="research_assistant")
# Recreate
manager.create_index("research_assistant")
Possible Causes:
- Index is empty: Check document count
    count = manager.client.count(index="research_assistant")
    print(f"Documents: {count['count']}")
- Field mismatch: Verify field names in index
    mapping = manager.client.indices.get_mapping(index="research_assistant")
    print(mapping)
- Query syntax error: Test with simple match query
    {'query': {'match_all': {}}}  # Should return all docs
Add custom text analysis during index creation:
index_body = {
'settings': {
'analysis': {
'analyzer': {
'scientific_analyzer': {
'type': 'custom',
'tokenizer': 'standard',
'filter': ['lowercase', 'asciifolding', 'porter_stem']
}
}
}
},
'mappings': {
'properties': {
'content': {
'type': 'text',
'analyzer': 'scientific_analyzer'
}
}
}
}
Search across multiple indices:
results = manager.client.search(
index=['research_assistant', 'archived_papers'],
body={'query': {'match': {'title': 'transformers'}}}
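)
Store queries and match documents to them. The research_queries index must map the query field as type percolator and also map the content field, otherwise the calls below fail: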
# Index a query
manager.client.index(
index='research_queries',
body={
'query': {'match': {'content': 'neural networks'}}
}
)
# Percolate document against stored queries
response = manager.client.search(
index='research_queries',
body={
'query': {
'percolate': {
'field': 'query',
'document': {
'content': 'This paper discusses neural network architectures...'
}
}
}
}
)
- Re-enable Vector Search: Full kNN + BM25 hybrid
- Query Expansion: Use LLM to expand queries before search
- Re-ranking: Use Gemini to re-rank top results
- Federated Search: Search external APIs alongside OpenSearch
- Caching Layer: Redis cache for frequent queries
from typing import Dict, List

# Add query expansion
def expand_query(self, query: str) -> str:
    """Use LLM to add synonyms and related terms"""
    pass
# Add re-ranking
def rerank_results(self, query: str, results: List[Dict]) -> List[Dict]:
    """Use Gemini to re-rank results by relevance"""
    pass
# Add caching
def cached_search(self, query: str, k: int = 10) -> List[Dict]:
    """Cache frequent query results"""
    pass
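As a rough illustration of the caching idea, the sketch below wraps any search callable in a small in-memory cache. It deliberately swaps the planned Redis layer for a plain dict so it runs standalone; `CachedSearch` and `fake_search` are hypothetical names, and `fake_search` only stands in for `manager.hybrid_search`.

```python
class CachedSearch:
    """In-memory cache keyed by (index_name, normalized query, k)."""

    def __init__(self, search_fn, max_entries=128):
        self._search_fn = search_fn
        self._max_entries = max_entries
        self._cache = {}
        self.misses = 0

    def search(self, index_name, query, k=10):
        key = (index_name, query.strip().lower(), k)
        if key not in self._cache:
            self.misses += 1
            if len(self._cache) >= self._max_entries:
                # Dicts preserve insertion order, so this evicts the oldest entry.
                self._cache.pop(next(iter(self._cache)))
            self._cache[key] = self._search_fn(index_name, query, k)
        return self._cache[key]


def fake_search(index_name, query, k):
    """Hypothetical stand-in for manager.hybrid_search."""
    return [{"score": 1.0, "source": {"title": f"Result for {query}"}}]


cached = CachedSearch(fake_search)
first = cached.search("research_assistant", "machine learning")
second = cached.search("research_assistant", "Machine Learning ")  # same normalized key
print(cached.misses)
```

A real cache would also need invalidation when new documents are indexed, which this sketch leaves out.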