Skip to content

Commit e30d37c

Browse files
committed
Redesign Molnix Appraisal sync
1 parent af121f4 commit e30d37c

3 files changed

Lines changed: 177 additions & 41 deletions

File tree

api/management/commands/sync_molnix_appraisals.py

Lines changed: 148 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,14 @@ def write_record(record_type, data):
113113
if molnix_id is None:
114114
return False
115115
personnel = None
116-
target_id = data.get("target_id")
117-
if target_id is not None:
118-
personnel = Personnel.objects.filter(molnix_id=target_id).first()
116+
appraised_person_id = data.get("appraised_person_id")
117+
if appraised_person_id is not None:
118+
personnel = Personnel.objects.filter(molnix_id=appraised_person_id).first()
119119
MolnixAppraisal.objects.update_or_create(
120120
molnix_id=molnix_id,
121121
defaults={
122122
"target_id": data.get("target_id"),
123+
"appraised_person_id": data.get("appraised_person_id"),
123124
"deployment_molnix_id": data.get("deployment_molnix_id"),
124125
"stage": data.get("stage"),
125126
"appraisers_count": data.get("appraisers_count"),
@@ -197,24 +198,22 @@ def write_record(record_type, data):
197198
event_person_role = data.get("event_person_role")
198199
if event_id is None or person_id is None:
199200
return False
200-
RrmsEventParticipation.objects.update_or_create(
201+
RrmsEventParticipation.objects.create(
201202
event_id=event_id,
202203
person_id=person_id,
203204
event_person_role=event_person_role,
204-
defaults={
205-
"event_name": data.get("event_name"),
206-
"event_type": data.get("event_type"),
207-
"event_scale_type": data.get("event_scale_type"),
208-
"event_from": data.get("event_from"),
209-
"event_to": data.get("event_to"),
210-
"participant_start": data.get("participant_start"),
211-
"participant_end": data.get("participant_end"),
212-
"requested": data.get("requested"),
213-
"event_organization_id": data.get("event_organization_id"),
214-
"event_organization_name": data.get("event_organization_name"),
215-
"venue": data.get("venue"),
216-
"tags_json": data.get("tags_json"),
217-
},
205+
event_name=data.get("event_name"),
206+
event_type=data.get("event_type"),
207+
event_scale_type=data.get("event_scale_type"),
208+
event_from=data.get("event_from"),
209+
event_to=data.get("event_to"),
210+
participant_start=data.get("participant_start"),
211+
participant_end=data.get("participant_end"),
212+
requested=data.get("requested"),
213+
event_organization_id=data.get("event_organization_id"),
214+
event_organization_name=data.get("event_organization_name"),
215+
venue=data.get("venue"),
216+
tags_json=data.get("tags_json"),
218217
)
219218
return True
220219
except Exception as ex:
@@ -380,6 +379,7 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None):
380379
return {
381380
"molnix_id": appraisal.get("id"),
382381
"target_id": appraisal.get("target_id"),
382+
"appraised_person_id": deployment.get("person_id"),
383383
"deployment_molnix_id": deployment.get("id"),
384384
"stage": appraisal.get("stage"),
385385
"appraisers_count": appraisal.get("appraisers_count"),
@@ -397,12 +397,12 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None):
397397
}
398398

399399

400-
def normalize_appraiser(appraiser):
400+
def normalize_appraiser(appraiser, appraisal_molnix_id=None):
401401
if not isinstance(appraiser, dict):
402402
return {}
403403
return {
404404
"molnix_id": appraiser.get("id"),
405-
"appraisal_molnix_id": appraiser.get("appraisal_id"),
405+
"appraisal_molnix_id": appraiser.get("appraisal_id") or appraisal_molnix_id,
406406
"appraiser_type": appraiser.get("appraiser_type"),
407407
"person_id": appraiser.get("person_id"),
408408
"required": appraiser.get("required"),
@@ -415,18 +415,26 @@ def normalize_appraiser(appraiser):
415415

416416
def normalize_event_participation(event, org_lookup, country_lookup):
417417
if not isinstance(event, dict):
418-
return []
418+
return [], 0, []
419419
org_id, org_name = normalize_org(event.get("organization"), org_lookup, country_lookup)
420420
people = event.get("person") if isinstance(event.get("person"), list) else []
421421
records = []
422+
mismatch_count = 0
423+
mismatch_samples = []
422424
for person in people:
423425
if not isinstance(person, dict):
424426
continue
425427
pivot = person.get("pivot") if isinstance(person.get("pivot"), dict) else {}
428+
pivot_person_id = pivot.get("person_id")
429+
person_id = person.get("id")
430+
if pivot_person_id is not None and person_id is not None and pivot_person_id != person_id:
431+
mismatch_count += 1
432+
if len(mismatch_samples) < 25:
433+
mismatch_samples.append((event.get("id"), person_id, pivot_person_id))
426434
record = {
427435
"event_id": event.get("id"),
428436
"event_name": event.get("name"),
429-
"person_id": person.get("id"),
437+
"person_id": person_id,
430438
"event_person_role": pivot.get("role"),
431439
"event_type": event.get("event_type"),
432440
"event_scale_type": event.get("type"),
@@ -441,17 +449,19 @@ def normalize_event_participation(event, org_lookup, country_lookup):
441449
"tags_json": event.get("tags"),
442450
}
443451
records.append(record)
444-
return records
452+
return records, mismatch_count, mismatch_samples
445453

446454

447455
def handle_person_ids(molnix, person_ids, org_lookup, country_lookup, stdout, db_write_counts):
448456
person_snapshot_cache = {}
457+
saved_ids = set()
449458
for person_id in person_ids:
450459
cached_snapshot = person_snapshot_cache.get(person_id)
451460
if cached_snapshot is not None:
452461
output_record(stdout, {"record_type": "rrms_person_snapshot", "data": cached_snapshot})
453462
if write_record("rrms_person_snapshot", cached_snapshot):
454463
db_write_counts["rrms_person_snapshot"] += 1
464+
saved_ids.add(person_id)
455465
continue
456466
log_debug(1, "Fetching person_id %s" % person_id)
457467
person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id)
@@ -477,6 +487,8 @@ def handle_person_ids(molnix, person_ids, org_lookup, country_lookup, stdout, db
477487
output_record(stdout, {"record_type": "rrms_person_snapshot", "data": filtered_person_data})
478488
if write_record("rrms_person_snapshot", filtered_person_data):
479489
db_write_counts["rrms_person_snapshot"] += 1
490+
saved_ids.add(person_id)
491+
return saved_ids
480492

481493

482494
class Command(BaseCommand):
@@ -505,6 +517,11 @@ def handle(self, *args, **options):
505517
appraisals_stream_count = 0
506518
appraisers_stream_count = 0
507519
events_stream_count = 0
520+
appraisal_ids = set()
521+
appraisal_duplicate_count = 0
522+
appraised_person_ids = set()
523+
appraised_person_null_count = 0
524+
appraiser_parent_ids = set()
508525
db_write_counts = {
509526
"molnix_appraisal": 0,
510527
"molnix_appraiser": 0,
@@ -542,7 +559,7 @@ def handle(self, *args, **options):
542559
for appraisal in appraisals:
543560
if not isinstance(appraisal, dict):
544561
continue
545-
appraisal_payload = appraisal.get("appraisal")
562+
appraisal_payload = appraisal.get("appraisal") if isinstance(appraisal.get("appraisal"), dict) else None
546563
deployment_id = appraisal_payload.get("deployment", {}).get("id") if isinstance(appraisal_payload, dict) else None
547564
sending_org_id, receiving_org_id = fetch_deployment_org_ids(
548565
molnix,
@@ -551,19 +568,50 @@ def handle(self, *args, **options):
551568
org_lookup,
552569
country_lookup,
553570
)
554-
appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id)
555-
if appraisal_data:
556-
output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data})
557-
if write_record("molnix_appraisal", appraisal_data):
558-
db_write_counts["molnix_appraisal"] += 1
559-
appraisals_stream_count += 1
560-
appraiser_data = normalize_appraiser(appraisal)
571+
appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id)
572+
if appraisal_data:
573+
appraisal_id = appraisal_data.get("molnix_id")
574+
if appraisal_id is not None:
575+
if appraisal_id in appraisal_ids:
576+
appraisal_duplicate_count += 1
577+
appraisal_ids.add(appraisal_id)
578+
appraised_person_id = appraisal_data.get("appraised_person_id")
579+
if appraised_person_id is None:
580+
appraised_person_null_count += 1
581+
else:
582+
appraised_person_ids.add(appraised_person_id)
583+
output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data})
584+
if write_record("molnix_appraisal", appraisal_data):
585+
db_write_counts["molnix_appraisal"] += 1
586+
appraisals_stream_count += 1
587+
if appraisal_data.get("appraised_person_id") is not None:
588+
person_ids.append(appraisal_data.get("appraised_person_id"))
589+
appraiser_payloads = []
590+
if isinstance(appraisal_payload, dict) and isinstance(appraisal_payload.get("appraisers"), list):
591+
appraiser_payloads = appraisal_payload.get("appraisers")
592+
if appraiser_payloads:
593+
for appraiser_payload in appraiser_payloads:
594+
appraiser_data = normalize_appraiser(appraiser_payload, appraisal_data.get("molnix_id"))
595+
if appraiser_data:
596+
appraiser_parent_id = appraiser_data.get("appraisal_molnix_id")
597+
if appraiser_parent_id is not None:
598+
appraiser_parent_ids.add(appraiser_parent_id)
599+
output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data})
600+
if write_record("molnix_appraiser", appraiser_data):
601+
db_write_counts["molnix_appraiser"] += 1
602+
appraisers_stream_count += 1
603+
collect_person_ids([appraiser_data], person_ids)
604+
else:
605+
appraiser_data = normalize_appraiser(appraisal, appraisal_data.get("molnix_id"))
561606
if appraiser_data:
607+
appraiser_parent_id = appraiser_data.get("appraisal_molnix_id")
608+
if appraiser_parent_id is not None:
609+
appraiser_parent_ids.add(appraiser_parent_id)
562610
output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data})
563611
if write_record("molnix_appraiser", appraiser_data):
564612
db_write_counts["molnix_appraiser"] += 1
565613
appraisers_stream_count += 1
566-
collect_person_ids([appraiser_data], person_ids)
614+
collect_person_ids([appraiser_data], person_ids)
567615
total += 1
568616
if not should_continue(data, appraisals):
569617
log_debug(1, "Pagination indicates no more pages")
@@ -572,6 +620,10 @@ def handle(self, *args, **options):
572620

