Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,7 @@ jobs:
if [[ "${{ matrix.toxenv }}" == "wheel" ]]; then
python -m tox run -e windows-wheel
else
python -m tox run -e py${{ matrix.python-version }}-${{ matrix.toxenv }}
tox_python="py${{ matrix.python-version }}"
tox_python="${tox_python//./}"
python -m tox run -e "${tox_python}-${{ matrix.toxenv }}"
fi
25 changes: 25 additions & 0 deletions docs/examples.request_response.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Request/Response Demo
=====================

This example demonstrates the high-level libp2p request/response helper using a
single JSON request and response over a dedicated protocol stream.

.. code-block:: console

$ request-response-demo
Listener ready, listening on:
...

Copy the printed command into another terminal, for example:

.. code-block:: console

$ request-response-demo -d /ip4/127.0.0.1/tcp/8000/p2p/<PEER_ID> --message hello
Sent: hello
Received: {'message': 'hello', 'echo': 'HELLO', 'peer': '<PEER_ID>'}

The full source code for this example is below:

.. literalinclude:: ../examples/request_response/request_response_demo.py
:language: python
:linenos:
1 change: 1 addition & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Examples
examples.echo
examples.echo_quic
examples.ping
examples.request_response
examples.interop
examples.pubsub
examples.bitswap
Expand Down
37 changes: 37 additions & 0 deletions docs/libp2p.request_response.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
libp2p.request_response package
===============================

Submodules
----------

libp2p.request_response.api module
----------------------------------

.. automodule:: libp2p.request_response.api
:members:
:undoc-members:
:show-inheritance:

libp2p.request_response.codec module
------------------------------------

.. automodule:: libp2p.request_response.codec
:members:
:undoc-members:
:show-inheritance:

libp2p.request_response.exceptions module
-----------------------------------------

.. automodule:: libp2p.request_response.exceptions
:members:
:undoc-members:
:show-inheritance:

Module contents
---------------

.. automodule:: libp2p.request_response
:members:
:undoc-members:
:show-inheritance:
1 change: 1 addition & 0 deletions docs/libp2p.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Subpackages
libp2p.perf
libp2p.protocol_muxer
libp2p.pubsub
libp2p.request_response
libp2p.rcmgr
libp2p.records
libp2p.relay
Expand Down
1 change: 1 addition & 0 deletions examples/request_response/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Examples for the request_response helper."""
129 changes: 129 additions & 0 deletions examples/request_response/request_response_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import argparse
import logging
import random
import secrets

import multiaddr
import trio

from libp2p import new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.custom_types import TProtocol
from libp2p.peer.peerinfo import info_from_p2p_addr
from libp2p.request_response import JSONCodec, RequestResponse
from libp2p.utils.address_validation import (
find_free_port,
get_available_interfaces,
get_optimal_binding_address,
)

logging.basicConfig(level=logging.WARNING)
logging.getLogger("multiaddr").setLevel(logging.WARNING)
logging.getLogger("libp2p").setLevel(logging.WARNING)

PROTOCOL_ID = TProtocol("/example/request-response/1.0.0")


async def run(
port: int,
destination: str | None,
message: str,
seed: int | None = None,
) -> None:
if port <= 0:
port = find_free_port()
listen_addr = get_available_interfaces(port)

if seed is not None:
random.seed(seed)
secret_number = random.getrandbits(32 * 8)
secret = secret_number.to_bytes(length=32, byteorder="big")
else:
secret = secrets.token_bytes(32)

host = new_host(key_pair=create_new_key_pair(secret))
rr = RequestResponse(host)
codec = JSONCodec()

async def handler(request: dict[str, str], context) -> dict[str, str]:
return {
"message": request["message"],
"echo": request["message"].upper(),
"peer": str(context.peer_id),
}

async with host.run(listen_addrs=listen_addr), trio.open_nursery() as nursery:
nursery.start_soon(host.get_peerstore().start_cleanup_task, 60)
print(f"I am {host.get_id().to_string()}")

if not destination:
rr.set_handler(PROTOCOL_ID, handler=handler, codec=codec)
peer_id = host.get_id().to_string()
print("Listener ready, listening on:\n")
for addr in listen_addr:
print(f"{addr}/p2p/{peer_id}")

optimal_addr = get_optimal_binding_address(port)
optimal_addr_with_peer = f"{optimal_addr}/p2p/{peer_id}"
print(
"\nRun this from the same folder in another console:\n\n"
f"request-response-demo -d {optimal_addr_with_peer} --message hello\n"
)
print("Waiting for incoming requests...")
await trio.sleep_forever()

destination_str = destination
if destination_str is None:
raise ValueError("destination is required in dialer mode")
maddr = multiaddr.Multiaddr(destination_str)
info = info_from_p2p_addr(maddr)
await host.connect(info)
response = await rr.send_request(
peer_id=info.peer_id,
protocol_ids=[PROTOCOL_ID],
request={"message": message},
codec=codec,
)
print(f"Sent: {message}")
print(f"Received: {response}")


def main() -> None:
description = """
Demonstrates the request/response helper with a single JSON request and response.
Run once without -d to start a listener, then run again with -d to send a request.
"""
example_maddr = (
"/ip4/[HOST_IP]/tcp/8000/p2p/QmQn4SwGkDZKkUEpBRBvTmheQycxAHJUNmVEnjA2v1qe8Q"
)
parser = argparse.ArgumentParser(description=description)
parser.add_argument("-p", "--port", default=0, type=int, help="source port")
parser.add_argument(
"-d",
"--destination",
type=str,
help=f"destination multiaddr string, e.g. {example_maddr}",
)
parser.add_argument(
"--message",
type=str,
default="hello",
help="JSON message payload to send",
)
parser.add_argument(
"-s",
"--seed",
type=int,
help="seed the RNG to make peer IDs reproducible",
)
args = parser.parse_args()
try:
trio.run(run, args.port, args.destination, args.message, args.seed)
except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
51 changes: 25 additions & 26 deletions libp2p/io/msgio.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,17 @@
"""
``msgio`` is an implementation of `https://github.com/libp2p/go-msgio`.

