Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 212 additions & 3 deletions controller/breeder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from dateutil.parser import parse

from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository
from f.controller.database import ArchiveDatabaseRepository, MetadataDatabaseRepository, execute_query
from f.controller.config import BreederConfig, BREEDER_CAPABILITIES, DatabaseConfig
from f.controller.shared.otel_logging import get_logger

Expand Down Expand Up @@ -816,7 +816,6 @@ def list_breeders(self):
name = row[1]
creation_ts = row[2]

# Format creation timestamp
if isinstance(creation_ts, str):
created_at = dateutil.parser.parse(creation_ts).isoformat()
else:
Expand All @@ -837,4 +836,214 @@ def list_breeders(self):
return {
"result": "FAILURE",
"error": str(e)
}
}

def _count_config_params(self, config):
new_param_count = 0
for category in ['sysctl', 'sysfs', 'cpufreq', 'ethtool']:
category_settings = config.get('settings', {}).get(category, {})
for param_name, param_config in category_settings.items():
if category == 'ethtool':
if isinstance(param_config, dict):
new_param_count += len(param_config)
else:
new_param_count += 1
if new_param_count == 0:
for category_name, category_params in config.get('settings', {}).items():
if isinstance(category_params, dict):
for param_name, param_config in category_params.items():
if isinstance(param_config, dict) and 'constraints' in param_config:
new_param_count += 1
return new_param_count

def _check_trial_compatibility(self, breeder_id, new_config, force=False):
if force:
return True, "Force mode: skipping compatibility check"

try:
import optuna

db_name = f"breeder_{breeder_id.replace('-', '_')}"
db_url = self.archive_repo.get_connection_url(db_name)
storage = optuna.storages.RDBStorage(url=db_url)

studies = storage.get_all_study_names()
if not studies:
return True, "No existing trial data found"

study = optuna.load_study(study_name=studies[0], storage=storage)
completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
if not completed_trials:
return True, "No completed trials"

sample_trial = completed_trials[-1]
old_param_count = len(sample_trial.params)
old_obj_count = len(sample_trial.values) if sample_trial.values else 0

new_param_count = self._count_config_params(new_config)
new_obj_count = len(new_config.get('objectives', []))

incompatibilities = []
if old_param_count != new_param_count:
incompatibilities.append(
f"Parameter count: existing trials have {old_param_count}, new config has {new_param_count}"
)
if old_obj_count != new_obj_count:
incompatibilities.append(
f"Objective count: existing trials have {old_obj_count}, new config has {new_obj_count}"
)

if incompatibilities:
detail = "; ".join(incompatibilities)
return False, f"Trial data incompatible: {detail}. Use force=true to clear trial history and start fresh."

return True, f"Compatible: {old_param_count} params, {old_obj_count} objectives"

except Exception as e:
logger.warning(f"Could not check trial compatibility: {e}")
return True, f"Compatibility check skipped: {e}"

def _clear_trial_data(self, breeder_id):
    """Drop all Optuna bookkeeping tables for the given breeder.

    Best-effort cleanup: every DROP is attempted independently and any
    failure is ignored, so a partially-initialized or already-empty
    trial store is handled gracefully.
    """
    db_config = self.archive_repo.base_config.copy()
    db_config['database'] = f"breeder_{breeder_id.replace('-', '_')}"

    # Optuna's RDB schema tables plus its alembic migration marker.
    optuna_tables = (
        'trials', 'study_directions', 'study_user_attributes',
        'study_system_attributes', 'trial_user_attributes',
        'trial_system_attributes', 'trial_params',
        'trial_values', 'trial_intermediate_values',
        'study', 'alembic_version',
    )

    for table in optuna_tables:
        try:
            execute_query(db_config, f"DROP TABLE IF EXISTS {table} CASCADE;")
        except Exception:
            # Table may not exist or may fail to drop; continue regardless.
            pass

    logger.info(f"Cleared trial data for breeder: {breeder_id}")

