Skip to content

Commit 4f54f62

Browse files
author
marcus
committed
fix
1 parent 6888b1f commit 4f54f62

2 files changed

Lines changed: 86 additions & 32 deletions

File tree

src/server/integrations/azureDevops.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ function pickString(value: unknown): string | null {
2626
return next.length > 0 ? next : null;
2727
}
2828

29+
function isLikelyEmail(value: string): boolean {
30+
return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value);
31+
}
32+
2933
function collectAzureIdentityCandidates(data: any): string[] {
3034
const candidates: string[] = [];
3135
const user = data?.authenticatedUser ?? {};
@@ -37,19 +41,21 @@ function collectAzureIdentityCandidates(data: any): string[] {
3741
};
3842

3943
push(user?.uniqueName);
40-
push(user?.providerDisplayName);
41-
push(user?.customDisplayName);
44+
// Display names are often not valid commit-author identifiers for Azure search,
45+
// and can explode query volume if used as author filter.
4246

4347
// Try common property keys where Azure may return account/email-like values.
44-
const propertyKeys = ["Mail", "Email", "Account", "account", "SignInAddress", "IdentityType"];
48+
const propertyKeys = ["Mail", "Email", "Account", "account", "SignInAddress"];
4549
for (const key of propertyKeys) {
4650
const candidate = props?.[key];
4751
push(candidate?.$value);
4852
push(candidate?.value);
4953
push(candidate);
5054
}
5155

52-
return candidates;
56+
return candidates
57+
.map((item) => item.trim().toLowerCase())
58+
.filter((item) => isLikelyEmail(item));
5359
}
5460

5561
async function fetchAzureIdentityAuthors(baseUrl: string, headers: Record<string, string>): Promise<string[]> {
@@ -103,7 +109,11 @@ export async function fetchAzureDailyMetrics(params: {
103109

104110
const identityCandidates = await fetchAzureIdentityAuthors(baseUrl, headers);
105111
const authorEmails = Array.from(
106-
new Set([params.fallbackEmail, ...params.authorEmails, ...identityCandidates].map((item) => item.trim().toLowerCase()).filter(Boolean)),
112+
new Set(
113+
[params.fallbackEmail, ...params.authorEmails, ...identityCandidates]
114+
.map((item) => item.trim().toLowerCase())
115+
.filter((item) => isLikelyEmail(item)),
116+
),
107117
);
108118
const events: EventMetric[] = [];
109119

src/server/queue/queue.ts

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,37 @@ const SyncJobStatus = {
4242
FAILED: "FAILED",
4343
} as const;
4444

45+
function buildDefaultSyncWindow(lastSyncedAt?: Date | null): { from: Date; to: Date } {
46+
const now = new Date();
47+
if (!lastSyncedAt) {
48+
const from = new Date(now);
49+
from.setUTCDate(from.getUTCDate() - 90);
50+
return { from, to: now };
51+
}
52+
53+
const from = new Date(lastSyncedAt);
54+
from.setUTCDate(from.getUTCDate() - 1);
55+
return { from, to: now };
56+
}
57+
58+
function splitDateRangeIntoChunks(from: Date, to: Date, chunkDays: number): Array<{ from: Date; to: Date }> {
59+
if (from > to) return [];
60+
const chunks: Array<{ from: Date; to: Date }> = [];
61+
let cursor = new Date(from);
62+
63+
while (cursor <= to) {
64+
const chunkStart = new Date(cursor);
65+
const chunkEnd = new Date(cursor);
66+
chunkEnd.setUTCDate(chunkEnd.getUTCDate() + chunkDays - 1);
67+
if (chunkEnd > to) chunkEnd.setTime(to.getTime());
68+
chunks.push({ from: chunkStart, to: chunkEnd });
69+
cursor = new Date(chunkEnd);
70+
cursor.setUTCDate(cursor.getUTCDate() + 1);
71+
}
72+
73+
return chunks;
74+
}
75+
4576
function isBackfillJob(jobData: unknown): jobData is { userId: string; options: SyncJobOptions } {
4677
if (!jobData || typeof jobData !== "object") return false;
4778
const candidate = jobData as { userId?: unknown; options?: SyncJobOptions };
@@ -50,33 +81,15 @@ function isBackfillJob(jobData: unknown): jobData is { userId: string; options:
5081

5182
export async function enqueueUserSync(userId: string, options?: SyncJobOptions) {
5283
if (backend === "supabase") {
53-
if (options?.provider) {
54-
await (prisma as any).syncJob.create({
55-
data: {
56-
userId,
57-
provider: options.provider,
58-
from: options?.from ? new Date(options.from) : null,
59-
to: options?.to ? new Date(options.to) : null,
60-
backfillYear: options?.backfillYear ?? null,
61-
status: SyncJobStatus.QUEUED,
62-
},
63-
});
64-
return;
65-
}
66-
67-
// Fan out "all providers" into one job per connected integration provider to keep
68-
// each serverless execution small enough for runtime limits.
6984
const integrations = await prisma.integration.findMany({
70-
where: { userId },
71-
select: { provider: true },
85+
where: options?.provider ? { userId, provider: options.provider } : { userId },
86+
select: { provider: true, lastSyncedAt: true },
7287
});
73-
const providers = Array.from(new Set(integrations.map((item) => item.provider)));
74-
75-
if (providers.length === 0) {
88+
if (integrations.length === 0) {
7689
await (prisma as any).syncJob.create({
7790
data: {
7891
userId,
79-
provider: null,
92+
provider: options?.provider ?? null,
8093
from: options?.from ? new Date(options.from) : null,
8194
to: options?.to ? new Date(options.to) : null,
8295
backfillYear: options?.backfillYear ?? null,
@@ -85,17 +98,48 @@ export async function enqueueUserSync(userId: string, options?: SyncJobOptions)
8598
});
8699
return;
87100
}
101+
const rows: Array<{
102+
userId: string;
103+
provider: SyncProvider;
104+
from: Date | null;
105+
to: Date | null;
106+
backfillYear: number | null;
107+
status: keyof typeof SyncJobStatus;
108+
}> = [];
109+
110+
for (const integration of integrations) {
111+
// Azure sync tends to exceed serverless runtime for larger orgs.
112+
// Split it into smaller date chunks so each process run stays short.
113+
if (integration.provider === "AZURE_DEVOPS") {
114+
const window =
115+
options?.from && options?.to
116+
? { from: new Date(options.from), to: new Date(options.to) }
117+
: buildDefaultSyncWindow(integration.lastSyncedAt);
118+
const chunks = splitDateRangeIntoChunks(window.from, window.to, 14);
119+
for (const chunk of chunks) {
120+
rows.push({
121+
userId,
122+
provider: integration.provider,
123+
from: chunk.from,
124+
to: chunk.to,
125+
backfillYear: options?.backfillYear ?? null,
126+
status: SyncJobStatus.QUEUED,
127+
});
128+
}
129+
continue;
130+
}
88131

89-
await (prisma as any).syncJob.createMany({
90-
data: providers.map((provider) => ({
132+
rows.push({
91133
userId,
92-
provider,
134+
provider: integration.provider,
93135
from: options?.from ? new Date(options.from) : null,
94136
to: options?.to ? new Date(options.to) : null,
95137
backfillYear: options?.backfillYear ?? null,
96138
status: SyncJobStatus.QUEUED,
97-
})),
98-
});
139+
});
140+
}
141+
142+
await (prisma as any).syncJob.createMany({ data: rows });
99143
return;
100144
}
101145

0 commit comments

Comments
 (0)