Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 67 additions & 9 deletions controller/breeder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,75 @@ def create_breeder(self, breeder_config, name):
# Create breeder state table for shutdown signaling in the archive DB
self.archive_repo.create_breeder_state_table(breeder_id)

# Wait for the breeder_state table to be fully committed and accessible
# This prevents YugabyteDB serialization conflicts when Optuna starts its DDL operations
from f.controller.database import get_db_connection
max_wait = 10 # seconds
check_interval = 0.5 # seconds
import time
table_ready = False

for attempt in range(int(max_wait / check_interval)):
try:
# Try to query the table - if it succeeds, the transaction is fully committed
db_config = self.archive_repo.base_config.copy()
db_config['database'] = breeder_id
with get_db_connection(db_config) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM breeder_state;")
count = cursor.fetchone()[0]
table_ready = True
logger.info(f"Breeder state table is ready for {breeder_uuid}")
break
except Exception as e:
if attempt < (max_wait / check_interval) - 1:
logger.debug(f"Waiting for breeder_state table to be ready... (attempt {attempt + 1})")
time.sleep(check_interval)
else:
logger.error(f"Breeder state table still not ready after {max_wait}s: {e}")
raise

if not table_ready:
raise Exception(f"Breeder state table did not become ready within {max_wait}s")

# Initialize Optuna schema to prevent race conditions during worker startup
# Multiple workers starting simultaneously would otherwise conflict trying to create tables
try:
db_url = self.archive_repo.get_connection_url(breeder_id)
storage = optuna.storages.RDBStorage(url=db_url)
logger.info(f"Initialized Optuna schema for breeder {breeder_uuid}")
except Exception as e:
logger.error(f"Failed to initialize Optuna schema for breeder {breeder_uuid}: {e}")
# Clean up database if schema initialization fails
self.archive_repo.drop_database(breeder_id)
raise
# Retry logic for YugabyteDB serialization failures and timeouts
max_retries = 5
storage = None
last_error = None

for attempt in range(max_retries):
try:
db_url = self.archive_repo.get_connection_url(breeder_id)
storage = optuna.storages.RDBStorage(url=db_url)
logger.info(f"Initialized Optuna schema for breeder {breeder_uuid}")
break
except Exception as e:
last_error = e
error_str = str(e)
# Check for YugabyteDB-specific errors that should be retried
is_retryable = (
'SerializationFailure' in error_str or
'40001' in error_str or
'Transaction aborted' in error_str or
'Timed out waiting' in error_str or
'InternalError_' in error_str
)

if attempt < max_retries - 1 and is_retryable:
wait_time = 2 ** attempt # Exponential backoff: 2s, 4s, 8s, 16s
logger.warning(f"Optuna schema initialization attempt {attempt + 1}/{max_retries} failed for breeder {breeder_uuid}: {e}")
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
logger.error(f"Failed to initialize Optuna schema for breeder {breeder_uuid} after {max_retries} attempts: {e}")
# Clean up database if schema initialization fails
try:
self.archive_repo.drop_database(breeder_id)
except Exception as drop_error:
logger.error(f"Failed to cleanup database after Optuna init failure: {drop_error}")
raise

self.metadata_repo.create_table()
self.metadata_repo.insert_breeder_meta(
Expand Down