Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions controller/choreography_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import uuid
import json
from f.controller.config import DatabaseConfig
from f.controller.database import ArchiveDatabaseRepository
from f.controller.shared.otel_logging import get_logger

logger = get_logger(__name__)


def main(request_data=None):
if not request_data:
return {"result": "FAILURE", "error": "Missing request_data"}

breeder_ids = request_data.get('breeder_ids', [])

if not breeder_ids or len(breeder_ids) < 2:
return {"result": "FAILURE", "error": "Need at least 2 breeder_ids"}

repo = ArchiveDatabaseRepository(DatabaseConfig.ARCHIVE_DB)
repo.create_choreography_table()

return _trigger_choreography(repo, breeder_ids)


def _trigger_choreography(repo, breeder_ids):
choreography_id = str(uuid.uuid4())

phases = []
for breeder_id in breeder_ids:
phases.append({"observe_breeder": None, "label": "baseline"})
phases.append({"observe_breeder": breeder_id, "label": "observe"})
phases.append({"observe_breeder": None, "label": "recovery"})

repo.insert_choreography(choreography_id, breeder_ids, phases)

logger.info(f"Triggered choreography {choreography_id} for breeders {breeder_ids}")
return {
"result": "SUCCESS",
"choreography_id": choreography_id,
"participants": breeder_ids,
"phases": len(phases)
}
34 changes: 34 additions & 0 deletions controller/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,40 @@ def get_connection_url(self, breeder_id):
f"{breeder_id}"
)

def create_choreography_table(self):
db_config = self.base_config.copy()
db_config['database'] = "archive_db"

query = """
CREATE TABLE IF NOT EXISTS interference_choreography (
id UUID PRIMARY KEY,
participants TEXT[] NOT NULL,
phases JSONB NOT NULL,
current_phase INT DEFAULT 0,
status VARCHAR(20) DEFAULT 'running',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
"""

execute_query(db_config, query)
logger.info("Ensured interference_choreography table exists in archive_db")

def insert_choreography(self, choreography_id, participants, phases):
db_config = self.base_config.copy()
db_config['database'] = "archive_db"

participants_array = "{" + ",".join(f'"{p}"' for p in participants) + "}"
phases_json = json.dumps(phases).replace("'", "''")

query = f"""
INSERT INTO interference_choreography (id, participants, phases, status)
VALUES ('{choreography_id}', '{participants_array}', '{phases_json}'::jsonb, 'running');
"""

execute_query(db_config, query)
logger.info(f"Inserted choreography {choreography_id} with participants {participants}")

class MetadataDatabaseRepository:
"""Repository for metadata database operations"""

Expand Down
Loading