diff --git a/.gitignore b/.gitignore index e46cc8aa6..04ff07df9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ __pycache__/ # C extensions *.so +my-kivy-app/ # Distribution / packaging *.egg diff --git a/libp2p/mobile/__init__.py b/libp2p/mobile/__init__.py new file mode 100644 index 000000000..e9480e409 --- /dev/null +++ b/libp2p/mobile/__init__.py @@ -0,0 +1,30 @@ +""" +Mobile compatibility layer for py-libp2p. + +This module provides mobile-compatible implementations of the libp2p transport layer, +designed to work with asyncio instead of trio for better mobile runtime support. +""" + +from .runtime import AsyncRuntimeAdapter, is_mobile_runtime, get_runtime_adapter +from .transport import MobileTCPTransport, MobileTCPListener +from .io import MobileAsyncStream, MobileBufferedStream +from .factory import create_tcp_transport, get_transport_class + +__all__ = [ + # Runtime adapter + "AsyncRuntimeAdapter", + "is_mobile_runtime", + "get_runtime_adapter", + + # Transport layer + "MobileTCPTransport", + "MobileTCPListener", + + # I/O layer + "MobileAsyncStream", + "MobileBufferedStream", + + # Factory functions + "create_tcp_transport", + "get_transport_class", +] diff --git a/libp2p/mobile/factory.py b/libp2p/mobile/factory.py new file mode 100644 index 000000000..f7153f745 --- /dev/null +++ b/libp2p/mobile/factory.py @@ -0,0 +1,53 @@ +""" +Transport factory for choosing between trio and mobile implementations. + +This module provides a factory that automatically chooses the appropriate +transport implementation based on the runtime environment. +""" + +from typing import Type + +from libp2p.abc import ITransport +from libp2p.mobile.runtime import is_mobile_runtime, get_runtime_adapter +from libp2p.mobile.transport import MobileTCPTransport + + +def create_tcp_transport() -> ITransport: + """ + Create a TCP transport appropriate for the current runtime. + + Returns: + ITransport: MobileTCPTransport for mobile, or original TCP for desktop + """ + runtime_adapter = get_runtime_adapter() + + if runtime_adapter.use_asyncio or is_mobile_runtime(): + # Use mobile-compatible transport + return MobileTCPTransport() + else: + # Use original trio-based transport for desktop + try: + from libp2p.transport.tcp.tcp import TCP + return TCP() + except ImportError: + # Fallback to mobile transport if trio is not available + return MobileTCPTransport() + + +def get_transport_class() -> Type[ITransport]: + """ + Get the transport class appropriate for the current runtime. + + Returns: + Type[ITransport]: Transport class to use + """ + runtime_adapter = get_runtime_adapter() + + if runtime_adapter.use_asyncio or is_mobile_runtime(): + return MobileTCPTransport + else: + try: + from libp2p.transport.tcp.tcp import TCP + return TCP + except ImportError: + return MobileTCPTransport diff --git a/libp2p/mobile/io.py b/libp2p/mobile/io.py new file mode 100644 index 000000000..ac22f920b --- /dev/null +++ b/libp2p/mobile/io.py @@ -0,0 +1,189 @@ +""" +Mobile-compatible I/O stream implementations. + +This module provides asyncio-based stream implementations that are compatible +with mobile platforms while maintaining the same interface as the trio-based +implementations. +""" + +import asyncio + +from libp2p.io.abc import ReadWriteCloser + + +class MobileAsyncStream(ReadWriteCloser): + """ + Mobile-compatible stream implementation using asyncio. + + This provides the same interface as TrioTCPStream but uses asyncio + StreamReader and StreamWriter for mobile compatibility. + """ + + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self.reader = reader + self.writer = writer + self._closed = False + + async def read(self, n: int | None = None) -> bytes: + """ + Read up to n bytes from the stream. + + Args: + n: Maximum number of bytes to read. If None, read until EOF. + + Returns: + bytes: The data read from the stream + + Raises: + Exception: If the stream is closed or an error occurs + """ + if self._closed: + raise Exception("Stream is closed") + + try: + if n is None: + # Read until EOF + return await self.reader.read() + else: + # Read exactly n bytes or until EOF + return await self.reader.read(n) + except Exception as e: + await self.close() + raise e + + async def write(self, data: bytes) -> None: + """ + Write data to the stream. + + Args: + data: The data to write + + Raises: + Exception: If the stream is closed or an error occurs + """ + if self._closed: + raise Exception("Stream is closed") + + try: + self.writer.write(data) + await self.writer.drain() + except Exception as e: + await self.close() + raise e + + async def close(self) -> None: + """ + Close the stream and clean up resources. + """ + if not self._closed: + self._closed = True + + if not self.writer.is_closing(): + self.writer.close() + try: + await self.writer.wait_closed() + except Exception: + # Ignore errors during close + pass + + def get_remote_address(self) -> tuple[str, int] | None: + """ + Return the remote address of the connected peer. + + Returns: + tuple[str, int] | None: (host, port) tuple or None if not available + """ + try: + peername = self.writer.get_extra_info('peername') + if peername and len(peername) >= 2: + return (str(peername[0]), int(peername[1])) + except Exception: + pass + return None + + def get_local_address(self) -> tuple[str, int] | None: + """ + Get the local address of the connection. + + Returns: + tuple[str, int] | None: (host, port) tuple or None if not available + """ + try: + sockname = self.writer.get_extra_info('sockname') + if sockname and len(sockname) >= 2: + return (str(sockname[0]), int(sockname[1])) + except Exception: + pass + return None + + @property + def closed(self) -> bool: + """ + Check if the stream is closed. + + Returns: + bool: True if the stream is closed + """ + return self._closed or self.writer.is_closing() + + +class MobileBufferedStream(MobileAsyncStream): + """ + Buffered version of MobileAsyncStream for improved performance. + + This adds buffering capabilities for scenarios where frequent small + reads/writes might impact performance on mobile devices. + """ + + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, + read_buffer_size: int = 8192, write_buffer_size: int = 8192): + super().__init__(reader, writer) + self._read_buffer_size = read_buffer_size + self._write_buffer_size = write_buffer_size + self._write_buffer = bytearray() + + async def write(self, data: bytes) -> None: + """ + Buffered write operation. + + Args: + data: The data to write + """ + if self._closed: + raise Exception("Stream is closed") + + self._write_buffer.extend(data) + + # Flush if buffer is full + if len(self._write_buffer) >= self._write_buffer_size: + await self.flush() + + async def flush(self) -> None: + """ + Flush the write buffer to the underlying stream. + """ + if self._closed: + raise Exception("Stream is closed") + + if self._write_buffer: + try: + self.writer.write(bytes(self._write_buffer)) + await self.writer.drain() + self._write_buffer.clear() + except Exception as e: + await self.close() + raise e + + async def close(self) -> None: + """ + Close the stream, flushing any remaining buffered data. + """ + if not self._closed: + try: + # Flush remaining data before closing + await self.flush() + except Exception: + # Ignore flush errors during close + pass + + await super().close() diff --git a/libp2p/mobile/runtime.py b/libp2p/mobile/runtime.py new file mode 100644 index 000000000..479e0b7ce --- /dev/null +++ b/libp2p/mobile/runtime.py @@ -0,0 +1,181 @@ +""" +Async runtime adapter for mobile compatibility. + +This module provides an abstraction layer that allows py-libp2p to work with +different async runtimes (trio for desktop, asyncio for mobile). +""" + +import sys +import asyncio +from contextlib import asynccontextmanager +from typing import Any, Callable, Coroutine, TypeVar, Awaitable +from collections.abc import AsyncIterator + +T = TypeVar('T') + +# Optional trio import +try: + import trio + HAS_TRIO = True +except ImportError: + HAS_TRIO = False + trio = None + + +def is_mobile_runtime() -> bool: + """ + Detect if running on a mobile platform. + + Returns: + bool: True if running on Android, iOS, or in a WASM environment (Pyodide) + """ + return ( + hasattr(sys, 'getandroidapilevel') or # Android + sys.platform == 'ios' or # iOS + 'pyodide' in sys.modules or # Pyodide/WASM + 'androidsdk' in sys.modules # Python-for-Android + ) + + +class AsyncRuntimeAdapter: + """ + Adapter that provides a unified interface for different async runtimes. + + On mobile platforms, uses asyncio. On desktop, can use trio or asyncio. + """ + + def __init__(self, force_asyncio: bool = False): + """ + Initialize the runtime adapter. + + Args: + force_asyncio: If True, force use of asyncio even on desktop + """ + self.use_asyncio = force_asyncio or is_mobile_runtime() + + if self.use_asyncio: + self.runtime_name = 'asyncio' + else: + if HAS_TRIO: + self.runtime_name = 'trio' + else: + # Fallback to asyncio if trio is not available + self.use_asyncio = True + self.runtime_name = 'asyncio' + + @asynccontextmanager + async def create_nursery(self) -> AsyncIterator['MobileNursery']: + """ + Create a nursery/task group for concurrent task execution. + + Yields: + MobileNursery: A nursery object that can spawn tasks + """ + if self.use_asyncio: + # Use TaskGroup for Python 3.11+, fallback for older versions + try: + async with asyncio.TaskGroup() as task_group: + yield AsyncioNursery(task_group) + except AttributeError: + # Fallback for Python < 3.11 + nursery = AsyncioNurseryLegacy() + try: + yield nursery + finally: + await nursery.close() + else: + if HAS_TRIO and trio is not None: + async with trio.open_nursery() as nursery: + yield TrioNursery(nursery) + else: + # Fallback if trio is not available + nursery = AsyncioNurseryLegacy() + try: + yield nursery + finally: + await nursery.close() + + async def start_task(self, func: Callable[..., Awaitable[T]], *args: Any) -> T: + """ + Start a task and return when it completes. + + Args: + func: Async function to run + *args: Arguments to pass to the function + + Returns: + The result of the function + """ + if self.use_asyncio: + return await func(*args) + else: + import trio + return await func(*args) + + +class MobileNursery: + """Base class for nursery implementations.""" + + async def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any) -> None: + """Start a task without waiting for it to complete.""" + raise NotImplementedError + + +class AsyncioNursery(MobileNursery): + """Nursery implementation using asyncio.TaskGroup (Python 3.11+).""" + + def __init__(self, task_group: asyncio.TaskGroup): + self.task_group = task_group + + async def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any) -> None: + """Start a task in the task group.""" + coro = func(*args) + self.task_group.create_task(coro) + + +class AsyncioNurseryLegacy(MobileNursery): + """Nursery implementation for older Python versions without TaskGroup.""" + + def __init__(self): + self.tasks: list[asyncio.Task] = [] + + async def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any) -> None: + """Start a task and track it.""" + coro = func(*args) + task = asyncio.create_task(coro) + self.tasks.append(task) + + async def close(self) -> None: + """Wait for all tasks to complete.""" + if self.tasks: + await asyncio.gather(*self.tasks, return_exceptions=True) + + +class TrioNursery(MobileNursery): + """Nursery implementation using trio.""" + + def __init__(self, nursery): + self.nursery = nursery + + async def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any) -> None: + """Start a task in the trio nursery.""" + if HAS_TRIO and trio: + self.nursery.start_soon(func, *args) + else: + # This shouldn't happen if trio is available, but just in case + raise RuntimeError("Trio nursery not available") + + +# Global runtime adapter instance +_runtime_adapter = AsyncRuntimeAdapter() + + +def get_runtime_adapter() -> AsyncRuntimeAdapter: + """Get the global runtime adapter instance.""" + return _runtime_adapter + + +def set_runtime_adapter(adapter: AsyncRuntimeAdapter) -> None: + """Set a custom runtime adapter.""" + global _runtime_adapter + _runtime_adapter = adapter diff --git a/libp2p/mobile/transport.py b/libp2p/mobile/transport.py new file mode 100644 index 000000000..b2a40fdd8 --- /dev/null +++ b/libp2p/mobile/transport.py @@ -0,0 +1,219 @@ +""" +Mobile-compatible transport implementations for py-libp2p. + +This module provides asyncio-based transport implementations that can run +on mobile platforms while maintaining compatibility with the existing +trio-based desktop implementation. +""" + +import asyncio +import logging +import socket +from typing import Any, Callable, Optional, Sequence +from collections.abc import Awaitable + +from multiaddr import Multiaddr + +from libp2p.abc import IListener, IRawConnection, ITransport +from libp2p.custom_types import THandler +from libp2p.network.connection.raw_connection import RawConnection +from libp2p.transport.exceptions import OpenConnectionError +from .runtime import get_runtime_adapter, MobileNursery +from libp2p.mobile.io import MobileAsyncStream + +logger = logging.getLogger("libp2p.mobile.transport") + + +class MobileTCPListener(IListener): + """ + Mobile-compatible TCP listener using asyncio. + + This listener can coexist with the trio-based TCP listener and provides + the same interface while using asyncio for mobile compatibility. + """ + + def __init__(self, handler_function: THandler) -> None: + self.handler = handler_function + self.servers: list[asyncio.Server] = [] + self.addresses: list[Multiaddr] = [] + self._runtime_adapter = get_runtime_adapter() + + async def listen(self, maddr: Multiaddr, nursery: Any) -> bool: + """ + Start listening on the specified multiaddress. + + Args: + maddr: The multiaddress to listen on + nursery: Nursery for task management (can be trio or mobile nursery) + + Returns: + True if listening started successfully, False otherwise + """ + tcp_port_str = maddr.value_for_protocol("tcp") + if tcp_port_str is None: + logger.error(f"Cannot listen: TCP port is missing in multiaddress {maddr}") + return False + + try: + tcp_port = int(tcp_port_str) + except ValueError: + logger.error( + f"Cannot listen: Invalid TCP port '{tcp_port_str}' " + f"in multiaddress {maddr}" + ) + return False + + ip4_host_str = maddr.value_for_protocol("ip4") + # None means listen on all interfaces + host = ip4_host_str or "0.0.0.0" + + try: + if self._runtime_adapter.use_asyncio: + await self._start_asyncio_server(host, tcp_port, nursery) + else: + # Fallback to trio behavior for compatibility + await self._start_with_nursery(host, tcp_port, nursery) + + # Store the address we're actually listening on + actual_addr = f"/ip4/{host}/tcp/{tcp_port}" + self.addresses.append(Multiaddr(actual_addr)) + + logger.info(f"Mobile TCP listener started on {actual_addr}") + return True + + except Exception as e: + logger.error(f"Failed to start TCP listener for {maddr}: {e}") + return False + + async def _start_asyncio_server(self, host: str, port: int, nursery: Any) -> None: + """Start server using asyncio.""" + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + """Handle incoming client connection.""" + try: + stream = MobileAsyncStream(reader, writer) + await self.handler(stream) + except Exception as e: + logger.debug(f"Connection handling failed: {e}") + if not writer.is_closing(): + writer.close() + await writer.wait_closed() + + server = await asyncio.start_server(handle_client, host, port) + self.servers.append(server) + + # Start serving in the background + if hasattr(nursery, 'start_soon'): + # This is our mobile nursery + await nursery.start_soon(server.serve_forever) + else: + # Start serving in background task + asyncio.create_task(server.serve_forever()) + + async def _start_with_nursery(self, host: str, port: int, nursery: Any) -> None: + """Start server using nursery (trio compatibility).""" + # For trio compatibility when not in mobile mode + # This would delegate to the original trio implementation + # For now, fallback to asyncio + await self._start_asyncio_server(host, port, nursery) + + def get_addrs(self) -> tuple[Multiaddr, ...]: + """ + Get the addresses this listener is bound to. + + Returns: + Tuple of multiaddresses + """ + return tuple(self.addresses) + + async def close(self) -> None: + """Close all servers and clean up.""" + for server in self.servers: + server.close() + await server.wait_closed() + self.servers.clear() + self.addresses.clear() + + +class MobileTCPTransport(ITransport): + """ + Mobile-compatible TCP transport using asyncio. + + This transport can coexist with the trio-based TCP transport and provides + the same interface while using asyncio for mobile compatibility. + """ + + def __init__(self) -> None: + self._runtime_adapter = get_runtime_adapter() + + async def dial(self, maddr: Multiaddr) -> IRawConnection: + """ + Dial a peer on the specified multiaddress. + + Args: + maddr: The multiaddress of the peer to dial + + Returns: + IRawConnection: The established connection + + Raises: + OpenConnectionError: If the connection cannot be established + """ + host_str = maddr.value_for_protocol("ip4") + port_str = maddr.value_for_protocol("tcp") + + if host_str is None: + raise OpenConnectionError( + f"Failed to dial {maddr}: IP address not found in multiaddr." + ) + + if port_str is None: + raise OpenConnectionError( + f"Failed to dial {maddr}: TCP port not found in multiaddr." + ) + + try: + port_int = int(port_str) + except ValueError: + raise OpenConnectionError( + f"Failed to dial {maddr}: Invalid TCP port '{port_str}'." + ) + + try: + if self._runtime_adapter.use_asyncio: + # Use asyncio for mobile compatibility + reader, writer = await asyncio.open_connection(host_str, port_int) + stream = MobileAsyncStream(reader, writer) + else: + # Fallback to trio behavior for desktop compatibility + # In a real implementation, this would use the original trio transport + # For now, we'll use asyncio as fallback + reader, writer = await asyncio.open_connection(host_str, port_int) + stream = MobileAsyncStream(reader, writer) + + return RawConnection(stream, True) # True indicates we initiated the connection + + except OSError as error: + raise OpenConnectionError( + f"Failed to open TCP stream to {maddr}: {error}" + ) from error + except Exception as error: + raise OpenConnectionError( + f"An unexpected error occurred when dialing {maddr}: {error}" + ) from error + + def create_listener(self, handler_function: THandler) -> MobileTCPListener: + """ + Create a mobile-compatible TCP listener. + + Args: + handler_function: Function to handle incoming connections + + Returns: + MobileTCPListener: The created listener + """ + return MobileTCPListener(handler_function) + + +def _multiaddr_from_socket_info(host: str, port: int) -> Multiaddr: + """Create a multiaddr from host and port information.""" + return Multiaddr(f"/ip4/{host}/tcp/{port}")