Skip to content

Commit ed8d72b

Browse files
Merge pull request #193 from project-receptor/diagnostics
Adding a diagnostics subsystem Reviewed-by: https://github.com/apps/ansible-zuul
2 parents d84bd3e + 76d45bc commit ed8d72b

15 files changed

Lines changed: 253 additions & 52 deletions

File tree

receptor/__main__.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import asyncio
22
import logging
33
import logging.config
4-
import signal
54
import sys
65

76
from .config import ReceptorConfig
7+
from .diagnostics import log_buffer
88
from .logstash_formatter.logstash import LogstashFormatter
99

1010
logger = logging.getLogger(__name__)
@@ -48,17 +48,13 @@ def main(args=None):
4848

4949
def _f(record):
5050
record.node_id = config.default_node_id
51+
if record.levelno == logging.ERROR:
52+
log_buffer.appendleft(record)
5153
return True
5254

5355
for h in logging.getLogger("receptor").handlers:
5456
h.addFilter(_f)
5557

56-
def dump_stacks(signum, frame):
57-
for t in asyncio.Task.all_tasks():
58-
t.print_stack(file=sys.stderr)
59-
60-
signal.signal(signal.SIGHUP, dump_stacks)
61-
6258
try:
6359
config.go()
6460
except asyncio.CancelledError:

receptor/config.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def __init__(self, args=None):
246246
long_option="--ws_heartbeat",
247247
default_value=None,
248248
value_type="int",
249-
hint="Set heartbeat interval for websocket connections."
249+
hint="Set heartbeat interval for websocket connections.",
250250
)
251251
# ping options
252252
self.add_config_option(
@@ -295,7 +295,7 @@ def __init__(self, args=None):
295295
long_option="--ws_heartbeat",
296296
default_value=None,
297297
value_type="int",
298-
hint="Set heartbeat interval for websocket connections."
298+
hint="Set heartbeat interval for websocket connections.",
299299
)
300300
# send options
301301
self.add_config_option(
@@ -344,7 +344,7 @@ def __init__(self, args=None):
344344
long_option="--ws_heartbeat",
345345
default_value=None,
346346
value_type="int",
347-
hint="Set heartbeat interval for websocket connections."
347+
hint="Set heartbeat interval for websocket connections.",
348348
)
349349
# status options
350350
self.add_config_option(
@@ -378,7 +378,7 @@ def __init__(self, args=None):
378378
long_option="--ws_heartbeat",
379379
default_value=None,
380380
value_type="int",
381-
hint="Set heartbeat interval for websocket connections."
381+
hint="Set heartbeat interval for websocket connections.",
382382
)
383383
self.parse_options(args)
384384

receptor/connection/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .. import fileio
88
from ..bridgequeue import BridgeQueue
99
from ..messages.framed import FramedBuffer
10+
from ..stats import bytes_recv
1011

1112
logger = logging.getLogger(__name__)
1213

@@ -59,6 +60,7 @@ async def receive(self):
5960
async for msg in self.conn:
6061
if self.conn.closed:
6162
break
63+
bytes_recv.inc(len(msg))
6264
await self.buf.put(msg)
6365
except ConnectionResetError:
6466
logger.debug("receive: other side closed the connection")

receptor/connection/sock.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
34
from .base import Transport, log_ssl_detail
45

56
logger = logging.getLogger(__name__)
@@ -31,6 +32,20 @@ async def send(self, q):
3132
self.writer.write(chunk)
3233
await self.writer.drain()
3334

35+
def _diagnostics(self):
36+
t = self.writer._transport.get_extra_info
37+
addr, port = t("peername", (None, None))
38+
return {
39+
"address": addr,
40+
"port": port,
41+
"compression": t("compression"),
42+
"cipher": t("cipher"),
43+
"peercert": t("peercert"),
44+
"sslcontext": t("sslcontext"),
45+
"closed": self.closed,
46+
"chunk_size": self.chunk_size,
47+
}
48+
3449

3550
async def connect(host, port, factory, loop=None, ssl=None, reconnect=True):
3651
if not loop:

receptor/connection/ws.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,20 @@ async def send(self, q):
3333

3434

