Skip to content

Commit 6a154a8

Browse files
committed
Handle sync error: remove failed jobs, force sync
1 parent ac4dae0 commit 6a154a8

5 files changed

Lines changed: 50 additions & 21 deletions

File tree

docs/.vitepress/config.mts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ export default defineConfig({
2323
nav: [
2424
{ text: 'Home', link: '/' },
2525
{ text: 'Github', link: 'https://github.com/LogicLabs-OU/OpenArchiver' },
26-
{ text: "Website", link: 'https://openarchiver.com/' }
26+
{ text: "Website", link: 'https://openarchiver.com/' },
27+
{ text: "Discord", link: 'https://discord.gg/Qpv4BmHp' }
2728
],
2829
sidebar: [
2930
{

packages/backend/src/jobs/processors/continuous-sync.processor.ts

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,6 @@ export default async (job: Job<IContinuousSyncJob>) => {
2424

2525
try {
2626
const jobs = [];
27-
// if (!connector.listAllUsers) {
28-
// // This is for single-mailbox providers like Generic IMAP
29-
// let userEmail = 'Default';
30-
// if (connector instanceof ImapConnector) {
31-
// userEmail = connector.returnImapUserEmail();
32-
// }
33-
// jobs.push({
34-
// name: 'process-mailbox',
35-
// queueName: 'ingestion',
36-
// data: {
37-
// ingestionSourceId: source.id,
38-
// userEmail: userEmail
39-
// }
40-
// });
41-
// } else {
42-
// For multi-mailbox providers like Google Workspace and M365
4327
for await (const user of connector.listAllUsers()) {
4428
if (user.primaryEmail) {
4529
jobs.push({
@@ -48,6 +32,14 @@ export default async (job: Job<IContinuousSyncJob>) => {
4832
data: {
4933
ingestionSourceId: source.id,
5034
userEmail: user.primaryEmail
35+
},
36+
opts: {
37+
removeOnComplete: {
38+
age: 60 * 10 // 10 minutes
39+
},
40+
removeOnFail: {
41+
age: 60 * 30 // 30 minutes
42+
}
5143
}
5244
});
5345
}
@@ -62,7 +54,11 @@ export default async (job: Job<IContinuousSyncJob>) => {
6254
ingestionSourceId,
6355
isInitialImport: false
6456
},
65-
children: jobs
57+
children: jobs,
58+
opts: {
59+
removeOnComplete: true,
60+
removeOnFail: true
61+
}
6662
});
6763
}
6864

packages/backend/src/jobs/processors/initial-import.processor.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ export default async (job: Job<IInitialImportJob>) => {
3333
data: {
3434
ingestionSourceId,
3535
userEmail: user.primaryEmail,
36+
},
37+
opts: {
38+
removeOnComplete: {
39+
age: 60 * 10 // 10 minutes
40+
},
41+
removeOnFail: {
42+
age: 60 * 30 // 30 minutes
43+
}
3644
}
3745
});
3846
userCount++;
@@ -49,7 +57,11 @@ export default async (job: Job<IInitialImportJob>) => {
4957
userCount,
5058
isInitialImport: true
5159
},
52-
children: jobs
60+
children: jobs,
61+
opts: {
62+
removeOnComplete: true,
63+
removeOnFail: true
64+
}
5365
});
5466
} else {
5567
// If there are no users, we can consider the import finished and set to active

packages/backend/src/services/IngestionService.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { and, eq } from 'drizzle-orm';
1010
import { CryptoService } from './CryptoService';
1111
import { EmailProviderFactory } from './EmailProviderFactory';
1212
import { ingestionQueue } from '../jobs/queues';
13+
import type { JobType } from 'bullmq';
1314
import { StorageService } from './StorageService';
1415
import type { IInitialImportJob, EmailObject } from '@open-archiver/types';
1516
import { archivedEmails, attachments as attachmentsSchema, emailAttachments } from '../database/schema';
@@ -142,11 +143,29 @@ export class IngestionService {
142143

143144
public static async triggerForceSync(id: string): Promise<void> {
144145
const source = await this.findById(id);
145-
146+
logger.info({ ingestionSourceId: id }, 'Force syncing started.');
146147
if (!source) {
147148
throw new Error('Ingestion source not found');
148149
}
149150

151+
// Clean up existing jobs for this source to break any stuck flows
152+
const jobTypes: JobType[] = ['active', 'waiting', 'failed', 'delayed', 'paused'];
153+
const jobs = await ingestionQueue.getJobs(jobTypes);
154+
for (const job of jobs) {
155+
if (job.data.ingestionSourceId === id) {
156+
try {
157+
await job.remove();
158+
logger.info({ jobId: job.id, ingestionSourceId: id }, 'Removed stale job during force sync.');
159+
} catch (error) {
160+
logger.error({ err: error, jobId: job.id }, 'Failed to remove stale job.');
161+
}
162+
}
163+
}
164+
165+
// Reset status to 'active'
166+
await this.update(id, { status: 'active', lastSyncStatusMessage: 'Force sync triggered by user.' });
167+
168+
150169
await ingestionQueue.add('continuous-sync', { ingestionSourceId: source.id });
151170
}
152171

packages/frontend/src/routes/dashboard/ingestions/+page.svelte

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@
179179
<DropdownMenu.Item onclick={() => openEditDialog(source)}
180180
>Edit</DropdownMenu.Item
181181
>
182-
<DropdownMenu.Item onclick={() => handleSync(source.id)}>Sync</DropdownMenu.Item
182+
<DropdownMenu.Item onclick={() => handleSync(source.id)}
183+
>Force sync</DropdownMenu.Item
183184
>
184185
<DropdownMenu.Separator />
185186
<DropdownMenu.Item class="text-red-600" onclick={() => openDeleteDialog(source)}

0 commit comments

Comments
 (0)