The src/lib.rs module provides a Rust API for a2a messaging, enabling integration with Rust-based systems, CLI tools, and high-performance services.
# Add to your Cargo.toml
[dependencies]
rusqlite = { version = "0.29", features = ["bundled"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Clone the a2a library
cargo new a2a-project
cp src/lib.rs a2a-project/src/
use a2a::Client;
fn main() {
// Initialize client
let client = Client::new("my-project", "alice");
// Send a message (4th arg is optional thread_id)
match client.send("bob", "Hello Bob!", None, None) {
Ok(msg_id) => println!("Sent message {}", msg_id),
Err(e) => eprintln!("Error: {}", e),
}
// Receive messages (blocks up to 10 seconds)
match client.recv(10, true, false, None) {
Ok(messages) => {
for msg in messages {
println!("{}: {}", msg.sender, msg.body);
}
}
Err(e) => eprintln!("Error: {}", e),
}
// Broadcast
let _ = client.send("all", "Hello everyone!", None, None);
// Mark done
let _ = client.set_status("done");
}Create a new client. Returns an error if the project or agent_id is empty, contains path separators, or starts with a dot.
let client = Client::new("my-project", "alice").expect("invalid project or agent_id");
Send a message. Set `to` to "all", "*", or "broadcast" for broadcast messages.
Pass None for ttl_seconds (no expiry) and thread_id (no thread grouping).
let ttl = Some(3600i64);
match client.send("bob", "Hello", ttl, Some("thread-1")) {
Ok(msg_id) => println!("Sent: {}", msg_id),
Err(e) => eprintln!("Error: {}", e),
}
Receive messages. Returns as soon as messages are found, or after `wait` seconds have elapsed.
match client.recv(30, true, false, Some(10)) {
Ok(messages) => println!("Received {} messages", messages.len()),
Err(e) => eprintln!("Error: {}", e),
}
View recent messages without marking them as read.
match client.peek(50) {
Ok(messages) => println!("Recent: {}", messages.len()),
Err(e) => eprintln!("Error: {}", e),
}
Get registered agents.
match client.list_peers() {
Ok(peers) => {
for p in peers {
println!("{}: {}", p.id, p.status);
}
}
Err(e) => eprintln!("Error: {}", e),
}
Update agent status (active/idle/done/blocked). Returns an error if the status is not one of the valid values.
client.set_status("done")?;
Check agent status.
match client.get_status(Some("bob")) {
Ok(status) => println!("Bob: {:?}", status),
Err(e) => eprintln!("Error: {}", e),
}
Update this agent's last_seen timestamp. Useful for heartbeat/keep-alive
signaling.
client.touch()?;
Search messages by substring (case-insensitive). Returns an error if the query is empty or limit is not a positive integer.
match client.search("important", 100) {
Ok(results) => println!("Found {} messages", results.len()),
Err(e) => eprintln!("Error: {}", e),
}
Get all messages in a thread.
match client.thread("42") {
Ok(messages) => println!("Thread: {} messages", messages.len()),
Err(e) => eprintln!("Error: {}", e),
}
Register this agent on the bus.
match client.register("researcher", "Research things", "rust", None, true) {
Ok(_) => println!("Registered"),
Err(e) => eprintln!("Error: {}", e),
}
Remove this agent from the bus.
match client.unregister() {
Ok(_) => println!("Unregistered"),
Err(e) => eprintln!("Error: {}", e),
}
Get aggregated bus statistics.
match client.stats() {
Ok(stats) => println!("Total messages: {}", stats.messages),
Err(e) => eprintln!("Error: {}", e),
}
Alias for list_peers().
let peers = client.list()?;
Get or set agent status. If new_status is Some, sets the status and returns Ok(None). If new_status is None, returns the current status.
// Get current status
let current = client.status(None)?;
// Set status
client.status(Some("idle"))?;
Alias for wait_for_messages().
Block until count unread messages arrive or timeout elapses. Returns true if enough messages arrived.
let got_enough = client.wait_for_messages(3, 30.0)?;
if !got_enough {
println!("Timed out waiting for messages");
}
Initialize the project database, creating tables if they don't exist. Safe to call multiple times.
client.init_project()?;
Delete the project database and all WAL-related files. Warning: permanently deletes all messages and agent registrations.
client.clear()?;
Get resolved project information (project name, database path, whether DB exists).
let info = client.project_info();
println!("Project: {}, DB: {}, exists: {}", info.project, info.db, info.exists);
use a2a::Client;
use serde_json::{json, from_str};
/// Task payload parsed from the JSON body of a received message.
#[derive(serde::Deserialize)]
struct Task {
/// Task identifier; echoed back as `task_id` in the completion report.
id: String,
/// Description of the work to perform; this example only prints it.
work: String,
}
fn main() {
    // `Client::new` returns a Result (it validates the project and agent_id),
    // so unwrap it before calling any method on the client.
    let client = Client::new("production", "worker-1").expect("invalid project or agent_id");
    let _ = client.set_status("active");

    loop {
        // Wait for at most one task (30 second timeout)
        let messages = match client.recv(30, true, false, Some(1)) {
            Ok(messages) => messages,
            Err(e) => {
                eprintln!("Error: {}", e);
                // Back off before retrying so a persistent failure does not
                // turn this loop into a busy spin.
                std::thread::sleep(std::time::Duration::from_secs(1));
                continue;
            }
        };

        // An empty result means the wait elapsed with nothing to do.
        let Some(message) = messages.first() else {
            println!("No tasks, exiting");
            break;
        };

        // Parse task
        match from_str::<Task>(&message.body) {
            Ok(task) => {
                println!("Processing task {}: {}", task.id, task.work);

                // Do work
                std::thread::sleep(std::time::Duration::from_secs(1));

                // Report completion.
                // NOTE: `chrono` is an extra dependency of this example; add it
                // to Cargo.toml alongside the crates listed for the library.
                let result = json!({
                    "task_id": task.id,
                    "status": "complete",
                    "time": chrono::Local::now().to_rfc3339()
                });
                let _ = client.send("coordinator", &result.to_string(), None, None);
            }
            // Surface bodies that are not valid Task JSON instead of dropping
            // them silently.
            Err(e) => eprintln!("Skipping malformed task: {}", e),
        }
    }

    let _ = client.set_status("done");
}
# Build the library
cargo build --release
# Run tests
cargo test
# Build examples
cargo build --examples
# Run example
cargo run --example task_worker
Direct database access:
- send(): ~5ms
- recv(): ~10ms per poll
- search(): ~20ms for 1000 messages
- Thread-safe with SQLite WAL mode
FROM rust:1.75-alpine
WORKDIR /app
COPY . .
RUN cargo build --release
CMD ["./target/release/worker"]
[Unit]
Description=a2a Rust Worker
After=network.target
[Service]
Type=simple
User=a2a
ExecStart=/opt/a2a/worker
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
- CLIENT_API.md — Python client
- NODE_CLIENT_API.md — Node.js client
- GO_CLIENT_API.md — Go client
- REST_API.md — HTTP interface