Skip to content

Commit 528a1c5

Browse files
committed
fix: kad-dht metrics working now
1 parent 15063c0 commit 528a1c5

6 files changed

Lines changed: 114 additions & 34 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/ip4/172.16.68.73/tcp/22943/p2p/16Uiu2HAmLipMqhDY6J6pbQzNJRVE9Ztt1qZfK6raPb5UDRwVM3Gz

examples/metrics/coordinator.py

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
PingService,
99
handle_ping,
1010
)
11+
from libp2p.kad_dht.kad_dht import DHTMode, KadDHT
1112
from libp2p.peer.id import ID
1213
from libp2p.peer.peerinfo import info_from_p2p_addr
1314
from libp2p.pubsub.gossipsub import GossipSub
1415
from libp2p.pubsub.pubsub import Pubsub
16+
from libp2p.records.validator import Validator
17+
from libp2p.utils.paths import get_script_dir, join_paths
1518

1619
GOSSIPSUB_PROTOCOL_ID = TProtocol("/meshsub/1.0.0")
1720
COMMANDS = """
@@ -21,22 +24,37 @@
2124
- join <topic> - Subscribe to a topic
2225
- leave <topic> - Unsubscribe to a topic
2326
- publish <topic> <message> - Publish a message
27+
- put <key> <value> - Execute PUT_VALUE in DHT
28+
- get <key> - Execute GET_VALUE in DHT
29+
- advertize <content-id> - Execute ADD_PROVIDER in DHT
30+
- get_provider <content-id> - Execute GET_PROVIDERS in DHT
2431
- local - List local multiaddr
2532
- help - List the existing commands
2633
- exit - Shut down
2734
"""
2835

2936

37+
class ExampleValidator(Validator):
38+
def validate(self, key: str, value: bytes) -> None:
39+
if not value:
40+
raise ValueError("Value cannot be empty")
41+
42+
def select(self, key: str, values: list[bytes]) -> int:
43+
return 0
44+
45+
3046
class Node:
31-
def __init__(self, listen_addrs: list[multiaddr.Multiaddr]):
47+
def __init__(
48+
self, listen_addrs: list[multiaddr.Multiaddr], dht_role: str
49+
):
3250
# Create a libp2p-host
3351
self.host = new_host(listen_addrs=listen_addrs, enable_metrics=True)
3452

35-
# Setup PING service
53+
# PING
3654
self.host.set_stream_handler(PING_ID, handle_ping)
3755
self.ping_service = PingService(self.host)
3856

39-
# Set up Pubsub/Gossipsub
57+
# Pubsub/Gossipsub
4058
self.gossipsub = GossipSub(
4159
protocols=[GOSSIPSUB_PROTOCOL_ID],
4260
degree=3, # Number of peers to maintain in mesh
@@ -51,6 +69,14 @@ def __init__(self, listen_addrs: list[multiaddr.Multiaddr]):
5169
)
5270
self.pubsub = Pubsub(self.host, self.gossipsub)
5371

