-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathclient.py
More file actions
55 lines (47 loc) · 2.17 KB
/
client.py
File metadata and controls
55 lines (47 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import logging
from urllib.parse import urlparse
import aiohttp
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
class RLPGatewayClient(object):
    """
    A client to read application logs directly from RLP gateway.

    The client is initialized with the gateway endpoint, an optional proxy,
    the TLS verification setting and a credentials manager, and provides
    functionality for asynchronous HTTP requests to RLP gateway endpoint.
    """

    # Blank line that terminates one event in the text/event-stream body.
    _EVENT_TERMINATOR = b"\n\n"
    # Prefix of the events that carry a log envelope.
    _LOG_EVENT_PREFIX = b"data:"

    def __init__(self, rlp_gateway_endpoint, proxy, verify_ssl, credentials_manager):
        """Store the connection settings used by :meth:`stream_logs`.

        :param rlp_gateway_endpoint: base URL of the gateway; ``/v2/read`` is
            appended to it for every request.
        :param proxy: proxy URL; ``None`` or an empty value means no proxy.
        :param verify_ssl: when exactly ``False``, certificate validation is
            skipped for the streaming request; any other value leaves the
            HTTP library's default validation in place.
        :param credentials_manager: object whose ``_access_token`` attribute
            is sent as the ``Authorization`` header.
        """
        self.rlp_gateway_endpoint = rlp_gateway_endpoint
        self.verify_ssl = verify_ssl
        self.credentials_manager = credentials_manager
        # Normalize "no proxy" (None or empty) to None.
        self.proxy = proxy if proxy else None

    @staticmethod
    def _split_events(buffer):
        """Split *buffer* into complete events and the unfinished tail.

        :param buffer: bytes accumulated from the response body.
        :return: ``(events, remainder)`` where ``events`` is the list of
            complete, non-empty event blocks in arrival order (terminator
            removed) and ``remainder`` holds the bytes received after the
            last terminator, to be kept until more data arrives.
        """
        *blocks, remainder = buffer.split(RLPGatewayClient._EVENT_TERMINATOR)
        return [block for block in blocks if block], remainder

    async def stream_logs(self, app_guid, **kwargs):
        """Stream the log events of an application from the gateway.

        :param app_guid: value sent as the ``source_id`` query parameter.
        :param kwargs: optional ``headers`` and ``params`` mappings merged
            over the default request headers and query parameters.
        :return: an async generator. It yields ``{}`` once when the gateway
            answers 204. Otherwise it yields, as bytes, every complete
            ``data:`` event without its terminator; for a chunk that
            completes no such event (heartbeat, closing notice, partial
            event, non-stream body) the raw chunk is yielded unchanged.
        """
        url = f"{self.rlp_gateway_endpoint}/v2/read"
        headers = {
            "Authorization": self.credentials_manager._access_token,
            "Accept": "text/event-stream",
            "Cache-Control": "no-cache",
        }
        params = {"log": "", "source_id": app_guid}
        if "headers" in kwargs:
            headers.update(kwargs["headers"])
        if "params" in kwargs:
            params.update(kwargs["params"])

        request_options = {}
        if self.verify_ssl is False:
            # Honor the constructor setting; otherwise keep the library default.
            request_options["ssl"] = False

        async with aiohttp.ClientSession(headers=headers, proxy=self.proxy) as session:
            async with session.get(url=url, params=params, **request_options) as response:
                if response.status == 204:
                    yield {}
                    return

                buffer = b""
                async for data in response.content.iter_chunked(1024):
                    buffer += data
                    # Take every finished event out of the buffer at once so
                    # that several events in one chunk are not delayed, and a
                    # control event never discards the log events around it.
                    events, buffer = self._split_events(buffer)
                    log_messages = [
                        event for event in events if event.startswith(self._LOG_EVENT_PREFIX)
                    ]
                    if log_messages:
                        for log_message in log_messages:
                            yield log_message
                    else:
                        # Nothing complete to deliver: pass the raw chunk
                        # through, as callers have always received it.
                        yield data