The a2a_client.py module provides a Python object-oriented API for a2a messaging, eliminating the need to shell out to the a2a CLI.
# Copy a2a_client.py to your project or install via pip (if published)
cp a2a_client.py /path/to/your/project/

from a2a_client import A2AClient
# Initialize client
client = A2AClient(project="my-project", agent_id="alice")
# Send a message
msg_id = client.send("bob", "Hello Bob!")
# Receive messages (blocks up to 10 seconds)
messages = client.recv(wait=10)
for msg in messages:
    print(f"{msg['sender']}: {msg['body']}")
# Broadcast
client.send("all", "Hello everyone!")
# Mark yourself done
client.set_status("done")

client = A2AClient(project: str, agent_id: str)
Parameters:
- project: Project name (also respects the $A2A_PROJECT environment variable)
- agent_id: This agent's unique ID
Note: The database is expected to exist at ~/.a2a/{project}/database.db.
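The path convention above can be sketched with pathlib. This is an illustration only: resolve_db_path is a hypothetical helper, not part of a2a_client, and it assumes that an explicit project argument takes precedence over $A2A_PROJECT, which the text above does not specify.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_db_path(project: Optional[str] = None, home: Optional[Path] = None) -> Path:
    """Return the expected database path for a project.

    Assumption: an explicit `project` wins over $A2A_PROJECT.
    The real client may resolve these in a different order.
    """
    name = project or os.environ.get("A2A_PROJECT", "")
    if not name:
        raise ValueError("project is empty and $A2A_PROJECT is not set")
    base = home if home is not None else Path.home()
    return base / ".a2a" / name / "database.db"


# Check whether the database for a project is already in place.
path = resolve_db_path("my-project")
print(path, "exists" if path.exists() else "missing")
```

A check like this can be useful before constructing the client, since the database is expected to exist already.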
success = client.register(
    role: str,
    prompt: str = "",
    cli: str = "",
    pid: int | None = None,
    upsert: bool = True
) -> bool
Register this agent on the bus. Must be called before send/recv.
Parameters:
- role: Agent's role description (e.g., "developer", "critic")
- prompt: Optional system prompt sent to peer agents
- cli: CLI tool name (e.g., "claude", "opencode")
- pid: Process ID (optional)
- upsert: Update the existing registration if True (default: True; preserves the original created_at)
Returns: True on success
Example:
client.register("researcher", cli="python", upsert=True)

success = client.unregister() -> bool
Remove this agent from the bus.
Returns: True on success
Example:
client.unregister()

msg_id = client.send(
    to: str,
    message: str,
    ttl_seconds: Optional[int] = None,
    thread_id: Optional[str] = None
) -> int
Send a message to a peer.
Parameters:
- to: Recipient agent ID, or "all" / "*" / "broadcast" for broadcast
- message: Message body (plain text)
- ttl_seconds: Optional time-to-live in seconds. Messages expire after this duration.
- thread_id: Optional thread/topic ID to group related messages
Returns: Message ID (integer)
Example:
# Direct message
msg_id = client.send("bob", "Are you there?")
# Broadcast
client.send("all", "Team standup in 5 minutes")
# Message with expiry
client.send("bob", "Urgent task", ttl_seconds=3600)  # Expires in 1 hour

messages = client.recv(
    wait: float = 0,
    unread_only: bool = True,
    include_self: bool = False,
    limit: int = 0
) -> List[Dict[str, Any]]
Receive messages addressed to this agent.
Parameters:
- wait: Block up to N seconds waiting for messages (0 = don't block)
- unread_only: Only return messages not yet read by this agent (default: True)
- include_self: Include messages sent by this agent (default: False)
- limit: Max messages to return (0 = unlimited)
Returns: List of message dicts with keys:
- id: Message ID
- sender: Sender agent ID
- recipient: Recipient agent ID (None for broadcast)
- body: Message text
- thread_id: Optional thread/topic ID
- created_at: Timestamp
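Given the keys listed above, a caller can separate direct messages from broadcasts by checking whether recipient is None. The sketch below works on hand-written sample dicts; split_by_kind is a hypothetical helper, and the created_at values are illustrative because the timestamp format is not specified here.

```python
from typing import Any, Dict, List, Tuple

Message = Dict[str, Any]


def split_by_kind(messages: List[Message]) -> Tuple[List[Message], List[Message]]:
    """Separate direct messages from broadcasts (recipient is None)."""
    direct = [m for m in messages if m.get("recipient") is not None]
    broadcasts = [m for m in messages if m.get("recipient") is None]
    return direct, broadcasts


# Sample data shaped like the documented message dicts (values are made up).
sample: List[Message] = [
    {"id": 1, "sender": "bob", "recipient": "alice", "body": "hi",
     "thread_id": None, "created_at": "2024-01-01 10:00:00"},
    {"id": 2, "sender": "carol", "recipient": None, "body": "standup",
     "thread_id": None, "created_at": "2024-01-01 10:01:00"},
]

direct, broadcasts = split_by_kind(sample)
print(f"{len(direct)} direct, {len(broadcasts)} broadcast")
```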
Example:
# Wait for next unread message (up to 30s)
messages = client.recv(wait=30)
# Get all messages including already-read
messages = client.recv(unread_only=False, limit=50)
# Include messages I sent myself
messages = client.recv(wait=10, include_self=True)

messages = client.peek(limit: int = 20) -> List[Dict[str, Any]]
View recent messages without marking them as read (observer mode).
Parameters:
limit: Max messages to return (default: 20)
Returns: List of message dicts (same format as recv())
Example:
# See last 50 messages
recent = client.peek(limit=50)
for msg in recent:
    print(f"[{msg['created_at']}] {msg['sender']} -> {msg['recipient'] or 'ALL'}")

peers = client.list_peers() -> List[Dict[str, Any]]
Get the roster of registered agents.
Returns: List of peer dicts with keys:
- id: Agent ID
- role: Agent role (optional)
- cli: CLI used (claude/opencode/pi/python/etc.)
- status: Agent status (active/idle/done/blocked)
- pid: Process ID (if available)
Example:
peers = client.list_peers()
active = [p for p in peers if p['status'] == 'active']
print(f"Active agents: {[p['id'] for p in active]}")

client.set_status(status: str) -> None
Update this agent's status to signal state to peers.
Parameters:
- status: One of 'active', 'idle', 'done', 'blocked'
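Whether set_status() itself rejects other values is not stated here, so a caller may want to guard against typos before calling it. A minimal sketch, where check_status and VALID_STATUSES are hypothetical names and not part of a2a_client:

```python
from typing import Final, FrozenSet

# The four status values documented above.
VALID_STATUSES: Final[FrozenSet[str]] = frozenset({"active", "idle", "done", "blocked"})


def check_status(status: str) -> str:
    """Return `status` unchanged if it is a documented value, else raise ValueError."""
    if status not in VALID_STATUSES:
        raise ValueError(
            f"unknown status {status!r}; expected one of {sorted(VALID_STATUSES)}"
        )
    return status


print(check_status("blocked"))
```

With a helper like this, a call could be written as client.set_status(check_status("done")).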
Example:
# Signal that work is done
client.set_status("done")
# Signal that waiting for something
client.set_status("blocked")
# Resume activity
client.set_status("active")

status = client.get_status(agent_id: Optional[str] = None) -> Optional[str]
Check an agent's status.
Parameters:
agent_id: Agent to check (defaults to self.agent_id if omitted)
Returns: Status string or None if agent not found
Example:
# Check self
my_status = client.get_status()
# Check peer
bob_status = client.get_status("bob")

success = client.wait_for_messages(
    count: int = 1,
    timeout: float = 60
) -> bool
Block until N unread messages arrive or the timeout elapses.
Parameters:
- count: Number of unread messages to wait for
- timeout: Max seconds to wait
Returns: True if got N messages, False on timeout
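The blocking behaviour described above can be pictured as a poll loop with a deadline. The sketch below is not the client's implementation: wait_until and its poll_interval parameter are hypothetical, and count_unread stands in for whatever query reports the number of unread messages.

```python
import time
from typing import Callable


def wait_until(count_unread: Callable[[], int], count: int = 1,
               timeout: float = 60.0, poll_interval: float = 0.05) -> bool:
    """Poll `count_unread` until it reports at least `count`, or `timeout` elapses."""
    # A monotonic clock is not affected by wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        if count_unread() >= count:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


# Stand-in counter that "receives" one more message on every poll.
state = {"unread": 0}


def fake_count() -> int:
    state["unread"] += 1
    return state["unread"]


print(wait_until(fake_count, count=3, timeout=1.0, poll_interval=0.001))
```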
Example:
# Wait for 3 responses before proceeding
if client.wait_for_messages(count=3, timeout=30):
    responses = client.recv()
else:
    print("Timeout: only got", len(client.recv()), "responses")

Update this agent's last_seen timestamp. Useful for heartbeat/keep-alive signaling.
client.touch() -> None
Example:
import time

# Send a heartbeat every 60 seconds
while True:
    client.touch()
    time.sleep(60)

messages = client.search(
    query: str,
    limit: int = 50
) -> List[Dict[str, Any]]
Search all messages by content substring (case-insensitive).
Parameters:
- query: Search substring (case-insensitive)
- limit: Max messages to return (must be positive)
Raises:
ValueError: If query is empty or limit is not a positive integer
Returns: List of matching message dicts (sorted by creation time, newest first)
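How the client runs this query is not shown here. As a rough sketch of the documented behaviour, SQLite's LIKE operator is case-insensitive for ASCII characters by default, so a substring search can be written as below. The messages table and its columns are assumptions made for this illustration, and search_messages is a hypothetical function, not the client's method.

```python
import sqlite3
from typing import Any, Dict, List


def search_messages(conn: sqlite3.Connection, query: str, limit: int = 50) -> List[Dict[str, Any]]:
    """Case-insensitive substring search, newest first."""
    if not query:
        raise ValueError("query must not be empty")
    if not isinstance(limit, int) or isinstance(limit, bool) or limit <= 0:
        raise ValueError("limit must be a positive integer")
    # Escape LIKE wildcards so the query is matched as a literal substring.
    escaped = query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    rows = conn.execute(
        "SELECT id, sender, body, created_at FROM messages "
        "WHERE body LIKE ? ESCAPE '\\' "
        "ORDER BY created_at DESC, id DESC LIMIT ?",
        (f"%{escaped}%", limit),
    ).fetchall()
    return [dict(row) for row in rows]


# In-memory database with an assumed schema and made-up rows.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY, sender TEXT, body TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO messages (sender, body, created_at) VALUES (?, ?, ?)",
    [
        ("bob", "Found a BUG in the parser", "2024-01-01 10:00:00"),
        ("carol", "all good here", "2024-01-01 10:01:00"),
        ("dave", "bug fixed", "2024-01-01 10:02:00"),
    ],
)

hits = search_messages(conn, "bug")
for hit in hits:
    print(f"{hit['sender']}: {hit['body']}")
```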
Example:
# Find all messages about a bug
bugs = client.search("bug", limit=100)
for msg in bugs:
    print(f"{msg['sender']}: {msg['body']}")
# Search for task assignments
tasks = client.search("assign", limit=50)

messages = client.thread(thread_id: str) -> List[Dict[str, Any]]
Get all messages in a specific thread.
Parameters:
- thread_id: Thread ID (must match a message's thread_id field)
Returns: List of message dicts in thread, ordered chronologically
Example:
# Get all messages in a thread
thread_messages = client.thread("my-thread-id")
print(f"Thread has {len(thread_messages)} messages:")
for msg in thread_messages:
    print(f" {msg['sender']}: {msg['body']}")

stats = client.stats() -> Dict[str, Any]
Get aggregated bus statistics.
Returns: Dict with keys:
- messages: Total message count
- direct_messages: Direct (non-broadcast) message count
- broadcasts: Broadcast message count
- threads: Number of distinct threads
- agents_active: Count of agents with status='active'
- agents_done: Count of agents with status='done'
- top_senders: List of top 5 senders (dicts with agent and count)
Example:
stats = client.stats()
print("Bus stats:")
print(f" Messages: {stats['messages']} ({stats['direct_messages']} direct, {stats['broadcasts']} broadcast)")
print(f" Threads: {stats['threads']}")
print(f" Agents: {stats['agents_active']} active, {stats['agents_done']} done")
print(f" Top senders: {stats['top_senders']}")

Initialize the project database, creating tables if they don't exist. Safe to call multiple times.
client.init_project()

Get resolved project information.
info = client.project_info()
# Returns: {"project": "my-project", "db": "/root/.a2a/my-project/database.db", "exists": True}

Delete the project database and all WAL-related files. Warning: permanently deletes all messages and agent registrations.
client.clear()

An async variant is available in a2a_client_async.py (class A2AClientAsync)
using aiosqlite. It mirrors the sync API surface but all methods are
async def. Usage:
import asyncio
from a2a_client_async import A2AClientAsync
async def main():
    client = A2AClientAsync(project="my-project", agent_id="alice")
    await client.register("dev")
    msg_id = await client.send("bob", "Hello")
    messages = await client.recv(wait=10)
    await client.set_status("done")

asyncio.run(main())

Requires aiosqlite (pip install aiosqlite). See
test_async_modules.py for the full test suite.
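Because every method is a coroutine, calls from several agents can be composed with asyncio.gather. The sketch below shows the pattern with FakeAsyncClient, a hypothetical in-memory stand-in that only mimics the send and recv names so the example runs without a database. It is not the real A2AClientAsync.

```python
import asyncio
from typing import Any, Dict, List


class FakeAsyncClient:
    """Hypothetical in-memory stand-in for A2AClientAsync (illustration only)."""

    def __init__(self, bus: Dict[str, "asyncio.Queue"], agent_id: str) -> None:
        self.bus = bus
        self.agent_id = agent_id
        bus.setdefault(agent_id, asyncio.Queue())

    async def send(self, to: str, message: str) -> None:
        await self.bus[to].put({"sender": self.agent_id, "body": message})

    async def recv(self, wait: float = 0) -> List[Dict[str, Any]]:
        inbox = self.bus[self.agent_id]
        if inbox.empty() and wait > 0:
            # Block for the first message, up to `wait` seconds.
            try:
                return [await asyncio.wait_for(inbox.get(), timeout=wait)]
            except asyncio.TimeoutError:
                return []
        # Otherwise drain whatever is already queued.
        out: List[Dict[str, Any]] = []
        while not inbox.empty():
            out.append(inbox.get_nowait())
        return out


async def demo() -> List[Dict[str, Any]]:
    bus: Dict[str, asyncio.Queue] = {}
    alice = FakeAsyncClient(bus, "alice")
    bob = FakeAsyncClient(bus, "bob")

    async def alice_asks() -> List[Dict[str, Any]]:
        await alice.send("bob", "Hello")
        return await alice.recv(wait=1)

    async def bob_replies() -> None:
        for msg in await bob.recv(wait=1):
            await bob.send(msg["sender"], f"ack: {msg['body']}")

    # Run both agents concurrently on one event loop.
    replies, _ = await asyncio.gather(alice_asks(), bob_replies())
    return replies


replies = asyncio.run(demo())
print(replies)
```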
from a2a_client import A2AClient
import time
def main():
    # Initialize
    client = A2AClient(project="my-project", agent_id="researcher")
    # List peers
    peers = [p['id'] for p in client.list_peers() if p['id'] != "researcher"]
    print(f"Found peers: {peers}")
    # Broadcast question
    client.send("all", "What are the top 3 features you'd prioritize?")
    # Wait for responses
    print("Waiting for 3 responses...")
    if not client.wait_for_messages(count=3, timeout=30):
        print("Timeout!")
    # Collect responses
    responses = client.recv(unread_only=True)
    for msg in responses:
        print(f" {msg['sender']}: {msg['body']}")
    # Summarize findings
    summary = f"Received {len(responses)} responses on feature prioritization."
    client.send("all", f"Summary: {summary}")
    # Mark done
    client.set_status("done")
    print("Done!")

if __name__ == "__main__":
    main()

The client raises exceptions for common errors:
from a2a_client import A2AClient
try:
    client = A2AClient("nonexistent-project", "alice")
    client.send("bob", "Hello")
except Exception as e:
    print(f"Error: {e}")

Common errors:
- ValueError: Empty project or agent_id in constructor
- sqlite3.OperationalError: Database schema issue (call client.register() or a2a init first)
- sqlite3.IntegrityError: Agent not registered (call client.register() first)
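Catching the specific exception types lets a caller print a targeted hint instead of a generic message. A minimal sketch of the mapping above, where explain_error is a hypothetical helper and the hint wording simply restates the list:

```python
import sqlite3


def explain_error(exc: BaseException) -> str:
    """Map the exception types listed above to a short remediation hint."""
    if isinstance(exc, ValueError):
        return "Check that project and agent_id are non-empty."
    if isinstance(exc, sqlite3.IntegrityError):
        return "Agent not registered; call client.register() first."
    if isinstance(exc, sqlite3.OperationalError):
        return "Database schema issue; call client.register() or run a2a init first."
    return f"Unexpected error: {exc}"


# Simulate one of the documented failures without touching a real database.
try:
    raise sqlite3.IntegrityError("FOREIGN KEY constraint failed")
except (ValueError, sqlite3.Error) as exc:
    print(explain_error(exc))
```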
The client uses direct SQLite connections:
- send(): ~5ms per message
- recv(): ~10ms per poll (when wait > 0)
- peek(): ~5ms per query
- list_peers(): ~5ms
There is no subprocess overhead (unlike the CLI), which makes the client suitable for high-frequency messaging.
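The figures above are approximate and will vary with hardware and database size. To check them locally, a small timing helper built on time.perf_counter can wrap any call; median_latency_ms is a hypothetical name used for this sketch.

```python
import statistics
import time
from typing import Callable


def median_latency_ms(call: Callable[[], object], repeats: int = 20) -> float:
    """Run `call` repeatedly and return the median wall-clock latency in milliseconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        call()
        samples.append((time.perf_counter() - start) * 1000.0)
    return statistics.median(samples)


# Stand-in workload; replace the lambda with a real client call.
latency = median_latency_ms(lambda: sum(range(1000)), repeats=10)
print(f"median latency: {latency:.3f} ms")
```

With a real client, the callable could be something like lambda: client.peek(limit=1).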
The Python client can coexist with CLI agents:
# Start CLI agent in background
python3 examples/researcher_agent.py &
# Use Python client to send messages
python3 << 'EOF'
from a2a_client import A2AClient
client = A2AClient("my-project", "python-agent")
client.send("researcher", "Please investigate X")
responses = client.recv(wait=30)
EOF

Unit tests are in test_a2a_client.py:
python3 test_a2a_client.py -v
Covers all methods with fresh databases, concurrent access, read-tracking, TTL, etc.