Skip to content

Commit 2ce1a54

Browse files
authored
Make schema registration optional (default: off) (#399)
1 parent 70ba6e7 commit 2ce1a54

10 files changed

Lines changed: 221 additions & 25 deletions

File tree

src/conductor/client/automator/async_task_runner.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,18 @@ async def __async_register_task_definition(self) -> None:
232232
output_schema_name = None
233233
schema_registry_available = True
234234

235-
if hasattr(self.worker, 'execute_function'):
235+
# Check if schema registration is enabled for this worker
236+
register_schema = getattr(self.worker, 'register_schema', False)
237+
# Also check global Configuration default
238+
if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None:
239+
# Worker-level setting takes precedence if explicitly set (not default)
240+
if not hasattr(self.worker, 'register_schema'):
241+
register_schema = self.configuration.register_schema
242+
243+
if not register_schema:
244+
logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)")
245+
246+
if register_schema and hasattr(self.worker, 'execute_function'):
236247
logger.debug(f"Generating JSON schemas from function signature...")
237248
# Pass strict_schema flag to control additionalProperties
238249
strict_mode = getattr(self.worker, 'strict_schema', False)
@@ -314,6 +325,8 @@ async def __async_register_task_definition(self) -> None:
314325
logger.debug(f"Could not register schemas for {task_name}: {e}")
315326
else:
316327
logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)")
328+
elif not register_schema:
329+
pass # Already logged above
317330
else:
318331
logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas")
319332

src/conductor/client/automator/task_handler.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ def _run_async_worker_process(
7373
def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: str, func,
7474
thread_count: int = 1, register_task_def: bool = False,
7575
poll_timeout: int = 100, lease_extend_enabled: bool = False, task_def: Optional['TaskDef'] = None,
76-
overwrite_task_def: bool = True, strict_schema: bool = False):
76+
overwrite_task_def: bool = True, strict_schema: bool = False,
77+
register_schema: Optional[bool] = None):
7778
logger.debug("decorated %s", name)
7879
_decorated_functions[(name, domain)] = {
7980
"func": func,
@@ -86,7 +87,8 @@ def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id:
8687
"lease_extend_enabled": lease_extend_enabled,
8788
"task_def": task_def,
8889
"overwrite_task_def": overwrite_task_def,
89-
"strict_schema": strict_schema
90+
"strict_schema": strict_schema,
91+
"register_schema": register_schema
9092
}
9193

9294

@@ -112,7 +114,8 @@ def get_registered_workers() -> List[Worker]:
112114
paused=False, # Always default to False, only env vars can set to True
113115
task_def_template=record.get("task_def"), # Optional TaskDef configuration
114116
overwrite_task_def=record.get("overwrite_task_def", True),
115-
strict_schema=record.get("strict_schema", False)
117+
strict_schema=record.get("strict_schema", False),
118+
register_schema=record.get("register_schema") if record.get("register_schema") is not None else False
116119
)
117120
workers.append(worker)
118121
return workers
@@ -254,9 +257,15 @@ def __init__(
254257
'poll_timeout': record.get("poll_timeout", 100),
255258
'lease_extend_enabled': record.get("lease_extend_enabled", True),
256259
'overwrite_task_def': record.get("overwrite_task_def", True),
257-
'strict_schema': record.get("strict_schema", False)
260+
'strict_schema': record.get("strict_schema", False),
261+
'register_schema': record.get("register_schema")
258262
}
259263

264+
# Apply global Configuration.register_schema as fallback when
265+
# the decorator doesn't set it explicitly (None).
266+
if code_config['register_schema'] is None and hasattr(configuration, 'register_schema') and configuration.register_schema is not None:
267+
code_config['register_schema'] = configuration.register_schema
268+
260269
# Resolve configuration with environment variable overrides
261270
resolved_config = resolve_worker_config(
262271
worker_name=task_def_name,
@@ -275,7 +284,8 @@ def __init__(
275284
lease_extend_enabled=resolved_config['lease_extend_enabled'],
276285
task_def_template=record.get("task_def"), # Pass TaskDef configuration
277286
overwrite_task_def=resolved_config.get('overwrite_task_def', True),
278-
strict_schema=resolved_config.get('strict_schema', False))
287+
strict_schema=resolved_config.get('strict_schema', False),
288+
register_schema=resolved_config.get('register_schema', False))
279289
logger.debug("created worker with name=%s and domain=%s", task_def_name, resolved_config['domain'])
280290
workers.append(worker)
281291

