Skip to content

Commit fb6cd43

Browse files
Rollback async client
1 parent 225f598 commit fb6cd43

543 files changed

Lines changed: 61 additions & 121080 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/pull_request.yml

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,6 @@ jobs:
8080
conductor-sdk-test:latest \
8181
/bin/sh -c "cd /package && COVERAGE_FILE=/package/${{ env.COVERAGE_DIR }}/.coverage.integration coverage run -m pytest -m v4 tests/integration -v"
8282
83-
- name: Run asyncio integration tests
84-
id: asyncio_integration_tests
85-
continue-on-error: true
86-
run: |
87-
docker run --rm \
88-
-e CONDUCTOR_AUTH_KEY=${{ env.CONDUCTOR_AUTH_KEY }} \
89-
-e CONDUCTOR_AUTH_SECRET=${{ env.CONDUCTOR_AUTH_SECRET }} \
90-
-e CONDUCTOR_SERVER_URL=${{ env.CONDUCTOR_SERVER_URL }} \
91-
-v ${{ github.workspace }}/${{ env.COVERAGE_DIR }}:/package/${{ env.COVERAGE_DIR }}:rw \
92-
conductor-sdk-test:latest \
93-
/bin/sh -c "cd /package && COVERAGE_FILE=/package/${{ env.COVERAGE_DIR }}/.coverage.asyncio_integration coverage run -m pytest -m v4 tests/integration/async -v"
94-
9583
- name: Generate coverage report
9684
id: coverage_report
9785
continue-on-error: true
@@ -123,5 +111,5 @@ jobs:
123111
file: ${{ env.COVERAGE_FILE }}
124112

125113
- name: Check test results
126-
if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure' || steps.integration_tests.outcome == 'failure' || steps.asyncio_integration_tests.outcome == 'failure'
114+
if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure' || steps.integration_tests.outcome == 'failure'
127115
run: exit 1

docs/authentication-retry-policy.md

Lines changed: 4 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -14,55 +14,14 @@ When a request receives a 401 (Unauthorized) response, the SDK automatically:
1414
4. **Retries the request** with the new token
1515
5. **Tracks attempts per endpoint** to handle persistent failures gracefully
1616

17-
### Race Condition Protection (Async Client)
1817

19-
The async client includes built-in protection against race conditions when multiple concurrent requests receive 401 errors:
20-
21-
- Uses an async lock (`asyncio.Lock`) to synchronize token refresh operations
22-
- Checks if another coroutine already refreshed the token before performing a refresh
23-
- Validates token freshness using `token_update_time` and `auth_token_ttl_sec`
24-
- Ensures only one token refresh occurs even with multiple simultaneous 401 responses
25-
26-
### Synchronous vs Asynchronous Behavior
27-
28-
**Async Client (`conductor.asyncio_client`)**:
29-
- Uses `asyncio.sleep()` for non-blocking delays
30-
- Includes race condition protection with `asyncio.Lock`
31-
- Ideal for high-concurrency scenarios and modern async applications
32-
33-
**Sync Client (`conductor.client`)**:
18+
**Client (`conductor.client`)**:
3419
- Uses `time.sleep()` for delays (blocks the current thread)
3520
- Simpler implementation without lock-based synchronization
3621
- Suitable for traditional synchronous applications and scripts
3722

3823
## Configuration
3924

40-
Both sync and async clients share the same configuration parameters.
41-
42-
### Async Client Configuration
43-
44-
For the async client (`conductor.asyncio_client`), configure the retry policy during initialization:
45-
46-
```python
47-
from conductor.asyncio_client.configuration import Configuration
48-
49-
configuration = Configuration(
50-
server_url="https://your-conductor-server.com/api",
51-
auth_key="your_key_id",
52-
auth_secret="your_key_secret",
53-
# 401 retry configuration
54-
auth_401_max_attempts=5, # Maximum retry attempts per endpoint
55-
auth_401_base_delay_ms=1000.0, # Base delay in milliseconds
56-
auth_401_max_delay_ms=60000.0, # Maximum delay cap in milliseconds
57-
auth_401_jitter_percent=0.2, # Random jitter (20%)
58-
auth_401_stop_behavior="stop_worker" # Behavior after max attempts
59-
)
60-
```
61-
62-
### Sync Client Configuration
63-
64-
For the sync client (`conductor.client`), configuration is identical:
65-
6625
```python
6726
from conductor.client.configuration.configuration import Configuration
6827

@@ -130,39 +89,6 @@ With default configuration (`base_delay_ms=1000`, `jitter_percent=0.2`, `max_del
13089

