Skip to content

Commit e96a0a2

Browse files
committed
Add models to fill in new Molnix data
1 parent fa31fbf commit e96a0a2

3 files changed

Lines changed: 491 additions & 21 deletions

File tree

api/management/commands/sync_molnix_appraisals.py

Lines changed: 178 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
import json
2+
from datetime import datetime
23

34
from django.conf import settings
45
from django.core.management.base import BaseCommand
6+
from django.utils import timezone
7+
from django.utils.dateparse import parse_datetime
58

69
from api.logger import logger
710
from api.molnix_utils import MolnixApi
8-
9-
DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output
10-
OUTPUT = 0 # 0=print only, 1=print + DB (TODO), 2=DB only (TODO)
11+
from deployments.models import (
12+
MolnixAppraisal,
13+
MolnixAppraiser,
14+
RrmsEventParticipation,
15+
RrmsPersonSnapshot,
16+
)
17+
18+
DEBUG_LEVEL = 1 # Set to 0 for no debug, higher numbers (1 or 2) for more verbose debug output
19+
OUTPUT = 2 # 0=print only, 1=print + DB, 2=DB only
1120
APPRAISALS_PER_PAGE = 15
1221
EVENTS_PER_PAGE = 15
1322

@@ -80,6 +89,116 @@ def output_record(stdout, payload):
8089
stdout.write(json.dumps(payload, indent=2, sort_keys=True))
8190

8291

92+
def normalize_datetime(value):
93+
if value is None:
94+
return None
95+
if isinstance(value, str):
96+
value = parse_datetime(value)
97+
if isinstance(value, datetime):
98+
if timezone.is_naive(value):
99+
return timezone.make_aware(value, timezone.get_current_timezone())
100+
return value
101+
return value
102+
103+
104+
def write_record(record_type, data):
105+
if OUTPUT not in (1, 2):
106+
return False
107+
try:
108+
if record_type == "molnix_appraisal":
109+
molnix_id = data.get("molnix_id")
110+
if molnix_id is None:
111+
return False
112+
MolnixAppraisal.objects.update_or_create(
113+
molnix_id=molnix_id,
114+
defaults={
115+
"target_id": data.get("target_id"),
116+
"deployment_molnix_id": data.get("deployment_molnix_id"),
117+
"stage": data.get("stage"),
118+
"appraisers_count": data.get("appraisers_count"),
119+
"score": data.get("score"),
120+
"deployment_country_id": data.get("deployment_country_id"),
121+
"deployment_start": data.get("deployment_start"),
122+
"deployment_end": data.get("deployment_end"),
123+
"deployment_title": data.get("deployment_title"),
124+
"sending_organization_id": data.get("sending_organization_id"),
125+
"receiving_organization_id": data.get("receiving_organization_id"),
126+
"deployment_tags_json": data.get("deployment_tags_json"),
127+
"competencies_json": data.get("competencies_json"),
128+
"created_at": data.get("created_at"),
129+
"updated_at": data.get("updated_at"),
130+
},
131+
)
132+
return True
133+
if record_type == "molnix_appraiser":
134+
molnix_id = data.get("molnix_id")
135+
if molnix_id is None:
136+
return False
137+
MolnixAppraiser.objects.update_or_create(
138+
molnix_id=molnix_id,
139+
defaults={
140+
"appraisal_molnix_id": data.get("appraisal_molnix_id"),
141+
"appraiser_type": data.get("appraiser_type"),
142+
"person_id": data.get("person_id"),
143+
"required": data.get("required"),
144+
"notified_at": data.get("notified_at"),
145+
"completed_at": data.get("completed_at"),
146+
"created_at": data.get("created_at"),
147+
"updated_at": data.get("updated_at"),
148+
},
149+
)
150+
return True
151+
if record_type == "rrms_person_snapshot":
152+
person_id = data.get("person_id")
153+
if person_id is None:
154+
return False
155+
RrmsPersonSnapshot.objects.update_or_create(
156+
person_id=person_id,
157+
defaults={
158+
"person_status": data.get("person_status"),
159+
"sex": data.get("sex"),
160+
"current_availability": data.get("current_availability"),
161+
"outofscope": data.get("outofscope"),
162+
"organization_id": data.get("organization_id"),
163+
"organization_name": data.get("organization_name"),
164+
"roles_json": data.get("roles_json"),
165+
"languages_json": data.get("languages_json"),
166+
"tags_json": data.get("tags_json"),
167+
"source_updated_at": data.get("source_updated_at"),
168+
},
169+
)
170+
return True
171+
if record_type == "rrms_event_participation":
172+
event_id = data.get("event_id")
173+
person_id = data.get("person_id")
174+
event_person_role = data.get("event_person_role")
175+
if event_id is None or person_id is None:
176+
return False
177+
RrmsEventParticipation.objects.update_or_create(
178+
event_id=event_id,
179+
person_id=person_id,
180+
event_person_role=event_person_role,
181+
defaults={
182+
"event_name": data.get("event_name"),
183+
"event_type": data.get("event_type"),
184+
"event_scale_type": data.get("event_scale_type"),
185+
"event_from": data.get("event_from"),
186+
"event_to": data.get("event_to"),
187+
"participant_start": data.get("participant_start"),
188+
"participant_end": data.get("participant_end"),
189+
"requested": data.get("requested"),
190+
"event_organization_id": data.get("event_organization_id"),
191+
"event_organization_name": data.get("event_organization_name"),
192+
"venue": data.get("venue"),
193+
"tags_json": data.get("tags_json"),
194+
},
195+
)
196+
return True
197+
except Exception as ex:
198+
logger.error("Failed to write %s: %s" % (record_type, str(ex)))
199+
return False
200+
201+
83202
def get_deployment_payload(value):
84203
if isinstance(value, dict):
85204
return value
@@ -208,6 +327,7 @@ def filter_person_data(person_data, org_lookup):
208327
"organization_name": org_name,
209328
"current_availability": payload.get("current_availability"),
210329
"outofscope": payload.get("outofscope"),
330+
"source_updated_at": normalize_datetime(payload.get("updated_at")),
211331
}
212332

