Skip to content

Commit ba429af

Browse files
committed
feat: swarm metrics and per-protocol inbound and outbound bandwidth usage in prometheus
1 parent f5f7ee1 commit ba429af

2 files changed

Lines changed: 328 additions & 0 deletions

File tree

libp2p/metrics/bandwidth.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import asyncio
2+
from prometheus_client import Counter
3+
4+
5+
class BandwidthMetrics:
    """
    Prometheus bandwidth metrics for libp2p transport streams.

    Owns one counter, ``libp2p_bandwidth_bytes_total``, labelled by
    ``direction`` (``"inbound"`` or ``"outbound"``) and ``protocols``.
    """

    def __init__(self, registry=None):
        """
        Create the bandwidth counter.

        :param registry: optional ``prometheus_client`` collector registry
            to register the counter with. When ``None`` (the default) no
            ``registry`` argument is passed to ``Counter``, so the library's
            own default registry is used, exactly as before this parameter
            existed. Passing a dedicated registry lets several instances
            (e.g. one per test) coexist; registering the same metric name
            twice in one registry is rejected by ``prometheus_client``.
        """
        # Only forward ``registry`` when the caller supplied one, so the
        # default call is identical to the original three-argument call.
        registry_kwargs = {} if registry is None else {"registry": registry}

        self.bandwidth = Counter(
            "libp2p_bandwidth_bytes_total",
            "Bandwidth usage by direction and protocol stack",
            ["direction", "protocols"],
            **registry_kwargs,
        )

    def outbound(self, protocols, n):
        """
        Add ``n`` bytes to the outbound counter for ``protocols``.

        :param protocols: value used for the ``protocols`` label.
        :param n: amount to add to the counter.
        """
        self._add("outbound", protocols, n)

    def inbound(self, protocols, n):
        """
        Add ``n`` bytes to the inbound counter for ``protocols``.

        :param protocols: value used for the ``protocols`` label.
        :param n: amount to add to the counter.
        """
        self._add("inbound", protocols, n)

    def _add(self, direction, protocols, n):
        """Increment the counter child for (direction, protocols) by ``n``."""
        self.bandwidth.labels(
            direction=direction,
            protocols=protocols,
        ).inc(n)
29+
30+
31+
class InstrumentedStream:
    """
    Wraps a stream to measure bandwidth.

    ``read`` and ``write`` are forwarded to the wrapped stream and the
    number of bytes moved is reported to ``metrics`` under the configured
    ``protocols`` label. Every attribute this class does not define itself
    is looked up on the wrapped stream, so the wrapper can stand in for
    the raw stream without hiding the rest of its API.
    """

    def __init__(self, stream, metrics: "BandwidthMetrics", protocols: str):
        """
        :param stream: object providing awaitable ``read``, ``write`` and
            ``close`` methods.
        :param metrics: object providing ``inbound(protocols, n)`` and
            ``outbound(protocols, n)``.
        :param protocols: label value passed to ``metrics`` on every call.
        """
        self.stream = stream
        self.metrics = metrics
        self.protocols = protocols

    def __getattr__(self, name):
        """
        Delegate unknown attributes to the wrapped stream.

        Python calls this only after normal attribute lookup has failed,
        so ``read``/``write``/``close`` and the instance attributes set in
        ``__init__`` never reach it.
        """
        # If ``stream`` itself is missing (instance created without running
        # ``__init__``, e.g. while being copied), fail instead of recursing.
        if name == "stream":
            raise AttributeError(name)
        return getattr(self.stream, name)

    async def read(self, n=-1):
        """
        Read from the wrapped stream and count the bytes received.

        :param n: forwarded unchanged to ``stream.read``.
            NOTE(review): the default ``-1`` is passed straight through;
            confirm the wrapped stream accepts it as "no limit".
        :return: whatever ``stream.read`` returned.
        """
        data = await self.stream.read(n)

        # Empty / falsy results (e.g. EOF) carry no bytes to account for.
        if data:
            self.metrics.inbound(self.protocols, len(data))

        return data

    async def write(self, data: bytes):
        """
        Write to the wrapped stream and count the bytes sent.

        :param data: payload handed to ``stream.write``.
        :return: the value returned by ``stream.write``, or ``len(data)``
            when that value is ``None``.
        """
        n = await self.stream.write(data)

        # A stream whose ``write`` returns nothing is treated as having
        # written the whole payload.
        if n is None:
            n = len(data)

        self.metrics.outbound(self.protocols, n)

        return n

    async def close(self):
        """Close the wrapped stream."""
        await self.stream.close()
