diff --git a/src/conductor/client/automator/async_task_runner.py b/src/conductor/client/automator/async_task_runner.py index e07596f3..0801b0d8 100644 --- a/src/conductor/client/automator/async_task_runner.py +++ b/src/conductor/client/automator/async_task_runner.py @@ -232,7 +232,18 @@ async def __async_register_task_definition(self) -> None: output_schema_name = None schema_registry_available = True - if hasattr(self.worker, 'execute_function'): + # Check if schema registration is enabled for this worker + register_schema = getattr(self.worker, 'register_schema', False) + # Also check global Configuration default + if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None: + # Worker-level setting takes precedence if explicitly set (not default) + if not hasattr(self.worker, 'register_schema'): + register_schema = self.configuration.register_schema + + if not register_schema: + logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)") + + if register_schema and hasattr(self.worker, 'execute_function'): logger.debug(f"Generating JSON schemas from function signature...") # Pass strict_schema flag to control additionalProperties strict_mode = getattr(self.worker, 'strict_schema', False) @@ -314,6 +325,8 @@ async def __async_register_task_definition(self) -> None: logger.debug(f"Could not register schemas for {task_name}: {e}") else: logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)") + elif not register_schema: + pass # Already logged above else: logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas") diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index c032ede0..2e3616f1 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -73,7 +73,8 @@ def _run_async_worker_process( def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: str, func, thread_count: int = 1, register_task_def: bool = False, poll_timeout: int = 100, lease_extend_enabled: bool = False, task_def: Optional['TaskDef'] = None, - overwrite_task_def: bool = True, strict_schema: bool = False): + overwrite_task_def: bool = True, strict_schema: bool = False, + register_schema: Optional[bool] = None): logger.debug("decorated %s", name) _decorated_functions[(name, domain)] = { "func": func, @@ -86,7 +87,8 @@ def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: "lease_extend_enabled": lease_extend_enabled, "task_def": task_def, "overwrite_task_def": overwrite_task_def, - "strict_schema": strict_schema + "strict_schema": strict_schema, + "register_schema": register_schema } @@ -112,7 +114,8 @@ def get_registered_workers() -> List[Worker]: paused=False, # Always default to False, only env vars can set to True task_def_template=record.get("task_def"), # Optional TaskDef configuration overwrite_task_def=record.get("overwrite_task_def", True), - strict_schema=record.get("strict_schema", False) + strict_schema=record.get("strict_schema", False), + register_schema=record.get("register_schema") if record.get("register_schema") is not None else False ) workers.append(worker) return workers @@ -254,9 +257,15 @@ def __init__( 'poll_timeout': record.get("poll_timeout", 100), 'lease_extend_enabled': record.get("lease_extend_enabled", True), 'overwrite_task_def': record.get("overwrite_task_def", True), - 'strict_schema': record.get("strict_schema", False) + 'strict_schema': record.get("strict_schema", False), + 'register_schema': record.get("register_schema") } + # Apply global Configuration.register_schema as fallback when + # the decorator doesn't set it explicitly (None). + if code_config['register_schema'] is None and hasattr(configuration, 'register_schema') and configuration.register_schema is not None: + code_config['register_schema'] = configuration.register_schema + # Resolve configuration with environment variable overrides resolved_config = resolve_worker_config( worker_name=task_def_name, @@ -275,7 +284,8 @@ def __init__( lease_extend_enabled=resolved_config['lease_extend_enabled'], task_def_template=record.get("task_def"), # Pass TaskDef configuration overwrite_task_def=resolved_config.get('overwrite_task_def', True), - strict_schema=resolved_config.get('strict_schema', False)) + strict_schema=resolved_config.get('strict_schema', False), + register_schema=resolved_config.get('register_schema', False)) logger.debug("created worker with name=%s and domain=%s", task_def_name, resolved_config['domain']) workers.append(worker) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index f14f5994..6ea95504 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -204,7 +204,18 @@ def __register_task_definition(self) -> None: output_schema_name = None schema_registry_available = True - if hasattr(self.worker, 'execute_function'): + # Check if schema registration is enabled for this worker + register_schema = getattr(self.worker, 'register_schema', False) + # Also check global Configuration default + if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None: + # Worker-level setting takes precedence if explicitly set (not default) + if not hasattr(self.worker, 'register_schema'): + register_schema = self.configuration.register_schema + + if not register_schema: + logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)") + + if register_schema and hasattr(self.worker, 'execute_function'): logger.debug(f"Generating JSON schemas from function signature...") # Pass strict_schema flag to control additionalProperties strict_mode = getattr(self.worker, 'strict_schema', False) @@ -287,6 +298,8 @@ def __register_task_definition(self) -> None: logger.debug(f"Could not register schemas for {task_name}: {e}") else: logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)") + elif not register_schema: + pass # Already logged above else: logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas") diff --git a/src/conductor/client/configuration/configuration.py b/src/conductor/client/configuration/configuration.py index 3242ceb1..c75877fa 100644 --- a/src/conductor/client/configuration/configuration.py +++ b/src/conductor/client/configuration/configuration.py @@ -30,7 +30,8 @@ def __init__( debug: bool = False, authentication_settings: AuthenticationSettings = None, server_api_url: Optional[str] = None, - auth_token_ttl_min: int = 45 + auth_token_ttl_min: int = 45, + register_schema: Optional[bool] = None ): if server_api_url is not None: self.host = server_api_url @@ -84,6 +85,16 @@ def __init__( # Provide an alterative to requests.Session() for HTTP connection. self.http_connection = None + # Global default for schema registration (None = defer to per-worker config) + if register_schema is not None: + self.register_schema = register_schema + else: + env_val = os.getenv("CONDUCTOR_REGISTER_SCHEMAS") + if env_val is not None: + self.register_schema = env_val.strip().lower() in ('true', '1', 'yes', 'on') + else: + self.register_schema = None # No global override; per-worker default (True) applies + # not updated yet self.token_update_time = 0 self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000 diff --git a/src/conductor/client/worker/worker.py b/src/conductor/client/worker/worker.py index 829796a6..a5855a2e 100644 --- a/src/conductor/client/worker/worker.py +++ b/src/conductor/client/worker/worker.py @@ -302,7 +302,8 @@ def __init__(self, paused: bool = False, task_def_template: Optional['TaskDef'] = None, overwrite_task_def: bool = True, - strict_schema: bool = False + strict_schema: bool = False, + register_schema: bool = False ) -> Self: super().__init__(task_definition_name) self.api_client = ApiClient() @@ -324,6 +325,7 @@ def __init__(self, self.task_def_template = task_def_template # Optional TaskDef configuration self.overwrite_task_def = overwrite_task_def # Whether to overwrite existing task definitions self.strict_schema = strict_schema # Whether to enforce strict schema (additionalProperties=false) + self.register_schema = register_schema # Whether to register JSON schemas alongside task definitions # Initialize background event loop for async workers self._background_loop = None diff --git a/src/conductor/client/worker/worker_config.py b/src/conductor/client/worker/worker_config.py index 84dd7439..70b35ddb 100644 --- a/src/conductor/client/worker/worker_config.py +++ b/src/conductor/client/worker/worker_config.py @@ -38,7 +38,8 @@ def process_order(order_id: str): 'lease_extend_enabled': 'lease_extend_enabled', 'paused': 'paused', 'overwrite_task_def': 'overwrite_task_def', - 'strict_schema': 'strict_schema' + 'strict_schema': 'strict_schema', + 'register_schema': 'register_schema' } @@ -186,7 +187,8 @@ def resolve_worker_config( lease_extend_enabled: Optional[bool] = None, paused: Optional[bool] = None, overwrite_task_def: Optional[bool] = None, - strict_schema: Optional[bool] = None + strict_schema: Optional[bool] = None, + register_schema: Optional[bool] = None ) -> dict: """ Resolve worker configuration with hierarchical override. @@ -208,6 +210,7 @@ def resolve_worker_config( paused: Whether worker is paused (code-level default) overwrite_task_def: Whether to overwrite existing task definitions (code-level default, default=True) strict_schema: Whether to set additionalProperties=false in schemas (code-level default, default=False) + register_schema: Whether to register JSON schemas alongside task definitions (code-level default, default=False) Returns: Dict with resolved configuration values @@ -269,6 +272,11 @@ def resolve_worker_config( env_strict = _get_env_value(worker_name, 'strict_schema', bool) resolved['strict_schema'] = env_strict if env_strict is not None else (strict_schema if strict_schema is not None else False) + # Resolve register_schema (default: False) + # Priority: per-worker env > global env (conductor.worker.all) > code-level > default (False) + env_register_schema = _get_env_value(worker_name, 'register_schema', bool) + resolved['register_schema'] = env_register_schema if env_register_schema is not None else (register_schema if register_schema is not None else False) + return resolved diff --git a/src/conductor/client/worker/worker_task.py b/src/conductor/client/worker/worker_task.py index df800749..08afd968 100644 --- a/src/conductor/client/worker/worker_task.py +++ b/src/conductor/client/worker/worker_task.py @@ -7,7 +7,7 @@ def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None, poll_interval_seconds: int = 0, thread_count: int = 1, register_task_def: bool = False, - poll_timeout: int = 100, lease_extend_enabled: bool = False): + poll_timeout: int = 100, lease_extend_enabled: bool = False, register_schema: Optional[bool] = None): """ Decorator to register a function as a Conductor worker task (legacy CamelCase name). @@ -62,7 +62,7 @@ def worker_task_func(func): register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain, worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def, poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled, - func=func) + register_schema=register_schema, func=func) @functools.wraps(func) def wrapper_func(*args, **kwargs): @@ -80,7 +80,8 @@ def wrapper_func(*args, **kwargs): def worker_task(task_definition_name: str, poll_interval_millis: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None, thread_count: int = 1, register_task_def: bool = False, poll_timeout: int = 100, lease_extend_enabled: bool = False, - task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False): + task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False, + register_schema: Optional[bool] = None): """ Decorator to register a function as a Conductor worker task. @@ -156,6 +157,14 @@ def worker_task(task_definition_name: str, poll_interval_millis: int = 100, doma - When True: additionalProperties=false (strict validation) - Can be overridden via env: conductor.worker..strict_schema=true + register_schema: Whether to register JSON schemas alongside task definitions. + - Default: None (inherits from Configuration.register_schema or False) + - When True: Input/output JSON schemas are generated from type hints and registered + - When False: Task definition is registered without schemas + - Can be set globally via Configuration(register_schema=True) or env: CONDUCTOR_REGISTER_SCHEMAS=true + - Can be overridden per-worker via env: conductor.worker..register_schema=true + - Priority: per-worker env > global env (conductor.worker.all) > decorator > Configuration > default (False) + Returns: Decorated function that can be called normally or used as a workflow task @@ -189,7 +198,8 @@ def worker_task_func(func): register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain, worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def, poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled, task_def=task_def, - overwrite_task_def=overwrite_task_def, strict_schema=strict_schema, func=func) + overwrite_task_def=overwrite_task_def, strict_schema=strict_schema, + register_schema=register_schema, func=func) @functools.wraps(func) def wrapper_func(*args, **kwargs): diff --git a/tests/unit/automator/test_task_registration.py b/tests/unit/automator/test_task_registration.py index 1824efd1..91230ece 100644 --- a/tests/unit/automator/test_task_registration.py +++ b/tests/unit/automator/test_task_registration.py @@ -89,7 +89,8 @@ def process_order(order: OrderInfo) -> dict: worker = Worker( task_definition_name='process_order', execute_function=process_order, - register_task_def=True + register_task_def=True, + register_schema=True ) # Setup mocks @@ -153,8 +154,8 @@ def worker_func(name: str) -> str: @patch('conductor.client.automator.task_runner.OrkesMetadataClient') @patch('conductor.client.automator.task_runner.OrkesSchemaClient') - def test_always_registers_schemas(self, mock_schema_client_class, mock_metadata_client_class): - """Schemas are always registered (may overwrite existing).""" + def test_registers_schemas_when_enabled(self, mock_schema_client_class, mock_metadata_client_class): + """Schemas are registered when register_schema=True.""" def worker_func(name: str) -> str: return name @@ -162,7 +163,8 @@ def worker_func(name: str) -> str: worker = Worker( task_definition_name='task_with_schemas', execute_function=worker_func, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -176,12 +178,49 @@ def worker_func(name: str) -> str: task_runner = TaskRunner(worker, self.config) task_runner._TaskRunner__register_task_definition() - # Should always register schemas (2 calls: input and output) + # Should register schemas when register_schema=True (2 calls: input and output) self.assertEqual(mock_schema.register_schema.call_count, 2) # Should register task definition self.assertTrue(mock_metadata.update_task_def.called or mock_metadata.register_task_def.called) + @patch('conductor.client.automator.task_runner.OrkesMetadataClient') + @patch('conductor.client.automator.task_runner.OrkesSchemaClient') + def test_skips_schemas_when_disabled(self, mock_schema_client_class, mock_metadata_client_class): + """Schemas are NOT registered when register_schema=False (default).""" + + def worker_func(name: str) -> str: + return name + + worker = Worker( + task_definition_name='task_no_schemas', + execute_function=worker_func, + register_task_def=True + # register_schema defaults to False + ) + + mock_metadata = Mock() + mock_schema = Mock() + mock_metadata_client_class.return_value = mock_metadata + mock_schema_client_class.return_value = mock_schema + + setup_update_then_register_mock(mock_metadata) + + task_runner = TaskRunner(worker, self.config) + task_runner._TaskRunner__register_task_definition() + + # Schema client should NOT be created (register_schema=False) + mock_schema_client_class.assert_not_called() + mock_schema.register_schema.assert_not_called() + + # Task definition should still be registered + self.assertTrue(mock_metadata.update_task_def.called or mock_metadata.register_task_def.called) + + # TaskDef should have no schema links + task_def = get_registered_or_updated_task_def(mock_metadata) + self.assertIsNone(task_def.input_schema) + self.assertIsNone(task_def.output_schema) + @patch('conductor.client.automator.task_runner.OrkesMetadataClient') def test_registration_without_type_hints(self, mock_metadata_client_class): """When function has no type hints, register task without schemas.""" @@ -250,7 +289,8 @@ def worker_func(user_id: str, count: int = 10) -> dict: worker = Worker( task_definition_name='test_task', execute_function=worker_func, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -364,7 +404,8 @@ def worker_func(user_id: str) -> dict: worker = Worker( task_definition_name='my_task', execute_function=worker_func, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -454,7 +495,8 @@ def worker_func(name: str) -> str: worker = Worker( task_definition_name='test_task', execute_function=worker_func, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -507,7 +549,8 @@ def update_user(user: User) -> dict: worker = Worker( task_definition_name='update_user', execute_function=update_user, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -547,7 +590,8 @@ def long_task() -> Union[dict, TaskInProgress]: worker = Worker( task_definition_name='long_task', execute_function=long_task, - register_task_def=True + register_task_def=True, + register_schema=True ) mock_metadata = Mock() @@ -738,6 +782,7 @@ def worker(user: User) -> dict: task_definition_name='test_task', execute_function=worker, register_task_def=True, + register_schema=True, strict_schema=False # Lenient ) @@ -772,6 +817,7 @@ def worker(name: str) -> str: task_definition_name='strict_task', execute_function=worker, register_task_def=True, + register_schema=True, strict_schema=True # Strict ) diff --git a/tests/unit/configuration/test_configuration.py b/tests/unit/configuration/test_configuration.py index f44807f8..dec1c630 100644 --- a/tests/unit/configuration/test_configuration.py +++ b/tests/unit/configuration/test_configuration.py @@ -34,6 +34,39 @@ def test_initialization_with_server_api_url(self): 'https://developer.orkescloud.com/api' ) + def test_register_schema_default_is_none(self): + """register_schema defaults to None when not set anywhere""" + configuration = Configuration(server_api_url='http://localhost:8080/api') + self.assertIsNone(configuration.register_schema) + + def test_register_schema_explicit_false(self): + """register_schema can be set explicitly to False""" + configuration = Configuration(server_api_url='http://localhost:8080/api', register_schema=False) + self.assertFalse(configuration.register_schema) + + def test_register_schema_explicit_true(self): + """register_schema can be set explicitly to True""" + configuration = Configuration(server_api_url='http://localhost:8080/api', register_schema=True) + self.assertTrue(configuration.register_schema) + + @mock.patch.dict(os.environ, {"CONDUCTOR_REGISTER_SCHEMAS": "false"}) + def test_register_schema_from_env_var(self): + """register_schema reads CONDUCTOR_REGISTER_SCHEMAS env var""" + configuration = Configuration(server_api_url='http://localhost:8080/api') + self.assertFalse(configuration.register_schema) + + @mock.patch.dict(os.environ, {"CONDUCTOR_REGISTER_SCHEMAS": "true"}) + def test_register_schema_from_env_var_true(self): + """register_schema reads CONDUCTOR_REGISTER_SCHEMAS env var as True""" + configuration = Configuration(server_api_url='http://localhost:8080/api') + self.assertTrue(configuration.register_schema) + + @mock.patch.dict(os.environ, {"CONDUCTOR_REGISTER_SCHEMAS": "false"}) + def test_register_schema_explicit_overrides_env(self): + """Explicit register_schema param takes precedence over env var""" + configuration = Configuration(server_api_url='http://localhost:8080/api', register_schema=True) + self.assertTrue(configuration.register_schema) + def test_initialization_with_basic_auth_server_api_url(self): configuration = Configuration( server_api_url="https://user:password@developer.orkescloud.com/api" diff --git a/tests/unit/worker/test_worker_config.py b/tests/unit/worker/test_worker_config.py index 0610894d..9e9c59fb 100644 --- a/tests/unit/worker/test_worker_config.py +++ b/tests/unit/worker/test_worker_config.py @@ -294,6 +294,56 @@ def test_all_properties_resolvable(self): self.assertFalse(config['lease_extend_enabled']) + def test_register_schema_defaults_to_false(self): + """Test register_schema defaults to False when not set anywhere""" + config = resolve_worker_config( + worker_name='test_worker', + poll_interval=1000 + ) + self.assertFalse(config['register_schema']) + + def test_register_schema_code_level_false(self): + """Test register_schema can be set to False at code level""" + config = resolve_worker_config( + worker_name='test_worker', + register_schema=False + ) + self.assertFalse(config['register_schema']) + + def test_register_schema_env_override_per_worker(self): + """Test register_schema can be overridden per worker via env""" + os.environ['conductor.worker.my_task.register_schema'] = 'false' + + config = resolve_worker_config( + worker_name='my_task', + register_schema=True # Code says True + ) + # Env override wins + self.assertFalse(config['register_schema']) + + def test_register_schema_env_override_global(self): + """Test register_schema can be overridden globally via env""" + os.environ['conductor.worker.all.register_schema'] = 'false' + + config = resolve_worker_config( + worker_name='test_worker', + register_schema=None # Not set in code + ) + self.assertFalse(config['register_schema']) + + def test_register_schema_per_worker_env_beats_global_env(self): + """Test per-worker env var takes precedence over global env var""" + os.environ['conductor.worker.all.register_schema'] = 'false' + os.environ['conductor.worker.special_task.register_schema'] = 'true' + + config = resolve_worker_config( + worker_name='special_task', + register_schema=None + ) + # Per-worker override wins + self.assertTrue(config['register_schema']) + + class TestWorkerConfigIntegration(unittest.TestCase): """Integration tests for worker configuration in realistic scenarios"""