From 116cba65ea84ec3377f48e8f11460be4091c51c5 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:38:10 +0530 Subject: [PATCH 1/3] chore: add duplicate organization pruning script --- .../script_executor_worker/src/activities.ts | 12 +-- .../activities/cleanup/duplicate-members.ts | 41 ---------- .../prune-duplicate-organizations.ts | 25 ++++++ .../apps/script_executor_worker/src/types.ts | 6 -- .../script_executor_worker/src/workflows.ts | 4 +- .../workflows/cleanup/duplicate-members.ts | 64 --------------- .../prune-duplicate-organizations.ts | 37 +++++++++ .../organization.repo.ts | 81 +++++++++++++++++++ 8 files changed, 151 insertions(+), 119 deletions(-) delete mode 100644 services/apps/script_executor_worker/src/activities/cleanup/duplicate-members.ts create mode 100644 services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts delete mode 100644 services/apps/script_executor_worker/src/workflows/cleanup/duplicate-members.ts create mode 100644 services/apps/script_executor_worker/src/workflows/prune-duplicate-organizations.ts diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index f0b9f96810..bad2073e49 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -2,10 +2,6 @@ import { blockMemberOrganizationAffiliation, getOrganizationMembers, } from './activities/block-organization-affiliation' -import { - findDuplicateMembersAfterDate, - moveMemberActivityRelations, -} from './activities/cleanup/duplicate-members' import { deleteMember, getMembersToCleanup, syncRemoveMember } from './activities/cleanup/member' import { deleteOrganization, @@ -44,6 +40,10 @@ import { findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' import { getUnprocessedLLMApprovedSuggestions } 
from './activities/process-llm-verified-merges' +import { + getOrganizationsToPrune, + pruneOrganization, +} from './activities/prune-duplicate-organizations' import { deleteIndexedEntities } from './activities/sync/entity-index' import { getMembersForSync, syncMembersBatch } from './activities/sync/member' import { getOrganizationsForSync, syncOrganizationsBatch } from './activities/sync/organization' @@ -78,12 +78,12 @@ export { deleteIndexedEntities, getUnprocessedLLMApprovedSuggestions, getWorkflowsCount, - findDuplicateMembersAfterDate, - moveMemberActivityRelations, getBotMembersWithOrgAffiliation, removeBotMemberOrganization, unlinkOrganizationFromBotActivities, blockMemberOrganizationAffiliation, getOrganizationMembers, calculateMemberAffiliations, + getOrganizationsToPrune, + pruneOrganization, } diff --git a/services/apps/script_executor_worker/src/activities/cleanup/duplicate-members.ts b/services/apps/script_executor_worker/src/activities/cleanup/duplicate-members.ts deleted file mode 100644 index d573d00d31..0000000000 --- a/services/apps/script_executor_worker/src/activities/cleanup/duplicate-members.ts +++ /dev/null @@ -1,41 +0,0 @@ -import ActivityRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activity.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo' -import { - EntityType, - IDuplicateMembersToCleanup, -} from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types' - -import { svc } from '../../main' - -export async function findDuplicateMembersAfterDate( - cutoffDate: string, - limit: number, - checkByActivityIdentity: boolean, - checkByTwitterIdentity: boolean, -): Promise { - try { - const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) - return memberRepo.findDuplicateMembersAfterDate( - cutoffDate, - limit, - checkByActivityIdentity, - checkByTwitterIdentity, - ) - } catch (error) { - svc.log.error(error, 
'Error finding duplicate members after cutoff date!')
-    throw error
-  }
-}
-
-export async function moveMemberActivityRelations(
-  primaryId: string,
-  secondaryId: string,
-): Promise<void> {
-  try {
-    const activityRepo = new ActivityRepository(svc.postgres.writer.connection(), svc.log)
-    await activityRepo.moveActivityRelations(primaryId, secondaryId, EntityType.MEMBER)
-  } catch (error) {
-    svc.log.error(error, 'Error updating activity relations for duplicate members!')
-    throw error
-  }
-}
diff --git a/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts b/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts
new file mode 100644
index 0000000000..9bf81c391b
--- /dev/null
+++ b/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts
@@ -0,0 +1,41 @@
+import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
+
+import { svc } from '../main'
+
+/**
+ * Activity: deletes one organization through `OrganizationRepository.pruneOrganization`.
+ *
+ * Runs on the writer connection. Failures are logged and rethrown so the caller
+ * still sees them.
+ *
+ * @param orgId - id of the organization to prune
+ */
+export async function pruneOrganization(orgId: string): Promise<void> {
+  try {
+    const orgRepo = new OrganizationRepository(svc.postgres.writer.connection(), svc.log)
+    await orgRepo.pruneOrganization(orgId)
+  } catch (error) {
+    svc.log.error(error, 'Error pruning organization in database!')
+    throw error
+  }
+}
+
+/**
+ * Activity: fetches one batch of organizations that are candidates for pruning.
+ *
+ * @param batchSize - batch size forwarded to the repository query
+ * @returns id and display name of each candidate organization
+ */
+export async function getOrganizationsToPrune(
+  batchSize: number,
+): Promise<{ id: string; displayName: string }[]> {
+  try {
+    const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
+    // `await` is required here: returning the bare promise would let a rejection
+    // skip the catch block below, so the failure would never be logged.
+    return await orgRepo.getOrganizationsToPrune(batchSize)
+  } catch (error) {
+    svc.log.error(error, 'Error getting organizations to prune!')
+    throw error
+  }
+}
diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts
index 8c94682e0c..007bb23511 100644
--- a/services/apps/script_executor_worker/src/types.ts
+++ b/services/apps/script_executor_worker/src/types.ts
@@ -46,12 +46,6 @@ export interface
IProcessLLMVerifiedMergesArgs extends IScriptBatchTestArgs { type: string } -export interface ICleanupDuplicateMembersArgs extends IScriptBatchTestArgs { - cutoffDate?: string - checkByActivityIdentity?: boolean - checkByTwitterIdentity?: boolean -} - export interface IDedupActivityRelationsArgs extends IScriptBatchTestArgs { groupsPerRun?: number cursor?: Omit diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 5a1cd5468b..3031b313c8 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -1,5 +1,4 @@ import { blockOrganizationAffiliation } from './workflows/block-organization-affiliation' -import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members' import { cleanupMembers } from './workflows/cleanup/members' import { cleanupOrganizations } from './workflows/cleanup/organizations' import { dissectMember } from './workflows/dissectMember' @@ -8,6 +7,7 @@ import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from '. 
import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges' +import { pruneDuplicateOrganizations } from './workflows/prune-duplicate-organizations' import { syncMembers } from './workflows/sync/members' import { syncOrganizations } from './workflows/sync/organizations' @@ -21,7 +21,7 @@ export { cleanupMembers, cleanupOrganizations, processLLMVerifiedMerges, - cleanupDuplicateMembers, + pruneDuplicateOrganizations, fixBotMembersAffiliation, blockOrganizationAffiliation, } diff --git a/services/apps/script_executor_worker/src/workflows/cleanup/duplicate-members.ts b/services/apps/script_executor_worker/src/workflows/cleanup/duplicate-members.ts deleted file mode 100644 index 7f986359f2..0000000000 --- a/services/apps/script_executor_worker/src/workflows/cleanup/duplicate-members.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { continueAsNew, proxyActivities, sleep } from '@temporalio/workflow' - -import * as activities from '../../activities' -import { ICleanupDuplicateMembersArgs } from '../../types' - -const { getWorkflowsCount, findDuplicateMembersAfterDate, mergeMembers } = proxyActivities< - typeof activities ->({ - startToCloseTimeout: '30 minutes', - retry: { maximumAttempts: 3, backoffCoefficient: 3 }, -}) - -export async function cleanupDuplicateMembers(args: ICleanupDuplicateMembersArgs): Promise { - const BATCH_SIZE = args.batchSize ?? 100 - const testRun = args.testRun ?? false - const cutoffDate = args.cutoffDate ?? '2025-05-18' - const checkByActivityIdentity = args.checkByActivityIdentity ?? false - const checkByTwitterIdentity = args.checkByTwitterIdentity ?? 
false
-  const WORKFLOWS_THRESHOLD = 20
-
-  const workflowTypeToCheck = 'finishMemberMerging'
-  let workflowsCount = await getWorkflowsCount(workflowTypeToCheck, 'Running')
-
-  // Prevent blowing up postgres with too many merge workflows
-  while (workflowsCount > WORKFLOWS_THRESHOLD) {
-    console.log(`Too many running finishMemberMerging workflows (count: ${workflowsCount})`)
-
-    // Wait for 5 minutes
-    await sleep('5 minutes')
-
-    workflowsCount = await getWorkflowsCount(workflowTypeToCheck, 'Running')
-  }
-
-  const results = await findDuplicateMembersAfterDate(
-    cutoffDate,
-    BATCH_SIZE,
-    checkByActivityIdentity,
-    checkByTwitterIdentity,
-  )
-
-  if (results.length === 0) {
-    console.log('No duplicate members found!')
-    return
-  }
-
-  // execute merge in parallel
-  try {
-    await Promise.all(
-      results.map((result) => {
-        console.log(`Merging members ${result.primaryId} and ${result.secondaryId}`)
-        return mergeMembers(result.primaryId, result.secondaryId)
-      }),
-    )
-  } catch (error) {
-    console.error('Error merging members!', error)
-  }
-
-  if (testRun) {
-    console.log('Test run completed - stopping after first batch!')
-    return
-  }
-
-  await continueAsNew(args)
-}
diff --git a/services/apps/script_executor_worker/src/workflows/prune-duplicate-organizations.ts b/services/apps/script_executor_worker/src/workflows/prune-duplicate-organizations.ts
new file mode 100644
index 0000000000..0f262f14ec
--- /dev/null
+++ b/services/apps/script_executor_worker/src/workflows/prune-duplicate-organizations.ts
@@ -0,0 +1,52 @@
+import { proxyActivities } from '@temporalio/workflow'
+
+import * as activities from '../activities'
+import { IScriptBatchTestArgs } from '../types'
+import { chunkArray } from '../utils/common'
+
+const { getOrganizationsToPrune, pruneOrganization, syncRemoveOrganization } = proxyActivities<
+  typeof activities
+>({
+  startToCloseTimeout: '30 minutes',
+  retry: { maximumAttempts: 3, backoffCoefficient: 3 },
+})
+
+/**
+ * Prunes a single batch of organizations returned by `getOrganizationsToPrune`.
+ *
+ * Each organization is first passed to `syncRemoveOrganization` and then deleted with
+ * `pruneOrganization`. Work is done in chunks of 25 concurrent tasks; a failure for
+ * one organization is logged and does not stop the rest of the batch.
+ *
+ * @param args - script arguments; only `batchSize` is read (defaults to 100)
+ */
+export async function pruneDuplicateOrganizations(args: IScriptBatchTestArgs): Promise<void> {
+  const BATCH_SIZE = args.batchSize ?? 100
+
+  const organizationsToPrune = await getOrganizationsToPrune(BATCH_SIZE)
+
+  if (organizationsToPrune.length === 0) {
+    console.log('No more organizations to prune!')
+    return
+  }
+
+  const CHUNK_SIZE = 25
+
+  for (const chunk of chunkArray(organizationsToPrune, CHUNK_SIZE)) {
+    const cleanupTasks = chunk.map(async (o) => {
+      console.log('Pruning organization', o.displayName)
+      await syncRemoveOrganization(o.id)
+      return pruneOrganization(o.id)
+    })
+
+    // allSettled waits for every task in the chunk, so a single rejection can no longer
+    // let the loop start the next chunk while this one is still running.
+    const results = await Promise.allSettled(cleanupTasks)
+
+    for (const result of results) {
+      if (result.status === 'rejected') {
+        console.error('Error pruning organizations!', result.reason)
+      }
+    }
+  }
+}
diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
index c97193e08b..f255d659ea 100644
--- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
+++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
@@ -179,6 +179,106 @@ class OrganizationRepository {
       { organizationId, limit, offset },
     )
   }
