diff --git a/controller/choreography_trigger.py b/controller/choreography_trigger.py new file mode 100644 index 0000000..c8aef66 --- /dev/null +++ b/controller/choreography_trigger.py @@ -0,0 +1,42 @@ +import uuid +import json +from f.controller.config import DatabaseConfig +from f.controller.database import ArchiveDatabaseRepository +from f.controller.shared.otel_logging import get_logger + +logger = get_logger(__name__) + + +def main(request_data=None): + if not request_data: + return {"result": "FAILURE", "error": "Missing request_data"} + + breeder_ids = request_data.get('breeder_ids', []) + + if not breeder_ids or len(breeder_ids) < 2: + return {"result": "FAILURE", "error": "Need at least 2 breeder_ids"} + + repo = ArchiveDatabaseRepository(DatabaseConfig.ARCHIVE_DB) + repo.create_choreography_table() + + return _trigger_choreography(repo, breeder_ids) + + +def _trigger_choreography(repo, breeder_ids): + choreography_id = str(uuid.uuid4()) + + phases = [] + for breeder_id in breeder_ids: + phases.append({"observe_breeder": None, "label": "baseline"}) + phases.append({"observe_breeder": breeder_id, "label": "observe"}) + phases.append({"observe_breeder": None, "label": "recovery"}) + + repo.insert_choreography(choreography_id, breeder_ids, phases) + + logger.info(f"Triggered choreography {choreography_id} for breeders {breeder_ids}") + return { + "result": "SUCCESS", + "choreography_id": choreography_id, + "participants": breeder_ids, + "phases": len(phases) + } diff --git a/controller/database.py b/controller/database.py index c94db37..723f90c 100644 --- a/controller/database.py +++ b/controller/database.py @@ -135,6 +135,40 @@ def get_connection_url(self, breeder_id): f"{breeder_id}" ) + def create_choreography_table(self): + db_config = self.base_config.copy() + db_config['database'] = "archive_db" + + query = """ + CREATE TABLE IF NOT EXISTS interference_choreography ( + id UUID PRIMARY KEY, + participants TEXT[] NOT NULL, + phases JSONB NOT NULL, + current_phase INT DEFAULT 0, + status VARCHAR(20) DEFAULT 'running', + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() + ); + """ + + execute_query(db_config, query) + logger.info("Ensured interference_choreography table exists in archive_db") + + def insert_choreography(self, choreography_id, participants, phases): + db_config = self.base_config.copy() + db_config['database'] = "archive_db" + + participants_array = "{" + ",".join(f'"{p}"' for p in participants) + "}" + phases_json = json.dumps(phases).replace("'", "''") + + query = f""" + INSERT INTO interference_choreography (id, participants, phases, status) + VALUES ('{choreography_id}', '{participants_array}', '{phases_json}'::jsonb, 'running'); + """ + + execute_query(db_config, query) + logger.info(f"Inserted choreography {choreography_id} with participants {participants}") + class MetadataDatabaseRepository: """Repository for metadata database operations"""