13190
## Usage Examples
13291

133-
### Basic Usage with Async Client
134-
135-
```python
136-
import asyncio
137-
from conductor.asyncio_client.configuration import Configuration
138-
from conductor.asyncio_client.workflow_client import WorkflowClient
139-
140-
async def main():
141-
# Configuration with custom retry policy
142-
config = Configuration(
143-
server_url="https://your-server.com/api",
144-
auth_key="your_key",
145-
auth_secret="your_secret",
146-
auth_401_max_attempts=3,
147-
auth_401_base_delay_ms=500.0
148-
)
149-
150-
workflow_client = WorkflowClient(config)
151-
152-
# The retry policy is automatically applied to all API calls
153-
workflow = await workflow_client.start_workflow(
154-
name="my_workflow",
155-
version=1,
156-
input={"key": "value"}
157-
)
158-
159-
print(f"Started workflow: {workflow.workflow_id}")
160-
161-
asyncio.run(main())
162-
```
163-
164-
### Basic Usage with Sync Client
165-
16692
```python
16793
from conductor.client.configuration.configuration import Configuration
16894
from conductor.client.workflow_client import WorkflowClient
@@ -188,66 +114,10 @@ workflow_id = workflow_client.start_workflow(
188114
print(f"Started workflow: {workflow_id}")
189115
```
190116

191-
### Handling Multiple Concurrent Requests (Async Client)
192-
193-
The async client handles concurrent requests efficiently with race condition protection:
194-
195-
```python
196-
import asyncio
197-
from conductor.asyncio_client.configuration import Configuration
198-
from conductor.asyncio_client.workflow_client import WorkflowClient
199-
200-
async def start_multiple_workflows():
201-
config = Configuration(
202-
server_url="https://your-server.com/api",
203-
auth_key="your_key",
204-
auth_secret="your_secret"
205-
)
206-
207-
workflow_client = WorkflowClient(config)
208-
209-
# Start multiple workflows concurrently
210-
# If all receive 401, only one token refresh will occur
211-
tasks = [
212-
workflow_client.start_workflow(name="workflow1", version=1, input={}),
213-
workflow_client.start_workflow(name="workflow2", version=1, input={}),
214-
workflow_client.start_workflow(name="workflow3", version=1, input={})
215-
]
216-
217-
results = await asyncio.gather(*tasks)
218-
return results
219-
220-
asyncio.run(start_multiple_workflows())
221-
```
222-
223117
### Working with Task Workers
224118

225119
Both sync and async workers benefit from the retry policy:
226120

227-
**Async Worker:**
228-
```python
229-
from conductor.asyncio_client.configuration import Configuration
230-
from conductor.asyncio_client.worker.worker import Worker
231-
from conductor.asyncio_client.worker.worker_task import WorkerTask
232-
233-
config = Configuration(
234-
server_url="https://your-server.com/api",
235-
auth_key="your_key",
236-
auth_secret="your_secret",
237-
auth_401_max_attempts=8, # More attempts for long-running workers
238-
auth_401_base_delay_ms=1000.0
239-
)
240-
241-
@WorkerTask(task_definition_name="example_task", worker_id="worker1", domain="test")
242-
async def example_task(task):
243-
return {"status": "completed"}
244-
245-
# Workers automatically benefit from retry policy
246-
worker = Worker(config)
247-
worker.start_polling()
248-
```
249-
250-
**Sync Worker:**
251121
```python
252122
from conductor.client.configuration.configuration import Configuration
253123
from conductor.client.worker.worker import Worker
@@ -292,7 +162,7 @@ For development environments where you want faster feedback (works for both sync
292162

293163
```python
294164
# Async client
295-
from conductor.asyncio_client.configuration import Configuration
165+
from conductor.client.configuration import Configuration
296166
# OR Sync client
297167
# from conductor.client.configuration.configuration import Configuration
298168

