Skip to content

Commit fd27925

Browse files
committed
feat: add breeder update with trial compatibility check
Stop current workers, validate new config, check parameter/objective dimension compatibility with existing trial data. Incompatible updates are rejected unless force=true (clears trial history). Config history is preserved for rollback. Workers relaunch on same Optuna database.
1 parent 7382df4 commit fd27925

2 files changed

Lines changed: 218 additions & 31 deletions

File tree

controller/breeder_service.py

Lines changed: 198 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import os
66
from dateutil.parser import parse
77

8-
from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository
8+
from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository, execute_query
99
from f.controller.config import BreederConfig, BREEDER_CAPABILITIES, DatabaseConfig
1010
from f.controller.shared.otel_logging import get_logger
1111

@@ -796,45 +796,212 @@ def delete_breeder(self, breeder_id, force=False):
796796
logger.error(f"Failed to delete breeder {breeder_id}: {e}")
797797
return {"result": "FAILURE", "error": str(e)}
798798

799-
def list_breeders(self):
800-
"""List all breeders"""
799+
def _count_config_params(self, config):
800+
new_param_count = 0
801+
for category in ['sysctl', 'sysfs', 'cpufreq', 'ethtool']:
802+
category_settings = config.get('settings', {}).get(category, {})
803+
for param_name, param_config in category_settings.items():
804+
if category == 'ethtool':
805+
if isinstance(param_config, dict):
806+
new_param_count += len(param_config)
807+
else:
808+
new_param_count += 1
809+
if new_param_count == 0:
810+
for category_name, category_params in config.get('settings', {}).items():
811+
if isinstance(category_params, dict):
812+
for param_name, param_config in category_params.items():
813+
if isinstance(param_config, dict) and 'constraints' in param_config:
814+
new_param_count += 1
815+
return new_param_count
816+
817+
def _check_trial_compatibility(self, breeder_id, new_config, force=False):
818+
if force:
819+
return True, "Force mode: skipping compatibility check"
820+
801821
try:
802-
import dateutil.parser
822+
import optuna
823+
824+
db_name = f"breeder_{breeder_id.replace('-', '_')}"
825+
db_url = self.archive_repo.get_connection_url(db_name)
826+
storage = optuna.storages.RDBStorage(url=db_url)
827+
828+
studies = storage.get_all_study_names()
829+
if not studies:
830+
return True, "No existing trial data found"
831+
832+
study = optuna.load_study(study_name=studies[0], storage=storage)
833+
completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
834+
if not completed_trials:
835+
return True, "No completed trials"
836+
837+
sample_trial = completed_trials[-1]
838+
old_param_count = len(sample_trial.params)
839+
old_obj_count = len(sample_trial.values) if sample_trial.values else 0
840+
841+
new_param_count = self._count_config_params(new_config)
842+
new_obj_count = len(new_config.get('objectives', []))
843+
844+
incompatibilities = []
845+
if old_param_count != new_param_count:
846+
incompatibilities.append(
847+
f"Parameter count: existing trials have {old_param_count}, new config has {new_param_count}"
848+
)
849+
if old_obj_count != new_obj_count:
850+
incompatibilities.append(
851+
f"Objective count: existing trials have {old_obj_count}, new config has {new_obj_count}"
852+
)
853+
854+
if incompatibilities:
855+
detail = "; ".join(incompatibilities)
856+
return False, f"Trial data incompatible: {detail}. Use force=true to clear trial history and start fresh."
857+
858+
return True, f"Compatible: {old_param_count} params, {old_obj_count} objectives"
803859

860+
except Exception as e:
861+
logger.warning(f"Could not check trial compatibility: {e}")
862+
return True, f"Compatibility check skipped: {e}"
863+
864+
def _clear_trial_data(self, breeder_id):
865+
db_name = f"breeder_{breeder_id.replace('-', '_')}"
866+
db_config = self.archive_repo.base_config.copy()
867+
db_config['database'] = db_name
868+
869+
tables_to_drop = ['trials', 'study_directions', 'study_user_attributes',
870+
'study_system_attributes', 'trial_user_attributes',
871+
'trial_system_attributes', 'trial_params',
872+
'trial_values', 'trial_intermediate_values',
873+
'study', 'alembic_version']
874+
875+
for table in tables_to_drop:
876+
try:
877+
execute_query(db_config, f"DROP TABLE IF EXISTS {table} CASCADE;")
878+
except Exception:
879+
pass
880+
881+
logger.info(f"Cleared trial data for breeder: {breeder_id}")
882+
883+
def update_breeder(self, breeder_id, new_config, force=False):
884+
try:
804885
self.metadata_repo.create_table()
805-
breeder_meta_data_list = self.metadata_repo.fetch_breeders_list()
886+
breeder_meta_data_row = self.metadata_repo.fetch_meta_data(breeder_id)
806887

807-
if not breeder_meta_data_list:
808-
return {
809-
"result": "SUCCESS",
810-
"data": []
811-
}
888+
if not breeder_meta_data_row or len(breeder_meta_data_row) == 0:
889+
return {"result": "FAILURE", "error": f"Breeder with ID '{breeder_id}' not found"}
812890

813-
configured_breeders = []
814-
for row in breeder_meta_data_list:
815-
breeder_id = row[0]
816-
name = row[1]
817-
creation_ts = row[2]
891+
old_config = breeder_meta_data_row[0][3]
892+
breeder_instance_name = breeder_meta_data_row[0][1]
818893

