Skip to content

Commit 3436c62

Browse files
authored
Merge pull request #13 from O-Labz/memory-improvements
made more improvements for better performance and reliability for lar…
2 parents 69ee272 + 7b4f370 commit 3436c62

11 files changed

Lines changed: 443 additions & 86 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ ollama pull nomic-embed-text
7676

7777
docker run -d \
7878
--name context-simplo \
79+
--restart unless-stopped \
7980
-p 3001:3001 \
8081
-v "$HOME":/host:ro \
8182
-v context-simplo-data:/data \
@@ -84,6 +85,7 @@ docker run -d \
8485
-e LLM_PROVIDER=ollama \
8586
-e LLM_BASE_URL=http://host.docker.internal:11434 \
8687
-e LLM_EMBEDDING_MODEL=nomic-embed-text \
88+
-e GRAPH_MEMORY_LIMIT_MB=4096 \
8789
ohopson/context-simplo:latest
8890
```
8991

src/core/config.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,24 @@ import type {
2828
ResponseMode,
2929
} from './types.js';
3030

31+
/**
32+
* Default configuration values
33+
*
34+
* Key boot behavior config:
35+
* - autoIndex: When true, resumes incomplete indexing for previously-added repos on boot.
36+
* Does NOT auto-index the mount root. Repos must be explicitly added via API/MCP.
37+
* - watchEnabled: Gates per-repo watcher restore on boot. If true, restores watchers for
38+
* repos where isWatched=true. Does NOT auto-watch the mount root.
39+
* - autoWatch: Controls whether newly-added repos are automatically watched (via API/MCP).
40+
*/
3141
const DEFAULT_CONFIG = {
3242
llmProvider: 'none' as LLMProviderType,
3343
llmApiKey: undefined,
3444
llmBaseUrl: 'http://host.docker.internal:11434',
3545
llmEmbeddingModel: undefined,
3646
dataDir: '/data',
37-
autoIndex: true,
38-
watchEnabled: true,
47+
autoIndex: true, // Resume incomplete indexing on boot for previously-added repos
48+
watchEnabled: true, // Gate per-repo watcher restore on boot
3949
autoWatch: true,
4050
logLevel: 'info' as const,
4151
embeddingConcurrency: 5,

src/core/embedding-backfill.ts

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/**
2+
* Embedding Backfiller - Background worker for semantic embeddings
3+
*
4+
* What it does:
5+
* Continuously processes files with embedding_status='pending' from SQLite,
6+
* generating embeddings and upserting to LanceDB. Decouples semantic indexing
7+
* from the critical path so structural queries work immediately.
8+
*
9+
* Inputs: SQLite storage (source of truth for pending files)
10+
* Outputs: Embeddings written to LanceDB, files marked done/error
11+
* Constraints: Bounded concurrency, bounded memory (no unbounded queues)
12+
* Assumptions: Files are already parsed and in storage
13+
* Failure cases: File deleted, embedding provider down, vectorStore errors
14+
*
15+
* Design:
16+
* - Pull files in batches from SQLite (bounded memory, survives restart)
17+
* - Read file content, scrub secrets, chunk by symbols, embed via queue
18+
* - Upsert to vectorStore, mark file status (done/error)
19+
* - Bounded concurrency (N files in flight at once)
20+
* - Graceful error handling (log + mark error + continue, no crash)
21+
* - Clean start/stop for graceful shutdown
22+
*
23+
* Performance: Processes files at the rate the embedding provider allows
24+
* Concurrency: Async with configurable concurrency limit
25+
* Security: Scrubs secrets before embedding
26+
*/
27+
28+
import { readFile } from 'node:fs/promises';
29+
import { resolve } from 'node:path';
30+
import { EventEmitter } from 'events';
31+
import type { StorageProvider } from '../store/provider.js';
32+
import type { EmbeddingQueue } from './embedding-queue.js';
33+
import type { LanceDBVectorStore } from '../store/lance.js';
34+
import { scrubSecrets } from '../security/scrubber.js';
35+
import { chunkCodeForEmbedding } from '../llm/chunker.js';
36+
import { parseFile } from './parser.js';
37+
import { StoreError } from './errors.js';
38+
39+
/**
 * Tuning knobs accepted by the EmbeddingBackfiller constructor.
 */
export interface EmbeddingBackfillOptions {
  /** Upper bound on the number of files the backfiller processes at the same time. */
  concurrency: number;
  /**
   * Batch size setting. In the backfiller code visible in this file it is only
   * reported in the startup log line — TODO confirm whether other code reads it.
   */
  batchSize: number;
  /** Delay, in milliseconds, between the end of one poll and the start of the next. */
  pollIntervalMs: number;
  /** Directory against which stored file paths are resolved before reading them. */
  workspaceRoot: string;
}
45+
46+
/**
 * Point-in-time counters reported by EmbeddingBackfiller.getStats().
 */
export interface EmbeddingBackfillStats {
  /** Number of files that finished successfully (including files that produced no chunks). */
  filesProcessed: number;
  /** Number of files that ended in the error state. */
  filesErrored: number;
  /** Total number of chunks embedded and upserted so far. */
  chunksCreated: number;
  /** Whether the backfiller is currently started. */
  isRunning: boolean;
}
52+
53+
/**
 * Background worker that embeds files whose embedding status is still pending.
 *
 * On a timer it asks storage for pending files, then for each file: reads it,
 * scrubs secrets, parses and chunks it, embeds the chunks through the
 * embedding queue, upserts the result into the vector store, and finally marks
 * the file 'done' or 'error' in storage.
 *
 * Events:
 *  - 'file:complete' (filePath: string, chunkCount: number)
 *  - 'file:error'    (filePath: string, error: unknown)
 */
export class EmbeddingBackfiller extends EventEmitter {
  private storage: StorageProvider;
  private embeddingQueue: EmbeddingQueue;
  private vectorStore: LanceDBVectorStore;
  private options: EmbeddingBackfillOptions;
  private running = false;
  private inFlight = 0;
  /**
   * Paths currently being processed. A file keeps its pending status until
   * processFile() finishes, so without this set a later poll could receive
   * the same row again and embed the file twice.
   */
  private inFlightPaths = new Set<string>();
  private filesProcessed = 0;
  private filesErrored = 0;
  private chunksCreated = 0;
  private pollTimer: NodeJS.Timeout | null = null;

  /**
   * @param storage Source of pending files and sink for done/error status.
   * @param embeddingQueue Queue used to turn chunk texts into vectors.
   * @param vectorStore Destination for the embedded chunks.
   * @param options Concurrency, polling and path-resolution settings.
   */
  constructor(
    storage: StorageProvider,
    embeddingQueue: EmbeddingQueue,
    vectorStore: LanceDBVectorStore,
    options: EmbeddingBackfillOptions
  ) {
    super();
    this.storage = storage;
    this.embeddingQueue = embeddingQueue;
    this.vectorStore = vectorStore;
    this.options = options;
  }

  /** Starts the polling loop. Calling it while already running only logs a warning. */
  start(): void {
    if (this.running) {
      console.warn('EmbeddingBackfiller already running');
      return;
    }

    this.running = true;
    console.log(
      `EmbeddingBackfiller started (concurrency: ${this.options.concurrency}, ` +
        `batch: ${this.options.batchSize}, poll: ${this.options.pollIntervalMs}ms)`
    );
    this.schedulePoll();
  }

  /**
   * Stops polling and resolves once every in-flight file has finished.
   * A no-op when the backfiller is not running.
   */
  async stop(): Promise<void> {
    if (!this.running) {
      return;
    }

    this.running = false;

    if (this.pollTimer) {
      clearTimeout(this.pollTimer);
      this.pollTimer = null;
    }

    // Wait for in-flight operations to complete
    while (this.inFlight > 0) {
      await this.sleep(100);
    }

    console.log('EmbeddingBackfiller stopped');
  }

  /** Returns a snapshot of the counters and the running flag. */
  getStats(): EmbeddingBackfillStats {
    return {
      filesProcessed: this.filesProcessed,
      filesErrored: this.filesErrored,
      chunksCreated: this.chunksCreated,
      isRunning: this.running,
    };
  }

  /** Arms the timer for the next poll; each poll re-arms it when it settles. */
  private schedulePoll(): void {
    if (!this.running) {
      return;
    }

    this.pollTimer = setTimeout(() => {
      this.poll()
        .catch((error) => {
          console.error('EmbeddingBackfiller poll failed:', error);
        })
        .finally(() => {
          this.schedulePoll();
        });
    }, this.options.pollIntervalMs);
  }

  /**
   * Dispatches pending files up to the concurrency limit.
   *
   * Files are started without being awaited; each one releases its slot in
   * its own `finally` handler.
   */
  private async poll(): Promise<void> {
    if (!this.running || this.inFlight >= this.options.concurrency) {
      return;
    }

    const availableSlots = this.options.concurrency - this.inFlight;
    // Over-fetch by the number of files already in flight: those rows are
    // still pending and may be returned again, so they must not consume the
    // free slots. (Assumes listPendingEmbeddingFiles does not itself claim
    // rows — TODO confirm against the storage provider.)
    const files = this.storage.listPendingEmbeddingFiles(
      availableSlots + this.inFlightPaths.size
    );

    for (const file of files) {
      if (!this.running || this.inFlight >= this.options.concurrency) {
        break;
      }

      if (this.inFlightPaths.has(file.path)) {
        continue; // already being processed by an earlier poll
      }

      this.inFlight++;
      this.inFlightPaths.add(file.path);
      this.processFile(file.path, file.repositoryId)
        .catch((error) => {
          console.error(`EmbeddingBackfiller failed for ${file.path}:`, error);
        })
        .finally(() => {
          this.inFlight--;
          this.inFlightPaths.delete(file.path);
        });
    }
  }

  /**
   * Embeds a single file and records the outcome in storage.
   *
   * Any failure marks the file 'error', bumps the error counter and emits
   * 'file:error'; it is not rethrown and the file is not retried here.
   *
   * @param filePath Path as stored; resolved against `options.workspaceRoot`.
   * @param repositoryId Repository identifier forwarded to the parser.
   */
  private async processFile(filePath: string, repositoryId: string): Promise<void> {
    try {
      // Read file content
      const absolutePath = resolve(this.options.workspaceRoot, filePath);
      let fileContent: string;
      try {
        fileContent = await readFile(absolutePath, 'utf-8');
      } catch (error) {
        throw new StoreError('readFile', `File not readable: ${filePath}`, error as Error);
      }

      // Scrub secrets
      const { scrubbed } = scrubSecrets(fileContent);

      // Parse file to get AST nodes for chunking
      let parsed;
      try {
        parsed = await parseFile(filePath, repositoryId, this.options.workspaceRoot);
      } catch (error) {
        throw new StoreError('parseFile', `Failed to parse ${filePath}`, error as Error);
      }

      // Chunk code for embedding
      const chunks = chunkCodeForEmbedding(parsed, scrubbed);

      if (chunks.length === 0) {
        // No chunks to embed (e.g., file only has imports)
        this.storage.updateFileEmbeddingStatus(filePath, 'done');
        this.filesProcessed++;
        this.emit('file:complete', filePath, 0);
        return;
      }

      // Embed chunks via the queue (with backpressure)
      const texts = chunks.map((chunk) => chunk.content);
      const embeddings = await this.embeddingQueue.embed(texts);

      // A short or long result would attach `undefined` vectors (or drop
      // some); fail the file instead of writing corrupt rows.
      if (embeddings.length !== chunks.length) {
        throw new Error(
          `Embedding count mismatch for ${filePath}: ` +
            `expected ${chunks.length}, got ${embeddings.length}`
        );
      }

      // Attach embeddings to chunks
      const chunksWithEmbeddings = chunks.map((chunk, i) => ({
        ...chunk,
        embedding: embeddings[i],
      }));

      // Upsert to vectorStore
      await this.vectorStore.upsertChunks(chunksWithEmbeddings);

      // Mark file as done
      this.storage.updateFileEmbeddingStatus(filePath, 'done');
      this.filesProcessed++;
      this.chunksCreated += chunks.length;
      this.emit('file:complete', filePath, chunks.length);
    } catch (error) {
      // Mark file as error (do not retry in this implementation)
      try {
        this.storage.updateFileEmbeddingStatus(filePath, 'error');
      } catch (statusError) {
        // Keep going so the counter and the event below still reflect the failure.
        console.error(
          `EmbeddingBackfiller could not mark ${filePath} as error:`,
          statusError
        );
      }
      this.filesErrored++;
      this.emit('file:error', filePath, error);
      console.warn(`EmbeddingBackfiller marked ${filePath} as error:`, error);
    }
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise((done) => setTimeout(done, ms));
  }
}

src/core/embedding-queue.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export interface EmbeddingQueueOptions {
3131
concurrency: number;
3232
batchSize: number;
3333
maxRetries: number;
34+
maxQueueDepth?: number;
3435
}
3536

3637
export interface EmbeddingJob {
@@ -59,6 +60,7 @@ export class EmbeddingQueue extends EventEmitter {
5960
private concurrency: number;
6061
private maxRetries: number;
6162
private batchSize: number;
63+
private maxQueueDepth: number;
6264
private draining = false;
6365

6466
constructor(provider: EmbeddingProvider, options: EmbeddingQueueOptions) {
@@ -67,13 +69,19 @@ export class EmbeddingQueue extends EventEmitter {
6769
this.concurrency = options.concurrency;
6870
this.maxRetries = options.maxRetries;
6971
this.batchSize = Math.max(1, options.batchSize || 1);
72+
this.maxQueueDepth = options.maxQueueDepth || Number.MAX_SAFE_INTEGER;
7073
}
7174

7275
async embed(texts: string[]): Promise<number[][]> {
7376
if (this.draining) {
7477
throw new LLMError('queue', 'Queue is draining, not accepting new jobs', false);
7578
}
7679

80+
// Backpressure: wait for queue to drain below maxQueueDepth
81+
while (this.queue.length >= this.maxQueueDepth) {
82+
await this.waitForDrain();
83+
}
84+
7785
return new Promise((resolve, reject) => {
7886
const job: EmbeddingJob = {
7987
id: `job_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
@@ -89,6 +97,19 @@ export class EmbeddingQueue extends EventEmitter {
8997
});
9098
}
9199

100+
/**
 * Resolves as soon as the pending queue holds fewer than `maxQueueDepth` jobs.
 *
 * The check runs once immediately and again on every 'drained' event; the
 * listener removes itself the first time the check passes.
 */
private waitForDrain(): Promise<void> {
  return new Promise<void>((release) => {
    const onDrained = (): void => {
      if (this.queue.length >= this.maxQueueDepth) {
        return; // still at capacity, keep listening
      }
      this.removeListener('drained', onDrained);
      release();
    };

    this.on('drained', onDrained);
    onDrained();
  });
}
112+
92113
private async processQueue(): Promise<void> {
93114
while (this.inFlight < this.concurrency && this.queue.length > 0) {
94115
const job = this.queue.shift();
@@ -100,6 +121,7 @@ export class EmbeddingQueue extends EventEmitter {
100121
this.processJob(job).finally(() => {
101122
this.inFlight--;
102123
this.emit('progress', this.getStats());
124+
this.emit('drained');
103125
this.processQueue();
104126
});
105127
}

0 commit comments

Comments
 (0)