diff --git a/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql new file mode 100644 index 0000000000..404f5c18be --- /dev/null +++ b/backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql @@ -0,0 +1,3 @@ +CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain" + ON "memberOrganizations" ("memberId") + WHERE "source" = 'email-domain' AND "deletedAt" IS NULL; diff --git a/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts new file mode 100644 index 0000000000..2f6a88b6b6 --- /dev/null +++ b/services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts @@ -0,0 +1,100 @@ +import CronTime from 'cron-time-generator' + +import { + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, + inferMemberOrganizationStintChanges, +} from '@crowd/common_services' +import { fetchMemberOrganizationsBySource } from '@crowd/data-access-layer' +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { REDIS_CONFIG, getRedisClient } from '@crowd/redis' +import { OrganizationSource } from '@crowd/types' + +import { IJobDefinition } from '../types' + +const job: IJobDefinition = { + name: 'infer-member-organization-stint-changes', + cronTime: CronTime.every(5).minutes(), + timeout: 10 * 60, + process: async (ctx) => { + const redis = await getRedisClient(REDIS_CONFIG()) + const db = await getDbConnection(WRITE_DB_CONFIG(), 2, 0) + const qx = pgpQx(db) + + // 1. Get a batch of work + const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500) + if (!memberIds?.length) return + + ctx.log.info({ count: memberIds.length }, 'Processing pending members.') + const stats = { processed: 0, inserts: 0, updates: 0 } + + for (const memberId of memberIds) { + try { + const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + const hash = await redis.hGetAll(datesKey) + + // If no data, just remove from queue and move on + if (!hash || Object.keys(hash).length === 0) { + await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + continue + } + + // 2. Parse Redis data into domain objects + const { activityDates, orgIds } = parseMemberActivityHash(hash) + + if (activityDates.length > 0) { + // 3. Compare with DB and calculate delta + const existingOrgs = await fetchMemberOrganizationsBySource( + qx, + memberId, + OrganizationSource.EMAIL_DOMAIN, + ) + + const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) + + if (changes.length > 0) { + ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') + stats.inserts += changes.filter((c) => c.type === 'insert').length + stats.updates += changes.filter((c) => c.type === 'update').length + } + } + + // 4. Cleanup: Remove only the fields we actually read + await redis + .multi() + .hDel(datesKey, orgIds) + .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) + .exec() + + stats.processed++ + } catch (err) { + ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') + } + } + + ctx.log.info(stats, 'Batch complete.') + }, +} + +/** + * Parses the Redis hash into a clean, typed list of activity dates. + */ +function parseMemberActivityHash(hash: Record) { + const orgIds = Object.keys(hash) + const activityDates = orgIds.flatMap((organizationId) => { + try { + const dates = JSON.parse(hash[organizationId]) + return Array.isArray(dates) + ? dates + .filter((d): d is string => typeof d === 'string') + .map((date) => ({ organizationId, date })) + : [] + } catch { + return [] + } + }) + return { activityDates, orgIds } +} + +export default job diff --git a/services/apps/data_sink_worker/package.json b/services/apps/data_sink_worker/package.json index c62c1e20d8..f9a2363081 100644 --- a/services/apps/data_sink_worker/package.json +++ b/services/apps/data_sink_worker/package.json @@ -17,8 +17,6 @@ "script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts", "script:process-results": "SERVICE=script tsx src/bin/process-results.ts", "script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts", - "script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts", - "script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts", "script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts", "script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts", "script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts", diff --git a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts b/services/apps/data_sink_worker/src/bin/map-member-to-org.ts deleted file mode 100644 index 7f63a2dc18..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-member-to-org.ts +++ /dev/null @@ -1,97 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { dbStoreQx, findIdentitiesForMembers } from '@crowd/data-access-layer' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' -import { MemberIdentityType } from '@crowd/types' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: memberId') - process.exit(1) -} - -const memberId = processArguments[0] - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const redis = await getRedisClient(REDIS_CONFIG()) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - try { - const members = await memberRepo.findByIds([memberId]) - - const member = members[0] - - if (!member) { - log.error({ memberId }, 'Member not found!') - process.exit(1) - } - - const identities = (await findIdentitiesForMembers(dbStoreQx(store), [memberId])).get(memberId) - log.info(`Processing memberId: ${member.id}`) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const emailIdentities = identities.filter( - (i) => i.verified && i.type === MemberIdentityType.EMAIL, - ) - if (emailIdentities.length > 0) { - const emails = emailIdentities.map((i) => i.value) - log.info({ memberId, emails }, 'Member emails!') - const orgs = await memberService.assignOrganizationByEmailDomain(null, emails) - - if (orgs.length > 0) { - log.info('Organizations found with matching email domains:', JSON.stringify(orgs)) - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - log.info('Done mapping member to organizations!') - } else { - log.info('No organizations found with matching email domains!') - } - } else { - log.info('No emails found for member!') - } - - process.exit(0) - } catch (err) { - log.error('Failed to map organizations for member!', err) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts b/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts deleted file mode 100644 index 04d23a1179..0000000000 --- a/services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo' -import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo' -import { getServiceLogger } from '@crowd/logging' -import { QueueFactory } from '@crowd/queue' -import { getRedisClient } from '@crowd/redis' -import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal' - -import { DB_CONFIG, QUEUE_CONFIG, REDIS_CONFIG, TEMPORAL_CONFIG } from '../conf' -import MemberService from '../service/member.service' -import { OrganizationService } from '../service/organization.service' - -const log = getServiceLogger() - -setImmediate(async () => { - let temporal: TemporalClient | undefined - if (TEMPORAL_CONFIG().serverUrl) { - temporal = await getTemporalClient(TEMPORAL_CONFIG()) - } - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - - const redis = await getRedisClient(REDIS_CONFIG()) - - const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG()) - const emitter = new DataSinkWorkerEmitter(queueClient, log) - await emitter.init() - - const dataSinkRepo = new DataSinkRepository(store, log) - const memberRepo = new MemberRepository(store, log) - - const segmentIds = await dataSinkRepo.getSegmentIds() - const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id - - const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(queueClient, log) - await searchSyncWorkerEmitter.init() - - const memberService = new MemberService(store, redis, temporal, log) - const orgService = new OrganizationService(store, log) - - const limit = 100 - let offset = 0 - let processedMembers = 0 - - let currentMemberId = null - let currentEmails = null - - try { - const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - countOnly: true, - }) - - log.info(`Total members found in the tenant: ${totalCount}`) - - do { - const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(segmentIds, { - limit, - offset, - }) - - // member -> organization based on email domain - for (const member of members) { - currentMemberId = member.id - currentEmails = member.emails - if (member.emails) { - const orgs = await memberService.assignOrganizationByEmailDomain(null, member.emails) - - if (orgs.length > 0) { - orgService.addToMember([segmentId], member.id, orgs) - - for (const org of orgs) { - await searchSyncWorkerEmitter.triggerOrganizationSync(org.id, true, segmentId) - } - - await searchSyncWorkerEmitter.triggerMemberSync(member.id, true, segmentId) - } - } - - processedMembers++ - log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`) - } - offset += limit - } while (totalCount > offset) - - log.info(`Member to organization association completed`) - process.exit(0) - } catch (err) { - log.error( - `Failed to assign member to organizations. Member ID: ${currentMemberId}, Emails: ${currentEmails}`, - err, - ) - process.exit(1) - } -}) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 5e4b4cdb7f..46d4f23c6d 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1197,6 +1197,7 @@ export default class ActivityService extends LoggerBase { value.platform, undefined, orgPromiseCache, + value.timestamp, ) .then((memberId) => { // map ids for members @@ -1342,6 +1343,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.memberId = payload.dbMember.id @@ -1400,6 +1402,7 @@ export default class ActivityService extends LoggerBase { payload.platform, undefined, orgPromiseCache, + payload.activity.timestamp, ) .then(() => { payload.objectMemberId = payload.dbObjectMember.id @@ -1447,11 +1450,19 @@ export default class ActivityService extends LoggerBase { ) as boolean if (!isBot) { + const verifiedEmailIdentity = payload.activity.member.identities?.find( + (i) => i.type === MemberIdentityType.EMAIL && i.verified, + ) + const emailDomain = verifiedEmailIdentity + ? verifiedEmailIdentity.value.split('@')[1] + : undefined + // associate activity with organization payload.organizationId = await this.commonMemberService.findAffiliation( payload.memberId, payload.segmentId, payload.activity.timestamp, + emailDomain, ) } else { // for bot members, we don't want to affiliate the activity with an organization diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 8aa78b1cac..cbe971d0d9 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -12,7 +12,12 @@ import { isObjectEmpty, singleOrDefault, } from '@crowd/common' -import { BotDetectionService, CommonMemberService } from '@crowd/common_services' +import { + BotDetectionService, + CommonMemberService, + MEMBER_ORG_STINT_CHANGES_DATES_PREFIX, + MEMBER_ORG_STINT_CHANGES_QUEUE, +} from '@crowd/common_services' import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer' import { findIdentitiesForMembers, @@ -121,6 +126,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { return logExecutionTimeV2( async () => { @@ -448,6 +454,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> create -> assignOrganizationByEmailDomain', @@ -505,6 +513,7 @@ export default class MemberService extends LoggerBase { platform: PlatformType, releaseMemberLock?: () => Promise, orgPromiseCache?: Map>, + activityTimestamp?: string, ): Promise { await logExecutionTimeV2( async () => { @@ -682,6 +691,8 @@ export default class MemberService extends LoggerBase { integrationId, emailIdentities.map((i) => i.value), orgPromiseCache, + id, + activityTimestamp, ), this.log, 'memberService -> update -> assignOrganizationByEmailDomain', @@ -733,6 +744,8 @@ export default class MemberService extends LoggerBase { integrationId: string, emails: string[], orgPromiseCache?: Map>, + memberId?: string, + activityTimestamp?: string, ): Promise { const orgService = new OrganizationService(this.store, this.log) const organizations: IOrganizationIdSource[] = [] @@ -791,6 +804,9 @@ export default class MemberService extends LoggerBase { id: orgId, source: orgSource, }) + if (memberId && activityTimestamp) { + await this.bufferMemberOrganizationActivityDates(memberId, orgId, activityTimestamp) + } } } @@ -1047,4 +1063,58 @@ export default class MemberService extends LoggerBase { }, }) } + + /** + * Queues member activity dates in Redis as raw input for stint inference. + * + * To maximize throughput, this uses a non-atomic HGET/HSET pattern. While + * concurrent writes may occasionally drop a date, the system is self-healing + * as future activity will re-populate the buffer. + */ + private async bufferMemberOrganizationActivityDates( + memberId: string, + organizationId: string, + activityTimestamp: string, + ): Promise { + const date = new Date(activityTimestamp).toISOString().split('T')[0] + const key = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` + + // Safe parser for existing Redis strings. + // Returns a valid string array even if data is corrupt or missing. + const parseExistingDates = (value: string | null | undefined): string[] => { + if (!value) return [] + try { + const parsed = JSON.parse(value) + return Array.isArray(parsed) ? parsed.filter((d): d is string => typeof d === 'string') : [] + } catch { + this.log.warn( + { memberId, organizationId, key }, + 'Corrupt dates buffer value detected during buffering for member organization activity dates.', + ) + return [] + } + } + + // 1. Fetch current dates for this specific organization + const existing = await this.redisClient.hGet(key, organizationId) + const dates: string[] = parseExistingDates(existing) + + // 2. If the date is new, update the set and the queue + if (!dates.includes(date)) { + dates.push(date) + dates.sort() + + await Promise.all([ + // Update the specific org field in the hash + this.redisClient.hSet(key, organizationId, JSON.stringify(dates)), + // Ensure the member is in the processing queue + this.redisClient.sAdd(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId), + ]) + + this.log.debug( + { memberId, organizationId, date, count: dates.length }, + 'Buffered activity date and queued member.', + ) + } + } } diff --git a/services/libs/common/src/member.ts b/services/libs/common/src/member.ts index 2ca0d2c455..3518fe8884 100644 --- a/services/libs/common/src/member.ts +++ b/services/libs/common/src/member.ts @@ -82,6 +82,9 @@ export const calculateReach = (oldReach: any, newReach: any): { total: number } return out } +/** + * Lower rank wins when multiple member-organization sources overlap. + */ export function getMemberOrganizationSourceRank(source: string | null | undefined): number { if (source === OrganizationSource.UI) return 0 if (source === OrganizationSource.EMAIL_DOMAIN) return 1 diff --git a/services/libs/common_services/src/services/common.member.service.ts b/services/libs/common_services/src/services/common.member.service.ts index 70c70e0647..f4752b6667 100644 --- a/services/libs/common_services/src/services/common.member.service.ts +++ b/services/libs/common_services/src/services/common.member.service.ts @@ -13,6 +13,7 @@ import { calculateReach, getEarliestValidDate, getLongestDateRange, + getMemberOrganizationSourceRank, mergeObjects, safeObjectMerge, } from '@crowd/common' @@ -169,6 +170,7 @@ export class CommonMemberService extends LoggerBase { memberId: string, segmentId: string, timestamp: string, + emailDomain?: string, ): Promise { const manualAffiliation = await findMemberManualAffiliation( this.qx, @@ -180,7 +182,12 @@ export class CommonMemberService extends LoggerBase { return manualAffiliation.organizationId } - const currentEmployments = await findMemberWorkExperience(this.qx, memberId, timestamp) + const currentEmployments = await findMemberWorkExperience( + this.qx, + memberId, + timestamp, + emailDomain, + ) if (currentEmployments.length > 0) { return this.decidePrimaryOrganizationId(currentEmployments) } @@ -217,10 +224,28 @@ export class CommonMemberService extends LoggerBase { return primaryEmployment.organizationId } + let bestRank = 4 + let highestPrioritySourceExperiences: IWorkExperienceData[] = [] + + for (const exp of experiences) { + const rank = getMemberOrganizationSourceRank(exp.source) + if (rank < bestRank) { + bestRank = rank + highestPrioritySourceExperiences = [exp] + } else if (rank === bestRank) { + highestPrioritySourceExperiences.push(exp) + } + } + + // Keep only candidates from the highest-priority source tier + if (highestPrioritySourceExperiences.length === 1) { + return highestPrioritySourceExperiences[0].organizationId + } + // decide based on the member count in the organizations const memberCounts = await findMemberCountEstimateOfOrganizations( this.qx, - experiences.map((e) => e.organizationId), + highestPrioritySourceExperiences.map((e) => e.organizationId), ) if (memberCounts[0].memberCount > memberCounts[1].memberCount) { @@ -230,7 +255,7 @@ export class CommonMemberService extends LoggerBase { } // if there's a draw in the member count, use the one with the longer period - return getLongestDateRange(experiences).organizationId + return getLongestDateRange(highestPrioritySourceExperiences).organizationId } return null diff --git a/services/libs/common_services/src/services/index.ts b/services/libs/common_services/src/services/index.ts index 227ef64e34..8034c4cfc6 100644 --- a/services/libs/common_services/src/services/index.ts +++ b/services/libs/common_services/src/services/index.ts @@ -3,7 +3,7 @@ export * from './llm.service' export * from './common.member.service' export * from './member/unmerge' export * from './member/cache' -export * from './memberOrganization' +export * from './member-organization' export * from './bot.service' export * from './emitters' export * from './github.integration.service' diff --git a/services/libs/common_services/src/services/member-organization.ts b/services/libs/common_services/src/services/member-organization.ts new file mode 100644 index 0000000000..fc8a7c38db --- /dev/null +++ b/services/libs/common_services/src/services/member-organization.ts @@ -0,0 +1,255 @@ +import { + IMemberOrganization, + MemberOrgDate, + MemberOrgStintChange, + MemberRoleUnmergeStrategy, +} from '@crowd/types' + +function roleKey( + role: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): string | undefined { + if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { + return role.organizationId + } + return role.memberId +} + +function roleExistsInArray( + role: IMemberOrganization, + roles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): boolean { + const key = roleKey(role, strategy) + return roles.some( + (r) => + roleKey(r, strategy) === key && + r.title === role.title && + r.dateStart === role.dateStart && + r.dateEnd === role.dateEnd, + ) +} + +export function rolesIntersect( + roleA: IMemberOrganization, + roleB: IMemberOrganization, + strategy: MemberRoleUnmergeStrategy, +): boolean { + if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { + return false + } + + const startA = new Date(roleA.dateStart).getTime() + const endA = new Date(roleA.dateEnd).getTime() + const startB = new Date(roleB.dateStart).getTime() + const endB = new Date(roleB.dateEnd).getTime() + + return ( + (startA < startB && endA > startB) || + (startB < startA && endB > startA) || + (startA < startB && endA > endB) || + (startB < startA && endB > endA) + ) +} + +export function unmergeRoles( + mergedRoles: IMemberOrganization[], + primaryBackupRoles: IMemberOrganization[], + secondaryBackupRoles: IMemberOrganization[], + strategy: MemberRoleUnmergeStrategy, +): IMemberOrganization[] { + const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( + (role) => + role.source === 'ui' || + !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + const editableRoles = mergedRoles.filter( + (role) => + role.source !== 'ui' && + secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), + ) + + for (const secondaryBackupRole of secondaryBackupRoles) { + const { dateStart, dateEnd } = secondaryBackupRole + + if (dateStart === null && dateEnd === null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } + } else if (dateStart !== null && dateEnd === null) { + const currentRoleFromPrimaryBackup = primaryBackupRoles.find( + (r) => + roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && + r.title === secondaryBackupRole.title && + r.dateStart !== null && + r.dateEnd === null, + ) + if (currentRoleFromPrimaryBackup) { + unmergedRoles.push(currentRoleFromPrimaryBackup) + } + } else if (dateStart !== null && dateEnd !== null) { + if ( + roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && + roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) + ) { + unmergedRoles.push(secondaryBackupRole) + } else { + const intersecting = editableRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + + if (intersecting) { + const fromBackup = primaryBackupRoles.find((r) => + rolesIntersect(secondaryBackupRole, r, strategy), + ) + if (fromBackup) { + unmergedRoles.push(fromBackup) + } + } + } + } + } + + return unmergedRoles +} + +export const MEMBER_ORG_STINT_CHANGES_QUEUE = 'infer-member-organization-stint-changes:members' +export const MEMBER_ORG_STINT_CHANGES_DATES_PREFIX = 'infer-member-organization-stint-changes:dates' + +interface Stint { + id: string | null + organizationId: string + dateStart: string | null + dateEnd: string | null + isDirty: boolean + isNew: boolean +} + +type DatedStint = Stint & { dateStart: string; dateEnd: string } + +/** + * Core logic to determine if activity dates should expand existing stints or create new ones + */ +export function inferMemberOrganizationStintChanges( + memberId: string, + existingRows: IMemberOrganization[], + orgDates: MemberOrgDate[], +): MemberOrgStintChange[] { + const toIso = (v: string | Date) => new Date(v).toISOString().split('T')[0] + const diff = (a: string, b: string) => Math.abs(Date.parse(b) - Date.parse(a)) / 86_400_000 + + // 1. Initialize local state to track modifications and new records + const stints: Stint[] = existingRows.map((r) => ({ + id: r.id ?? null, + organizationId: r.organizationId, + dateStart: r.dateStart ? toIso(r.dateStart) : null, + dateEnd: r.dateEnd ? toIso(r.dateEnd) : null, + isDirty: false, + isNew: false, + })) + + const sortedDates = [...orgDates].sort((a, b) => a.date.localeCompare(b.date)) + + for (const { organizationId, date: targetDate } of sortedDates) { + const orgStints = stints.filter((s) => s.organizationId === organizationId) + + // 2. Skip if the date is already covered by an existing stint + if ( + orgStints.some( + (s) => s.dateStart && s.dateEnd && targetDate >= s.dateStart && targetDate <= s.dateEnd, + ) + ) + continue + + // 3. Fill undated placeholder only when no dated stint exists yet (Rule 2) + const dated = orgStints.filter( + (s): s is DatedStint => s.dateStart !== null && s.dateEnd !== null, + ) + const placeholder = orgStints.find((s) => !s.dateStart && !s.dateEnd) + if (placeholder && dated.length === 0) { + placeholder.dateStart = placeholder.dateEnd = targetDate + placeholder.isDirty = true + continue + } + + // 4. Find the closest neighbor stint to see if expansion is possible + let neighbor: DatedStint | null = null + let minGap = Infinity + + for (const s of dated) { + const gap = + targetDate > s.dateEnd ? diff(s.dateEnd, targetDate) : diff(targetDate, s.dateStart) + if (gap < minGap) { + minGap = gap + neighbor = s + } + } + + if (!neighbor) { + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + continue + } + + // 5. Determine the gap window between neighbor and targetDate, then check if another org + // holds a significant (>= 30 day) dated stint that overlaps that window (multi-stint guard) + const isForward = targetDate > neighbor.dateEnd + const gapStart = isForward ? neighbor.dateEnd : targetDate + const gapEnd = isForward ? targetDate : neighbor.dateStart + const hasConflict = stints.some( + (s) => + s.organizationId !== organizationId && + s.dateStart && + s.dateEnd && + s.dateStart < gapEnd && + s.dateEnd > gapStart && + diff(s.dateStart, s.dateEnd) >= 30, + ) + + if (hasConflict) { + // 6a. Another org owns the gap — start a fresh stint rather than bridging + stints.push({ + id: null, + organizationId, + dateStart: targetDate, + dateEnd: targetDate, + isDirty: true, + isNew: true, + }) + } else if (isForward && minGap <= 30) { + // 6b. Forward extension within the debounce window — skip to avoid thrashing dateEnd. + // Backward extensions are not debounced (rare, only during historical re-ingestion). + continue + } else { + // 6c. Extend the neighbor in the appropriate direction + if (isForward) neighbor.dateEnd = targetDate + else neighbor.dateStart = targetDate + neighbor.isDirty = true + } + } + + // 7. Map only modified or new stints back to change objects + return stints + .filter((s) => s.isDirty) + .map((s): MemberOrgStintChange => { + const payload = { + memberId, + organizationId: s.organizationId, + dateStart: s.dateStart as string, + dateEnd: s.dateEnd as string, + } + + if (s.isNew) return { type: 'insert', ...payload } + return { type: 'update', id: s.id as string, ...payload } + }) +} diff --git a/services/libs/common_services/src/services/member/unmerge.ts b/services/libs/common_services/src/services/member/unmerge.ts index 7d79631d92..e56a0d6ab0 100644 --- a/services/libs/common_services/src/services/member/unmerge.ts +++ b/services/libs/common_services/src/services/member/unmerge.ts @@ -58,7 +58,7 @@ import { } from '@crowd/types' import { BotDetectionService } from '../bot.service' -import { unmergeRoles } from '../memberOrganization' +import { unmergeRoles } from '../member-organization' const logger = getServiceLogger() diff --git a/services/libs/common_services/src/services/memberOrganization.ts b/services/libs/common_services/src/services/memberOrganization.ts deleted file mode 100644 index b20139e3e4..0000000000 --- a/services/libs/common_services/src/services/memberOrganization.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { IMemberOrganization, MemberRoleUnmergeStrategy } from '@crowd/types' - -function roleKey( - role: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): string | undefined { - if (strategy === MemberRoleUnmergeStrategy.SAME_MEMBER) { - return role.organizationId - } - return role.memberId -} - -function roleExistsInArray( - role: IMemberOrganization, - roles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): boolean { - const key = roleKey(role, strategy) - return roles.some( - (r) => - roleKey(r, strategy) === key && - r.title === role.title && - r.dateStart === role.dateStart && - r.dateEnd === role.dateEnd, - ) -} - -export function rolesIntersect( - roleA: IMemberOrganization, - roleB: IMemberOrganization, - strategy: MemberRoleUnmergeStrategy, -): boolean { - if (roleKey(roleA, strategy) !== roleKey(roleB, strategy) || roleA.title !== roleB.title) { - return false - } - - const startA = new Date(roleA.dateStart).getTime() - const endA = new Date(roleA.dateEnd).getTime() - const startB = new Date(roleB.dateStart).getTime() - const endB = new Date(roleB.dateEnd).getTime() - - return ( - (startA < startB && endA > startB) || - (startB < startA && endB > startA) || - (startA < startB && endA > endB) || - (startB < startA && endB > endA) - ) -} - -export function unmergeRoles( - mergedRoles: IMemberOrganization[], - primaryBackupRoles: IMemberOrganization[], - secondaryBackupRoles: IMemberOrganization[], - strategy: MemberRoleUnmergeStrategy, -): IMemberOrganization[] { - const unmergedRoles: IMemberOrganization[] = mergedRoles.filter( - (role) => - role.source === 'ui' || - !secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - const editableRoles = mergedRoles.filter( - (role) => - role.source !== 'ui' && - secondaryBackupRoles.some((r) => roleKey(r, strategy) === roleKey(role, strategy)), - ) - - for (const secondaryBackupRole of secondaryBackupRoles) { - const { dateStart, dateEnd } = secondaryBackupRole - - if (dateStart === null && dateEnd === null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } - } else if (dateStart !== null && dateEnd === null) { - const currentRoleFromPrimaryBackup = primaryBackupRoles.find( - (r) => - roleKey(r, strategy) === roleKey(secondaryBackupRole, strategy) && - r.title === secondaryBackupRole.title && - r.dateStart !== null && - r.dateEnd === null, - ) - if (currentRoleFromPrimaryBackup) { - unmergedRoles.push(currentRoleFromPrimaryBackup) - } - } else if (dateStart !== null && dateEnd !== null) { - if ( - roleExistsInArray(secondaryBackupRole, editableRoles, strategy) && - roleExistsInArray(secondaryBackupRole, primaryBackupRoles, strategy) - ) { - unmergedRoles.push(secondaryBackupRole) - } else { - const intersecting = editableRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - - if (intersecting) { - const fromBackup = primaryBackupRoles.find((r) => - rolesIntersect(secondaryBackupRole, r, strategy), - ) - if (fromBackup) { - unmergedRoles.push(fromBackup) - } - } - } - } - } - - return unmergedRoles -} diff --git a/services/libs/data-access-layer/src/members/organizations.ts b/services/libs/data-access-layer/src/members/organizations.ts index 205c871a82..fcd87bbf18 100644 --- a/services/libs/data-access-layer/src/members/organizations.ts +++ b/services/libs/data-access-layer/src/members/organizations.ts @@ -42,6 +42,23 @@ export async function fetchMemberOrganizations( ) } +export async function fetchMemberOrganizationsBySource( + qx: QueryExecutor, + memberId: string, + source: OrganizationSource, +): Promise { + return qx.select( + ` + SELECT "id", "organizationId", "dateStart", "dateEnd", "title", "memberId", "source" + FROM "memberOrganizations" + WHERE "memberId" = $(memberId) + AND "source" = $(source) + AND "deletedAt" IS NULL + `, + { memberId, source }, + ) +} + export async function fetchOrganizationMemberIds( qx: QueryExecutor, organizationId: string, diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 4960dc079a..34a15f7b5a 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -206,7 +206,21 @@ export async function findMemberWorkExperience( qx: QueryExecutor, memberId: string, timestamp: string, + emailDomain?: string, ): Promise { + let domainOrClause = '' + if (emailDomain) { + domainOrClause = ` + OR (mo."source" = 'email-domain' AND EXISTS ( + SELECT 1 FROM "organizationIdentities" oi + WHERE oi."organizationId" = mo."organizationId" + AND oi.type = 'primary-domain' + AND oi.verified = true + AND LOWER(oi.value) = LOWER($(emailDomain)) + )) + ` + } + const result = await qx.select( ` SELECT @@ -215,17 +229,19 @@ export async function findMemberWorkExperience( FROM "memberOrganizations" mo LEFT JOIN "memberOrganizationAffiliationOverrides" ovr on ovr."memberOrganizationId" = mo."id" WHERE mo."memberId" = $(memberId) + AND mo."deletedAt" IS NULL + AND coalesce(ovr."allowAffiliation", true) = true AND ( (mo."dateStart" <= $(timestamp) AND mo."dateEnd" >= $(timestamp)) OR (mo."dateStart" <= $(timestamp) AND mo."dateEnd" IS NULL) + ${domainOrClause} ) - AND mo."deletedAt" IS NULL - AND coalesce(ovr."allowAffiliation", true) = true ORDER BY mo."dateStart" DESC, mo.id `, { memberId, timestamp, + emailDomain, }, ) diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts index df444d7631..5c53320503 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts @@ -1,3 +1,5 @@ +import { OrganizationSource } from '@crowd/types' + interface BaseData { memberId: string organizationId: string @@ -15,6 +17,7 @@ export interface IManualAffiliationData extends BaseData { export interface IWorkExperienceData extends BaseData { id: string + source: OrganizationSource } export interface IOrganizationMemberCount { diff --git a/services/libs/types/src/organizations.ts b/services/libs/types/src/organizations.ts index d5b01d6beb..649cc27f6b 100644 --- a/services/libs/types/src/organizations.ts +++ b/services/libs/types/src/organizations.ts @@ -92,6 +92,22 @@ export interface IMemberRoleWithOrganization extends IMemberOrganization { organizationLogo: string } +export interface MemberOrgDate { + organizationId: string + date: string // YYYY-MM-DD +} + +interface MemberOrgStintChangeBase { + memberId: string + organizationId: string + dateStart: string + dateEnd: string +} + +export type MemberOrgStintChange = + | ({ type: 'insert' } & MemberOrgStintChangeBase) + | ({ type: 'update'; id: string } & MemberOrgStintChangeBase) + export interface IExecutiveChange { joined_date?: string pdl_id?: string