Skip to content

Commit 1df74e2

Browse files
committed
feat: added metrics for swarm-connection cycle
1 parent 528a1c5 commit 1df74e2

8 files changed

Lines changed: 114 additions & 356 deletions

File tree

examples/metrics/coordinator.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from libp2p.pubsub.gossipsub import GossipSub
1515
from libp2p.pubsub.pubsub import Pubsub
1616
from libp2p.records.validator import Validator
17-
from libp2p.utils.paths import get_script_dir, join_paths
1817

1918
GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
2019
COMMANDS = """
@@ -24,7 +23,7 @@
2423
- join <topic> - Subscribe to a topic
2524
- leave <topic> - Unsubscribe to a topic
2625
- publish <topic> <message> - Publish a message
27-
- put <key> <value> - Execute PUT_VALUE in DHT
26+
- put <key> <value> - Execute PUT_VALUE in DHT
2827
- get <key> - Execute GET_VALUE in DHT
2928
- advertize <content-id> - Execute ADD_PROVIDER in DHT
3029
- get_provider <content-id> - Execute GET_PROVIDERS in DHT
@@ -44,9 +43,7 @@ def select(self, key: str, values: list[bytes]) -> int:
4443

4544

4645
class Node:
47-
def __init__(
48-
self, listen_addrs: list[multiaddr.Multiaddr], dht_role: str
49-
):
46+
def __init__(self, listen_addrs: list[multiaddr.Multiaddr], dht_role: str):
5047
# Create a libp2p-host
5148
self.host = new_host(listen_addrs=listen_addrs, enable_metrics=True)
5249

@@ -89,6 +86,11 @@ async def receive_loop(self, subsription):
8986
while not self.termination_event.is_set():
9087
try:
9188
message = await subsription.get()
89+
90+
from_peer_id = ID(message.from_id).to_base58()
91+
if from_peer_id == self.host.get_id().pretty():
92+
continue
93+
9294
print(f"From: {ID(message.from_id).to_base58()}")
9395
print(f"Received: {message.data.decode('utf-8')}")
9496
except Exception:
@@ -131,35 +133,35 @@ async def command_executor(self, nursery):
131133
if cmd == "publish" and len(parts) > 2:
132134
await self.pubsub.publish(parts[1], parts[2].encode())
133135
print(f"Published: {parts[2]}")
134-
136+
135137
if cmd == "put" and len(parts) > 2:
136138
key = parts[1]
137139
value = parts[2].encode()
138-
140+
139141
await self.dht.put_value(key, value)
140142
print(f"Stored value: {value.decode()} with key: {key}")
141-
143+
142144
if cmd == "get" and len(parts) > 1:
143145
key = parts[1]
144-
146+
145147
retrieved_value = await self.dht.get_value(key)
146148
if retrieved_value:
147149
print(f"Retrieved value: {retrieved_value.decode()}")
148150
else:
149151
print("Failed to retrieve")
150-
152+
151153
if cmd == "advertize" and len(parts) > 1:
152154
content_id = parts[1]
153-
155+
154156
success = await self.dht.provide(content_id)
155157
if success:
156158
print(f"Advertised as provider for content: {content_id}")
157159
else:
158160
print("Failed to advertise as provider")
159-
161+
160162
if cmd == "get_provider" and len(parts) > 1:
161163
content_id = parts[1]
162-
164+
163165
providers = await self.dht.find_providers(content_id)
164166
if providers:
165167
print(
@@ -168,7 +170,7 @@ async def command_executor(self, nursery):
168170
)
169171
else:
170172
print("No providers found")
171-
173+
172174
if cmd == "local":
173175
maddr = self.host.get_addrs()[0]
174176
print(maddr)

examples/metrics/runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ async def main() -> None:
1414
listen_addrs = get_available_interfaces(0)
1515
node = Node(
1616
listen_addrs=listen_addrs,
17-
dht_role= "server",
17+
dht_role="server",
1818
)
1919

2020
async with (

libp2p/metrics/bandwidth.py

Lines changed: 0 additions & 86 deletions
This file was deleted.

libp2p/metrics/kad_dht.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# GET_PROVIDERS
1212
# ADD_PROVIDERS
1313

14+
1415
class KadDhtMetrics:
1516
inbound: Counter
1617
find_node: Counter
@@ -59,26 +60,18 @@ def __init__(self):
5960
def record(self, event: KadDhtEvent) -> None:
6061
if event.inbound:
6162
self.inbound.labels(peer_id=event.peer_id).inc()
62-
print("inbound")
6363

6464
if event.find_node:
6565
self.find_node.labels(peer_id=event.peer_id).inc()
66-
print("find_node")
6766

6867
if event.get_value:
6968
self.get_value.labels(peer_id=event.peer_id).inc()
70-
print("get_value")
7169

7270
if event.put_value:
7371
self.put_value.labels(peer_id=event.peer_id).inc()
74-
print("put_value")
7572

7673
if event.get_providers:
7774
self.get_providers.labels(peer_id=event.peer_id).inc()
78-
print("get_provider")
7975

8076
if event.add_provider:
8177
self.add_provider.labels(peer_id=event.peer_id).inc()
82-
print("add_provider")
83-
84-
print("\n")

libp2p/metrics/metrics.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from libp2p.metrics.gossipsub import GossipsubMetrics
99
from libp2p.metrics.kad_dht import KadDhtMetrics
1010
from libp2p.metrics.ping import PingMetrics
11+
from libp2p.metrics.swarm import SwarmEvent, SwarmMetrics
1112
from libp2p.pubsub.pubsub import GossipsubEvent
1213

1314

@@ -30,6 +31,7 @@ def __init__(self):
3031
self.ping = PingMetrics()
3132
self.gossipsub = GossipsubMetrics()
3233
self.kad_dht = KadDhtMetrics()
34+
self.swarm = SwarmMetrics()
3335

3436
async def start_prometheus_server(
3537
self,
@@ -62,3 +64,5 @@ async def start_prometheus_server(
6264
self.gossipsub.record(event)
6365
case KadDhtEvent():
6466
self.kad_dht.record(event)
67+
case SwarmEvent():
68+
self.swarm.record(event)

libp2p/metrics/relay.py

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)