213333

@@ -223,15 +343,15 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None):
223343
"appraisers_count": appraisal.get("appraisers_count"),
224344
"score": appraisal.get("score"),
225345
"deployment_country_id": deployment.get("country_id"),
226-
"deployment_start": deployment.get("start"),
227-
"deployment_end": deployment.get("end"),
346+
"deployment_start": normalize_datetime(deployment.get("start")),
347+
"deployment_end": normalize_datetime(deployment.get("end")),
228348
"deployment_title": deployment.get("title"),
229349
"sending_organization_id": sending_org_id,
230350
"receiving_organization_id": receiving_org_id,
231351
"deployment_tags_json": deployment.get("tags"),
232352
"competencies_json": appraisal.get("competencies"),
233-
"created_at": appraisal.get("created_at"),
234-
"updated_at": appraisal.get("updated_at"),
353+
"created_at": normalize_datetime(appraisal.get("created_at")),
354+
"updated_at": normalize_datetime(appraisal.get("updated_at")),
235355
}
236356

237357

@@ -244,10 +364,10 @@ def normalize_appraiser(appraiser):
244364
"appraiser_type": appraiser.get("appraiser_type"),
245365
"person_id": appraiser.get("person_id"),
246366
"required": appraiser.get("required"),
247-
"notified_at": appraiser.get("notified_at"),
248-
"completed_at": appraiser.get("completed_at"),
249-
"created_at": appraiser.get("created_at"),
250-
"updated_at": appraiser.get("updated_at"),
367+
"notified_at": normalize_datetime(appraiser.get("notified_at")),
368+
"completed_at": normalize_datetime(appraiser.get("completed_at")),
369+
"created_at": normalize_datetime(appraiser.get("created_at")),
370+
"updated_at": normalize_datetime(appraiser.get("updated_at")),
251371
}
252372

253373

@@ -268,10 +388,10 @@ def normalize_event_participation(event, org_lookup):
268388
"event_person_role": pivot.get("role"),
269389
"event_type": event.get("event_type"),
270390
"event_scale_type": event.get("type"),
271-
"event_from": event.get("from"),
272-
"event_to": event.get("to"),
273-
"participant_start": pivot.get("start"),
274-
"participant_end": pivot.get("end"),
391+
"event_from": normalize_datetime(event.get("from")),
392+
"event_to": normalize_datetime(event.get("to")),
393+
"participant_start": normalize_datetime(pivot.get("start")),
394+
"participant_end": normalize_datetime(pivot.get("end")),
275395
"requested": pivot.get("requested"),
276396
"event_organization_id": org_id,
277397
"event_organization_name": org_name,
@@ -282,12 +402,14 @@ def normalize_event_participation(event, org_lookup):
282402
return records
283403

284404

285-
def handle_person_ids(molnix, person_ids, org_lookup, stdout):
405+
def handle_person_ids(molnix, person_ids, org_lookup, stdout, db_write_counts):
286406
person_snapshot_cache = {}
287407
for person_id in person_ids:
288408
cached_snapshot = person_snapshot_cache.get(person_id)
289409
if cached_snapshot is not None:
290410
output_record(stdout, {"record_type": "rrms_person_snapshot", "data": cached_snapshot})
411+
if write_record("rrms_person_snapshot", cached_snapshot):
412+
db_write_counts["rrms_person_snapshot"] += 1
291413
continue
292414
log_debug(1, "Fetching person_id %s" % person_id)
293415
person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id)
@@ -311,6 +433,8 @@ def handle_person_ids(molnix, person_ids, org_lookup, stdout):
311433
)
312434
person_snapshot_cache[person_id] = filtered_person_data
313435
output_record(stdout, {"record_type": "rrms_person_snapshot", "data": filtered_person_data})
436+
if write_record("rrms_person_snapshot", filtered_person_data):
437+
db_write_counts["rrms_person_snapshot"] += 1
314438

