Skip to content

Commit 2106d9c

Browse files
committed
Implement system resource monitoring and parallel processing in Docker setup
- Added functions to retrieve system resources and calculate optimal parameters for processing. - Integrated a ThreadPoolExecutor for parallel chunk processing in the vectorization endpoint. - Introduced a new endpoint to fetch system information and current parameters. - Updated Firestore repository to handle chunked files with SHA256 checksum for integrity. - Enhanced VectorActionUseCase to utilize dynamic chunk size and worker count based on system info.
1 parent 6c9d25c commit 2106d9c

5 files changed

Lines changed: 127 additions & 24 deletions

File tree

docker/main.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import time
55
import logging
66
import os
7+
import multiprocessing
8+
import psutil
9+
from concurrent.futures import ThreadPoolExecutor
710
from InstructorEmbedding import INSTRUCTOR
811

912
# Configure logs
@@ -12,12 +15,46 @@
1215

1316
app = FastAPI()
1417

18+
def get_system_resources():
    """Collect basic host resources used to size the embedding workers.

    Returns:
        dict with keys "cpu_count" (logical CPUs), "memory_total" and
        "memory_available" (bytes, as reported by psutil).
    """
    logical_cpus = multiprocessing.cpu_count()
    mem = psutil.virtual_memory()
    return {
        "cpu_count": logical_cpus,
        "memory_total": mem.total,
        "memory_available": mem.available,
    }
