-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathtest_integration.py
More file actions
478 lines (389 loc) · 17.7 KB
/
Copy pathtest_integration.py
File metadata and controls
478 lines (389 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# ruff: noqa: ARG001
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from agentex.protocol.acp import (
RPCMethod,
SendEventParams,
CancelTaskParams,
CreateTaskParams,
)
from agentex.lib.sdk.fastacp.impl.sync_acp import SyncACP
from agentex.lib.sdk.fastacp.impl.temporal_acp import TemporalACP
from agentex.lib.sdk.fastacp.impl.async_base_acp import AsyncBaseACP
class TestImplementationBehavior:
"""Test specific behavior differences between ACP implementations"""
@pytest.mark.asyncio()
async def test_sync_acp_default_handlers(self):
"""Test SyncACP has expected default handlers"""
with patch.dict("os.environ", {"AGENTEX_BASE_URL": ""}):
sync_acp = SyncACP.create()
# Should have send_message_message handler by default
assert RPCMethod.MESSAGE_SEND in sync_acp._handlers
@pytest.mark.asyncio()
async def test_async_acp_default_handlers(self):
"""Test AsyncBaseACP has expected default handlers"""
with patch.dict("os.environ", {"AGENTEX_BASE_URL": ""}):
async_acp = AsyncBaseACP.create()
# Should have create, message, and cancel handlers by default
assert RPCMethod.TASK_CREATE in async_acp._handlers
assert RPCMethod.EVENT_SEND in async_acp._handlers
assert RPCMethod.TASK_CANCEL in async_acp._handlers
@pytest.mark.asyncio()
async def test_temporal_acp_creation_with_mocked_client(self):
"""Test TemporalACP creation with mocked temporal client"""
with patch.dict("os.environ", {"AGENTEX_BASE_URL": ""}):
with patch.object(TemporalACP, "create", new_callable=AsyncMock) as mock_create:
mock_temporal_instance = MagicMock(spec=TemporalACP)
mock_temporal_instance._handlers = {}
mock_temporal_instance.temporal_client = MagicMock()
mock_create.return_value = mock_temporal_instance
temporal_acp = TemporalACP.create(temporal_address="localhost:7233")
assert temporal_acp == mock_temporal_instance
assert hasattr(temporal_acp, "temporal_client")
class TestRealWorldScenarios:
"""Test real-world usage scenarios and integration"""
@pytest.mark.asyncio()
async def test_message_handling_workflow(self, sync_acp, free_port, test_server_runner):
"""Test complete message handling workflow"""
messages_received = []
@sync_acp.on_task_event_send
async def message_handler(params: SendEventParams):
messages_received.append(
{
"task_id": params.task.id,
"message_content": params.message.content, # type: ignore[attr-defined]
"author": params.message.author, # type: ignore[attr-defined]
}
)
return {"processed": True}
runner = test_server_runner(sync_acp, free_port)
await runner.start()
# Send multiple messages
async with httpx.AsyncClient() as client:
for i in range(3):
request_data = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {
"id": f"workflow-task-{i}",
"agent_id": "workflow-agent",
"status": "RUNNING",
},
"message": {
"type": "text",
"author": "user",
"content": f"Workflow message {i}",
},
},
"id": f"workflow-{i}",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=request_data)
assert response.status_code == 200
# Give background tasks time to process
await asyncio.sleep(0.2)
# Verify all messages were processed
assert len(messages_received) == 3
for i, msg in enumerate(messages_received):
assert msg["task_id"] == f"workflow-task-{i}"
assert msg["message_content"] == f"Workflow message {i}"
assert msg["author"] == "user"
await runner.stop()
@pytest.mark.asyncio()
async def test_task_lifecycle_management(self, async_base_acp, free_port, test_server_runner):
"""Test complete task lifecycle: create -> message -> cancel"""
task_events = []
@async_base_acp.on_task_create
async def create_handler(params: CreateTaskParams):
task_events.append(("created", params.task.id))
@async_base_acp.on_task_event_send
async def message_handler(params: SendEventParams):
task_events.append(("message", params.task.id))
@async_base_acp.on_task_cancel
async def cancel_handler(params: CancelTaskParams):
task_events.append(("cancelled", params.task_id)) # type: ignore[attr-defined]
runner = test_server_runner(async_base_acp, free_port)
await runner.start()
async with httpx.AsyncClient() as client:
# Create task
create_request = {
"jsonrpc": "2.0",
"method": "task/create",
"params": {
"task": {
"id": "lifecycle-task",
"agent_id": "lifecycle-agent",
"status": "RUNNING",
}
},
"id": "create-1",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=create_request)
assert response.status_code == 200
# Send message
message_request = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {
"id": "lifecycle-task",
"agent_id": "lifecycle-agent",
"status": "RUNNING",
},
"message": {
"type": "text",
"author": "user",
"content": "Lifecycle test message",
},
},
"id": "message-1",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=message_request)
assert response.status_code == 200
# Cancel task
cancel_request = {
"jsonrpc": "2.0",
"method": "task/cancel",
"params": {"task_id": "lifecycle-task"},
"id": "cancel-1",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=cancel_request)
assert response.status_code == 200
# Give background tasks time to process
await asyncio.sleep(0.2)
# Verify task lifecycle events
assert len(task_events) == 3
assert task_events[0] == ("created", "lifecycle-task")
assert task_events[1] == ("message", "lifecycle-task")
assert task_events[2] == ("cancelled", "lifecycle-task")
await runner.stop()
class TestErrorRecovery:
"""Test error handling and recovery scenarios"""
@pytest.mark.asyncio()
async def test_server_resilience_to_handler_failures(
self, sync_acp, free_port, test_server_runner
):
"""Test server continues working after handler failures"""
failure_count = 0
success_count = 0
@sync_acp.on_task_event_send
async def unreliable_handler(params: SendEventParams):
nonlocal failure_count, success_count
if "fail" in params.message.content: # type: ignore[attr-defined]
failure_count += 1
raise RuntimeError("Simulated handler failure")
else:
success_count += 1
return {"success": True}
runner = test_server_runner(sync_acp, free_port)
await runner.start()
async with httpx.AsyncClient() as client:
# Send failing request
fail_request = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {"id": "fail-task", "agent_id": "test-agent", "status": "RUNNING"},
"message": {"type": "text", "author": "user", "content": "This should fail"},
},
"id": "fail-1",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=fail_request)
assert response.status_code == 200 # Server should still respond
# Send successful request after failure
success_request = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {"id": "success-task", "agent_id": "test-agent", "status": "RUNNING"},
"message": {"type": "text", "author": "user", "content": "This should succeed"},
},
"id": "success-1",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=success_request)
assert response.status_code == 200
# Verify server is still healthy
health_response = await client.get(f"http://127.0.0.1:{free_port}/healthz")
assert health_response.status_code == 200
# Give background tasks time to process
await asyncio.sleep(0.2)
assert failure_count == 1
assert success_count == 1
await runner.stop()
@pytest.mark.asyncio()
async def test_concurrent_request_handling(self, sync_acp, free_port, test_server_runner):
"""Test handling multiple concurrent requests"""
processed_requests = []
@sync_acp.on_task_event_send
async def concurrent_handler(params: SendEventParams):
# Simulate some processing time
await asyncio.sleep(0.05)
processed_requests.append(params.task.id)
return {"processed": params.task.id}
runner = test_server_runner(sync_acp, free_port)
await runner.start()
# Send multiple concurrent requests
async def send_request(client, task_id):
request_data = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {"id": task_id, "agent_id": "concurrent-agent", "status": "RUNNING"},
"message": {
"type": "text",
"author": "user",
"content": f"Concurrent message for {task_id}",
},
},
"id": f"concurrent-{task_id}",
}
return await client.post(f"http://127.0.0.1:{free_port}/api", json=request_data)
async with httpx.AsyncClient() as client:
# Send 5 concurrent requests
tasks = [send_request(client, f"task-{i}") for i in range(5)]
responses = await asyncio.gather(*tasks)
# All should return immediate acknowledgment
for response in responses:
assert response.status_code == 200
data = response.json()
assert data["result"]["status"] == "processing"
# Give background tasks time to complete
await asyncio.sleep(0.3)
# All requests should have been processed
assert len(processed_requests) == 5
assert set(processed_requests) == {f"task-{i}" for i in range(5)}
await runner.stop()
class TestSpecialCases:
"""Test edge cases and special scenarios"""
@pytest.mark.asyncio()
async def test_notification_vs_request_behavior(self, sync_acp, free_port, test_server_runner):
"""Test difference between notifications (no ID) and requests (with ID)"""
notifications_received = 0
requests_received = 0
@sync_acp.on_task_event_send
async def tracking_handler(params: SendEventParams):
nonlocal notifications_received, requests_received
if "notification" in params.message.content: # type: ignore[attr-defined]
notifications_received += 1
else:
requests_received += 1
return {"handled": True}
runner = test_server_runner(sync_acp, free_port)
await runner.start()
async with httpx.AsyncClient() as client:
# Send notification (no ID)
notification_data = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {
"id": "notification-task",
"agent_id": "test-agent",
"status": "RUNNING",
},
"message": {
"type": "text",
"author": "user",
"content": "This is a notification",
},
},
# Note: no "id" field
}
notification_response = await client.post(
f"http://127.0.0.1:{free_port}/api", json=notification_data
)
assert notification_response.status_code == 200
notification_result = notification_response.json()
assert notification_result["id"] is None
# Send regular request (with ID)
request_data = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {"id": "request-task", "agent_id": "test-agent", "status": "RUNNING"},
"message": {"type": "text", "author": "user", "content": "This is a request"},
},
"id": "request-1",
}
request_response = await client.post(
f"http://127.0.0.1:{free_port}/api", json=request_data
)
assert request_response.status_code == 200
request_result = request_response.json()
assert request_result["id"] == "request-1"
assert request_result["result"]["status"] == "processing"
# Give background tasks time to process
await asyncio.sleep(0.1)
assert notifications_received == 1
assert requests_received == 1
await runner.stop()
@pytest.mark.asyncio()
async def test_unicode_message_handling(self, sync_acp, free_port, test_server_runner):
"""Test handling of unicode characters in messages"""
received_message = None
@sync_acp.on_task_event_send
async def unicode_handler(params: SendEventParams):
nonlocal received_message
received_message = params.message.content # type: ignore[attr-defined]
return {"unicode_handled": True}
runner = test_server_runner(sync_acp, free_port)
await runner.start()
unicode_text = "Hello 世界 🌍 émojis 🚀 and special chars: \n\t\r"
async with httpx.AsyncClient() as client:
request_data = {
"jsonrpc": "2.0",
"method": "event/send",
"params": {
"task": {
"id": "unicode-task",
"agent_id": "unicode-agent",
"status": "RUNNING",
},
"message": {"type": "text", "author": "user", "content": unicode_text},
},
"id": "unicode-test",
}
response = await client.post(f"http://127.0.0.1:{free_port}/api", json=request_data)
assert response.status_code == 200
# Give background task time to process
await asyncio.sleep(0.1)
assert received_message == unicode_text
await runner.stop()
class TestImplementationIsolation:
"""Test that different implementations don't interfere with each other"""
@pytest.mark.asyncio()
async def test_handler_isolation_between_implementations(self):
"""Test handlers registered on one implementation don't affect others"""
with patch.dict("os.environ", {"AGENTEX_BASE_URL": ""}):
sync_acp = SyncACP.create()
async_acp = AsyncBaseACP.create()
sync_handled = False
async_handled = False
@sync_acp.on_task_event_send
async def sync_handler(params: SendEventParams):
nonlocal sync_handled
sync_handled = True
return {"sync": True}
@async_acp.on_task_event_send
async def async_handler(params: SendEventParams):
nonlocal async_handled
async_handled = True
return {"async": True}
# Create test parameters
message_params = SendEventParams( # type: ignore[call-arg]
task={"id": "isolation-test-task", "agent_id": "test-agent", "status": "RUNNING"},
event={"type": "text", "author": "user", "content": "Isolation test"}, # type: ignore[misc]
)
# Execute sync handler
sync_result = await sync_acp._handlers[RPCMethod.EVENT_SEND](message_params)
assert sync_handled is True
assert async_handled is False
assert sync_result == {"sync": True}
# Reset and execute async handler
sync_handled = False
async_result = await async_acp._handlers[RPCMethod.EVENT_SEND](message_params)
assert sync_handled is False
assert async_handled is True
assert async_result == {"async": True}