Skip to content

Commit 1f35aaa

Browse files
committed
chunking and embedding optimised
1 parent 2fd5fdf commit 1f35aaa

1 file changed

Lines changed: 64 additions & 43 deletions

File tree

devtron-docs-rag-server/vector_store.py

Lines changed: 64 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,15 @@ def embed_documents(self, texts: List[str]) -> List[List[float]]:
5959
"""
6060
# Add instruction prefix for better retrieval (recommended by BGE)
6161
texts_with_prefix = [f"passage: {text}" for text in texts]
62-
embeddings = self.model.encode(texts_with_prefix, show_progress_bar=False)
62+
63+
# Use smaller batch size for CPU to avoid memory issues and provide progress
64+
# batch_size=8 is a good balance between speed and memory on CPU
65+
embeddings = self.model.encode(
66+
texts_with_prefix,
67+
show_progress_bar=False,
68+
batch_size=8,
69+
convert_to_numpy=True
70+
)
6371
return embeddings.tolist()
6472

6573
def embed_query(self, text: str) -> List[float]:
@@ -215,14 +223,15 @@ async def index_documents(self, documents: List[Dict[str, Any]]) -> None:
215223

216224
logger.info(f"Starting indexing: {len(documents)} documents")
217225

218-
# Process documents in batches
219-
batch_size = 10
226+
# Process documents in smaller batches to avoid timeout
227+
# Reduced from 10 to 5 to process fewer chunks at once
228+
batch_size = 5
220229
total_batches = (len(documents) + batch_size - 1) // batch_size
221230

222231
for i in range(0, len(documents), batch_size):
223232
batch = documents[i:i + batch_size]
224233
batch_num = (i // batch_size) + 1
225-
logger.info(f"Processing batch {batch_num}/{total_batches}")
234+
logger.info(f"Processing batch {batch_num}/{total_batches} (docs {i+1}-{min(i+batch_size, len(documents))})")
226235
await self._index_batch(batch)
227236

228237
logger.info(f"✓ Indexing complete: {len(documents)} documents")
@@ -254,50 +263,62 @@ async def _index_batch(self, documents: List[Dict[str, Any]]) -> None:
254263
'chunk_index': idx
255264
})
256265

257-
# Generate embeddings
258-
logger.info(f"Generating embeddings for {len(rows)} chunks...")
259-
texts = [row['content'] for row in rows]
260-
embeddings = self.embeddings.embed_documents(texts)
266+
logger.info(f"Processing {len(rows)} chunks from {len(documents)} documents")
267+
268+
# Process chunks in smaller sub-batches to avoid timeout
269+
# Embedding generation is CPU-intensive, so we process 20 chunks at a time
270+
chunk_batch_size = 20
271+
total_chunks = len(rows)
261272

262-
# Insert into database
263273
conn = self.pool.getconn()
264274
try:
265-
with conn.cursor() as cur:
266-
# Prepare data for batch insert
267-
values = [
268-
(
269-
row['id'],
270-
row['title'],
271-
row['source'],
272-
row['header'],
273-
row['content'],
274-
row['chunk_index'],
275-
embeddings[i]
276-
)
277-
for i, row in enumerate(rows)
278-
]
275+
for chunk_start in range(0, total_chunks, chunk_batch_size):
276+
chunk_end = min(chunk_start + chunk_batch_size, total_chunks)
277+
chunk_batch = rows[chunk_start:chunk_end]
279278

280-
# Batch insert
281-
execute_values(
282-
cur,
283-
"""
284-
INSERT INTO documents
285-
(id, title, source, header, content, chunk_index, embedding)
286-
VALUES %s
287-
ON CONFLICT (id) DO UPDATE SET
288-
title = EXCLUDED.title,
289-
source = EXCLUDED.source,
290-
header = EXCLUDED.header,
291-
content = EXCLUDED.content,
292-
chunk_index = EXCLUDED.chunk_index,
293-
embedding = EXCLUDED.embedding,
294-
updated_at = CURRENT_TIMESTAMP
295-
""",
296-
values
297-
)
279+
# Generate embeddings for this sub-batch
280+
logger.info(f" Generating embeddings for chunks {chunk_start+1}-{chunk_end}/{total_chunks}...")
281+
texts = [row['content'] for row in chunk_batch]
282+
embeddings = self.embeddings.embed_documents(texts)
298283

299-
conn.commit()
300-
logger.info(f"✓ Indexed {len(rows)} chunks")
284+
# Insert into database
285+
with conn.cursor() as cur:
286+
# Prepare data for batch insert
287+
values = [
288+
(
289+
chunk_batch[i]['id'],
290+
chunk_batch[i]['title'],
291+
chunk_batch[i]['source'],
292+
chunk_batch[i]['header'],
293+
chunk_batch[i]['content'],
294+
chunk_batch[i]['chunk_index'],
295+
embeddings[i]
296+
)
297+
for i in range(len(chunk_batch))
298+
]
299+
300+
# Batch insert
301+
execute_values(
302+
cur,
303+
"""
304+
INSERT INTO documents
305+
(id, title, source, header, content, chunk_index, embedding)
306+
VALUES %s
307+
ON CONFLICT (id) DO UPDATE SET
308+
title = EXCLUDED.title,
309+
source = EXCLUDED.source,
310+
header = EXCLUDED.header,
311+
content = EXCLUDED.content,
312+
chunk_index = EXCLUDED.chunk_index,
313+
embedding = EXCLUDED.embedding,
314+
updated_at = CURRENT_TIMESTAMP
315+
""",
316+
values
317+
)
318+
conn.commit()
319+
logger.info(f" ✓ Stored {len(chunk_batch)} chunks in database")
320+
321+
logger.info(f"✓ Batch complete: {total_chunks} chunks indexed")
301322
finally:
302323
self.pool.putconn(conn)
303324

0 commit comments

Comments
 (0)