Skip to content

Commit 26b58ee

Browse files
committed
Merge branch 'main' into deprecate-questdb-CM-420
2 parents ca6e3f1 + b0d2e27 commit 26b58ee

13 files changed

Lines changed: 175 additions & 41 deletions

File tree

backend/src/services/integrationService.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,12 @@ export default class IntegrationService {
252252
}
253253

254254
if (IntegrationService.isCodePlatform(platform) && platform !== PlatformType.GIT) {
255-
this.gitConnectOrUpdate({
256-
remotes: repositories,
257-
})
255+
await this.gitConnectOrUpdate(
256+
{
257+
remotes: repositories,
258+
},
259+
txOptions,
260+
)
258261
}
259262

260263
return integration
@@ -833,7 +836,11 @@ export default class IntegrationService {
833836
}
834837

835838
async githubNangoConnect(settings, mapping, integrationId?: string) {
836-
const transaction = await SequelizeRepository.createTransaction(this.options)
839+
const existingTransaction = SequelizeRepository.getTransaction(this.options)
840+
841+
const transaction =
842+
existingTransaction || (await SequelizeRepository.createTransaction(this.options))
843+
837844
const txOptions = {
838845
...this.options,
839846
transaction,
@@ -881,7 +888,9 @@ export default class IntegrationService {
881888
)
882889
}
883890

884-
await SequelizeRepository.commitTransaction(transaction)
891+
if (!existingTransaction) {
892+
await SequelizeRepository.commitTransaction(transaction)
893+
}
885894

886895
await this.options.temporal.workflow.start('syncGithubIntegration', {
887896
taskQueue: 'nango',
@@ -896,7 +905,9 @@ export default class IntegrationService {
896905
return integration
897906
} catch (err) {
898907
this.options.log.error(err, 'Error while creating or updating GitHub integration!')
899-
await SequelizeRepository.rollbackTransaction(transaction)
908+
if (!existingTransaction) {
909+
await SequelizeRepository.rollbackTransaction(transaction)
910+
}
900911
throw err
901912
}
902913
}
@@ -1280,7 +1291,9 @@ export default class IntegrationService {
12801291
return null
12811292
}
12821293

1283-
const existingTransaction = SequelizeRepository.getTransaction(options || this.options)
1294+
const currentOptions = options || this.options
1295+
const existingTransaction =
1296+
currentOptions.transaction || SequelizeRepository.getTransaction(currentOptions)
12841297
const transaction =
12851298
existingTransaction || (await SequelizeRepository.createTransaction(options || this.options))
12861299
let integration

services/apps/git_integration/src/crowdgit/services/clone/clone_service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def _perform_minimal_clone(self, path: str, remote: str) -> None:
6262
)
6363
self.logger.info("Initializing minimal clone")
6464
await run_shell_command(
65-
["git", "clone", "--depth=1", "--no-tags", "--single-branch", remote, path], cwd=path
65+
["git", "clone", "--depth=1", "--no-tags", "--single-branch", remote, "."], cwd=path
6666
)
6767
self.logger.info("Minimal clone initialized successfully")
6868

@@ -280,7 +280,9 @@ async def _calculate_batch_depth(self, repo_path: str, remote: str) -> int:
280280
async def _perform_full_clone(self, repo_path: str, remote: str):
281281
"""Perform full repository clone"""
282282
self.logger.info(f"Performing full clone for repo {remote}...")
283-
await run_shell_command(["git", "clone", remote, repo_path], cwd=repo_path)
283+
await run_shell_command(
284+
["git", "clone", "--no-tags", "--single-branch", remote, "."], cwd=repo_path
285+
)
284286
self.logger.info(f"Successfully completed full clone of repository: {remote}")
285287

286288
async def has_default_branch_changed(self, remote: str, saved_branch: str | None) -> bool:

services/apps/script_executor_worker/src/activities.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import {
2+
blockMemberOrganizationAffiliation,
3+
getOrganizationMembers,
4+
} from './activities/block-organization-affiliation'
15
import {
26
findDuplicateMembersAfterDate,
37
moveMemberActivityRelations,
@@ -22,6 +26,7 @@ import {
2226
findMemberIdentitiesGroupedByPlatform,
2327
findMemberMergeActions,
2428
} from './activities/dissect-member'
29+
import { calculateMemberAffiliations } from './activities/fix-activity-foreign-keys'
2530
import {
2631
getBotMembersWithOrgAffiliation,
2732
removeBotMemberOrganization,
@@ -78,4 +83,7 @@ export {
7883
getBotMembersWithOrgAffiliation,
7984
removeBotMemberOrganization,
8085
unlinkOrganizationFromBotActivities,
86+
blockMemberOrganizationAffiliation,
87+
getOrganizationMembers,
88+
calculateMemberAffiliations,
8189
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { pgpQx } from '@crowd/data-access-layer'
2+
import { changeOverride } from '@crowd/data-access-layer/src/member_organization_affiliation_overrides'
3+
import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
4+
import { IMemberOrganization } from '@crowd/types'
5+
6+
import { svc } from '../main'
7+
8+
export async function getOrganizationMembers(
9+
organizationId: string,
10+
limit = 100,
11+
offset = 0,
12+
): Promise<IMemberOrganization[]> {
13+
try {
14+
const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
15+
return orgRepo.findOrganizationMembers(organizationId, limit, offset)
16+
} catch (error) {
17+
svc.log.error(error, 'Error getting organization members!')
18+
throw error
19+
}
20+
}
21+
22+
export async function blockMemberOrganizationAffiliation(
23+
memberId: string,
24+
memberOrganizationId: string,
25+
): Promise<void> {
26+
try {
27+
const qx = pgpQx(svc.postgres.writer.connection())
28+
return changeOverride(qx, {
29+
memberId,
30+
memberOrganizationId,
31+
allowAffiliation: false,
32+
isPrimaryWorkExperience: false,
33+
})
34+
} catch (error) {
35+
svc.log.error(error, 'Error blocking organization affiliation!')
36+
throw error
37+
}
38+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { refreshMemberOrganizationAffiliations } from '@crowd/data-access-layer/src/member-organization-affiliation'
2+
import MergeActionRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/mergeAction.repo'
3+
import { EntityType } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types'
4+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
5+
import { IMergeAction } from '@crowd/types'
6+
7+
import { svc } from '../../main'
8+
9+
export async function findMergeActionsWithDeletedSecondaryEntities(
10+
limit: number,
11+
offset: number,
12+
entityType: EntityType,
13+
): Promise<IMergeAction[]> {
14+
try {
15+
const mergeActionRepo = new MergeActionRepository(svc.postgres.reader.connection(), svc.log)
16+
return mergeActionRepo.findMergeActionsWithDeletedSecondaryEntities(limit, offset, entityType)
17+
} catch (error) {
18+
svc.log.error(error, `Error getting merge actions with deleted secondary ${entityType}s !`)
19+
throw error
20+
}
21+
}
22+
23+
export async function calculateMemberAffiliations(memberId: string): Promise<void> {
24+
try {
25+
const qx = pgpQx(svc.postgres.writer.connection())
26+
await refreshMemberOrganizationAffiliations(qx, memberId)
27+
} catch (err) {
28+
throw new Error(err)
29+
}
30+
}

services/apps/script_executor_worker/src/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,8 @@ export interface IDedupActivityRelationsArgs extends IScriptBatchTestArgs {
5656
groupsPerRun?: number
5757
cursor?: Omit<IActivityRelationDuplicateGroup, 'activityIds'>
5858
}
59+
60+
export interface IBlockOrganizationAffiliationArgs {
61+
organizationId: string
62+
offset?: number
63+
}

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { blockOrganizationAffiliation } from './workflows/block-organization-affiliation'
12
import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members'
23
import { cleanupMembers } from './workflows/cleanup/members'
34
import { cleanupOrganizations } from './workflows/cleanup/organizations'
@@ -22,4 +23,5 @@ export {
2223
processLLMVerifiedMerges,
2324
cleanupDuplicateMembers,
2425
fixBotMembersAffiliation,
26+
blockOrganizationAffiliation,
2527
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { continueAsNew, proxyActivities } from '@temporalio/workflow'
2+
3+
import * as activities from '../activities'
4+
import { IBlockOrganizationAffiliationArgs } from '../types'
5+
import { chunkArray } from '../utils/common'
6+
7+
const { getOrganizationMembers, blockMemberOrganizationAffiliation, calculateMemberAffiliations } =
8+
proxyActivities<typeof activities>({
9+
startToCloseTimeout: '30 minutes',
10+
})
11+
12+
export async function blockOrganizationAffiliation(
13+
args: IBlockOrganizationAffiliationArgs,
14+
): Promise<void> {
15+
const MEMBERS_PER_RUN = 500
16+
const BATCH_SIZE = 50
17+
const OFFSET = args.offset ?? 0
18+
19+
const memberOrganizations = await getOrganizationMembers(
20+
args.organizationId,
21+
MEMBERS_PER_RUN,
22+
OFFSET,
23+
)
24+
25+
if (memberOrganizations.length === 0) {
26+
console.log('No more organization members to block!')
27+
return
28+
}
29+
30+
// Step 1: Block all affiliations in batches
31+
for (const chunk of chunkArray(memberOrganizations, BATCH_SIZE)) {
32+
await Promise.all(chunk.map((mo) => blockMemberOrganizationAffiliation(mo.memberId, mo.id)))
33+
}
34+
35+
// Step 2: Deduplicate memberIds and calculate affiliations
36+
const uniqueMemberIds = Array.from(new Set(memberOrganizations.map((mo) => mo.memberId)))
37+
for (const chunk of chunkArray(uniqueMemberIds, BATCH_SIZE)) {
38+
await Promise.all(chunk.map((memberId) => calculateMemberAffiliations(memberId)))
39+
}
40+
41+
// Step 3: Continue pagination
42+
await continueAsNew<typeof blockOrganizationAffiliation>({
43+
...args,
44+
offset: OFFSET + MEMBERS_PER_RUN,
45+
})
46+
}

services/libs/data-access-layer/src/old/apps/integration_sync_worker/member.repo.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,19 +79,6 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
7979
where mnm."memberId" = $(id)
8080
and m2."deletedAt" is null
8181
group by mnm."memberId"),
82-
member_tags as (select mt."memberId",
83-
json_agg(
84-
json_build_object(
85-
'id', t.id,
86-
'name', t.name
87-
)
88-
) as all_tags,
89-
jsonb_agg(t.id) as all_ids
90-
from "memberTags" mt
91-
inner join tags t on mt."tagId" = t.id
92-
where mt."memberId" = $(id)
93-
and t."deletedAt" is null
94-
group by mt."memberId"),
9582
member_organizations as (select mo."memberId",
9683
os."segmentId",
9784
json_agg(
@@ -157,7 +144,6 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
157144
158145
i.identities,
159146
coalesce(mo.all_organizations, json_build_array()) as organizations,
160-
coalesce(mt.all_tags, json_build_array()) as tags,
161147
coalesce(tmd.to_merge_ids, array []::text[]) as "toMergeIds",
162148
coalesce(nmd.no_merge_ids, array []::text[]) as "noMergeIds"
163149
from "memberSegments" ms
@@ -166,7 +152,6 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
166152
inner join activity_data ad on ms."memberId" = ad."memberId" and ms."segmentId" = ad."segmentId"
167153
left join to_merge_data tmd on m.id = tmd."memberId"
168154
left join no_merge_data nmd on m.id = nmd."memberId"
169-
left join member_tags mt on ms."memberId" = mt."memberId"
170155
left join member_organizations mo on ms."memberId" = mo."memberId" and ms."segmentId" = mo."segmentId"
171156
where ms."memberId" = $(id)
172157
and m."deletedAt" is null;`,

services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ class MemberRepository {
208208
{ name: 'memberSegmentAffiliations', conditions: ['memberId'] },
209209
{ name: 'memberSegmentsAgg', conditions: ['memberId'] },
210210
{ name: 'memberSegments', conditions: ['memberId'] },
211-
{ name: 'memberTags', conditions: ['memberId'] },
212211
{ name: 'memberToMerge', conditions: ['memberId', 'toMergeId'] },
213212
{ name: 'memberToMergeRaw', conditions: ['memberId', 'toMergeId'] },
214213
{ name: 'members', conditions: ['id'] },

0 commit comments

Comments
 (0)