+
+  /**
+   * Hard-deletes an organization together with the rows that reference it.
+   *
+   * All deletes run in a single transaction, in the order listed below, with the
+   * `organizations` row itself removed last. For tables with two listed columns, a
+   * row is removed when either column equals the organization id.
+   *
+   * @param organizationId - id of the organization to delete
+   */
+  public async pruneOrganization(organizationId: string): Promise<void> {
+    const tablesToDelete = [
+      { name: 'organizationNoMerge', conditions: ['organizationId', 'noMergeId'] },
+      { name: 'organizationToMerge', conditions: ['organizationId', 'toMergeId'] },
+      { name: 'organizationToMergeRaw', conditions: ['organizationId', 'toMergeId'] },
+      { name: 'organizationEnrichmentCache', conditions: ['organizationId'] },
+      { name: 'organizationEnrichments', conditions: ['organizationId'] },
+      { name: 'memberSegmentAffiliations', conditions: ['organizationId'] },
+      { name: 'organizationSegmentsAgg', conditions: ['organizationId'] },
+      { name: 'organizationSegments', conditions: ['organizationId'] },
+      { name: 'orgAttributes', conditions: ['organizationId'] },
+      { name: 'organizationIdentities', conditions: ['organizationId'] },
+      { name: 'memberOrganizations', conditions: ['organizationId'] },
+      { name: 'organizations', conditions: ['id'] },
+    ]
+
+    await this.connection.tx(async (tx) => {
+      for (const table of tablesToDelete) {
+        const whereClause = table.conditions
+          .map((field) => `"${field}" = $(organizationId)`)
+          .join(' OR ')
+        await tx.none(`DELETE FROM "${table.name}" WHERE ${whereClause}`, { organizationId })
+      }
+    })
+  }
+
+  /**
+   * Finds organizations whose `primary-domain` identity is one of the e-mail provider
+   * domains listed in the query (gmail.com, yahoo.com, ...).
+   *
+   * Soft-deleted organizations are skipped, and so are organizations that still have
+   * a member-organization row matching the title/source filter in the NOT EXISTS clause.
+   *
+   * @param batchSize - passed to the query as the `batchSize` parameter
+   * @returns id and display name of each matching organization
+   */
+  public async getOrganizationsToPrune(
+    batchSize: number,
+  ): Promise<{ id: string; displayName: string }[]> {
+    return this.connection.query(
+      `
+      WITH email_providers AS (
+        SELECT unnest(ARRAY[
+          'gmail.com', 'gmail.co.uk', 'gmail.com.au', 'gmail.com.tr',
+          'yahoo.com', 'yahoo.co.uk', 'yahoo.com.br', 'yahoo.co.in', 'yahoo.fr', 'yahoo.es', 'yahoo.it', 'yahoo.de', 'yahoo.ca', 'yahoo.com.au', 'yahoo.in', 'yahoo.co.jp', 'yahoo.com.ar', 'yahoo.com.mx', 'yahoo.co.id', 'yahoo.com.sg', 'yahoo.co.za', 'yahoo.com.ph', 'yahoo.com.tw', 'yahoo.com.hk', 'yahoo.com.vn',
+          'hotmail.com', 'hotmail.co.uk', 'hotmail.fr', 'hotmail.ca', 'hotmail.it', 'hotmail.es', 'hotmail.de', 'hotmail.com.au', 'hotmail.com.mx',
+          'icloud.com', 'icloud.com.cn',
+          'fastmail.com', 'tutanota.com', 'tuta.io',
+          'gmx.com', 'gmx.de', 'gmx.net', 'gmx.at', 'gmx.ch', 'gmx.fr', 'gmx.co.uk',
+          'aol.com', 'aol.co.uk', 'aol.fr', 'aol.de',
+          'msn.com', 'wanadoo.fr', 'orange.fr', 'comcast.net',
+          'live.com', 'live.co.uk', 'live.fr', 'live.nl', 'live.it', 'live.com.au', 'live.ca', 'live.cn',
+          'rediffmail.com', 'sify.com', 'indiatimes.com', 'free.fr', 'web.de',
+          'yandex.ru', 'yandex.com', 'yandex.com.tr', 'ya.ru',
+          'ymail.com', 'libero.it',
+          'outlook.com', 'outlook.fr', 'outlook.co.uk', 'outlook.de', 'outlook.es', 'outlook.it', 'outlook.com.au', 'outlook.com.br', 'outlook.com.mx', 'outlook.co.jp', 'outlook.in', 'outlook.com.sg', 'outlook.co.za', 'outlook.co.in',
+          'uol.com.br', 'bol.com.br',
+          'mail.ru', 'inbox.ru', 'list.ru', 'bk.ru',
+          'mail.com', 'mail.de', 'mail.co.uk',
+          'cox.net', 'sbcglobal.net', 'sfr.fr', 'verizon.net', 'googlemail.com', 'ig.com.br', 'bigpond.com', 'bigpond.net.au', 'terra.com.br', 'neuf.fr', 'alice.it', 'rocketmail.com', 'att.net', 'laposte.net', 'bellsouth.net', 'charter.net', 'rambler.ru', 'tiscali.it', 'tiscali.co.uk', 'shaw.ca', 'sky.com', 'earthlink.net', 'optonline.net', 'freenet.de', 't-online.de', 'aliceadsl.fr', 'virgilio.it', 'home.nl', 'qq.com', 'vip.qq.com', 'telenet.be', 'pandora.be', 'me.com', 'voila.fr', 'planet.nl', 'tin.it', 'ntlworld.com', 'arcor.de', 'frontiernet.net', 'hetnet.nl', 'zonnet.nl', 'club-internet.fr', 'juno.com', 'optusnet.com.au', 'blueyonder.co.uk', 'bluewin.ch', 'skynet.be', 'sympatico.ca', 'windstream.net', 'mac.com', 'centurytel.net', 'chello.nl', 'aim.com',
+          'protonmail.com', 'protonmail.ch', 'proton.me', 'pm.me', 'duck.com',
+          'zoho.com', 'zohomail.com',
+          'users.noreply.github.com',
+          '126.com', '139.com', '163.com', '188.com', 'foxmail.com', 'tom.com', '21cn.com', 'yeah.net',
+          'naver.com', 'daum.net', 'hanmail.net',
+          'hey.com', 'inbox.com', 'lycos.com', 'excite.com', 'hushmail.com', 'mailfence.com', 'mailbox.org', 'posteo.de', 'startmail.com', 'runbox.com', 'countermail.com', 'mynet.com',
+          'wp.pl', 'onet.pl', 'interia.pl', 'o2.pl',
+          'seznam.cz', 'centrum.cz',
+          'mailinator.com', 'guerrillamail.com', '10minutemail.com', 'tempmail.com'
+        ]) AS provider
+      )
+      SELECT DISTINCT o.id, o."displayName"
+      FROM organizations o
+      INNER JOIN "organizationIdentities" oi
+        ON o.id = oi."organizationId"
+      INNER JOIN email_providers ep
+        ON LOWER(oi.value) = ep.provider
+      WHERE o."deletedAt" IS NULL
+        AND oi.type = 'primary-domain'
+        AND NOT EXISTS (
+          SELECT 1
+          FROM "memberOrganizations" mo
+          WHERE mo."organizationId" = o.id
+            AND (mo.title IS NOT NULL AND mo.title != '')
+            AND (mo.source IS NOT NULL AND mo.source NOT IN ('email-domain'))
+        )
+      `,
+      { batchSize },
+    )
+  }
 }
 
 export default OrganizationRepository

