-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcounting_client.py
More file actions
115 lines (94 loc) · 3.98 KB
/
counting_client.py
File metadata and controls
115 lines (94 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import logging
import argparse, textwrap
import json
from intersect_sdk import (
INTERSECT_JSON_VALUE,
IntersectClient,
IntersectClientCallback,
IntersectClientConfig,
default_intersect_lifecycle_loop,
)
# Configure the root logger so that messages at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
class SampleOrchestrator:
    """Hold the event callback together with the state it mutates.

    A class is used (rather than a bare function) because the callback has to
    update state of its own between invocations: it counts the events it has
    handled, processes ``MAX_EVENTS_TO_PROCESS`` of them, and then stops.
    """

    # Number of events to handle before the callback signals that it is done.
    MAX_EVENTS_TO_PROCESS = 3

    def __init__(self) -> None:
        """Start with no events handled yet."""
        # Per-instance counter, incremented once for every handled event.
        self.events_encountered = 0

    def event_callback(
        self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE
    ) -> None:
        """Handle one event emitted by the Counting Service.

        Each handled event is printed. Once ``MAX_EVENTS_TO_PROCESS`` events
        have been handled, an ``Exception`` is raised; events that arrive
        after that point are ignored.

        Nothing is returned because this example neither sends messages nor
        changes which events it listens to. To do either of those, the callback
        would return an ``IntersectClientCallback`` instead.

        Params:
          - _source: the source of the event (in this example, always the
            counting service). Unused.
          - _operation: the name of the service operation which emitted the
            event; in this example 'increment_counter_function', the function
            the event was configured on. Unused.
          - _event_name: the name of the event; in this example
            'increment_counter'. Unused.
          - payload: the value carried by the event (an integer for the
            counting service).

        Raises:
          Exception: when the final expected event has just been handled.
            NOTE(review): this is presumably how the callback tells the
            INTERSECT client to stop -- confirm against the SDK documentation.
        """
        # Events are asynchronous, so more may arrive after the limit has been
        # reached. Use ">=" so that those late events are dropped instead of
        # being counted and printed as an extra event.
        if self.events_encountered >= self.MAX_EVENTS_TO_PROCESS:
            return

        self.events_encountered += 1
        print(payload)

        if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
            # Deliberately the generic Exception type, as in the original code.
            raise Exception(
                f'Processed {self.MAX_EVENTS_TO_PROCESS} events; stopping.'
            )
def parse_arguments(argv=None) -> argparse.Namespace:
    """Set up and parse the command-line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to the process's own
            command-line arguments, which is the behaviour existing callers
            rely on. Passing an explicit list allows programmatic use.

    Returns:
        The parsed namespace. Its ``config`` attribute is the path given
        with ``-c``/``--config``, or ``None`` when the option was omitted.
    """
    # RawDescriptionHelpFormatter keeps the line breaks of the epilog as written.
    p = argparse.ArgumentParser(
        description="Counting Example",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent('''\
            Examples:
            # Starting Service
            python3 counting_service.py -c mybroker.json
            # Starting a Client
            python3 counting_client.py -c mybroker.json
            '''),
    )
    p.add_argument(
        "-c", "--config",
        type=str,
        help="Path to broker configuration file (JSON format)."
    )
    return p.parse_args(argv)
if __name__ == '__main__':
    args = parse_arguments()

    if args.config:
        # JSON text is UTF-8 encoded, so decode it explicitly instead of
        # depending on the platform's locale encoding.
        with open(args.config, 'r', encoding='utf-8') as f:
            from_config_file = json.load(f)
    else:
        # No configuration file given: fall back to a built-in broker
        # definition. NOTE(review): these look like placeholder credentials
        # for a local example broker -- confirm before using elsewhere.
        from_config_file = {
            'brokers': [
                {
                    'username': 'intersect_username',
                    'password': 'intersect_password',
                    'port': 1883,
                    'protocol': 'mqtt3.1.1',
                },
            ],
        }

    # start listening to events from the counting service
    config = IntersectClientConfig(
        initial_message_event_config=IntersectClientCallback(
            services_to_start_listening_for_events=[
                'counting-organization.counting-facility.counting-system.counting-subsystem.counting-service',
            ]
        ),
        # The remaining configuration keys come from the file or the fallback above.
        **from_config_file,
    )

    # The orchestrator instance owns the event counter that the callback updates.
    orchestrator = SampleOrchestrator()
    client = IntersectClient(
        config=config,
        event_callback=orchestrator.event_callback,
    )

    # Hand the client over to the SDK's lifecycle loop.
    # NOTE(review): presumably this blocks until the client shuts down -- see SDK docs.
    default_intersect_lifecycle_loop(
        client,
    )