-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_openptrack_aggregator.py
More file actions
98 lines (82 loc) · 2.93 KB
/
sync_openptrack_aggregator.py
File metadata and controls
98 lines (82 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import json
import argparse
import configparser
import logging
from typing import List, Literal, Optional
from OpenPTrack import OpenPTrack
# from communication import JsonCommunication, KafkaCommunication, MqttCommunication, BasicJsonCommunication
from communication import BasicJsonCommunication
from util import Log
from tap import Tap
class ArgumentParser(Tap):
    """Typed command-line options for the aggregator script.

    The string literal placed under each attribute is kept verbatim: it is
    the text shown for that option in the generated ``--help`` output.
    """

    sources: List[str]
    """List of sources file to load the information from"""
    # Optional on purpose: main() falls back to the topic list stored in the
    # configuration file when this is None. As a required option that
    # fallback could never be reached.
    topics: Optional[List[str]] = None
    """Topic to load the info used for camera distance computation """
    log: Literal["DEBUG", "INFO", "WARNING", "ERROR", "FATAL"] = "INFO"
    """Log level to report"""
    config: str = "defconfig.ini"
    """path to configuration file with all settings"""
    output: str = "aggregator.json"
    """Output path for aggregator data"""
# Command-line options are parsed once, at import time; main() reads the
# resulting `args` object as a module-level global.
parser = ArgumentParser()
args = parser.parse_args()
# Module-wide logger, created with console output enabled and file output
# disabled (per the keyword flags); main() sets its level from `args.log`.
log = Log(__name__, enable_console=True, enable_file=False)
class Publisher:
    """Minimal publisher that writes to standard output."""

    def publish(self, *messages):
        """Print the given message(s) on one line of standard output.

        Args:
            *messages: Values to print, separated by single spaces.  When no
                value is given, the publisher instance itself is printed,
                which is what the earlier zero-argument form of this method
                did.

        Returns:
            None.
        """
        if messages:
            print(*messages)
        else:
            # Backward compatibility with the previous signature, whose only
            # parameter was the instance and which printed that instance.
            print(self)
class FakePublisher:
    """Communication stand-in whose methods accept their arguments and do nothing.

    The only state is ``publisher``, a ``Publisher`` created in ``__init__``.
    """

    publisher: Publisher

    def __init__(self) -> None:
        self.publisher = Publisher()

    def set_config_options(*_ignored):
        """Accept any positional arguments (the instance included); do nothing."""

    def publish_info(self):
        """Do nothing; takes no argument besides the instance."""

    def publish(self, any2):
        """Discard ``any2``; do nothing."""
def _optional_float(section, key):
    """Return ``section[key]`` as a float, or ``None`` when the option is empty.

    The option has to exist (a missing key raises ``KeyError``).  The parser
    is created with ``allow_no_value=True``, so an option may be present
    without a value; that case, like an empty string, maps to ``None``.
    """
    return section.getfloat(key) if section[key] else None


def main():
    """Configure and run the OpenPTrack aggregator from ``args`` and the config file.

    Reads the module-level ``args`` and ``log``.  The configuration file path
    is resolved relative to this script's directory.  If the file does not
    exist, a fatal message is logged and the function returns without doing
    anything else.  Otherwise the JSON input, a ``FakePublisher`` and the
    ``OpenPTrack`` aggregator are built and ``loopCustom`` is run with
    ``args.output``.

    Raises:
        KeyError: if a required section or option is absent from the
            configuration file.
    """
    # Handle args
    log_lvl = getattr(logging, args.log, logging.DEBUG)
    log.set_level(log_lvl)

    config_file = os.path.join(os.path.dirname(__file__), args.config)
    if not os.path.exists(config_file):
        log.f("Configuration file do not find, exit")
        return

    # Handle config
    config = configparser.ConfigParser(allow_no_value=True)
    config.read(config_file)
    subscribe = config["SUBSCRIBE-COMMUNICATION"]
    general = config["GENERAL"]
    debug = general.getboolean("debug")

    # Init communication
    # Topics given on the command line win; otherwise use the JSON list
    # stored in the configuration file.
    if args.topics is not None:
        topics = args.topics
    else:
        topics = json.loads(subscribe["topic"])
    # Every topic shares the same QoS, so look it up once.
    qos = subscribe.getint("qos")
    communication_topics = [(topic, qos) for topic in topics]
    json_communication = BasicJsonCommunication(
        communication_topics,
        args.sources,
        debug,
    )

    # Output side: always the do-nothing publisher.
    communication = FakePublisher()

    # Init aggregator
    # NOTE(review): the logged aggregator type and the fourth OpenPTrack
    # argument were both hard-coded to False; they are presumably the same
    # single-person switch -- confirm against OpenPTrack's signature.
    single_person = False
    log.i("Aggregator type: {}".format("single person" if single_person else "multi person"))
    aggregator = OpenPTrack(
        json_communication,
        _optional_float(general, "step-init"),
        _optional_float(general, "window"),
        single_person,
        communication,
        debug,
        # Numeric level, same value logging.getLevelName(args.log) yields for
        # the permitted level names, without relying on that name-to-number
        # quirk of getLevelName.
        log_lvl,
    )
    communication.set_config_options(
        aggregator,
        subscribe["options-topic"],
        subscribe["options-path"],
    )

    # Run the system
    aggregator.loopCustom(args.output)
# Script entry point: start the aggregator when this file is executed directly.
if __name__ == "__main__":
    main()