Skip to content

Commit 3e15c35

Browse files
authored
fix: batch fetch repositories for integrations (#3889)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 720b1ca commit 3e15c35

10 files changed

Lines changed: 158 additions & 51 deletions

File tree

backend/src/database/repositories/integrationRepository.ts

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import {
99
fetchGlobalIntegrationsStatusCount,
1010
fetchGlobalNotConnectedIntegrations,
1111
fetchGlobalNotConnectedIntegrationsCount,
12-
getNangoMappingsForIntegration,
12+
getNangoMappingsForIntegrations,
1313
} from '@crowd/data-access-layer/src/integrations'
14-
import { SequelizeQueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
15-
import { getReposGroupedByOrg } from '@crowd/data-access-layer/src/repositories'
14+
import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
15+
import { getReposGroupedByOrgForIntegrations } from '@crowd/data-access-layer/src/repositories'
1616
import { IntegrationRunState, PlatformType } from '@crowd/types'
1717

1818
import SequelizeFilterUtils from '../utils/sequelizeFilterUtils'
@@ -172,7 +172,7 @@ class IntegrationRepository {
172172
throw new Error404()
173173
}
174174

175-
return this._populateRelations(record)
175+
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
176176
}
177177

178178
static async findActiveIntegrationByPlatform(platform: PlatformType) {
@@ -188,7 +188,7 @@ class IntegrationRepository {
188188
throw new Error404()
189189
}
190190

191-
return this._populateRelations(record)
191+
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
192192
}
193193

194194
/**
@@ -213,7 +213,7 @@ class IntegrationRepository {
213213
throw new Error404()
214214
}
215215

216-
return Promise.all(records.map((record) => this._populateRelations(record)))
216+
return this._populateRelationsForRows(records, SequelizeRepository.getQueryExecutor(options))
217217
}
218218

219219
static async findByStatus(
@@ -263,7 +263,7 @@ class IntegrationRepository {
263263
throw new Error404()
264264
}
265265

266-
return this._populateRelations(record)
266+
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
267267
}
268268

269269
static async findById(id, options: IRepositoryOptions) {
@@ -283,7 +283,7 @@ class IntegrationRepository {
283283
throw new Error404()
284284
}
285285

286-
return this._populateRelations(record)
286+
return this._populateRelations(record, SequelizeRepository.getQueryExecutor(options))
287287
}
288288

289289
static async count(filter, options: IRepositoryOptions) {
@@ -474,7 +474,7 @@ class IntegrationRepository {
474474
transaction: SequelizeRepository.getTransaction(options),
475475
})
476476

477-
rows = await this._populateRelationsForRows(rows)
477+
rows = await this._populateRelationsForRows(rows, SequelizeRepository.getQueryExecutor(options))
478478

479479
// Some integrations (i.e. GitHub, Discord, Discourse, Groupsio) receive new data via webhook post-onboarding.
480480
// We track their last processedAt separately, and not using updatedAt.
@@ -573,26 +573,80 @@ class IntegrationRepository {
573573
}))
574574
}
575575

576-
static async _populateRelationsForRows(rows) {
576+
static async _populateRelationsForRows(rows, qx: QueryExecutor) {
577577
if (!rows) {
578578
return rows
579579
}
580580

581-
return Promise.all(rows.map((record) => this._populateRelations(record)))
581+
const records = rows.map((record) => record.get({ plain: true }))
582+
583+
const nangoIntegrationIds = records
584+
.filter((r) => r.platform === PlatformType.GITHUB_NANGO)
585+
.map((r) => r.id)
586+
587+
const githubIntegrationIds = records
588+
.filter(
589+
(r) =>
590+
(r.platform === PlatformType.GITHUB || r.platform === PlatformType.GITHUB_NANGO) &&
591+
r.settings?.orgs?.length > 0,
592+
)
593+
.map((r) => r.id)
594+
595+
const [allNangoMappings, allReposByOrg] = await Promise.all([
596+
getNangoMappingsForIntegrations(qx, nangoIntegrationIds),
597+
getReposGroupedByOrgForIntegrations(qx, githubIntegrationIds),
598+
])
599+
600+
return records.map((output) => {
601+
if (output.platform === PlatformType.GITHUB_NANGO) {
602+
const nangoMapping = allNangoMappings[output.id]
603+
if (nangoMapping && Object.keys(nangoMapping).length > 0) {
604+
output.settings = { ...output.settings, nangoMapping }
605+
}
606+
}
607+
608+
if (
609+
(output.platform === PlatformType.GITHUB ||
610+
output.platform === PlatformType.GITHUB_NANGO) &&
611+
output.settings?.orgs?.length > 0
612+
) {
613+
const reposByOrg = allReposByOrg[output.id]
614+
615+
if (reposByOrg && Object.keys(reposByOrg).length > 0) {
616+
output.settings = {
617+
...output.settings,
618+
orgs: output.settings.orgs.map((org) => ({
619+
...org,
620+
repos: (reposByOrg[org.name] || []).map((r) => ({
621+
url: r.url,
622+
name: r.name,
623+
owner: r.owner,
624+
forkedFrom: r.forkedFrom,
625+
updatedAt: r.updatedAt,
626+
})),
627+
})),
628+
}
629+
}
630+
631+
delete output.settings.repos
632+
delete output.settings.unavailableRepos
633+
}
634+
635+
return output
636+
})
582637
}
583638

584-
static async _populateRelations(record) {
639+
static async _populateRelations(record, qx: QueryExecutor) {
585640
if (!record) {
586641
return record
587642
}
588643

589644
const output = record.get({ plain: true })
590645

591-
const qx = new SequelizeQueryExecutor(record.sequelize)
592-
593646
// For github-nango integrations, populate settings.nangoMapping from dedicated table
594647
if (output.platform === PlatformType.GITHUB_NANGO) {
595-
const nangoMapping = await getNangoMappingsForIntegration(qx, output.id)
648+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [output.id])
649+
const nangoMapping = allNangoMappings[output.id] || {}
596650
if (Object.keys(nangoMapping).length > 0) {
597651
output.settings = { ...output.settings, nangoMapping }
598652
}
@@ -603,7 +657,8 @@ class IntegrationRepository {
603657
(output.platform === PlatformType.GITHUB || output.platform === PlatformType.GITHUB_NANGO) &&
604658
output.settings?.orgs?.length > 0
605659
) {
606-
const reposByOrg = await getReposGroupedByOrg(qx, output.id)
660+
const allReposByOrg = await getReposGroupedByOrgForIntegrations(qx, [output.id])
661+
const reposByOrg = allReposByOrg[output.id] || {}
607662

608663
// Only overwrite orgs[].repos from the repositories table if there are rows.
609664
// During the 'mapping' phase (legacy github connect), repos live in settings

services/apps/cron_service/src/jobs/nangoMonitoring.job.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
INangoIntegrationData,
1313
fetchNangoCursorRowsForIntegration,
1414
fetchNangoIntegrationData,
15-
getNangoMappingsForIntegration,
15+
getNangoMappingsForIntegrations,
1616
} from '@crowd/data-access-layer/src/integrations'
1717
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1818
import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories'
@@ -73,7 +73,10 @@ const job: IJobDefinition = {
7373
for (const int of allIntegrations) {
7474
if (int.platform === PlatformType.GITHUB_NANGO) {
7575
// Fetch nango mappings from the dedicated table
76-
const nangoMapping = await getNangoMappingsForIntegration(pgpQx(dbConnection), int.id)
76+
const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [
77+
int.id,
78+
])
79+
const nangoMapping = allNangoMappings[int.id] || {}
7780

7881
// Check which repos are connected to nango by comparing repositories table vs nango_mapping
7982
const repoRows = await getReposForGithubIntegration(pgpQx(dbConnection), int.id)

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da
55
import {
66
fetchNangoIntegrationDataForCheck,
77
fetchNangoLastCheckedAt,
8-
getNangoMappingsForIntegration,
8+
getNangoMappingsForIntegrations,
99
} from '@crowd/data-access-layer/src/integrations'
1010
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1111
import {
@@ -81,7 +81,8 @@ const job: IJobDefinition = {
8181
{ owner: string; repoName: string; repositoryId: string | null }
8282
> = {}
8383
if (platform === NangoIntegration.GITHUB) {
84-
nangoMapping = await getNangoMappingsForIntegration(qx, id)
84+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [id])
85+
nangoMapping = allNangoMappings[id] || {}
8586
if (Object.keys(nangoMapping).length === 0) {
8687
// ignore non-nango github integrations
8788
continue

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
fetchIntegrationById,
88
findIntegrationDataForNangoWebhookProcessing,
99
getNangoCursor,
10-
getNangoMappingsForIntegration,
10+
getNangoMappingsForIntegrations,
1111
linkNangoMappingToRepository,
1212
removeGithubNangoConnection,
1313
removeNangoCursorsByConnection,
@@ -326,10 +326,11 @@ export async function analyzeGithubIntegration(
326326
const finalRepos = Array.from(repoSet.values())
327327

328328
// fetch nango mappings from the dedicated table
329-
const nangoMapping = await getNangoMappingsForIntegration(
329+
const allNangoMappings = await getNangoMappingsForIntegrations(
330330
dbStoreQx(svc.postgres.writer),
331-
integrationId,
331+
[integrationId],
332332
)
333+
const nangoMapping = allNangoMappings[integrationId] || {}
333334
const connectionIds = Object.keys(nangoMapping)
334335

335336
// determine which connections to delete if needed

services/apps/nango_worker/src/bin/check-disconnected-connections.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
22
import {
33
fetchNangoIntegrationData,
4-
getNangoMappingsForIntegration,
4+
getNangoMappingsForIntegrations,
55
} from '@crowd/data-access-layer/src/integrations'
66
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
77
import { getServiceLogger } from '@crowd/logging'
@@ -40,8 +40,8 @@ setImmediate(async () => {
4040
const connectionIds: string[] = []
4141
for (const int of nangoIntegrations) {
4242
if (int.platform === PlatformType.GITHUB_NANGO) {
43-
const nangoMapping = await getNangoMappingsForIntegration(qx, int.id)
44-
connectionIds.push(...Object.keys(nangoMapping))
43+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id])
44+
connectionIds.push(...Object.keys(allNangoMappings[int.id] || {}))
4545
} else {
4646
connectionIds.push(int.id)
4747
}

services/apps/nango_worker/src/bin/check-disconnected-integrations.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
22
import {
33
fetchNangoDeletedIntegrationData,
4-
getNangoMappingsForIntegration,
4+
getNangoMappingsForIntegrations,
55
} from '@crowd/data-access-layer/src/integrations'
66
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
77
import { getServiceLogger } from '@crowd/logging'
@@ -46,7 +46,8 @@ setImmediate(async () => {
4646

4747
for (const int of deletedNangoIntegrations) {
4848
if (int.platform === PlatformType.GITHUB_NANGO) {
49-
const nangoMapping = await getNangoMappingsForIntegration(qx, int.id)
49+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [int.id])
50+
const nangoMapping = allNangoMappings[int.id] || {}
5051
const connectionIdsForIntegration = Object.keys(nangoMapping)
5152
if (connectionIdsForIntegration.length > 0) {
5253
connectionIds.push(...connectionIdsForIntegration)

services/apps/nango_worker/src/bin/check-nango-mapping.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/da
22
import {
33
fetchNangoCursorRowsForIntegration,
44
fetchNangoIntegrationData,
5-
getNangoMappingsForIntegration,
5+
getNangoMappingsForIntegrations,
66
} from '@crowd/data-access-layer/src/integrations'
77
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
88
import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories'
@@ -43,7 +43,8 @@ async function collectStats(): Promise<Stats> {
4343

4444
for (const integration of integrations) {
4545
// Fetch nango mappings from the dedicated table
46-
const nangoMapping = await getNangoMappingsForIntegration(qx, integration.id)
46+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integration.id])
47+
const nangoMapping = allNangoMappings[integration.id] || {}
4748
const connectionIds = Object.keys(nangoMapping)
4849

4950
// Track connectionIds that don't have cursors

services/apps/nango_worker/src/bin/full-resync.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/d
22
import {
33
clearNangoCursors,
44
findIntegrationDataForNangoWebhookProcessing,
5-
getNangoMappingsForIntegration,
5+
getNangoMappingsForIntegrations,
66
} from '@crowd/data-access-layer/src/integrations'
77
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
88
import { getServiceLogger } from '@crowd/logging'
@@ -39,11 +39,10 @@ setImmediate(async () => {
3939
try {
4040
const toTrigger: string[] = []
4141
if (integration.platform === PlatformType.GITHUB_NANGO) {
42-
const nangoMapping = await getNangoMappingsForIntegration(
43-
pgpQx(dbConnection),
42+
const allNangoMappings = await getNangoMappingsForIntegrations(pgpQx(dbConnection), [
4443
integration.id,
45-
)
46-
toTrigger.push(...Object.keys(nangoMapping))
44+
])
45+
toTrigger.push(...Object.keys(allNangoMappings[integration.id] || {}))
4746
} else if (integration.platform === PlatformType.GERRIT) {
4847
toTrigger.push(integration.id)
4948
} else {

services/libs/data-access-layer/src/integrations/index.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ export async function findNangoRepositoriesToBeRemoved(
633633
return []
634634
}
635635

636-
const nangoMappings = await getNangoMappingsForIntegration(qx, integrationId)
636+
const allNangoMappings = await getNangoMappingsForIntegrations(qx, [integrationId])
637+
const nangoMappings = allNangoMappings[integrationId] || {}
637638

638639
if (Object.keys(nangoMappings).length === 0) {
639640
return []
@@ -664,19 +665,25 @@ export interface INangoMappingRow {
664665
updatedAt: string
665666
}
666667

667-
export async function getNangoMappingsForIntegration(
668+
export type NangoMappingEntry = { owner: string; repoName: string; repositoryId: string | null }
669+
670+
export async function getNangoMappingsForIntegrations(
668671
qx: QueryExecutor,
669-
integrationId: string,
670-
): Promise<Record<string, { owner: string; repoName: string; repositoryId: string | null }>> {
672+
integrationIds: string[],
673+
): Promise<Record<string, Record<string, NangoMappingEntry>>> {
674+
if (integrationIds.length === 0) return {}
675+
671676
const rows: INangoMappingRow[] = await qx.select(
672-
`SELECT * FROM integration.nango_mapping WHERE "integrationId" = $(integrationId)`,
673-
{ integrationId },
677+
`SELECT * FROM integration.nango_mapping WHERE "integrationId" IN ($(integrationIds:csv))`,
678+
{ integrationIds },
674679
)
675680

676-
const result: Record<string, { owner: string; repoName: string; repositoryId: string | null }> =
677-
{}
681+
const result: Record<string, Record<string, NangoMappingEntry>> = {}
678682
for (const row of rows) {
679-
result[row.connectionId] = {
683+
if (!result[row.integrationId]) {
684+
result[row.integrationId] = {}
685+
}
686+
result[row.integrationId][row.connectionId] = {
680687
owner: row.owner,
681688
repoName: row.repoName,
682689
repositoryId: row.repositoryId,

0 commit comments

Comments
 (0)