Skip to content

Commit 3434e8d

Browse files
authored
v0.4.2-fix: improve ingestion error handling and error messages (#312)
* fix(backend): improve ingestion error handling and error messages

  This commit introduces a "force delete" mechanism for Ingestion Sources and improves error messages for file-based connectors.

  Changes:
  - Update `IngestionService.delete` to accept a `force` flag, bypassing the `checkDeletionEnabled` check.
  - Use `force` deletion when rolling back failed ingestion source creations (e.g., decryption errors or connection failures) to ensure cleanup even if deletion is globally disabled.
  - Enhance error messages in `EMLConnector`, `MboxConnector`, and `PSTConnector` to distinguish between missing local files and failed uploads, providing more specific feedback to the user.

* feat(ingestion): optimize duplicate handling and fix race conditions in Google Workspace

  - Implement fast duplicate check (by Message-ID) to skip full content download for existing emails in Google Workspace and IMAP connectors.
  - Fix race condition in Google Workspace initial import by capturing `historyId` before listing messages, ensuring no data loss for incoming mail during import.
1 parent 7dac3b2 commit 3434e8d

8 files changed

Lines changed: 127 additions & 23 deletions

File tree

packages/backend/src/jobs/processors/process-mailbox.processor.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,16 @@ export const processMailboxProcessor = async (job: Job<IProcessMailboxJob, SyncS
4343
const connector = EmailProviderFactory.createConnector(source);
4444
const ingestionService = new IngestionService();
4545

46-
for await (const email of connector.fetchEmails(userEmail, source.syncState)) {
46+
// Create a callback to check for duplicates without fetching full email content
47+
const checkDuplicate = async (messageId: string) => {
48+
return await IngestionService.doesEmailExist(messageId, ingestionSourceId);
49+
};
50+
51+
for await (const email of connector.fetchEmails(
52+
userEmail,
53+
source.syncState,
54+
checkDuplicate
55+
)) {
4756
if (email) {
4857
const processedEmail = await ingestionService.processEmail(
4958
email,

packages/backend/src/services/EmailProviderFactory.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ export interface IEmailConnector {
2222
testConnection(): Promise<boolean>;
2323
fetchEmails(
2424
userEmail: string,
25-
syncState?: SyncState | null
25+
syncState?: SyncState | null,
26+
checkDuplicate?: (messageId: string) => Promise<boolean>
2627
): AsyncGenerator<EmailObject | null>;
2728
getUpdatedSyncState(userEmail?: string): SyncState;
2829
listAllUsers(): AsyncGenerator<MailboxUser>;

packages/backend/src/services/IngestionService.ts

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export class IngestionService {
8585

8686
const decryptedSource = this.decryptSource(newSource);
8787
if (!decryptedSource) {
88-
await this.delete(newSource.id, actor, actorIp);
88+
await this.delete(newSource.id, actor, actorIp, true);
8989
throw new Error(
9090
'Failed to process newly created ingestion source due to a decryption error.'
9191
);
@@ -107,7 +107,7 @@ export class IngestionService {
107107
}
108108
} catch (error) {
109109
// If connection fails, delete the newly created source and throw the error.
110-
await this.delete(decryptedSource.id, actor, actorIp);
110+
await this.delete(decryptedSource.id, actor, actorIp, true);
111111
throw error;
112112
}
113113
}
@@ -205,8 +205,15 @@ export class IngestionService {
205205
return decryptedSource;
206206
}
207207

208-
public static async delete(id: string, actor: User, actorIp: string): Promise<IngestionSource> {
209-
checkDeletionEnabled();
208+
public static async delete(
209+
id: string,
210+
actor: User,
211+
actorIp: string,
212+
force: boolean = false
213+
): Promise<IngestionSource> {
214+
if (!force) {
215+
checkDeletionEnabled();
216+
}
210217
const source = await this.findById(id);
211218
if (!source) {
212219
throw new Error('Ingestion source not found');
@@ -383,6 +390,25 @@ export class IngestionService {
383390
}
384391
}
385392

393+
/**
394+
* Quickly checks whether an email with the given Message-ID header has already been archived for the given ingestion source.
395+
* This is used to skip downloading duplicate emails during ingestion.
396+
*/
397+
public static async doesEmailExist(
398+
messageId: string,
399+
ingestionSourceId: string
400+
): Promise<boolean> {
401+
const existingEmail = await db.query.archivedEmails.findFirst({
402+
where: and(
403+
eq(archivedEmails.messageIdHeader, messageId),
404+
eq(archivedEmails.ingestionSourceId, ingestionSourceId)
405+
),
406+
columns: { id: true },
407+
});
408+
409+
return !!existingEmail;
410+
}
411+
386412
public async processEmail(
387413
email: EmailObject,
388414
source: IngestionSource,

packages/backend/src/services/ingestion-connectors/EMLConnector.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export class EMLConnector implements IEmailConnector {
5858
try {
5959
const filePath = this.getFilePath();
6060
if (!filePath) {
61-
throw Error('EML file path not provided.');
61+
throw Error('EML Zip file path not provided.');
6262
}
6363
if (!filePath.includes('.zip')) {
6464
throw Error('Provided file is not in the ZIP format.');
@@ -77,12 +77,21 @@ export class EMLConnector implements IEmailConnector {
7777
}
7878

7979
if (!fileExist) {
80-
throw Error('EML file not found or upload not finished yet, please wait.');
80+
if (this.credentials.localFilePath) {
81+
throw Error(`EML Zip file not found at path: ${this.credentials.localFilePath}`);
82+
} else {
83+
throw Error(
84+
'Uploaded EML Zip file not found. The upload may not have finished yet, or it failed.'
85+
);
86+
}
8187
}
8288

8389
return true;
8490
} catch (error) {
85-
logger.error({ error, credentials: this.credentials }, 'EML file validation failed.');
91+
logger.error(
92+
{ error, credentials: this.credentials },
93+
'EML Zip file validation failed.'
94+
);
8695
throw error;
8796
}
8897
}

packages/backend/src/services/ingestion-connectors/GoogleWorkspaceConnector.ts

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
132132
*/
133133
public async *fetchEmails(
134134
userEmail: string,
135-
syncState?: SyncState | null
135+
syncState?: SyncState | null,
136+
checkDuplicate?: (messageId: string) => Promise<boolean>
136137
): AsyncGenerator<EmailObject> {
137138
const authClient = this.getAuthClient(userEmail, [
138139
'https://www.googleapis.com/auth/gmail.readonly',
@@ -144,7 +145,7 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
144145

145146
// If no sync state is provided for this user, this is an initial import. Get all messages.
146147
if (!startHistoryId) {
147-
yield* this.fetchAllMessagesForUser(gmail, userEmail);
148+
yield* this.fetchAllMessagesForUser(gmail, userEmail, checkDuplicate);
148149
return;
149150
}
150151

@@ -170,6 +171,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
170171
if (messageAdded.message?.id) {
171172
try {
172173
const messageId = messageAdded.message.id;
174+
175+
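// Optimization: Check for existence before fetching full content. NOTE(review): `messageId` here is the Gmail API message id, while the duplicate check compares against the stored Message-ID header — confirm these values can actually match.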
176+
if (checkDuplicate && (await checkDuplicate(messageId))) {
177+
logger.debug(
178+
{ messageId, userEmail },
179+
'Skipping duplicate email (pre-check)'
180+
);
181+
continue;
182+
}
183+
173184
const metadataResponse = await gmail.users.messages.get({
174185
userId: userEmail,
175186
id: messageId,
@@ -258,8 +269,17 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
258269

259270
private async *fetchAllMessagesForUser(
260271
gmail: gmail_v1.Gmail,
261-
userEmail: string
272+
userEmail: string,
273+
checkDuplicate?: (messageId: string) => Promise<boolean>
262274
): AsyncGenerator<EmailObject> {
275+
// Capture the history ID at the start to ensure no emails are missed during the import process.
276+
// Any emails arriving during this import will be covered by the next sync starting from this point.
277+
// Overlaps are handled by the duplicate check.
278+
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
279+
if (profileResponse.data.historyId) {
280+
this.newHistoryId = profileResponse.data.historyId;
281+
}
282+
263283
let pageToken: string | undefined = undefined;
264284
do {
265285
const listResponse: Common.GaxiosResponseWithHTTP2<gmail_v1.Schema$ListMessagesResponse> =
@@ -277,6 +297,16 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
277297
if (message.id) {
278298
try {
279299
const messageId = message.id;
300+
301+
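// Optimization: Check for existence before fetching full content. NOTE(review): `messageId` here is the Gmail API message id, while the duplicate check compares against the stored Message-ID header — confirm these values can actually match.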
302+
if (checkDuplicate && (await checkDuplicate(messageId))) {
303+
logger.debug(
304+
{ messageId, userEmail },
305+
'Skipping duplicate email (pre-check)'
306+
);
307+
continue;
308+
}
309+
280310
const metadataResponse = await gmail.users.messages.get({
281311
userId: userEmail,
282312
id: messageId,
@@ -352,12 +382,6 @@ export class GoogleWorkspaceConnector implements IEmailConnector {
352382
}
353383
pageToken = listResponse.data.nextPageToken ?? undefined;
354384
} while (pageToken);
355-
356-
// After fetching all messages, get the latest history ID to use as the starting point for the next sync.
357-
const profileResponse = await gmail.users.getProfile({ userId: userEmail });
358-
if (profileResponse.data.historyId) {
359-
this.newHistoryId = profileResponse.data.historyId;
360-
}
361385
}
362386

363387
public getUpdatedSyncState(userEmail: string): SyncState {

packages/backend/src/services/ingestion-connectors/ImapConnector.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ export class ImapConnector implements IEmailConnector {
142142

143143
public async *fetchEmails(
144144
userEmail: string,
145-
syncState?: SyncState | null
145+
syncState?: SyncState | null,
146+
checkDuplicate?: (messageId: string) => Promise<boolean>
146147
): AsyncGenerator<EmailObject | null> {
147148
try {
148149
// list all mailboxes first
@@ -218,6 +219,22 @@ export class ImapConnector implements IEmailConnector {
218219
this.newMaxUids[mailboxPath] = msg.uid;
219220
}
220221

222+
// Optimization: Verify existence using Message-ID from envelope before fetching full body
223+
if (checkDuplicate && msg.envelope?.messageId) {
224+
const isDuplicate = await checkDuplicate(msg.envelope.messageId);
225+
if (isDuplicate) {
226+
logger.debug(
227+
{
228+
mailboxPath,
229+
uid: msg.uid,
230+
messageId: msg.envelope.messageId,
231+
},
232+
'Skipping duplicate email (pre-check)'
233+
);
234+
continue;
235+
}
236+
}
237+
221238
logger.debug({ mailboxPath, uid: msg.uid }, 'Processing message');
222239

223240
if (msg.envelope && msg.source) {

packages/backend/src/services/ingestion-connectors/MboxConnector.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,21 @@ export class MboxConnector implements IEmailConnector {
8282
}
8383

8484
if (!fileExist) {
85-
throw Error('Mbox file not found or upload not finished yet, please wait.');
85+
if (this.credentials.localFilePath) {
86+
throw Error(`Mbox file not found at path: ${this.credentials.localFilePath}`);
87+
} else {
88+
throw Error(
89+
'Uploaded Mbox file not found. The upload may not have finished yet, or it failed.'
90+
);
91+
}
8692
}
8793

8894
return true;
8995
} catch (error) {
90-
logger.error({ error, credentials: this.credentials }, 'Mbox file validation failed.');
96+
logger.error(
97+
{ error, credentials: this.credentials },
98+
'Mbox file validation failed.'
99+
);
91100
throw error;
92101
}
93102
}

packages/backend/src/services/ingestion-connectors/PSTConnector.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,20 @@ export class PSTConnector implements IEmailConnector {
161161
}
162162

163163
if (!fileExist) {
164-
throw Error('PST file not found or upload not finished yet, please wait.');
164+
if (this.credentials.localFilePath) {
165+
throw Error(`PST file not found at path: ${this.credentials.localFilePath}`);
166+
} else {
167+
throw Error(
168+
'Uploaded PST file not found. The upload may not have finished yet, or it failed.'
169+
);
170+
}
165171
}
166172
return true;
167173
} catch (error) {
168-
logger.error({ error, credentials: this.credentials }, 'PST file validation failed.');
174+
logger.error(
175+
{ error, credentials: this.credentials },
176+
'PST file validation failed.'
177+
);
169178
throw error;
170179
}
171180
}

0 commit comments

Comments
 (0)