3535
async def connect(
36-
uri, factory, loop=None, ssl_context=None, reconnect=True,
37-
ws_extra_headers=None, ws_heartbeat=None
36+
uri,
37+
factory,
38+
loop=None,
39+
ssl_context=None,
40+
reconnect=True,
41+
ws_extra_headers=None,
42+
ws_heartbeat=None,
3843
):
3944
if not loop:
4045
loop = asyncio.get_event_loop()
4146

4247
worker = factory()
4348
try:
44-
proxy_scheme = {'ws': 'http', 'wss': 'https'}[urlparse(uri).scheme]
49+
proxy_scheme = {"ws": "http", "wss": "https"}[urlparse(uri).scheme]
4550
proxies = proxies_from_env()
4651
if proxy_scheme in proxies:
4752
proxy = proxies[proxy_scheme].proxy
@@ -50,8 +55,12 @@ async def connect(
5055
proxy = None
5156
proxy_auth = None
5257
async with aiohttp.ClientSession().ws_connect(
53-
uri, ssl=ssl_context, headers=ws_extra_headers,
54-
proxy=proxy, proxy_auth=proxy_auth, heartbeat=ws_heartbeat
58+
uri,
59+
ssl=ssl_context,
60+
headers=ws_extra_headers,
61+
proxy=proxy,
62+
proxy_auth=proxy_auth,
63+
heartbeat=ws_heartbeat,
5564
) as ws:
5665
log_ssl_detail(ws)
5766
t = WebSocket(ws)

receptor/controller.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from .connection.base import Worker
1010
from .connection.manager import Manager
11+
from .diagnostics import status
1112
from .messages.framed import FileBackedBuffer, FramedMessage
1213
from .receptor import Receptor
1314

@@ -39,6 +40,7 @@ def __init__(self, config, loop=asyncio.get_event_loop(), queue=None):
3940
if self.queue is None:
4041
self.queue = asyncio.Queue(loop=loop)
4142
self.receptor.response_queue = self.queue
43+
self.status_task = loop.create_task(status(self.receptor))
4244

4345
async def shutdown_loop(self):
4446
tasks = [

receptor/diagnostics.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import asyncio
2+
import inspect
3+
import json
4+
import logging
5+
import os
6+
import signal
7+
import traceback as tb
8+
import types
9+
from collections import defaultdict, deque
10+
from datetime import datetime
11+
from functools import singledispatch
12+
13+
from prometheus_client import generate_latest
14+
15+
from . import fileio
16+
from .logstash_formatter.logstash import LogstashFormatter
17+
18+
logger = logging.getLogger(__name__)
19+
20+
log_buffer = deque(maxlen=10)
21+
fmt = LogstashFormatter()
22+
trigger = asyncio.Event()
23+
24+
signal.signal(signal.SIGHUP, lambda n, h: trigger.set())
25+
26+
27+
@singledispatch
28+
def extract_module(o):
29+
return o.__module__
30+
31+
32+
@extract_module.register(types.CoroutineType)
33+
def extract_coro(coro):
34+
return inspect.getmodule(coro, coro.cr_code.co_filename).__name__
35+
36+
37+
@extract_module.register(types.GeneratorType)
38+
def extract_gen(gen):
39+
return inspect.getmodule(gen, gen.gi_code.co_filename).__name__
40+
41+
42+
@singledispatch
43+
def encode(o):
44+
return json.JSONEncoder().default(o)
45+
46+
47+
@encode.register(set)
48+
def encode_set(s):
49+
return list(s)
50+
51+
52+
@encode.register(types.FunctionType)
53+
def encode_function_type(func):
54+
return f"{func.__module__}.{func.__qualname__}"
55+
56+
57+
@encode.register(datetime)
58+
def encode_datetime(o):
59+
return o.isoformat()
60+
61+
62+
def structure_task(task):
63+
coro = task._coro
64+
try:
65+
mod = extract_module(coro)
66+
except Exception:
67+
mod = "<unknown>"
68+
69+
out = {"state": task._state, "name": f"{mod}.{coro.__qualname__}", "stack": []}
70+
71+
try:
72+
stack = tb.extract_stack(task.get_stack()[0])
73+
out["stack"] = [
74+
{"filename": fs.filename, "line": fs.line, "lineno": fs.lineno} for fs in stack
75+
]
76+
except IndexError:
77+
pass
78+
79+
return out
80+
81+
82+
def tasks():
83+
d = defaultdict(list)
84+
for t in asyncio.Task.all_tasks():
85+
st = structure_task(t)
86+
state = st.pop("state")
87+
d[state].append(st)
88+
return [{"state": state, "items": tasks} for state, tasks in d.items()]
89+
90+
91+
def format_connection(node_id, connection, capabilities):
92+
d = connection._diagnostics()
93+
d["node_id"] = node_id
94+
d["capabilities"] = capabilities
95+
return d
96+
97+
98+
def format_router(router):
99+
edges = [
100+
{"left": edge[0], "right": edge[1], "cost": cost} for edge, cost in router._edges.items()
101+
]
102+
103+
neighbors = [
104+
{"node_id": node_id, "items": values} for node_id, values in router._neighbors.items()
105+
]
106+
107+
table = [
108+
{"destination_node_id": node_id, "next_hop": v[0], "cost": v[1]}
109+
for node_id, v in router.routing_table.items()
110+
]
111+
112+
return {"nodes": router._nodes, "edges": edges, "neighbors": neighbors, "table": table}
113+
114+
115+
async def status(receptor_object):
116+
path = os.path.join(receptor_object.base_path, "diagnostics.json")
117+
doc = {}
118+
doc["config"] = receptor_object.config._parsed_args.__dict__
119+
doc["node_id"] = receptor_object.node_id
120+
while True:
121+
trigger.clear()
122+
doc["datetime"] = datetime.utcnow()
123+
doc["recent_errors"] = list(fmt._record_to_dict(r) for r in log_buffer)
124+
doc["connections"] = [
125+
format_connection(node_id, conn, receptor_object.node_capabilities[node_id])
126+
for node_id, connections in receptor_object.connections.items()
127+
for conn in connections
128+
]
129+
doc["routes"] = format_router(receptor_object.router)
130+
doc["tasks"] = tasks()
131+
doc["metrics"] = generate_latest()
132+
try:
133+
await fileio.write(path, json.dumps(doc, default=encode), mode="w")
134+
except Exception:
135+
logger.exception("failed to dump diagnostic data")
136+
137+
# run every 30 seconds, or when triggered
138+
try:
139+
await asyncio.wait_for(trigger.wait(), 30)
140+
except asyncio.TimeoutError:
141+
pass

receptor/entrypoints.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def node_keepalive():
3535
controller.add_peer(
3636
peer,
3737
ws_extra_headers=config.node_ws_extra_headers,
38-
ws_heartbeat=config.node_ws_heartbeat
38+
ws_heartbeat=config.node_ws_heartbeat,
3939
)
4040
if config.node_keepalive_interval > 1:
4141
controller.loop.create_task(node_keepalive())
@@ -179,7 +179,7 @@ async def status_entrypoint():
179179
config.status_ws_extra_headers,
180180
config.status_ws_heartbeat,
181181
print_status,
182-
noop
182+
noop,
183183
)
184184

185185
async def print_status():

receptor/logstash_formatter/logstash.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,7 @@ def __init__(
5858
except Exception:
5959
self.source_host = ""
6060

61-
def format(self, record):
62-
"""
63-
Format a log record to JSON, if the message is a dict
64-
assume an empty message and use the dict as additional
65-
fields.
66-
"""
67-
61+
def _record_to_dict(self, record):
6862
fields = record.__dict__.copy()
6963

7064
if isinstance(record.msg, dict):
@@ -107,7 +101,16 @@ def format(self, record):
107101
"@fields": self._build_fields(logr, fields),
108102
}
109103
)
104+
return logr
105+
106+
def format(self, record):
107+
"""
108+
Format a log record to JSON, if the message is a dict
109+
assume an empty message and use the dict as additional
110+
fields.
111+
"""
110112

113+
logr = self._record_to_dict(record)
111114
return json.dumps(logr, default=self.json_default, cls=self.json_cls)
112115

113116
def _build_fields(self, defaults, fields):

0 commit comments

Comments
 (0)