Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/conductor/client/automator/async_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,18 @@ async def __async_register_task_definition(self) -> None:
output_schema_name = None
schema_registry_available = True

if hasattr(self.worker, 'execute_function'):
# Check if schema registration is enabled for this worker
register_schema = getattr(self.worker, 'register_schema', False)
# Also check global Configuration default
if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None:
# Worker-level setting takes precedence if explicitly set (not default)
if not hasattr(self.worker, 'register_schema'):
register_schema = self.configuration.register_schema

if not register_schema:
logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)")

if register_schema and hasattr(self.worker, 'execute_function'):
logger.debug(f"Generating JSON schemas from function signature...")
# Pass strict_schema flag to control additionalProperties
strict_mode = getattr(self.worker, 'strict_schema', False)
Expand Down Expand Up @@ -314,6 +325,8 @@ async def __async_register_task_definition(self) -> None:
logger.debug(f"Could not register schemas for {task_name}: {e}")
else:
logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)")
elif not register_schema:
pass # Already logged above
else:
logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas")

Expand Down
20 changes: 15 additions & 5 deletions src/conductor/client/automator/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def _run_async_worker_process(
def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: str, func,
thread_count: int = 1, register_task_def: bool = False,
poll_timeout: int = 100, lease_extend_enabled: bool = False, task_def: Optional['TaskDef'] = None,
overwrite_task_def: bool = True, strict_schema: bool = False):
overwrite_task_def: bool = True, strict_schema: bool = False,
register_schema: Optional[bool] = None):
logger.debug("decorated %s", name)
_decorated_functions[(name, domain)] = {
"func": func,
Expand All @@ -86,7 +87,8 @@ def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id:
"lease_extend_enabled": lease_extend_enabled,
"task_def": task_def,
"overwrite_task_def": overwrite_task_def,
"strict_schema": strict_schema
"strict_schema": strict_schema,
"register_schema": register_schema
}


Expand All @@ -112,7 +114,8 @@ def get_registered_workers() -> List[Worker]:
paused=False, # Always default to False, only env vars can set to True
task_def_template=record.get("task_def"), # Optional TaskDef configuration
overwrite_task_def=record.get("overwrite_task_def", True),
strict_schema=record.get("strict_schema", False)
strict_schema=record.get("strict_schema", False),
register_schema=record.get("register_schema") if record.get("register_schema") is not None else False
)
workers.append(worker)
return workers
Expand Down Expand Up @@ -254,9 +257,15 @@ def __init__(
'poll_timeout': record.get("poll_timeout", 100),
'lease_extend_enabled': record.get("lease_extend_enabled", True),
'overwrite_task_def': record.get("overwrite_task_def", True),
'strict_schema': record.get("strict_schema", False)
'strict_schema': record.get("strict_schema", False),
'register_schema': record.get("register_schema")
}

# Apply global Configuration.register_schema as fallback when
# the decorator doesn't set it explicitly (None).
if code_config['register_schema'] is None and hasattr(configuration, 'register_schema') and configuration.register_schema is not None:
code_config['register_schema'] = configuration.register_schema

# Resolve configuration with environment variable overrides
resolved_config = resolve_worker_config(
worker_name=task_def_name,
Expand All @@ -275,7 +284,8 @@ def __init__(
lease_extend_enabled=resolved_config['lease_extend_enabled'],
task_def_template=record.get("task_def"), # Pass TaskDef configuration
overwrite_task_def=resolved_config.get('overwrite_task_def', True),
strict_schema=resolved_config.get('strict_schema', False))
strict_schema=resolved_config.get('strict_schema', False),
register_schema=resolved_config.get('register_schema', False))
logger.debug("created worker with name=%s and domain=%s", task_def_name, resolved_config['domain'])
workers.append(worker)

Expand Down
15 changes: 14 additions & 1 deletion src/conductor/client/automator/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,18 @@ def __register_task_definition(self) -> None:
output_schema_name = None
schema_registry_available = True

if hasattr(self.worker, 'execute_function'):
# Check if schema registration is enabled for this worker
register_schema = getattr(self.worker, 'register_schema', False)
# Also check global Configuration default
if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None:
# Worker-level setting takes precedence if explicitly set (not default)
if not hasattr(self.worker, 'register_schema'):
register_schema = self.configuration.register_schema

if not register_schema:
logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)")

if register_schema and hasattr(self.worker, 'execute_function'):
logger.debug(f"Generating JSON schemas from function signature...")
# Pass strict_schema flag to control additionalProperties
strict_mode = getattr(self.worker, 'strict_schema', False)
Expand Down Expand Up @@ -287,6 +298,8 @@ def __register_task_definition(self) -> None:
logger.debug(f"Could not register schemas for {task_name}: {e}")
else:
logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)")
elif not register_schema:
pass # Already logged above
else:
logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas")