573621
event_page = 1
574622
seen_event_ids = set()
623+
duplicate_event_keys = {}
624+
duplicate_event_samples = []
625+
event_person_mismatch_count = 0
626+
event_person_mismatch_samples = []
575627
while True:
576628
log_debug(1, "Fetching events page %d" % event_page)
577629
events_payload = safe_call_api(
@@ -610,8 +662,23 @@ def handle(self, *args, **options):
610662
else:
611663
should_fetch_next = should_continue(events_payload, events)
612664
for event in events:
613-
records = normalize_event_participation(event, org_lookup, country_lookup)
665+
records, mismatch_count, mismatch_samples = normalize_event_participation(event, org_lookup, country_lookup)
666+
event_person_mismatch_count += mismatch_count
667+
if mismatch_samples and len(event_person_mismatch_samples) < 25:
668+
remaining = 25 - len(event_person_mismatch_samples)
669+
event_person_mismatch_samples.extend(mismatch_samples[:remaining])
614670
for record in records:
671+
event_key = (
672+
record.get("event_id"),
673+
record.get("person_id"),
674+
record.get("event_person_role"),
675+
)
676+
if event_key in duplicate_event_keys:
677+
duplicate_event_keys[event_key] += 1
678+
if len(duplicate_event_samples) < 25:
679+
duplicate_event_samples.append(event_key)
680+
else:
681+
duplicate_event_keys[event_key] = 1
615682
output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record})
616683
if write_record("rrms_event_participation", record):
617684
db_write_counts["rrms_event_participation"] += 1
@@ -631,8 +698,12 @@ def handle(self, *args, **options):
631698
"Collected %d appraisal person_id values and %d event person_id values"
632699
% (len(appraisal_person_ids), len(event_person_ids)),
633700
)
634-
handle_person_ids(molnix, appraisal_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts)
635-
handle_person_ids(molnix, event_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts)
701+
saved_appraisal_person_ids = handle_person_ids(
702+
molnix, appraisal_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts
703+
)
704+
saved_event_person_ids = handle_person_ids(
705+
molnix, event_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts
706+
)
636707
# log_debug(1, "Smoke test: response_capacity endpoint")
637708
# response_capacity_data = molnix.call_api(path="response_capacity")
638709
# self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True))
@@ -657,6 +728,48 @@ def handle(self, *args, **options):
657728
db_write_counts["rrms_person_snapshot"],
658729
)
659730
)
731+
if appraisal_duplicate_count:
732+
logger.warning("Duplicate appraisal molnix_id values observed: %d" % appraisal_duplicate_count)
733+
if appraised_person_null_count:
734+
logger.warning("Null appraisal appraised_person_id values observed: %d" % appraised_person_null_count)
735+
orphan_appraiser_parents = appraiser_parent_ids.difference(appraisal_ids)
736+
if orphan_appraiser_parents:
737+
logger.warning("Appraiser parent appraisal ids missing in appraisals: %d" % len(orphan_appraiser_parents))
738+
orphan_appraised_person_ids = appraised_person_ids.difference(saved_appraisal_person_ids)
739+
if orphan_appraised_person_ids:
740+
logger.warning("Appraised person ids missing after refresh: %d" % len(orphan_appraised_person_ids))
741+
orphan_event_person_ids = set(event_person_ids).difference(saved_event_person_ids)
742+
if orphan_event_person_ids:
743+
logger.warning("Event person ids missing after refresh: %d" % len(orphan_event_person_ids))
744+
if event_person_mismatch_count:
745+
logger.warning("Event person id mismatch rows observed: %d" % event_person_mismatch_count)
746+
if event_person_mismatch_samples:
747+
logger.warning("Event person id mismatch samples: %s" % (event_person_mismatch_samples,))
748+
duplicate_event_rows = sum(count - 1 for count in duplicate_event_keys.values() if count > 1)
749+
if duplicate_event_rows:
750+
logger.warning(
751+
"RRMS event participation duplicates observed: %d duplicate rows across %d keys"
752+
% (duplicate_event_rows, len([count for count in duplicate_event_keys.values() if count > 1]))
753+
)
754+
if duplicate_event_samples:
755+
logger.warning("Duplicate event keys (sample): %s" % (duplicate_event_samples,))
756+
try:
757+
personnel_qs = Personnel.objects.filter(molnix_id__isnull=False)
758+
personnel_total = personnel_qs.count()
759+
personnel_molnix_ids = list(personnel_qs.values_list("molnix_id", flat=True).distinct())
760+
resolved_personnel = (
761+
RrmsPersonSnapshot.objects.filter(person_id__in=personnel_molnix_ids)
762+
.values_list("person_id", flat=True)
763+
.distinct()
764+
.count()
765+
)
766+
unresolved_personnel = len(personnel_molnix_ids) - resolved_personnel
767+
logger.info(
768+
"Personnel molnix_id bridge: total=%d distinct=%d resolved=%d unresolved=%d"
769+
% (personnel_total, len(personnel_molnix_ids), resolved_personnel, unresolved_personnel)
770+
)
771+
except Exception as ex:
772+
logger.error("Failed to compute personnel molnix_id bridge stats: %s" % str(ex))
660773
if OUTPUT == 2:
661774
self.stdout.write("Completed DB-only run.")
662775
molnix.logout()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Generated by Django 4.2.29 on 2026-05-11 00:00
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("deployments", "0094_erureadinesstype_ns_contribution"),
10+
("deployments", "0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more"),
11+
]
12+
13+
operations = [
14+
migrations.AddField(
15+
model_name="molnixappraisal",
16+
name="appraised_person_id",
17+
field=models.BigIntegerField(blank=True, null=True),
18+
),
19+
migrations.AddIndex(
20+
model_name="molnixappraisal",
21+
index=models.Index(fields=["appraised_person_id"], name="molnix_appraised_person_idx"),
22+
),
23+
migrations.RemoveConstraint(
24+
model_name="rrmseventparticipation",
25+
name="rrms_event_person_role_uniq",
26+
),
27+
]

