diff --git a/controller/breeder_service.py b/controller/breeder_service.py index b188ba8..030363d 100644 --- a/controller/breeder_service.py +++ b/controller/breeder_service.py @@ -251,17 +251,75 @@ def create_breeder(self, breeder_config, name): # Create breeder state table for shutdown signaling in the archive DB self.archive_repo.create_breeder_state_table(breeder_id) + # Wait for the breeder_state table to be fully committed and accessible + # This prevents YugabyteDB serialization conflicts when Optuna starts its DDL operations + from f.controller.database import get_db_connection + max_wait = 10 # seconds + check_interval = 0.5 # seconds + import time + table_ready = False + + for attempt in range(int(max_wait / check_interval)): + try: + # Try to query the table - if it succeeds, the transaction is fully committed + db_config = self.archive_repo.base_config.copy() + db_config['database'] = breeder_id + with get_db_connection(db_config) as conn: + with conn.cursor() as cursor: + cursor.execute("SELECT COUNT(*) FROM breeder_state;") + count = cursor.fetchone()[0] + table_ready = True + logger.info(f"Breeder state table is ready for {breeder_uuid}") + break + except Exception as e: + if attempt < (max_wait / check_interval) - 1: + logger.debug(f"Waiting for breeder_state table to be ready... (attempt {attempt + 1})") + time.sleep(check_interval) + else: + logger.error(f"Breeder state table still not ready after {max_wait}s: {e}") + raise + + if not table_ready: + raise Exception(f"Breeder state table did not become ready within {max_wait}s") + # Initialize Optuna schema to prevent race conditions during worker startup # Multiple workers starting simultaneously would otherwise conflict trying to create tables - try: - db_url = self.archive_repo.get_connection_url(breeder_id) - storage = optuna.storages.RDBStorage(url=db_url) - logger.info(f"Initialized Optuna schema for breeder {breeder_uuid}") - except Exception as e: - logger.error(f"Failed to initialize Optuna schema for breeder {breeder_uuid}: {e}") - # Clean up database if schema initialization fails - self.archive_repo.drop_database(breeder_id) - raise + # Retry logic for YugabyteDB serialization failures and timeouts + max_retries = 5 + storage = None + last_error = None + + for attempt in range(max_retries): + try: + db_url = self.archive_repo.get_connection_url(breeder_id) + storage = optuna.storages.RDBStorage(url=db_url) + logger.info(f"Initialized Optuna schema for breeder {breeder_uuid}") + break + except Exception as e: + last_error = e + error_str = str(e) + # Check for YugabyteDB-specific errors that should be retried + is_retryable = ( + 'SerializationFailure' in error_str or + '40001' in error_str or + 'Transaction aborted' in error_str or + 'Timed out waiting' in error_str or + 'InternalError_' in error_str + ) + + if attempt < max_retries - 1 and is_retryable: + wait_time = 2 ** attempt # Exponential backoff: 2s, 4s, 8s, 16s + logger.warning(f"Optuna schema initialization attempt {attempt + 1}/{max_retries} failed for breeder {breeder_uuid}: {e}") + logger.info(f"Retrying in {wait_time} seconds...") + time.sleep(wait_time) + else: + logger.error(f"Failed to initialize Optuna schema for breeder {breeder_uuid} after {max_retries} attempts: {e}") + # Clean up database if schema initialization fails + try: + self.archive_repo.drop_database(breeder_id) + except Exception as drop_error: + logger.error(f"Failed to cleanup database after Optuna init failure: {drop_error}") + raise self.metadata_repo.create_table() self.metadata_repo.insert_breeder_meta(