-
Notifications
You must be signed in to change notification settings - Fork 731
fix: prevent enrichment timeouts and activityRelations write churn (CM-1179) #4098
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
83e38bd
ebab86d
db5ab12
3dba7bb
90c05d4
0a6f165
3e323a0
afd9f9b
dd6f728
c497bac
0d98e71
58f8308
c28412c
ab2d1a7
236562d
c8ee3e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,44 +43,46 @@ export async function enrichMember( | |
| // skip enrichment if member no longer exists | ||
| if (!member) return | ||
|
|
||
| let changeInEnrichmentSourceData = false | ||
| // Fetch all sources in parallel so one slow or rate-limited source doesn't block the rest. | ||
| // The workflow fails fast, but source activities already in flight may still update cache. | ||
| const sourceResults = await Promise.all( | ||
| sources.map(async (source): Promise<boolean> => { | ||
| const caches = await findMemberEnrichmentCache([source], input.id) | ||
|
skwowet marked this conversation as resolved.
Outdated
|
||
| const cache = caches.find((c) => c.source === source) | ||
|
|
||
| for (const source of sources) { | ||
| // find if there's already saved enrichment data in source | ||
| const caches = await findMemberEnrichmentCache([source], input.id) | ||
| const cache = caches.find((c) => c.source === source) | ||
| // cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds | ||
| if (await isCacheObsolete(source, cache)) { | ||
| const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant
|
||
|
|
||
| // cache is obsolete when it's not found or cache.updatedAt is older than cacheObsoleteAfterSeconds | ||
| if (await isCacheObsolete(source, cache)) { | ||
| const enrichmentInput: IEnrichmentSourceInput = await getEnrichmentInput(input) | ||
|
|
||
| if (!(await hasRemainingCredits(source))) { | ||
| // no credits remaining, throw error to fail the workflow | ||
| throw ApplicationFailure.create({ | ||
| message: `No credits remaining for source ${source}`, | ||
| type: 'MEMBER_ENRICHMENT_NO_CREDITS', | ||
| nonRetryable: true, | ||
| }) | ||
| } | ||
| if (!(await hasRemainingCredits(source))) { | ||
| // no credits remaining, throw error to fail the workflow | ||
| throw ApplicationFailure.create({ | ||
| message: `No credits remaining for source ${source}`, | ||
| type: 'MEMBER_ENRICHMENT_NO_CREDITS', | ||
| nonRetryable: true, | ||
| }) | ||
| } | ||
|
|
||
| const data = await getEnrichmentData(source, enrichmentInput) | ||
| const data = await getEnrichmentData(source, enrichmentInput) | ||
|
|
||
| if (!cache) { | ||
| await insertMemberEnrichmentCache(source, input.id, data) | ||
| if (data) { | ||
| changeInEnrichmentSourceData = true | ||
| if (!cache) { | ||
| await insertMemberEnrichmentCache(source, input.id, data) | ||
| return !!data | ||
| } else if (data === null && cache.data !== null) { | ||
| await touchMemberEnrichmentCacheUpdatedAt(source, input.id) | ||
| } else if (sourceHasDifferentDataComparedToCache(cache, data)) { | ||
| await updateMemberEnrichmentCache(source, input.id, data) | ||
| return true | ||
| } else { | ||
| // data is same as cache, only update cache.updatedAt | ||
| await touchMemberEnrichmentCacheUpdatedAt(source, input.id) | ||
| } | ||
| } else if (data === null && cache.data !== null) { | ||
| await touchMemberEnrichmentCacheUpdatedAt(source, input.id) | ||
| } else if (sourceHasDifferentDataComparedToCache(cache, data)) { | ||
| await updateMemberEnrichmentCache(source, input.id, data) | ||
| changeInEnrichmentSourceData = true | ||
| } else { | ||
| // data is same as cache, only update cache.updatedAt | ||
| await touchMemberEnrichmentCacheUpdatedAt(source, input.id) | ||
| } | ||
| } | ||
| } | ||
| return false | ||
| }), | ||
| ) | ||
|
skwowet marked this conversation as resolved.
|
||
|
|
||
| const changeInEnrichmentSourceData = sourceResults.some(Boolean) | ||
|
|
||
| if (changeInEnrichmentSourceData && input.activityCount > 100) { | ||
| // Member enrichment data has been updated, use squasher again! | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.