src/conductor/client/automator/task_runner.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,18 @@ def __register_task_definition(self) -> None:
220220
output_schema_name = None
221221
schema_registry_available = True
222222

223-
if hasattr(self.worker, 'execute_function'):
223+
# Check if schema registration is enabled for this worker
224+
register_schema = getattr(self.worker, 'register_schema', False)
225+
# Also check global Configuration default
226+
if hasattr(self.configuration, 'register_schema') and self.configuration.register_schema is not None:
227+
# Worker-level setting takes precedence if explicitly set (not default)
228+
if not hasattr(self.worker, 'register_schema'):
229+
register_schema = self.configuration.register_schema
230+
231+
if not register_schema:
232+
logger.debug(f"Schema registration disabled for {task_name} (register_schema=False)")
233+
234+
if register_schema and hasattr(self.worker, 'execute_function'):
224235
logger.debug(f"Generating JSON schemas from function signature...")
225236
# Pass strict_schema flag to control additionalProperties
226237
strict_mode = getattr(self.worker, 'strict_schema', False)
@@ -303,6 +314,8 @@ def __register_task_definition(self) -> None:
303314
logger.debug(f"Could not register schemas for {task_name}: {e}")
304315
else:
305316
logger.debug(f" ⚠ No schemas generated (unable to analyze function signature)")
317+
elif not register_schema:
318+
pass # Already logged above
306319
else:
307320
logger.debug(f" ⚠ Class-based worker (no execute_function) - registering task without schemas")
308321

src/conductor/client/configuration/configuration.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ def __init__(
3030
debug: bool = False,
3131
authentication_settings: AuthenticationSettings = None,
3232
server_api_url: Optional[str] = None,
33-
auth_token_ttl_min: int = 45
33+
auth_token_ttl_min: int = 45,
34+
register_schema: Optional[bool] = None
3435
):
3536
if server_api_url is not None:
3637
self.host = server_api_url
@@ -84,6 +85,16 @@ def __init__(
8485
# Provide an alterative to requests.Session() for HTTP connection.
8586
self.http_connection = None
8687

88+
# Global default for schema registration (None = defer to per-worker config)
89+
if register_schema is not None:
90+
self.register_schema = register_schema
91+
else:
92+
env_val = os.getenv("CONDUCTOR_REGISTER_SCHEMAS")
93+
if env_val is not None:
94+
self.register_schema = env_val.strip().lower() in ('true', '1', 'yes', 'on')
95+
else:
96+
self.register_schema = None # No global override; per-worker default (True) applies
97+
8798
# not updated yet
8899
self.token_update_time = 0
89100
self.auth_token_ttl_msec = auth_token_ttl_min * 60 * 1000

src/conductor/client/worker/worker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ def __init__(self,
302302
paused: bool = False,
303303
task_def_template: Optional['TaskDef'] = None,
304304
overwrite_task_def: bool = True,
305-
strict_schema: bool = False
305+
strict_schema: bool = False,
306+
register_schema: bool = False
306307
) -> Self:
307308
super().__init__(task_definition_name)
308309
self.api_client = ApiClient()
@@ -324,6 +325,7 @@ def __init__(self,
324325
self.task_def_template = task_def_template # Optional TaskDef configuration
325326
self.overwrite_task_def = overwrite_task_def # Whether to overwrite existing task definitions
326327
self.strict_schema = strict_schema # Whether to enforce strict schema (additionalProperties=false)
328+
self.register_schema = register_schema # Whether to register JSON schemas alongside task definitions
327329

328330
# Initialize background event loop for async workers
329331
self._background_loop = None

src/conductor/client/worker/worker_config.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ def process_order(order_id: str):
3838
'lease_extend_enabled': 'lease_extend_enabled',
3939
'paused': 'paused',
4040
'overwrite_task_def': 'overwrite_task_def',
41-
'strict_schema': 'strict_schema'
41+
'strict_schema': 'strict_schema',
42+
'register_schema': 'register_schema'
4243
}
4344

4445

@@ -186,7 +187,8 @@ def resolve_worker_config(
186187
lease_extend_enabled: Optional[bool] = None,
187188
paused: Optional[bool] = None,
188189
overwrite_task_def: Optional[bool] = None,
189-
strict_schema: Optional[bool] = None
190+
strict_schema: Optional[bool] = None,
191+
register_schema: Optional[bool] = None
190192
) -> dict:
191193
"""
192194
Resolve worker configuration with hierarchical override.
@@ -208,6 +210,7 @@ def resolve_worker_config(
208210
paused: Whether worker is paused (code-level default)
209211
overwrite_task_def: Whether to overwrite existing task definitions (code-level default, default=True)
210212
strict_schema: Whether to set additionalProperties=false in schemas (code-level default, default=False)
213+
register_schema: Whether to register JSON schemas alongside task definitions (code-level default, default=False)
211214
212215
Returns:
213216
Dict with resolved configuration values
@@ -269,6 +272,11 @@ def resolve_worker_config(
269272
env_strict = _get_env_value(worker_name, 'strict_schema', bool)
270273
resolved['strict_schema'] = env_strict if env_strict is not None else (strict_schema if strict_schema is not None else False)
271274

275+
# Resolve register_schema (default: False)
276+
# Priority: per-worker env > global env (conductor.worker.all) > code-level > default (False)
277+
env_register_schema = _get_env_value(worker_name, 'register_schema', bool)
278+
resolved['register_schema'] = env_register_schema if env_register_schema is not None else (register_schema if register_schema is not None else False)
279+
272280
return resolved
273281

274282

src/conductor/client/worker/worker_task.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
def WorkerTask(task_definition_name: str, poll_interval: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
99
poll_interval_seconds: int = 0, thread_count: int = 1, register_task_def: bool = False,
10-
poll_timeout: int = 100, lease_extend_enabled: bool = False):
10+
poll_timeout: int = 100, lease_extend_enabled: bool = False, register_schema: Optional[bool] = None):
1111
"""
1212
Decorator to register a function as a Conductor worker task (legacy CamelCase name).
1313
@@ -62,7 +62,7 @@ def worker_task_func(func):
6262
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
6363
worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def,
6464
poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled,
65-
func=func)
65+
register_schema=register_schema, func=func)
6666