@@ -413,33 +283,16 @@ When `auth_401_max_attempts` is reached:
413283
3. Verify network connectivity to auth service
414284
4. Check server-side auth service logs
415285

416-
### Issue: Concurrent Requests Causing Multiple Token Refreshes (Async Client)
417-
418-
**Symptom**: Multiple token refresh operations for simultaneous requests (should not happen with race condition protection in async client)
419-
420-
**Solutions**:
421-
1. Verify you're using the SDK version with race condition fixes (async client only)
422-
2. Check if you're sharing the same `Configuration` instance across requests
423-
3. Ensure `asyncio.Lock` is working correctly in your environment
424-
4. Note: Sync client doesn't have this protection as it's not designed for concurrent requests
425-
426286
## Technical Details
427287

428288
### Implementation
429289

430290
The retry policy is implemented in:
431-
- **Async Client**: `src/conductor/asyncio_client/adapters/api_client_adapter.py`
432-
- **Sync Client**: `src/conductor/client/adapters/api_client_adapter.py`
291+
- **Client**: `src/conductor/client/adapters/api_client_adapter.py`
433292
- **Shared Policy**: `src/conductor/client/exceptions/auth_401_policy.py` - Policy configuration and backoff calculation
434293

435294
### Thread Safety and Concurrency
436295

437-
**Async Client:**
438-
- Uses `asyncio.Lock` for token refresh synchronization
439-
- Safe for concurrent coroutines
440-
- Tokens are checked and refreshed atomically
441-
- Race condition protection prevents duplicate token refreshes
442-
443296
**Sync Client:**
444297
- Designed for single-threaded sequential execution
445298
- No explicit locking (not needed for sequential requests)
@@ -454,57 +307,11 @@ The retry policy is implemented in:
454307
- Jitter prevents synchronized retry storms
455308
- Per-endpoint attempt tracking allows independent retry logic
456309

457-
**Async Client Specific:**
458-
- Non-blocking delays don't prevent other operations from executing
459-
- Ideal for high-concurrency scenarios (workers polling multiple tasks)
460-
- Lower overall latency in concurrent scenarios
461-
462-
**Sync Client Specific:**
310+
**Client Specific:**
463311
- Blocking delays pause the current thread
464312
- Simpler to reason about (sequential execution)
465313
- More suitable for scripts and batch processing
466314

467-
## Choosing Between Sync and Async
468-
469-
Use the **Async Client** when:
470-
- Building high-concurrency applications (web servers, API gateways)
471-
- Running workers that poll multiple tasks simultaneously
472-
- Need non-blocking I/O operations
473-
- Want maximum throughput with concurrent requests
474-
- Working with modern async frameworks (FastAPI, aiohttp, etc.)
475-
476-
Use the **Sync Client** when:
477-
- Writing simple scripts or batch jobs
478-
- Working with traditional synchronous code
479-
- Don't need concurrent request handling
480-
- Prefer simpler, more straightforward code
481-
- Integrating with existing sync-only libraries
482-
483-
Both clients provide the same retry policy functionality with identical configuration options.
484-
485-
## Migration Between Sync and Async
486-
487-
The configuration is compatible between both clients, making migration easier:
488-
489-
```python
490-
# Shared configuration
491-
config_params = {
492-
"server_url": "https://your-server.com/api",
493-
"auth_key": "your_key",
494-
"auth_secret": "your_secret",
495-
"auth_401_max_attempts": 5,
496-
"auth_401_base_delay_ms": 1000.0,
497-
}
498-
499-
# Use with async client
500-
from conductor.asyncio_client.configuration import Configuration as AsyncConfig
501-
async_config = AsyncConfig(**config_params)
502-
503-
# Or with sync client
504-
from conductor.client.configuration.configuration import Configuration as SyncConfig
505-
sync_config = SyncConfig(**config_params)
506-
```
507-
508315
## Related Documentation
509316

