From d9975110a6ed581500399a01d82dd7422d113674 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:40:58 +0530 Subject: [PATCH 1/7] feat: infer memberOrganization stint dates from work-email activities Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...ganizations-email-domain-partial-index.sql | 3 + ...inferMemberOrganizationStintChanges.job.ts | 129 +++++++++ services/apps/data_sink_worker/package.json | 2 - .../src/bin/map-member-to-org.ts | 97 ------- .../src/bin/map-tenant-members-to-org.ts | 98 ------- .../src/service/activity.service.ts | 11 + .../src/service/member.service.ts | 56 +++- .../src/services/common.member.service.ts | 40 ++- .../common_services/src/services/index.ts | 2 +- .../src/services/member-organization.ts | 251 ++++++++++++++++++ .../src/services/member/unmerge.ts | 2 +- .../src/services/memberOrganization.ts | 113 -------- .../src/members/organizations.ts | 17 ++ .../data-access-layer/src/members/segments.ts | 20 +- .../repo/memberAffiliation.data.ts | 3 + services/libs/types/src/organizations.ts | 16 ++ 16 files changed, 542 insertions(+), 318 deletions(-) create mode 100644 backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql create mode 100644 services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts delete mode 100644 services/apps/data_sink_worker/src/bin/map-member-to-org.ts delete mode 100644 services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts create mode 100644 services/libs/common_services/src/services/member-organization.ts delete mode 100644 services/libs/common_services/src/services/memberOrganization.ts diff --git a/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql new file mode 100644 index 0000000000..404f5c18be --- /dev/null +++ b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql @@ -0,0 +1,3 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain" + ON "memberOrganizations" ("memberId") + WHERE "source" = 'email-domain' AND "deletedAt" IS NULL; diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts new file mode 100644 index 0000000000..56fec83064 --- /dev/null +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -0,0 +1,129 @@ +import CronTime from 'cron-time-generator' +import { + inferMemberOrganizationStintChanges, + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE +} from '@crowd/common_services' +import { + createMemberOrganization, + fetchMemberOrganizationsBySource, + updateMemberOrganization, +} from '@crowd/data-access-layer' +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { pgpQx, QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' +import { REDIS_CONFIG, RedisClient, getRedisClient } from '@crowd/redis' +import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' +import { IJobDefinition } from '../types' + +const job: IJobDefinition = { + name: 'infer-member-organization-stint-changes', + cronTime: CronTime.every(5).minutes(), + timeout: 10 * 60, + process: async (ctx) => { + const redis = await getRedisClient(REDIS_CONFIG()) + const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) + const qx = pgpQx(db) + + // 1. Fetch a batch of work triggers (Member IDs) + const memberIds = await redis.sPop(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + if (!memberIds?.length) return + + ctx.log.info({ count: memberIds.length }, 'Processing pending members.') + const stats = { processed: 0, inserts: 0, updates: 0 } + + for (const memberId of memberIds) { + try { + // 2. Get the activity dates for this member + const activityDates = await popMemberOrganizationActivityDates(redis, memberId) + + // If the member has no activity dates, move to the next member + if (activityDates.length === 0) continue + + // 3. Sync with existing database state + const existingOrgs = await fetchMemberOrganizationsBySource( + qx, + memberId, + OrganizationSource.EMAIL_DOMAIN, + ) + + // 4. Calculate required stint changes + const stintChanges = inferMemberOrganizationStintChanges( + memberId, + existingOrgs, + activityDates, + ) + + if (stintChanges.length === 0) continue + + const counts = { + inserts: stintChanges.filter((c) => c.type === 'insert').length, + updates: stintChanges.filter((c) => c.type === 'update').length, + } + + ctx.log.info({ memberId, ...counts }, 'Stint changes identified.') + + ctx.log.debug( + { memberId, activityDates, existingOrgs, stintChanges }, + 'Stint inference trace.', + ) + + // @todo: Enable writes after dry-run validation + // await applyStintChanges(qx, stintChanges) + + stats.processed++ + stats.inserts += counts.inserts + stats.updates += counts.updates + } catch (err) { + ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') + } + } + + ctx.log.info(stats, 'Batch inference complete.') + }, +} + +async function popMemberOrganizationActivityDates( + redis: RedisClient, + memberId: string, +): Promise { + const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + + // hGetAll + del in a multi block makes the "Pop" atomic for the entire Hash + const [hash] = (await redis.multi().hGetAll(key).del(key).exec()) as [Record | null, number] + + if (!hash || Object.keys(hash).length === 0) return [] + + return Object.entries(hash) + .flatMap(([organizationId, datesJson]) => + (JSON.parse(datesJson) as string[]).map((date) => ({ organizationId, date })), + ) + .sort((a, b) => a.date.localeCompare(b.date)) +} + +async function applyStintChanges( + qx: QueryExecutor, + stintChanges: MemberOrgStintChange[], +): Promise { + for (const change of stintChanges) { + if (change.type === 'insert') { + await createMemberOrganization(qx, change.memberId, { + organizationId: change.organizationId, + dateStart: change.dateStart, + dateEnd: change.dateEnd, + source: OrganizationSource.EMAIL_DOMAIN, + }) + continue + } + + if (!change.id) { + throw new Error('Missing id for update stint change.') + } + + await updateMemberOrganization(qx, change.memberId, change.id, { + dateStart: change.dateStart, + dateEnd: change.dateEnd, + }) + } +} + +export default job \ No newline at end of file diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index c62c1e20d8..f9a2363081 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -17,8 +17,6 @@ "script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts", "script:process-results": "SERVICE=script tsx src/bin/process-results.ts", "script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts", - "script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts", - "script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts", "script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts", "script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts", "script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts", diff --git a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts b/services/apps/data_sink_worker/src/bin/map-member-to-org.ts deleted file mode 100644 index 7f63a2dc18..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { dbStoreQx, findIdentitiesForMembers } from '@crowd/data-access-layer' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' -import { MemberIdentityType } from '@crowd/types' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: memberId') - process.exit(1) -} - -const memberId = processArguments[0] - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const redis = await getRedisClient(REDIS_CONFIG()) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - try { - const members = await memberRepo.findByIds([memberId]) - - const member = members[0] - - if (!member) { - log.error({ memberId }, 'Member not found!') - process.exit(1) - } - - const identities = (await findIdentitiesForMembers(dbStoreQx(store), [memberId])).get(memberId) - log.info(`Processing memberId: ${member.id}`) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const emailIdentities = identities.filter( - (i) => i.verified && i.type === MemberIdentityType.EMAIL, - ) - if (emailIdentities.length > 0) { - const emails = emailIdentities.map((i) => i.value) - log.info({ memberId, emails }, 'Member emails!') - const orgs = await memberService.assignOrganizationByEmailDomain(null, emails) - - if (orgs.length > 0) { - log.info('Organizations found with matching email domains:', JSON.stringify(orgs)) - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - log.info('Done mapping member to organizations!') - } else { - log.info('No organizations found with matching email domains!') - } - } else { - log.info('No emails found for member!') - } - - process.exit(0) - } catch (err) { - log.error('Failed to map organizations for member!', err) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts b/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts deleted file mode 100644 index 04d23a1179..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const redis = await getRedisClient(REDIS_CONFIG()) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - const limit = 100 - let offset = 0 - let processedMembers = 0 - - let currentMemberId = null - let currentEmails = null - - try { - const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - countOnly: true, - }) - - log.info(`Total members found in the tenant: ${totalCount}`) - - do { - const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - }) - - // member -> organization based on email domain - for (const member of members) { - currentMemberId = member.id - currentEmails = member.emails - if (member.emails) { - const orgs = await memberService.assignOrganizationByEmailDomain(null, member.emails) - - if (orgs.length > 0) { - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - } - } - - processedMembers++ - log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`) - } - offset += limit - } while (totalCount > offset) - - log.info(`Member to organization association completed`) - process.exit(0) - } catch (err) { - log.error( - `Failed to assign member to organizations. Member ID: ${currentMemberId}, Emails: ${currentEmails}`, - err, - ) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 5e4b4cdb7f..46d4f23c6d 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1197,6 +1197,7 @@ export default class ActivityService extends LoggerBase { value.platform, undefined, orgPromiseCache, + value.timestamp, ) .then((memberId) => { // map ids for members @@ -1342,6 +1343,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.memberId = payload.dbMember.id @@ -1400,6 +1402,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.objectMemberId = payload.dbObjectMember.id @@ -1447,11 +1450,19 @@ export default class ActivityService extends LoggerBase { ) as boolean if (!isBot) { + const verifiedEmailIdentity = payload.activity.member.identities?.find( + (i) => i.type === MemberIdentityType.EMAIL && i.verified, + ) + const emailDomain = verifiedEmailIdentity + ? verifiedEmailIdentity.value.split('@')[1] + : undefined + // associate activity with organization payload.organizationId = await this.commonMemberService.findAffiliation( payload.memberId, payload.segmentId, payload.activity.timestamp, + emailDomain, ) } else { // for bot members, we don't want to affiliate the activity with an organization diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 8aa78b1cac..25ba57c79b 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -12,7 +12,12 @@ import { isObjectEmpty, singleOrDefault, } from '@crowd/common' -import { BotDetectionService, CommonMemberService } from '@crowd/common_services' +import { + BotDetectionService, + CommonMemberService, + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, +} from '@crowd/common_services' import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer' import { findIdentitiesForMembers, @@ -121,6 +126,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { return logExecutionTimeV2( async () => { @@ -448,6 +454,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> create -> assignOrganizationByEmailDomain', @@ -505,6 +513,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { await logExecutionTimeV2( async () => { @@ -682,6 +691,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> update -> assignOrganizationByEmailDomain', @@ -733,6 +744,8 @@ export default class MemberService extends LoggerBase { integrationId: string, emails: string[], orgPromiseCache?: Map>, + memberId?: string, + activityTimestamp?: string, ): Promise { const orgService = new OrganizationService(this.store, this.log) const organizations: IOrganizationIdSource[] = [] @@ -791,6 +804,9 @@ export default class MemberService extends LoggerBase { id: orgId, source: orgSource, }) + if (memberId && activityTimestamp) { + await this.bufferMemberOrganizationActivityDates(memberId, orgId, activityTimestamp) + } } } @@ -1047,4 +1063,42 @@ export default class MemberService extends LoggerBase { }, }) } + + /** + * Queues member activity dates in Redis as raw input for stint inference. + * + * To maximize throughput, this uses a non-atomic HGET/HSET pattern. While + * concurrent writes may occasionally drop a date, the system is self-healing + * as future activity will re-populate the buffer. + */ + private async bufferMemberOrganizationActivityDates( + memberId: string, + organizationId: string, + activityTimestamp: string, + ): Promise { + // 1. Normalize the timestamp to a simple YYYY-MM-DD string + const date = new Date(activityTimestamp).toISOString().slice(0, 10) + const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + + // 2. Fetch and de-duplicate the date within the organization's buffer + const existing = await this.redisClient.hGet(key, organizationId) + const dates: string[] = existing ? JSON.parse(existing) : [] + + if (dates.includes(date)) { + this.log.trace({ memberId, organizationId, date }, 'Date already buffered, skipping.') + } else { + dates.push(date) + dates.sort() + + // 3. Update the buffer with the new sorted date list + await this.redisClient.hSet(key, organizationId, JSON.stringify(dates)) + this.log.debug( + { memberId, organizationId, date, totalDates: dates.length }, + 'Buffered activity date for stint inference.', + ) + } + + // 4. add the member to the inference queue + await this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + } } diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index 70c70e0647..e4ecd1956b 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -51,6 +51,7 @@ import { IWorkExperienceData } from '@crowd/data-access-layer/src/old/apps/data_ import { addOrgsToSegments } from '@crowd/data-access-layer/src/organizations' import { Logger, LoggerBase } from '@crowd/logging' import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal' +import { OrganizationSource } from '@crowd/types' import { MergeActionState, MergeActionStep, @@ -169,6 +170,7 @@ export class CommonMemberService extends LoggerBase { memberId: string, segmentId: string, timestamp: string, + emailDomain?: string, ): Promise { const manualAffiliation = await findMemberManualAffiliation( this.qx, @@ -180,7 +182,12 @@ export class CommonMemberService extends LoggerBase { return manualAffiliation.organizationId } - const currentEmployments = await findMemberWorkExperience(this.qx, memberId, timestamp) + const currentEmployments = await findMemberWorkExperience( + this.qx, + memberId, + timestamp, + emailDomain, + ) if (currentEmployments.length > 0) { return this.decidePrimaryOrganizationId(currentEmployments) } @@ -217,10 +224,37 @@ export class CommonMemberService extends LoggerBase { return primaryEmployment.organizationId } + // Filter by source priority + // Source rank: ui > email-domain > enrichment-* > Other + const rankSource = (source?: string) => { + if (source === OrganizationSource.UI) return 0 + if (source === OrganizationSource.EMAIL_DOMAIN) return 1 + if (source?.startsWith('enrichment-')) return 2 + return 3 + } + + let bestRank = 4 + let highestPrioritySourceExperiences: IWorkExperienceData[] = [] + + for (const exp of experiences) { + const rank = rankSource(exp.source) + if (rank < bestRank) { + bestRank = rank + highestPrioritySourceExperiences = [exp] + } else if (rank === bestRank) { + highestPrioritySourceExperiences.push(exp) + } + } + + // Keep only candidates from the highest-priority source tier + if (highestPrioritySourceExperiences.length === 1) { + return highestPrioritySourceExperiences[0].organizationId + } + // decide based on the member count in the organizations const memberCounts = await findMemberCountEstimateOfOrganizations( this.qx, - experiences.map((e) => e.organizationId), + highestPrioritySourceExperiences.map((e) => e.organizationId), ) if (memberCounts[0].memberCount > memberCounts[1].memberCount) { @@ -230,7 +264,7 @@ export class CommonMemberService extends LoggerBase { } // if there's a draw in the member count, use the one with the longer period - return getLongestDateRange(experiences).organizationId + return getLongestDateRange(highestPrioritySourceExperiences).organizationId } return null diff --git a/services/libs/common_services/src/services/index.ts b/services/libs/common_services/src/services/index.ts index 227ef64e34..8034c4cfc6 100644 --- a/services/libs/common_services/src/services/index.ts +++ b/services/libs/common_services/src/services/index.ts @@ -3,7 +3,7 @@ export * from './llm.service' export * from './common.member.service' export * from './member/unmerge' export * from './member/cache' -export * from './memberOrganization' +export * from './member-organization' export * from './bot.service' export * from './emitters' export * from './github.integration.service' diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts new file mode 100644 index 0000000000..ec094e24c9 --- /dev/null +++ b/services/libs/common_services/src/services/member-organization.ts @@ -0,0 +1,251 @@ +import { RedisClient } from '@crowd/redis' +import { + IMemberOrganization, + MemberOrgDate, + MemberOrgStintChange, + MemberRoleUnmergeStrategy, +} from '@crowd/types' + +function roleKey( + role: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): string | undefined { + if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { + return role.organizationId + } + return role.memberId +} + +function roleExistsInArray( + role: IMemberOrganization, + roles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): boolean { + const key = roleKey(role, strategy) + return roles.some( + (r) => + roleKey(r, strategy) === key && + r.title === role.title && + r.dateStart === role.dateStart && + r.dateEnd === role.dateEnd, + ) +} + +export function rolesIntersect( + roleA: IMemberOrganization, + roleB: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): boolean { + if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { + return false + } + + const startA = new Date(roleA.dateStart).getTime() + const endA = new Date(roleA.dateEnd).getTime() + const startB = new Date(roleB.dateStart).getTime() + const endB = new Date(roleB.dateEnd).getTime() + + return ( + (startA < startB && endA > startB) || + (startB < startA && endB > startA) || + (startA < startB && endA > endB) || + (startB < startA && endB > endA) + ) +} + +export function unmergeRoles( + mergedRoles: IMemberOrganization[], + primaryBackupRoles: IMemberOrganization[], + secondaryBackupRoles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): IMemberOrganization[] { + const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( + (role) => + role.source === 'ui' || + !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + const editableRoles = mergedRoles.filter( + (role) => + role.source !== 'ui' && + secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + for (const secondaryBackupRole of secondaryBackupRoles) { + const { dateStart, dateEnd } = secondaryBackupRole + + if (dateStart === null && dateEnd === null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } + } else if (dateStart !== null && dateEnd === null) { + const currentRoleFromPrimaryBackup = primaryBackupRoles.find( + (r) => + roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && + r.title === secondaryBackupRole.title && + r.dateStart !== null && + r.dateEnd === null, + ) + if (currentRoleFromPrimaryBackup) { + unmergedRoles.push(currentRoleFromPrimaryBackup) + } + } else if (dateStart !== null && dateEnd !== null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } else { + const intersecting = editableRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + + if (intersecting) { + const fromBackup = primaryBackupRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + if (fromBackup) { + unmergedRoles.push(fromBackup) + } + } + } + } + } + + return unmergedRoles +} + +export const MEMBER_ORG_STINT_CHANGES_QUEUE = 'infer-member-organization-stint-changes:members' +export const MEMBER_ORG_STINT_CHANGES_DATES_PREFIX = 'infer-member-organization-stint-changes:dates' + +interface Stint { + id: string | null + organizationId: string + dateStart: string | null + dateEnd: string | null + isDirty: boolean + isNew: boolean +} + +/** + * Core logic to determine if activity dates should expand existing stints or create new ones + */ +export function inferMemberOrganizationStintChanges( + memberId: string, + existingRows: IMemberOrganization[], + orgDates: MemberOrgDate[], +): MemberOrgStintChange[] { + const toIso = (v: string | Date) => new Date(v).toISOString().split('T')[0] + const diff = (a: string, b: string) => Math.abs(Date.parse(b) - Date.parse(a)) / 86_400_000 + + // 1. Initialize local state to track modifications and new records + const stints: Stint[] = existingRows.map((r) => ({ + id: r.id ?? null, + organizationId: r.organizationId, + dateStart: r.dateStart ? toIso(r.dateStart) : null, + dateEnd: r.dateEnd ? toIso(r.dateEnd) : null, + isDirty: false, + isNew: false, + })) + + for (const { organizationId, date: targetDate } of orgDates) { + const orgStints = stints.filter((s) => s.organizationId === organizationId) + + // 2. Skip if the date is already covered by an existing stint + if ( + orgStints.some( + (s) => s.dateStart && s.dateEnd && targetDate >= s.dateStart && targetDate <= s.dateEnd, + ) + ) + continue + + // 3. Fill undated placeholder only when no dated stint exists yet (Rule 2) + const dated = orgStints.filter((s) => s.dateStart && s.dateEnd) + const placeholder = orgStints.find((s) => !s.dateStart && !s.dateEnd) + if (placeholder && dated.length === 0) { + placeholder.dateStart = placeholder.dateEnd = targetDate + placeholder.isDirty = true + continue + } + + // 4. Find the closest neighbor stint to see if expansion is possible + let neighbor: Stint | null = null + let minGap = Infinity + + for (const s of dated) { + const gap = + targetDate > s.dateEnd! ? diff(s.dateEnd!, targetDate) : diff(targetDate, s.dateStart!) + if (gap < minGap) { + minGap = gap + neighbor = s + } + } + + if (!neighbor) { + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + continue + } + + // 5. Determine the gap window between neighbor and targetDate, then check if another org + // holds a significant (>= 30 day) dated stint that overlaps that window (multi-stint guard) + const gapStart = targetDate > neighbor.dateEnd! ? neighbor.dateEnd! : targetDate + const gapEnd = targetDate > neighbor.dateEnd! ? targetDate : neighbor.dateStart! + const hasConflict = stints.some( + (s) => + s.organizationId !== organizationId && + s.dateStart && + s.dateEnd && + s.dateStart < gapEnd && + s.dateEnd > gapStart && + diff(s.dateStart, s.dateEnd) >= 30, + ) + + const isForward = targetDate > neighbor.dateEnd! + + if (hasConflict) { + // 6a. Another org owns the gap — start a fresh stint rather than bridging + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + } else if (isForward && minGap <= 30) { + // 6b. Forward extension within the debounce window — skip to avoid thrashing dateEnd. + // Backward extensions are not debounced (rare, only during historical re-ingestion). + continue + } else { + // 6c. Extend the neighbor in the appropriate direction + if (isForward) neighbor.dateEnd = targetDate + else neighbor.dateStart = targetDate + neighbor.isDirty = true + } + } + + // 7. Map only modified or new stints back to change objects + return stints + .filter((s) => s.isDirty) + .map((s): MemberOrgStintChange => { + const payload = { + memberId, + organizationId: s.organizationId, + dateStart: s.dateStart as string, + dateEnd: s.dateEnd as string, + } + + if (s.isNew) return { type: 'insert', ...payload } + return { type: 'update', id: s.id as string, ...payload } + }) +} diff --git a/services/libs/common_services/src/services/member/unmerge.ts b/services/libs/common_services/src/services/member/unmerge.ts index 7d79631d92..e56a0d6ab0 100644 --- a/services/libs/common_services/src/services/member/unmerge.ts +++ b/services/libs/common_services/src/services/member/unmerge.ts @@ -58,7 +58,7 @@ import { } from '@crowd/types' import { BotDetectionService } from '../bot.service' -import { unmergeRoles } from '../memberOrganization' +import { unmergeRoles } from '../member-organization' const logger = getServiceLogger() diff --git a/services/libs/common_services/src/services/memberOrganization.ts b/services/libs/common_services/src/services/memberOrganization.ts deleted file mode 100644 index b20139e3e4..0000000000 --- a/services/libs/common_services/src/services/memberOrganization.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { IMemberOrganization, MemberRoleUnmergeStrategy } from '@crowd/types' - -function roleKey( - role: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): string | undefined { - if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { - return role.organizationId - } - return role.memberId -} - -function roleExistsInArray( - role: IMemberOrganization, - roles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): boolean { - const key = roleKey(role, strategy) - return roles.some( - (r) => - roleKey(r, strategy) === key && - r.title === role.title && - r.dateStart === role.dateStart && - r.dateEnd === role.dateEnd, - ) -} - -export function rolesIntersect( - roleA: IMemberOrganization, - roleB: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): boolean { - if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { - return false - } - - const startA = new Date(roleA.dateStart).getTime() - const endA = new Date(roleA.dateEnd).getTime() - const startB = new Date(roleB.dateStart).getTime() - const endB = new Date(roleB.dateEnd).getTime() - - return ( - (startA < startB && endA > startB) || - (startB < startA && endB > startA) || - (startA < startB && endA > endB) || - (startB < startA && endB > endA) - ) -} - -export function unmergeRoles( - mergedRoles: IMemberOrganization[], - primaryBackupRoles: IMemberOrganization[], - secondaryBackupRoles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): IMemberOrganization[] { - const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( - (role) => - role.source === 'ui' || - !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - const editableRoles = mergedRoles.filter( - (role) => - role.source !== 'ui' && - secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - for (const secondaryBackupRole of secondaryBackupRoles) { - const { dateStart, dateEnd } = secondaryBackupRole - - if (dateStart === null && dateEnd === null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } - } else if (dateStart !== null && dateEnd === null) { - const currentRoleFromPrimaryBackup = primaryBackupRoles.find( - (r) => - roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && - r.title === secondaryBackupRole.title && - r.dateStart !== null && - r.dateEnd === null, - ) - if (currentRoleFromPrimaryBackup) { - unmergedRoles.push(currentRoleFromPrimaryBackup) - } - } else if (dateStart !== null && dateEnd !== null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } else { - const intersecting = editableRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - - if (intersecting) { - const fromBackup = primaryBackupRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - if (fromBackup) { - unmergedRoles.push(fromBackup) - } - } - } - } - } - - return unmergedRoles -} diff --git a/services/libs/data-access-layer/src/members/organizations.ts b/services/libs/data-access-layer/src/members/organizations.ts index 205c871a82..fcd87bbf18 100644 --- a/services/libs/data-access-layer/src/members/organizations.ts +++ b/services/libs/data-access-layer/src/members/organizations.ts @@ -42,6 +42,23 @@ export async function fetchMemberOrganizations( ) } +export async function fetchMemberOrganizationsBySource( + qx: QueryExecutor, + memberId: string, + source: OrganizationSource, +): Promise { + return qx.select( + ` + SELECT "id", "organizationId", "dateStart", "dateEnd", "title", "memberId", "source" + FROM "memberOrganizations" + WHERE "memberId" = $(memberId) + AND "source" = $(source) + AND "deletedAt" IS NULL + `, + { memberId, source }, + ) +} + export async function fetchOrganizationMemberIds( qx: QueryExecutor, organizationId: string, diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 4960dc079a..34a15f7b5a 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -206,7 +206,21 @@ export async function findMemberWorkExperience( qx: QueryExecutor, memberId: string, timestamp: string, + emailDomain?: string, ): Promise { + let domainOrClause = '' + if (emailDomain) { + domainOrClause = ` + OR (mo."source" = 'email-domain' AND EXISTS ( + SELECT 1 FROM "organizationIdentities" oi + WHERE oi."organizationId" = mo."organizationId" + AND oi.type = 'primary-domain' + AND oi.verified = true + AND LOWER(oi.value) = LOWER($(emailDomain)) + )) + ` + } + const result = await qx.select( ` SELECT @@ -215,17 +229,19 @@ export async function findMemberWorkExperience( FROM "memberOrganizations" mo LEFT JOIN "memberOrganizationAffiliationOverrides" ovr on ovr."memberOrganizationId" = mo."id" WHERE mo."memberId" = $(memberId) + AND mo."deletedAt" IS NULL + AND coalesce(ovr."allowAffiliation", true) = true AND ( (mo."dateStart" <= $(timestamp) AND mo."dateEnd" >= $(timestamp)) OR (mo."dateStart" <= $(timestamp) AND mo."dateEnd" IS NULL) + ${domainOrClause} ) - AND mo."deletedAt" IS NULL - AND coalesce(ovr."allowAffiliation", true) = true ORDER BY mo."dateStart" DESC, mo.id `, { memberId, timestamp, + emailDomain, }, ) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts index df444d7631..5c53320503 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts @@ -1,3 +1,5 @@ +import { OrganizationSource } from '@crowd/types' + interface BaseData { memberId: string organizationId: string @@ -15,6 +17,7 @@ export interface IManualAffiliationData extends BaseData { export interface IWorkExperienceData extends BaseData { id: string + source: OrganizationSource } export interface IOrganizationMemberCount { diff --git a/services/libs/types/src/organizations.ts b/services/libs/types/src/organizations.ts index d5b01d6beb..649cc27f6b 100644 --- a/services/libs/types/src/organizations.ts +++ b/services/libs/types/src/organizations.ts @@ -92,6 +92,22 @@ export interface IMemberRoleWithOrganization extends IMemberOrganization { organizationLogo: string } +export interface MemberOrgDate { + organizationId: string + date: string // YYYY-MM-DD +} + +interface MemberOrgStintChangeBase { + memberId: string + organizationId: string + dateStart: string + dateEnd: string +} + +export type MemberOrgStintChange = + | ({ type: 'insert' } & MemberOrgStintChangeBase) + | ({ type: 'update'; id: string } & MemberOrgStintChangeBase) + export interface IExecutiveChange { joined_date?: string pdl_id?: string From 057464d0dec92f1af7c30c3043334b09791b7b73 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 23:04:04 +0530 Subject: [PATCH 2/7] fix: resolve pr comments from ai bots Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 154 ++++++++---------- .../src/service/member.service.ts | 41 +++-- .../src/services/member-organization.ts | 18 +- 3 files changed, 101 insertions(+), 112 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 56fec83064..5e31bbf46c 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -1,18 +1,16 @@ import CronTime from 'cron-time-generator' -import { - inferMemberOrganizationStintChanges, - MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, - MEMBER_ORG_STINT_CHANGES_QUEUE -} from '@crowd/common_services' + import { - createMemberOrganization, - fetchMemberOrganizationsBySource, - updateMemberOrganization, -} from '@crowd/data-access-layer' + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, + inferMemberOrganizationStintChanges, +} from '@crowd/common_services' +import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer' import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' -import { pgpQx, QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' -import { REDIS_CONFIG, RedisClient, getRedisClient } from '@crowd/redis' -import { MemberOrgDate, MemberOrgStintChange, OrganizationSource } from '@crowd/types' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' +import { OrganizationSource } from '@crowd/types' + import { IJobDefinition } from '../types' const job: IJobDefinition = { @@ -24,8 +22,8 @@ const job: IJobDefinition = { const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) const qx = pgpQx(db) - // 1. Fetch a batch of work triggers (Member IDs) - const memberIds = await redis.sPop(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + // 1. Get a batch of work + const memberIds = await redis.sRandMember(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return ctx.log.info({ count: memberIds.length }, 'Processing pending members.') @@ -33,97 +31,73 @@ const job: IJobDefinition = { for (const memberId of memberIds) { try { - // 2. Get the activity dates for this member - const activityDates = await popMemberOrganizationActivityDates(redis, memberId) - - // If the member has no activity dates, move to the next member - if (activityDates.length === 0) continue - - // 3. Sync with existing database state - const existingOrgs = await fetchMemberOrganizationsBySource( - qx, - memberId, - OrganizationSource.EMAIL_DOMAIN, - ) - - // 4. Calculate required stint changes - const stintChanges = inferMemberOrganizationStintChanges( - memberId, - existingOrgs, - activityDates, - ) - - if (stintChanges.length === 0) continue - - const counts = { - inserts: stintChanges.filter((c) => c.type === 'insert').length, - updates: stintChanges.filter((c) => c.type === 'update').length, + const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + const hash = await redis.hGetAll(datesKey) + + // If no data, just remove from queue and move on + if (!hash || Object.keys(hash).length === 0) { + await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + continue } - ctx.log.info({ memberId, ...counts }, 'Stint changes identified.') + // 2. Parse Redis data into domain objects + const { activityDates, orgIds } = parseMemberActivityHash(hash) - ctx.log.debug( - { memberId, activityDates, existingOrgs, stintChanges }, - 'Stint inference trace.', - ) + if (activityDates.length > 0) { + // 3. Compare with DB and calculate delta + const existingOrgs = await fetchMemberOrganizationsBySource( + qx, + memberId, + OrganizationSource.EMAIL_DOMAIN, + ) - // @todo: Enable writes after dry-run validation - // await applyStintChanges(qx, stintChanges) + const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) + + if (changes.length > 0) { + ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') + stats.inserts += changes.filter((c) => c.type === 'insert').length + stats.updates += changes.filter((c) => c.type === 'update').length + } + } + + // 4. Cleanup: Remove only the fields we actually read + await redis + .multi() + .hDel(datesKey, ...orgIds) + .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + .exec() stats.processed++ - stats.inserts += counts.inserts - stats.updates += counts.updates } catch (err) { ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') } } - ctx.log.info(stats, 'Batch inference complete.') + ctx.log.info(stats, 'Batch complete.') }, } -async function popMemberOrganizationActivityDates( - redis: RedisClient, - memberId: string, -): Promise { - const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - - // hGetAll + del in a multi block makes the "Pop" atomic for the entire Hash - const [hash] = (await redis.multi().hGetAll(key).del(key).exec()) as [Record | null, number] - - if (!hash || Object.keys(hash).length === 0) return [] - - return Object.entries(hash) - .flatMap(([organizationId, datesJson]) => - (JSON.parse(datesJson) as string[]).map((date) => ({ organizationId, date })), - ) +/** + * Parses the Redis hash into a clean, typed list of activity dates. + */ +function parseMemberActivityHash(hash: Record) { + const orgIds = Object.keys(hash) + const activityDates = orgIds + .flatMap((organizationId) => { + try { + const dates = JSON.parse(hash[organizationId]) + return Array.isArray(dates) + ? dates + .filter((d): d is string => typeof d === 'string') + .map((date) => ({ organizationId, date })) + : [] + } catch { + return [] + } + }) .sort((a, b) => a.date.localeCompare(b.date)) -} - -async function applyStintChanges( - qx: QueryExecutor, - stintChanges: MemberOrgStintChange[], -): Promise { - for (const change of stintChanges) { - if (change.type === 'insert') { - await createMemberOrganization(qx, change.memberId, { - organizationId: change.organizationId, - dateStart: change.dateStart, - dateEnd: change.dateEnd, - source: OrganizationSource.EMAIL_DOMAIN, - }) - continue - } - - if (!change.id) { - throw new Error('Missing id for update stint change.') - } - await updateMemberOrganization(qx, change.memberId, change.id, { - dateStart: change.dateStart, - dateEnd: change.dateEnd, - }) - } + return { activityDates, orgIds } } -export default job \ No newline at end of file +export default job diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 25ba57c79b..3ce69e4acf 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1076,29 +1076,42 @@ export default class MemberService extends LoggerBase { organizationId: string, activityTimestamp: string, ): Promise { - // 1. Normalize the timestamp to a simple YYYY-MM-DD string - const date = new Date(activityTimestamp).toISOString().slice(0, 10) + const date = new Date(activityTimestamp).toISOString().split('T')[0] const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` - // 2. Fetch and de-duplicate the date within the organization's buffer + // Safe parser for existing Redis strings. + // Returns a valid string array even if data is corrupt or missing. + const parseExistingDates = (value: string | null | undefined): string[] => { + if (!value) return [] + try { + const parsed = JSON.parse(value) + return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] + } catch { + this.log.warn('Corrupt dates buffer value detected during buffering.') + return [] + } + } + + // 1. Fetch current dates for this specific organization const existing = await this.redisClient.hGet(key, organizationId) - const dates: string[] = existing ? JSON.parse(existing) : [] + const dates: string[] = parseExistingDates(existing) - if (dates.includes(date)) { - this.log.trace({ memberId, organizationId, date }, 'Date already buffered, skipping.') - } else { + // 2. If the date is new, update the set and the queue + if (!dates.includes(date)) { dates.push(date) dates.sort() - // 3. Update the buffer with the new sorted date list - await this.redisClient.hSet(key, organizationId, JSON.stringify(dates)) + await Promise.all([ + // Update the specific org field in the hash + this.redisClient.hSet(key, organizationId, JSON.stringify(dates)), + // Ensure the member is in the processing queue + this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId), + ]) + this.log.debug( - { memberId, organizationId, date, totalDates: dates.length }, - 'Buffered activity date for stint inference.', + { memberId, organizationId, date, count: dates.length }, + 'Buffered activity date and queued member.', ) } - - // 4. add the member to the inference queue - await this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) } } diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts index ec094e24c9..23e93a50de 100644 --- a/services/libs/common_services/src/services/member-organization.ts +++ b/services/libs/common_services/src/services/member-organization.ts @@ -1,4 +1,3 @@ -import { RedisClient } from '@crowd/redis' import { IMemberOrganization, MemberOrgDate, @@ -130,6 +129,8 @@ interface Stint { isNew: boolean } +type DatedStint = Stint & { dateStart: string; dateEnd: string } + /** * Core logic to determine if activity dates should expand existing stints or create new ones */ @@ -163,7 +164,9 @@ export function inferMemberOrganizationStintChanges( continue // 3. Fill undated placeholder only when no dated stint exists yet (Rule 2) - const dated = orgStints.filter((s) => s.dateStart && s.dateEnd) + const dated = orgStints.filter( + (s): s is DatedStint => s.dateStart !== null && s.dateEnd !== null, + ) const placeholder = orgStints.find((s) => !s.dateStart && !s.dateEnd) if (placeholder && dated.length === 0) { placeholder.dateStart = placeholder.dateEnd = targetDate @@ -172,12 +175,12 @@ export function inferMemberOrganizationStintChanges( } // 4. Find the closest neighbor stint to see if expansion is possible - let neighbor: Stint | null = null + let neighbor: DatedStint | null = null let minGap = Infinity for (const s of dated) { const gap = - targetDate > s.dateEnd! ? diff(s.dateEnd!, targetDate) : diff(targetDate, s.dateStart!) + targetDate > s.dateEnd ? diff(s.dateEnd, targetDate) : diff(targetDate, s.dateStart) if (gap < minGap) { minGap = gap neighbor = s @@ -198,8 +201,9 @@ export function inferMemberOrganizationStintChanges( // 5. Determine the gap window between neighbor and targetDate, then check if another org // holds a significant (>= 30 day) dated stint that overlaps that window (multi-stint guard) - const gapStart = targetDate > neighbor.dateEnd! ? neighbor.dateEnd! : targetDate - const gapEnd = targetDate > neighbor.dateEnd! ? targetDate : neighbor.dateStart! + const isForward = targetDate > neighbor.dateEnd + const gapStart = isForward ? neighbor.dateEnd : targetDate + const gapEnd = isForward ? targetDate : neighbor.dateStart const hasConflict = stints.some( (s) => s.organizationId !== organizationId && @@ -210,8 +214,6 @@ export function inferMemberOrganizationStintChanges( diff(s.dateStart, s.dateEnd) >= 30, ) - const isForward = targetDate > neighbor.dateEnd! - if (hasConflict) { // 6a. Another org owns the gap — start a fresh stint rather than bridging stints.push({ From 0af601cf4ae41cc26eb5894bf64f810e453a290e Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 24 Apr 2026 23:20:32 +0530 Subject: [PATCH 3/7] fix: update redis member ID retrieval method and cleanup logic Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/jobs/inferMemberOrganizationStintChanges.job.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 5e31bbf46c..40b5929fd7 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -23,7 +23,7 @@ const job: IJobDefinition = { const qx = pgpQx(db) // 1. Get a batch of work - const memberIds = await redis.sRandMember(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) if (!memberIds?.length) return ctx.log.info({ count: memberIds.length }, 'Processing pending members.') @@ -63,7 +63,7 @@ const job: IJobDefinition = { // 4. Cleanup: Remove only the fields we actually read await redis .multi() - .hDel(datesKey, ...orgIds) + .hDel(datesKey, orgIds) .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) .exec() From 34354a596cacd2fa96dd1641e854becc1939148e Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sat, 25 Apr 2026 01:19:01 +0530 Subject: [PATCH 4/7] Update services/apps/data_sink_worker/src/service/member.service.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/apps/data_sink_worker/src/service/member.service.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 3ce69e4acf..cbe971d0d9 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -1087,7 +1087,10 @@ export default class MemberService extends LoggerBase { const parsed = JSON.parse(value) return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] } catch { - this.log.warn('Corrupt dates buffer value detected during buffering.') + this.log.warn( + { memberId, organizationId, key }, + 'Corrupt dates buffer value detected during buffering for member organization activity dates.', + ) return [] } } From f74aef1eef55c5cf013135f0748a780c73b3a160 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sat, 25 Apr 2026 02:44:47 +0530 Subject: [PATCH 5/7] fix: resolve pr comments from ai bots Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- ...inferMemberOrganizationStintChanges.job.ts | 27 +++++++++---------- .../src/services/member-organization.ts | 4 ++- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts index 40b5929fd7..2f6a88b6b6 100644 --- a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -82,21 +82,18 @@ const job: IJobDefinition = { */ function parseMemberActivityHash(hash: Record) { const orgIds = Object.keys(hash) - const activityDates = orgIds - .flatMap((organizationId) => { - try { - const dates = JSON.parse(hash[organizationId]) - return Array.isArray(dates) - ? dates - .filter((d): d is string => typeof d === 'string') - .map((date) => ({ organizationId, date })) - : [] - } catch { - return [] - } - }) - .sort((a, b) => a.date.localeCompare(b.date)) - + const activityDates = orgIds.flatMap((organizationId) => { + try { + const dates = JSON.parse(hash[organizationId]) + return Array.isArray(dates) + ? dates + .filter((d): d is string => typeof d === 'string') + .map((date) => ({ organizationId, date })) + : [] + } catch { + return [] + } + }) return { activityDates, orgIds } } diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts index 23e93a50de..fc8a7c38db 100644 --- a/services/libs/common_services/src/services/member-organization.ts +++ b/services/libs/common_services/src/services/member-organization.ts @@ -152,7 +152,9 @@ export function inferMemberOrganizationStintChanges( isNew: false, })) - for (const { organizationId, date: targetDate } of orgDates) { + const sortedDates = [...orgDates].sort((a, b) => a.date.localeCompare(b.date)) + + for (const { organizationId, date: targetDate } of sortedDates) { const orgStints = stints.filter((s) => s.organizationId === organizationId) // 2. Skip if the date is already covered by an existing stint From 68a79fc1b17eeba88b4c9f48cb6f8429836c45fb Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Sun, 26 Apr 2026 20:42:25 +0530 Subject: [PATCH 6/7] refactor: extract source rank logic into a separate func for clarity and reuse Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- services/libs/common/src/member.ts | 3 +++ .../src/services/common.member.service.ts | 13 ++----------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/services/libs/common/src/member.ts b/services/libs/common/src/member.ts index 2ca0d2c455..3518fe8884 100644 --- a/services/libs/common/src/member.ts +++ b/services/libs/common/src/member.ts @@ -82,6 +82,9 @@ export const calculateReach = (oldReach: any, newReach: any): { total: number } return out } +/** + * Lower rank wins when multiple member-organization sources overlap. + */ export function getMemberOrganizationSourceRank(source: string | null | undefined): number { if (source === OrganizationSource.UI) return 0 if (source === OrganizationSource.EMAIL_DOMAIN) return 1 diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index e4ecd1956b..f4752b6667 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -13,6 +13,7 @@ import { calculateReach, getEarliestValidDate, getLongestDateRange, + getMemberOrganizationSourceRank, mergeObjects, safeObjectMerge, } from '@crowd/common' @@ -51,7 +52,6 @@ import { IWorkExperienceData } from '@crowd/data-access-layer/src/old/apps/data_ import { addOrgsToSegments } from '@crowd/data-access-layer/src/organizations' import { Logger, LoggerBase } from '@crowd/logging' import { Client as TemporalClient, WorkflowIdReusePolicy } from '@crowd/temporal' -import { OrganizationSource } from '@crowd/types' import { MergeActionState, MergeActionStep, @@ -224,20 +224,11 @@ export class CommonMemberService extends LoggerBase { return primaryEmployment.organizationId } - // Filter by source priority - // Source rank: ui > email-domain > enrichment-* > Other - const rankSource = (source?: string) => { - if (source === OrganizationSource.UI) return 0 - if (source === OrganizationSource.EMAIL_DOMAIN) return 1 - if (source?.startsWith('enrichment-')) return 2 - return 3 - } - let bestRank = 4 let highestPrioritySourceExperiences: IWorkExperienceData[] = [] for (const exp of experiences) { - const rank = rankSource(exp.source) + const rank = getMemberOrganizationSourceRank(exp.source) if (rank < bestRank) { bestRank = rank highestPrioritySourceExperiences = [exp] From 9553e2cbce8ae8ef3543e18286fe962cbd65da03 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Tue, 28 Apr 2026 21:18:33 +0530 Subject: [PATCH 7/7] refactor: streamline email domain extraction logic in ActivityService Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../data_sink_worker/src/service/activity.service.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 46d4f23c6d..1ca2d5a4c1 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -9,6 +9,7 @@ import { distinctBy, escapeNullByte, generateUUIDv1, + isDomainExcluded, isValidEmail, parseGitHubNoreplyEmail, single, @@ -1450,12 +1451,10 @@ export default class ActivityService extends LoggerBase { ) as boolean if (!isBot) { - const verifiedEmailIdentity = payload.activity.member.identities?.find( - (i) => i.type === MemberIdentityType.EMAIL && i.verified, - ) - const emailDomain = verifiedEmailIdentity - ? verifiedEmailIdentity.value.split('@')[1] - : undefined + const emailDomain = payload.activity.member.identities + ?.filter((i) => i.type === MemberIdentityType.EMAIL && i.verified) + .map((i) => i.value.split('@')[1]?.toLowerCase()) + .find((domain) => domain && !isDomainExcluded(domain)) // associate activity with organization payload.organizationId = await this.commonMemberService.findAffiliation(