Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1506,17 +1506,20 @@ export default class ActivityService extends LoggerBase {
) as boolean

if (!isBot) {
const emailDomain = payload.activity.member.identities
?.filter((i) => i.type === MemberIdentityType.EMAIL && i.verified)
.map((i) => i.value.split('@')[1]?.toLowerCase())
.find((domain) => domain && !isDomainExcluded(domain))
// Trust the email the activity arrived with (username only).
// Public inbox domains (gmail, etc.) don't identify an org, so they're skipped.
const domain = isValidEmail(payload.activity.username)
? payload.activity.username.split('@')[1]?.toLowerCase()
: undefined

const affiliationEmailDomain = domain && !isDomainExcluded(domain) ? domain : undefined

// associate activity with organization
payload.organizationId = await this.commonMemberService.findAffiliation(
payload.memberId,
payload.segmentId,
payload.activity.timestamp,
emailDomain,
affiliationEmailDomain,
)
} else {
// for bot members, we don't want to affiliate the activity with an organization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
deleteMemberOrganizations,
fetchManyMemberOrgsWithOrgData,
fetchManyOrganizationAffiliationPolicies,
fetchManyOrganizationVerifiedPrimaryDomains,
fetchMemberOrganizations,
findAllUnkownDatedOrganizations,
findIdentitiesForMembers,
Expand All @@ -40,6 +41,7 @@ import {
moveAffiliationsBetweenMembers,
moveIdentitiesBetweenMembers,
moveOrgsBetweenMembers,
preferCompanyOverUniversityWhenOverlapping,
updateMember,
} from '@crowd/data-access-layer'
import { removeMemberToMerge } from '@crowd/data-access-layer/src/member_merge'
Expand Down Expand Up @@ -177,8 +179,9 @@ export class CommonMemberService extends LoggerBase {
memberId: string,
segmentId: string,
timestamp: string,
emailDomain?: string,
affiliationEmailDomain?: string,
): Promise<string | null> {
// 1. Manual Segment Affiliations always take absolute priority
const manualAffiliation = await findMemberManualAffiliation(
this.qx,
memberId,
Expand All @@ -189,16 +192,42 @@ export class CommonMemberService extends LoggerBase {
return manualAffiliation.organizationId
}

const currentEmployments = await findMemberWorkExperience(
this.qx,
memberId,
timestamp,
emailDomain,
)
// 2. When the activity carries an org email domain, match member orgs by verified primary domain
if (affiliationEmailDomain) {
const domainEmployments = await findMemberWorkExperience(
this.qx,
memberId,
timestamp,
affiliationEmailDomain,
)

if (domainEmployments.length > 0) {
return this.decidePrimaryOrganizationId(domainEmployments)
}
}

// 3. Date matching: work history active at this timestamp
const currentEmployments = await findMemberWorkExperience(this.qx, memberId, timestamp)
if (currentEmployments.length > 0) {
return this.decidePrimaryOrganizationId(currentEmployments)
let employments = currentEmployments

if (employments.length > 1) {
const organizationIds = [...new Set(employments.map((row) => row.organizationId))]

const memberOrgDomains = await fetchManyOrganizationVerifiedPrimaryDomains(
this.qx,
organizationIds,
)

// Also applies when step 2 found a domain but no matching member organization yet
// (e.g. ingest before stint inference).
employments = preferCompanyOverUniversityWhenOverlapping(employments, memberOrgDomains)
}

return this.decidePrimaryOrganizationId(employments)
}

// 4. Fallback: Most recent experiences with missing/unknown dates
const mostRecentUnknownDatedOrgs = await findMostRecentUnknownDatedOrganizations(
this.qx,
memberId,
Expand All @@ -208,6 +237,7 @@ export class CommonMemberService extends LoggerBase {
return this.decidePrimaryOrganizationId(mostRecentUnknownDatedOrgs)
}

// 5. Last Resort: Any historical undated organization tied to the member
const allUnkownDAtedOrgs = await findAllUnkownDatedOrganizations(this.qx, memberId)
if (allUnkownDAtedOrgs.length > 0) {
return this.decidePrimaryOrganizationId(allUnkownDAtedOrgs)
Comment thread
skwowet marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import {

import { findMemberAffiliations } from '../member_segment_affiliations'
import { IManualAffiliationData } from '../old/apps/data_sink_worker/repo/memberAffiliation.data'
import {
type OrganizationVerifiedPrimaryDomains,
fetchManyOrganizationVerifiedPrimaryDomains,
preferCompanyOverUniversityWhenOverlapping,
} from '../organizations/identities'
import { QueryExecutor } from '../queryExecutor'

import type { MemberOrganizationWithOverrides, TimelineItem } from './types'
Expand All @@ -30,6 +35,8 @@ async function prepareMemberOrganizationAffiliationTimeline(
qx: QueryExecutor,
memberId: string,
): Promise<TimelineItem[]> {
let memberOrgDomains: OrganizationVerifiedPrimaryDomains[] = []

const isDateInInterval = (date: Date, start: Date | null, end: Date | null) => {
return (!start || date >= start) && (!end || date <= end)
}
Expand Down Expand Up @@ -67,6 +74,20 @@ async function prepareMemberOrganizationAffiliationTimeline(
return getLongestDateRange(manualAffiliations)
}

const memberOrgs = orgs.filter(isMemberOrganizationWithOverrides)
const companyPreferredOrgs = preferCompanyOverUniversityWhenOverlapping(
memberOrgs,
memberOrgDomains,
)

if (companyPreferredOrgs.length < memberOrgs.length) {
const companyPreferredOrgIds = new Set(companyPreferredOrgs.map((row) => row.organizationId))
orgs = orgs.filter(
(row) =>
!isMemberOrganizationWithOverrides(row) || companyPreferredOrgIds.has(row.organizationId),
)
}

// first check if there's a primary work experience
const primaryOrgs = orgs.filter((row) => row.isPrimaryWorkExperience)
if (primaryOrgs.length > 0) {
Expand Down Expand Up @@ -308,23 +329,67 @@ async function prepareMemberOrganizationAffiliationTimeline(
.value() ?? null
}

// We separate global and manual timelines to prevent 'stale' organizationIds
// Member organizations apply globally, while member segment affiliations only override specific segments.
const organizationIds = Array.from(
new Set(memberOrganizations.map((row: MemberOrganizationWithOverrides) => row.organizationId)),
)

memberOrgDomains = await fetchManyOrganizationVerifiedPrimaryDomains(
qx,
organizationIds as string[],
)
Comment on lines +336 to +339

// Route each activity to exactly ONE timeline pass to prevent double-processing:
// 1. If the activity's username is an email on one of the member orgs' verified primary domains, it lands in the email timeline.
// 2. Otherwise, it falls back to the date-based timeline (which excludes these known domains).
const emailDomains = [...new Set(memberOrgDomains.flatMap((row) => row.domains))]
const nonEmailActivityFilter =
emailDomains.length > 0 ? { excludeEmailDomains: emailDomains } : {}

// Separate global and manual timelines to prevent stale data.
// Global member orgs apply everywhere; manual segment affiliations act as localized overrides.
const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({
...item,
...nonEmailActivityFilter,
skipManualAffiliationSegments: manualAffiliations.length > 0,
}))

// Only keep items with an actual org. Gaps (null org) are already handled by the base timeline.
// Activities on a member-org email domain belong to that org, overriding whatever
// role the date-based timeline would have picked during an overlap.
const domainsByOrgId = _.keyBy(memberOrgDomains, 'orgId')
const memberOrgsPerDomain = _.flatMap(memberOrganizations, (memberOrganization) =>
(domainsByOrgId[memberOrganization.organizationId]?.domains ?? []).map((domain) => ({
memberOrganization,
domain,
})),
)

const emailAffiliations = _.flatMap(
_.groupBy(memberOrgsPerDomain, 'domain'),
(entries, matchEmailDomain) => {
const primary = selectPrimaryWorkExperience(entries.map((entry) => entry.memberOrganization))

return [
{
organizationId: primary.organizationId,
dateStart: new Date('1970-01-01').toISOString(),
dateEnd: null,
matchEmailDomain,
skipManualAffiliationSegments: manualAffiliations.length > 0,
},
]
},
)

// Only keep items with a valid org; gaps (null orgs) are already handled by the base timeline.
const manualTimeline = _.flatMap(
_.groupBy(manualAffiliations, 'segmentId'),
(affiliations, segmentId) => {
const items = buildTimeline(affiliations, null, false)
.filter((item) => item.organizationId !== null)
.map((item) => ({ ...item, segmentId }))

// Undated MSAs are invisible to buildTimeline (no dateStart to anchor the loop).
// Create a catch-all so the base pass's NOT EXISTS still has a matching manual item.
// Manual affiliations without dates are ignored by buildTimeline (no anchor point).
// Create a 1970 catch-all so the base pass's SQL `NOT EXISTS` check still matches them.
if (items.length === 0) {
const primary = selectPrimaryWorkExperience(affiliations)
items.push({
Expand All @@ -339,7 +404,7 @@ async function prepareMemberOrganizationAffiliationTimeline(
},
)

return [...baseTimeline, ...manualTimeline]
return [...baseTimeline, ...manualTimeline, ...emailAffiliations]
}

async function processAffiliationActivities(
Expand Down Expand Up @@ -377,6 +442,19 @@ async function processAffiliationActivities(
params.dateEnd = affiliation.dateEnd
}

// Give each pass a disjoint slice of activities by email so no row is written twice: matchEmailDomain
// takes activities on this org's domain, excludeEmailDomains takes the rest (no '@', or a foreign domain).
if (affiliation.matchEmailDomain) {
conditions.push(`split_part(lower(ar.username), '@', 2) = $(matchEmailDomain)`)
params.matchEmailDomain = affiliation.matchEmailDomain
} else if (affiliation.excludeEmailDomains?.length) {
conditions.push(`(
position('@' in coalesce(ar.username, '')) = 0
OR lower(split_part(ar.username, '@', 2)) NOT IN ($(excludeEmailDomains:csv))
)`)
params.excludeEmailDomains = affiliation.excludeEmailDomains
Comment thread
skwowet marked this conversation as resolved.
}
Comment thread
skwowet marked this conversation as resolved.

// Segment filtering (for manual affiliations)
if (affiliation.segmentId) {
conditions.push(`ar."segmentId" = $(segmentId)`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,15 @@ export type TimelineItem = {
organizationId: string | null
segmentId?: string
skipManualAffiliationSegments?: boolean

/**
* Routes activities by their email domain so timeline passes never overwrite each other.
* Claims activities that match this specific organization domain.
*/
matchEmailDomain?: string

/**
* Excludes activities belonging to these specific email domains from being claimed.
*/
excludeEmailDomains?: string[]
}
49 changes: 27 additions & 22 deletions services/libs/data-access-layer/src/members/segments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,42 +207,47 @@ export async function findMemberWorkExperience(
qx: QueryExecutor,
memberId: string,
timestamp: string,
emailDomain?: string,
orgDomain?: string,
): Promise<IWorkExperienceData[] | null> {
let domainOrClause = ''
if (emailDomain) {
domainOrClause = `
OR (mo."source" = 'email-domain' AND EXISTS (
SELECT 1 FROM "organizationIdentities" oi
WHERE oi."organizationId" = mo."organizationId"
AND oi.type = 'primary-domain'
AND oi.verified = true
AND LOWER(oi.value) = LOWER($(emailDomain))
))
`
}
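// Date filter, applied only when no org domain is given: the member organization must be
// active at the timestamp (a NULL "dateEnd" is treated as still ongoing).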
const dateCriteria = `
(mo."dateStart" <= $(timestamp) AND (mo."dateEnd" >= $(timestamp) OR mo."dateEnd" IS NULL))
`

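// When an org domain is supplied, keep only member organizations whose organization has that
// verified primary domain; the date filter is NOT applied in that case.
// Without a domain, filter by date instead.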
const activeAtTimestampClause = orgDomain
? `
AND EXISTS (
SELECT 1
FROM "organizationIdentities" oi
WHERE oi."organizationId" = mo."organizationId"
AND oi.type = 'primary-domain'
AND oi.verified = true
AND lower(oi.value) = lower($(orgDomain))
)
`
: `
AND ${dateCriteria}
`

const result = await qx.select(
`
SELECT
mo.*,
coalesce(ovr."isPrimaryWorkExperience", false) as "isPrimaryWorkExperience"
mo.*,
coalesce(ovr."isPrimaryWorkExperience", false) AS "isPrimaryWorkExperience"
FROM "memberOrganizations" mo
LEFT JOIN "memberOrganizationAffiliationOverrides" ovr on ovr."memberOrganizationId" = mo."id"
LEFT JOIN "memberOrganizationAffiliationOverrides" ovr
ON ovr."memberOrganizationId" = mo.id
WHERE mo."memberId" = $(memberId)
AND mo."deletedAt" IS NULL
AND coalesce(ovr."allowAffiliation", true) = true
AND (
(mo."dateStart" <= $(timestamp) AND mo."dateEnd" >= $(timestamp))
OR (mo."dateStart" <= $(timestamp) AND mo."dateEnd" IS NULL)
${domainOrClause}
)
${activeAtTimestampClause}
ORDER BY mo."dateStart" DESC, mo.id
`,
{
memberId,
timestamp,
emailDomain,
orgDomain,
},
)

Expand Down
Loading
Loading