diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 849335367b..d20a5b7309 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -334,18 +334,40 @@ export default class ActivityService extends LoggerBase { if (identities.length === 1) { activity.username = identities[0].value } else if (identities.length === 0) { - this.log.error( - { platform, activity }, - `Activity's member does not have an identity for the platform!`, + // Fall back to same-platform email identity — handles old gerrit records where + // only a type:email identity was stored (before the gerrit integration + // gained the email-as-username fallback). + const emailFallback = activity.member.identities.find( + (i) => i.platform === platform && i.type === MemberIdentityType.EMAIL && i.value, ) - results.set(resultId, { - success: false, - err: new UnrepeatableError( - `Activity's member does not have an identity for the platform: ${platform}!`, - ), - }) - - continue + if (emailFallback && emailFallback.verified) { + activity.username = emailFallback.value + activity.member.identities.push({ + platform, + type: MemberIdentityType.USERNAME, + value: emailFallback.value, + verified: true, + source: emailFallback.source, + }) + } else if (emailFallback) { + // Email identity exists but is unverified — cannot safely use as username key. + results.set(resultId, { + success: false, + err: new UnrepeatableError( + `Activity's member has no verified username or email identity for platform: ${platform}!`, + ), + }) + continue + } else { + // No usable identity at all (e.g. git commit with empty author email). + // Nothing to attribute — skip silently rather than error. + this.log.warn( + { platform, resultId }, + `Activity's member has no usable identity for the platform, skipping.`, + ) + results.set(resultId, { success: true }) + continue + } } else { this.log.error( { platform, activity }, @@ -459,7 +481,13 @@ export default class ActivityService extends LoggerBase { if (!success) { resultMap.set(resultId, { success: false, err }) } else { - relevantPayloads.push(single(payloads, (a) => a.resultId === resultId)) + const payload = single(payloads, (a) => a.resultId === resultId) + if (!payload.activity.username?.trim()) { + // prepareMemberData found no usable identity — mark as processed and skip. + resultMap.set(resultId, { success: true }) + } else { + relevantPayloads.push(payload) + } } } diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 74af51d9b0..8bdcd558fd 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -477,9 +477,10 @@ export default class MemberService extends LoggerBase { } const key = orgCacheKey(org) + const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined let orgIdPromise: Promise - if (key && orgPromiseCache?.has(key)) { - orgIdPromise = orgPromiseCache.get(key) + if (cachedOrgPromise) { + orgIdPromise = cachedOrgPromise } else { orgIdPromise = logExecutionTimeV2( () => orgService.findOrCreate(platform, integrationId, org), @@ -492,10 +493,12 @@ export default class MemberService extends LoggerBase { } } const orgId = await orgIdPromise - organizations.push({ - id: orgId, - source: org.source, - }) + if (orgId) { + organizations.push({ + id: orgId, + source: org.source, + }) + } } } @@ -711,9 +714,10 @@ export default class MemberService extends LoggerBase { this.log.trace({ memberId: id }, 'Finding or creating organization!') const key = orgCacheKey(org) + const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined let orgIdPromise: Promise - if (key && orgPromiseCache?.has(key)) { - orgIdPromise = orgPromiseCache.get(key) + if (cachedOrgPromise) { + orgIdPromise = cachedOrgPromise } else { orgIdPromise = logExecutionTimeV2( () => orgService.findOrCreate(platform, integrationId, org), @@ -726,10 +730,12 @@ export default class MemberService extends LoggerBase { } } const orgId = await orgIdPromise - organizations.push({ - id: orgId, - source: data.source, - }) + if (orgId) { + organizations.push({ + id: orgId, + source: data.source, + }) + } } } @@ -839,9 +845,10 @@ export default class MemberService extends LoggerBase { ], } const key = orgCacheKey(org) + const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined let orgIdPromise: Promise - if (key && orgPromiseCache?.has(key)) { - orgIdPromise = orgPromiseCache.get(key) + if (cachedOrgPromise) { + orgIdPromise = cachedOrgPromise } else { orgIdPromise = orgService.findOrCreate( OrganizationAttributeSource.EMAIL, diff --git a/services/apps/data_sink_worker/src/service/organization.service.ts b/services/apps/data_sink_worker/src/service/organization.service.ts index 91ad1c864e..afdace793b 100644 --- a/services/apps/data_sink_worker/src/service/organization.service.ts +++ b/services/apps/data_sink_worker/src/service/organization.service.ts @@ -25,18 +25,11 @@ export class OrganizationService extends LoggerBase { source: string, integrationId: string, data: IOrganization, - ): Promise { - const id = await this.store.transactionally(async (txStore) => { + ): Promise { + return this.store.transactionally(async (txStore) => { const qe = dbStoreQx(txStore) - const id = await findOrCreateOrganization(qe, source, data, integrationId, true) - return id + return findOrCreateOrganization(qe, source, data, integrationId, true) }) - - if (!id) { - throw new Error('Organization not found or created!') - } - - return id } public async addToMember( diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 005aa4b6a5..013167d13d 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -538,47 +538,22 @@ def create_activities_from_commit( committer_name = commit["committer_name"] committer_email = commit["committer_email"] - # Create author activity - author = { - "username": author_email, - "displayName": author_name, - "emails": [author_email], - } - activity = self.create_activity( - remote=remote, - commit=commit, - activity_type="authored-commit", - member=author, - source_id=commit_hash, - segment_id=segment_id, - re_onboarding_count=re_onboarding_count, - ) - activity_db, activity_kafka = self.prepare_activity_for_db_and_queue( - activity, segment_id, integration_id - ) - activities_db.append(activity_db) - activities_queue.append(activity_kafka) - - # Only create committer activity if author and committer are different - if author_name != committer_name or author_email != committer_email: - # IMPORTANT: hash_input has a typo in "commited" instead of "committed" - # however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db - # so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct - hash_input = f"{commit_hash}commited-commit{committer_email}" - committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest() - - committer = { - "username": committer_email, - "displayName": committer_name, - "emails": [committer_email], + # Create author activity — skip if email is empty (no identity to attach to) + author_email = author_email.strip() if author_email else "" + if not author_email: + self.logger.warning(f"Skipping authored-commit for {commit_hash} — empty author email") + else: + author = { + "username": author_email, + "displayName": author_name, + "emails": [author_email], } activity = self.create_activity( remote=remote, commit=commit, - activity_type="committed-commit", - member=committer, - source_id=committer_source_id, - source_parent_id=commit_hash, + activity_type="authored-commit", + member=author, + source_id=commit_hash, segment_id=segment_id, re_onboarding_count=re_onboarding_count, ) @@ -588,23 +563,65 @@ def create_activities_from_commit( activities_db.append(activity_db) activities_queue.append(activity_kafka) + # Only create committer activity if author and committer are different + committer_email = committer_email.strip() if committer_email else "" + if author_name != committer_name or author_email != committer_email: + if not committer_email: + self.logger.warning( + f"Skipping committed-commit for {commit_hash} — empty committer email" + ) + else: + # IMPORTANT: hash_input has a typo in "commited" instead of "committed" + # however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db + # so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct + hash_input = f"{commit_hash}commited-commit{committer_email}" + committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest() + + committer = { + "username": committer_email, + "displayName": committer_name, + "emails": [committer_email], + } + activity = self.create_activity( + remote=remote, + commit=commit, + activity_type="committed-commit", + member=committer, + source_id=committer_source_id, + source_parent_id=commit_hash, + segment_id=segment_id, + re_onboarding_count=re_onboarding_count, + ) + activity_db, activity_kafka = self.prepare_activity_for_db_and_queue( + activity, segment_id, integration_id + ) + activities_db.append(activity_db) + activities_queue.append(activity_kafka) + # Process extracted activities from commit message extracted_activities = self.extract_activities(commit["message"]) for extracted_activity in extracted_activities: activity_type, member_data = list(extracted_activity.items())[0] + trailer_email = (member_data.get("email") or "").strip() + if not trailer_email: + self.logger.warning( + f"Skipping {activity_type} for {commit_hash} — empty email in commit trailer" + ) + continue + # Convert activity type to lowercase and add "-commit" suffix # This matches the legacy behavior: "signed-off-by" -> "signed-off-commit" activity_type = activity_type.lower().replace("-by", "") + "-commit" member = { "displayName": member_data["name"], - "emails": [member_data["email"]], + "emails": [trailer_email], } # Generate unique source ID for extracted activity source_id = hashlib.sha1( - (commit_hash + activity_type + member_data["email"]).encode("utf-8") + (commit_hash + activity_type + trailer_email).encode("utf-8") ).hexdigest() activity = self.create_activity( remote=remote,