Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import {
blockMemberOrganizationAffiliation,
getOrganizationMembers,
} from './activities/block-organization-affiliation'
import {
findDuplicateMembersAfterDate,
moveMemberActivityRelations,
} from './activities/cleanup/duplicate-members'
import { deleteMember, getMembersToCleanup, syncRemoveMember } from './activities/cleanup/member'
import {
deleteOrganization,
Expand Down Expand Up @@ -44,6 +40,13 @@ import {
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
} from './activities/merge-members-with-similar-identities'
import { getUnprocessedLLMApprovedSuggestions } from './activities/process-llm-verified-merges'
import {
getMemberOrganizationsToPrune,
getOrganizationsToPrune,
pruneMemberOrganization,
pruneOrganization,
refreshMemberAffiliations,
} from './activities/prune-duplicate-organizations'
import { deleteIndexedEntities } from './activities/sync/entity-index'
import { getMembersForSync, syncMembersBatch } from './activities/sync/member'
import { getOrganizationsForSync, syncOrganizationsBatch } from './activities/sync/organization'
Expand Down Expand Up @@ -78,12 +81,15 @@ export {
deleteIndexedEntities,
getUnprocessedLLMApprovedSuggestions,
getWorkflowsCount,
findDuplicateMembersAfterDate,
moveMemberActivityRelations,
getBotMembersWithOrgAffiliation,
removeBotMemberOrganization,
unlinkOrganizationFromBotActivities,
blockMemberOrganizationAffiliation,
getOrganizationMembers,
calculateMemberAffiliations,
getOrganizationsToPrune,
pruneOrganization,
getMemberOrganizationsToPrune,
pruneMemberOrganization,
refreshMemberAffiliations,
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { refreshMemberOrganizationAffiliations } from '@crowd/data-access-layer/src/member-organization-affiliation'
import { deleteMemberOrganizations } from '@crowd/data-access-layer/src/members'
import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'

import { svc } from '../main'

export async function pruneOrganization(orgId: string): Promise<void> {
try {
const orgRepo = new OrganizationRepository(svc.postgres.writer.connection(), svc.log)
await orgRepo.pruneOrganization(orgId)
} catch (error) {
svc.log.error(error, 'Error pruning organization in database!')
throw error
}
}

export async function getOrganizationsToPrune(
batchSize: number,
): Promise<{ id: string; displayName: string }[]> {
try {
const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
return orgRepo.getOrganizationsToPrune(batchSize)
} catch (error) {
svc.log.error(error, 'Error getting organizations to prune!')
throw error
}
}

export async function getMemberOrganizationsToPrune(
batchSize: number,
): Promise<{ id: string; memberId: string }[]> {
try {
const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
return orgRepo.getMemberOrganizationsToPrune(batchSize)
} catch (error) {
svc.log.error(error, 'Error getting member organizations to prune!')
throw error
}
}

export async function pruneMemberOrganization(
memberOrganizationId: string,
memberId: string,
): Promise<void> {
try {
const qx = pgpQx(svc.postgres.writer.connection())
await deleteMemberOrganizations(qx, memberId, [memberOrganizationId], false)
} catch (error) {
svc.log.error(error, 'Error pruning member organization!')
throw error
}
}

export async function refreshMemberAffiliations(memberId: string): Promise<void> {
try {
const qx = pgpQx(svc.postgres.writer.connection())
await refreshMemberOrganizationAffiliations(qx, memberId)
} catch (error) {
svc.log.error(error, 'Error refreshing member affiliations!')
throw error
}
}
6 changes: 0 additions & 6 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ export interface IProcessLLMVerifiedMergesArgs extends IScriptBatchTestArgs {
type: string
}

export interface ICleanupDuplicateMembersArgs extends IScriptBatchTestArgs {
cutoffDate?: string
checkByActivityIdentity?: boolean
checkByTwitterIdentity?: boolean
}

export interface IDedupActivityRelationsArgs extends IScriptBatchTestArgs {
groupsPerRun?: number
cursor?: Omit<IActivityRelationDuplicateGroup, 'activityIds'>
Expand Down
6 changes: 4 additions & 2 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { blockOrganizationAffiliation } from './workflows/block-organization-affiliation'
import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members'
import { cleanupMembers } from './workflows/cleanup/members'
import { cleanupOrganizations } from './workflows/cleanup/organizations'
import { dissectMember } from './workflows/dissectMember'
Expand All @@ -8,6 +7,8 @@ import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from '.
import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges'
import { pruneDuplicateOrganizations } from './workflows/prune-duplicate-organizations'
import { pruneIncorrectMemberOrganizations } from './workflows/prune-incorrect-member-organizations'
import { syncMembers } from './workflows/sync/members'
import { syncOrganizations } from './workflows/sync/organizations'

Expand All @@ -21,7 +22,8 @@ export {
cleanupMembers,
cleanupOrganizations,
processLLMVerifiedMerges,
cleanupDuplicateMembers,
pruneDuplicateOrganizations,
fixBotMembersAffiliation,
blockOrganizationAffiliation,
pruneIncorrectMemberOrganizations,
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities'
import { IScriptBatchTestArgs } from '../types'
import { chunkArray } from '../utils/common'

const { getOrganizationsToPrune, pruneOrganization, syncRemoveOrganization } = proxyActivities<
typeof activities
>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

export async function pruneDuplicateOrganizations(args: IScriptBatchTestArgs): Promise<void> {
const BATCH_SIZE = args.batchSize ?? 100

const organizationsToPrune = await getOrganizationsToPrune(BATCH_SIZE)

if (organizationsToPrune.length === 0) {
console.log('No more organizations to prune!')
return
}

const CHUNK_SIZE = 25

for (const chunk of chunkArray(organizationsToPrune, CHUNK_SIZE)) {
const cleanupTasks = chunk.map(async (o) => {
console.log('Pruning organization', o.displayName)
await syncRemoveOrganization(o.id)
return pruneOrganization(o.id)
})

await Promise.all(cleanupTasks).catch((err) => {
console.error('Error pruning organizations!', err)
})
}
}
Comment thread
skwowet marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities'
import { IScriptBatchTestArgs } from '../types'
import { chunkArray } from '../utils/common'

const { getMemberOrganizationsToPrune, pruneMemberOrganization, refreshMemberAffiliations } =
proxyActivities<typeof activities>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

export async function pruneIncorrectMemberOrganizations(args: IScriptBatchTestArgs): Promise<void> {
const BATCH_SIZE = args.batchSize ?? 100

const memberOrganizationsToPrune = await getMemberOrganizationsToPrune(BATCH_SIZE)

if (memberOrganizationsToPrune.length === 0) {
console.log('No more member organizations to prune!')
return
}

const memberIdsToRefresh = new Set<string>()
const CHUNK_SIZE = 25

for (const chunk of chunkArray(memberOrganizationsToPrune, CHUNK_SIZE)) {
const cleanupTasks = chunk.map(async (mo) => {
console.log('Pruning member organization', mo.id)
await pruneMemberOrganization(mo.id, mo.memberId)
memberIdsToRefresh.add(mo.memberId)
})

await Promise.all(cleanupTasks).catch((err) => {
console.error('Error pruning member organizations!', err)
})
}

for (const memberId of memberIdsToRefresh) {
console.log('Refreshing member affiliations', memberId)
await refreshMemberAffiliations(memberId)
}
}
Loading
Loading