Skip to content

Commit d372ef7

Browse files
wayneshn and axeldunkel authored
Feat: Tika Integration and Batch Indexing (#132)
* Feat/tika integration (#94) * feat(Tika) Integration von Tika zur Textextraktion * feat(Tika) Integration of Apache Tika for text extraction * feat(Tika): Complete Tika integration with text extraction and docker-compose setup - Add Tika service to docker-compose.yml - Implement text sanitization and document validation - Improve batch processing with concurrency control * fix(comments) translated comments into english fix(docker) removed ports (only used for testing) * feat(indexing): Implement batch indexing for Meilisearch This change introduces batch processing for indexing emails into Meilisearch to significantly improve performance and throughput during ingestion. This change is based on the batch processing method previously contributed by @axeldunkel. Previously, each email was indexed individually, resulting in a high number of separate API calls. This approach was inefficient, especially for large mailboxes. The `processMailbox` queue worker now accumulates emails into a batch before sending them to the `IndexingService`. The service then uses the `addDocuments` Meilisearch API endpoint to index the entire batch in a single request, reducing network overhead and improving indexing speed. A new environment variable, `MEILI_INDEXING_BATCH`, has been added to make the batch size configurable, with a default of 500. Additionally, this commit includes minor refactoring: - The `TikaService` has been moved to its own dedicated file. - The `PendingEmail` type has been moved to the shared `@open-archiver/types` package. * chore(jobs): make continuous sync job scheduling idempotent Adds a static `jobId` to the repeatable 'schedule-continuous-sync' job. This prevents duplicate jobs from being scheduled if the server restarts. By providing a unique ID, the queue will update the existing repeatable job instead of creating a new one, ensuring the sync runs only at the configured frequency. 
--------- Co-authored-by: axeldunkel <53174090+axeldunkel@users.noreply.github.com> Co-authored-by: Wayne <5291640+ringoinca@users.noreply.github.com>
1 parent e9a65f9 commit d372ef7

14 files changed

Lines changed: 786 additions & 75 deletions

File tree

.env.example

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ DATABASE_URL="postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/$
1919
# Meilisearch
2020
MEILI_MASTER_KEY=aSampleMasterKey
2121
MEILI_HOST=http://meilisearch:7700
22-
22+
# The number of emails to batch together for indexing. Defaults to 500.
23+
MEILI_INDEXING_BATCH=500
2324

2425

2526
# Redis (We use Valkey, which is Redis-compatible and open source)
@@ -60,6 +61,8 @@ RATE_LIMIT_WINDOW_MS=60000
6061
# The maximum number of API requests allowed from an IP within the window. Defaults to 100.
6162
RATE_LIMIT_MAX_REQUESTS=100
6263

64+
65+
6366
# JWT
6467
# IMPORTANT: Change this to a long, random, and secret string in your .env file
6568
JWT_SECRET=a-very-secret-key-that-you-should-change
@@ -70,3 +73,9 @@ JWT_EXPIRES_IN="7d"
7073
# IMPORTANT: Generate a secure, random 32-byte hex string for this
7174
# You can use `openssl rand -hex 32` to generate a key.
7275
ENCRYPTION_KEY=
76+
77+
# Apache Tika Integration
78+
# Text extraction uses Tika ONLY if TIKA_URL is set; leave it empty or unset to use the built-in extractors.
79+
TIKA_URL=http://tika:9998
80+
81+

docker-compose.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ services:
5252
networks:
5353
- open-archiver-net
5454

55+
tika:
56+
image: apache/tika:3.2.2.0-full
57+
container_name: tika
58+
restart: always
59+
networks:
60+
- open-archiver-net
61+
5562
volumes:
5663
pgdata:
5764
driver: local
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { storage } from './storage';
22
import { app } from './app';
3-
import { searchConfig } from './search';
3+
import { searchConfig, meiliConfig } from './search';
44
import { connection as redisConfig } from './redis';
55
import { apiConfig } from './api';
66

77
export const config = {
88
storage,
99
app,
1010
search: searchConfig,
11+
meili: meiliConfig,
1112
redis: redisConfig,
1213
api: apiConfig,
1314
};

packages/backend/src/config/search.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ export const searchConfig = {
44
host: process.env.MEILI_HOST || 'http://127.0.0.1:7700',
55
apiKey: process.env.MEILI_MASTER_KEY || '',
66
};
7+
8+
/** Batch size used when `MEILI_INDEXING_BATCH` is unset or not a usable number. */
const DEFAULT_INDEXING_BATCH_SIZE = 500;

/**
 * Parses the raw `MEILI_INDEXING_BATCH` value into a usable batch size.
 *
 * @param raw - The raw environment value, if any.
 * @param fallback - Value returned when `raw` is missing or invalid.
 * @returns A positive integer batch size.
 */
function parseIndexingBatchSize(
	raw: string | undefined,
	fallback: number = DEFAULT_INDEXING_BATCH_SIZE
): number {
	if (raw === undefined || raw.trim() === '') {
		return fallback;
	}

	// Always parse as base 10 so the result does not depend on the input's prefix.
	const parsed = Number.parseInt(raw, 10);

	// Reject NaN, zero and negatives: a NaN size makes every `length >= size`
	// comparison false, and a non-positive size defeats batching entirely.
	return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Meilisearch indexing settings.
 *
 * `indexingBatchSize` is read from `MEILI_INDEXING_BATCH` and defaults to 500
 * when the variable is unset, empty, or not a positive integer.
 */
export const meiliConfig = {
	indexingBatchSize: parseIndexingBatchSize(process.env.MEILI_INDEXING_BATCH),
};
Lines changed: 87 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import PDFParser from 'pdf2json';
22
import mammoth from 'mammoth';
33
import xlsx from 'xlsx';
4+
import { logger } from '../config/logger';
5+
import { OcrService } from '../services/OcrService';
46

7+
// Legacy PDF extraction (with improved memory management)
58
/** Maximum time to wait for the PDF parser before giving up. */
const PDF_PARSE_TIMEOUT_MS = 5000;

/**
 * Extracts raw text from a PDF buffer using the pdf2json parser.
 *
 * Parser errors, exceptions thrown by `parseBuffer`, and timeouts all resolve
 * to an empty string rather than rejecting.
 *
 * @param buffer - The PDF file contents.
 * @returns The raw text content, or `''` if it could not be extracted.
 */
function extractTextFromPdf(buffer: Buffer): Promise<string> {
	return new Promise((resolve) => {
		const pdfParser = new PDFParser(null, true);
		let completed = false;
		let timeoutHandle: ReturnType<typeof setTimeout> | undefined;

		// Settles the promise exactly once and releases everything tied to this parse.
		const finish = (text: string) => {
			if (completed) return;
			completed = true;

			// Cancel the watchdog: otherwise it stays pending after a successful
			// parse and later logs a misleading "timed out" warning.
			if (timeoutHandle !== undefined) {
				clearTimeout(timeoutHandle);
			}

			// explicit cleanup
			try {
				pdfParser.removeAllListeners();
			} catch {
				// Ignore cleanup errors
			}

			resolve(text);
		};

		pdfParser.on('pdfParser_dataError', (err: any) => {
			logger.warn('PDF parsing error:', err?.parserError || 'Unknown error');
			finish('');
		});

		pdfParser.on('pdfParser_dataReady', () => {
			try {
				const text = pdfParser.getRawTextContent();
				finish(text || '');
			} catch (err) {
				logger.warn('Error getting PDF text content:', err);
				finish('');
			}
		});

		// Arm the watchdog before parsing so `finish` can always cancel it,
		// even when `parseBuffer` fails synchronously.
		timeoutHandle = setTimeout(() => {
			if (completed) return;
			logger.warn('PDF parsing timed out');
			finish('');
		}, PDF_PARSE_TIMEOUT_MS);

		try {
			pdfParser.parseBuffer(buffer);
		} catch (err) {
			logger.error('Error parsing PDF buffer:', err);
			finish('');
		}
	});
}
3156

32-
export async function extractText(buffer: Buffer, mimeType: string): Promise<string> {
57+
// Legacy text extraction for various formats
58+
async function extractTextLegacy(buffer: Buffer, mimeType: string): Promise<string> {
3359
try {
3460
if (mimeType === 'application/pdf') {
61+
// Check PDF size (memory protection)
62+
if (buffer.length > 50 * 1024 * 1024) { // 50MB Limit
63+
logger.warn('PDF too large for legacy extraction, skipping');
64+
return '';
65+
}
3566
return await extractTextFromPdf(buffer);
3667
}
3768

38-
if (
39-
mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
40-
) {
69+
if (mimeType === 'application/vnd.openxmlformats-officedocument.wordprocessingml.document') {
4170
const { value } = await mammoth.extractRawText({ buffer });
4271
return value;
4372
}
@@ -50,7 +79,7 @@ export async function extractText(buffer: Buffer, mimeType: string): Promise<str
5079
const sheetText = xlsx.utils.sheet_to_txt(sheet);
5180
fullText += sheetText + '\n';
5281
}
53-
return fullText;
82+
return fullText.trim();
5483
}
5584

5685
if (
@@ -60,11 +89,54 @@ export async function extractText(buffer: Buffer, mimeType: string): Promise<str
6089
) {
6190
return buffer.toString('utf-8');
6291
}
92+
93+
return '';
6394
} catch (error) {
64-
console.error(`Error extracting text from attachment with MIME type ${mimeType}:`, error);
65-
return ''; // Return empty string on failure
95+
logger.error(`Error extracting text from attachment with MIME type ${mimeType}:`, error);
96+
97+
// Force garbage collection if available
98+
if (global.gc) {
99+
global.gc();
100+
}
101+
102+
return '';
103+
}
104+
}
105+
106+
// Main extraction function
107+
/**
 * Extracts plain text from an attachment.
 *
 * When the `TIKA_URL` environment variable is set, extraction is delegated to
 * `OcrService.extractTextWithTika`; otherwise the legacy in-process extractors
 * are used. Empty input, a missing MIME type, oversized files and Tika
 * failures all yield an empty string.
 *
 * @param buffer - The attachment contents.
 * @param mimeType - The attachment's MIME type.
 * @returns The extracted text, or `''` when nothing could be extracted.
 */
export async function extractText(buffer: Buffer, mimeType: string): Promise<string> {
	// Input validation
	if (!buffer || buffer.length === 0) {
		return '';
	}

	if (!mimeType) {
		logger.warn('No MIME type provided for text extraction');
		return '';
	}

	// Read the Tika endpoint once so the size limit and the backend choice
	// are always based on the same value.
	const tikaUrl = process.env.TIKA_URL;

	// General size limit: 100MB for Tika, 50MB for legacy
	const maxSize = tikaUrl ? 100 * 1024 * 1024 : 50 * 1024 * 1024;
	if (buffer.length > maxSize) {
		logger.warn(`File too large for text extraction: ${buffer.length} bytes (limit: ${maxSize})`);
		return '';
	}

	if (!tikaUrl) {
		// extract using legacy mode
		return await extractTextLegacy(buffer, mimeType);
	}

	// Tika decides what it can parse
	logger.debug(`Using Tika for text extraction: ${mimeType}`);
	const ocrService = new OcrService();
	try {
		return await ocrService.extractTextWithTika(buffer, mimeType);
	} catch (error) {
		// Log under `err`, the key the mailbox processor also uses, so the
		// logger's error serializer records message and stack.
		logger.error({ err: error }, 'OCR text extraction failed, returning empty string');
		return '';
	}
}

packages/backend/src/jobs/processors/index-email.processor.ts renamed to packages/backend/src/jobs/processors/index-email-batch.processor.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import { IndexingService } from '../../services/IndexingService';
33
import { SearchService } from '../../services/SearchService';
44
import { StorageService } from '../../services/StorageService';
55
import { DatabaseService } from '../../services/DatabaseService';
6+
import { PendingEmail } from '@open-archiver/types';
67

78
const searchService = new SearchService();
89
const storageService = new StorageService();
910
const databaseService = new DatabaseService();
1011
const indexingService = new IndexingService(databaseService, searchService, storageService);
1112

12-
export default async function (job: Job<{ emailId: string }>) {
13-
const { emailId } = job.data;
14-
console.log(`Indexing email with ID: ${emailId}`);
15-
await indexingService.indexEmailById(emailId);
13+
/**
 * Queue processor that indexes one batch of pending emails.
 *
 * @param job - Job whose payload carries the `emails` to index.
 */
export default async function (job: Job<{ emails: PendingEmail[] }>) {
	const pendingEmails = job.data.emails;
	console.log(`Indexing email batch with ${pendingEmails.length} emails`);
	await indexingService.indexEmailBatch(pendingEmails);
}

packages/backend/src/jobs/processors/process-mailbox.processor.ts

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
import { Job } from 'bullmq';
2-
import { IProcessMailboxJob, SyncState, ProcessMailboxError } from '@open-archiver/types';
2+
import {
3+
IProcessMailboxJob,
4+
SyncState,
5+
ProcessMailboxError,
6+
PendingEmail,
7+
} from '@open-archiver/types';
38
import { IngestionService } from '../../services/IngestionService';
49
import { logger } from '../../config/logger';
510
import { EmailProviderFactory } from '../../services/EmailProviderFactory';
611
import { StorageService } from '../../services/StorageService';
12+
import { IndexingService } from '../../services/IndexingService';
13+
import { SearchService } from '../../services/SearchService';
14+
import { DatabaseService } from '../../services/DatabaseService';
15+
import { config } from '../../config';
16+
717

818
/**
919
* This processor handles the ingestion of emails for a single user's mailbox.
@@ -15,9 +25,16 @@ import { StorageService } from '../../services/StorageService';
1525
*/
1626
export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncState, string>) => {
1727
const { ingestionSourceId, userEmail } = job.data;
28+
const BATCH_SIZE: number = config.meili.indexingBatchSize;
29+
let emailBatch: PendingEmail[] = [];
1830

1931
logger.info({ ingestionSourceId, userEmail }, `Processing mailbox for user`);
2032

33+
const searchService = new SearchService();
34+
const storageService = new StorageService();
35+
const databaseService = new DatabaseService();
36+
const indexingService = new IndexingService(databaseService, searchService, storageService);
37+
2138
try {
2239
const source = await IngestionService.findById(ingestionSourceId);
2340
if (!source) {
@@ -26,22 +43,38 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
2643

2744
const connector = EmailProviderFactory.createConnector(source);
2845
const ingestionService = new IngestionService();
29-
const storageService = new StorageService();
3046

31-
// Pass the sync state for the entire source, the connector will handle per-user logic if necessary
3247
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
3348
if (email) {
34-
await ingestionService.processEmail(email, source, storageService, userEmail);
49+
const processedEmail = await ingestionService.processEmail(
50+
email,
51+
source,
52+
storageService,
53+
userEmail
54+
);
55+
if (processedEmail) {
56+
emailBatch.push(processedEmail);
57+
if (emailBatch.length >= BATCH_SIZE) {
58+
await indexingService.indexEmailBatch(emailBatch);
59+
emailBatch = [];
60+
}
61+
}
3562
}
3663
}
3764

38-
const newSyncState = connector.getUpdatedSyncState(userEmail);
65+
if (emailBatch.length > 0) {
66+
await indexingService.indexEmailBatch(emailBatch);
67+
emailBatch = [];
68+
}
3969

70+
const newSyncState = connector.getUpdatedSyncState(userEmail);
4071
logger.info({ ingestionSourceId, userEmail }, `Finished processing mailbox for user`);
41-
42-
// Return the new sync state to be aggregated by the parent flow
4372
return newSyncState;
4473
} catch (error) {
74+
if (emailBatch.length > 0) {
75+
await indexingService.indexEmailBatch(emailBatch);
76+
}
77+
4578
logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');
4679
const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
4780
const processMailboxError: ProcessMailboxError = {

packages/backend/src/jobs/schedulers/sync-scheduler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const scheduleContinuousSync = async () => {
88
'schedule-continuous-sync',
99
{},
1010
{
11+
jobId: 'schedule-continuous-sync',
1112
repeat: {
1213
pattern: config.app.syncFrequency,
1314
},

0 commit comments

Comments
 (0)