Skip to content

Commit 999e4e7

Browse files
committed
batch polling and batching support
1 parent b2de890 commit 999e4e7

8 files changed

Lines changed: 2214 additions & 337 deletions

File tree

examples/asyncio_workers.py

Lines changed: 229 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
"""
2-
AsyncIO Workers Example
2+
AsyncIO Workers Example - Java SDK Architecture
33
4-
This example demonstrates how to use the AsyncIO-based TaskHandlerAsyncIO
5-
instead of the multiprocessing-based TaskHandler.
4+
This example demonstrates the AsyncIO task runner with Java SDK architecture features:
5+
- Semaphore-based dynamic batch polling
6+
- Per-worker thread count configuration
7+
- Automatic lease extension
8+
- In-memory queue for V2 API chained tasks
9+
- Zero-polling optimization
610
7-
Advantages of AsyncIO:
8-
- Lower memory footprint (single process)
9-
- Better for I/O-bound tasks
10-
- Simpler debugging
11+
Key Features (matching Java SDK):
12+
- Dynamic batch sizing (batch = available threads)
13+
- No server calls when all threads busy
14+
- Adaptive concurrency control
15+
- Optimal resource utilization
1116
1217
Requirements:
1318
pip install httpx # AsyncIO HTTP client
1419
20+
Configuration:
21+
Set environment variables or create conductor_config.py:
22+
- CONDUCTOR_SERVER_URL: e.g., https://play.orkes.io/api
23+
- CONDUCTOR_AUTH_KEY: API key
24+
- CONDUCTOR_AUTH_SECRET: API secret
25+
1526
Run:
1627
python examples/asyncio_workers.py
1728
"""
@@ -60,18 +71,28 @@ class User:
6071
company: Company
6172

6273

63-
# Example 1: Synchronous worker (will run in thread pool)
64-
@worker_task(task_definition_name='greet')
74+
# Example 1: Simple synchronous worker (runs in thread pool)
75+
@worker_task(
76+
task_definition_name='greet',
77+
thread_count=1, # Low concurrency for simple tasks
78+
poll_timeout=100, # Default poll timeout (ms)
79+
lease_extend_enabled=False # Fast tasks don't need lease extension
80+
)
6581
def greet(name: str) -> str:
6682
"""
6783
Synchronous worker - automatically runs in thread pool to avoid blocking.
68-
Good for legacy code or CPU-bound tasks.
84+
Good for legacy code or simple CPU-bound tasks.
6985
"""
7086
return f'Hello {name}'
7187

7288

