Skip to content

Commit b3cfa99

Browse files
committed
feat: embedder concurrency + empty text fix
1 parent 9cd4ca8 commit b3cfa99

6 files changed

Lines changed: 46 additions & 17 deletions

File tree

.env.example

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ EXTRACT_WORKERS=4 # Parallel worker processes
2323
# Embedder settings
2424
EMBED_INPUT_PREFIX=extracted # Input directory prefix (extracted text)
2525
EMBED_BATCH_SIZE=100 # Documents per batch
26+
EMBED_CONCURRENCY=20 # Parallel API requests (Tier 2: 5K RPM, adjust per tier)
2627
GOOGLE_API_KEY= # Required - get from https://aistudio.google.com/apikey
2728

2829
# Cloudflare R2 (optional - for cloud storage)

README.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -266,6 +266,7 @@ EXTRACT_WORKERS=4
266266
# Embedder
267267
EMBED_INPUT_PREFIX=extracted
268268
EMBED_BATCH_SIZE=100
269+
EMBED_CONCURRENCY=20 # Parallel API requests
269270
GOOGLE_API_KEY= # Required for embeddings
270271
```
271272

apps/cli/commands/embed.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ export async function runEmbed(args: string[]) {
116116
inputPrefix: envConfig.embed.inputPrefix,
117117
model: "google",
118118
batchSize: flags.batchSize ?? 1000000,
119+
concurrency: envConfig.embed.concurrency,
119120
};
120121

121122
console.log("Document Embedder");
@@ -127,6 +128,7 @@ export async function runEmbed(args: string[]) {
127128
console.log(`Output: database (embedding column)`);
128129
console.log(`Model: gemini-embedding-001 (3072 dims)`);
129130
console.log(`Batch: ${config.batchSize >= 1000000 ? "all" : config.batchSize}`);
131+
console.log(`Concurrency: ${config.concurrency} parallel requests`);
130132
console.log("");
131133

132134
try {

packages/embedder/config.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ export interface EmbedderConfig {
1717
inputPrefix: string;
1818
model: EmbeddingModel;
1919
batchSize: number;
20+
concurrency: number;
2021
};
2122
google: {
2223
apiKey: string;
@@ -43,6 +44,7 @@ export function loadEmbedderConfig(): EmbedderConfig {
4344
inputPrefix: env.EMBED_INPUT_PREFIX || "extracted",
4445
model: (env.EMBED_MODEL as EmbeddingModel) || "google",
4546
batchSize: parseInt(env.EMBED_BATCH_SIZE || "100", 10),
47+
concurrency: parseInt(env.EMBED_CONCURRENCY || "20", 10),
4648
},
4749
google: {
4850
apiKey: env.GOOGLE_API_KEY || "",

packages/embedder/processor.ts

Lines changed: 39 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -15,21 +15,25 @@ const API_BATCH_SIZE = 100; // Google supports up to 100 texts per request
1515
* Split text into overlapping chunks for embedding.
1616
*/
1717
function chunkText(text: string): string[] {
18-
if (text.length <= CHUNK_SIZE_CHARS) {
19-
return [text];
18+
const trimmed = text.trim();
19+
if (trimmed.length === 0) {
20+
return [];
21+
}
22+
if (trimmed.length <= CHUNK_SIZE_CHARS) {
23+
return [trimmed];
2024
}
2125

2226
const chunks: string[] = [];
2327
let start = 0;
2428

25-
while (start < text.length) {
26-
const end = Math.min(start + CHUNK_SIZE_CHARS, text.length);
27-
const chunk = text.slice(start, end).trim();
29+
while (start < trimmed.length) {
30+
const end = Math.min(start + CHUNK_SIZE_CHARS, trimmed.length);
31+
const chunk = trimmed.slice(start, end).trim();
2832
if (chunk.length > 0) {
2933
chunks.push(chunk);
3034
}
3135

32-
if (end >= text.length) break;
36+
if (end >= trimmed.length) break;
3337

3438
const nextStart = end - CHUNK_OVERLAP_CHARS;
3539
start = nextStart <= start ? start + 1 : nextStart;
@@ -70,7 +74,7 @@ function weightedAverageEmbeddings(embeddings: number[][], weights: number[]): n
7074
}
7175

7276
export async function processEmbeddings(config: EmbedConfig, verbose: boolean = false): Promise<void> {
73-
const { db, storage, inputPrefix, batchSize } = config;
77+
const { db, storage, inputPrefix, batchSize, concurrency } = config;
7478

7579
const apiKey = process.env.GOOGLE_API_KEY;
7680
if (!apiKey) {
@@ -147,7 +151,7 @@ export async function processEmbeddings(config: EmbedConfig, verbose: boolean =
147151

148152
try {
149153
// Process documents in batches
150-
const docBatchSize = 10; // Load 10 docs at a time from storage
154+
const docBatchSize = 50; // Load docs in batches to fill concurrent API slots
151155

152156
for (let i = 0; i < documents.length; i += docBatchSize) {
153157
const batch = documents.slice(i, i + docBatchSize);
@@ -161,6 +165,12 @@ export async function processEmbeddings(config: EmbedConfig, verbose: boolean =
161165
if (textContent) {
162166
const text = new TextDecoder().decode(textContent);
163167
const chunks = chunkText(text);
168+
if (chunks.length === 0) {
169+
if (verbose) {
170+
console.log(` Skipped: ${doc.id} (empty text content)`);
171+
}
172+
continue;
173+
}
164174
const weights = chunks.map((c) => c.length);
165175
docsWithChunks.push({ id: doc.id, chunks, weights });
166176
}
@@ -186,26 +196,34 @@ export async function processEmbeddings(config: EmbedConfig, verbose: boolean =
186196
}
187197
}
188198

189-
// Embed all chunks in API batches
199+
// Embed all chunks in API batches (with concurrency)
190200
const chunkEmbeddings: number[][] = new Array(allChunks.length);
191201

202+
// Prepare all batch tasks
203+
const batchTasks: { startIdx: number; chunks: string[] }[] = [];
192204
for (let j = 0; j < allChunks.length; j += API_BATCH_SIZE) {
193-
const batchChunks = allChunks.slice(j, j + API_BATCH_SIZE);
205+
batchTasks.push({
206+
startIdx: j,
207+
chunks: allChunks.slice(j, j + API_BATCH_SIZE),
208+
});
209+
}
194210

211+
// Process batches with concurrency limit
212+
const processBatch = async (task: { startIdx: number; chunks: string[] }) => {
195213
let retries = 0;
196214
const maxRetries = 3;
197215
while (retries < maxRetries) {
198216
try {
199-
const embeddings = await embedTexts(batchChunks);
217+
const embeddings = await embedTexts(task.chunks);
200218
for (let k = 0; k < embeddings.length; k++) {
201-
chunkEmbeddings[j + k] = embeddings[k];
219+
chunkEmbeddings[task.startIdx + k] = embeddings[k];
202220
}
203-
break;
221+
return;
204222
} catch (error: unknown) {
205223
const errorMsg = error instanceof Error ? error.message : String(error);
206224
if (errorMsg.includes("429") || errorMsg.includes("rate") || errorMsg.includes("quota")) {
207225
retries++;
208-
const waitTime = Math.pow(2, retries) * 10;
226+
const waitTime = Math.pow(2, retries) * 2; // Shorter wait for parallel
209227
if (verbose) {
210228
console.log(` Rate limited, waiting ${waitTime}s (retry ${retries}/${maxRetries})...`);
211229
}
@@ -215,9 +233,13 @@ export async function processEmbeddings(config: EmbedConfig, verbose: boolean =
215233
}
216234
}
217235
}
218-
if (retries >= maxRetries) {
219-
throw new Error(`Failed after ${maxRetries} retries due to rate limiting`);
220-
}
236+
throw new Error(`Failed after ${maxRetries} retries due to rate limiting`);
237+
};
238+
239+
// Run batches with concurrency limit
240+
for (let j = 0; j < batchTasks.length; j += concurrency) {
241+
const concurrentBatches = batchTasks.slice(j, j + concurrency);
242+
await Promise.all(concurrentBatches.map(processBatch));
221243
}
222244

223245
// Combine chunk embeddings per document and save

packages/embedder/types.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,4 +25,5 @@ export interface EmbedConfig {
2525
inputPrefix: string;
2626
model: EmbeddingModel;
2727
batchSize: number;
28+
concurrency: number;
2829
}

0 commit comments

Comments (0)