315439

316440
class Command(BaseCommand):
@@ -329,7 +453,7 @@ def handle(self, *args, **options):
329453
org_lookup = build_org_lookup(molnix)
330454

331455
if OUTPUT == 2:
332-
self.stdout.write("OUTPUT=2 (DB-only mode) is selected; DB writes are not implemented yet.")
456+
self.stdout.write("OUTPUT=2 (DB-only mode) is selected.")
333457

334458
page = 1
335459
total = 0
@@ -338,6 +462,12 @@ def handle(self, *args, **options):
338462
appraisals_stream_count = 0
339463
appraisers_stream_count = 0
340464
events_stream_count = 0
465+
db_write_counts = {
466+
"molnix_appraisal": 0,
467+
"molnix_appraiser": 0,
468+
"rrms_event_participation": 0,
469+
"rrms_person_snapshot": 0,
470+
}
341471
deployment_org_cache = {}
342472
while True:
343473
log_debug(1, "Fetching page %d" % page)
@@ -375,10 +505,14 @@ def handle(self, *args, **options):
375505
appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id)
376506
if appraisal_data:
377507
output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data})
508+
if write_record("molnix_appraisal", appraisal_data):
509+
db_write_counts["molnix_appraisal"] += 1
378510
appraisals_stream_count += 1
379511
appraiser_data = normalize_appraiser(appraisal)
380512
if appraiser_data:
381513
output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data})
514+
if write_record("molnix_appraiser", appraiser_data):
515+
db_write_counts["molnix_appraiser"] += 1
382516
appraisers_stream_count += 1
383517
collect_person_ids([appraiser_data], person_ids)
384518
total += 1
@@ -388,6 +522,7 @@ def handle(self, *args, **options):
388522
page += 1
389523

390524
event_page = 1
525+
seen_event_ids = set()
391526
while True:
392527
log_debug(1, "Fetching events page %d" % event_page)
393528
events_payload = safe_call_api(
@@ -412,14 +547,26 @@ def handle(self, *args, **options):
412547
if not events:
413548
log_debug(1, "No events returned, stopping")
414549
break
550+
page_event_ids = [event.get("id") for event in events if isinstance(event, dict) and event.get("id") is not None]
551+
if page_event_ids and set(page_event_ids).issubset(seen_event_ids):
552+
log_debug(1, "Events page contains only previously seen ids, stopping")
553+
break
554+
seen_event_ids.update(page_event_ids)
555+
if len(events) < EVENTS_PER_PAGE:
556+
log_debug(1, "Events page size below per_page, stopping")
557+
should_fetch_next = False
558+
else:
559+
should_fetch_next = should_continue(events_payload, events)
415560
for event in events:
416561
records = normalize_event_participation(event, org_lookup)
417562
for record in records:
418563
output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record})
564+
if write_record("rrms_event_participation", record):
565+
db_write_counts["rrms_event_participation"] += 1
419566
events_stream_count += 1
420567
if record.get("person_id") is not None:
421568
event_person_ids.append(record.get("person_id"))
422-
if not should_continue(events_payload, events):
569+
if not should_fetch_next:
423570
log_debug(1, "Events pagination indicates no more pages")
424571
break
425572
event_page += 1
@@ -432,8 +579,8 @@ def handle(self, *args, **options):
432579
"Collected %d appraisal person_id values and %d event person_id values"
433580
% (len(appraisal_person_ids), len(event_person_ids)),
434581
)
435-
handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout)
436-
handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout)
582+
handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout, db_write_counts)
583+
handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout, db_write_counts)
437584
# log_debug(1, "Smoke test: response_capacity endpoint")
438585
# response_capacity_data = molnix.call_api(path="response_capacity")
439586
# self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True))
@@ -448,6 +595,16 @@ def handle(self, *args, **options):
448595
len(unique_person_ids),
449596
)
450597
)
598+
if OUTPUT in (1, 2):
599+
logger.info(
600+
"DB writes (appraisals=%d appraisers=%d events=%d persons=%d)"
601+
% (
602+
db_write_counts["molnix_appraisal"],
603+
db_write_counts["molnix_appraiser"],
604+
db_write_counts["rrms_event_participation"],
605+
db_write_counts["rrms_person_snapshot"],
606+
)
607+
)
451608
if OUTPUT == 2:
452-
self.stdout.write("Completed DB-only run (writes not implemented yet).")
609+
self.stdout.write("Completed DB-only run.")
453610
molnix.logout()

0 commit comments

Comments
 (0)