diff --git a/controller/breeder_service.py b/controller/breeder_service.py index eca5d50..fff111e 100644 --- a/controller/breeder_service.py +++ b/controller/breeder_service.py @@ -5,7 +5,7 @@ import os from dateutil.parser import parse -from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository +from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository, execute_query from f.controller.config import BreederConfig, BREEDER_CAPABILITIES, DatabaseConfig from f.controller.shared.otel_logging import get_logger @@ -816,7 +816,6 @@ def list_breeders(self): name = row[1] creation_ts = row[2] - # Format creation timestamp if isinstance(creation_ts, str): created_at = dateutil.parser.parse(creation_ts).isoformat() else: @@ -837,4 +836,214 @@ def list_breeders(self): return { "result": "FAILURE", "error": str(e) - } \ No newline at end of file + } + + def _count_config_params(self, config): + new_param_count = 0 + for category in ['sysctl', 'sysfs', 'cpufreq', 'ethtool']: + category_settings = config.get('settings', {}).get(category, {}) + for param_name, param_config in category_settings.items(): + if category == 'ethtool': + if isinstance(param_config, dict): + new_param_count += len(param_config) + else: + new_param_count += 1 + if new_param_count == 0: + for category_name, category_params in config.get('settings', {}).items(): + if isinstance(category_params, dict): + for param_name, param_config in category_params.items(): + if isinstance(param_config, dict) and 'constraints' in param_config: + new_param_count += 1 + return new_param_count + + def _check_trial_compatibility(self, breeder_id, new_config, force=False): + if force: + return True, "Force mode: skipping compatibility check" + + try: + import optuna + + db_name = f"breeder_{breeder_id.replace('-', '_')}" + db_url = self.archive_repo.get_connection_url(db_name) + storage = optuna.storages.RDBStorage(url=db_url) + + studies = 
optuna.study.get_all_study_names(storage) + if not studies: + return True, "No existing trial data found" + + study = optuna.load_study(study_name=studies[0], storage=storage) + completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE] + if not completed_trials: + return True, "No completed trials" + + sample_trial = completed_trials[-1] + old_param_count = len(sample_trial.params) + old_obj_count = len(sample_trial.values) if sample_trial.values else 0 + + new_param_count = self._count_config_params(new_config) + new_obj_count = len(new_config.get('objectives', [])) + + incompatibilities = [] + if old_param_count != new_param_count: + incompatibilities.append( + f"Parameter count: existing trials have {old_param_count}, new config has {new_param_count}" + ) + if old_obj_count != new_obj_count: + incompatibilities.append( + f"Objective count: existing trials have {old_obj_count}, new config has {new_obj_count}" + ) + + if incompatibilities: + detail = "; ".join(incompatibilities) + return False, f"Trial data incompatible: {detail}. Use force=true to clear trial history and start fresh." 
+ + return True, f"Compatible: {old_param_count} params, {old_obj_count} objectives" + + except Exception as e: + logger.warning(f"Could not check trial compatibility: {e}") + return True, f"Compatibility check skipped: {e}" + + def _clear_trial_data(self, breeder_id): + db_name = f"breeder_{breeder_id.replace('-', '_')}" + db_config = self.archive_repo.base_config.copy() + db_config['database'] = db_name + + tables_to_drop = ['trials', 'study_directions', 'study_user_attributes', + 'study_system_attributes', 'trial_user_attributes', + 'trial_system_attributes', 'trial_params', + 'trial_values', 'trial_intermediate_values', + 'study', 'alembic_version'] + + for table in tables_to_drop: + try: + execute_query(db_config, f"DROP TABLE IF EXISTS {table} CASCADE;") + except Exception: + pass + + logger.info(f"Cleared trial data for breeder: {breeder_id}") + + def update_breeder(self, breeder_id, new_config, force=False): + try: + self.metadata_repo.create_table() + breeder_meta_data_row = self.metadata_repo.fetch_meta_data(breeder_id) + + if not breeder_meta_data_row or len(breeder_meta_data_row) == 0: + return {"result": "FAILURE", "error": f"Breeder with ID '{breeder_id}' not found"} + + old_config = breeder_meta_data_row[0][3] + breeder_instance_name = breeder_meta_data_row[0][1] + + new_config['breeder'] = new_config.get('breeder', old_config.get('breeder', {})) + new_config['breeder']['uuid'] = breeder_id + + self._resolve_target_refs(new_config) + BreederConfig.validate_minimal(new_config) + + compatible, compat_detail = self._check_trial_compatibility(breeder_id, new_config, force=force) + if not compatible: + return {"result": "FAILURE", "error": compat_detail} + + logger.info(f"Trial compatibility: {compat_detail}") + + __uuid_common_name = f"breeder_{breeder_id.replace('-', '_')}" + self.archive_repo.set_shutdown_requested(__uuid_common_name, value=True) + + import time + time.sleep(2) + + old_worker_job_ids = old_config.get('worker_job_ids', []) + if 
old_worker_job_ids: + for job_id in old_worker_job_ids: + cancel_job_by_id(job_id, reason=f"Updating breeder {breeder_id}") + logger.info(f"Cancelled {len(old_worker_job_ids)} old worker jobs") + + if force: + self._clear_trial_data(breeder_id) + logger.info(f"Force mode: cleared trial data for breeder {breeder_id}") + + config_history = old_config.get('config_history', []) + config_history.append({ + 'config': {k: v for k, v in old_config.items() if k != 'config_history'}, + 'updated_at': datetime.datetime.now().isoformat() + }) + new_config['config_history'] = config_history + + creation_ts_str = old_config.get('creation_ts') + if creation_ts_str: + new_config['creation_ts'] = creation_ts_str + + self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config) + + parallel_runs = new_config.get('run', {}).get('parallel', 1) + targets = new_config.get('effectuation', {}).get('targets', []) + targets_count = len(targets) + is_cooperative = new_config.get('cooperation', {}).get('active', False) + + worker_launch_failures = [] + target_count = 0 + worker_job_ids = [] + + for target in targets: + for run_id in range(parallel_runs): + flow_config = new_config.copy() + flow_id = f'{breeder_instance_name}_{target_count}_{run_id}' + + if not is_cooperative: + flow_config = determine_config_shard( + run_id=run_id, + target_id=target_count, + targets_count=targets_count, + config=flow_config, + parallel_runs_count=parallel_runs + ) + + try: + _, job_id = start_optimization_flow( + flow_id=flow_id, + shard_config=flow_config, + run_id=run_id, + target_id=target_count, + breeder_id=breeder_id + ) + worker_job_ids.append(job_id) + except Exception as e: + worker_launch_failures.append({ + "flow_id": flow_id, + "target": target_count, + "run": run_id, + "error": str(e), + "error_type": type(e).__name__ + }) + logger.error(f"Failed to launch worker {flow_id}: {e}") + + target_count += 1 + + if worker_job_ids: + new_config['worker_job_ids'] = worker_job_ids + 
self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config) + + if worker_launch_failures: + return { + "result": "PARTIAL_SUCCESS", + "error": f"Failed to launch {len(worker_launch_failures)} worker(s)", + "workers_started": len(worker_job_ids), + "workers_failed": len(worker_launch_failures), + "trials_cleared": force + } + + logger.info(f"Successfully updated breeder: {breeder_id} (force={force})") + return { + "result": "SUCCESS", + "data": { + "breeder_id": breeder_id, + "name": breeder_instance_name, + "status": "active", + "workers_started": len(worker_job_ids), + "trials_cleared": force, + "config_history_entries": len(config_history) + } + } + + except Exception as e: + logger.error(f"Failed to update breeder {breeder_id}: {e}") + return {"result": "FAILURE", "error": str(e)} \ No newline at end of file diff --git a/controller/breeder_update.py b/controller/breeder_update.py new file mode 100644 index 0000000..168ab78 --- /dev/null +++ b/controller/breeder_update.py @@ -0,0 +1,20 @@ +from f.controller.config import DatabaseConfig +from f.controller.breeder_service import BreederService + +def main(request_data=None): + breeder_id = request_data.get('breeder_id') if request_data else None + if not breeder_id: + return {"result": "FAILURE", "error": "Missing breeder_id"} + + new_config = request_data.get('config') + if not new_config: + return {"result": "FAILURE", "error": "Missing config"} + + force = request_data.get('force', False) + + service = BreederService( + archive_db_config=DatabaseConfig.ARCHIVE_DB, + meta_db_config=DatabaseConfig.META_DB + ) + + return service.update_breeder(breeder_id, new_config, force=force)