27+
28+
def calculate_optimal_parameters(system_resources):
    """Derive the worker count and chunk size from the host's resources.

    Uses 75% of the logical CPUs for workers (at least 1). Chunk size is
    derived from available memory assuming roughly 1 MB per embedding with
    a 50% safety margin, then clamped to the range [4, 32].

    Args:
        system_resources: dict as returned by get_system_resources().

    Returns:
        (max_workers, chunk_size) tuple of ints.
    """
    max_workers = max(1, int(system_resources["cpu_count"] * 0.75))

    # ~1 MB per chunk; keep half of the available memory as headroom.
    memory_per_chunk = 1024 * 1024
    available_chunks = int((system_resources["memory_available"] * 0.5) / memory_per_chunk)
    chunk_size = max(4, min(32, available_chunks // max_workers))

    logger.info(f"System resources: {system_resources}")
    logger.info(f"Calculated parameters - max_workers: {max_workers}, chunk_size: {chunk_size}")

    return max_workers, chunk_size
44+
45+
# Capture host resources once at import time and derive the processing
# parameters the service will use for the embedding thread pool.
system_resources = get_system_resources()
max_workers, chunk_size = calculate_optimal_parameters(system_resources)

# Global model state shared across endpoints.
model_state = {
    "status": "starting",  # starting, downloading, loading, warming_up, ready, error
    "progress": 0,
    "message": "Starting up...",
    "model": None,
    "executor": ThreadPoolExecutor(max_workers=max_workers),
    "chunk_size": chunk_size,
    "system_resources": system_resources,
}
2259

2360
# Input of the endpoint
@@ -99,5 +136,34 @@ async def vectorize(req: VectorizeRequest):
99136

100137
# Pair each text with its corresponding instruction
101138
pairs = [[instruction, text] for text, instruction in zip(req.instructions, req.texts)]
102-
embeddings = model_state["model"].encode(pairs)
103-
return {"embeddings": embeddings.tolist()}
139+
140+
# Dividir los pares en chunks para procesamiento paralelo
141+
chunks = [pairs[i:i + model_state["chunk_size"]] for i in range(0, len(pairs), model_state["chunk_size"])]
142+
143+
# Procesar chunks en paralelo
144+
futures = []
145+
for chunk in chunks:
146+
future = model_state["executor"].submit(model_state["model"].encode, chunk)
147+
futures.append(future)
148+
149+
# Recolectar resultados
150+
embeddings = []
151+
for future in futures:
152+
embeddings.extend(future.result().tolist())
153+
154+
return {"embeddings": embeddings}
155+
156+
@app.get("/system-info")
async def get_system_info():
    """Report the host resources captured at startup and the derived
    processing parameters (max_workers, chunk_size) currently in use."""
    return {
        "system_resources": model_state["system_resources"],
        "parameters": {
            # Use the module-level value computed at startup instead of
            # reaching into the executor's private _max_workers attribute.
            "max_workers": max_workers,
            "chunk_size": model_state["chunk_size"],
        },
    }
166+
167+
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the embedding thread pool cleanly when the app shuts down."""
    executor = model_state["executor"]
    executor.shutdown(wait=True)

src/data/model/chunked_file.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import { createHash } from 'crypto';
2+
13
export class ChunkedFile {
24
id: string;
35
path: string;
46
index: number;
57
content: string;
68
chunks: string[];
9+
shasum: string = '';
710
vector: number[][] = [];
811

912
constructor(id: string, path: string, index: number, content: string, chunks: string[]) {
@@ -12,6 +15,11 @@ export class ChunkedFile {
1215
this.index = index;
1316
this.content = content;
1417
this.chunks = chunks;
18+
this.shasum = this.calculateShasum(content);
19+
}
20+
21+
/** Returns the SHA-256 digest of the given content as a hex string. */
private calculateShasum(content: string): string {
  const hasher = createHash('sha256');
  hasher.update(content);
  return hasher.digest('hex');
}
1624
}
1725

src/data/repository/docker_repository.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,8 @@ export class DockerRepository {
224224
}
225225
}
226226

227-
// Example 1: Embedding for semantic search
228-
// const vector1 = await getEmbedding(
229-
// "Represent the following text for semantic search",
230-
// "Implement a new feature for user authentication"
231-
// );
232-
233-
// Example 2: Embedding for classification
234-
// const vector2 = await getEmbedding(
235-
// "Classify the following text into a category",
236-
// "Fix the login button not working on mobile devices"
237-
// );
227+
getSystemInfo = async (): Promise<any> => {
228+
const response = await fetch('http://localhost:8000/system-info');
229+
return await response.json();
230+
}
238231
}

src/data/repository/firestore_repository.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { getApps, initializeApp } from 'firebase/app';
2-
import { Firestore, FirestoreError, collection, doc, getFirestore, writeBatch } from 'firebase/firestore';
2+
import { Firestore, FirestoreError, collection, doc, getDocs, getFirestore, query, setDoc, where, writeBatch } from 'firebase/firestore';
33
import { ChunkedFile } from '../model/chunked_file';
44

55
export class FirestoreRepository {
@@ -55,4 +55,25 @@ export class FirestoreRepository {
5555
);
5656
}
5757
}
58+
59+
setChunkedFile = async (project: string, branch: string, document: ChunkedFile): Promise<void> => {
60+
const finalBranch = branch.replace(/\//g, '-')
61+
const proyectCollection = collection(this.getFirestore(), project);
62+
const branchDocument = doc(proyectCollection, finalBranch);
63+
const filesCollection = collection(branchDocument, this.CHUNKS_COLLECTION);
64+
const chunkDocument = doc(filesCollection, document.id);
65+
await setDoc(chunkDocument, document);
66+
}
67+
68+
getChunkedFiles = async (project: string, branch: string, shasum: string): Promise<ChunkedFile[]> => {
69+
const finalBranch = branch.replace(/\//g, '-')
70+
const proyectCollection = collection(this.getFirestore(), project);
71+
const branchDocument = doc(proyectCollection, finalBranch);
72+
const filesCollection = collection(branchDocument, this.CHUNKS_COLLECTION);
73+
74+
const shaQuery = query(filesCollection, where('shasum', '==', shasum));
75+
const snapshot = await getDocs(shaQuery);
76+
const documents = snapshot.docs.map(doc => doc.data() as ChunkedFile);
77+
return documents;
78+
}
5879
}

src/usecase/vector_action_use_case.ts

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,18 @@ export class VectorActionUseCase implements ParamUseCase<Execution, Result[]> {
3131

3232
await this.dockerRepository.startContainer();
3333

34+
const systemInfo = await this.dockerRepository.getSystemInfo();
35+
logDebugInfo(`System info: ${JSON.stringify(systemInfo, null, 2)}`);
36+
const chunkSize = systemInfo.parameters.chunk_size as number;
37+
const maxWorkers = systemInfo.parameters.max_workers as number;
38+
3439
logDebugInfo(`Getting chunked files for ${param.repo} ${param.commit.branch}`);
3540

3641
const chunkedFiles = await this.fileRepository.getChunkedRepositoryContent(
3742
param.owner,
3843
param.repo,
3944
param.commit.branch,
40-
32,
45+
chunkSize,
4146
param.tokens.token
4247
);
4348

@@ -59,20 +64,30 @@ export class VectorActionUseCase implements ParamUseCase<Execution, Result[]> {
5964

6065
logDebugInfo(`Processing file ${i + 1}/${totalFiles} (${progress.toFixed(1)}%) - Estimated time remaining: ${Math.ceil(remainingTime)} seconds`);
6166

67+
const remoteChunkedFiles = await firestoreRepository.getChunkedFiles(
68+
param.repo,
69+
param.commit.branch,
70+
chunkedFile.shasum
71+
);
72+
73+
if (remoteChunkedFiles.length > 0 && remoteChunkedFiles[0].vector.length > 0) {
74+
processedChunkedFiles.push(chunkedFile);
75+
continue;
76+
}
77+
6278
const embeddings = await this.dockerRepository.getEmbedding(
6379
chunkedFile.chunks.map(chunk => [this.CODE_INSTRUCTION, chunk])
6480
);
6581
chunkedFile.vector = embeddings;
66-
processedChunkedFiles.push(chunkedFile);
67-
}
6882

69-
logDebugInfo(`Setting all chunked files to firestore for ${param.repo} ${param.commit.branch}`);
83+
await firestoreRepository.setChunkedFile(
84+
param.repo,
85+
param.commit.branch,
86+
chunkedFile
87+
);
7088

71-
await firestoreRepository.setAllChunkedFiles(
72-
param.repo,
73-
param.commit.branch,
74-
processedChunkedFiles
75-
);
89+
processedChunkedFiles.push(chunkedFile);
90+
}
7691

7792
logDebugInfo(`All chunked files set to firestore for ${param.repo} ${param.commit.branch}`);
7893

0 commit comments

Comments
 (0)