6767
@functools.wraps(func)
6868
def wrapper_func(*args, **kwargs):
@@ -80,7 +80,8 @@ def wrapper_func(*args, **kwargs):
8080

8181
def worker_task(task_definition_name: str, poll_interval_millis: int = 100, domain: Optional[str] = None, worker_id: Optional[str] = None,
8282
thread_count: int = 1, register_task_def: bool = False, poll_timeout: int = 100, lease_extend_enabled: bool = False,
83-
task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False):
83+
task_def: Optional['TaskDef'] = None, overwrite_task_def: bool = True, strict_schema: bool = False,
84+
register_schema: Optional[bool] = None):
8485
"""
8586
Decorator to register a function as a Conductor worker task.
8687
@@ -156,6 +157,14 @@ def worker_task(task_definition_name: str, poll_interval_millis: int = 100, doma
156157
- When True: additionalProperties=false (strict validation)
157158
- Can be overridden via env: conductor.worker.<name>.strict_schema=true
158159
160+
register_schema: Whether to register JSON schemas alongside task definitions.
161+
- Default: None (inherits from Configuration.register_schema or False)
162+
- When True: Input/output JSON schemas are generated from type hints and registered
163+
- When False: Task definition is registered without schemas
164+
- Can be set globally via Configuration(register_schema=True) or env: CONDUCTOR_REGISTER_SCHEMAS=true
165+
- Can be overridden per-worker via env: conductor.worker.<name>.register_schema=true
166+
- Priority: per-worker env > global env (conductor.worker.all) > decorator > Configuration > default (False)
167+
159168
Returns:
160169
Decorated function that can be called normally or used as a workflow task
161170
@@ -189,7 +198,8 @@ def worker_task_func(func):
189198
register_decorated_fn(name=task_definition_name, poll_interval=poll_interval_millis, domain=domain,
190199
worker_id=worker_id, thread_count=thread_count, register_task_def=register_task_def,
191200
poll_timeout=poll_timeout, lease_extend_enabled=lease_extend_enabled, task_def=task_def,
192-
overwrite_task_def=overwrite_task_def, strict_schema=strict_schema, func=func)
201+
overwrite_task_def=overwrite_task_def, strict_schema=strict_schema,
202+
register_schema=register_schema, func=func)
193203

194204
@functools.wraps(func)
195205
def wrapper_func(*args, **kwargs):

0 commit comments

Comments
 (0)