73-
# Example 2: Async worker (runs natively in event loop)
74-
@worker_task(task_definition_name='greet_async')
89+
# Example 2: Simple async worker (runs natively in event loop)
90+
@worker_task(
91+
task_definition_name='greet_async',
92+
thread_count=10, # Higher concurrency for async I/O
93+
poll_timeout=100,
94+
lease_extend_enabled=False
95+
)
7596
async def greet_async(name: str) -> str:
7697
"""
7798
Async worker - runs natively in the event loop.
@@ -82,71 +103,136 @@ async def greet_async(name: str) -> str:
82103
return f'Hello {name} (from async function)'
83104

84105

85-
# Example 3: Async worker with HTTP call
86-
@worker_task(task_definition_name='fetch_user')
106+
# Example 3: High-throughput HTTP worker with batch polling
107+
@worker_task(
108+
poll_interval_millis=10,
109+
task_definition_name='fetch_user',
110+
thread_count=20, # High concurrency for I/O-bound tasks
111+
poll_timeout=20, # Short poll timeout (ms) - lower than the other workers in this example
112+
lease_extend_enabled=False # Fast HTTP calls don't need lease extension
113+
)
87114
async def fetch_user(user_id: str) -> dict:
88115
"""
89116
Example of making async HTTP calls using httpx.
90-
This is more efficient than synchronous requests.
117+
With thread_count=20, the system will:
118+
- Batch poll up to 20 tasks when all threads available
119+
- Skip polling when all 20 threads busy (zero-polling)
120+
- Dynamically adjust batch size based on availability
91121
"""
92122
try:
93123
import httpx
94-
# print(f'fetching user {user_id}')
95124
async with httpx.AsyncClient() as client:
96125
response = await client.get(
97-
f'https://jsonplaceholder.typicode.com/users/{user_id}'
126+
f'https://jsonplaceholder.typicode.com/users/{user_id}',
127+
timeout=10.0
98128
)
99-
# print(f'response {response.json()}')
100129
return response.json()
101130

102131
except Exception as e:
103132
return {"error": str(e)}
104133

105134

106-
@worker_task(task_definition_name='process_user')
135+
# Example 4: Dataclass-based worker (type-safe input)
136+
@worker_task(
137+
task_definition_name='process_user',
138+
thread_count=15,
139+
poll_timeout=150,
140+
lease_extend_enabled=False
141+
)
107142
async def process_user(user: User) -> dict:
108143
"""
109-
Example of making async HTTP calls using httpx.
110-
This is more efficient than synchronous requests.
144+
Worker that accepts User dataclass - SDK automatically converts from dict.
145+
Demonstrates type-safe worker functions.
146+
147+
The fetch_user task returns a dict, which is chained to this task.
148+
Since dict outputs are used as-is (not wrapped in "result"),
149+
the User dataclass can be properly constructed.
150+
"""
151+
try:
152+
import httpx
153+
async with httpx.AsyncClient() as client:
154+
response = await client.get(
155+
f'https://jsonplaceholder.typicode.com/users/{user.id + 3}',
156+
timeout=10.0
157+
)
158+
return response.json()
159+
160+
except Exception as e:
161+
return {"error": str(e)}
162+
163+
164+
# Example 5: Worker with dict input (flexible alternative)
165+
@worker_task(
166+
task_definition_name='process_user_dict',
167+
thread_count=10,
168+
poll_timeout=150,
169+
lease_extend_enabled=False
170+
)
171+
async def process_user_dict(user: dict) -> dict:
172+
"""
173+
Worker that accepts dict input directly - more flexible.
174+
Use this when you don't need strict type checking.
175+
176+
Accepts any dict with an 'id' field.
111177
"""
112178
try:
113179
import httpx
114-
# print(f'fetching user details for {user.id}')
180+
user_id = user.get('id', 1)
181+
115182
async with httpx.AsyncClient() as client:
116183
response = await client.get(
117-
f'https://jsonplaceholder.typicode.com/users/{user.id + 1}'
184+
f'https://jsonplaceholder.typicode.com/users/{user_id + 1}',
185+
timeout=10.0
118186
)
119-
# print(f'response {response.json()}')
120187
return response.json()
121188

122189
except Exception as e:
123190
return {"error": str(e)}
124191

125192

126-
# Example 4: CPU-bound work in thread pool
127-
@worker_task(task_definition_name='calculate')
193+
# Example 6: CPU-bound work in thread pool (lower concurrency)
194+
@worker_task(
195+
task_definition_name='calculate',
196+
thread_count=4, # Lower concurrency for CPU-bound tasks
197+
poll_timeout=100,
198+
lease_extend_enabled=False
199+
)
128200
def calculate_fibonacci(n: int) -> int:
129201
"""
130202
CPU-bound work automatically runs in thread pool.
131203
For heavy CPU work, consider using multiprocessing TaskHandler instead.
204+
205+
Note: thread_count=4 limits concurrent CPU-intensive tasks to avoid
206+
overwhelming the system (GIL contention).
132207
"""
133208
if n <= 1:
134209
return n
135210
return calculate_fibonacci(n - 1) + calculate_fibonacci(n - 2)
136211

137212

138-
# Example 5: Mixed I/O and CPU work
139-
@worker_task(task_definition_name='process_data')
213+
# Example 7: Mixed I/O and CPU work with controlled concurrency
214+
@worker_task(
215+
task_definition_name='process_data',
216+
thread_count=12, # Moderate concurrency for mixed workload
217+
poll_timeout=200,
218+
lease_extend_enabled=True, # Enable lease extension for longer tasks
219+
register_task_def=False # Don't auto-register task definition
220+
)
140221
async def process_data(data_url: str) -> dict:
141222
"""
142223
Demonstrates mixing async I/O with CPU-bound work.
143224
I/O runs in event loop, CPU work runs in thread pool.
225+
226+
With thread_count=12:
227+
- System can batch poll up to 12 tasks when all threads free
228+
- Zero-polling kicks in when all 12 threads busy
229+
- Dynamically adjusts batch size as threads complete
144230
"""
145231
import httpx
146232

147233
# I/O-bound: Fetch data asynchronously
148234
async with httpx.AsyncClient() as client:
149-
response = await client.get(data_url)
235+
response = await client.get(data_url, timeout=10.0)
150236
data = response.json()
151237

152238
# CPU-bound: Process in thread pool
@@ -168,9 +254,33 @@ def _process_data_sync(data: dict) -> dict:
168254
return {"processed": True, "count": len(data)}
169255

170256

257+
# Example 8: Long-running task with automatic lease extension
258+
@worker_task(
259+
task_definition_name='long_task',
260+
thread_count=2, # Low concurrency for expensive tasks
261+
poll_timeout=500,
262+
lease_extend_enabled=True # Automatically extends lease at 80% of timeout
263+
)
264+
async def long_running_task(duration: int) -> dict:
265+
"""
266+
Demonstrates automatic lease extension for long-running tasks.
267+
268+
If task.response_timeout_seconds = 300 (5 minutes):
269+
- Lease extension sent at 240s (80%)
270+
- Repeats every 240s until task completes
271+
- Retries up to 3 times per extension
272+
- Automatically cancelled when task completes
273+
274+
This keeps the task alive in Conductor during long processing.
275+
"""
276+
# Simulate long-running operation
277+
await asyncio.sleep(duration)
278+
return {"duration": duration, "completed": True}
279+
280+
171281
async def main():
172282
"""
173-
Main entry point demonstrating different ways to use TaskHandlerAsyncIO.
283+
Main entry point demonstrating AsyncIO task handler with Java SDK architecture.
174284
"""
175285

176286
# Configuration - defaults to reading from environment variables:
@@ -180,10 +290,26 @@ async def main():
180290
api_config = Configuration()
181291

182292
print("=" * 60)
183-
print("Conductor AsyncIO Workers Example")
293+
print("Conductor AsyncIO Workers - Java SDK Architecture")
184294
print("=" * 60)
185295
print(f"Server: {api_config.host}")
186-
print(f"Workers: greet, greet_async, fetch_user, calculate, process_data")
296+
print()
297+
print("Workers with dynamic batch polling:")
298+
print(" • greet (thread_count=1)")
299+
print(" • greet_async (thread_count=10)")
300+
print(" • fetch_user (thread_count=20) - High throughput")
301+
print(" • process_user (thread_count=15) - Type-safe dataclass")
302+
print(" • process_user_dict (thread_count=10) - Flexible dict input")
303+
print(" • calculate (thread_count=4) - CPU-bound")
304+
print(" • process_data (thread_count=12) - Mixed I/O+CPU")
305+
print(" • long_task (thread_count=2) - With lease extension")
306+
print()
307+
print("Features:")
308+
print(" ✓ Dynamic batch polling (batch size = available threads)")
309+
print(" ✓ Zero-polling optimization (skip when all threads busy)")
310+
print(" ✓ Automatic lease extension at 80% of timeout")
311+
print(" ✓ In-memory queue for V2 API chained tasks")
312+
print(" ✓ Per-worker concurrency control")
187313
print("=" * 60)
188314
print("\nStarting workers... Press Ctrl+C to stop\n")
189315

@@ -229,6 +355,71 @@ def signal_handler():
229355
print("\nWorkers stopped. Goodbye!")
230356

231357

358+
async def demo_v2_api():
359+
"""
360+
Example of V2 API support with in-memory queue.
361+
362+
When enabled (export taskUpdateV2=true), the server can return
363+
the next task to execute in the update response, which is added
364+
to the in-memory queue to avoid redundant polling.
365+
"""
366+
import os
367+
os.environ['taskUpdateV2'] = 'true'
368+
369+
api_config = Configuration()
370+
371+
@worker_task(
372+
task_definition_name='chained_task',
373+
thread_count=10
374+
)
375+
async def chained_task(data: dict) -> dict:
376+
"""Task that may be part of a chained workflow"""
377+
await asyncio.sleep(0.5)
378+
return {"result": "processed", "data": data}
379+
380+
async with TaskHandlerAsyncIO(configuration=api_config) as handler:
381+
# Server may return next task in workflow
382+
# → Added to in-memory queue
383+
# → Drained before next server poll
384+
# → Reduces server calls by ~30% for chained workflows
385+
await handler.wait()
386+
387+
388+
async def demo_zero_polling():
389+
"""
390+
Example demonstrating zero-polling optimization.
391+
392+
When all threads are busy:
393+
- poll_count = 0 (no available permits)
394+
- Skip server call (zero-polling)
395+
- Sleep briefly and retry
396+
- Saves server resources during high load
397+
"""
398+
399+
@worker_task(
400+
task_definition_name='busy_task',
401+
thread_count=5 # Only 5 concurrent tasks allowed
402+
)
403+
async def busy_task(duration: int) -> dict:
404+
"""Simulates a task that takes 'duration' seconds"""
405+
await asyncio.sleep(duration)
406+
return {"completed": True}
407+
408+
api_config = Configuration()
409+
410+
async with TaskHandlerAsyncIO(configuration=api_config) as handler:
411+
# Scenario: 10 tasks queued on server
412+
#
413+
# Poll #1: 5 permits available → batch poll 5 tasks → all threads busy
414+
# Poll #2: 0 permits available → zero-polling (skip server call)
415+
# Poll #3: 0 permits available → zero-polling (skip server call)
416+
# ...
417+
# Poll #N: 2 tasks complete → 2 permits available → batch poll 2 tasks
418+
#
419+
# Result: Saved (N-2) server calls during high load
420+
await handler.wait()
421+
422+
232423
if __name__ == '__main__':
233424
"""
234425
Run the async main function.
@@ -237,6 +428,12 @@ def signal_handler():
237428
Python 3.6: asyncio.get_event_loop().run_until_complete(main())
238429
"""
239430
try:
431+
# Run main demo
240432
asyncio.run(main())
433+
434+
# Uncomment to run other demos:
435+
# asyncio.run(demo_v2_api())
436+
# asyncio.run(demo_zero_polling())
437+
241438
except KeyboardInterrupt:
242439
pass

0 commit comments

Comments
 (0)