def update_breeder(self, breeder_id, new_config, force=False):
    """Replace an existing breeder's configuration and relaunch its workers.

    Sequence: validate the new config, check it against stored trial data,
    signal the running workers to shut down, cancel their jobs, optionally
    wipe trial history (force mode), persist the new config (with history),
    then launch a fresh set of optimization workers.

    Args:
        breeder_id: UUID of the breeder to update.
        new_config: Replacement configuration dict (mutated in place:
            'breeder', 'config_history', 'creation_ts', 'worker_job_ids'
            keys are set/overwritten here).
        force: If True, skip the compatibility check and clear all
            existing trial data before relaunching.

    Returns:
        dict: {"result": "SUCCESS", "data": {...}} on full success,
        "PARTIAL_SUCCESS" if some workers failed to launch, or
        "FAILURE" with an "error" message.
    """
    try:
        self.metadata_repo.create_table()
        breeder_meta_data_row = self.metadata_repo.fetch_meta_data(breeder_id)

        if not breeder_meta_data_row or len(breeder_meta_data_row) == 0:
            return {"result": "FAILURE", "error": f"Breeder with ID '{breeder_id}' not found"}

        # Row layout: index 1 = instance name, index 3 = stored config.
        old_config = breeder_meta_data_row[0][3]
        breeder_instance_name = breeder_meta_data_row[0][1]

        # Preserve the breeder identity block; the UUID must never change.
        new_config['breeder'] = new_config.get('breeder', old_config.get('breeder', {}))
        new_config['breeder']['uuid'] = breeder_id

        self._resolve_target_refs(new_config)
        BreederConfig.validate_minimal(new_config)

        # Refuse the update if stored trials no longer match the new
        # parameter/objective shape (unless force=True).
        compatible, compat_detail = self._check_trial_compatibility(breeder_id, new_config, force=force)
        if not compatible:
            return {"result": "FAILURE", "error": compat_detail}

        logger.info(f"Trial compatibility: {compat_detail}")

        # Ask running workers to stop via the shared shutdown flag.
        __uuid_common_name = f"breeder_{breeder_id.replace('-', '_')}"
        self.archive_repo.set_shutdown_requested(__uuid_common_name, value=True)

        # Grace period so workers can observe the shutdown flag before
        # their jobs are cancelled outright.
        import time
        time.sleep(2)

        old_worker_job_ids = old_config.get('worker_job_ids', [])
        if old_worker_job_ids:
            for job_id in old_worker_job_ids:
                cancel_job_by_id(job_id, reason=f"Updating breeder {breeder_id}")
            logger.info(f"Cancelled {len(old_worker_job_ids)} old worker jobs")

        if force:
            self._clear_trial_data(breeder_id)
            logger.info(f"Force mode: cleared trial data for breeder {breeder_id}")

        # Append the superseded config (minus its own history, to avoid
        # nesting histories inside histories) to the audit trail.
        # NOTE(review): datetime.now() is naive local time — confirm
        # whether UTC is expected elsewhere in the pipeline.
        config_history = old_config.get('config_history', [])
        config_history.append({
            'config': {k: v for k, v in old_config.items() if k != 'config_history'},
            'updated_at': datetime.datetime.now().isoformat()
        })
        new_config['config_history'] = config_history

        # Carry the original creation timestamp forward unchanged.
        creation_ts_str = old_config.get('creation_ts')
        if creation_ts_str:
            new_config['creation_ts'] = creation_ts_str

        self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)

        parallel_runs = new_config.get('run', {}).get('parallel', 1)
        targets = new_config.get('effectuation', {}).get('targets', [])
        targets_count = len(targets)
        is_cooperative = new_config.get('cooperation', {}).get('active', False)

        worker_launch_failures = []
        target_count = 0
        worker_job_ids = []

        # Launch parallel_runs workers per target; failures are collected
        # rather than aborting, so partial deployments are reported.
        for target in targets:
            for run_id in range(parallel_runs):
                # NOTE(review): shallow copy — nested dicts are shared
                # between shards; confirm determine_config_shard does not
                # mutate them in place.
                flow_config = new_config.copy()
                flow_id = f'{breeder_instance_name}_{target_count}_{run_id}'

                # Non-cooperative mode: each worker gets its own shard of
                # the search space instead of sharing one study.
                if not is_cooperative:
                    flow_config = determine_config_shard(
                        run_id=run_id,
                        target_id=target_count,
                        targets_count=targets_count,
                        config=flow_config,
                        parallel_runs_count=parallel_runs
                    )

                try:
                    _, job_id = start_optimization_flow(
                        flow_id=flow_id,
                        shard_config=flow_config,
                        run_id=run_id,
                        target_id=target_count,
                        breeder_id=breeder_id
                    )
                    worker_job_ids.append(job_id)
                except Exception as e:
                    worker_launch_failures.append({
                        "flow_id": flow_id,
                        "target": target_count,
                        "run": run_id,
                        "error": str(e),
                        "error_type": type(e).__name__
                    })
                    logger.error(f"Failed to launch worker {flow_id}: {e}")

            target_count += 1

        # Persist the new job IDs so a future update can cancel them.
        if worker_job_ids:
            new_config['worker_job_ids'] = worker_job_ids
            self.metadata_repo.update_breeder_meta(breeder_id=breeder_id, meta_state=new_config)

        if worker_launch_failures:
            return {
                "result": "PARTIAL_SUCCESS",
                "error": f"Failed to launch {len(worker_launch_failures)} worker(s)",
                "workers_started": len(worker_job_ids),
                "workers_failed": len(worker_launch_failures),
                "trials_cleared": force
            }

        logger.info(f"Successfully updated breeder: {breeder_id} (force={force})")
        return {
            "result": "SUCCESS",
            "data": {
                "breeder_id": breeder_id,
                "name": breeder_instance_name,
                "status": "active",
                "workers_started": len(worker_job_ids),
                "trials_cleared": force,
                "config_history_entries": len(config_history)
            }
        }

    except Exception as e:
        logger.error(f"Failed to update breeder {breeder_id}: {e}")
        return {"result": "FAILURE", "error": str(e)}
20 changes: 20 additions & 0 deletions controller/breeder_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from f.controller.config import DatabaseConfig
from f.controller.breeder_service import BreederService

def main(request_data=None):
    """Entry point: update an existing breeder from a request payload.

    Expects *request_data* to carry 'breeder_id' and 'config'; the
    optional 'force' flag (default False) clears incompatible trial
    history before relaunching workers.

    Returns:
        dict: Result envelope from BreederService.update_breeder, or a
        FAILURE dict when required fields are missing.
    """
    payload = request_data or {}

    breeder_id = payload.get('breeder_id')
    if not breeder_id:
        return {"result": "FAILURE", "error": "Missing breeder_id"}

    new_config = payload.get('config')
    if not new_config:
        return {"result": "FAILURE", "error": "Missing config"}

    service = BreederService(
        archive_db_config=DatabaseConfig.ARCHIVE_DB,
        meta_db_config=DatabaseConfig.META_DB
    )
    return service.update_breeder(
        breeder_id, new_config, force=payload.get('force', False)
    )
Loading