510317
- [Authorization & Access Control](README.md)

docs/configuration/proxy.md

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -177,35 +177,6 @@ config = Configuration(
177177
)
178178
```
179179

180-
### Async Client Proxy Configuration
181-
182-
```python
183-
import asyncio
184-
import httpx
185-
from conductor.asyncio_client.configuration import Configuration
186-
from conductor.asyncio_client.adapters import ApiClient
187-
188-
async def main():
189-
# Create async HTTP client with proxy
190-
async_client = httpx.AsyncClient(
191-
proxies={
192-
"http://": "http://proxy.company.com:8080",
193-
"https://": "http://proxy.company.com:8080"
194-
}
195-
)
196-
197-
config = Configuration(
198-
server_url="https://api.orkes.io/api",
199-
http_connection=async_client
200-
)
201-
202-
async with ApiClient(config) as api_client:
203-
# Use the client with proxy configuration
204-
pass
205-
206-
asyncio.run(main())
207-
```
208-
209180
## Troubleshooting
210181

211182
### Common Proxy Issues

docs/configuration/ssl-tls.md

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -93,70 +93,6 @@ config = Configuration(base_url="https://play.orkes.io")
9393
config.http_connection = custom_client
9494
```
9595

96-
## Asynchronous Client SSL Configuration
97-
98-
### Basic Async SSL Configuration
99-
100-
```python
101-
import asyncio
102-
from conductor.asyncio_client.configuration import Configuration
103-
from conductor.asyncio_client.adapters import ApiClient
104-
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
105-
106-
# Basic SSL configuration with custom CA certificate
107-
config = Configuration(
108-
server_url="https://play.orkes.io/api",
109-
ssl_ca_cert="/path/to/ca-certificate.pem",
110-
)
111-
112-
async def main():
113-
async with ApiClient(config) as api_client:
114-
orkes_clients = OrkesClients(api_client, config)
115-
workflow_client = orkes_clients.get_workflow_client()
116-
117-
# Use the client with SSL configuration
118-
workflows = await workflow_client.search_workflows()
119-
print(f"Found {len(workflows)} workflows")
120-
121-
asyncio.run(main())
122-
```
123-
124-
### Async SSL with Certificate Data
125-
126-
```python
127-
# SSL with custom CA certificate data (PEM string)
128-
config = Configuration(
129-
server_url="https://play.orkes.io/api",
130-
ca_cert_data="""-----BEGIN CERTIFICATE-----
131-
MIIDXTCCAkWgAwIBAgIJAKoK/Ovj8EUMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
132-
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
133-
aWRnaXRzIFB0eSBMdGQwHhcNMTYwMjEyMTQ0NDQ2WhcNMjYwMjEwMTQ0NDQ2WjBF
134-
-----END CERTIFICATE-----""",
135-
)
136-
```
137-
138-
### Async SSL with Custom SSL Context
139-
140-
```python
141-
import ssl
142-
143-
# Create custom SSL context
144-
ssl_context = ssl.create_default_context()
145-
ssl_context.load_verify_locations("/path/to/ca-certificate.pem")
146-
ssl_context.load_cert_chain(
147-
certfile="/path/to/client-certificate.pem",
148-
keyfile="/path/to/client-key.pem"
149-
)
150-
ssl_context.check_hostname = True
151-
ssl_context.verify_mode = ssl.CERT_REQUIRED
152-
153-
# Use with async client
154-
config = Configuration(
155-
server_url="https://play.orkes.io/api",
156-
ssl_ca_cert="/path/to/ca-certificate.pem",
157-
)
158-
```
159-
16096
## Environment Variable Configuration
16197

16298
You can configure SSL settings using environment variables:

0 commit comments

Comments
 (0)