from that repo: "a simple package to r/w length-delimited slices."

NOTE: currently missing the capability to indicate lengths by "varint" method.
From that repo: "a simple package to r/w length-delimited slices."
"""

from abc import (
abstractmethod,
)
from typing import (
Literal,
)

from libp2p.io.abc import (
MsgReadWriteCloser,
Reader,
ReadWriteCloser,
)
from libp2p.io.utils import (
read_exactly,
)
from libp2p.utils import (
decode_uvarint_from_stream,
encode_varint_prefixed,
)

from .exceptions import (
MessageTooLarge,
)
from abc import abstractmethod
from typing import Literal

from libp2p.io.abc import MsgReadWriteCloser, Reader, ReadWriteCloser
from libp2p.io.utils import read_exactly
from libp2p.utils import decode_uvarint_from_stream, encode_varint_prefixed

from .exceptions import MessageTooLarge

BYTE_ORDER: Literal["big", "little"] = "big"

Expand Down Expand Up @@ -87,6 +70,22 @@ def encode_msg(self, msg: bytes) -> bytes:
class VarIntLengthMsgReadWriter(BaseMsgReadWriter):
max_msg_size: int

def __init__(
self,
read_write_closer: ReadWriteCloser,
max_msg_size: int | None = None,
) -> None:
super().__init__(read_write_closer)
if max_msg_size is None:
if not hasattr(self, "max_msg_size"):
raise TypeError("max_msg_size is required")
effective_max_msg_size = self.max_msg_size
else:
effective_max_msg_size = max_msg_size
if effective_max_msg_size <= 0:
raise ValueError("max_msg_size must be greater than 0")
self.max_msg_size = effective_max_msg_size

async def next_msg_len(self) -> int:
msg_len = await decode_uvarint_from_stream(self.read_write_closer)
if msg_len > self.max_msg_size:
Expand Down
31 changes: 31 additions & 0 deletions libp2p/request_response/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from .api import RequestContext, RequestResponse, RequestResponseConfig
from .codec import BytesCodec, JSONCodec, RequestResponseCodec
from .exceptions import (
MessageTooLargeError,
ProtocolNotSupportedError,
RequestDecodeError,
RequestEncodeError,
RequestResponseError,
RequestTimeoutError,
RequestTransportError,
ResponseDecodeError,
ResponseEncodeError,
)

__all__ = [
"BytesCodec",
"JSONCodec",
"MessageTooLargeError",
"ProtocolNotSupportedError",
"RequestContext",
"RequestDecodeError",
"RequestEncodeError",
"RequestResponse",
"RequestResponseCodec",
"RequestResponseConfig",
"RequestResponseError",
"RequestTimeoutError",
"RequestTransportError",
"ResponseDecodeError",
"ResponseEncodeError",
]
Loading
Loading