Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import commandLineArgs from 'command-line-args'
import { randomUUID } from 'crypto'

import { DEFAULT_TENANT_ID } from '@crowd/common'
import { getDbConnection } from '@crowd/data-access-layer/src/database'
import { getServiceLogger } from '@crowd/logging'
import { getTemporalClient } from '@crowd/temporal'
import { WorkflowIdConflictPolicy, getTemporalClient } from '@crowd/temporal'
import { TemporalWorkflowId } from '@crowd/types'

import { DB_CONFIG, TEMPORAL_CONFIG } from '@/conf'

Expand Down Expand Up @@ -99,25 +100,24 @@ setImmediate(async () => {
try {
log.info(`Processing member: ${member.id}`)

const uuid = randomUUID()

await temporal.workflow.start('memberUpdate', {
await temporal.workflow.signalWithStart('memberUpdate', {
taskQueue: 'profiles',
workflowId: `member-update-fix-unaffiliation/${organizationId}/${member.id}/${uuid}`,
retry: {
maximumAttempts: 10,
},
args: [
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${member.id}`,
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
signal: 'refreshAffiliations',
signalArgs: [
{
member: {
id: member.id,
},
member: { id: member.id },
memberOrganizationIds: [organizationId],
syncToOpensearch: false,
},
],
retry: {
maximumAttempts: 10,
},
args: [],
searchAttributes: {
TenantId: ['875c38bd-2b1b-4e91-ad07-0cfbabb4c49f'], // default tenantId
TenantId: [DEFAULT_TENANT_ID],
},
})

Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 15 additions & 42 deletions services/apps/entity_merging_worker/src/activities/members.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow'
import { WorkflowIdConflictPolicy } from '@temporalio/workflow'

import { DEFAULT_TENANT_ID } from '@crowd/common'
import {
Expand Down Expand Up @@ -27,47 +27,20 @@ export async function deleteMember(memberId: string): Promise<void> {
export async function recalculateActivityAffiliationsOfMemberAsync(
memberId: string,
): Promise<void> {
const workflowId = `${TemporalWorkflowId.MEMBER_UPDATE}/${memberId}`

try {
const handle = svc.temporal.workflow.getHandle(workflowId)
const { status } = await handle.describe()

if (status.name === 'RUNNING') {
await handle.result()
}
} catch (err) {
if (err.name !== 'WorkflowNotFoundError') {
svc.log.error({ err }, 'Failed to check workflow state')
throw err
}
}

try {
await svc.temporal.workflow.start('memberUpdate', {
taskQueue: 'profiles',
workflowId,
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL,
retry: {
maximumAttempts: 10,
},
args: [
{
member: {
id: memberId,
},
},
],
})
} catch (err) {
if (err.name === 'WorkflowExecutionAlreadyStartedError') {
svc.log.info({ workflowId }, 'Workflow already started, skipping')
return
}

throw err
}
await svc.temporal.workflow.signalWithStart('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${memberId}`,
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
signal: 'refreshAffiliations',
signalArgs: [{ member: { id: memberId }, memberOrganizationIds: [], syncToOpensearch: false }],
retry: {
maximumAttempts: 10,
},
args: [],
searchAttributes: {
TenantId: [DEFAULT_TENANT_ID],
},
})
}

export async function syncMember(memberId: string): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ export async function updateMemberUsingSquashedPayload(
): Promise<boolean> {
const affectedOrgIds: string[] = []
const orgIdsToSync: string[] = []
let affiliationNeedsRefresh = false

const wasUpdated = await svc.postgres.writer.transactionally(async (tx) => {
let didUpdate = false
Expand Down Expand Up @@ -581,6 +582,12 @@ export async function updateMemberUsingSquashedPayload(
isHighConfidenceSourceSelectedForWorkExperiences,
)

// Enrichment often deletes and recreates the same orgs with identical dates.
// Skip the refresh when the timeline that drives activityRelations hasn't changed.
affiliationNeedsRefresh =
results.toUpdate.size > 0 ||
hasMemberOrganizationTimelineChange(results.toDelete, results.toCreate)
Comment thread
skwowet marked this conversation as resolved.

if (results.toDelete.length > 0) {
for (const org of results.toDelete) {
didUpdate = true
Expand Down Expand Up @@ -676,7 +683,7 @@ export async function updateMemberUsingSquashedPayload(
)
}

if (affectedOrgIds.length > 0) {
if (affiliationNeedsRefresh && affectedOrgIds.length > 0) {
const commonMemberService = new CommonMemberService(
pgpQx(svc.postgres.writer.connection()),
svc.temporal,
Expand Down Expand Up @@ -774,6 +781,28 @@ function sanitizeWorkExperienceDateRanges(
})
}

/**
* Returns true when the set of (orgId, startDate, endDate) tuples differs
* between deletes and creates. Fields like title or source don't affect
* the affiliation timeline, so they're intentionally ignored.
*/
function hasMemberOrganizationTimelineChange(
toDelete: IMemberOrganizationData[],
toCreate: IMemberEnrichmentDataNormalizedOrganization[],
): boolean {
const toKey = (orgId: string, start: string | null | undefined, end: string | null | undefined) =>
`${orgId}|${start ? start.substring(0, 10) : ''}|${end ? end.substring(0, 10) : ''}`

const deletedKeys = new Set(toDelete.map((d) => toKey(d.orgId, d.dateStart, d.dateEnd)))
const createdKeys = new Set(toCreate.map((c) => toKey(c.organizationId, c.startDate, c.endDate)))

if (deletedKeys.size !== createdKeys.size) return true
for (const key of deletedKeys) {
if (!createdKeys.has(key)) return true
}
return false
}

function prepareWorkExperiences(
oldVersion: IMemberOrganizationData[],
newVersion: IMemberEnrichmentDataNormalizedOrganization[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const scheduleMembersEnrichment = async () => {
type: 'startWorkflow',
workflowType: triggerMembersEnrichment,
taskQueue: 'members-enrichment',
workflowExecutionTimeout: '2 hours',
workflowExecutionTimeout: '3 hours',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,44 +43,46 @@ export async function enrichMember(
// skip enrichment if member no longer exists
if (!member) return

let changeInEnrichmentSourceData = false
// Fetch all sources in parallel so one slow or rate-limited source doesn't block the rest.
// The workflow fails fast, but source activities already in flight may still update cache.
const sourceResults = await Promise.all(
sources.map(async (source): Promise<boolean> => {
const caches = await findMemberEnrichmentCache([source], input.id)
Comment thread
skwowet marked this conversation as resolved.
Outdated
const cache = caches.find((c) => c.source === source)

for (const source of sources) {
// find if there's already saved enrichment data in source
const caches = await findMemberEnrichmentCache([source], input.id)
const cache = caches.find((c) => c.source === source)
// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
if (await isCacheObsolete(source, cache)) {
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant getEnrichmentInput activity calls per source

Low Severity

The PR specifically hoisted findMemberEnrichmentCache outside the per-source loop to avoid redundant DB round-trips (line 47), yet getEnrichmentInput(input) is still called inside the Promise.all mapper — once per source with an obsolete cache. Since getEnrichmentInput depends only on input (not source), every call schedules an identical Temporal activity that returns the same result. With Promise.all, these N identical activities now run in parallel, wasting worker resources and inflating the workflow history.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit c8ee3e9. Configure here.


// cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds
if (await isCacheObsolete(source, cache)) {
const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input)

if (!(await hasRemainingCredits(source))) {
// no credits remaining, throw error to fail the workflow
throw ApplicationFailure.create({
message: `No credits remaining for source ${source}`,
type: 'MEMBER_ENRICHMENT_NO_CREDITS',
nonRetryable: true,
})
}
if (!(await hasRemainingCredits(source))) {
// no credits remaining, throw error to fail the workflow
throw ApplicationFailure.create({
message: `No credits remaining for source ${source}`,
type: 'MEMBER_ENRICHMENT_NO_CREDITS',
nonRetryable: true,
})
}

const data = await getEnrichmentData(source, enrichmentInput)
const data = await getEnrichmentData(source, enrichmentInput)

if (!cache) {
await insertMemberEnrichmentCache(source, input.id, data)
if (data) {
changeInEnrichmentSourceData = true
if (!cache) {
await insertMemberEnrichmentCache(source, input.id, data)
return !!data
} else if (data === null && cache.data !== null) {
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
} else if (sourceHasDifferentDataComparedToCache(cache, data)) {
await updateMemberEnrichmentCache(source, input.id, data)
return true
} else {
// data is same as cache, only update cache.updatedAt
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
}
} else if (data === null && cache.data !== null) {
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
} else if (sourceHasDifferentDataComparedToCache(cache, data)) {
await updateMemberEnrichmentCache(source, input.id, data)
changeInEnrichmentSourceData = true
} else {
// data is same as cache, only update cache.updatedAt
await touchMemberEnrichmentCacheUpdatedAt(source, input.id)
}
}
}
return false
}),
)
Comment thread
skwowet marked this conversation as resolved.

const changeInEnrichmentSourceData = sourceResults.some(Boolean)

if (changeInEnrichmentSourceData && input.activityCount > 100) {
// Member enrichment data has been updated, use squasher again!
Expand Down
5 changes: 3 additions & 2 deletions services/apps/profiles_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
"@crowd/data-access-layer": "workspace:*",
"@crowd/opensearch": "workspace:*",
"@crowd/types": "workspace:*",
"@temporalio/activity": "~1.11.8",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"axios": "^1.6.8",
"lodash.mergewith": "^4.6.2",
"tsx": "^4.7.1",
"typescript": "^5.6.3",
"lodash.mergewith": "^4.6.2"
"typescript": "^5.6.3"
},
"devDependencies": {
"@types/node": "^20.8.2",
Expand Down
7 changes: 6 additions & 1 deletion services/apps/profiles_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ import {
calculateProjectMemberAggregates,
getSegmentHierarchy,
} from './activities/member/memberAggregates'
import { syncMember, updateMemberAffiliations } from './activities/member/memberUpdate'
import {
syncMember,
triggerMemberAffiliationsRefresh,
updateMemberAffiliations,
} from './activities/member/memberUpdate'
import {
calculateProjectGroupOrganizationAggregates,
calculateProjectOrganizationAggregates,
Expand All @@ -28,6 +32,7 @@ import {

export {
updateMemberAffiliations,
triggerMemberAffiliationsRefresh,
syncMember,
syncOrganization,
findMembersInOrganization,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,50 @@
import { heartbeat } from '@temporalio/activity'
import { WorkflowIdConflictPolicy } from '@temporalio/client'

import { DEFAULT_TENANT_ID } from '@crowd/common'
import { refreshMemberOrganizationAffiliations } from '@crowd/data-access-layer/src/member-organization-affiliation'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { SearchSyncApiClient } from '@crowd/opensearch'
import { TemporalWorkflowId } from '@crowd/types'

import { svc } from '../../main'

/*
updateMemberAffiliations is a Temporal activity that updates all affiliations for
a given member.
*/
export async function updateMemberAffiliations(memberId: string): Promise<void> {
const qx = pgpQx(svc.postgres.writer.connection())
await refreshMemberOrganizationAffiliations(qx, memberId)
}

export async function triggerMemberAffiliationsRefresh(
memberId: string,
memberOrganizationIds: string[] = [],
syncToOpensearch = false,
waitForCompletion = false,
): Promise<void> {
const handle = await svc.temporal.workflow.signalWithStart('memberUpdate', {
taskQueue: 'profiles',
workflowId: `${TemporalWorkflowId.MEMBER_UPDATE}/${memberId}`,
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
signal: 'refreshAffiliations',
signalArgs: [{ member: { id: memberId }, memberOrganizationIds, syncToOpensearch }],
retry: {
maximumAttempts: 10,
},
args: [],
searchAttributes: {
TenantId: [DEFAULT_TENANT_ID],
},
})

if (waitForCompletion) {
const heartbeatInterval = setInterval(() => heartbeat({ memberId }), 30_000)
try {
await handle.result()
} finally {
clearInterval(heartbeatInterval)
}
Comment thread
skwowet marked this conversation as resolved.
Outdated
}
}

export async function syncMember(memberId: string): Promise<void> {
const syncApi = new SearchSyncApiClient({
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
Expand Down
Loading
Loading