-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_dynamic_aggregator.py
More file actions
119 lines (102 loc) · 3.83 KB
/
sync_dynamic_aggregator.py
File metadata and controls
119 lines (102 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import json
import argparse
import configparser
import logging
from typing import List, Literal, Optional
from communication.mqtt_communication import MqttCommunication
from core import Aggregator
from physics_aggregator import DynamicAggregator
# from communication import JsonCommunication, KafkaCommunication, MqttCommunication, BasicJsonCommunication
from communication import BasicJsonCommunication
from util import Log
from tap import Tap
class ArgumentParser(Tap):
sources : List[str]
"""List of sources file to load the information from"""
topics : List[str]
"""Topic to load the info used for camera distance computation """
log: Literal["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"] = "FATAL"
"""Log level to report"""
config: str = "defconfig.ini"
"""path to configuration file with all settings"""
output: str = "aggregator.json"
"""Output path for aggregator data"""
enable_filter: bool = False
"""Enable FLK filtering"""
parser = ArgumentParser()
args = parser.parse_args()
log = Log(__name__, enable_console=True, enable_file=False)
class Publisher:
def publish(any):
print(any)
pass
class FakePublisher:
publisher: Publisher
def __init__(self) -> None:
self.publisher = Publisher()
def set_config_options(*any):
pass
def publish_info(any):
pass
def publish(any, any2):
pass
def main():
# Handle args
log_lvl = getattr(logging, args.log, logging.DEBUG)
log.set_level(log_lvl)
config_file = os.path.join(os.path.dirname(__file__), args.config)
if not os.path.exists(config_file):
log.f("Configuration file do not find, exit")
return
# Handle config
config = configparser.ConfigParser(allow_no_value=True)
config.read(config_file)
# Init communication
if args.topics is not None:
topics = args.topics
else:
topics = json.loads(config["SUBSCRIBE-COMMUNICATION"]["topic"])
communication_topics = [(t, config["SUBSCRIBE-COMMUNICATION"].getint("qos")) for t in topics]
print(communication_topics)
json_communication = BasicJsonCommunication(
communication_topics,
args.sources,
config["GENERAL"].getboolean("debug")
)
communication = None
communication_class = None
communication_class = BasicJsonCommunication
if communication_class is not None:
communication = FakePublisher()
# communication = MqttCommunication(
# None,
# config["PUBLISH-COMMUNICATION"]["broker-ip"],
# config["PUBLISH-COMMUNICATION"].getint("port"),
# False,
# config["PUBLISH-COMMUNICATION"]["clientid"],
# config["PUBLISH-COMMUNICATION"]["username"],
# config["PUBLISH-COMMUNICATION"]["password"][1:-1],
# config["PUBLISH-COMMUNICATION"]["topic"] or "/aggregator",
# config["GENERAL"].getboolean("debug"),
# log_lvl,
# config["PUBLISH-COMMUNICATION"]["protocol"],
# config["PUBLISH-COMMUNICATION"]["sasl_mechanism"]
# )
# Init aggregator
log.i("Aggregator type: {}".format("single person" if False else "multi person"))
aggregator = DynamicAggregator(
json_communication,
config["GENERAL"].getfloat("step-init") if config["GENERAL"]["step-init"] else None,
config["GENERAL"].getfloat("window") if config["GENERAL"]["window"] else None,
False,
communication,
config["GENERAL"].getboolean("debug"),
logging.getLevelName(args.log),
args.enable_filter
)
# communication.set_config_options(aggregator, config["SUBSCRIBE-COMMUNICATION"]["options-topic"], config["SUBSCRIBE-COMMUNICATION"]["options-path"])
# Run the system
aggregator.loopCustom(args.output)
if __name__ == '__main__':
main()