72+
# KAD-DHT
73+
if dht_role == "server":
74+
dht_mode = DHTMode.SERVER
75+
else:
76+
dht_mode = DHTMode.CLIENT
77+
self.dht = KadDHT(self.host, dht_mode)
78+
self.dht.register_validator("exp", ExampleValidator())
79+
5480
# CLI input send/receive channels
5581
self.input_send_channel, self.input_receive_channel = trio.open_memory_channel(
5682
100
@@ -105,7 +131,44 @@ async def command_executor(self, nursery):
105131
if cmd == "publish" and len(parts) > 2:
106132
await self.pubsub.publish(parts[1], parts[2].encode())
107133
print(f"Published: {parts[2]}")
108-
134+
135+
if cmd == "put" and len(parts) > 2:
136+
key = parts[1]
137+
value = parts[2].encode()
138+
139+
await self.dht.put_value(key, value)
140+
print(f"Stored value: {value.decode()} with key: {key}")
141+
142+
if cmd == "get" and len(parts) > 1:
143+
key = parts[1]
144+
145+
retrieved_value = await self.dht.get_value(key)
146+
if retrieved_value:
147+
print(f"Retrieved value: {retrieved_value.decode()}")
148+
else:
149+
print("Failed to retrieve")
150+
151+
if cmd == "advertize" and len(parts) > 1:
152+
content_id = parts[1]
153+
154+
success = await self.dht.provide(content_id)
155+
if success:
156+
print(f"Advertised as provider for content: {content_id}")
157+
else:
158+
print("Failed to advertise as provider")
159+
160+
if cmd == "get_provider" and len(parts) > 1:
161+
content_id = parts[1]
162+
163+
providers = await self.dht.find_providers(content_id)
164+
if providers:
165+
print(
166+
f"Found {len(providers)} providers: "
167+
f"{[p.peer_id for p in providers]}"
168+
)
169+
else:
170+
print("No providers found")
171+
109172
if cmd == "local":
110173
maddr = self.host.get_addrs()[0]
111174
print(maddr)

examples/metrics/runner.py

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,19 @@
33

44
from examples.metrics.coordinator import COMMANDS, Node
55
from libp2p.metrics.metrics import Metrics
6-
from libp2p.tools.async_service.trio_service import (
7-
background_trio_service,
8-
)
6+
from libp2p.tools.anyio_service.context import background_trio_service
97
from libp2p.utils.address_validation import get_available_interfaces
108

119

1210
async def main() -> None:
11+
promt_session = PromptSession()
12+
1313
# Create a libp2p-node instance
1414
listen_addrs = get_available_interfaces(0)
15-
node = Node(listen_addrs=listen_addrs)
15+
node = Node(
16+
listen_addrs=listen_addrs,
17+
dht_role= "server",
18+
)
1619

1720
async with (
1821
node.host.run(listen_addrs=listen_addrs),
@@ -23,34 +26,35 @@ async def main() -> None:
2326

2427
async with background_trio_service(node.pubsub):
2528
async with background_trio_service(node.gossipsub):
26-
await trio.sleep(1)
27-
await node.pubsub.wait_until_ready()
28-
print("Gossipsub and Pubsub services started !!")
29+
async with background_trio_service(node.dht):
30+
await trio.sleep(1)
31+
await node.pubsub.wait_until_ready()
32+
print("Gossipsub and Pubsub services started !!")
33+
print(f"DHT service started with {node.dht.mode} mode")
2934

30-
# METRICS
31-
metrics = Metrics()
32-
nursery.start_soon(
33-
metrics.start_prometheus_server, node.host.metric_recv_channel
34-
)
35-
nursery.start_soon(node.command_executor, nursery)
36-
await trio.sleep(1)
35+
# METRICS
36+
metrics = Metrics()
37+
nursery.start_soon(
38+
metrics.start_prometheus_server, node.host.metric_recv_channel
39+
)
40+
nursery.start_soon(node.command_executor, nursery)
41+
await trio.sleep(1)
3742

38-
print("Entering intractive mode, type commands below.")
39-
promt_session = PromptSession()
40-
print(COMMANDS)
43+
print("Entering intractive mode, type commands below.")
44+
print(COMMANDS)
4145

42-
while not node.termination_event.is_set():
43-
try:
44-
_ = await trio.to_thread.run_sync(input)
45-
user_input = await trio.to_thread.run_sync(
46-
lambda: promt_session.prompt("Command> ")
47-
)
48-
cmds = user_input.strip().split(" ", 2)
49-
await node.input_send_channel.send(cmds)
46+
while not node.termination_event.is_set():
47+
try:
48+
_ = await trio.to_thread.run_sync(input)
49+
user_input = await trio.to_thread.run_sync(
50+
lambda: promt_session.prompt("Command> ")
51+
)
52+
cmds = user_input.strip().split(" ", 2)
53+
await node.input_send_channel.send(cmds)
5054

51-
except Exception as e:
52-
print(f"Error in the interactive shell: {e}")
53-
await trio.sleep(1)
55+
except Exception as e:
56+
print(f"Error in the interactive shell: {e}")
57+
await trio.sleep(1)
5458

5559
print("Shutdown complete, Goodbye!")
5660

libp2p/kad_dht/kad_dht.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class KadDhtEvent:
114114
get_providers: bool = False
115115
add_provider: bool = False
116116

117+
117118
class KadDHT(Service):
118119
"""
119120
Kademlia DHT implementation for libp2p.
@@ -877,7 +878,8 @@ async def handle_stream(self, stream: INetStream) -> None:
877878
logger.warning(f"Failed to parse protobuf message: {proto_err}")
878879

879880
# Send KAD-DHT event to Metrics
880-
stream.metric_send_channel.send(event)
881+
if stream.metric_send_channel is not None:
882+
await stream.metric_send_channel.send(event)
881883

882884
await stream.close()
883885
except Exception as e:

libp2p/metrics/kad_dht.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,26 @@ def __init__(self):
5959
def record(self, event: KadDhtEvent) -> None:
6060
if event.inbound:
6161
self.inbound.labels(peer_id=event.peer_id).inc()
62+
print("inbound")
6263

6364
if event.find_node:
6465
self.find_node.labels(peer_id=event.peer_id).inc()
66+
print("find_node")
6567

6668
if event.get_value:
6769
self.get_value.labels(peer_id=event.peer_id).inc()
70+
print("get_value")
6871

6972
if event.put_value:
7073
self.put_value.labels(peer_id=event.peer_id).inc()
74+
print("put_value")
7175

7276
if event.get_providers:
7377
self.get_providers.labels(peer_id=event.peer_id).inc()
78+
print("get_provider")
7479

7580
if event.add_provider:
7681
self.add_provider.labels(peer_id=event.peer_id).inc()
82+
print("add_provider")
83+
84+
print("\n")

libp2p/pubsub/pubsub.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,9 @@ async def continuously_read_stream(self, stream: INetStream) -> None:
528528
)
529529
await self.router.handle_rpc(rpc_incoming, peer_id)
530530

531-
await stream.metric_send_channel.send(event)
531+
if stream.metric_send_channel is not None:
532+
await stream.metric_send_channel.send(event)
533+
532534
except StreamEOF:
533535
logger.debug(
534536
f"Stream closed for peer {peer_id}, exiting read loop cleanly."

0 commit comments

Comments
 (0)