From bd1a315ca96e1de9b39cbead32ca0ccaab036947 Mon Sep
17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:48:30 +0530 Subject: [PATCH 2/3] fix: resolve pr comments --- .../src/old/apps/script_executor_worker/organization.repo.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts index f255d659ea..6a53f66a20 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts @@ -253,9 +253,11 @@ class OrganizationRepository { SELECT 1 FROM "memberOrganizations" mo WHERE mo."organizationId" = o.id + AND mo."deletedAt" IS NULL AND (mo.title IS NOT NULL AND mo.title != '') AND (mo.source IS NOT NULL AND mo.source NOT IN ('email-domain')) ) + LIMIT $(batchSize); `, { batchSize }, ) From fd852e451465222309d70d5edad84f375ccfd30d Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Wed, 12 Nov 2025 19:15:27 +0530 Subject: [PATCH 3/3] chore: add script for pruning and refreshing member organizations --- .../script_executor_worker/src/activities.ts | 6 ++ .../prune-duplicate-organizations.ts | 38 +++++++++++++ .../script_executor_worker/src/workflows.ts | 2 + .../prune-incorrect-member-organizations.ts | 42 ++++++++++++++ .../organization.repo.ts | 55 ++++++++++++++++++- 5 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 services/apps/script_executor_worker/src/workflows/prune-incorrect-member-organizations.ts diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index bad2073e49..240d50ec08 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -41,8 +41,11 @@ import 
{
 } from './activities/merge-members-with-similar-identities'
 import { getUnprocessedLLMApprovedSuggestions } from './activities/process-llm-verified-merges'
 import {
+  getMemberOrganizationsToPrune,
   getOrganizationsToPrune,
+  pruneMemberOrganization,
   pruneOrganization,
+  refreshMemberAffiliations,
 } from './activities/prune-duplicate-organizations'
 import { deleteIndexedEntities } from './activities/sync/entity-index'
 import { getMembersForSync, syncMembersBatch } from './activities/sync/member'
@@ -86,4 +89,7 @@ export {
   calculateMemberAffiliations,
   getOrganizationsToPrune,
   pruneOrganization,
+  getMemberOrganizationsToPrune,
+  pruneMemberOrganization,
+  refreshMemberAffiliations,
 }
diff --git a/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts b/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts
index 9bf81c391b..67dfa01f69 100644
--- a/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts
+++ b/services/apps/script_executor_worker/src/activities/prune-duplicate-organizations.ts
@@ -1,4 +1,7 @@
+import { refreshMemberOrganizationAffiliations } from '@crowd/data-access-layer/src/member-organization-affiliation'
+import { deleteMemberOrganizations } from '@crowd/data-access-layer/src/members'
 import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
+import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
 
 import { svc } from '../main'
 
@@ -23,3 +26,56 @@ export async function getOrganizationsToPrune(
     throw error
   }
 }
+
+/**
+ * Activity: fetches one batch of member-organization rows that should be pruned.
+ *
+ * @param batchSize - maximum number of rows to return
+ * @returns id and owning member id of each row
+ */
+export async function getMemberOrganizationsToPrune(
+  batchSize: number,
+): Promise<{ id: string; memberId: string }[]> {
+  try {
+    const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
+    // Awaited so that a rejected query is caught and logged by the catch block below.
+    return await orgRepo.getMemberOrganizationsToPrune(batchSize)
+  } catch (error) {
+    svc.log.error(error, 'Error getting member organizations to prune!')
+    throw error
+  }
+}
+
+/**
+ * Activity: deletes one member-organization row by calling `deleteMemberOrganizations`.
+ *
+ * @param memberOrganizationId - id of the member-organization row to delete
+ * @param memberId - id of the member that owns the row
+ */
+export async function pruneMemberOrganization(
+  memberOrganizationId: string,
+  memberId: string,
+): Promise<void> {
+  try {
+    const qx = pgpQx(svc.postgres.writer.connection())
+    await deleteMemberOrganizations(qx, memberId, [memberOrganizationId], false)
+  } catch (error) {
+    svc.log.error(error, 'Error pruning member organization!')
+    throw error
+  }
+}
+
+/**
+ * Activity: calls `refreshMemberOrganizationAffiliations` for one member.
+ *
+ * @param memberId - id of the member to refresh
+ */
+export async function refreshMemberAffiliations(memberId: string): Promise<void> {
+  try {
+    const qx = pgpQx(svc.postgres.writer.connection())
+    await refreshMemberOrganizationAffiliations(qx, memberId)
+  } catch (error) {
+    svc.log.error(error, 'Error refreshing member affiliations!')
+    throw error
+  }
+}
diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts
index 3031b313c8..f5bb33f30a 100644
--- a/services/apps/script_executor_worker/src/workflows.ts
+++ b/services/apps/script_executor_worker/src/workflows.ts
@@ -8,6 +8,7 @@ import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliatio
 import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
 import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges'
 import { pruneDuplicateOrganizations } from './workflows/prune-duplicate-organizations'
+import { pruneIncorrectMemberOrganizations } from './workflows/prune-incorrect-member-organizations'
 import { syncMembers } from './workflows/sync/members'
 import { syncOrganizations } from './workflows/sync/organizations'
 
@@ -24,4 +25,5 @@ export {
   pruneDuplicateOrganizations,
   fixBotMembersAffiliation,
   blockOrganizationAffiliation,
+  pruneIncorrectMemberOrganizations,
 }
diff --git a/services/apps/script_executor_worker/src/workflows/prune-incorrect-member-organizations.ts b/services/apps/script_executor_worker/src/workflows/prune-incorrect-member-organizations.ts
new file mode 100644
index 0000000000..e295c8478e
--- /dev/null
+++
b/services/apps/script_executor_worker/src/workflows/prune-incorrect-member-organizations.ts
@@ -0,0 +1,59 @@
+import { proxyActivities } from '@temporalio/workflow'
+
+import * as activities from '../activities'
+import { IScriptBatchTestArgs } from '../types'
+import { chunkArray } from '../utils/common'
+
+const { getMemberOrganizationsToPrune, pruneMemberOrganization, refreshMemberAffiliations } =
+  proxyActivities<typeof activities>({
+    startToCloseTimeout: '30 minutes',
+    retry: { maximumAttempts: 3, backoffCoefficient: 3 },
+  })
+
+/**
+ * Prunes a single batch of member-organization rows returned by
+ * `getMemberOrganizationsToPrune`, then refreshes affiliations for every member that
+ * had at least one row pruned successfully.
+ *
+ * Rows are pruned in chunks of 25 concurrent tasks; a failure for one row is logged and
+ * does not stop the rest of the batch.
+ *
+ * @param args - script arguments; only `batchSize` is read (defaults to 100)
+ */
+export async function pruneIncorrectMemberOrganizations(args: IScriptBatchTestArgs): Promise<void> {
+  const BATCH_SIZE = args.batchSize ?? 100
+
+  const memberOrganizationsToPrune = await getMemberOrganizationsToPrune(BATCH_SIZE)
+
+  if (memberOrganizationsToPrune.length === 0) {
+    console.log('No more member organizations to prune!')
+    return
+  }
+
+  const memberIdsToRefresh = new Set<string>()
+  const CHUNK_SIZE = 25
+
+  for (const chunk of chunkArray(memberOrganizationsToPrune, CHUNK_SIZE)) {
+    const cleanupTasks = chunk.map(async (mo) => {
+      console.log('Pruning member organization', mo.id)
+      await pruneMemberOrganization(mo.id, mo.memberId)
+      memberIdsToRefresh.add(mo.memberId)
+    })
+
+    // allSettled waits for every task in the chunk. With Promise.all(...).catch(...) the
+    // first rejection released the loop while the remaining prunes were still in flight,
+    // so members pruned late could miss the affiliation refresh below.
+    const results = await Promise.allSettled(cleanupTasks)
+
+    for (const result of results) {
+      if (result.status === 'rejected') {
+        console.error('Error pruning member organizations!', result.reason)
+      }
+    }
+  }
+
+  for (const memberId of memberIdsToRefresh) {
+    console.log('Refreshing member affiliations', memberId)
+    await refreshMemberAffiliations(memberId)
+  }
+}
diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
index 6a53f66a20..5409b7a408 100644
--- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
+++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts
@@ -262,6 +262,70 @@ class OrganizationRepository {
       { batchSize },
     )
   }
+
+  /**
+   * Finds member-organization rows that link a member to an organization whose
+   * `primary-domain` identity is one of the e-mail provider domains listed in the
+   * query, where the row's source is `email-domain` and its title is null or empty.
+   *
+   * NOTE(review): unlike `getOrganizationsToPrune`, rows are not filtered on
+   * `mo."deletedAt"` — confirm that pruned rows cannot be returned again.
+   *
+   * @param batchSize - maximum number of rows to return
+   * @returns id and member id of each matching row
+   */
+  public async getMemberOrganizationsToPrune(
+    batchSize: number,
+  ): Promise<{ id: string; memberId: string }[]> {
+    return this.connection.query(
+      `
+      WITH email_providers AS (
+        SELECT unnest(ARRAY[
+          'gmail.com', 'gmail.co.uk', 'gmail.com.au', 'gmail.com.tr',
+          'yahoo.com', 'yahoo.co.uk', 'yahoo.com.br', 'yahoo.co.in', 'yahoo.fr', 'yahoo.es', 'yahoo.it', 'yahoo.de', 'yahoo.ca', 'yahoo.com.au', 'yahoo.in', 'yahoo.co.jp', 'yahoo.com.ar', 'yahoo.com.mx', 'yahoo.co.id', 'yahoo.com.sg', 'yahoo.co.za', 'yahoo.com.ph', 'yahoo.com.tw', 'yahoo.com.hk', 'yahoo.com.vn',
+          'hotmail.com', 'hotmail.co.uk', 'hotmail.fr', 'hotmail.ca', 'hotmail.it', 'hotmail.es', 'hotmail.de', 'hotmail.com.au', 'hotmail.com.mx',
+          'icloud.com', 'icloud.com.cn',
+          'fastmail.com', 'tutanota.com', 'tuta.io',
+          'gmx.com', 'gmx.de', 'gmx.net', 'gmx.at', 'gmx.ch', 'gmx.fr', 'gmx.co.uk',
+          'aol.com', 'aol.co.uk', 'aol.fr', 'aol.de',
+          'msn.com', 'wanadoo.fr', 'orange.fr', 'comcast.net',
+          'live.com', 'live.co.uk', 'live.fr', 'live.nl', 'live.it', 'live.com.au', 'live.ca', 'live.cn',
+          'rediffmail.com', 'sify.com', 'indiatimes.com', 'free.fr', 'web.de',
+          'yandex.ru', 'yandex.com', 'yandex.com.tr', 'ya.ru',
+          'ymail.com', 'libero.it',
+          'outlook.com', 'outlook.fr', 'outlook.co.uk', 'outlook.de', 'outlook.es', 'outlook.it', 'outlook.com.au', 'outlook.com.br', 'outlook.com.mx', 'outlook.co.jp', 'outlook.in', 'outlook.com.sg', 'outlook.co.za', 'outlook.co.in',
+          'uol.com.br', 'bol.com.br',
+          'mail.ru', 'inbox.ru', 'list.ru', 'bk.ru',
+          'mail.com', 'mail.de', 'mail.co.uk',
+          'cox.net', 'sbcglobal.net', 'sfr.fr', 'verizon.net', 'googlemail.com', 'ig.com.br', 'bigpond.com', 'bigpond.net.au', 'terra.com.br', 'neuf.fr', 'alice.it', 'rocketmail.com', 'att.net', 'laposte.net', 'bellsouth.net', 'charter.net', 'rambler.ru', 'tiscali.it', 'tiscali.co.uk', 'shaw.ca', 'sky.com', 'earthlink.net', 'optonline.net', 'freenet.de', 't-online.de', 'aliceadsl.fr', 'virgilio.it', 'home.nl', 'qq.com', 'vip.qq.com', 'telenet.be', 'pandora.be', 'me.com', 'voila.fr', 'planet.nl', 'tin.it', 'ntlworld.com', 'arcor.de', 'frontiernet.net', 'hetnet.nl', 'zonnet.nl', 'club-internet.fr', 'juno.com', 'optusnet.com.au', 'blueyonder.co.uk', 'bluewin.ch', 'skynet.be', 'sympatico.ca', 'windstream.net', 'mac.com', 'centurytel.net', 'chello.nl', 'aim.com',
+          'protonmail.com', 'protonmail.ch', 'proton.me', 'pm.me', 'duck.com',
+          'zoho.com', 'zohomail.com',
+          'users.noreply.github.com',
+          '126.com', '139.com', '163.com', '188.com', 'foxmail.com', 'tom.com', '21cn.com', 'yeah.net',
+          'naver.com', 'daum.net', 'hanmail.net',
+          'hey.com', 'inbox.com', 'lycos.com', 'excite.com', 'hushmail.com', 'mailfence.com', 'mailbox.org', 'posteo.de', 'startmail.com', 'runbox.com', 'countermail.com', 'mynet.com',
+          'wp.pl', 'onet.pl', 'interia.pl', 'o2.pl',
+          'seznam.cz', 'centrum.cz',
+          'mailinator.com', 'guerrillamail.com', '10minutemail.com', 'tempmail.com'
+        ]) AS provider
+      ),
+      orgs_with_email_provider AS (
+        SELECT DISTINCT o.id
+        FROM organizations o
+        INNER JOIN "organizationIdentities" oi ON o.id = oi."organizationId"
+        INNER JOIN email_providers ep ON LOWER(oi.value) = ep.provider
+        WHERE o."deletedAt" IS NULL
+          AND oi.type = 'primary-domain'
+      )
+      SELECT DISTINCT mo.id, mo."memberId"
+      FROM "memberOrganizations" mo
+      INNER JOIN orgs_with_email_provider oep ON mo."organizationId" = oep.id
+      WHERE mo.source = 'email-domain' AND (mo.title IS NULL OR mo.title = '')
+      LIMIT $(batchSize);
+      `,
+      { batchSize },
+    )
+  }
 }
 
 export default OrganizationRepository