Skip to content

Commit 1fa878c

Browse files
committed
aggressive optimization for embedding documents
1 parent 1f35aaa commit 1fa878c

1 file changed

Lines changed: 32 additions & 18 deletions

File tree

devtron-docs-rag-server/vector_store.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import json
77
import os
8+
import asyncio
89
from typing import List, Dict, Any, Optional
910
from pathlib import Path
1011
import hashlib
@@ -60,13 +61,14 @@ def embed_documents(self, texts: List[str]) -> List[List[float]]:
6061
# Add instruction prefix for better retrieval (recommended by BGE)
6162
texts_with_prefix = [f"passage: {text}" for text in texts]
6263

63-
# Use smaller batch size for CPU to avoid memory issues and provide progress
64-
# batch_size=8 is a good balance between speed and memory on CPU
64+
# Use very small batch size for CPU to minimize blocking time
65+
# batch_size=2 processes 2 texts at a time, reducing memory and blocking
6566
embeddings = self.model.encode(
6667
texts_with_prefix,
6768
show_progress_bar=False,
68-
batch_size=8,
69-
convert_to_numpy=True
69+
batch_size=2,
70+
convert_to_numpy=True,
71+
normalize_embeddings=False
7072
)
7173
return embeddings.tolist()
7274

@@ -223,17 +225,19 @@ async def index_documents(self, documents: List[Dict[str, Any]]) -> None:
223225

224226
logger.info(f"Starting indexing: {len(documents)} documents")
225227

226-
# Process documents in smaller batches to avoid timeout
227-
# Reduced from 10 to 5 to process fewer chunks at once
228-
batch_size = 5
229-
total_batches = (len(documents) + batch_size - 1) // batch_size
228+
# Process documents one at a time to minimize memory and allow health checks
229+
batch_size = 1
230+
total_batches = len(documents)
230231

231232
for i in range(0, len(documents), batch_size):
232233
batch = documents[i:i + batch_size]
233-
batch_num = (i // batch_size) + 1
234-
logger.info(f"Processing batch {batch_num}/{total_batches} (docs {i+1}-{min(i+batch_size, len(documents))})")
234+
batch_num = i + 1
235+
logger.info(f"Processing document {batch_num}/{total_batches}: {batch[0].get('title', 'Unknown')}")
235236
await self._index_batch(batch)
236237

238+
# Yield control to event loop to allow health checks to respond
239+
await asyncio.sleep(0.1)
240+
237241
logger.info(f"✓ Indexing complete: {len(documents)} documents")
238242

239243
async def _index_batch(self, documents: List[Dict[str, Any]]) -> None:
@@ -263,11 +267,11 @@ async def _index_batch(self, documents: List[Dict[str, Any]]) -> None:
263267
'chunk_index': idx
264268
})
265269

266-
logger.info(f"Processing {len(rows)} chunks from {len(documents)} documents")
270+
logger.info(f"Processing {len(rows)} chunks from {len(documents)} document(s)")
267271

268-
# Process chunks in smaller sub-batches to avoid timeout
269-
# Embedding generation is CPU-intensive, so we process 20 chunks at a time
270-
chunk_batch_size = 20
272+
# Process chunks in very small sub-batches to avoid blocking health checks
273+
# Reduced to 5 chunks at a time (~10-15 seconds per sub-batch)
274+
chunk_batch_size = 5
271275
total_chunks = len(rows)
272276

273277
conn = self.pool.getconn()
@@ -277,9 +281,16 @@ async def _index_batch(self, documents: List[Dict[str, Any]]) -> None:
277281
chunk_batch = rows[chunk_start:chunk_end]
278282

279283
# Generate embeddings for this sub-batch
280-
logger.info(f" Generating embeddings for chunks {chunk_start+1}-{chunk_end}/{total_chunks}...")
284+
logger.info(f" Embedding chunks {chunk_start+1}-{chunk_end}/{total_chunks}...")
281285
texts = [row['content'] for row in chunk_batch]
282-
embeddings = self.embeddings.embed_documents(texts)
286+
287+
# Run embedding in thread pool to avoid blocking event loop
288+
loop = asyncio.get_event_loop()
289+
embeddings = await loop.run_in_executor(
290+
None,
291+
self.embeddings.embed_documents,
292+
texts
293+
)
283294

284295
# Insert into database
285296
with conn.cursor() as cur:
@@ -316,9 +327,12 @@ async def _index_batch(self, documents: List[Dict[str, Any]]) -> None:
316327
values
317328
)
318329
conn.commit()
319-
logger.info(f" ✓ Stored {len(chunk_batch)} chunks in database")
330+
logger.info(f" ✓ Stored {len(chunk_batch)} chunks")
331+
332+
# Yield control to event loop to allow health checks
333+
await asyncio.sleep(0.1)
320334

321-
logger.info(f"✓ Batch complete: {total_chunks} chunks indexed")
335+
logger.info(f"✓ Document complete: {total_chunks} chunks indexed")
322336
finally:
323337
self.pool.putconn(conn)
324338

0 commit comments

Comments
 (0)