61+
62+
63+
class TransportWrapper:
    """
    Wraps a transport and instruments bandwidth.

    Streams obtained from the wrapped transport through ``dial`` or
    ``accept`` are returned wrapped in ``InstrumentedStream`` so their
    traffic is reported to the shared ``metrics`` object.
    """

    def __init__(self, transport, metrics: BandwidthMetrics):
        """
        :param transport: object providing awaitable ``dial(addr)`` and
            ``accept()`` methods.
        :param metrics: metrics sink handed to every instrumented stream.
        """
        self.transport = transport
        self.metrics = metrics

    async def dial(self, addr, protocols):
        """
        Dial ``addr`` on the wrapped transport.

        :param addr: forwarded unchanged to ``transport.dial``.
        :param protocols: label value attached to the resulting stream.
        :return: the dialled stream wrapped in ``InstrumentedStream``.
        """
        raw_stream = await self.transport.dial(addr)
        return self._instrument(raw_stream, protocols)

    async def accept(self, protocols):
        """
        Accept a stream from the wrapped transport.

        :param protocols: label value attached to the resulting stream.
        :return: the accepted stream wrapped in ``InstrumentedStream``.
        """
        raw_stream = await self.transport.accept()
        return self._instrument(raw_stream, protocols)

    def _instrument(self, raw_stream, protocols):
        """Wrap ``raw_stream`` so its traffic is counted under ``protocols``."""
        return InstrumentedStream(raw_stream, self.metrics, protocols)