deployments/models.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ def __str__(self):
314314
class MolnixAppraisal(models.Model):
315315
molnix_id = models.BigIntegerField(unique=True)
316316
target_id = models.BigIntegerField()
317+
appraised_person_id = models.BigIntegerField(null=True, blank=True)
317318
deployment_molnix_id = models.BigIntegerField(null=True, blank=True)
318319
stage = models.CharField(max_length=64, null=True, blank=True)
319320
appraisers_count = models.IntegerField(null=True, blank=True)
@@ -338,6 +339,7 @@ class MolnixAppraisal(models.Model):
338339

339340
class Meta:
340341
indexes = [
342+
models.Index(fields=["appraised_person_id"], name="molnix_appraised_person_idx"),
341343
models.Index(fields=["target_id"], name="molnix_app_target_idx"),
342344
models.Index(fields=["personnel"], name="molnix_app_personnel_idx"),
343345
models.Index(fields=["updated_at"], name="molnix_app_updated_idx"),
@@ -409,12 +411,6 @@ class RrmsEventParticipation(models.Model):
409411

410412
class Meta:
411413
db_table = "rrms_event_participation"
412-
constraints = [
413-
models.UniqueConstraint(
414-
fields=["event_id", "person_id", "event_person_role"],
415-
name="rrms_event_person_role_uniq",
416-
)
417-
]
418414
indexes = [
419415
models.Index(fields=["event_id"], name="rrms_event_id_idx"),
420416
models.Index(fields=["person_id"], name="rrms_event_person_idx"),

0 commit comments

Comments
 (0)