Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a0ec650
feat: organization enrichment v2 workflow and caching mechanism
skwowet Oct 14, 2025
9c847e6
Merge branch 'main' into feat/CM-686
skwowet Oct 14, 2025
1f92c5f
fix: linter and prettier
skwowet Oct 14, 2025
eef4ea1
fix: linter and prettier
skwowet Oct 14, 2025
51c86ce
feat: handle multiple verified primary domains via llm based selection
skwowet Oct 14, 2025
0d55df3
fix: linter and prettier
skwowet Oct 14, 2025
788be3d
refactor: update organization enrichment activities and improve domai…
skwowet Oct 15, 2025
a3ebe52
fix: make organization enrichment cache data nullable and update enri…
skwowet Oct 16, 2025
4c6bd3d
Merge branch 'main' into feat/CM-686
skwowet Oct 16, 2025
96d00b6
fix: update organization enrichment envs
skwowet Oct 16, 2025
12adae9
fix: rm error field in type
skwowet Oct 16, 2025
b4f35cb
fix: pgsql alias in query
skwowet Oct 16, 2025
82d6c98
chore: reduce query for enrichable organizations for testing
skwowet Oct 16, 2025
76a4fed
chore: add debugger logs
skwowet Oct 16, 2025
891e680
chore: add more debugger logs
skwowet Oct 16, 2025
b84bd89
chore: add more debug loggers
skwowet Oct 16, 2025
4d8ed76
fix: update internal API request format
skwowet Oct 16, 2025
ccd8cad
fix: update organization attribute mapping and API request format
skwowet Oct 16, 2025
9648d53
fix: update organization attribute source to use internal API
skwowet Oct 16, 2025
9bd3691
fix: add internal API to organization attribute source priority
skwowet Oct 16, 2025
2351f57
chore: add LLM edge case debugger
skwowet Oct 16, 2025
d744073
fix: llm model max_tokens
skwowet Oct 16, 2025
1207bc8
fix: edge cases with llm and also rm debug logs
skwowet Oct 16, 2025
5ec8f4b
fix: pr comments by cursor-bot
skwowet Oct 17, 2025
a42af02
fix: pr comments by cursor-bot
skwowet Oct 17, 2025
3bd97f3
Merge branch 'main' into feat/CM-686
skwowet Oct 17, 2025
861b33f
fix: resolve pr reviews and comments
skwowet Oct 17, 2025
e5f5bf4
refactor: rename EnrichmentService class and update attribute sources…
skwowet Oct 17, 2025
fdc0782
fix: update member enrichment last tried timestamp on failure
skwowet Oct 17, 2025
19d3fbc
fix: reduce max concurrent requests from 5 to 3
skwowet Oct 17, 2025
6676423
fix: update platform assignment in email identity creation
skwowet Oct 17, 2025
c1c15c6
Merge branch 'main' into feat/CM-686
skwowet Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- drop old materialized view if exists
drop materialized view if exists "organizationsGlobalActivityCount" cascade;

-- create the materialized view
create materialized view "organizationsGlobalActivityCount" as
select
osa."organizationId",
sum(osa."activityCount") as total_count
Comment thread
skwowet marked this conversation as resolved.
Outdated
from "organizationSegmentsAgg" osa
where osa."segmentId" in (
select id
from segments
where "grandparentId" is not null
and "parentId" is not null
)
group by osa."organizationId"
order by sum(osa."activityCount") desc;

create unique index ix_organization_global_activity_count_organization_id
on "organizationsGlobalActivityCount" ("organizationId");

create index ix_organization_global_activity_count on "organizationsGlobalActivityCount" (total_count);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table "organizationEnrichmentCache"
alter column "data" drop not null;
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { OrganizationField, queryOrgs } from '@crowd/data-access-layer'
import {
deleteMemberAffiliations,
fetchMemberAffiliations,
insertMemberAffiliations,
} from '@crowd/data-access-layer/src/member_segment_affiliations'
import { OrganizationField, queryOrgs } from '@crowd/data-access-layer/src/orgs'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { IMemberAffiliation, IOrganization, SegmentData } from '@crowd/types'

