forked from finos/symphony-bdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatahose_loop.py
More file actions
73 lines (59 loc) · 2.71 KB
/
datahose_loop.py
File metadata and controls
73 lines (59 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
from symphony.bdk.core.auth.auth_session import AuthSession
from symphony.bdk.core.config.model.bdk_config import BdkConfig
from symphony.bdk.core.retry import retry
from symphony.bdk.core.retry.strategy import read_datahose_retry
from symphony.bdk.core.service.datafeed.abstract_ackId_event_loop import AbstractAckIdEventLoop
from symphony.bdk.core.service.datafeed.abstract_datahose_loop import AbstractDatahoseLoop
from symphony.bdk.core.service.session.session_service import SessionService
from symphony.bdk.gen.agent_api.datahose_api import DatahoseApi
from symphony.bdk.gen.agent_model.v5_events_read_body import V5EventsReadBody
# DFv2 API authorizes a maximum length for the tag parameter
DATAHOSE_TAG_MAX_LENGTH = 80
TYPE = "datahose"
logger = logging.getLogger(__name__)
class DatahoseLoop(AbstractAckIdEventLoop, AbstractDatahoseLoop):
"""A class for implementing the datahose loop service.
This service will be started by calling :func:`~DatahoseLoop.start`.
The BDK bot will listen to this datahose to get all the received real-time events that are set
as event_types in the configuration.
This service will be stopped by calling :func:`~DatahoseLoop.stop`
"""
def __init__(
self,
datahose_api: DatahoseApi,
session_service: SessionService,
auth_session: AuthSession,
config: BdkConfig,
):
super().__init__(datahose_api, session_service, auth_session, config)
if config.datahose is not None:
not_truncated_tag = (
config.datahose.tag
if config.datahose.tag is not None
else TYPE + "-" + config.bot.username
if config.bot.username is not None
else TYPE
)
self._tag = not_truncated_tag[:DATAHOSE_TAG_MAX_LENGTH]
self._retry = config.datahose.retry
self._event_types = config.datahose.event_types
async def start(self):
if self._running:
raise RuntimeError("The datahose service is already started")
logger.debug("Starting datahose loop")
self._bot_info = await self._session_service.get_session()
try:
await super().start()
finally:
logger.debug("Stopping datahose loop")
@retry(retry=read_datahose_retry)
async def _read_events(self):
params = {
"session_token": await self._auth_session.session_token,
"key_manager_token": await self._auth_session.key_manager_token,
"body": V5EventsReadBody(
type=TYPE, tag=self._tag, event_types=self._event_types, ack_id=self._ack_id
),
}
return await self._datafeed_api.read_events(**params)