Skip to content

Commit aecc6b3

Browse files
authored
Merge pull request #49 from godon-dev/feat/breeder-update
feat: add breeder update with trial compatibility check
2 parents 7382df4 + b038c71 commit aecc6b3

2 files changed

Lines changed: 232 additions & 3 deletions

File tree

controller/breeder_service.py

Lines changed: 212 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
from dateutil.parser import parse
77

8-
from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository
8+
from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository, execute_query
99
from f.controller.config import BreederConfig, BREEDER_CAPABILITIES, DatabaseConfig
1010
from f.controller.shared.otel_logging import get_logger
1111

@@ -816,7 +816,6 @@ def list_breeders(self):
816816
name = row[1]
817817
creation_ts = row[2]
818818

819-
# Format creation timestamp
820819
if isinstance(creation_ts, str):
821820
created_at = dateutil.parser.parse(creation_ts).isoformat()
822821
else:
@@ -837,4 +836,214 @@ def list_breeders(self):
837836
return {
838837
"result": "FAILURE",
839838
"error": str(e)
840-
}
839+
}
840+
841+
def _count_config_params(self, config):
842+
new_param_count = 0
843+
for category in ['sysctl', 'sysfs', 'cpufreq', 'ethtool']:
844+
category_settings = config.get('settings', {}).get(category, {})
845+
for param_name, param_config in category_settings.items():
846+
if category == 'ethtool':
847+
if isinstance(param_config, dict):
848+
new_param_count += len(param_config)
849+
else:
850+
new_param_count += 1
851+
if new_param_count == 0:
852+
for category_name, category_params in config.get('settings', {}).items():
853+
if isinstance(category_params, dict):
854+
for param_name, param_config in category_params.items():
855+
if isinstance(param_config, dict) and 'constraints' in param_config:
856+
new_param_count += 1
857+
return new_param_count
858+
859+
def _check_trial_compatibility(self, breeder_id, new_config, force=False):
860+
if force:
861+
return True, "Force mode: skipping compatibility check"
862+
863+
try:
864+
import optuna
865+
866+
db_name = f"breeder_{breeder_id.replace('-', '_')}"
867+
db_url = self.archive_repo.get_connection_url(db_name)
868+
storage = optuna.storages.RDBStorage(url=db_url)
869+
870+
studies = storage.get_all_study_names()
871+
if not studies:
872+
return True, "No existing trial data found"
873+
874+
study = optuna.load_study(study_name=studies[0], storage=storage)
875+
completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
876+
if not completed_trials:
877+
return True, "No completed trials"
878+
879+
sample_trial = completed_trials[-1]
880+
old_param_count = len(sample_trial.params)
881+
old_obj_count = len(sample_trial.values) if sample_trial.values else 0
882+
883+
new_param_count = self._count_config_params(new_config)
884+
new_obj_count = len(new_config.get('objectives', []))
885+
886+
incompatibilities = []
887+
if old_param_count != new_param_count:
888+
incompatibilities.append(
889+
f"Parameter count: existing trials have {old_param_count}, new config has {new_param_count}"
890+
)
891+
if old_obj_count != new_obj_count:
892+
incompatibilities.append(
893+
f"Objective count: existing trials have {old_obj_count}, new config has {new_obj_count}"
894+
)
895+
896+
if incompatibilities:
897+
detail = "; ".join(incompatibilities)
898+
return False, f"Trial data incompatible: {detail}. Use force=true to clear trial history and start fresh."
899+
900+
return True, f"Compatible: {old_param_count} params, {old_obj_count} objectives"
901+
902+
except Exception as e:
903+
logger.warning(f"Could not check trial compatibility: {e}")
904+
return True, f"Compatibility check skipped: {e}"
905+
906+
def _clear_trial_data(self, breeder_id):
907+
db_name = f"breeder_{breeder_id.replace('-', '_')}"
908+
db_config = self.archive_repo.base_config.copy()
909+
db_config['database'] = db_name
910+
911+
tables_to_drop = ['trials', 'study_directions', 'study_user_attributes',
912+
'study_system_attributes', 'trial_user_attributes',
913+
'trial_system_attributes', 'trial_params',
914+
'trial_values', 'trial_intermediate_values',
915+
'study', 'alembic_version']
916+
917+
for table in tables_to_drop:
918+
try:
919+
execute_query(db_config, f"DROP TABLE IF EXISTS {table} CASCADE;")
920+
except Exception:
921+
pass
922+
923+
logger.info(f"Cleared trial data for breeder: {breeder_id}")
924+
925+
def update_breeder(self, breeder_id, new_config, force=False):
926+
try:
927+
self.metadata_repo.create_table()
928+
breeder_meta_data_row = self.metadata_repo.fetch_meta_data(breeder_id)
929+
930+
if not breeder_meta_data_row or len(breeder_meta_data_row) == 0:
931+
return {"result": "FAILURE", "error": f"Breeder with ID '{breeder_id}' not found"}
932+
933+
old_config = breeder_meta_data_row[0][3]
934+
breeder_instance_name = breeder_meta_data_row[0][1]
935+
936+
new_config['breeder'] = new_config.get('breeder', old_config.get('breeder', {}))
937+
new_config['breeder']['uuid'] = breeder_id
938+
939+
self._resolve_target_refs(new_config)
940+
BreederConfig.validate_minimal(new_config)
941+
942+
compatible, compat_detail = self._check_trial_compatibility(breeder_id, new_config, force=force)
943+
if not compatible:
944+
return {"result": "FAILURE", "error": compat_detail}
945+
946+
logger.info(f"Trial compatibility: {compat_detail}")
947+
948+
__uuid_common_name = f"breeder_{breeder_id.replace('-', '_')}"
949+
self.archive_repo.set_shutdown_requested(__uuid_common_name, value=True)
950+
951+
import time
952+
time.sleep(2)
953+
954+
old_worker_job_ids = old_config.get('worker_job_ids', [])
955+
if old_worker_job_ids:
956+
for job_id in old_worker_job_ids:
957+
cancel_job_by_id(job_id, reason=f"Updating breeder {breeder_id}")
958+
logger.info(f"Cancelled {len(old_worker_job_ids)} old worker jobs")
959+
960+
if force:
961+
self._clear_trial_data(breeder_id)
962+
logger.info(f"Force mode: cleared trial data for breeder {breeder_id}")
963+
964+
config_history = old_config.get('config_history', [])
965+
config_history.append({
966+
'config': {k: v for k, v in old_config.items() if k != 'config_history'},
967+
'updated_at': datetime.datetime.now().isoformat()
968+
})
969+
new_config['config_history'] = config_history
970+
971+
creation_ts_str = old_config.get('creation_ts')
972+
if creation_ts_str:
973+
new_config['creation_ts'] = creation_ts_str
974+
975+
self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)
976+
977+
parallel_runs = new_config.get('run', {}).get('parallel', 1)
978+
targets = new_config.get('effectuation', {}).get('targets', [])
979+
targets_count = len(targets)
980+
is_cooperative = new_config.get('cooperation', {}).get('active', False)
981+
982+
worker_launch_failures = []
983+
target_count = 0
984+
worker_job_ids = []
985+
986+
for target in targets:
987+
for run_id in range(parallel_runs):
988+
flow_config = new_config.copy()
989+
flow_id = f'{breeder_instance_name}_{target_count}_{run_id}'
990+
991+
if not is_cooperative:
992+
flow_config = determine_config_shard(
993+
run_id=run_id,
994+
target_id=target_count,
995+
targets_count=targets_count,
996+
config=flow_config,
997+
parallel_runs_count=parallel_runs
998+
)
999+
1000+
try:
1001+
_, job_id = start_optimization_flow(
1002+
flow_id=flow_id,
1003+
shard_config=flow_config,
1004+
run_id=run_id,
1005+
target_id=target_count,
1006+
breeder_id=breeder_id
1007+
)
1008+
worker_job_ids.append(job_id)
1009+
except Exception as e:
1010+
worker_launch_failures.append({
1011+
"flow_id": flow_id,
1012+
"target": target_count,
1013+
"run": run_id,
1014+
"error": str(e),
1015+
"error_type": type(e).__name__
1016+
})
1017+
logger.error(f"Failed to launch worker {flow_id}: {e}")
1018+
1019+
target_count += 1
1020+
1021+
if worker_job_ids:
1022+
new_config['worker_job_ids'] = worker_job_ids
1023+
self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)
1024+
1025+
if worker_launch_failures:
1026+
return {
1027+
"result": "PARTIAL_SUCCESS",
1028+
"error": f"Failed to launch {len(worker_launch_failures)} worker(s)",
1029+
"workers_started": len(worker_job_ids),
1030+
"workers_failed": len(worker_launch_failures),
1031+
"trials_cleared": force
1032+
}
1033+
1034+
logger.info(f"Successfully updated breeder: {breeder_id} (force={force})")
1035+
return {
1036+
"result": "SUCCESS",
1037+
"data": {
1038+
"breeder_id": breeder_id,
1039+
"name": breeder_instance_name,
1040+
"status": "active",
1041+
"workers_started": len(worker_job_ids),
1042+
"trials_cleared": force,
1043+
"config_history_entries": len(config_history)
1044+
}
1045+
}
1046+
1047+
except Exception as e:
1048+
logger.error(f"Failed to update breeder {breeder_id}: {e}")
1049+
return {"result": "FAILURE", "error": str(e)}

controller/breeder_update.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from f.controller.config import DatabaseConfig
2+
from f.controller.breeder_service import BreederService
3+
4+
def main(request_data=None):
5+
breeder_id = request_data.get('breeder_id') if request_data else None
6+
if not breeder_id:
7+
return {"result": "FAILURE", "error": "Missing breeder_id"}
8+
9+
new_config = request_data.get('config')
10+
if not new_config:
11+
return {"result": "FAILURE", "error": "Missing config"}
12+
13+
force = request_data.get('force', False)
14+
15+
service = BreederService(
16+
archive_db_config=DatabaseConfig.ARCHIVE_DB,
17+
meta_db_config=DatabaseConfig.META_DB
18+
)
19+
20+
return service.update_breeder(breeder_id, new_config, force=force)

0 commit comments

Comments
 (0)