Skip to content

Commit b3d0c22

Browse files
committed
wip: parallelize file parsing and processing based on cpu count
Signed-off-by: Anupam Kumar <kyteinsky@gmail.com>
1 parent ea5208a commit b3d0c22

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

context_chat_backend/task_fetcher.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import asyncio
77
import logging
8+
import os
89
from contextlib import suppress
910
from enum import Enum
1011
from io import BytesIO
@@ -35,6 +36,8 @@
3536
THREADS = {}
3637
LOGGER = logging.getLogger('ccb.task_fetcher')
3738
FILES_INDEXING_BATCH_SIZE = 64 # todo: config?
39+
# number of chunks each indexing batch is divided into
40+
PARALLEL_FILE_PARSING = max(1, (os.cpu_count() or 2) - 1) # todo: config?
3841
# max concurrent fetches to avoid overloading the NC server or hitting rate limits
3942
CONCURRENT_FILE_FETCHES = 10 # todo: config?
4043
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB, todo: config?
@@ -217,8 +220,18 @@ def _load_sources(source_items: dict[int, SourceItem]) -> dict[int, IndexingErro
217220
else:
218221
source_errors[file_id] = result
219222

220-
files_result = _load_sources(source_files)
221-
providers_result = _load_sources(q_items.content_providers)
223+
files_result = {}
224+
providers_result = {}
225+
# Clamp to at least 1: when PARALLEL_FILE_PARSING is larger than
# FILES_INDEXING_BATCH_SIZE the floor division yields 0, and range() raises
# ValueError when given a step of zero.
chunk_size = max(1, FILES_INDEXING_BATCH_SIZE // PARALLEL_FILE_PARSING)
226+
227+
# chunk file parsing for better file operation parallelism
228+
for i in range(0, len(source_files), chunk_size):
229+
chunk = dict(list(source_files.items())[i:i+chunk_size])
230+
files_result.update(_load_sources(chunk))
231+
232+
for i in range(0, len(q_items.content_providers), chunk_size):
233+
chunk = dict(list(q_items.content_providers.items())[i:i+chunk_size])
234+
providers_result.update(_load_sources(chunk))
222235

223236
if (
224237
any(isinstance(res, IndexingError) for res in files_result.values())

0 commit comments

Comments
 (0)