From 75613660bf14b0957d1309f6893a947db1ef2c2f Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Wed, 25 Feb 2026 12:14:16 +0100 Subject: [PATCH 01/10] =?UTF-8?q?Sync=20Molnix=20appraisals=20=E2=80=93=20?= =?UTF-8?q?v0.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/sync_molnix_appraisals.py | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 api/management/commands/sync_molnix_appraisals.py diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py new file mode 100644 index 000000000..94f826ed8 --- /dev/null +++ b/api/management/commands/sync_molnix_appraisals.py @@ -0,0 +1,101 @@ +import json + +from django.conf import settings +from django.core.management.base import BaseCommand + +from api.logger import logger +from api.molnix_utils import MolnixApi + +DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output + + +def extract_appraisals(payload): + if isinstance(payload, list): + return payload + if not isinstance(payload, dict): + return [] + if "original" in payload and isinstance(payload["original"], dict): + original = payload["original"] + if "data" in original and isinstance(original["data"], list): + return original["data"] + if "appraisals" in payload: + appraisals = payload["appraisals"] + if isinstance(appraisals, dict) and "data" in appraisals: + return appraisals["data"] + if isinstance(appraisals, list): + return appraisals + if "data" in payload and isinstance(payload["data"], list): + return payload["data"] + return [] + + +def should_continue(payload, appraisals): + if not appraisals: + return False + if not isinstance(payload, dict): + return True + original = payload.get("original") + if isinstance(original, dict): + if original.get("next_page_url") in ("", None, False): + return False + current_page = original.get("current_page") + last_page = original.get("last_page") + if isinstance(current_page, int) and isinstance(last_page, int) and current_page >= last_page: + return False + if "next" in payload and not payload["next"]: + return False + return True + + +def log_debug(level, message): + if DEBUG_LEVEL >= level: + logger.info("[debug-%d] %s" % (level, message)) + + +class Command(BaseCommand): + help = "Fetch and print Molnix appraisals" + + def handle(self, *args, **options): + logger.info("Starting Sync Molnix Appraisals job") + molnix = MolnixApi(url=settings.MOLNIX_API_BASE, username=settings.MOLNIX_USERNAME, password=settings.MOLNIX_PASSWORD) + try: + molnix.login() + logger.info("Logged into Molnix") + except Exception as ex: + logger.error("Failed to login to Molnix API: %s" % str(ex)) + return + + page = 1 + total = 0 + while True: + log_debug(1, "Fetching page %d" % page) + data = molnix.call_api(path="appraisals", params={"page": page}) + appraisals = extract_appraisals(data) + if isinstance(data, dict): + original = data.get("original") if isinstance(data.get("original"), dict) else {} + log_debug( + 1, + "Pagination current=%s last=%s next_url=%s count=%d" + % ( + original.get("current_page"), + original.get("last_page"), + original.get("next_page_url"), + len(appraisals), + ), + ) + log_debug(2, "Top-level keys: %s" % sorted(list(data.keys()))) + if original: + log_debug(2, "Original keys: %s" % sorted(list(original.keys()))) + if not appraisals: + log_debug(1, "No appraisals returned, stopping") + break + for appraisal in appraisals: + self.stdout.write(json.dumps(appraisal, indent=2, sort_keys=True)) + total += 1 + if not should_continue(data, appraisals): + log_debug(1, "Pagination indicates no more pages") + break + page += 1 + + logger.info("Printed %d appraisals" % total) + molnix.logout() From 86725980a725195d615b0dd1fb037d399813d42e Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Wed, 25 Feb 2026 20:31:31 +0100 Subject: [PATCH 02/10] =?UTF-8?q?Sync=20Molnix=20appraisals=20=E2=80=93=20?= =?UTF-8?q?v0.2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/sync_molnix_appraisals.py | 75 ++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 94f826ed8..56cd5066c 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -52,6 +52,63 @@ def log_debug(level, message): logger.info("[debug-%d] %s" % (level, message)) +def remove_tags_and_deployments(value, parent_key=None): + if isinstance(value, dict): + cleaned = {} + for key, item in value.items(): + if key == "tags": + continue + if parent_key == "appraisal" and key == "deployment": + continue + cleaned[key] = remove_tags_and_deployments(item, key) + return cleaned + if isinstance(value, list): + return [remove_tags_and_deployments(item, parent_key) for item in value] + return value + + +def collect_person_ids(value, collected): + if isinstance(value, dict): + for key, item in value.items(): + if key == "person_id": + collected.append(item) + else: + collect_person_ids(item, collected) + return + if isinstance(value, list): + for item in value: + collect_person_ids(item, collected) + + +def find_person_payload(value): + # if isinstance(value, dict): + if any(key in value for key in ("sex", "fullname", "organization", "current_availability")): + return value + for item in value.values(): + found = find_person_payload(item) + if found is not None: + return found + # occured never: + # if isinstance(value, list): + # for item in value: + # found = find_person_payload(item) + # if found is not None: + # return found + return None + + +def filter_person_data(person_data): + payload = find_person_payload(person_data) + if not isinstance(payload, dict): + return {} + return { + "sex": payload.get("sex"), + "fullname": payload.get("fullname"), + "organization": payload.get("organization"), + "current_availability": payload.get("current_availability"), + } + + class Command(BaseCommand): help = "Fetch and print Molnix appraisals" @@ -67,6 +124,7 @@ def handle(self, *args, **options): page = 1 total = 0 + person_ids = [] while True: log_debug(1, "Fetching page %d" % page) data = molnix.call_api(path="appraisals", params={"page": page}) @@ -90,12 +148,27 @@ def handle(self, *args, **options): log_debug(1, "No appraisals returned, stopping") break for appraisal in appraisals: - self.stdout.write(json.dumps(appraisal, indent=2, sort_keys=True)) + collect_person_ids(appraisal, person_ids) + cleaned_appraisal = remove_tags_and_deployments(appraisal) + self.stdout.write(json.dumps(cleaned_appraisal, indent=2, sort_keys=True)) total += 1 if not should_continue(data, appraisals): log_debug(1, "Pagination indicates no more pages") break page += 1 + unique_person_ids = sorted({pid for pid in person_ids if pid is not None}) + # self.stdout.write(json.dumps(unique_person_ids, indent=2, sort_keys=True)) + log_debug(1, "Collected %d person_id values" % len(unique_person_ids)) + for person_id in unique_person_ids: + log_debug(1, "Fetching person_id %s" % person_id) + person_data = molnix.call_api(path="people/%s" % person_id) + filtered_person_data = filter_person_data(person_data) + if not filtered_person_data: + log_debug(2, "No person payload found for person_id %s" % person_id) + self.stdout.write(json.dumps(filtered_person_data, indent=2, sort_keys=True)) + # log_debug(1, "Smoke test: response_capacity endpoint") + # response_capacity_data = molnix.call_api(path="response_capacity") + # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) logger.info("Printed %d appraisals" % total) molnix.logout() From 26310c7f060ec857c3b422755d1068b325b05b0b Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Thu, 26 Mar 2026 12:33:39 +0100 Subject: [PATCH 03/10] Create distinct molnix_appraisals and molnix_appraisers --- .../commands/sync_molnix_appraisals.py | 92 +++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 56cd5066c..9336a30b6 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -7,6 +7,7 @@ from api.molnix_utils import MolnixApi DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output +APPRAISALS_PER_PAGE = 15 def extract_appraisals(payload): @@ -67,17 +68,15 @@ def remove_tags_and_deployments(value, parent_key=None): return value -def collect_person_ids(value, collected): - if isinstance(value, dict): - for key, item in value.items(): - if key == "person_id": - collected.append(item) - else: - collect_person_ids(item, collected) +def collect_person_ids(appraiser_records, collected): + if not isinstance(appraiser_records, list): return - if isinstance(value, list): - for item in value: - collect_person_ids(item, collected) + for record in appraiser_records: + if not isinstance(record, dict): + continue + person_id = record.get("person_id") + if person_id is not None: + collected.append(person_id) def find_person_payload(value): @@ -109,6 +108,46 @@ def filter_person_data(person_data): } +def normalize_appraisal(appraisal): + if not isinstance(appraisal, dict): + return {} + cleaned = remove_tags_and_deployments(appraisal, "appraisal") + return { + "id": cleaned.get("id"), + "target_id": cleaned.get("target_id"), + "stage": cleaned.get("stage"), + "created_at": cleaned.get("created_at"), + "updated_at": cleaned.get("updated_at"), + "appraisers_count": cleaned.get("appraisers_count"), + "objectives": cleaned.get("objectives"), + "competencies": cleaned.get("competencies"), + "score": cleaned.get("score"), + "appraisers": cleaned.get("appraisers"), + } + + +def normalize_appraiser(appraiser): + if not isinstance(appraiser, dict): + return {} + cleaned = remove_tags_and_deployments(appraiser) + return { + "id": cleaned.get("id"), + "appraisal_id": cleaned.get("appraisal_id"), + "appraiser_type": cleaned.get("appraiser_type"), + "person_id": cleaned.get("person_id"), + "name": cleaned.get("name"), + "email": cleaned.get("email"), + "position_title": cleaned.get("position_title"), + "required": cleaned.get("required"), + "notified_at": cleaned.get("notified_at"), + "notification_counter": cleaned.get("notification_counter"), + "completed_at": cleaned.get("completed_at"), + "created_at": cleaned.get("created_at"), + "updated_at": cleaned.get("updated_at"), + "responses": cleaned.get("responses"), + } + + class Command(BaseCommand): help = "Fetch and print Molnix appraisals" @@ -125,9 +164,11 @@ def handle(self, *args, **options): page = 1 total = 0 person_ids = [] + appraisals_stream_count = 0 + appraisers_stream_count = 0 while True: log_debug(1, "Fetching page %d" % page) - data = molnix.call_api(path="appraisals", params={"page": page}) + data = molnix.call_api(path="appraisals", params={"page": page, "per_page": APPRAISALS_PER_PAGE}) appraisals = extract_appraisals(data) if isinstance(data, dict): original = data.get("original") if isinstance(data.get("original"), dict) else {} @@ -148,9 +189,21 @@ def handle(self, *args, **options): log_debug(1, "No appraisals returned, stopping") break for appraisal in appraisals: - collect_person_ids(appraisal, person_ids) - cleaned_appraisal = remove_tags_and_deployments(appraisal) - self.stdout.write(json.dumps(cleaned_appraisal, indent=2, sort_keys=True)) + if not isinstance(appraisal, dict): + continue + appraisal_data = normalize_appraisal(appraisal.get("appraisal")) + if appraisal_data: + self.stdout.write( + json.dumps({"record_type": "molnix_appraisal", "data": appraisal_data}, indent=2, sort_keys=True) + ) + appraisals_stream_count += 1 + appraiser_data = normalize_appraiser(appraisal) + if appraiser_data: + self.stdout.write( + json.dumps({"record_type": "molnix_appraiser", "data": appraiser_data}, indent=2, sort_keys=True) + ) + appraisers_stream_count += 1 + collect_person_ids([appraiser_data], person_ids) total += 1 if not should_continue(data, appraisals): log_debug(1, "Pagination indicates no more pages") @@ -158,7 +211,6 @@ def handle(self, *args, **options): page += 1 unique_person_ids = sorted({pid for pid in person_ids if pid is not None}) - # self.stdout.write(json.dumps(unique_person_ids, indent=2, sort_keys=True)) log_debug(1, "Collected %d person_id values" % len(unique_person_ids)) for person_id in unique_person_ids: log_debug(1, "Fetching person_id %s" % person_id) @@ -166,9 +218,15 @@ def handle(self, *args, **options): filtered_person_data = filter_person_data(person_data) if not filtered_person_data: log_debug(2, "No person payload found for person_id %s" % person_id) - self.stdout.write(json.dumps(filtered_person_data, indent=2, sort_keys=True)) + self.stdout.write( + json.dumps( + {"record_type": "molnix_person_sex", "person_id": person_id, "data": filtered_person_data}, + indent=2, + sort_keys=True, + ) + ) # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) - logger.info("Printed %d appraisals" % total) + logger.info("Printed %d items (appraisals=%d appraisers=%d)" % (total, appraisals_stream_count, appraisers_stream_count)) molnix.logout() From 52044e450337f5e4f367fb8207036487704b4d45 Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Fri, 10 Apr 2026 13:23:02 +0200 Subject: [PATCH 04/10] =?UTF-8?q?Sync=20Molnix=20appraisals=20=E2=80=93=20?= =?UTF-8?q?v0.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/sync_molnix_appraisals.py | 353 +++++++++++++++--- 1 file changed, 292 insertions(+), 61 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 9336a30b6..6f402c34c 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -8,6 +8,7 @@ DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output APPRAISALS_PER_PAGE = 15 +EVENTS_PER_PAGE = 15 def extract_appraisals(payload): @@ -30,6 +31,26 @@ def extract_appraisals(payload): return [] +def extract_events(payload): + if isinstance(payload, list): + return payload + if not isinstance(payload, dict): + return [] + if "original" in payload and isinstance(payload["original"], dict): + original = payload["original"] + if "data" in original and isinstance(original["data"], list): + return original["data"] + if "events" in payload: + events = payload["events"] + if isinstance(events, dict) and "data" in events: + return events["data"] + if isinstance(events, list): + return events + if "data" in payload and isinstance(payload["data"], list): + return payload["data"] + return [] + + def should_continue(payload, appraisals): if not appraisals: return False @@ -53,19 +74,92 @@ def log_debug(level, message): logger.info("[debug-%d] %s" % (level, message)) -def remove_tags_and_deployments(value, parent_key=None): +def get_deployment_payload(value): if isinstance(value, dict): - cleaned = {} - for key, item in value.items(): - if key == "tags": - continue - if parent_key == "appraisal" and key == "deployment": - continue - cleaned[key] = remove_tags_and_deployments(item, key) - return cleaned - if isinstance(value, list): - return [remove_tags_and_deployments(item, parent_key) for item in value] - return value + return value + return {} + + +def extract_list_payload(payload): + if isinstance(payload, list): + return payload + if isinstance(payload, dict) and isinstance(payload.get("data"), list): + return payload.get("data") + return [] + + +def extract_org_list(payload): + if isinstance(payload, list): + return payload + if not isinstance(payload, dict): + return [] + if "original" in payload and isinstance(payload["original"], dict): + original = payload["original"] + if isinstance(original.get("data"), list): + return original.get("data") + if isinstance(payload.get("data"), list): + return payload.get("data") + if isinstance(payload.get("organizations"), list): + return payload.get("organizations") + return [] + + +def normalize_org(value, org_lookup): + if isinstance(value, dict): + org_id = value.get("id") + org_name = value.get("name") or org_lookup.get(org_id) + return org_id, org_name + if value is None: + return None, None + org_id = value + org_name = org_lookup.get(org_id) + return org_id, org_name + + +def build_org_lookup(molnix): + try: + payload = molnix.call_api(path="system/organizations") + except Exception as ex: + logger.error("Failed to fetch organizations: %s" % str(ex)) + return {} + orgs = extract_org_list(payload) + lookup = {} + for org in orgs: + if not isinstance(org, dict): + continue + org_id = org.get("id") + org_name = org.get("name") + if org_id is not None: + lookup[org_id] = org_name + log_debug(1, "Loaded %d organizations" % len(lookup)) + return lookup + + +def safe_call_api(molnix, path, params=None, label=None): + try: + return molnix.call_api(path=path, params=params or {}) + except Exception as ex: + if label is None: + label = path + logger.error("Failed to fetch %s: %s" % (label, str(ex))) + return None + + +def fetch_deployment_org_ids(molnix, deployment_id, cache): + if deployment_id is None: + return None, None + if deployment_id in cache: + return cache[deployment_id] + payload = safe_call_api(molnix, path="deployments/%s" % deployment_id, label="deployment/%s" % deployment_id) + if not isinstance(payload, dict): + cache[deployment_id] = (None, None) + return cache[deployment_id] + sending_org = payload.get("sending_organization") + receiving_org = payload.get("receiving_organization") + sending_id = sending_org.get("id") if isinstance(sending_org, dict) else sending_org + receiving_id = receiving_org.get("id") if isinstance(receiving_org, dict) else receiving_org + cache[deployment_id] = (sending_id, receiving_id) + return cache[deployment_id] def collect_person_ids(appraiser_records, collected): @@ -80,8 +174,9 @@ def collect_person_ids(appraiser_records, collected): def find_person_payload(value): - # if isinstance(value, dict): - if any(key in value for key in ("sex", "fullname", "organization", "current_availability")): + if not isinstance(value, dict): + return None + if any(key in value for key in ("sex", "organization", "current_availability", "outofscope")): return value for item in value.values(): found = find_person_payload(item) @@ -96,58 +191,134 @@ def find_person_payload(value): return None -def filter_person_data(person_data): +def filter_person_data(person_data, org_lookup): payload = find_person_payload(person_data) if not isinstance(payload, dict): return {} + org_id, org_name = normalize_org(payload.get("organization"), org_lookup) return { "sex": payload.get("sex"), - "fullname": payload.get("fullname"), - "organization": payload.get("organization"), + "organization_id": org_id, + "organization_name": org_name, "current_availability": payload.get("current_availability"), + "outofscope": payload.get("outofscope"), } -def normalize_appraisal(appraisal): +def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None): if not isinstance(appraisal, dict): return {} - cleaned = remove_tags_and_deployments(appraisal, "appraisal") + deployment = get_deployment_payload(appraisal.get("deployment")) return { - "id": cleaned.get("id"), - "target_id": cleaned.get("target_id"), - "stage": cleaned.get("stage"), - "created_at": cleaned.get("created_at"), - "updated_at": cleaned.get("updated_at"), - "appraisers_count": cleaned.get("appraisers_count"), - "objectives": cleaned.get("objectives"), - "competencies": cleaned.get("competencies"), - "score": cleaned.get("score"), - "appraisers": cleaned.get("appraisers"), + "molnix_id": appraisal.get("id"), + "target_id": appraisal.get("target_id"), + "deployment_molnix_id": deployment.get("id"), + "stage": appraisal.get("stage"), + "appraisers_count": appraisal.get("appraisers_count"), + "score": appraisal.get("score"), + "deployment_country_id": deployment.get("country_id"), + "deployment_start": deployment.get("start"), + "deployment_end": deployment.get("end"), + "deployment_title": deployment.get("title"), + "sending_organization_id": sending_org_id, + "receiving_organization_id": receiving_org_id, + "deployment_tags_json": deployment.get("tags"), + "competencies_json": appraisal.get("competencies"), + "created_at": appraisal.get("created_at"), + "updated_at": appraisal.get("updated_at"), } def normalize_appraiser(appraiser): if not isinstance(appraiser, dict): return {} - cleaned = remove_tags_and_deployments(appraiser) return { - "id": cleaned.get("id"), - "appraisal_id": cleaned.get("appraisal_id"), - "appraiser_type": cleaned.get("appraiser_type"), - "person_id": cleaned.get("person_id"), - "name": cleaned.get("name"), - "email": cleaned.get("email"), - "position_title": cleaned.get("position_title"), - "required": cleaned.get("required"), - "notified_at": cleaned.get("notified_at"), - "notification_counter": cleaned.get("notification_counter"), - "completed_at": cleaned.get("completed_at"), - "created_at": cleaned.get("created_at"), - "updated_at": cleaned.get("updated_at"), - "responses": cleaned.get("responses"), + "molnix_id": appraiser.get("id"), + "appraisal_molnix_id": appraiser.get("appraisal_id"), + "appraiser_type": appraiser.get("appraiser_type"), + "person_id": appraiser.get("person_id"), + "required": appraiser.get("required"), + "notified_at": appraiser.get("notified_at"), + "completed_at": appraiser.get("completed_at"), + "created_at": appraiser.get("created_at"), + "updated_at": appraiser.get("updated_at"), } +def normalize_event_participation(event, org_lookup): + if not isinstance(event, dict): + return [] + org_id, org_name = normalize_org(event.get("organization"), org_lookup) + people = event.get("person") if isinstance(event.get("person"), list) else [] + records = [] + for person in people: + if not isinstance(person, dict): + continue + pivot = person.get("pivot") if isinstance(person.get("pivot"), dict) else {} + record = { + "event_id": event.get("id"), + "event_name": event.get("name"), + "person_id": person.get("id"), + "event_person_role": pivot.get("role"), + "event_type": event.get("event_type"), + "event_scale_type": event.get("type"), + "event_from": event.get("from"), + "event_to": event.get("to"), + "participant_start": pivot.get("start"), + "participant_end": pivot.get("end"), + "requested": pivot.get("requested"), + "event_organization_id": org_id, + "event_organization_name": org_name, + "venue": event.get("venue"), + "tags_json": event.get("tags"), + } + records.append(record) + return records + + +def handle_person_ids(molnix, person_ids, org_lookup, stdout): + person_snapshot_cache = {} + for person_id in person_ids: + cached_snapshot = person_snapshot_cache.get(person_id) + if cached_snapshot is not None: + stdout.write( + json.dumps( + {"record_type": "rrms_person_snapshot", "data": cached_snapshot}, + indent=2, + sort_keys=True, + ) + ) + continue + log_debug(1, "Fetching person_id %s" % person_id) + person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id) + if person_data is None: + log_debug(2, "Skipping person_id %s due to people endpoint failure" % person_id) + continue + roles_payload = safe_call_api(molnix, path="people/%s/roles" % person_id, label="people/%s/roles" % person_id) + languages_payload = safe_call_api(molnix, path="people/%s/languages" % person_id, label="people/%s/languages" % person_id) + tags_payload = safe_call_api(molnix, path="people/%s/tags" % person_id, label="people/%s/tags" % person_id) + filtered_person_data = filter_person_data(person_data, org_lookup) + if not filtered_person_data: + log_debug(2, "No person payload found for person_id %s" % person_id) + filtered_person_data = {} + filtered_person_data.update( + { + "person_id": person_id, + "roles_json": extract_list_payload(roles_payload) if roles_payload is not None else [], + "languages_json": extract_list_payload(languages_payload) if languages_payload is not None else [], + "tags_json": extract_list_payload(tags_payload) if tags_payload is not None else [], + } + ) + person_snapshot_cache[person_id] = filtered_person_data + stdout.write( + json.dumps( + {"record_type": "rrms_person_snapshot", "data": filtered_person_data}, + indent=2, + sort_keys=True, + ) + ) + + class Command(BaseCommand): help = "Fetch and print Molnix appraisals" @@ -161,14 +332,24 @@ def handle(self, *args, **options): logger.error("Failed to login to Molnix API: %s" % str(ex)) return + org_lookup = build_org_lookup(molnix) + page = 1 total = 0 person_ids = [] + event_person_ids = [] appraisals_stream_count = 0 appraisers_stream_count = 0 + events_stream_count = 0 + deployment_org_cache = {} while True: log_debug(1, "Fetching page %d" % page) - data = molnix.call_api(path="appraisals", params={"page": page, "per_page": APPRAISALS_PER_PAGE}) + data = safe_call_api( + molnix, path="appraisals", params={"page": page, "per_page": APPRAISALS_PER_PAGE}, label="appraisals" + ) + if data is None: + log_debug(1, "Appraisals call failed, stopping") + break appraisals = extract_appraisals(data) if isinstance(data, dict): original = data.get("original") if isinstance(data.get("original"), dict) else {} @@ -191,7 +372,10 @@ def handle(self, *args, **options): for appraisal in appraisals: if not isinstance(appraisal, dict): continue - appraisal_data = normalize_appraisal(appraisal.get("appraisal")) + appraisal_payload = appraisal.get("appraisal") + deployment_id = appraisal_payload.get("deployment", {}).get("id") if isinstance(appraisal_payload, dict) else None + sending_org_id, receiving_org_id = fetch_deployment_org_ids(molnix, deployment_id, deployment_org_cache) + appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) if appraisal_data: self.stdout.write( json.dumps({"record_type": "molnix_appraisal", "data": appraisal_data}, indent=2, sort_keys=True) @@ -210,23 +394,70 @@ def handle(self, *args, **options): break page += 1 - unique_person_ids = sorted({pid for pid in person_ids if pid is not None}) - log_debug(1, "Collected %d person_id values" % len(unique_person_ids)) - for person_id in unique_person_ids: - log_debug(1, "Fetching person_id %s" % person_id) - person_data = molnix.call_api(path="people/%s" % person_id) - filtered_person_data = filter_person_data(person_data) - if not filtered_person_data: - log_debug(2, "No person payload found for person_id %s" % person_id) - self.stdout.write( - json.dumps( - {"record_type": "molnix_person_sex", "person_id": person_id, "data": filtered_person_data}, - indent=2, - sort_keys=True, - ) + event_page = 1 + while True: + log_debug(1, "Fetching events page %d" % event_page) + events_payload = safe_call_api( + molnix, path="events", params={"page": event_page, "per_page": EVENTS_PER_PAGE}, label="events" ) + if events_payload is None: + log_debug(1, "Events call failed, stopping") + break + events = extract_events(events_payload) + if isinstance(events_payload, dict): + original = events_payload.get("original") if isinstance(events_payload.get("original"), dict) else {} + log_debug( + 1, + "Events pagination current=%s last=%s next_url=%s count=%d" + % ( + original.get("current_page"), + original.get("last_page"), + original.get("next_page_url"), + len(events), + ), + ) + if not events: + log_debug(1, "No events returned, stopping") + break + for event in events: + records = normalize_event_participation(event, org_lookup) + for record in records: + self.stdout.write( + json.dumps( + {"record_type": "rrms_event_participation", "data": record}, + indent=2, + sort_keys=True, + ) + ) + events_stream_count += 1 + if record.get("person_id") is not None: + event_person_ids.append(record.get("person_id")) + if not should_continue(events_payload, events): + log_debug(1, "Events pagination indicates no more pages") + break + event_page += 1 + + appraisal_person_ids = sorted({pid for pid in person_ids if pid is not None}) + event_person_ids = sorted({pid for pid in event_person_ids if pid is not None}) + unique_person_ids = sorted({pid for pid in appraisal_person_ids + event_person_ids if pid is not None}) + log_debug( + 1, + "Collected %d appraisal person_id values and %d event person_id values" + % (len(appraisal_person_ids), len(event_person_ids)), + ) + handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout) + handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout) # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) - logger.info("Printed %d items (appraisals=%d appraisers=%d)" % (total, appraisals_stream_count, appraisers_stream_count)) + logger.info( + "Printed %d items (appraisals=%d appraisers=%d events=%d persons=%d)" + % ( + total, + appraisals_stream_count, + appraisers_stream_count, + events_stream_count, + len(unique_person_ids), + ) + ) molnix.logout() From 4cafa675afbbc33b0b55258a61f299fc503fbd1e Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Fri, 10 Apr 2026 13:28:46 +0200 Subject: [PATCH 05/10] =?UTF-8?q?Defining=20output=20=E2=80=93=20screen/db?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/sync_molnix_appraisals.py | 62 ++++++++----------- 1 file changed, 26 insertions(+), 36 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 6f402c34c..a1df86aec 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -7,6 +7,7 @@ from api.molnix_utils import MolnixApi DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output +OUTPUT = 0 # 0=print only, 1=print + DB (TODO), 2=DB only (TODO) APPRAISALS_PER_PAGE = 15 EVENTS_PER_PAGE = 15 @@ -74,6 +75,11 @@ def log_debug(level, message): logger.info("[debug-%d] %s" % (level, message)) +def output_record(stdout, payload): + if OUTPUT in (0, 1): + stdout.write(json.dumps(payload, indent=2, sort_keys=True)) + + def get_deployment_payload(value): if isinstance(value, dict): return value @@ -281,13 +287,7 @@ def handle_person_ids(molnix, person_ids, org_lookup, stdout): for person_id in person_ids: cached_snapshot = person_snapshot_cache.get(person_id) if cached_snapshot is not None: - stdout.write( - json.dumps( - {"record_type": "rrms_person_snapshot", "data": cached_snapshot}, - indent=2, - sort_keys=True, - ) - ) + output_record(stdout, {"record_type": "rrms_person_snapshot", "data": cached_snapshot}) continue log_debug(1, "Fetching person_id %s" % person_id) person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id) @@ -310,13 +310,7 @@ def handle_person_ids(molnix, person_ids, org_lookup, stdout): } ) person_snapshot_cache[person_id] = filtered_person_data - stdout.write( - json.dumps( - {"record_type": "rrms_person_snapshot", "data": filtered_person_data}, - indent=2, - sort_keys=True, - ) - ) + output_record(stdout, {"record_type": "rrms_person_snapshot", "data": filtered_person_data}) class Command(BaseCommand): @@ -334,6 +328,9 @@ def handle(self, *args, **options): org_lookup = build_org_lookup(molnix) + if OUTPUT == 2: + self.stdout.write("OUTPUT=2 (DB-only mode) is selected; DB writes are not implemented yet.") + page = 1 total = 0 person_ids = [] @@ -377,15 +374,11 @@ def handle(self, *args, **options): sending_org_id, receiving_org_id = fetch_deployment_org_ids(molnix, deployment_id, deployment_org_cache) appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) if appraisal_data: - self.stdout.write( - json.dumps({"record_type": "molnix_appraisal", "data": appraisal_data}, indent=2, sort_keys=True) - ) + output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) appraisals_stream_count += 1 appraiser_data = normalize_appraiser(appraisal) if appraiser_data: - self.stdout.write( - json.dumps({"record_type": "molnix_appraiser", "data": appraiser_data}, indent=2, sort_keys=True) - ) + output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) appraisers_stream_count += 1 collect_person_ids([appraiser_data], person_ids) total += 1 @@ -422,13 +415,7 @@ def handle(self, *args, **options): for event in events: records = normalize_event_participation(event, org_lookup) for record in records: - self.stdout.write( - json.dumps( - {"record_type": "rrms_event_participation", "data": record}, - indent=2, - sort_keys=True, - ) - ) + output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record}) events_stream_count += 1 if record.get("person_id") is not None: event_person_ids.append(record.get("person_id")) @@ -450,14 +437,17 @@ def handle(self, *args, **options): # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) - logger.info( - "Printed %d items (appraisals=%d appraisers=%d events=%d persons=%d)" - % ( - total, - appraisals_stream_count, - appraisers_stream_count, - events_stream_count, - len(unique_person_ids), + if OUTPUT in (0, 1): + logger.info( + "Printed %d items (appraisals=%d appraisers=%d events=%d persons=%d)" + % ( + total, + appraisals_stream_count, + appraisers_stream_count, + events_stream_count, + len(unique_person_ids), + ) ) - ) + if OUTPUT == 2: + self.stdout.write("Completed DB-only run (writes not implemented yet).") molnix.logout() From 36bfaca7f65b5728475eecf5bf2a36ac8683d2ab Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Fri, 10 Apr 2026 14:39:28 +0200 Subject: [PATCH 06/10] Add models to fill in new Molnix data --- .../commands/sync_molnix_appraisals.py | 199 ++++++++++++++++-- ...ixappraiser_rrmspersonsnapshot_and_more.py | 163 ++++++++++++++ deployments/models.py | 150 +++++++++++++ 3 files changed, 491 insertions(+), 21 deletions(-) create mode 100644 deployments/migrations/0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more.py diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index a1df86aec..902b85757 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -1,13 +1,22 @@ import json +from datetime import datetime from django.conf import settings from django.core.management.base import BaseCommand +from django.utils import timezone +from django.utils.dateparse import parse_datetime from api.logger import logger from api.molnix_utils import MolnixApi - -DEBUG_LEVEL = 2 # Set to 0 for no debug, higher numbers for more verbose debug output -OUTPUT = 0 # 0=print only, 1=print + DB (TODO), 2=DB only (TODO) +from deployments.models import ( + MolnixAppraisal, + MolnixAppraiser, + RrmsEventParticipation, + RrmsPersonSnapshot, +) + +DEBUG_LEVEL = 1 # Set to 0 for no debug, higher numbers (1 or 2) for more verbose debug output +OUTPUT = 2 # 0=print only, 1=print + DB, 2=DB only APPRAISALS_PER_PAGE = 15 EVENTS_PER_PAGE = 15 @@ -80,6 +89,116 @@ def output_record(stdout, payload): stdout.write(json.dumps(payload, indent=2, sort_keys=True)) +def normalize_datetime(value): + if value is None: + return None + if isinstance(value, str): + value = parse_datetime(value) + if isinstance(value, datetime): + if timezone.is_naive(value): + return timezone.make_aware(value, timezone.get_current_timezone()) + return value + return value + + +def write_record(record_type, data): + if OUTPUT not in (1, 2): + return False + try: + if record_type == "molnix_appraisal": + molnix_id = data.get("molnix_id") + if molnix_id is None: + return False + MolnixAppraisal.objects.update_or_create( + molnix_id=molnix_id, + defaults={ + "target_id": data.get("target_id"), + "deployment_molnix_id": data.get("deployment_molnix_id"), + "stage": data.get("stage"), + "appraisers_count": data.get("appraisers_count"), + "score": data.get("score"), + "deployment_country_id": data.get("deployment_country_id"), + "deployment_start": data.get("deployment_start"), + "deployment_end": data.get("deployment_end"), + "deployment_title": data.get("deployment_title"), + "sending_organization_id": data.get("sending_organization_id"), + "receiving_organization_id": data.get("receiving_organization_id"), + "deployment_tags_json": data.get("deployment_tags_json"), + "competencies_json": data.get("competencies_json"), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + }, + ) + return True + if record_type == "molnix_appraiser": + molnix_id = data.get("molnix_id") + if molnix_id is None: + return False + MolnixAppraiser.objects.update_or_create( + molnix_id=molnix_id, + defaults={ + "appraisal_molnix_id": data.get("appraisal_molnix_id"), + "appraiser_type": data.get("appraiser_type"), + "person_id": data.get("person_id"), + "required": data.get("required"), + "notified_at": data.get("notified_at"), + "completed_at": data.get("completed_at"), + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + }, + ) + return True + if record_type == "rrms_person_snapshot": + person_id = data.get("person_id") + if person_id is None: + return False + RrmsPersonSnapshot.objects.update_or_create( + person_id=person_id, + defaults={ + "person_status": data.get("person_status"), + "sex": data.get("sex"), + "current_availability": data.get("current_availability"), + "outofscope": data.get("outofscope"), + "organization_id": data.get("organization_id"), + "organization_name": data.get("organization_name"), + "roles_json": data.get("roles_json"), + "languages_json": data.get("languages_json"), + "tags_json": data.get("tags_json"), + "source_updated_at": data.get("source_updated_at"), + }, + ) + return True + if record_type == "rrms_event_participation": + event_id = data.get("event_id") + person_id = data.get("person_id") + event_person_role = data.get("event_person_role") + if event_id is None or person_id is None: + return False + RrmsEventParticipation.objects.update_or_create( + event_id=event_id, + person_id=person_id, + event_person_role=event_person_role, + defaults={ + "event_name": data.get("event_name"), + "event_type": data.get("event_type"), + "event_scale_type": data.get("event_scale_type"), + "event_from": data.get("event_from"), + "event_to": data.get("event_to"), + "participant_start": data.get("participant_start"), + "participant_end": data.get("participant_end"), + "requested": data.get("requested"), + "event_organization_id": data.get("event_organization_id"), + "event_organization_name": data.get("event_organization_name"), + "venue": data.get("venue"), + "tags_json": data.get("tags_json"), + }, + ) + return True + except Exception as ex: + logger.error("Failed to write %s: %s" % (record_type, str(ex))) + return False + + def get_deployment_payload(value): if isinstance(value, dict): return value @@ -208,6 +327,7 @@ def filter_person_data(person_data, org_lookup): "organization_name": org_name, "current_availability": payload.get("current_availability"), "outofscope": payload.get("outofscope"), + "source_updated_at": normalize_datetime(payload.get("updated_at")), } @@ -223,15 +343,15 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None): "appraisers_count": appraisal.get("appraisers_count"), "score": appraisal.get("score"), "deployment_country_id": deployment.get("country_id"), - "deployment_start": deployment.get("start"), - "deployment_end": deployment.get("end"), + "deployment_start": normalize_datetime(deployment.get("start")), + "deployment_end": normalize_datetime(deployment.get("end")), "deployment_title": deployment.get("title"), "sending_organization_id": sending_org_id, "receiving_organization_id": receiving_org_id, "deployment_tags_json": deployment.get("tags"), "competencies_json": appraisal.get("competencies"), - "created_at": appraisal.get("created_at"), - "updated_at": appraisal.get("updated_at"), + "created_at": normalize_datetime(appraisal.get("created_at")), + "updated_at": normalize_datetime(appraisal.get("updated_at")), } @@ -244,10 +364,10 @@ def normalize_appraiser(appraiser): "appraiser_type": appraiser.get("appraiser_type"), "person_id": appraiser.get("person_id"), "required": appraiser.get("required"), - "notified_at": appraiser.get("notified_at"), - "completed_at": appraiser.get("completed_at"), - "created_at": appraiser.get("created_at"), - "updated_at": appraiser.get("updated_at"), + "notified_at": normalize_datetime(appraiser.get("notified_at")), + "completed_at": normalize_datetime(appraiser.get("completed_at")), + "created_at": normalize_datetime(appraiser.get("created_at")), + "updated_at": normalize_datetime(appraiser.get("updated_at")), } @@ -268,10 +388,10 @@ def normalize_event_participation(event, org_lookup): "event_person_role": pivot.get("role"), "event_type": event.get("event_type"), "event_scale_type": event.get("type"), - "event_from": event.get("from"), - "event_to": event.get("to"), - "participant_start": pivot.get("start"), - "participant_end": pivot.get("end"), + "event_from": normalize_datetime(event.get("from")), + "event_to": normalize_datetime(event.get("to")), + "participant_start": normalize_datetime(pivot.get("start")), + "participant_end": normalize_datetime(pivot.get("end")), "requested": pivot.get("requested"), "event_organization_id": org_id, "event_organization_name": org_name, @@ -282,12 +402,14 @@ def normalize_event_participation(event, org_lookup): return records -def handle_person_ids(molnix, person_ids, org_lookup, stdout): +def handle_person_ids(molnix, person_ids, org_lookup, stdout, db_write_counts): person_snapshot_cache = {} for person_id in person_ids: cached_snapshot = person_snapshot_cache.get(person_id) if cached_snapshot is not None: output_record(stdout, {"record_type": "rrms_person_snapshot", "data": cached_snapshot}) + if write_record("rrms_person_snapshot", cached_snapshot): + db_write_counts["rrms_person_snapshot"] += 1 continue log_debug(1, "Fetching person_id %s" % person_id) person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id) @@ -311,6 +433,8 @@ def handle_person_ids(molnix, person_ids, org_lookup, stdout): ) person_snapshot_cache[person_id] = filtered_person_data output_record(stdout, {"record_type": "rrms_person_snapshot", "data": filtered_person_data}) + if write_record("rrms_person_snapshot", filtered_person_data): + db_write_counts["rrms_person_snapshot"] += 1 class Command(BaseCommand): @@ -329,7 +453,7 @@ def handle(self, *args, **options): org_lookup = build_org_lookup(molnix) if OUTPUT == 2: - self.stdout.write("OUTPUT=2 (DB-only mode) is selected; DB writes are not implemented yet.") + self.stdout.write("OUTPUT=2 (DB-only mode) is selected.") page = 1 total = 0 @@ -338,6 +462,12 @@ def handle(self, *args, **options): appraisals_stream_count = 0 appraisers_stream_count = 0 events_stream_count = 0 + db_write_counts = { + "molnix_appraisal": 0, + "molnix_appraiser": 0, + "rrms_event_participation": 0, + "rrms_person_snapshot": 0, + } deployment_org_cache = {} while True: log_debug(1, "Fetching page %d" % page) @@ -375,10 +505,14 @@ def handle(self, *args, **options): appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) if appraisal_data: output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) + if write_record("molnix_appraisal", appraisal_data): + db_write_counts["molnix_appraisal"] += 1 appraisals_stream_count += 1 appraiser_data = normalize_appraiser(appraisal) if appraiser_data: output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) + if write_record("molnix_appraiser", appraiser_data): + db_write_counts["molnix_appraiser"] += 1 appraisers_stream_count += 1 collect_person_ids([appraiser_data], person_ids) total += 1 @@ -388,6 +522,7 @@ def handle(self, *args, **options): page += 1 event_page = 1 + seen_event_ids = set() while True: log_debug(1, "Fetching events page %d" % event_page) events_payload = safe_call_api( @@ -412,14 +547,26 @@ def handle(self, *args, **options): if not events: log_debug(1, "No events returned, stopping") break + page_event_ids = [event.get("id") for event in events if isinstance(event, dict) and event.get("id") is not None] + if page_event_ids and set(page_event_ids).issubset(seen_event_ids): + log_debug(1, "Events page contains only previously seen ids, stopping") + break + seen_event_ids.update(page_event_ids) + if len(events) < EVENTS_PER_PAGE: + log_debug(1, "Events page size below per_page, stopping") + should_fetch_next = False + else: + should_fetch_next = should_continue(events_payload, events) for event in events: records = normalize_event_participation(event, org_lookup) for record in records: output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record}) + if write_record("rrms_event_participation", record): + db_write_counts["rrms_event_participation"] += 1 events_stream_count += 1 if record.get("person_id") is not None: event_person_ids.append(record.get("person_id")) - if not should_continue(events_payload, events): + if not should_fetch_next: log_debug(1, "Events pagination indicates no more pages") break event_page += 1 @@ -432,8 +579,8 @@ def handle(self, *args, **options): "Collected %d appraisal person_id values and %d event person_id values" % (len(appraisal_person_ids), len(event_person_ids)), ) - handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout) - handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout) + handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout, db_write_counts) + handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout, db_write_counts) # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) @@ -448,6 +595,16 @@ def handle(self, *args, **options): len(unique_person_ids), ) ) + if OUTPUT in (1, 2): + logger.info( + "DB writes (appraisals=%d appraisers=%d events=%d persons=%d)" + % ( + db_write_counts["molnix_appraisal"], + db_write_counts["molnix_appraiser"], + db_write_counts["rrms_event_participation"], + db_write_counts["rrms_person_snapshot"], + ) + ) if OUTPUT == 2: - self.stdout.write("Completed DB-only run (writes not implemented yet).") + self.stdout.write("Completed DB-only run.") molnix.logout() diff --git a/deployments/migrations/0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more.py b/deployments/migrations/0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more.py new file mode 100644 index 000000000..ac6ed4878 --- /dev/null +++ b/deployments/migrations/0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more.py @@ -0,0 +1,163 @@ +# Generated by Django 4.2.29 on 2026-04-10 11:39 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('deployments', '0093_sector_title_ar_sector_title_en_sector_title_es_and_more'), + ] + + operations = [ + migrations.CreateModel( + name='MolnixAppraisal', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('molnix_id', models.BigIntegerField(unique=True)), + ('target_id', models.BigIntegerField()), + ('deployment_molnix_id', models.BigIntegerField(blank=True, null=True)), + ('stage', models.CharField(blank=True, max_length=64, null=True)), + ('appraisers_count', models.IntegerField(blank=True, null=True)), + ('score', models.DecimalField(blank=True, decimal_places=3, max_digits=7, null=True)), + ('deployment_country_id', models.IntegerField(blank=True, null=True)), + ('deployment_start', models.DateTimeField(blank=True, null=True)), + ('deployment_end', models.DateTimeField(blank=True, null=True)), + ('deployment_title', models.CharField(blank=True, max_length=255, null=True)), + ('sending_organization_id', models.BigIntegerField(blank=True, null=True)), + ('receiving_organization_id', models.BigIntegerField(blank=True, null=True)), + ('deployment_tags_json', models.JSONField(blank=True, null=True)), + ('competencies_json', models.JSONField(blank=True, null=True)), + ('created_at', models.DateTimeField(blank=True, null=True)), + ('updated_at', models.DateTimeField(blank=True, null=True)), + ], + options={ + 'verbose_name': 'Molnix Appraisal', + 'verbose_name_plural': 'Molnix Appraisals', + }, + ), + migrations.CreateModel( + name='MolnixAppraiser', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('molnix_id', models.BigIntegerField(unique=True)), + ('appraisal_molnix_id', models.BigIntegerField()), + ('appraiser_type', models.CharField(blank=True, max_length=32, null=True)), + ('person_id', models.BigIntegerField(blank=True, null=True)), + ('required', models.BooleanField(blank=True, null=True)), + ('notified_at', models.DateTimeField(blank=True, null=True)), + ('completed_at', models.DateTimeField(blank=True, null=True)), + ('created_at', models.DateTimeField(blank=True, null=True)), + ('updated_at', models.DateTimeField(blank=True, null=True)), + ], + options={ + 'verbose_name': 'Molnix Appraiser', + 'verbose_name_plural': 'Molnix Appraisers', + }, + ), + migrations.CreateModel( + name='RrmsPersonSnapshot', + fields=[ + ('person_id', models.BigIntegerField(primary_key=True, serialize=False)), + ('person_status', models.CharField(blank=True, max_length=32, null=True)), + ('sex', models.CharField(blank=True, max_length=32, null=True)), + ('current_availability', models.CharField(blank=True, max_length=64, null=True)), + ('outofscope', models.BooleanField(blank=True, null=True)), + ('organization_id', models.BigIntegerField(blank=True, null=True)), + ('organization_name', models.CharField(blank=True, max_length=255, null=True)), + ('roles_json', models.JSONField(blank=True, null=True)), + ('languages_json', models.JSONField(blank=True, null=True)), + ('tags_json', models.JSONField(blank=True, null=True)), + ('source_updated_at', models.DateTimeField(blank=True, null=True)), + ('personnel', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='rrms_person_snapshots', to='deployments.personnel')), + ], + options={ + 'verbose_name': 'RRMS Person Snapshot', + 'verbose_name_plural': 'RRMS Person Snapshots', + 'db_table': 'rrms_person_snapshot', + }, + ), + migrations.CreateModel( + name='RrmsEventParticipation', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('event_id', models.BigIntegerField()), + ('event_name', models.CharField(blank=True, max_length=255, null=True)), + ('person_id', models.BigIntegerField()), + ('event_person_role', models.CharField(blank=True, max_length=128, null=True)), + ('event_type', models.CharField(blank=True, max_length=128, null=True)), + ('event_scale_type', models.CharField(blank=True, max_length=128, null=True)), + ('event_from', models.DateTimeField(blank=True, null=True)), + ('event_to', models.DateTimeField(blank=True, null=True)), + ('participant_start', models.DateTimeField(blank=True, null=True)), + ('participant_end', models.DateTimeField(blank=True, null=True)), + ('requested', models.BooleanField(blank=True, null=True)), + ('event_organization_id', models.BigIntegerField(blank=True, null=True)), + ('event_organization_name', models.CharField(blank=True, max_length=255, null=True)), + ('venue', models.CharField(blank=True, max_length=255, null=True)), + ('tags_json', models.JSONField(blank=True, null=True)), + ], + options={ + 'verbose_name': 'RRMS Event Participation', + 'verbose_name_plural': 'RRMS Event Participation', + 'db_table': 'rrms_event_participation', + 'indexes': [models.Index(fields=['event_id'], name='rrms_event_id_idx'), models.Index(fields=['person_id'], name='rrms_event_person_idx')], + }, + ), + migrations.AddConstraint( + model_name='rrmseventparticipation', + constraint=models.UniqueConstraint(fields=('event_id', 'person_id', 'event_person_role'), name='rrms_event_person_role_uniq'), + ), + migrations.AddField( + model_name='molnixappraiser', + name='appraisal', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='appraisers', to='deployments.molnixappraisal'), + ), + migrations.AddField( + model_name='molnixappraiser', + name='personnel', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='molnix_appraisers', to='deployments.personnel'), + ), + migrations.AddField( + model_name='molnixappraisal', + name='personnel', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='molnix_appraisals', to='deployments.personnel'), + ), + migrations.AddIndex( + model_name='rrmspersonsnapshot', + index=models.Index(fields=['organization_id'], name='rrms_person_org_idx'), + ), + migrations.AddIndex( + model_name='rrmspersonsnapshot', + index=models.Index(fields=['personnel'], name='rrms_personnel_idx'), + ), + migrations.AddIndex( + model_name='molnixappraiser', + index=models.Index(fields=['appraisal_molnix_id'], name='molnix_appr_mol_idx'), + ), + migrations.AddIndex( + model_name='molnixappraiser', + index=models.Index(fields=['appraisal'], name='molnix_appr_idx'), + ), + migrations.AddIndex( + model_name='molnixappraiser', + index=models.Index(fields=['person_id'], name='molnix_appr_person_idx'), + ), + migrations.AddIndex( + model_name='molnixappraiser', + index=models.Index(fields=['personnel'], name='molnix_appr_personnel_idx'), + ), + migrations.AddIndex( + model_name='molnixappraisal', + index=models.Index(fields=['target_id'], name='molnix_app_target_idx'), + ), + migrations.AddIndex( + model_name='molnixappraisal', + index=models.Index(fields=['personnel'], name='molnix_app_personnel_idx'), + ), + migrations.AddIndex( + model_name='molnixappraisal', + index=models.Index(fields=['updated_at'], name='molnix_app_updated_idx'), + ), + ] diff --git a/deployments/models.py b/deployments/models.py index 1441be0f0..172c37d99 100644 --- a/deployments/models.py +++ b/deployments/models.py @@ -276,6 +276,156 @@ def get_tags_for_category(self, molnix_category): return ", ".join(names) +@reversion.register() +class RrmsPersonSnapshot(models.Model): + person_id = models.BigIntegerField(primary_key=True) + person_status = models.CharField(max_length=32, null=True, blank=True) + sex = models.CharField(max_length=32, null=True, blank=True) + current_availability = models.CharField(max_length=64, null=True, blank=True) + outofscope = models.BooleanField(null=True, blank=True) + organization_id = models.BigIntegerField(null=True, blank=True) + organization_name = models.CharField(max_length=255, null=True, blank=True) + roles_json = JSONField(null=True, blank=True) + languages_json = JSONField(null=True, blank=True) + tags_json = JSONField(null=True, blank=True) + personnel = models.ForeignKey( + Personnel, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="rrms_person_snapshots", + ) + source_updated_at = models.DateTimeField(null=True, blank=True) + + class Meta: + db_table = "rrms_person_snapshot" + indexes = [ + models.Index(fields=["organization_id"], name="rrms_person_org_idx"), + models.Index(fields=["personnel"], name="rrms_personnel_idx"), + ] + verbose_name = _("RRMS Person Snapshot") + verbose_name_plural = _("RRMS Person Snapshots") + + def __str__(self): + return "RRMS Person %s" % self.person_id + + +@reversion.register() +class MolnixAppraisal(models.Model): + molnix_id = models.BigIntegerField(unique=True) + target_id = models.BigIntegerField() + deployment_molnix_id = models.BigIntegerField(null=True, blank=True) + stage = models.CharField(max_length=64, null=True, blank=True) + appraisers_count = models.IntegerField(null=True, blank=True) + score = models.DecimalField(max_digits=7, decimal_places=3, null=True, blank=True) + deployment_country_id = models.IntegerField(null=True, blank=True) + deployment_start = models.DateTimeField(null=True, blank=True) + deployment_end = models.DateTimeField(null=True, blank=True) + deployment_title = models.CharField(max_length=255, null=True, blank=True) + sending_organization_id = models.BigIntegerField(null=True, blank=True) + receiving_organization_id = models.BigIntegerField(null=True, blank=True) + deployment_tags_json = JSONField(null=True, blank=True) + competencies_json = JSONField(null=True, blank=True) + personnel = models.ForeignKey( + Personnel, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="molnix_appraisals", + ) + created_at = models.DateTimeField(null=True, blank=True) + updated_at = models.DateTimeField(null=True, blank=True) + + class Meta: + indexes = [ + models.Index(fields=["target_id"], name="molnix_app_target_idx"), + models.Index(fields=["personnel"], name="molnix_app_personnel_idx"), + models.Index(fields=["updated_at"], name="molnix_app_updated_idx"), + ] + verbose_name = _("Molnix Appraisal") + verbose_name_plural = _("Molnix Appraisals") + + def __str__(self): + return "Molnix Appraisal %s" % self.molnix_id + + +@reversion.register() +class MolnixAppraiser(models.Model): + molnix_id = models.BigIntegerField(unique=True) + appraisal_molnix_id = models.BigIntegerField() + appraisal = models.ForeignKey( + MolnixAppraisal, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="appraisers", + ) + appraiser_type = models.CharField(max_length=32, null=True, blank=True) + person_id = models.BigIntegerField(null=True, blank=True) + personnel = models.ForeignKey( + Personnel, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="molnix_appraisers", + ) + required = models.BooleanField(null=True, blank=True) + notified_at = models.DateTimeField(null=True, blank=True) + completed_at = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(null=True, blank=True) + updated_at = models.DateTimeField(null=True, blank=True) + + class Meta: + indexes = [ + models.Index(fields=["appraisal_molnix_id"], name="molnix_appr_mol_idx"), + models.Index(fields=["appraisal"], name="molnix_appr_idx"), + models.Index(fields=["person_id"], name="molnix_appr_person_idx"), + models.Index(fields=["personnel"], name="molnix_appr_personnel_idx"), + ] + verbose_name = _("Molnix Appraiser") + verbose_name_plural = _("Molnix Appraisers") + + def __str__(self): + return "Molnix Appraiser %s" % self.molnix_id + + +@reversion.register() +class RrmsEventParticipation(models.Model): + event_id = models.BigIntegerField() + event_name = models.CharField(max_length=255, null=True, blank=True) + person_id = models.BigIntegerField() + event_person_role = models.CharField(max_length=128, null=True, blank=True) + event_type = models.CharField(max_length=128, null=True, blank=True) + event_scale_type = models.CharField(max_length=128, null=True, blank=True) + event_from = models.DateTimeField(null=True, blank=True) + event_to = models.DateTimeField(null=True, blank=True) + participant_start = models.DateTimeField(null=True, blank=True) + participant_end = models.DateTimeField(null=True, blank=True) + requested = models.BooleanField(null=True, blank=True) + event_organization_id = models.BigIntegerField(null=True, blank=True) + event_organization_name = models.CharField(max_length=255, null=True, blank=True) + venue = models.CharField(max_length=255, null=True, blank=True) + tags_json = JSONField(null=True, blank=True) + + class Meta: + db_table = "rrms_event_participation" + constraints = [ + models.UniqueConstraint( + fields=["event_id", "person_id", "event_person_role"], + name="rrms_event_person_role_uniq", + ) + ] + indexes = [ + models.Index(fields=["event_id"], name="rrms_event_id_idx"), + models.Index(fields=["person_id"], name="rrms_event_person_idx"), + ] + verbose_name = _("RRMS Event Participation") + verbose_name_plural = _("RRMS Event Participation") + + def __str__(self): + return "RRMS Event %s Person %s" % (self.event_id, self.person_id) + + @reversion.register() class PartnerSocietyActivities(models.Model): activity = models.CharField(verbose_name=_("activity"), max_length=50) From 5020abe34d0851896f4534a3ad47a07701e89baf Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Wed, 15 Apr 2026 11:28:47 +0200 Subject: [PATCH 07/10] Country lookup based on organisation name --- .../commands/sync_molnix_appraisals.py | 82 +++++++++++++++---- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 902b85757..966837a8d 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -7,10 +7,12 @@ from django.utils.dateparse import parse_datetime from api.logger import logger +from api.models import Country from api.molnix_utils import MolnixApi from deployments.models import ( MolnixAppraisal, MolnixAppraiser, + Personnel, RrmsEventParticipation, RrmsPersonSnapshot, ) @@ -19,6 +21,7 @@ OUTPUT = 2 # 0=print only, 1=print + DB, 2=DB only APPRAISALS_PER_PAGE = 15 EVENTS_PER_PAGE = 15 +EVENTS_LAST_PAGE_DEFAULT = 13000 def extract_appraisals(payload): @@ -109,6 +112,10 @@ def write_record(record_type, data): molnix_id = data.get("molnix_id") if molnix_id is None: return False + personnel = None + target_id = data.get("target_id") + if target_id is not None: + personnel = Personnel.objects.filter(molnix_id=target_id).first() MolnixAppraisal.objects.update_or_create( molnix_id=molnix_id, defaults={ @@ -125,6 +132,7 @@ def write_record(record_type, data): "receiving_organization_id": data.get("receiving_organization_id"), "deployment_tags_json": data.get("deployment_tags_json"), "competencies_json": data.get("competencies_json"), + "personnel": personnel, "created_at": data.get("created_at"), "updated_at": data.get("updated_at"), }, @@ -134,12 +142,25 @@ def write_record(record_type, data): molnix_id = data.get("molnix_id") if molnix_id is None: return False + appraisal = None + personnel = None + appraisal_molnix_id = data.get("appraisal_molnix_id") + if appraisal_molnix_id is not None: + appraisal = MolnixAppraisal.objects.filter(molnix_id=appraisal_molnix_id).first() + if appraisal is not None: + personnel = appraisal.personnel + if personnel is None: + person_id = data.get("person_id") + if person_id is not None: + personnel = Personnel.objects.filter(molnix_id=person_id).first() MolnixAppraiser.objects.update_or_create( molnix_id=molnix_id, defaults={ "appraisal_molnix_id": data.get("appraisal_molnix_id"), + "appraisal": appraisal, "appraiser_type": data.get("appraiser_type"), "person_id": data.get("person_id"), + "personnel": personnel, "required": data.get("required"), "notified_at": data.get("notified_at"), "completed_at": data.get("completed_at"), @@ -152,6 +173,7 @@ def write_record(record_type, data): person_id = data.get("person_id") if person_id is None: return False + personnel = Personnel.objects.filter(molnix_id=person_id).first() RrmsPersonSnapshot.objects.update_or_create( person_id=person_id, defaults={ @@ -164,6 +186,7 @@ def write_record(record_type, data): "roles_json": data.get("roles_json"), "languages_json": data.get("languages_json"), "tags_json": data.get("tags_json"), + "personnel": personnel, "source_updated_at": data.get("source_updated_at"), }, ) @@ -229,13 +252,19 @@ def extract_org_list(payload): return [] -def normalize_org(value, org_lookup): +def normalize_org(value, org_lookup, country_lookup): if isinstance(value, dict): org_id = value.get("id") org_name = value.get("name") or org_lookup.get(org_id) return org_id, org_name if value is None: return None, None + if isinstance(value, str): + key = value.strip().lower() + match = country_lookup.get(key) + if match is not None: + return match["id"], match["name"] + return None, value org_id = value org_name = org_lookup.get(org_id) return org_id, org_name @@ -260,6 +289,19 @@ def build_org_lookup(molnix): return lookup +def build_country_lookup(): + lookup = {} + for country in Country.objects.values("id", "name", "society_name"): + name = country.get("name") + if isinstance(name, str) and name.strip(): + lookup.setdefault(name.strip().lower(), {"id": country["id"], "name": name}) + society_name = country.get("society_name") + if isinstance(society_name, str) and society_name.strip(): + lookup.setdefault(society_name.strip().lower(), {"id": country["id"], "name": society_name}) + log_debug(1, "Loaded %d country name mappings" % len(lookup)) + return lookup + + def safe_call_api(molnix, path, params=None, label=None): try: return molnix.call_api(path=path, params=params or {}) @@ -270,7 +312,7 @@ def safe_call_api(molnix, path, params=None, label=None): return None -def fetch_deployment_org_ids(molnix, deployment_id, cache): +def fetch_deployment_org_ids(molnix, deployment_id, cache, org_lookup, country_lookup): if deployment_id is None: return None, None if deployment_id in cache: @@ -281,8 +323,8 @@ def fetch_deployment_org_ids(molnix, deployment_id, cache): return cache[deployment_id] sending_org = payload.get("sending_organization") receiving_org = payload.get("receiving_organization") - sending_id = sending_org.get("id") if isinstance(sending_org, dict) else sending_org - receiving_id = receiving_org.get("id") if isinstance(receiving_org, dict) else receiving_org + sending_id, _sending_name = normalize_org(sending_org, org_lookup, country_lookup) + receiving_id, _receiving_name = normalize_org(receiving_org, org_lookup, country_lookup) cache[deployment_id] = (sending_id, receiving_id) return cache[deployment_id] @@ -316,11 +358,11 @@ def find_person_payload(value): return None -def filter_person_data(person_data, org_lookup): +def filter_person_data(person_data, org_lookup, country_lookup): payload = find_person_payload(person_data) if not isinstance(payload, dict): return {} - org_id, org_name = normalize_org(payload.get("organization"), org_lookup) + org_id, org_name = normalize_org(payload.get("organization"), org_lookup, country_lookup) return { "sex": payload.get("sex"), "organization_id": org_id, @@ -371,10 +413,10 @@ def normalize_appraiser(appraiser): } -def normalize_event_participation(event, org_lookup): +def normalize_event_participation(event, org_lookup, country_lookup): if not isinstance(event, dict): return [] - org_id, org_name = normalize_org(event.get("organization"), org_lookup) + org_id, org_name = normalize_org(event.get("organization"), org_lookup, country_lookup) people = event.get("person") if isinstance(event.get("person"), list) else [] records = [] for person in people: @@ -402,7 +444,7 @@ def normalize_event_participation(event, org_lookup): return records -def handle_person_ids(molnix, person_ids, org_lookup, stdout, db_write_counts): +def handle_person_ids(molnix, person_ids, org_lookup, country_lookup, stdout, db_write_counts): person_snapshot_cache = {} for person_id in person_ids: cached_snapshot = person_snapshot_cache.get(person_id) @@ -419,7 +461,7 @@ def handle_person_ids(molnix, person_ids, org_lookup, stdout, db_write_counts): roles_payload = safe_call_api(molnix, path="people/%s/roles" % person_id, label="people/%s/roles" % person_id) languages_payload = safe_call_api(molnix, path="people/%s/languages" % person_id, label="people/%s/languages" % person_id) tags_payload = safe_call_api(molnix, path="people/%s/tags" % person_id, label="people/%s/tags" % person_id) - filtered_person_data = filter_person_data(person_data, org_lookup) + filtered_person_data = filter_person_data(person_data, org_lookup, country_lookup) if not filtered_person_data: log_debug(2, "No person payload found for person_id %s" % person_id) filtered_person_data = {} @@ -451,6 +493,7 @@ def handle(self, *args, **options): return org_lookup = build_org_lookup(molnix) + country_lookup = build_country_lookup() if OUTPUT == 2: self.stdout.write("OUTPUT=2 (DB-only mode) is selected.") @@ -501,7 +544,13 @@ def handle(self, *args, **options): continue appraisal_payload = appraisal.get("appraisal") deployment_id = appraisal_payload.get("deployment", {}).get("id") if isinstance(appraisal_payload, dict) else None - sending_org_id, receiving_org_id = fetch_deployment_org_ids(molnix, deployment_id, deployment_org_cache) + sending_org_id, receiving_org_id = fetch_deployment_org_ids( + molnix, + deployment_id, + deployment_org_cache, + org_lookup, + country_lookup, + ) appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) if appraisal_data: output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) @@ -534,12 +583,15 @@ def handle(self, *args, **options): events = extract_events(events_payload) if isinstance(events_payload, dict): original = events_payload.get("original") if isinstance(events_payload.get("original"), dict) else {} + events_last_page = original.get("last_page") + if events_last_page is None: + events_last_page = EVENTS_LAST_PAGE_DEFAULT log_debug( 1, "Events pagination current=%s last=%s next_url=%s count=%d" % ( original.get("current_page"), - original.get("last_page"), + events_last_page, original.get("next_page_url"), len(events), ), @@ -558,7 +610,7 @@ def handle(self, *args, **options): else: should_fetch_next = should_continue(events_payload, events) for event in events: - records = normalize_event_participation(event, org_lookup) + records = normalize_event_participation(event, org_lookup, country_lookup) for record in records: output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record}) if write_record("rrms_event_participation", record): @@ -579,8 +631,8 @@ def handle(self, *args, **options): "Collected %d appraisal person_id values and %d event person_id values" % (len(appraisal_person_ids), len(event_person_ids)), ) - handle_person_ids(molnix, appraisal_person_ids, org_lookup, self.stdout, db_write_counts) - handle_person_ids(molnix, event_person_ids, org_lookup, self.stdout, db_write_counts) + handle_person_ids(molnix, appraisal_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts) + handle_person_ids(molnix, event_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts) # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) From 85ad725ae7da2557269543a6deb587b90d50730f Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Mon, 11 May 2026 17:40:24 +0200 Subject: [PATCH 08/10] Redesign Molnix Appraisal sync --- .../commands/sync_molnix_appraisals.py | 183 ++++++++++++++---- ...rson_and_event_participation_constraint.py | 27 +++ deployments/models.py | 8 +- 3 files changed, 177 insertions(+), 41 deletions(-) create mode 100644 deployments/migrations/0095_molnix_appraisal_appraised_person_and_event_participation_constraint.py diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 966837a8d..84c87ca92 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -113,13 +113,14 @@ def write_record(record_type, data): if molnix_id is None: return False personnel = None - target_id = data.get("target_id") - if target_id is not None: - personnel = Personnel.objects.filter(molnix_id=target_id).first() + appraised_person_id = data.get("appraised_person_id") + if appraised_person_id is not None: + personnel = Personnel.objects.filter(molnix_id=appraised_person_id).first() MolnixAppraisal.objects.update_or_create( molnix_id=molnix_id, defaults={ "target_id": data.get("target_id"), + "appraised_person_id": data.get("appraised_person_id"), "deployment_molnix_id": data.get("deployment_molnix_id"), "stage": data.get("stage"), "appraisers_count": data.get("appraisers_count"), @@ -197,24 +198,22 @@ def write_record(record_type, data): event_person_role = data.get("event_person_role") if event_id is None or person_id is None: return False - RrmsEventParticipation.objects.update_or_create( + RrmsEventParticipation.objects.create( event_id=event_id, person_id=person_id, event_person_role=event_person_role, - defaults={ - "event_name": data.get("event_name"), - "event_type": data.get("event_type"), - "event_scale_type": data.get("event_scale_type"), - "event_from": data.get("event_from"), - "event_to": data.get("event_to"), - "participant_start": data.get("participant_start"), - "participant_end": data.get("participant_end"), - "requested": data.get("requested"), - "event_organization_id": data.get("event_organization_id"), - "event_organization_name": data.get("event_organization_name"), - "venue": data.get("venue"), - "tags_json": data.get("tags_json"), - }, + event_name=data.get("event_name"), + event_type=data.get("event_type"), + event_scale_type=data.get("event_scale_type"), + event_from=data.get("event_from"), + event_to=data.get("event_to"), + participant_start=data.get("participant_start"), + participant_end=data.get("participant_end"), + requested=data.get("requested"), + event_organization_id=data.get("event_organization_id"), + event_organization_name=data.get("event_organization_name"), + venue=data.get("venue"), + tags_json=data.get("tags_json"), ) return True except Exception as ex: @@ -380,6 +379,7 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None): return { "molnix_id": appraisal.get("id"), "target_id": appraisal.get("target_id"), + "appraised_person_id": deployment.get("person_id"), "deployment_molnix_id": deployment.get("id"), "stage": appraisal.get("stage"), "appraisers_count": appraisal.get("appraisers_count"), @@ -397,12 +397,12 @@ def normalize_appraisal(appraisal, sending_org_id=None, receiving_org_id=None): } -def normalize_appraiser(appraiser): +def normalize_appraiser(appraiser, appraisal_molnix_id=None): if not isinstance(appraiser, dict): return {} return { "molnix_id": appraiser.get("id"), - "appraisal_molnix_id": appraiser.get("appraisal_id"), + "appraisal_molnix_id": appraiser.get("appraisal_id") or appraisal_molnix_id, "appraiser_type": appraiser.get("appraiser_type"), "person_id": appraiser.get("person_id"), "required": appraiser.get("required"), @@ -415,18 +415,26 @@ def normalize_appraiser(appraiser): def normalize_event_participation(event, org_lookup, country_lookup): if not isinstance(event, dict): - return [] + return [], 0, [] org_id, org_name = normalize_org(event.get("organization"), org_lookup, country_lookup) people = event.get("person") if isinstance(event.get("person"), list) else [] records = [] + mismatch_count = 0 + mismatch_samples = [] for person in people: if not isinstance(person, dict): continue pivot = person.get("pivot") if isinstance(person.get("pivot"), dict) else {} + pivot_person_id = pivot.get("person_id") + person_id = person.get("id") + if pivot_person_id is not None and person_id is not None and pivot_person_id != person_id: + mismatch_count += 1 + if len(mismatch_samples) < 25: + mismatch_samples.append((event.get("id"), person_id, pivot_person_id)) record = { "event_id": event.get("id"), "event_name": event.get("name"), - "person_id": person.get("id"), + "person_id": person_id, "event_person_role": pivot.get("role"), "event_type": event.get("event_type"), "event_scale_type": event.get("type"), @@ -441,17 +449,19 @@ def normalize_event_participation(event, org_lookup, country_lookup): "tags_json": event.get("tags"), } records.append(record) - return records + return records, mismatch_count, mismatch_samples def handle_person_ids(molnix, person_ids, org_lookup, country_lookup, stdout, db_write_counts): person_snapshot_cache = {} + saved_ids = set() for person_id in person_ids: cached_snapshot = person_snapshot_cache.get(person_id) if cached_snapshot is not None: output_record(stdout, {"record_type": "rrms_person_snapshot", "data": cached_snapshot}) if write_record("rrms_person_snapshot", cached_snapshot): db_write_counts["rrms_person_snapshot"] += 1 + saved_ids.add(person_id) continue log_debug(1, "Fetching person_id %s" % person_id) person_data = safe_call_api(molnix, path="people/%s" % person_id, label="people/%s" % person_id) @@ -477,6 +487,8 @@ def handle_person_ids(molnix, person_ids, org_lookup, country_lookup, stdout, db output_record(stdout, {"record_type": "rrms_person_snapshot", "data": filtered_person_data}) if write_record("rrms_person_snapshot", filtered_person_data): db_write_counts["rrms_person_snapshot"] += 1 + saved_ids.add(person_id) + return saved_ids class Command(BaseCommand): @@ -505,6 +517,11 @@ def handle(self, *args, **options): appraisals_stream_count = 0 appraisers_stream_count = 0 events_stream_count = 0 + appraisal_ids = set() + appraisal_duplicate_count = 0 + appraised_person_ids = set() + appraised_person_null_count = 0 + appraiser_parent_ids = set() db_write_counts = { "molnix_appraisal": 0, "molnix_appraiser": 0, @@ -542,7 +559,7 @@ def handle(self, *args, **options): for appraisal in appraisals: if not isinstance(appraisal, dict): continue - appraisal_payload = appraisal.get("appraisal") + appraisal_payload = appraisal.get("appraisal") if isinstance(appraisal.get("appraisal"), dict) else None deployment_id = appraisal_payload.get("deployment", {}).get("id") if isinstance(appraisal_payload, dict) else None sending_org_id, receiving_org_id = fetch_deployment_org_ids( molnix, @@ -551,19 +568,50 @@ def handle(self, *args, **options): org_lookup, country_lookup, ) - appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) - if appraisal_data: - output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) - if write_record("molnix_appraisal", appraisal_data): - db_write_counts["molnix_appraisal"] += 1 - appraisals_stream_count += 1 - appraiser_data = normalize_appraiser(appraisal) + appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) + if appraisal_data: + appraisal_id = appraisal_data.get("molnix_id") + if appraisal_id is not None: + if appraisal_id in appraisal_ids: + appraisal_duplicate_count += 1 + appraisal_ids.add(appraisal_id) + appraised_person_id = appraisal_data.get("appraised_person_id") + if appraised_person_id is None: + appraised_person_null_count += 1 + else: + appraised_person_ids.add(appraised_person_id) + output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) + if write_record("molnix_appraisal", appraisal_data): + db_write_counts["molnix_appraisal"] += 1 + appraisals_stream_count += 1 + if appraisal_data.get("appraised_person_id") is not None: + person_ids.append(appraisal_data.get("appraised_person_id")) + appraiser_payloads = [] + if isinstance(appraisal_payload, dict) and isinstance(appraisal_payload.get("appraisers"), list): + appraiser_payloads = appraisal_payload.get("appraisers") + if appraiser_payloads: + for appraiser_payload in appraiser_payloads: + appraiser_data = normalize_appraiser(appraiser_payload, appraisal_data.get("molnix_id")) + if appraiser_data: + appraiser_parent_id = appraiser_data.get("appraisal_molnix_id") + if appraiser_parent_id is not None: + appraiser_parent_ids.add(appraiser_parent_id) + output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) + if write_record("molnix_appraiser", appraiser_data): + db_write_counts["molnix_appraiser"] += 1 + appraisers_stream_count += 1 + collect_person_ids([appraiser_data], person_ids) + else: + appraiser_data = normalize_appraiser(appraisal, appraisal_data.get("molnix_id")) if appraiser_data: + appraiser_parent_id = appraiser_data.get("appraisal_molnix_id") + if appraiser_parent_id is not None: + appraiser_parent_ids.add(appraiser_parent_id) output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) if write_record("molnix_appraiser", appraiser_data): db_write_counts["molnix_appraiser"] += 1 appraisers_stream_count += 1 - collect_person_ids([appraiser_data], person_ids) + collect_person_ids([appraiser_data], person_ids) total += 1 if not should_continue(data, appraisals): log_debug(1, "Pagination indicates no more pages") @@ -572,6 +620,10 @@ def handle(self, *args, **options): event_page = 1 seen_event_ids = set() + duplicate_event_keys = {} + duplicate_event_samples = [] + event_person_mismatch_count = 0 + event_person_mismatch_samples = [] while True: log_debug(1, "Fetching events page %d" % event_page) events_payload = safe_call_api( @@ -610,8 +662,23 @@ def handle(self, *args, **options): else: should_fetch_next = should_continue(events_payload, events) for event in events: - records = normalize_event_participation(event, org_lookup, country_lookup) + records, mismatch_count, mismatch_samples = normalize_event_participation(event, org_lookup, country_lookup) + event_person_mismatch_count += mismatch_count + if mismatch_samples and len(event_person_mismatch_samples) < 25: + remaining = 25 - len(event_person_mismatch_samples) + event_person_mismatch_samples.extend(mismatch_samples[:remaining]) for record in records: + event_key = ( + record.get("event_id"), + record.get("person_id"), + record.get("event_person_role"), + ) + if event_key in duplicate_event_keys: + duplicate_event_keys[event_key] += 1 + if len(duplicate_event_samples) < 25: + duplicate_event_samples.append(event_key) + else: + duplicate_event_keys[event_key] = 1 output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record}) if write_record("rrms_event_participation", record): db_write_counts["rrms_event_participation"] += 1 @@ -631,8 +698,12 @@ def handle(self, *args, **options): "Collected %d appraisal person_id values and %d event person_id values" % (len(appraisal_person_ids), len(event_person_ids)), ) - handle_person_ids(molnix, appraisal_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts) - handle_person_ids(molnix, event_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts) + saved_appraisal_person_ids = handle_person_ids( + molnix, appraisal_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts + ) + saved_event_person_ids = handle_person_ids( + molnix, event_person_ids, org_lookup, country_lookup, self.stdout, db_write_counts + ) # log_debug(1, "Smoke test: response_capacity endpoint") # response_capacity_data = molnix.call_api(path="response_capacity") # self.stdout.write(json.dumps(response_capacity_data, indent=2, sort_keys=True)) @@ -657,6 +728,48 @@ def handle(self, *args, **options): db_write_counts["rrms_person_snapshot"], ) ) + if appraisal_duplicate_count: + logger.warning("Duplicate appraisal molnix_id values observed: %d" % appraisal_duplicate_count) + if appraised_person_null_count: + logger.warning("Null appraisal appraised_person_id values observed: %d" % appraised_person_null_count) + orphan_appraiser_parents = appraiser_parent_ids.difference(appraisal_ids) + if orphan_appraiser_parents: + logger.warning("Appraiser parent appraisal ids missing in appraisals: %d" % len(orphan_appraiser_parents)) + orphan_appraised_person_ids = appraised_person_ids.difference(saved_appraisal_person_ids) + if orphan_appraised_person_ids: + logger.warning("Appraised person ids missing after refresh: %d" % len(orphan_appraised_person_ids)) + orphan_event_person_ids = set(event_person_ids).difference(saved_event_person_ids) + if orphan_event_person_ids: + logger.warning("Event person ids missing after refresh: %d" % len(orphan_event_person_ids)) + if event_person_mismatch_count: + logger.warning("Event person id mismatch rows observed: %d" % event_person_mismatch_count) + if event_person_mismatch_samples: + logger.warning("Event person id mismatch samples: %s" % (event_person_mismatch_samples,)) + duplicate_event_rows = sum(count - 1 for count in duplicate_event_keys.values() if count > 1) + if duplicate_event_rows: + logger.warning( + "RRMS event participation duplicates observed: %d duplicate rows across %d keys" + % (duplicate_event_rows, len([count for count in duplicate_event_keys.values() if count > 1])) + ) + if duplicate_event_samples: + logger.warning("Duplicate event keys (sample): %s" % (duplicate_event_samples,)) + try: + personnel_qs = Personnel.objects.filter(molnix_id__isnull=False) + personnel_total = personnel_qs.count() + personnel_molnix_ids = list(personnel_qs.values_list("molnix_id", flat=True).distinct()) + resolved_personnel = ( + RrmsPersonSnapshot.objects.filter(person_id__in=personnel_molnix_ids) + .values_list("person_id", flat=True) + .distinct() + .count() + ) + unresolved_personnel = len(personnel_molnix_ids) - resolved_personnel + logger.info( + "Personnel molnix_id bridge: total=%d distinct=%d resolved=%d unresolved=%d" + % (personnel_total, len(personnel_molnix_ids), resolved_personnel, unresolved_personnel) + ) + except Exception as ex: + logger.error("Failed to compute personnel molnix_id bridge stats: %s" % str(ex)) if OUTPUT == 2: self.stdout.write("Completed DB-only run.") molnix.logout() diff --git a/deployments/migrations/0095_molnix_appraisal_appraised_person_and_event_participation_constraint.py b/deployments/migrations/0095_molnix_appraisal_appraised_person_and_event_participation_constraint.py new file mode 100644 index 000000000..dbb4f5456 --- /dev/null +++ b/deployments/migrations/0095_molnix_appraisal_appraised_person_and_event_participation_constraint.py @@ -0,0 +1,27 @@ +# Generated by Django 4.2.29 on 2026-05-11 00:00 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("deployments", "0094_erureadinesstype_ns_contribution"), + ("deployments", "0094_molnixappraisal_molnixappraiser_rrmspersonsnapshot_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="molnixappraisal", + name="appraised_person_id", + field=models.BigIntegerField(blank=True, null=True), + ), + migrations.AddIndex( + model_name="molnixappraisal", + index=models.Index(fields=["appraised_person_id"], name="molnix_appraised_person_idx"), + ), + migrations.RemoveConstraint( + model_name="rrmseventparticipation", + name="rrms_event_person_role_uniq", + ), + ] diff --git a/deployments/models.py b/deployments/models.py index 172c37d99..ee3ef795b 100644 --- a/deployments/models.py +++ b/deployments/models.py @@ -314,6 +314,7 @@ def __str__(self): class MolnixAppraisal(models.Model): molnix_id = models.BigIntegerField(unique=True) target_id = models.BigIntegerField() + appraised_person_id = models.BigIntegerField(null=True, blank=True) deployment_molnix_id = models.BigIntegerField(null=True, blank=True) stage = models.CharField(max_length=64, null=True, blank=True) appraisers_count = models.IntegerField(null=True, blank=True) @@ -338,6 +339,7 @@ class MolnixAppraisal(models.Model): class Meta: indexes = [ + models.Index(fields=["appraised_person_id"], name="molnix_appraised_person_idx"), models.Index(fields=["target_id"], name="molnix_app_target_idx"), models.Index(fields=["personnel"], name="molnix_app_personnel_idx"), models.Index(fields=["updated_at"], name="molnix_app_updated_idx"), @@ -409,12 +411,6 @@ class RrmsEventParticipation(models.Model): class Meta: db_table = "rrms_event_participation" - constraints = [ - models.UniqueConstraint( - fields=["event_id", "person_id", "event_person_role"], - name="rrms_event_person_role_uniq", - ) - ] indexes = [ models.Index(fields=["event_id"], name="rrms_event_id_idx"), models.Index(fields=["person_id"], name="rrms_event_person_idx"), From 66feb0b9773108b960588616273c03a4271c8970 Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Wed, 20 May 2026 13:28:39 +0200 Subject: [PATCH 09/10] Add 4 new Molnix API endpoints --- .../commands/sync_molnix_appraisals.py | 68 +++++++------- assets | 2 +- deployments/drf_views.py | 70 +++++++++++++- deployments/filters.py | 57 ++++++++++++ deployments/serializers.py | 91 +++++++++++++++++++ main/urls.py | 8 ++ 6 files changed, 259 insertions(+), 37 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 84c87ca92..20b272398 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -568,30 +568,41 @@ def handle(self, *args, **options): org_lookup, country_lookup, ) - appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) - if appraisal_data: - appraisal_id = appraisal_data.get("molnix_id") - if appraisal_id is not None: - if appraisal_id in appraisal_ids: - appraisal_duplicate_count += 1 - appraisal_ids.add(appraisal_id) - appraised_person_id = appraisal_data.get("appraised_person_id") - if appraised_person_id is None: - appraised_person_null_count += 1 + appraisal_data = normalize_appraisal(appraisal_payload, sending_org_id, receiving_org_id) + if appraisal_data: + appraisal_id = appraisal_data.get("molnix_id") + if appraisal_id is not None: + if appraisal_id in appraisal_ids: + appraisal_duplicate_count += 1 + appraisal_ids.add(appraisal_id) + appraised_person_id = appraisal_data.get("appraised_person_id") + if appraised_person_id is None: + appraised_person_null_count += 1 + else: + appraised_person_ids.add(appraised_person_id) + output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) + if write_record("molnix_appraisal", appraisal_data): + db_write_counts["molnix_appraisal"] += 1 + appraisals_stream_count += 1 + if appraisal_data.get("appraised_person_id") is not None: + person_ids.append(appraisal_data.get("appraised_person_id")) + appraiser_payloads = [] + if isinstance(appraisal_payload, dict) and isinstance(appraisal_payload.get("appraisers"), list): + appraiser_payloads = appraisal_payload.get("appraisers") + if appraiser_payloads: + for appraiser_payload in appraiser_payloads: + appraiser_data = normalize_appraiser(appraiser_payload, appraisal_data.get("molnix_id")) + if appraiser_data: + appraiser_parent_id = appraiser_data.get("appraisal_molnix_id") + if appraiser_parent_id is not None: + appraiser_parent_ids.add(appraiser_parent_id) + output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) + if write_record("molnix_appraiser", appraiser_data): + db_write_counts["molnix_appraiser"] += 1 + appraisers_stream_count += 1 + collect_person_ids([appraiser_data], person_ids) else: - appraised_person_ids.add(appraised_person_id) - output_record(self.stdout, {"record_type": "molnix_appraisal", "data": appraisal_data}) - if write_record("molnix_appraisal", appraisal_data): - db_write_counts["molnix_appraisal"] += 1 - appraisals_stream_count += 1 - if appraisal_data.get("appraised_person_id") is not None: - person_ids.append(appraisal_data.get("appraised_person_id")) - appraiser_payloads = [] - if isinstance(appraisal_payload, dict) and isinstance(appraisal_payload.get("appraisers"), list): - appraiser_payloads = appraisal_payload.get("appraisers") - if appraiser_payloads: - for appraiser_payload in appraiser_payloads: - appraiser_data = normalize_appraiser(appraiser_payload, appraisal_data.get("molnix_id")) + appraiser_data = normalize_appraiser(appraisal, appraisal_data.get("molnix_id")) if appraiser_data: appraiser_parent_id = appraiser_data.get("appraisal_molnix_id") if appraiser_parent_id is not None: @@ -601,17 +612,6 @@ def handle(self, *args, **options): db_write_counts["molnix_appraiser"] += 1 appraisers_stream_count += 1 collect_person_ids([appraiser_data], person_ids) - else: - appraiser_data = normalize_appraiser(appraisal, appraisal_data.get("molnix_id")) - if appraiser_data: - appraiser_parent_id = appraiser_data.get("appraisal_molnix_id") - if appraiser_parent_id is not None: - appraiser_parent_ids.add(appraiser_parent_id) - output_record(self.stdout, {"record_type": "molnix_appraiser", "data": appraiser_data}) - if write_record("molnix_appraiser", appraiser_data): - db_write_counts["molnix_appraiser"] += 1 - appraisers_stream_count += 1 - collect_person_ids([appraiser_data], person_ids) total += 1 if not should_continue(data, appraisals): log_debug(1, "Pagination indicates no more pages") diff --git a/assets b/assets index 4f457e1b6..9758a5685 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit 4f457e1b6e7cbcf67b8fa0bd293894f8a2bd4ef3 +Subproject commit 9758a56851025cfa38023baf5466b743c522a653 diff --git a/deployments/drf_views.py b/deployments/drf_views.py index e86168385..494712db2 100644 --- a/deployments/drf_views.py +++ b/deployments/drf_views.py @@ -15,7 +15,7 @@ from drf_spectacular.utils import extend_schema from openpyxl import Workbook from rest_framework import viewsets -from rest_framework.authentication import TokenAuthentication +from rest_framework.authentication import SessionAuthentication, TokenAuthentication from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response @@ -31,7 +31,15 @@ from main.serializers import CsvListMixin from main.utils import is_tableau -from .filters import EmergencyProjectFilter, ERUOwnerFilter, ProjectFilter +from .filters import ( + EmergencyProjectFilter, + ERUOwnerFilter, + MolnixAppraisalFilter, + MolnixAppraiserFilter, + ProjectFilter, + RrmsEventParticipationFilter, + RrmsPersonSnapshotFilter, +) from .models import ( ERU, EmergencyProject, @@ -41,6 +49,8 @@ ERUReadiness, ERUReadinessType, ERUType, + MolnixAppraisal, + MolnixAppraiser, OperationTypes, PartnerSocietyDeployment, Personnel, @@ -48,6 +58,8 @@ ProgrammeTypes, Project, RegionalProject, + RrmsEventParticipation, + RrmsPersonSnapshot, Sector, Statuses, ) @@ -65,6 +77,8 @@ GlobalProjectNSOngoingProjectsStatsSerializer, GlobalProjectOverviewSerializer, MiniERUReadinessTypeSerializer, + MolnixAppraisalSerializer, + MolnixAppraiserSerializer, PartnerDeploymentSerializer, PartnerDeploymentTableauSerializer, PersonnelCsvSerializer, @@ -77,6 +91,8 @@ ProjectRegionOverviewSerializer, ProjectSerializer, RegionalProjectSerializer, + RrmsEventParticipationSerializer, + RrmsPersonSnapshotSerializer, ) from .utils import get_previous_months @@ -410,6 +426,56 @@ def get_renderer_context(self): return context +@extend_schema( + request=None, + responses=MolnixAppraisalSerializer(many=True), +) +class MolnixAppraisalViewset(viewsets.ReadOnlyModelViewSet): + authentication_classes = (SessionAuthentication, TokenAuthentication) + permission_classes = (IsAuthenticated, DenyGuestUserPermission) + queryset = MolnixAppraisal.objects.all() + serializer_class = MolnixAppraisalSerializer + filterset_class = MolnixAppraisalFilter + ordering_fields = ("updated_at", "created_at", "molnix_id", "appraised_person_id") + + +@extend_schema( + request=None, + responses=MolnixAppraiserSerializer(many=True), +) +class MolnixAppraiserViewset(viewsets.ReadOnlyModelViewSet): + authentication_classes = (SessionAuthentication, TokenAuthentication) + permission_classes = (IsAuthenticated, DenyGuestUserPermission) + queryset = MolnixAppraiser.objects.all() + serializer_class = MolnixAppraiserSerializer + filterset_class = MolnixAppraiserFilter + ordering_fields = ("updated_at", "created_at", "molnix_id", "appraisal_molnix_id") + + +@extend_schema( + request=None, + responses=RrmsPersonSnapshotSerializer(many=True), +) +class RrmsPersonSnapshotViewset(viewsets.ReadOnlyModelViewSet): + authentication_classes = (SessionAuthentication, TokenAuthentication) + permission_classes = (IsAuthenticated, DenyGuestUserPermission) + queryset = RrmsPersonSnapshot.objects.all() + serializer_class = RrmsPersonSnapshotSerializer + filterset_class = RrmsPersonSnapshotFilter + ordering_fields = ("source_updated_at", "person_id") + + +@extend_schema( + request=None, + responses=RrmsEventParticipationSerializer(many=True), +) +class RrmsEventParticipationViewset(viewsets.ReadOnlyModelViewSet): + queryset = RrmsEventParticipation.objects.all() + serializer_class = RrmsEventParticipationSerializer + filterset_class = RrmsEventParticipationFilter + ordering_fields = ("event_from", "event_to", "event_id", "person_id") + + class AggregateDeployments(APIView): """ Get aggregated data for personnel deployments diff --git a/deployments/filters.py b/deployments/filters.py index 0403414ad..f9726337c 100644 --- a/deployments/filters.py +++ b/deployments/filters.py @@ -10,9 +10,13 @@ EmergencyProjectActivitySector, ERUOwner, ERUType, + MolnixAppraisal, + MolnixAppraiser, OperationTypes, ProgrammeTypes, Project, + RrmsEventParticipation, + RrmsPersonSnapshot, Sector, SectorTag, Statuses, @@ -161,3 +165,56 @@ def qs(self): if available is not None: eru_qs = eru_qs.filter(available=available) return qs.filter(eru__in=eru_qs).distinct() + + +class MolnixAppraisalFilter(filters.FilterSet): + appraised_person_id = filters.NumberFilter(field_name="appraised_person_id", lookup_expr="exact") + molnix_id = filters.NumberFilter(field_name="molnix_id", lookup_expr="exact") + deployment_molnix_id = filters.NumberFilter(field_name="deployment_molnix_id", lookup_expr="exact") + stage = filters.CharFilter(field_name="stage", lookup_expr="exact") + + class Meta: + model = MolnixAppraisal + fields = { + "updated_at": ("exact", "gt", "gte", "lt", "lte"), + "created_at": ("exact", "gt", "gte", "lt", "lte"), + } + + +class MolnixAppraiserFilter(filters.FilterSet): + appraisal_molnix_id = filters.NumberFilter(field_name="appraisal_molnix_id", lookup_expr="exact") + person_id = filters.NumberFilter(field_name="person_id", lookup_expr="exact") + appraiser_type = filters.CharFilter(field_name="appraiser_type", lookup_expr="exact") + + class Meta: + model = MolnixAppraiser + fields = { + "updated_at": ("exact", "gt", "gte", "lt", "lte"), + "created_at": ("exact", "gt", "gte", "lt", "lte"), + } + + +class RrmsPersonSnapshotFilter(filters.FilterSet): + person_id = filters.NumberFilter(field_name="person_id", lookup_expr="exact") + organization_id = filters.NumberFilter(field_name="organization_id", lookup_expr="exact") + person_status = filters.CharFilter(field_name="person_status", lookup_expr="exact") + + class Meta: + model = RrmsPersonSnapshot + fields = { + "source_updated_at": ("exact", "gt", "gte", "lt", "lte"), + } + + +class RrmsEventParticipationFilter(filters.FilterSet): + event_id = filters.NumberFilter(field_name="event_id", lookup_expr="exact") + person_id = filters.NumberFilter(field_name="person_id", lookup_expr="exact") + event_person_role = filters.CharFilter(field_name="event_person_role", lookup_expr="exact") + event_type = filters.CharFilter(field_name="event_type", lookup_expr="exact") + + class Meta: + model = RrmsEventParticipation + fields = { + "event_from": ("exact", "gt", "gte", "lt", "lte"), + "event_to": ("exact", "gt", "gte", "lt", "lte"), + } diff --git a/deployments/serializers.py b/deployments/serializers.py index d6b26a5dd..be10677ec 100644 --- a/deployments/serializers.py +++ b/deployments/serializers.py @@ -37,6 +37,8 @@ ERUReadiness, ERUReadinessType, Event, + MolnixAppraisal, + MolnixAppraiser, MolnixTag, OperationTypes, PartnerSocietyActivities, @@ -47,6 +49,8 @@ Project, Region, RegionalProject, + RrmsEventParticipation, + RrmsPersonSnapshot, ) @@ -291,6 +295,93 @@ class Meta: ) +class MolnixAppraisalSerializer(ModelSerializer): + class Meta: + model = MolnixAppraisal + fields = ( + "id", + "molnix_id", + "target_id", + "appraised_person_id", + "deployment_molnix_id", + "stage", + "appraisers_count", + "score", + "deployment_country_id", + "deployment_start", + "deployment_end", + "deployment_title", + "sending_organization_id", + "receiving_organization_id", + "deployment_tags_json", + "competencies_json", + "personnel", + "created_at", + "updated_at", + ) + + +class MolnixAppraiserSerializer(ModelSerializer): + class Meta: + model = MolnixAppraiser + fields = ( + "id", + "molnix_id", + "appraisal_molnix_id", + "appraisal", + "appraiser_type", + "person_id", + "personnel", + "required", + "notified_at", + "completed_at", + "created_at", + "updated_at", + ) + + +class RrmsPersonSnapshotSerializer(ModelSerializer): + class Meta: + model = RrmsPersonSnapshot + fields = ( + "person_id", + "person_status", + "sex", + "current_availability", + "outofscope", + "organization_id", + "organization_name", + "roles_json", + "languages_json", + "tags_json", + "personnel", + "source_updated_at", + ) + + +class RrmsEventParticipationSerializer(ModelSerializer): + class Meta: + model = RrmsEventParticipation + fields = ( + "id", + "event_id", + "event_name", + "person_id", + "event_person_role", + "event_type", + "event_scale_type", + "event_from", + "event_to", + "participant_start", + "participant_end", + "requested", + "event_organization_id", + "event_organization_name", + "venue", + "tags_json", + ) + + class PersonnelSerializerSuper(ModelSerializer): # Superusers can see molnix_status country_from = MiniCountrySerializer(allow_null=True) diff --git a/main/urls.py b/main/urls.py index 758a8c055..71fc52c06 100644 --- a/main/urls.py +++ b/main/urls.py @@ -140,6 +140,14 @@ router.register(r"personnel_deployment", deployment_views.PersonnelDeploymentViewset, basename="personnel_deployment") router.register(r"personnel", deployment_views.PersonnelViewset, basename="personnel") +router.register(r"molnix-appraisals", deployment_views.MolnixAppraisalViewset, basename="molnix_appraisals") +router.register(r"molnix-appraisers", deployment_views.MolnixAppraiserViewset, basename="molnix_appraisers") +router.register(r"rrms-person-snapshots", deployment_views.RrmsPersonSnapshotViewset, basename="rrms_person_snapshots") +router.register( + r"rrms-event-participation", + deployment_views.RrmsEventParticipationViewset, + basename="rrms_event_participation", +) router.register(r"personnel_by_event", api_views.DeploymentsByEventViewset, basename="personnel_by_event") router.register(r"profile", api_views.ProfileViewset, basename="profile") router.register(r"project", deployment_views.ProjectViewset, basename="project") From 4c5b9e07953e4be67721cf97742c3eae0c85a14e Mon Sep 17 00:00:00 2001 From: Szabo Zoltan Date: Wed, 20 May 2026 16:12:11 +0200 Subject: [PATCH 10/10] Update RRMS event participation to upsert and log create/update counts --- .../commands/sync_molnix_appraisals.py | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/api/management/commands/sync_molnix_appraisals.py b/api/management/commands/sync_molnix_appraisals.py index 20b272398..ff683abf5 100644 --- a/api/management/commands/sync_molnix_appraisals.py +++ b/api/management/commands/sync_molnix_appraisals.py @@ -198,24 +198,26 @@ def write_record(record_type, data): event_person_role = data.get("event_person_role") if event_id is None or person_id is None: return False - RrmsEventParticipation.objects.create( + _event_participation, created = RrmsEventParticipation.objects.update_or_create( event_id=event_id, person_id=person_id, event_person_role=event_person_role, - event_name=data.get("event_name"), - event_type=data.get("event_type"), - event_scale_type=data.get("event_scale_type"), - event_from=data.get("event_from"), - event_to=data.get("event_to"), - participant_start=data.get("participant_start"), - participant_end=data.get("participant_end"), - requested=data.get("requested"), - event_organization_id=data.get("event_organization_id"), - event_organization_name=data.get("event_organization_name"), - venue=data.get("venue"), - tags_json=data.get("tags_json"), + defaults={ + "event_name": data.get("event_name"), + "event_type": data.get("event_type"), + "event_scale_type": data.get("event_scale_type"), + "event_from": data.get("event_from"), + "event_to": data.get("event_to"), + "participant_start": data.get("participant_start"), + "participant_end": data.get("participant_end"), + "requested": data.get("requested"), + "event_organization_id": data.get("event_organization_id"), + "event_organization_name": data.get("event_organization_name"), + "venue": data.get("venue"), + "tags_json": data.get("tags_json"), + }, ) - return True + return "created" if created else "updated" except Exception as ex: logger.error("Failed to write %s: %s" % (record_type, str(ex))) return False @@ -522,6 +524,8 @@ def handle(self, *args, **options): appraised_person_ids = set() appraised_person_null_count = 0 appraiser_parent_ids = set() + event_participation_created = 0 + event_participation_updated = 0 db_write_counts = { "molnix_appraisal": 0, "molnix_appraiser": 0, @@ -680,8 +684,13 @@ def handle(self, *args, **options): else: duplicate_event_keys[event_key] = 1 output_record(self.stdout, {"record_type": "rrms_event_participation", "data": record}) - if write_record("rrms_event_participation", record): + event_write_status = write_record("rrms_event_participation", record) + if event_write_status: db_write_counts["rrms_event_participation"] += 1 + if event_write_status == "created": + event_participation_created += 1 + elif event_write_status == "updated": + event_participation_updated += 1 events_stream_count += 1 if record.get("person_id") is not None: event_person_ids.append(record.get("person_id")) @@ -728,6 +737,11 @@ def handle(self, *args, **options): db_write_counts["rrms_person_snapshot"], ) ) + if event_participation_created or event_participation_updated: + logger.info( + "RRMS event participation upserts: created=%d updated=%d" + % (event_participation_created, event_participation_updated) + ) if appraisal_duplicate_count: logger.warning("Duplicate appraisal molnix_id values observed: %d" % appraisal_duplicate_count) if appraised_person_null_count: