Commit 42b0f6e

V0.4.0 fix (#204)
* Jobs page responsive fix

* feat(ingestion): Refactor email indexing into a dedicated background job

This commit refactors the email indexing process to improve the performance and reliability of the ingestion pipeline.

Previously, email indexing was performed synchronously within the mailbox processing job. This could lead to timeouts and failed ingestion cycles if the indexing step was slow or encountered errors.

To address this, the indexing logic has been moved into a separate, dedicated background job queue (`indexingQueue`). Now, the mailbox processor simply adds a batch of emails to this queue. A separate worker then processes the indexing job asynchronously.

This decoupling makes the ingestion process more robust:

- It prevents slow indexing from blocking or failing the entire mailbox sync.
- It allows for better resource management and scalability by handling indexing in a dedicated process.
- It improves error handling, as a failed indexing job can be retried independently without affecting the main ingestion flow.

Additionally, this commit includes minor documentation updates and removes a premature timeout in the PDF text extraction helper that was causing issues.

---------

Co-authored-by: Wayne <5291640+ringoinca@users.noreply.github.com>
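
The decoupling described in the commit message can be illustrated with a minimal sketch. It assumes an in-memory stand-in for the real job queue; `InMemoryQueue`, `drain`, and `flushBatch` are hypothetical names that do not appear in the commit, and the retry policy is an assumption chosen only to show that a failed indexing job can be retried without touching the producer.

```typescript
// Hypothetical in-memory stand-in for a job queue; names are illustrative only.
type PendingRef = { archivedEmailId: string };
type IndexJob = { emails: PendingRef[] };

class InMemoryQueue<T> {
    private jobs: { name: string; data: T; attempts: number }[] = [];

    // Producer side: enqueue the job and return immediately.
    async add(name: string, data: T): Promise<void> {
        this.jobs.push({ name, data, attempts: 0 });
    }

    // Worker side: run each job, re-queueing failures up to maxAttempts.
    // Returns the number of jobs that failed permanently.
    async drain(handler: (data: T) => Promise<void>, maxAttempts = 3): Promise<number> {
        let failed = 0;
        while (this.jobs.length > 0) {
            const job = this.jobs.shift()!;
            try {
                await handler(job.data);
            } catch {
                job.attempts += 1;
                if (job.attempts < maxAttempts) {
                    this.jobs.push(job);
                } else {
                    failed += 1;
                }
            }
        }
        return failed;
    }
}

// Mailbox processor side: hand the batch to the queue, then start a new batch.
async function flushBatch(
    queue: InMemoryQueue<IndexJob>,
    batch: PendingRef[]
): Promise<PendingRef[]> {
    if (batch.length > 0) {
        await queue.add('index-email-batch', { emails: batch });
    }
    return [];
}
```

The producer only pays the cost of `add`, so a slow or failing handler affects the worker loop alone.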
1 parent 6e1ebbb commit 42b0f6e

15 files changed

Lines changed: 286 additions & 94 deletions

README.md

Lines changed: 3 additions & 3 deletions
@@ -7,7 +7,7 @@
 [![Redis](https://img.shields.io/badge/Redis-DC382D?style=for-the-badge&logo=redis&logoColor=white)](https://redis.io)
 [![SvelteKit](https://img.shields.io/badge/SvelteKit-FF3E00?style=for-the-badge&logo=svelte&logoColor=white)](https://svelte.dev/)
 
-**A secure, sovereign, and open-source platform for email archiving and eDiscovery.**
+**A secure, sovereign, and open-source platform for email archiving.**
 
 Open Archiver provides a robust, self-hosted solution for archiving, storing, indexing, and searching emails from major platforms, including Google Workspace (Gmail), Microsoft 365, PST files, as well as generic IMAP-enabled email inboxes. Use Open Archiver to keep a permanent, tamper-proof record of your communication history, free from vendor lock-in.
 
@@ -48,13 +48,13 @@ Password: openarchiver_demo
 - Zipped .eml files
 - Mbox files
 
-- **Secure & Efficient Storage**: Emails are stored in the standard `.eml` format. The system uses deduplication and compression to minimize storage costs. All data is encrypted at rest.
+- **Secure & Efficient Storage**: Emails are stored in the standard `.eml` format. The system uses deduplication and compression to minimize storage costs. All files are encrypted at rest.
 - **Pluggable Storage Backends**: Support both local filesystem storage and S3-compatible object storage (like AWS S3 or MinIO).
 - **Powerful Search & eDiscovery**: A high-performance search engine indexes the full text of emails and attachments (PDF, DOCX, etc.).
 - **Thread discovery**: The ability to discover if an email belongs to a thread/conversation and present the context.
 - **Compliance & Retention**: Define granular retention policies to automatically manage the lifecycle of your data. Place legal holds on communications to prevent deletion during litigation (TBD).
 - **File Hash and Encryption**: Email and attachment file hash values are stored in the meta database upon ingestion, meaning any attempt to alter the file content will be identified, ensuring legal and regulatory compliance.
-- **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when (TBD).
+- **Comprehensive Auditing**: An immutable audit trail logs all system activities, ensuring you have a clear record of who accessed what and when.
 
 ## 🛠️ Tech Stack

assets/screenshots/job-queue.png

259 KB

docs/user-guides/integrity-check.md

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-# Email Integrity Check
+# Integrity Check
 
 Open Archiver allows you to verify the integrity of your archived emails and their attachments. This guide explains how the integrity check works and what the results mean.

packages/backend/src/helpers/textExtractor.ts

Lines changed: 4 additions & 4 deletions
@@ -47,10 +47,10 @@ function extractTextFromPdf(buffer: Buffer): Promise<string> {
         }
 
         // reduced Timeout for better performance
-        setTimeout(() => {
-            logger.warn('PDF parsing timed out');
-            finish('');
-        }, 5000);
+        // setTimeout(() => {
+        // logger.warn('PDF parsing timed out');
+        // finish('');
+        // }, 5000);
     });
 }
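
The hunk above disables the fixed 5-second timer rather than replacing it. If a time limit were reintroduced later, one option would be a wrapper that takes the limit as a parameter and always clears its timer. The sketch below is an assumption for illustration, not code from this commit; `withTimeout` and its parameters are hypothetical.

```typescript
// Hypothetical helper, not part of the commit: resolve with a fallback value
// if the work takes longer than `ms`, and clear the timer once the work settles
// so it cannot fire afterwards.
function withTimeout<T>(work: Promise<T>, ms: number, fallback: T): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => resolve(fallback), ms);
        work.then(
            (value) => {
                clearTimeout(timer);
                resolve(value);
            },
            (error) => {
                clearTimeout(timer);
                reject(error);
            }
        );
    });
}
```

A caller could then pick the limit per document type, for example a longer limit for large PDFs, instead of relying on a single hard-coded value.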

packages/backend/src/jobs/processors/process-mailbox.processor.ts

Lines changed: 2 additions & 2 deletions
@@ -33,7 +33,6 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
     const searchService = new SearchService();
     const storageService = new StorageService();
     const databaseService = new DatabaseService();
-    const indexingService = new IndexingService(databaseService, searchService, storageService);
 
     try {
         const source = await IngestionService.findById(ingestionSourceId);
@@ -72,7 +71,8 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
         return newSyncState;
     } catch (error) {
         if (emailBatch.length > 0) {
-            await indexingService.indexEmailBatch(emailBatch);
+            await indexingQueue.add('index-email-batch', { emails: emailBatch });
+            emailBatch = [];
         }
 
         logger.error({ err: error, ingestionSourceId, userEmail }, 'Error processing mailbox');

packages/backend/src/jobs/processors/sync-cycle-finished.processor.ts

Lines changed: 7 additions & 4 deletions
@@ -49,9 +49,10 @@ export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
     // if data doesn't have error property, it is a successful job with SyncState
     const successfulJobs = allChildJobs.filter((v) => !v || !(v as any).error) as SyncState[];
 
-    const finalSyncState = deepmerge(
-        ...successfulJobs.filter((s) => s && Object.keys(s).length > 0)
-    );
+    const finalSyncState =
+        successfulJobs.length > 0
+            ? deepmerge(...successfulJobs.filter((s) => s && Object.keys(s).length > 0))
+            : {};
 
     const source = await IngestionService.findById(ingestionSourceId);
     let status: IngestionStatus = 'active';
@@ -63,7 +64,9 @@ export default async (job: Job<ISyncCycleFinishedJob, any, string>) => {
     let message: string;
 
     // Check for a specific rate-limit message from the successful jobs
-    const rateLimitMessage = successfulJobs.find((j) => j.statusMessage)?.statusMessage;
+    const rateLimitMessage = successfulJobs.find(
+        (j) => j.statusMessage && j.statusMessage.includes('rate limit')
+    )?.statusMessage;
 
     if (failedJobs.length > 0) {
         status = 'error';
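
The first hunk above guards the merge against an empty list of successful jobs. A related edge case is a list whose entries are all null or empty objects, which is only empty after filtering. The sketch below shows one way to cover both cases by filtering first and folding onto `{}`. It uses a small hand-written recursive merge; `mergeTwo` and `mergeStates` are hypothetical helpers and are not the `deepmerge` function the processor imports.

```typescript
type SyncStateLike = { [key: string]: unknown };

function isPlainObject(value: unknown): value is SyncStateLike {
    return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Recursively merge b into a copy of a; for non-object values the later one wins.
function mergeTwo(a: SyncStateLike, b: SyncStateLike): SyncStateLike {
    const out: SyncStateLike = { ...a };
    for (const key of Object.keys(b)) {
        const left = out[key];
        const right = b[key];
        out[key] = isPlainObject(left) && isPlainObject(right) ? mergeTwo(left, right) : right;
    }
    return out;
}

// Filter first, then fold onto {} so that an empty or all-empty input yields {}.
function mergeStates(states: (SyncStateLike | null | undefined)[]): SyncStateLike {
    const nonEmpty = states.filter(
        (s): s is SyncStateLike => !!s && Object.keys(s).length > 0
    );
    return nonEmpty.reduce<SyncStateLike>((acc, s) => mergeTwo(acc, s), {});
}
```

Folding onto an initial `{}` removes the need for a separate length check, because the reducer simply never runs when nothing survives the filter.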

packages/backend/src/services/IndexingService.ts

Lines changed: 17 additions & 22 deletions
@@ -93,21 +93,17 @@ export class IndexingService {
             const batch = emails.slice(i, i + CONCURRENCY_LIMIT);
 
             const batchDocuments = await Promise.allSettled(
-                batch.map(async ({ email, sourceId, archivedId }) => {
+                batch.map(async (pendingEmail) => {
                     try {
-                        return await this.createEmailDocumentFromRawForBatch(
-                            email,
-                            sourceId,
-                            archivedId,
-                            email.userEmail || ''
-                        );
+                        const document = await this.indexEmailById(pendingEmail.archivedEmailId);
+                        if (document) {
+                            return document;
+                        }
+                        return null;
                     } catch (error) {
                         logger.error(
                             {
-                                emailId: archivedId,
-                                sourceId,
-                                userEmail: email.userEmail || '',
-                                rawEmailData: JSON.stringify(email, null, 2),
+                                emailId: pendingEmail.archivedEmailId,
                                 error: error instanceof Error ? error.message : String(error),
                             },
                             'Failed to create document for email in batch'
@@ -118,10 +114,12 @@ export class IndexingService {
             );
 
             for (const result of batchDocuments) {
-                if (result.status === 'fulfilled') {
+                if (result.status === 'fulfilled' && result.value) {
                     rawDocuments.push(result.value);
-                } else {
+                } else if (result.status === 'rejected') {
                     logger.error({ error: result.reason }, 'Failed to process email in batch');
+                } else {
+                    logger.error({ result: result }, 'Failed to process email in batch, reason unknown.');
                 }
             }
         }
@@ -195,10 +193,7 @@ export class IndexingService {
         }
     }
 
-    /**
-     * @deprecated
-     */
-    private async indexEmailById(emailId: string): Promise<void> {
+    private async indexEmailById(emailId: string): Promise<EmailDocument | null> {
         const email = await this.dbService.db.query.archivedEmails.findFirst({
             where: eq(archivedEmails.id, emailId),
         });
@@ -228,13 +223,13 @@ export class IndexingService {
             emailAttachmentsResult,
             email.userEmail
         );
-        await this.searchService.addDocuments('emails', [document], 'id');
+        return document;
     }
 
     /**
      * @deprecated
      */
-    private async indexByEmail(pendingEmail: PendingEmail): Promise<void> {
+    /* private async indexByEmail(pendingEmail: PendingEmail): Promise<void> {
         const attachments: AttachmentsType = [];
         if (pendingEmail.email.attachments && pendingEmail.email.attachments.length > 0) {
             for (const attachment of pendingEmail.email.attachments) {
@@ -254,12 +249,12 @@ export class IndexingService {
         );
         // console.log(document);
         await this.searchService.addDocuments('emails', [document], 'id');
-    }
+    } */
 
     /**
      * Creates a search document from a raw email object and its attachments.
      */
-    private async createEmailDocumentFromRawForBatch(
+    /* private async createEmailDocumentFromRawForBatch(
         email: EmailObject,
         ingestionSourceId: string,
         archivedEmailId: string,
@@ -333,7 +328,7 @@ export class IndexingService {
             timestamp: new Date(email.receivedAt).getTime(),
             ingestionSourceId: ingestionSourceId,
         };
-    }
+    } */
 
     private async createEmailDocumentFromRaw(
         email: EmailObject,
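
The batch loop in the first two hunks of this file distinguishes three outcomes per email: a document was built, the lookup resolved without a document, or the promise rejected. A minimal sketch of that three-way split follows. `Doc`, `Loader`, and `collectDocuments` are hypothetical names, and the loader stands in for `indexEmailById` under the assumption that it resolves to `null` when no document can be produced.

```typescript
type Doc = { id: string };

// Hypothetical loader: resolves to a document, resolves to null when nothing
// can be built for the id, or rejects on an unexpected error.
type Loader = (id: string) => Promise<Doc | null>;

async function collectDocuments(
    ids: string[],
    load: Loader
): Promise<{ documents: Doc[]; missing: string[]; failed: string[] }> {
    // allSettled never rejects, so one bad email cannot abort the whole batch.
    const results = await Promise.allSettled(ids.map((id) => load(id)));
    const documents: Doc[] = [];
    const missing: string[] = [];
    const failed: string[] = [];

    results.forEach((result, index) => {
        const id = ids[index];
        if (result.status === 'rejected') {
            failed.push(id);
        } else if (result.value) {
            documents.push(result.value);
        } else {
            missing.push(id);
        }
    });
    return { documents, missing, failed };
}
```

Keeping the ids of missing and failed emails separate makes it possible to log or retry each group differently.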

packages/backend/src/services/IngestionService.ts

Lines changed: 2 additions & 6 deletions
@@ -186,7 +186,7 @@ export class IngestionService {
                 (key) =>
                     key !== 'providerConfig' &&
                     originalSource[key as keyof IngestionSource] !==
-                    decryptedSource[key as keyof IngestionSource]
+                    decryptedSource[key as keyof IngestionSource]
             );
             if (changedFields.length > 0) {
                 await this.auditService.createAuditLog({
@@ -518,12 +518,8 @@ export class IngestionService {
                 }
             }
 
-            email.userEmail = userEmail;
-
             return {
-                email,
-                sourceId: source.id,
-                archivedId: archivedEmail.id,
+                archivedEmailId: archivedEmail.id,
             };
         } catch (error) {
             logger.error({

packages/backend/src/services/StorageService.ts

Lines changed: 73 additions & 0 deletions
@@ -81,6 +81,79 @@ export class StorageService implements IStorageProvider {
         return Readable.from(decryptedContent);
     }
 
+    public async getStream(path: string): Promise<NodeJS.ReadableStream> {
+        const stream = await this.provider.get(path);
+        if (!this.encryptionKey) {
+            return stream;
+        }
+
+        // For encrypted files, we need to read the prefix and IV first.
+        // This part still buffers a small, fixed amount of data, which is acceptable.
+        const prefixAndIvBuffer = await new Promise<Buffer>((resolve, reject) => {
+            const chunks: Buffer[] = [];
+            let totalLength = 0;
+            const targetLength = ENCRYPTION_PREFIX.length + 16;
+
+            const onData = (chunk: Buffer) => {
+                chunks.push(chunk);
+                totalLength += chunk.length;
+                if (totalLength >= targetLength) {
+                    stream.removeListener('data', onData);
+                    resolve(Buffer.concat(chunks));
+                }
+            };
+
+            stream.on('data', onData);
+            stream.on('error', reject);
+            stream.on('end', () => {
+                // Handle cases where the file is smaller than the prefix + IV
+                if (totalLength < targetLength) {
+                    resolve(Buffer.concat(chunks));
+                }
+            });
+        });
+
+        const prefix = prefixAndIvBuffer.subarray(0, ENCRYPTION_PREFIX.length);
+        if (!prefix.equals(ENCRYPTION_PREFIX)) {
+            // File is not encrypted, return a new stream containing the buffered prefix and the rest of the original stream
+            const combinedStream = new Readable({
+                read() { },
+            });
+            combinedStream.push(prefixAndIvBuffer);
+            stream.on('data', (chunk) => {
+                combinedStream.push(chunk);
+            });
+            stream.on('end', () => {
+                combinedStream.push(null); // No more data
+            });
+            stream.on('error', (err) => {
+                combinedStream.emit('error', err);
+            });
+            return combinedStream;
+        }
+
+        try {
+            const iv = prefixAndIvBuffer.subarray(
+                ENCRYPTION_PREFIX.length,
+                ENCRYPTION_PREFIX.length + 16
+            );
+            const decipher = createDecipheriv(this.algorithm, this.encryptionKey, iv);
+
+            // Push the remaining part of the initial buffer to the decipher
+            const remainingBuffer = prefixAndIvBuffer.subarray(ENCRYPTION_PREFIX.length + 16);
+            if (remainingBuffer.length > 0) {
+                decipher.write(remainingBuffer);
+            }
+
+            // Pipe the rest of the stream
+            stream.pipe(decipher);
+
+            return decipher;
+        } catch (error) {
+            throw new Error('Failed to decrypt file. It may be corrupted or the key is incorrect.');
+        }
+    }
+
     delete(path: string): Promise<void> {
         return this.provider.delete(path);
     }
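
`getStream` relies on a fixed file layout: a marker prefix, then a 16-byte IV, then the ciphertext. The sketch below isolates that header-splitting step on plain byte arrays, without streams or decryption. `parseHeader` and `ParsedHeader` are hypothetical names, the prefix bytes are passed in as a parameter because the value of `ENCRYPTION_PREFIX` is not shown in this diff, and throwing on a truncated header is an assumption of this sketch rather than the behavior of the committed code.

```typescript
const IV_LENGTH = 16; // the IV size read by getStream

type ParsedHeader =
    | { encrypted: false; payload: Uint8Array }
    | { encrypted: true; iv: Uint8Array; ciphertext: Uint8Array };

function startsWith(data: Uint8Array, prefix: Uint8Array): boolean {
    if (data.length < prefix.length) return false;
    for (let i = 0; i < prefix.length; i++) {
        if (data[i] !== prefix[i]) return false;
    }
    return true;
}

// Split the buffered head of a file into IV and ciphertext. Data without the
// marker is passed through unchanged; a marker followed by fewer than 16 bytes
// is treated as a corrupted file.
function parseHeader(head: Uint8Array, prefix: Uint8Array): ParsedHeader {
    if (!startsWith(head, prefix)) {
        return { encrypted: false, payload: head };
    }
    const headerLength = prefix.length + IV_LENGTH;
    if (head.length < headerLength) {
        throw new Error('Encrypted file is truncated: header is incomplete.');
    }
    return {
        encrypted: true,
        iv: head.subarray(prefix.length, headerLength),
        ciphertext: head.subarray(headerLength),
    };
}
```

Any bytes after the header in the buffered head are already ciphertext, which is why the committed code writes the remaining buffer to the decipher before piping the rest of the stream.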

packages/backend/src/services/ingestion-connectors/ImapConnector.ts

Lines changed: 1 addition & 0 deletions
@@ -131,6 +131,7 @@ export class ImapConnector implements IEmailConnector {
             } catch (err: any) {
                 logger.error({ err, attempt }, `IMAP operation failed on attempt ${attempt}`);
                 this.isConnected = false; // Force reconnect on next attempt
+                this.client = this.createClient(); // Create a new client instance for the next retry
                 if (attempt === maxRetries) {
                     logger.error({ err }, 'IMAP operation failed after all retries.');
                     throw err;
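
The added line replaces the client after a failed attempt so that the next retry does not reuse a connection object in an unknown state. A minimal sketch of that retry pattern follows, assuming a hypothetical `Client` interface with only a `connect` method; `withFreshClientRetry` is an illustrative name and not part of `ImapConnector`.

```typescript
// Hypothetical minimal client shape, for illustration only.
interface Client {
    connect(): Promise<void>;
}

async function withFreshClientRetry<T>(
    createClient: () => Client,
    action: (client: Client) => Promise<T>,
    maxRetries = 3
): Promise<T> {
    let client = createClient();
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            await client.connect();
            return await action(client);
        } catch (err) {
            lastError = err;
            // Discard the possibly broken client so the next attempt starts clean.
            if (attempt < maxRetries) {
                client = createClient();
            }
        }
    }
    throw lastError;
}
```

Unlike the committed line, this sketch skips creating a replacement client after the final failed attempt, since no further retry will use it.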
