-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathbase_daemon.py
More file actions
141 lines (126 loc) · 5.35 KB
/
base_daemon.py
File metadata and controls
141 lines (126 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import argparse
from abc import ABC, abstractmethod
from inspect import signature
from types import FunctionType
from uuid import UUID
from pyoaev.client import OpenAEV
from pyoaev.configuration import Configuration
from pyoaev.exceptions import OpenAEVError
from pyoaev.utils import logger
class BaseDaemon(ABC):
    """A base class for implementing a kind of daemon that periodically polls
    a given callback.

    Concrete daemons must implement :meth:`_setup` (run-once initialisation) and
    :meth:`_start_loop` (the main execution loop); :meth:`start` runs them in
    that order.

    :param configuration: configuration to provide the daemon
        (allowing for looking up values within the callback for example)
    :type configuration: Configuration
    :param callback: a method or function to periodically call, defaults to None.
    :type callback: callable, optional
    :param logger: a logger object, to log events. If not supplied, a default logger
        will be spawned to provide this functionality.
    :type logger: Any
    :param api_client: an API client that will provide connectivity with other systems.
        If not supplied, a default client is built from the ``openaev_url``,
        ``openaev_token`` and ``openaev_tenant_id`` configuration values.
    :type api_client: Any
    """

    def __init__(
        self,
        configuration: Configuration,
        callback: callable = None,
        logger=None,
        api_client=None,
    ):
        self._configuration = configuration
        self._callback = callback

        # Compare against None rather than relying on truthiness: an injected
        # client that happens to be falsy (e.g. defines __bool__/__len__) must
        # not be silently replaced by a default one.
        if api_client is None:
            api_client = BaseDaemon.__get_default_api_client(
                url=self._configuration.get("openaev_url"),
                token=self._configuration.get("openaev_token"),
                tenant_id=self._configuration.get("openaev_tenant_id"),
            )
        self.api = api_client

        # logging
        # compatibility layer: in order for older configs to still work, search for legacy names
        actual_log_level = (
            self._configuration.get("log_level")
            or self._configuration.get("collector_log_level")
            or self._configuration.get("injector_log_level")
            or "info"
        )
        actual_log_name = (
            self._configuration.get("name")
            or self._configuration.get("collector_name")
            or self._configuration.get("injector_name")
            or "daemon"
        )
        # Same reasoning as for the API client: only a missing (None) logger
        # triggers the creation of the default one.
        if logger is None:
            logger = BaseDaemon.__get_default_logger(
                actual_log_level.upper(),
                actual_log_name,
            )
        self.logger = logger

    @abstractmethod
    def _setup(self):
        """A run-once method that inheritors must implement. This serves to instantiate
        all useful objects and functionality for the implementor to run.
        """

    @abstractmethod
    def _start_loop(self):
        """Starts the daemon's main execution loop. Implementors should implement
        the main execution logic in here.
        """

    def _try_callback(self):
        """Tries to call the configured callback. Note that if any error is thrown,
        it is immediately swallowed (but still logged) allowing the daemon to keep
        running. This is useful for any transient issue (e.g. API endpoint down...).

        If the callback is a plain Python function that declares a parameter named
        ``collector``, the daemon instance is passed as that argument; any other
        callback is invoked without arguments.
        """
        try:
            # Inject the daemon instance into a callback that declares a
            # `collector` parameter. This avoids having to subclass the daemon
            # class just to give the callback access to the daemon.
            # Example (SomeDaemon being any concrete subclass):
            #
            # def standalone_func(collector):
            #     collector.api.call_openaev()
            #
            # SomeDaemon(configuration=config, callback=standalone_func).start()
            if (
                isinstance(self._callback, FunctionType)
                and "collector" in signature(self._callback).parameters
            ):
                self._callback(collector=self)
            else:
                self._callback()
        except Exception as err:  # pylint: disable=broad-except
            # Deliberate best-effort: log and carry on so the loop survives.
            self.logger.error(f"Error calling: {err}")

    def start(self):
        """Start the daemon. This will run the implementor's run-once setup method and
        follow-up with the main execution loop.

        When the process was launched with ``--dump-config-schema``, the
        configuration schema is printed and the method returns without starting
        the daemon. Command-line arguments not known to the daemon are ignored.

        :raises OpenAEVError: if no callback has been configured.
        """
        parser = argparse.ArgumentParser(description="parse daemon options")
        parser.add_argument("--dump-config-schema", action="store_true")
        # parse_known_args instead of parse_args: the daemon may be embedded in
        # a program that owns other command-line options, and parse_args would
        # terminate the whole process on the first argument it does not know.
        args, _unknown_args = parser.parse_known_args()
        if args.dump_config_schema:
            print(self._configuration.schema())
            return

        if self._callback is None:
            raise OpenAEVError("This daemon has no configured callback.")
        self._setup()
        self._start_loop()

    def set_callback(self, callback: callable):
        """Configures a callback to call in the main execution loop. If the callback
        was not provided in the daemon's ctor, this should be set before calling start().

        :param callback: the callable to invoke from the main execution loop.
        :type callback: callable
        """
        self._callback = callback

    def get_id(self):
        """Returns the daemon instance's ID contained in configuration. Configuration
        must define any of these keys: `id`, `collector_id`, `injector_id`.

        The keys are tried in that order and the first truthy value wins.
        """
        return (
            self._configuration.get("id")
            or self._configuration.get("collector_id")
            or self._configuration.get("injector_id")
        )

    @classmethod
    def __get_default_api_client(cls, url, token, tenant_id: UUID | None):
        """Builds the default OpenAEV API client from the given connection settings."""
        return OpenAEV(url=url, token=token, tenant_id=tenant_id)

    @classmethod
    def __get_default_logger(cls, log_level, name):
        """Builds the default logger for the given level and name.

        ``logger`` here is the module-level factory imported from ``pyoaev.utils``
        (not the ``logger`` argument of the constructor).
        """
        return logger(log_level)(name)