libp2p/metrics/swarm.py

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
import time
2+
from prometheus_client import Counter, Histogram
3+
4+
5+
class SwarmMetrics:
    """
    Prometheus metrics for libp2p swarm events.
    Mirrors the Rust libp2p metrics implementation.

    Events are fed in through ``record`` as mappings carrying a ``"type"``
    key; each recognised type updates one of the collectors created in
    ``__init__``. Unrecognised types are ignored.
    """

    # Event types whose only effect is incrementing a counter labelled by
    # ``protocols``, mapped to the attribute that holds that counter.
    _PROTOCOL_COUNTERS = {
        "IncomingConnection": "connections_incoming",
        "NewListenAddr": "new_listen_addr",
        "ExpiredListenAddr": "expired_listen_addr",
        "ListenerClosed": "listener_closed",
        "NewExternalAddrCandidate": "external_addr_candidates",
        "ExternalAddrConfirmed": "external_addr_confirmed",
        "ExternalAddrExpired": "external_addr_expired",
    }

    # Event types that increment a counter without labels.
    _PLAIN_COUNTERS = {
        "ListenerError": "listener_error",
        "Dialing": "dial_attempt",
    }

    def __init__(self, registry=None):
        """
        Create all swarm collectors.

        :param registry: optional ``prometheus_client`` collector registry
            to register the collectors with. When ``None`` (the default) no
            ``registry`` argument is passed, so the library's own default
            registry is used, exactly as before this parameter existed.
            Passing a dedicated registry lets several instances coexist in
            one process; registering the same metric name twice in one
            registry is rejected by ``prometheus_client``.
        """
        # Only forward ``registry`` when the caller supplied one, so the
        # default construction is identical to the original calls.
        registry_kwargs = {} if registry is None else {"registry": registry}

        # ---------------------------
        # incoming connections
        # ---------------------------

        self.connections_incoming = Counter(
            "swarm_connections_incoming_total",
            "Number of incoming connections per address stack",
            ["protocols"],
            **registry_kwargs,
        )

        self.connections_incoming_error = Counter(
            "swarm_connections_incoming_error_total",
            "Number of incoming connection errors",
            ["error", "protocols"],
            **registry_kwargs,
        )

        # ---------------------------
        # connection lifecycle
        # ---------------------------

        self.connections_established = Counter(
            "swarm_connections_established_total",
            "Number of connections established",
            ["role", "protocols"],
            **registry_kwargs,
        )

        self.connections_establishment_duration = Histogram(
            "swarm_connections_establishment_duration_seconds",
            "Time taken to establish connection",
            ["role", "protocols"],
            buckets=(
                0.01, 0.02, 0.05, 0.1,
                0.2, 0.5, 1, 2, 5, 10
            ),
            **registry_kwargs,
        )

        self.connections_duration = Histogram(
            "swarm_connections_duration_seconds",
            "Time a connection was alive",
            ["role", "protocols", "cause"],
            buckets=(
                0.01, 0.1, 1, 5, 10, 30, 60, 300, 600
            ),
            **registry_kwargs,
        )

        # ---------------------------
        # listening addresses
        # ---------------------------

        self.new_listen_addr = Counter(
            "swarm_new_listen_addr_total",
            "Number of new listen addresses",
            ["protocols"],
            **registry_kwargs,
        )

        self.expired_listen_addr = Counter(
            "swarm_expired_listen_addr_total",
            "Number of expired listen addresses",
            ["protocols"],
            **registry_kwargs,
        )

        # ---------------------------
        # external addresses
        # ---------------------------

        self.external_addr_candidates = Counter(
            "swarm_external_addr_candidates_total",
            "Number of new external address candidates",
            ["protocols"],
            **registry_kwargs,
        )

        self.external_addr_confirmed = Counter(
            "swarm_external_addr_confirmed_total",
            "Number of confirmed external addresses",
            ["protocols"],
            **registry_kwargs,
        )

        self.external_addr_expired = Counter(
            "swarm_external_addr_expired_total",
            "Number of expired external addresses",
            ["protocols"],
            **registry_kwargs,
        )

        # ---------------------------
        # listener lifecycle
        # ---------------------------

        self.listener_closed = Counter(
            "swarm_listener_closed_total",
            "Number of listeners closed",
            ["protocols"],
            **registry_kwargs,
        )

        self.listener_error = Counter(
            "swarm_listener_error_total",
            "Number of listener errors",
            **registry_kwargs,
        )

        # ---------------------------
        # dialing
        # ---------------------------

        self.dial_attempt = Counter(
            "swarm_dial_attempt_total",
            "Number of dial attempts",
            **registry_kwargs,
        )

        # NOTE(review): the ``peer`` label takes whatever the event carries;
        # if that is a raw peer id the label set is unbounded - confirm the
        # caller passes a small fixed set of values.
        self.outgoing_connection_error = Counter(
            "swarm_outgoing_connection_error_total",
            "Outgoing connection errors",
            ["peer", "error"],
            **registry_kwargs,
        )

        # ---------------------------
        # connection tracking
        # ---------------------------

        # connection id -> ``time.monotonic()`` reading taken when the
        # connection was established. Entries are removed on close.
        self.connections = {}

    # -------------------------------------------------

    def record(self, event):
        """
        Record a SwarmEvent-like object.

        :param event: mapping with a ``"type"`` key plus the keys required
            by that type (``"protocols"``, ``"role"``, ``"connection_id"``,
            ``"error"``, ``"peer"``; ``"established_in"`` and ``"cause"``
            are optional).
        :raises KeyError: if ``"type"`` or a key required by the event
            type is missing.
        """
        etype = event["type"]

        if etype == "ConnectionEstablished":
            self._record_connection_established(event)

        elif etype == "ConnectionClosed":
            self._record_connection_closed(event)

        elif etype == "IncomingConnectionError":
            self.connections_incoming_error.labels(
                error=event["error"],
                protocols=event["protocols"],
            ).inc()

        elif etype == "OutgoingConnectionError":
            self.outgoing_connection_error.labels(
                peer=event["peer"],
                error=event["error"],
            ).inc()

        elif etype in self._PROTOCOL_COUNTERS:
            counter = getattr(self, self._PROTOCOL_COUNTERS[etype])
            counter.labels(protocols=event["protocols"]).inc()

        elif etype in self._PLAIN_COUNTERS:
            getattr(self, self._PLAIN_COUNTERS[etype]).inc()

    def _record_connection_established(self, event):
        """Count a new connection and remember when it started."""
        # Read every required key first so a malformed event raises before
        # any collector has been touched.
        role = event["role"]
        protocols = event["protocols"]
        conn_id = event["connection_id"]
        duration = event.get("established_in", 0)

        self.connections_established.labels(
            role=role,
            protocols=protocols,
        ).inc()

        self.connections_establishment_duration.labels(
            role=role,
            protocols=protocols,
        ).observe(duration)

        # Monotonic clock: the stored value is only ever subtracted from a
        # later reading, and must not be affected by wall-clock adjustments.
        self.connections[conn_id] = time.monotonic()

    def _record_connection_closed(self, event):
        """Observe the lifetime of a connection previously seen as established."""
        conn_id = event["connection_id"]
        role = event["role"]
        protocols = event["protocols"]
        cause = event.get("cause", "None")

        # Connections that were never recorded as established have no
        # start time, so there is nothing to observe for them.
        started_at = self.connections.pop(conn_id, None)
        if started_at is None:
            return

        elapsed = time.monotonic() - started_at

        self.connections_duration.labels(
            role=role,
            protocols=protocols,
            cause=cause,
        ).observe(elapsed)

0 commit comments

Comments
 (0)