Expand Down
3 changes: 2 additions & 1 deletion backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import {
} from '@crowd/common'
import { CommonMemberService } from '@crowd/common_services'
import {
OrganizationField,
getActiveMembers,
getLastActivitiesForMembers,
queryActivityRelations,
queryOrgs,
} from '@crowd/data-access-layer'
import { findManyLfxMemberships } from '@crowd/data-access-layer/src/lfx_memberships'
import { findMaintainerRoles } from '@crowd/data-access-layer/src/maintainers'
Expand All @@ -41,7 +43,6 @@ import {
includeMemberToSegments,
} from '@crowd/data-access-layer/src/members/segments'
import { IDbMemberData } from '@crowd/data-access-layer/src/members/types'
import { OrganizationField, queryOrgs } from '@crowd/data-access-layer/src/orgs'
import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
import {
fetchManySegments,
Expand Down
20 changes: 19 additions & 1 deletion backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IDbOrgAttribute,
IDbOrganization,
OrgIdentityField,
OrganizationField,
addOrgIdentity,
addOrgsToSegments,
cleanUpOrgIdentities,
Expand Down Expand Up @@ -1131,7 +1132,24 @@ class OrganizationRepository {
(a, b) => b.identities.length - a.identities.length,
)[0].organizationId

const result = await findOrgById(qx, orgIdWithMostIdentities)
const result = await findOrgById(qx, orgIdWithMostIdentities, [
OrganizationField.ID,
OrganizationField.DISPLAY_NAME,
OrganizationField.DESCRIPTION,
OrganizationField.LOGO,
OrganizationField.TAGS,
OrganizationField.EMPLOYEES,
OrganizationField.REVENUE_RANGE,
OrganizationField.IMPORT_HASH,
OrganizationField.LOCATION,
OrganizationField.TYPE,
OrganizationField.SIZE,
OrganizationField.HEADLINE,
OrganizationField.INDUSTRY,
OrganizationField.FOUNDED,
OrganizationField.IS_TEAM_ORGANIZATION,
OrganizationField.MANUALLY_CREATED,
])

return result
}
Expand Down
3 changes: 1 addition & 2 deletions backend/src/services/collectionService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { uniq } from 'lodash'