Expand Down
13 changes: 12 additions & 1 deletion src/conductor/client/configuration/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ def __init__(
debug: bool = False,
authentication_settings: AuthenticationSettings = None,
server_api_url: Optional[str] = None,
auth_token_ttl_min: int = 45
auth_token_ttl_min: int = 45,
register_schema: Optional[bool] = None
):
if server_api_url is not None:
self.host = server_api_url
Expand Down Expand Up @@ -84,6 +85,16 @@ def __init__(
# Provide an alterative to requests.Session() for HTTP connection.
self.http_connection = None

# Global default for schema registration (None = defer to per-worker config)
if register_schema is not None:
self.register_schema = register_schema
else:
env_val = os.getenv("CONDUCTOR_REGISTER_SCHEMAS")
if env_val is not None:
self.register_schema = env_val.strip().lower() in ('true', '1', 'yes', 'on')
else:
self.register_schema = None # No global override; per-worker default (True) applies

# not updated yet
self.token_update_time = 0
self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000
Expand Down
4 changes: 3 additions & 1 deletion src/conductor/client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ def __init__(self,
paused: bool = False,
task_def_template: Optional['TaskDef'] = None,
overwrite_task_def: bool = True,
strict_schema: bool = False
strict_schema: bool = False,
register_schema: bool = False
) -> Self:
super().__init__(task_definition_name)
self.api_client = ApiClient()
Expand All @@ -324,6 +325,7 @@ def __init__(self,
self.task_def_template = task_def_template # Optional TaskDef configuration
self.overwrite_task_def = overwrite_task_def # Whether to overwrite existing task definitions
self.strict_schema = strict_schema # Whether to enforce strict schema (additionalProperties=false)
self.register_schema = register_schema # Whether to register JSON schemas alongside task definitions

# Initialize background event loop for async workers
self._background_loop = None
Expand Down
12 changes: 10 additions & 2 deletions src/conductor/client/worker/worker_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def process_order(order_id: str):
'lease_extend_enabled': 'lease_extend_enabled',
'paused': 'paused',
'overwrite_task_def': 'overwrite_task_def',
'strict_schema': 'strict_schema'
'strict_schema': 'strict_schema',
'register_schema': 'register_schema'
}


Expand Down Expand Up @@ -186,7 +187,8 @@ def resolve_worker_config(
lease_extend_enabled: Optional[bool] = None,
paused: Optional[bool] = None,
overwrite_task_def: Optional[bool] = None,
strict_schema: Optional[bool] = None
strict_schema: Optional[bool] = None,
register_schema: Optional[bool] = None
) -> dict:
"""
Resolve worker configuration with hierarchical override.
Expand All @@ -208,6 +210,7 @@ def resolve_worker_config(
paused: Whether worker is paused (code-level default)
overwrite_task_def: Whether to overwrite existing task definitions (code-level default, default=True)
strict_schema: Whether to set additionalProperties=false in schemas (code-level default, default=False)
register_schema: Whether to register JSON schemas alongside task definitions (code-level default, default=False)

Returns:
Dict with resolved configuration values
Expand Down Expand Up @@ -269,6 +272,11 @@ def resolve_worker_config(
env_strict = _get_env_value(worker_name, 'strict_schema', bool)
resolved['strict_schema'] = env_strict if env_strict is not None else (strict_schema if strict_schema is not None else False)

# Resolve register_schema (default: False)
# Priority: per-worker env > global env (conductor.worker.all) > code-level > default (False)
env_register_schema = _get_env_value(worker_name, 'register_schema', bool)
resolved['register_schema'] = env_register_schema if env_register_schema is not None else (register_schema if register_schema is not None else False)

return resolved


Expand Down
18 changes: 14 additions & 4 deletions src/conductor/client/worker/worker_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
poll_interval_seconds: int = 0, thread_count: int = 1, register_task_def: bool = False,
poll_timeout: int = 100, lease_extend_enabled: bool = False):
poll_timeout: int = 100, lease_extend_enabled: bool = False, register_schema: Optional[bool] = None):
"""
Decorator to register a function as a Conductor worker task (legacy CamelCase name).

Expand Down Expand Up @@ -62,7 +62,7 @@ def worker_task_func(func):
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def,
poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled,
func=func)
register_schema=register_schema, func=func)

@functools.wraps(func)
def wrapper_func(*args, **kwargs):
Expand All @@ -80,7 +80,8 @@ def wrapper_func(*args, **kwargs):

def worker_task(task_definition_name: str, poll_interval_millis: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
thread_count: int = 1, register_task_def: bool = False, poll_timeout: int = 100, lease_extend_enabled: bool = False,
task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False):
task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False,
register_schema: Optional[bool] = None):
"""
Decorator to register a function as a Conductor worker task.

Expand Down Expand Up @@ -156,6 +157,14 @@ def worker_task(task_definition_name: str, poll_interval_millis: int = 100, doma
- When True: additionalProperties=false (strict validation)
- Can be overridden via env: conductor.worker.<name>.strict_schema=true

register_schema: Whether to register JSON schemas alongside task definitions.
- Default: None (inherits from Configuration.register_schema or False)
- When True: Input/output JSON schemas are generated from type hints and registered
- When False: Task definition is registered without schemas
- Can be set globally via Configuration(register_schema=True) or env: CONDUCTOR_REGISTER_SCHEMAS=true
- Can be overridden per-worker via env: conductor.worker.<name>.register_schema=true
- Priority: per-worker env > global env (conductor.worker.all) > decorator > Configuration > default (False)

Returns:
Decorated function that can be called normally or used as a workflow task

Expand Down Expand Up @@ -189,7 +198,8 @@ def worker_task_func(func):
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def,
poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled, task_def=task_def,
overwrite_task_def=overwrite_task_def, strict_schema=strict_schema, func=func)
overwrite_task_def=overwrite_task_def, strict_schema=strict_schema,
register_schema=register_schema, func=func)

@functools.wraps(func)
def wrapper_func(*args, **kwargs):
Expand Down
Loading
Loading