forked from dapr/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_invoke_simple.py
More file actions
31 lines (25 loc) · 934 Bytes
/
Copy pathtest_invoke_simple.py
File metadata and controls
31 lines (25 loc) · 934 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import json

import httpx
import pytest
# Substrings that must each appear in the output returned by ``dapr.stop()``.
# Note the compact JSON form: no whitespace after ':' or ','.
EXPECTED_RECEIVER = ['{"id":1,"message":"hello world"}']
@pytest.mark.example_dir('invoke-simple')
def test_invoke_simple(dapr):
    """Invoke ``my-method`` on the receiver once and check both ends.

    Starts ``invoke-receiver.py`` through the ``dapr`` fixture, POSTs a
    single request to the sidecar's HTTP invoke endpoint, checks the HTTP
    response, then stops the receiver and checks its captured output.

    Args:
        dapr: Fixture exposing ``start()`` and ``stop()``. The value
            returned by ``stop()`` is searched for every entry of
            ``EXPECTED_RECEIVER``.
    """
    dapr.start(
        '--app-id invoke-receiver --app-protocol grpc --app-port 50051 '
        '--dapr-http-port 3500 -- python3 invoke-receiver.py',
        wait=5,
    )

    # invoke-caller.py runs an infinite loop, so we invoke the method
    # directly through the sidecar HTTP API with a single call.
    #
    # The body is serialized here with compact separators rather than
    # passed via ``json=``: EXPECTED_RECEIVER is matched against the
    # receiver output as an exact, whitespace-free JSON string, and the
    # whitespace httpx emits for ``json=`` bodies differs between httpx
    # releases. Sending pre-encoded content keeps the bytes on the wire
    # independent of the installed httpx version.
    body = json.dumps({'id': 1, 'message': 'hello world'}, separators=(',', ':'))
    resp = httpx.post(
        'http://localhost:3500/v1.0/invoke/invoke-receiver/method/my-method',
        content=body,
        # ``json=`` would have set this header implicitly; keep it explicit.
        headers={'Content-Type': 'application/json'},
        timeout=5,
    )
    resp.raise_for_status()

    assert 'text/plain' in resp.headers.get('content-type', '')
    assert 'INVOKE_RECEIVED' in resp.text

    # stop() hands back the receiver's captured output for inspection.
    receiver_output = dapr.stop()
    for line in EXPECTED_RECEIVER:
        assert line in receiver_output, f'Missing in receiver output: {line}'