Skip to content

Commit 8b401d2

Browse files
authored
chore: refactor some activities into a separate child workflow (CM-922) (#3831)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent ba87406 commit 8b401d2

7 files changed

Lines changed: 179 additions & 43 deletions

File tree

scripts/cli

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,21 @@ function select_services() {
225225
SERVICES+=("$filename")
226226
done < <(find "$CLI_HOME/services" -name '*.yaml')
227227

228-
# Show multiselect and store results in SELECTED_SERVICES
229-
multiselect SELECTED_SERVICES SERVICES[@]
228+
# Use fzf if available, fall back to built-in multiselect
229+
if command -v fzf &>/dev/null; then
230+
mapfile -t SELECTED_SERVICES < <(
231+
printf '%s\n' "${SERVICES[@]}" | sort | fzf --multi --height=40% --prompt="Select services (TAB to toggle, ENTER to confirm): "
232+
)
233+
else
234+
multiselect SELECTED_SERVICES SERVICES[@]
235+
fi
236+
237+
if [[ ${#SELECTED_SERVICES[@]} -eq 0 ]]; then
238+
error "No services selected!"
239+
exit 1
240+
fi
230241

242+
local selected_services=""
231243
for service in "${SELECTED_SERVICES[@]}"; do
232244
selected_services+="$service, "
233245
done

services/apps/nango_worker/src/types.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,27 @@ export interface IGithubRepoData {
2828
owner: string
2929
repoName: string
3030
}
31+
32+
export interface IDeleteGithubRepoConnectionArgs {
33+
integrationId: string
34+
providerConfigKey: string
35+
connectionId: string
36+
repo: IGithubRepoData
37+
}
38+
39+
export interface IDeleteDuplicateGithubConnectionArgs {
40+
integrationId: string
41+
providerConfigKey: string
42+
connectionId: string
43+
repo: IGithubRepoData
44+
}
45+
46+
export interface ISyncGithubRepoArgs {
47+
integrationId: string
48+
providerConfigKey: string
49+
repo: IGithubRepoData
50+
}
51+
52+
export interface ISyncGithubRepoResult {
53+
skipped: boolean
54+
}
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
1+
import { deleteDuplicateGithubConnection } from './workflows/deleteDuplicateGithubConnection'
2+
import { deleteGithubRepoConnection } from './workflows/deleteGithubRepoConnection'
13
import { processNangoWebhook } from './workflows/processNangoWebhook'
24
import { syncGithubIntegration } from './workflows/syncGithubIntegration'
5+
import { syncGithubRepo } from './workflows/syncGithubRepo'
36

4-
export { processNangoWebhook, syncGithubIntegration }
7+
export {
8+
deleteDuplicateGithubConnection,
9+
deleteGithubRepoConnection,
10+
processNangoWebhook,
11+
syncGithubIntegration,
12+
syncGithubRepo,
13+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { proxyActivities } from '@temporalio/workflow'
2+
3+
import type * as activities from '../activities/nangoActivities'
4+
import { IDeleteDuplicateGithubConnectionArgs } from '../types'
5+
6+
const activity = proxyActivities<typeof activities>({
7+
startToCloseTimeout: '5 minute',
8+
retry: { maximumAttempts: 10, backoffCoefficient: 2 },
9+
})
10+
11+
export async function deleteDuplicateGithubConnection(
12+
args: IDeleteDuplicateGithubConnectionArgs,
13+
): Promise<void> {
14+
const { integrationId, providerConfigKey, connectionId } = args
15+
16+
// Delete nango connection
17+
await activity.deleteConnection(integrationId, providerConfigKey, connectionId)
18+
19+
// Delete connection from integrations.settings.nangoMapping object
20+
await activity.removeGithubConnection(integrationId, connectionId)
21+
22+
// We don't unmap because this one was duplicated
23+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { proxyActivities } from '@temporalio/workflow'
2+
3+
import type * as activities from '../activities/nangoActivities'
4+
import { IDeleteGithubRepoConnectionArgs } from '../types'
5+
6+
const activity = proxyActivities<typeof activities>({
7+
startToCloseTimeout: '5 minute',
8+
retry: { maximumAttempts: 10, backoffCoefficient: 2 },
9+
})
10+
11+
export async function deleteGithubRepoConnection(
12+
args: IDeleteGithubRepoConnectionArgs,
13+
): Promise<void> {
14+
const { integrationId, providerConfigKey, connectionId, repo } = args
15+
16+
// Delete nango connection
17+
await activity.deleteConnection(integrationId, providerConfigKey, connectionId)
18+
19+
// Delete connection from integrations.settings.nangoMapping object
20+
await activity.removeGithubConnection(integrationId, connectionId)
21+
22+
// Delete from public.repositories
23+
await activity.unmapGithubRepo(integrationId, repo)
24+
}
Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,72 @@
1-
import { proxyActivities } from '@temporalio/workflow'
1+
import { ParentClosePolicy, executeChild, proxyActivities, startChild } from '@temporalio/workflow'
22

3-
import * as activities from '../activities/nangoActivities'
3+
import type * as activities from '../activities/nangoActivities'
44
import { ISyncGithubIntegrationArguments } from '../types'
55

6+
import { deleteDuplicateGithubConnection } from './deleteDuplicateGithubConnection'
7+
import { deleteGithubRepoConnection } from './deleteGithubRepoConnection'
8+
import { syncGithubRepo } from './syncGithubRepo'
9+
610
const activity = proxyActivities<typeof activities>({
711
startToCloseTimeout: '2 hour',
812
retry: { maximumAttempts: 20, backoffCoefficient: 2 },
913
})
1014

1115
export async function syncGithubIntegration(args: ISyncGithubIntegrationArguments): Promise<void> {
12-
const integrationId = args.integrationId
16+
const { integrationId } = args
1317

1418
const result = await activity.analyzeGithubIntegration(integrationId)
1519

16-
// delete connections that are no longer needed
20+
// Delete connections that are no longer needed - fire and forget (parallel)
1721
for (const repo of result.reposToDelete) {
18-
// delete nango connection
19-
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
20-
21-
// delete connection from integrations.settings.nangoMapping object
22-
await activity.removeGithubConnection(integrationId, repo.connectionId)
23-
24-
// delete from public.repositories
25-
await activity.unmapGithubRepo(integrationId, repo.repo)
22+
await startChild(deleteGithubRepoConnection, {
23+
workflowId: `sync-github/${integrationId}/delete-connection/${repo.repo.owner}/${repo.repo.repoName}/${repo.connectionId}`,
24+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
25+
args: [
26+
{
27+
integrationId,
28+
providerConfigKey: result.providerConfigKey,
29+
connectionId: repo.connectionId,
30+
repo: repo.repo,
31+
},
32+
],
33+
})
2634
}
2735

28-
// delete duplicate connections
36+
// Delete duplicate connections - fire and forget (parallel)
2937
for (const repo of result.duplicatesToDelete) {
30-
// delete nango connection
31-
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
32-
33-
// delete connection from integrations.settings.nangoMapping object
34-
await activity.removeGithubConnection(integrationId, repo.connectionId)
35-
36-
// we don't unmap because this one was duplicated
38+
await startChild(deleteDuplicateGithubConnection, {
39+
workflowId: `sync-github/${integrationId}/delete-duplicate/${repo.repo.owner}/${repo.repo.repoName}/${repo.connectionId}`,
40+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
41+
args: [
42+
{
43+
integrationId,
44+
providerConfigKey: result.providerConfigKey,
45+
connectionId: repo.connectionId,
46+
repo: repo.repo,
47+
},
48+
],
49+
})
3750
}
3851

39-
// create connections for repos that are not already connected
52+
// Create connections for repos that are not already connected - sequential (rate limiting)
4053
for (const repo of result.reposToSync) {
41-
const canCreate = await activity.canCreateGithubConnection()
42-
43-
if (!canCreate) {
54+
const { skipped } = await executeChild(syncGithubRepo, {
55+
workflowId: `sync-github/${integrationId}/create-connection/${repo.owner}/${repo.repoName}`,
56+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
57+
args: [
58+
{
59+
integrationId,
60+
providerConfigKey: result.providerConfigKey,
61+
repo,
62+
},
63+
],
64+
})
65+
66+
if (skipped) {
4467
await activity.logInfo(
4568
`Not enough time has passed since last connection! Skipping repo ${repo.owner}/${repo.repoName} from integration ${integrationId}!`,
4669
)
47-
continue
4870
}
49-
50-
// create nango connection
51-
const connectionId = await activity.createGithubConnection(integrationId, repo)
52-
53-
// add connection to integrations.settings.nangoMapping object
54-
await activity.setGithubConnection(integrationId, repo, connectionId)
55-
56-
// add repo to git integration
57-
await activity.updateGitIntegrationWithRepo(integrationId, repo)
58-
59-
// add repo to public.repositories (+ git.repositoryProcessing if first time)
60-
await activity.mapGithubRepoToRepositories(integrationId, repo)
61-
62-
// start nango sync
63-
await activity.startNangoSync(integrationId, result.providerConfigKey, connectionId)
6471
}
6572
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { proxyActivities } from '@temporalio/workflow'
2+
3+
import type * as activities from '../activities/nangoActivities'
4+
import { ISyncGithubRepoArgs, ISyncGithubRepoResult } from '../types'
5+
6+
const activity = proxyActivities<typeof activities>({
7+
startToCloseTimeout: '2 hour',
8+
retry: { maximumAttempts: 20, backoffCoefficient: 2 },
9+
})
10+
11+
export async function syncGithubRepo(args: ISyncGithubRepoArgs): Promise<ISyncGithubRepoResult> {
12+
const { integrationId, providerConfigKey, repo } = args
13+
14+
// Check if we can create a connection (rate limiting)
15+
const canCreate = await activity.canCreateGithubConnection()
16+
17+
if (!canCreate) {
18+
return { skipped: true }
19+
}
20+
21+
// Create nango connection
22+
const connectionId = await activity.createGithubConnection(integrationId, repo)
23+
24+
// Add connection to integrations.settings.nangoMapping object
25+
await activity.setGithubConnection(integrationId, repo, connectionId)
26+
27+
// Add repo to git integration
28+
await activity.updateGitIntegrationWithRepo(integrationId, repo)
29+
30+
// Add repo to public.repositories
31+
await activity.mapGithubRepoToRepositories(integrationId, repo)
32+
33+
// Start nango sync
34+
await activity.startNangoSync(integrationId, providerConfigKey, connectionId)
35+
36+
return { skipped: false }
37+
}

0 commit comments

Comments
 (0)