Skip to content

Commit 13c3933

Browse files
authored
fix: integration results errors with no identity for platform (CM-1111) (#4078)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 2180328 commit 13c3933

4 files changed

Lines changed: 120 additions & 75 deletions

File tree

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -334,18 +334,40 @@ export default class ActivityService extends LoggerBase {
334334
if (identities.length === 1) {
335335
activity.username = identities[0].value
336336
} else if (identities.length === 0) {
337-
this.log.error(
338-
{ platform, activity },
339-
`Activity's member does not have an identity for the platform!`,
337+
// Fall back to same-platform email identity — handles old gerrit records where
338+
// only a type:email identity was stored (before the gerrit integration
339+
// gained the email-as-username fallback).
340+
const emailFallback = activity.member.identities.find(
341+
(i) => i.platform === platform && i.type === MemberIdentityType.EMAIL && i.value,
340342
)
341-
results.set(resultId, {
342-
success: false,
343-
err: new UnrepeatableError(
344-
`Activity's member does not have an identity for the platform: ${platform}!`,
345-
),
346-
})
347-
348-
continue
343+
if (emailFallback && emailFallback.verified) {
344+
activity.username = emailFallback.value
345+
activity.member.identities.push({
346+
platform,
347+
type: MemberIdentityType.USERNAME,
348+
value: emailFallback.value,
349+
verified: true,
350+
source: emailFallback.source,
351+
})
352+
} else if (emailFallback) {
353+
// Email identity exists but is unverified — cannot safely use as username key.
354+
results.set(resultId, {
355+
success: false,
356+
err: new UnrepeatableError(
357+
`Activity's member has no verified username or email identity for platform: ${platform}!`,
358+
),
359+
})
360+
continue
361+
} else {
362+
// No usable identity at all (e.g. git commit with empty author email).
363+
// Nothing to attribute — skip silently rather than error.
364+
this.log.warn(
365+
{ platform, resultId },
366+
`Activity's member has no usable identity for the platform, skipping.`,
367+
)
368+
results.set(resultId, { success: true })
369+
continue
370+
}
349371
} else {
350372
this.log.error(
351373
{ platform, activity },
@@ -459,7 +481,13 @@ export default class ActivityService extends LoggerBase {
459481
if (!success) {
460482
resultMap.set(resultId, { success: false, err })
461483
} else {
462-
relevantPayloads.push(single(payloads, (a) => a.resultId === resultId))
484+
const payload = single(payloads, (a) => a.resultId === resultId)
485+
if (!payload.activity.username?.trim()) {
486+
// prepareMemberData found no usable identity — mark as processed and skip.
487+
resultMap.set(resultId, { success: true })
488+
} else {
489+
relevantPayloads.push(payload)
490+
}
463491
}
464492
}
465493

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,10 @@ export default class MemberService extends LoggerBase {
477477
}
478478

479479
const key = orgCacheKey(org)
480+
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
480481
let orgIdPromise: Promise<string | undefined>
481-
if (key && orgPromiseCache?.has(key)) {
482-
orgIdPromise = orgPromiseCache.get(key)
482+
if (cachedOrgPromise) {
483+
orgIdPromise = cachedOrgPromise
483484
} else {
484485
orgIdPromise = logExecutionTimeV2(
485486
() => orgService.findOrCreate(platform, integrationId, org),
@@ -492,10 +493,12 @@ export default class MemberService extends LoggerBase {
492493
}
493494
}
494495
const orgId = await orgIdPromise
495-
organizations.push({
496-
id: orgId,
497-
source: org.source,
498-
})
496+
if (orgId) {
497+
organizations.push({
498+
id: orgId,
499+
source: org.source,
500+
})
501+
}
499502
}
500503
}
501504

@@ -711,9 +714,10 @@ export default class MemberService extends LoggerBase {
711714
this.log.trace({ memberId: id }, 'Finding or creating organization!')
712715

713716
const key = orgCacheKey(org)
717+
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
714718
let orgIdPromise: Promise<string | undefined>
715-
if (key && orgPromiseCache?.has(key)) {
716-
orgIdPromise = orgPromiseCache.get(key)
719+
if (cachedOrgPromise) {
720+
orgIdPromise = cachedOrgPromise
717721
} else {
718722
orgIdPromise = logExecutionTimeV2(
719723
() => orgService.findOrCreate(platform, integrationId, org),
@@ -726,10 +730,12 @@ export default class MemberService extends LoggerBase {
726730
}
727731
}
728732
const orgId = await orgIdPromise
729-
organizations.push({
730-
id: orgId,
731-
source: data.source,
732-
})
733+
if (orgId) {
734+
organizations.push({
735+
id: orgId,
736+
source: data.source,
737+
})
738+
}
733739
}
734740
}
735741

@@ -839,9 +845,10 @@ export default class MemberService extends LoggerBase {
839845
],
840846
}
841847
const key = orgCacheKey(org)
848+
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
842849
let orgIdPromise: Promise<string | undefined>
843-
if (key && orgPromiseCache?.has(key)) {
844-
orgIdPromise = orgPromiseCache.get(key)
850+
if (cachedOrgPromise) {
851+
orgIdPromise = cachedOrgPromise
845852
} else {
846853
orgIdPromise = orgService.findOrCreate(
847854
OrganizationAttributeSource.EMAIL,

services/apps/data_sink_worker/src/service/organization.service.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,11 @@ export class OrganizationService extends LoggerBase {
2525
source: string,
2626
integrationId: string,
2727
data: IOrganization,
28-
): Promise<string> {
29-
const id = await this.store.transactionally(async (txStore) => {
28+
): Promise<string | undefined> {
29+
return this.store.transactionally(async (txStore) => {
3030
const qe = dbStoreQx(txStore)
31-
const id = await findOrCreateOrganization(qe, source, data, integrationId, true)
32-
return id
31+
return findOrCreateOrganization(qe, source, data, integrationId, true)
3332
})
34-
35-
if (!id) {
36-
throw new Error('Organization not found or created!')
37-
}
38-
39-
return id
4033
}
4134

4235
public async addToMember(

services/apps/git_integration/src/crowdgit/services/commit/commit_service.py

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -538,47 +538,22 @@ def create_activities_from_commit(
538538
committer_name = commit["committer_name"]
539539
committer_email = commit["committer_email"]
540540

541-
# Create author activity
542-
author = {
543-
"username": author_email,
544-
"displayName": author_name,
545-
"emails": [author_email],
546-
}
547-
activity = self.create_activity(
548-
remote=remote,
549-
commit=commit,
550-
activity_type="authored-commit",
551-
member=author,
552-
source_id=commit_hash,
553-
segment_id=segment_id,
554-
re_onboarding_count=re_onboarding_count,
555-
)
556-
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
557-
activity, segment_id, integration_id
558-
)
559-
activities_db.append(activity_db)
560-
activities_queue.append(activity_kafka)
561-
562-
# Only create committer activity if author and committer are different
563-
if author_name != committer_name or author_email != committer_email:
564-
# IMPORTANT: hash_input has a typo in "commited" instead of "committed"
565-
# however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db
566-
# so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct
567-
hash_input = f"{commit_hash}commited-commit{committer_email}"
568-
committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest()
569-
570-
committer = {
571-
"username": committer_email,
572-
"displayName": committer_name,
573-
"emails": [committer_email],
541+
# Create author activity — skip if email is empty (no identity to attach to)
542+
author_email = author_email.strip() if author_email else ""
543+
if not author_email:
544+
self.logger.warning(f"Skipping authored-commit for {commit_hash} — empty author email")
545+
else:
546+
author = {
547+
"username": author_email,
548+
"displayName": author_name,
549+
"emails": [author_email],
574550
}
575551
activity = self.create_activity(
576552
remote=remote,
577553
commit=commit,
578-
activity_type="committed-commit",
579-
member=committer,
580-
source_id=committer_source_id,
581-
source_parent_id=commit_hash,
554+
activity_type="authored-commit",
555+
member=author,
556+
source_id=commit_hash,
582557
segment_id=segment_id,
583558
re_onboarding_count=re_onboarding_count,
584559
)
@@ -588,23 +563,65 @@ def create_activities_from_commit(
588563
activities_db.append(activity_db)
589564
activities_queue.append(activity_kafka)
590565

566+
# Only create committer activity if author and committer are different
567+
committer_email = committer_email.strip() if committer_email else ""
568+
if author_name != committer_name or author_email != committer_email:
569+
if not committer_email:
570+
self.logger.warning(
571+
f"Skipping committed-commit for {commit_hash} — empty committer email"
572+
)
573+
else:
574+
# IMPORTANT: hash_input has a typo in "commited" instead of "committed"
575+
# however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db
576+
# so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct
577+
hash_input = f"{commit_hash}commited-commit{committer_email}"
578+
committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest()
579+
580+
committer = {
581+
"username": committer_email,
582+
"displayName": committer_name,
583+
"emails": [committer_email],
584+
}
585+
activity = self.create_activity(
586+
remote=remote,
587+
commit=commit,
588+
activity_type="committed-commit",
589+
member=committer,
590+
source_id=committer_source_id,
591+
source_parent_id=commit_hash,
592+
segment_id=segment_id,
593+
re_onboarding_count=re_onboarding_count,
594+
)
595+
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
596+
activity, segment_id, integration_id
597+
)
598+
activities_db.append(activity_db)
599+
activities_queue.append(activity_kafka)
600+
591601
# Process extracted activities from commit message
592602
extracted_activities = self.extract_activities(commit["message"])
593603
for extracted_activity in extracted_activities:
594604
activity_type, member_data = list(extracted_activity.items())[0]
595605

606+
trailer_email = (member_data.get("email") or "").strip()
607+
if not trailer_email:
608+
self.logger.warning(
609+
f"Skipping {activity_type} for {commit_hash} — empty email in commit trailer"
610+
)
611+
continue
612+
596613
# Convert activity type to lowercase and add "-commit" suffix
597614
# This matches the legacy behavior: "signed-off-by" -> "signed-off-commit"
598615
activity_type = activity_type.lower().replace("-by", "") + "-commit"
599616

600617
member = {
601618
"displayName": member_data["name"],
602-
"emails": [member_data["email"]],
619+
"emails": [trailer_email],
603620
}
604621

605622
# Generate unique source ID for extracted activity
606623
source_id = hashlib.sha1(
607-
(commit_hash + activity_type + member_data["email"]).encode("utf-8")
624+
(commit_hash + activity_type + trailer_email).encode("utf-8")
608625
).hexdigest()
609626
activity = self.create_activity(
610627
remote=remote,

0 commit comments

Comments
 (0)