Skip to content

Commit 64f11d2

Browse files
committed
initial commit for asyncio
1 parent 28c133c commit 64f11d2

13 files changed

Lines changed: 5424 additions & 4 deletions

ASYNCIO_TEST_COVERAGE.md

Lines changed: 416 additions & 0 deletions
Large diffs are not rendered by default.

WORKER_CONCURRENCY_DESIGN.md

Lines changed: 1776 additions & 0 deletions
Large diffs are not rendered by default.

examples/asyncio_workers.py

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
"""
2+
AsyncIO Workers Example
3+
4+
This example demonstrates how to use the AsyncIO-based TaskHandlerAsyncIO
5+
instead of the multiprocessing-based TaskHandler.
6+
7+
Advantages of AsyncIO:
8+
- Lower memory footprint (single process)
9+
- Better for I/O-bound tasks
10+
- Simpler debugging
11+
12+
Requirements:
13+
pip install httpx # AsyncIO HTTP client
14+
15+
Run:
16+
python examples/asyncio_workers.py
17+
"""
18+
19+
import asyncio
20+
import json
21+
import signal
22+
from conductor.client.automator.task_handler_asyncio import TaskHandlerAsyncIO
23+
from conductor.client.configuration.configuration import Configuration
24+
from conductor.client.worker.worker_task import worker_task
25+
26+
from dataclasses import dataclass
27+
28+
29+
@dataclass
30+
class Geo:
31+
lat: str
32+
lng: str
33+
34+
35+
@dataclass
36+
class Address:
37+
street: str
38+
suite: str
39+
city: str
40+
zipcode: str
41+
geo: Geo
42+
43+
44+
@dataclass
45+
class Company:
46+
name: str
47+
catchPhrase: str
48+
bs: str
49+
50+
51+
@dataclass
52+
class User:
53+
id: int
54+
name: str
55+
username: str
56+
email: str
57+
address: Address
58+
phone: str
59+
website: str
60+
company: Company
61+
62+
63+
# Example 1: Synchronous worker (will run in thread pool)
64+
@worker_task(task_definition_name='greet')
65+
def greet(name: str) -> str:
66+
"""
67+
Synchronous worker - automatically runs in thread pool to avoid blocking.
68+
Good for legacy code or CPU-bound tasks.
69+
"""
70+
return f'Hello {name}'
71+
72+
73+
# Example 2: Async worker (runs natively in event loop)
74+
@worker_task(task_definition_name='greet_async')
75+
async def greet_async(name: str) -> str:
76+
"""
77+
Async worker - runs natively in the event loop.
78+
Perfect for I/O-bound tasks like HTTP calls, DB queries, etc.
79+
"""
80+
# Simulate async I/O operation
81+
await asyncio.sleep(0.1)
82+
return f'Hello {name} (from async function)'
83+
84+
85+
# Example 3: Async worker with HTTP call
86+
@worker_task(task_definition_name='fetch_user')
87+
async def fetch_user(user_id: str) -> dict:
88+
"""
89+
Example of making async HTTP calls using httpx.
90+
This is more efficient than synchronous requests.
91+
"""
92+
try:
93+
import httpx
94+
print(f'fetching user {user_id}')
95+
async with httpx.AsyncClient() as client:
96+
response = await client.get(
97+
f'https://jsonplaceholder.typicode.com/users/{user_id}'
98+
)
99+
print(f'response {response.json()}')
100+
return response.json()
101+
102+
except Exception as e:
103+
return {"error": str(e)}
104+
105+
106+
@worker_task(task_definition_name='process_user')
107+
async def process_user(user: User) -> dict:
108+
"""
109+
Example of making async HTTP calls using httpx.
110+
This is more efficient than synchronous requests.
111+
"""
112+
try:
113+
import httpx
114+
print(f'fetching user details for {user.id}')
115+
async with httpx.AsyncClient() as client:
116+
response = await client.get(
117+
f'https://jsonplaceholder.typicode.com/users/{user.id + 1}'
118+
)
119+
print(f'response {response.json()}')
120+
return response.json()
121+
122+
except Exception as e:
123+
return {"error": str(e)}
124+
125+
126+
# Example 4: CPU-bound work in thread pool
127+
@worker_task(task_definition_name='calculate')
128+
def calculate_fibonacci(n: int) -> int:
129+
"""
130+
CPU-bound work automatically runs in thread pool.
131+
For heavy CPU work, consider using multiprocessing TaskHandler instead.
132+
"""
133+
if n <= 1:
134+
return n
135+
return calculate_fibonacci(n - 1) + calculate_fibonacci(n - 2)
136+
137+
138+
# Example 5: Mixed I/O and CPU work
139+
@worker_task(task_definition_name='process_data')
140+
async def process_data(data_url: str) -> dict:
141+
"""
142+
Demonstrates mixing async I/O with CPU-bound work.
143+
I/O runs in event loop, CPU work runs in thread pool.
144+
"""
145+
import httpx
146+
147+
# I/O-bound: Fetch data asynchronously
148+
async with httpx.AsyncClient() as client:
149+
response = await client.get(data_url)
150+
data = response.json()
151+
152+
# CPU-bound: Process in thread pool
153+
loop = asyncio.get_running_loop()
154+
result = await loop.run_in_executor(
155+
None, # Default thread pool
156+
_process_data_sync,
157+
data
158+
)
159+
160+
return result
161+
162+
163+
def _process_data_sync(data: dict) -> dict:
164+
"""Helper function for CPU-bound processing"""
165+
# Simulated CPU-intensive work
166+
import time
167+
time.sleep(0.1)
168+
return {"processed": True, "count": len(data)}
169+
170+
171+
async def main():
172+
"""
173+
Main entry point demonstrating different ways to use TaskHandlerAsyncIO.
174+
"""
175+
176+
# Configuration - defaults to reading from environment variables:
177+
# - CONDUCTOR_SERVER_URL: e.g., https://play.orkes.io/api
178+
# - CONDUCTOR_AUTH_KEY: API key
179+
# - CONDUCTOR_AUTH_SECRET: API secret
180+
api_config = Configuration()
181+
182+
print("=" * 60)
183+
print("Conductor AsyncIO Workers Example")
184+
print("=" * 60)
185+
print(f"Server: {api_config.host}")
186+
print(f"Workers: greet, greet_async, fetch_user, calculate, process_data")
187+
print("=" * 60)
188+
print("\nStarting workers... Press Ctrl+C to stop\n")
189+
190+
# Option 1: Using async context manager (recommended)
191+
try:
192+
async with TaskHandlerAsyncIO(configuration=api_config) as task_handler:
193+
# Set up graceful shutdown on SIGTERM
194+
loop = asyncio.get_running_loop()
195+
196+
def signal_handler():
197+
print("\n\nReceived shutdown signal, stopping workers...")
198+
loop.create_task(task_handler.stop())
199+
200+
# Register signal handlers
201+
for sig in (signal.SIGTERM, signal.SIGINT):
202+
loop.add_signal_handler(sig, signal_handler)
203+
204+
# Wait for workers to complete (blocks until stopped)
205+
await task_handler.wait()
206+
207+
except KeyboardInterrupt:
208+
print("\n\nShutting down gracefully...")
209+
210+
except Exception as e:
211+
print(f"\n\nError: {e}")
212+
raise
213+
214+
# Option 2: Manual start/stop (alternative)
215+
# task_handler = TaskHandlerAsyncIO(configuration=api_config)
216+
# await task_handler.start()
217+
# try:
218+
# await asyncio.sleep(60) # Run for 60 seconds
219+
# finally:
220+
# await task_handler.stop()
221+
222+
# Option 3: Run with timeout (for testing)
223+
# from conductor.client.automator.task_handler_asyncio import run_workers_async
224+
# await run_workers_async(
225+
# configuration=api_config,
226+
# stop_after_seconds=60 # Auto-stop after 60 seconds
227+
# )
228+
229+
print("\nWorkers stopped. Goodbye!")
230+
231+
232+
if __name__ == '__main__':
233+
"""
234+
Run the async main function.
235+
236+
Python 3.7+: asyncio.run(main())
237+
Python 3.6: asyncio.get_event_loop().run_until_complete(main())
238+
"""
239+
try:
240+
asyncio.run(main())
241+
except KeyboardInterrupt:
242+
pass

0 commit comments

Comments
 (0)