import { getCleanString } from '@crowd/common'
import { QueryExecutor } from '@crowd/data-access-layer'
import { OrganizationField, QueryExecutor, findOrgById, queryOrgs } from '@crowd/data-access-layer'
import { listCategoriesByIds } from '@crowd/data-access-layer/src/categories'
import {
CollectionField,
Expand Down Expand Up @@ -29,7 +29,6 @@ import {
fetchIntegrationsForSegment,
removePlainGitHubRepoMapping,
} from '@crowd/data-access-layer/src/integrations'
import { OrganizationField, findOrgById, queryOrgs } from '@crowd/data-access-layer/src/orgs'
import { QueryFilter } from '@crowd/data-access-layer/src/query'
import {
ICreateRepositoryGroup,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/services/dataIssueService.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import axios from 'axios'

import { createHeading, createParagraph } from '@crowd/common'
import { OrganizationField, findOrgById } from '@crowd/data-access-layer'
import { createDataIssue } from '@crowd/data-access-layer/src/data_issues'
import { MemberField, findMemberById } from '@crowd/data-access-layer/src/members'
import { OrganizationField, findOrgById } from '@crowd/data-access-layer/src/orgs'
import { PgPromiseQueryExecutor } from '@crowd/data-access-layer/src/queryExecutor'
import { LoggerBase } from '@crowd/logging'
import { DataIssueEntity } from '@crowd/types'
Expand Down
15 changes: 12 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions services/apps/members_enrichment_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import {
isCacheObsolete,
normalizeEnrichmentData,
refreshMemberEnrichmentMaterializedView,
setMemberEnrichmentTryDate,
squashMultipleValueAttributesWithLLM,
squashWorkExperiencesWithLLM,
touchMemberEnrichmentCacheUpdatedAt,
touchMemberEnrichmentLastTriedAt,
updateMemberEnrichmentCache,
updateMemberUsingSquashedPayload,
} from './activities/enrichment'
Expand Down Expand Up @@ -45,7 +45,7 @@ import {
} from './activities/syncEnrichedData'

export {
setMemberEnrichmentTryDate,
touchMemberEnrichmentLastTriedAt,
getMemberById,
getEnrichableMembers,
getEnrichmentData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import {
findMemberEnrichmentCacheForAllSourcesDb,
insertMemberEnrichmentCacheDb,
insertWorkExperience,
setMemberEnrichmentTryDate as setMemberEnrichmentTryDateDb,
setMemberEnrichmentUpdateDate as setMemberEnrichmentUpdateDateDb,
setMemberEnrichmentLastTriedAt,
setMemberEnrichmentUpdatedAt,
touchMemberEnrichmentCacheUpdatedAtDb,
updateMemberEnrichmentCacheDb,
updateMemberOrg,
Expand Down Expand Up @@ -63,7 +63,7 @@ async function setRateLimitBackoff(
source: MemberEnrichmentSource,
backoffSeconds: number,
): Promise<void> {
const redisCache = new RedisCache(`enrichment-${source}`, svc.redis, svc.log)
const redisCache = new RedisCache(`member-enrichment-${source}`, svc.redis, svc.log)
const backoff = new RateLimitBackoff(redisCache, 'rate-limit-backoff')
await backoff.set(backoffSeconds)
}
Expand Down Expand Up @@ -147,7 +147,7 @@ export async function setHasRemainingCredits(
source: MemberEnrichmentSource,
hasCredits: boolean,
): Promise<void> {
const redisCache = new RedisCache(`enrichment-${source}`, svc.redis, svc.log)
const redisCache = new RedisCache(`member-enrichment-${source}`, svc.redis, svc.log)
if (hasCredits) {
await redisCache.set('hasRemainingCredits', 'true', 60)
} else {
Expand All @@ -156,12 +156,12 @@ export async function setHasRemainingCredits(
}

export async function getHasRemainingCredits(source: MemberEnrichmentSource): Promise<boolean> {
const redisCache = new RedisCache(`enrichment-${source}`, svc.redis, svc.log)
const redisCache = new RedisCache(`member-enrichment-${source}`, svc.redis, svc.log)
return (await redisCache.get('hasRemainingCredits')) === 'true'
}

export async function hasRemainingCreditsExists(source: MemberEnrichmentSource): Promise<boolean> {
const redisCache = new RedisCache(`enrichment-${source}`, svc.redis, svc.log)
const redisCache = new RedisCache(`member-enrichment-${source}`, svc.redis, svc.log)
return await redisCache.exists('hasRemainingCredits')
}

Expand Down Expand Up @@ -221,6 +221,10 @@ export async function updateMemberEnrichmentCache(
await updateMemberEnrichmentCacheDb(svc.postgres.writer.connection(), data, memberId, source)
}

export async function touchMemberEnrichmentLastTriedAt(memberId: string): Promise<void> {
await setMemberEnrichmentLastTriedAt(svc.postgres.writer.connection(), memberId)
}

export async function touchMemberEnrichmentCacheUpdatedAt(
source: MemberEnrichmentSource,
memberId: string,
Expand Down Expand Up @@ -453,10 +457,8 @@ export async function updateMemberUsingSquashedPayload(
await Promise.all(promises)

if (updated) {
await setMemberEnrichmentUpdateDateDb(tx.transaction(), memberId)
await setMemberEnrichmentUpdatedAt(tx.transaction(), memberId)
Comment thread
skwowet marked this conversation as resolved.
await syncMember(memberId)
} else {
await setMemberEnrichmentTryDateDb(tx.transaction(), memberId)
}

svc.log.debug({ memberId }, 'Member sources processed successfully!')
Expand Down Expand Up @@ -508,10 +510,6 @@ export function doesIncomingOrgExistInExistingOrgs(
)
}

export async function setMemberEnrichmentTryDate(memberId: string): Promise<void> {
await setMemberEnrichmentTryDateDb(svc.postgres.writer.connection(), memberId)
}

export async function getObsoleteSourcesOfMember(
memberId: string,
possibleSources: MemberEnrichmentSource[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { fetchMembersForEnrichment } from '@crowd/data-access-layer/src/old/apps
import { RateLimitBackoff, RedisCache } from '@crowd/redis'
import {
IEnrichableMember,
IMemberEnrichmentSourceQueryInput,
IEnrichmentSourceQueryInput,
MemberEnrichmentSource,
} from '@crowd/types'

Expand All @@ -30,14 +30,16 @@ export async function getEnrichableMembers(
).filter((s): s is MemberEnrichmentSource => s !== null)

let rows: IEnrichableMember[] = []
const sourceInputs: IMemberEnrichmentSourceQueryInput[] = availableSources.map((s) => {
const srv = EnrichmentSourceServiceFactory.getEnrichmentSourceService(s, svc.log)
return {
source: s,
cacheObsoleteAfterSeconds: srv.cacheObsoleteAfterSeconds,
enrichableBySql: srv.enrichableBySql,
}
})
const sourceInputs: IEnrichmentSourceQueryInput<MemberEnrichmentSource>[] = availableSources.map(
(s) => {
const srv = EnrichmentSourceServiceFactory.getEnrichmentSourceService(s, svc.log)
return {
source: s,
cacheObsoleteAfterSeconds: srv.cacheObsoleteAfterSeconds,
enrichableBySql: srv.enrichableBySql,
}
},
)

const db = svc.postgres.reader
rows = await fetchMembersForEnrichment(db, limit, sourceInputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
findMemberEnrichmentCache,
insertMemberEnrichmentCache,
touchMemberEnrichmentCacheUpdatedAt,
touchMemberEnrichmentLastTriedAt,
updateMemberEnrichmentCache,
isCacheObsolete,
getEnrichmentInput,
Expand Down Expand Up @@ -61,6 +62,9 @@ export async function enrichMember(

const data = await getEnrichmentData(source, enrichmentInput)

// Record enrichment attempt
await touchMemberEnrichmentLastTriedAt(input.id)
Comment thread
skwowet marked this conversation as resolved.
Outdated

if (!cache) {
await insertMemberEnrichmentCache(source, input.id, data)
if (data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const {
squashMultipleValueAttributesWithLLM,
squashWorkExperiencesWithLLM,
updateMemberUsingSquashedPayload,
setMemberEnrichmentTryDate,
cleanAttributeValue,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '2 minutes',
Expand Down Expand Up @@ -310,7 +309,5 @@ export async function processMemberSources(args: IProcessMemberSourcesArgs): Pro
return memberUpdated
}

await setMemberEnrichmentTryDate(args.memberId)

return false
}
5 changes: 4 additions & 1 deletion services/apps/organizations_enrichment_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
"@temporalio/workflow": "~1.11.8",
"peopledatalabs": "~6.1.5",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
"typescript": "^5.6.3",
"lodash.isequal": "^4.5.0",
"lodash.uniqby": "^4.7.0",
"axios": "~1.6.2"
},
"devDependencies": {
"@types/node": "^20.8.2",
Expand Down
37 changes: 29 additions & 8 deletions services/apps/organizations_enrichment_worker/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
import {
getMaxEnrichedOrganizationsPerExecution,
getOrganizationsToEnrich,
syncToOpensearch,
tryEnrichOrganization,
applyEnrichmentToOrganization,
createOrganizationEnrichmentCache,
findOrganizationEnrichmentCache,
getEnrichmentData,
getEnrichmentInput,
getMaxConcurrentRequests,
isCacheObsolete,
normalizeEnrichmentData,
refreshOrganizationEnrichmentMaterializedView,
touchOrganizationEnrichmentCacheUpdatedAt,
touchOrganizationEnrichmentLastTriedAt,
updateOrganizationEnrichmentCache,
} from './activities/enrichment'
import { selectMostRelevantDomainWithLLM } from './activities/llm'
import { findOrganizationById, getEnrichableOrganizations } from './activities/organization'

export {
syncToOpensearch,
getOrganizationsToEnrich,
tryEnrichOrganization,
getMaxEnrichedOrganizationsPerExecution,
getMaxConcurrentRequests,
findOrganizationById,
getEnrichableOrganizations,
findOrganizationEnrichmentCache,
isCacheObsolete,
refreshOrganizationEnrichmentMaterializedView,
getEnrichmentInput,
getEnrichmentData,
createOrganizationEnrichmentCache,
updateOrganizationEnrichmentCache,
touchOrganizationEnrichmentCacheUpdatedAt,
touchOrganizationEnrichmentLastTriedAt,
normalizeEnrichmentData,
applyEnrichmentToOrganization,
selectMostRelevantDomainWithLLM,
}
Loading
Loading