819-
# Format creation timestamp
820-
if isinstance(creation_ts, str):
821-
created_at = dateutil.parser.parse(creation_ts).isoformat()
822-
else:
823-
created_at = creation_ts.isoformat()
894+
new_config['breeder'] = new_config.get('breeder', old_config.get('breeder', {}))
895+
new_config['breeder']['uuid'] = breeder_id
824896

825-
configured_breeders.append({
826-
"id": breeder_id,
827-
"name": name,
828-
"status": "active",
829-
"createdAt": created_at
830-
})
897+
self._resolve_target_refs(new_config)
898+
BreederConfig.validate_minimal(new_config)
899+
900+
compatible, compat_detail = self._check_trial_compatibility(breeder_id, new_config, force=force)
901+
if not compatible:
902+
return {"result": "FAILURE", "error": compat_detail}
903+
904+
logger.info(f"Trial compatibility: {compat_detail}")
905+
906+
__uuid_common_name = f"breeder_{breeder_id.replace('-', '_')}"
907+
self.archive_repo.set_shutdown_requested(__uuid_common_name, value=True)
908+
909+
import time
910+
time.sleep(2)
911+
912+
old_worker_job_ids = old_config.get('worker_job_ids', [])
913+
if old_worker_job_ids:
914+
for job_id in old_worker_job_ids:
915+
cancel_job_by_id(job_id, reason=f"Updating breeder {breeder_id}")
916+
logger.info(f"Cancelled {len(old_worker_job_ids)} old worker jobs")
917+
918+
if force:
919+
self._clear_trial_data(breeder_id)
920+
logger.info(f"Force mode: cleared trial data for breeder {breeder_id}")
921+
922+
config_history = old_config.get('config_history', [])
923+
config_history.append({
924+
'config': {k: v for k, v in old_config.items() if k != 'config_history'},
925+
'updated_at': datetime.datetime.now().isoformat()
926+
})
927+
new_config['config_history'] = config_history
928+
929+
creation_ts_str = old_config.get('creation_ts')
930+
if creation_ts_str:
931+
new_config['creation_ts'] = creation_ts_str
932+
933+
self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)
934+
935+
parallel_runs = new_config.get('run', {}).get('parallel', 1)
936+
targets = new_config.get('effectuation', {}).get('targets', [])
937+
targets_count = len(targets)
938+
is_cooperative = new_config.get('cooperation', {}).get('active', False)
939+
940+
worker_launch_failures = []
941+
target_count = 0
942+
worker_job_ids = []
943+
944+
for target in targets:
945+
for run_id in range(parallel_runs):
946+
flow_config = new_config.copy()
947+
flow_id = f'{breeder_instance_name}_{target_count}_{run_id}'
831948

949+
if not is_cooperative:
950+
flow_config = determine_config_shard(
951+
run_id=run_id,
952+
target_id=target_count,
953+
targets_count=targets_count,
954+
config=flow_config,
955+
parallel_runs_count=parallel_runs
956+
)
957+
958+
try:
959+
_, job_id = start_optimization_flow(
960+
flow_id=flow_id,
961+
shard_config=flow_config,
962+
run_id=run_id,
963+
target_id=target_count,
964+
breeder_id=breeder_id
965+
)
966+
worker_job_ids.append(job_id)
967+
except Exception as e:
968+
worker_launch_failures.append({
969+
"flow_id": flow_id,
970+
"target": target_count,
971+
"run": run_id,
972+
"error": str(e),
973+
"error_type": type(e).__name__
974+
})
975+
logger.error(f"Failed to launch worker {flow_id}: {e}")
976+
977+
target_count += 1
978+
979+
if worker_job_ids:
980+
new_config['worker_job_ids'] = worker_job_ids
981+
self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)
982+
983+
if worker_launch_failures:
984+
return {
985+
"result": "PARTIAL_SUCCESS",
986+
"error": f"Failed to launch {len(worker_launch_failures)} worker(s)",
987+
"workers_started": len(worker_job_ids),
988+
"workers_failed": len(worker_launch_failures),
989+
"trials_cleared": force
990+
}
991+
992+
logger.info(f"Successfully updated breeder: {breeder_id} (force={force})")
832993
return {
833994
"result": "SUCCESS",
834-
"data": configured_breeders
995+
"data": {
996+
"breeder_id": breeder_id,
997+
"name": breeder_instance_name,
998+
"status": "active",
999+
"workers_started": len(worker_job_ids),
1000+
"trials_cleared": force,
1001+
"config_history_entries": len(config_history)
1002+
}
8351003
}
1004+
8361005
except Exception as e:
837-
return {
838-
"result": "FAILURE",
839-
"error": str(e)
840-
}
1006+
logger.error(f"Failed to update breeder {breeder_id}: {e}")
1007+
return {"result": "FAILURE", "error": str(e)}

controller/breeder_update.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from f.controller.config import DatabaseConfig
2+
from f.controller.breeder_service import BreederService
3+
4+
def main(request_data=None):
5+
breeder_id = request_data.get('breeder_id') if request_data else None
6+
if not breeder_id:
7+
return {"result": "FAILURE", "error": "Missing breeder_id"}
8+
9+
new_config = request_data.get('config')
10+
if not new_config:
11+
return {"result": "FAILURE", "error": "Missing config"}
12+
13+
force = request_data.get('force', False)
14+
15+
service = BreederService(
16+
archive_db_config=DatabaseConfig.ARCHIVE_DB,
17+
meta_db_config=DatabaseConfig.META_DB
18+
)
19+
20+
return service.update_breeder(breeder_id, new_config, force=force)

0 commit comments

Comments
 (0)