Skip to content

Latest commit

 

History

History
372 lines (274 loc) · 8.13 KB

File metadata and controls

372 lines (274 loc) · 8.13 KB

a2a Rust Client Library

The src/lib.rs module provides a Rust API for a2a messaging, enabling integration with Rust-based systems, CLI tools, and high-performance services.

Installation

# Add to your Cargo.toml
[dependencies]
rusqlite = { version = "0.29", features = ["bundled"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Create a new project and copy the a2a library source into it
cargo new a2a-project
cp src/lib.rs a2a-project/src/

Quick Start

use a2a::Client;

/// Quick-start walkthrough: send a direct message, poll for replies,
/// broadcast to every agent, and finally mark this agent as done.
fn main() {
    // `Client::new` returns a `Result`; unwrap it before calling client methods.
    let client = Client::new("my-project", "alice").expect("invalid project or agent_id");

    // Send a direct message. The 3rd argument is the optional TTL in seconds and
    // the 4th is the optional thread_id; `None` means no expiry / no thread.
    match client.send("bob", "Hello Bob!", None, None) {
        Ok(msg_id) => println!("Sent message {}", msg_id),
        Err(e) => eprintln!("Error: {}", e),
    }

    // Receive messages: wait up to 10 seconds, unread only, excluding our own
    // messages, with no limit on the number returned.
    match client.recv(10, true, false, None) {
        Ok(messages) => {
            for msg in messages {
                println!("{}: {}", msg.sender, msg.body);
            }
        }
        Err(e) => eprintln!("Error: {}", e),
    }

    // Broadcast: "all" is one of the recipient names that addresses every agent.
    let _ = client.send("all", "Hello everyone!", None, None);

    // Mark this agent as done; the result is deliberately ignored in this demo.
    let _ = client.set_status("done");
}

API Reference

Client::new(project, agent_id) -> Result<Client, ValidationError>

Create a new client. Returns an error if the project or agent_id is empty, contains path separators, or starts with a dot.

let client = Client::new("my-project", "alice").expect("invalid project or agent_id");

send(to, message, ttl_seconds, thread_id) -> Result

Send a message. Set `to` to "all", "*", or "broadcast" to broadcast the message to every agent. Pass None for ttl_seconds (no expiry) and for thread_id (no thread grouping).

let ttl = Some(3600i64);
match client.send("bob", "Hello", ttl, Some("thread-1")) {
    Ok(msg_id) => println!("Sent: {}", msg_id),
    Err(e) => eprintln!("Error: {}", e),
}

recv(wait, unread_only, include_self, limit) -> Result<Vec>

Receive messages. Returns as soon as messages are found, or after `wait` seconds have elapsed.

match client.recv(30, true, false, Some(10)) {
    Ok(messages) => println!("Received {} messages", messages.len()),
    Err(e) => eprintln!("Error: {}", e),
}

peek(limit) -> Result<Vec>

View recent messages without marking as read.

match client.peek(50) {
    Ok(messages) => println!("Recent: {}", messages.len()),
    Err(e) => eprintln!("Error: {}", e),
}

list_peers() -> Result<Vec>

Get registered agents.

match client.list_peers() {
    Ok(peers) => {
        for p in peers {
            println!("{}: {}", p.id, p.status);
        }
    }
    Err(e) => eprintln!("Error: {}", e),
}

set_status(status) -> Result<()>

Update agent status (active/idle/done/blocked). Returns an error if the status is not one of the valid values.

client.set_status("done")?;

get_status(agent_id) -> Result<Option>

Check agent status.

match client.get_status(Some("bob")) {
    Ok(status) => println!("Bob: {:?}", status),
    Err(e) => eprintln!("Error: {}", e),
}

touch() -> Result<()>

Update this agent's last_seen timestamp. Useful for heartbeat/keep-alive signaling.

client.touch()?;

search(query, limit) -> Result<Vec>

Search messages by substring (case-insensitive). Returns an error if the query is empty or limit is not a positive integer.

match client.search("important", 100) {
    Ok(results) => println!("Found {} messages", results.len()),
    Err(e) => eprintln!("Error: {}", e),
}

thread(thread_id) -> Result<Vec>

Get all messages in a thread.

match client.thread("42") {
    Ok(messages) => println!("Thread: {} messages", messages.len()),
    Err(e) => eprintln!("Error: {}", e),
}

register(role, prompt, cli, pid, upsert) -> Result

Register this agent on the bus.

match client.register("researcher", "Research things", "rust", None, true) {
    Ok(_) => println!("Registered"),
    Err(e) => eprintln!("Error: {}", e),
}

unregister() -> Result

Remove this agent from the bus.

match client.unregister() {
    Ok(_) => println!("Unregistered"),
    Err(e) => eprintln!("Error: {}", e),
}

stats() -> Result

Get aggregated bus statistics.

match client.stats() {
    Ok(stats) => println!("Total messages: {}", stats.messages),
    Err(e) => eprintln!("Error: {}", e),
}

list() -> Result<Vec>

Alias for list_peers().

let peers = client.list()?;

status(new_status: Option<&str>) -> Result<Option>

Get or set this agent's status. If new_status is Some, the status is set and Ok(None) is returned. If new_status is None, the current status is returned.

// Get current status
let current = client.status(None)?;

// Set status
client.status(Some("idle"))?;

wait(count, timeout_secs) -> Result

Alias for wait_for_messages().

wait_for_messages(count: i64, timeout: f64) -> Result

Block until `count` unread messages arrive or `timeout` seconds elapse. Returns Ok(true) if enough messages arrived, or Ok(false) if the timeout elapsed first.

let got_enough = client.wait_for_messages(3, 30.0)?;
if !got_enough {
    println!("Timed out waiting for messages");
}

init_project() -> Result<()>

Initialize the project database, creating tables if they don't exist. Safe to call multiple times.

client.init_project()?;

clear() -> Result<()>

Delete the project database and all WAL-related files. Warning: permanently deletes all messages and agent registrations.

client.clear()?;

project_info() -> ProjectInfo

Get resolved project information (project name, database path, whether DB exists).

let info = client.project_info();
println!("Project: {}, DB: {}, exists: {}", info.project, info.db, info.exists);

Example: Task Worker

use a2a::Client;
use serde_json::{json, from_str};

/// Task payload that the worker deserializes from a message body (JSON).
#[derive(serde::Deserialize)]
struct Task {
    /// Task identifier; echoed back as `task_id` in the completion report.
    id: String,
    /// Description of the work to perform; printed when processing starts.
    work: String,
}

/// Task worker: repeatedly waits for one task message, processes it, and
/// reports completion to the "coordinator" agent. Exits once a wait returns
/// no messages.
fn main() {
    // `Client::new` returns a `Result`; unwrap it before calling client methods.
    let client = Client::new("production", "worker-1").expect("invalid project or agent_id");
    let _ = client.set_status("active");

    loop {
        // Wait for a task: up to 30 seconds, unread only, excluding our own
        // messages, at most one message per call.
        match client.recv(30, true, false, Some(1)) {
            Ok(messages) => {
                // An empty result means nothing arrived within the wait window.
                let message = match messages.first() {
                    Some(message) => message,
                    None => {
                        println!("No tasks, exiting");
                        break;
                    }
                };

                // Parse task
                match from_str::<Task>(&message.body) {
                    Ok(task) => {
                        println!("Processing task {}: {}", task.id, task.work);

                        // Do work
                        std::thread::sleep(std::time::Duration::from_secs(1));

                        // Report completion. NOTE: `chrono` is an extra dependency;
                        // add it to Cargo.toml next to the crates listed under
                        // Installation.
                        let result = json!({
                            "task_id": task.id,
                            "status": "complete",
                            "time": chrono::Local::now().to_rfc3339()
                        });

                        let _ = client.send("coordinator", &result.to_string(), None, None);
                    }
                    // Report messages that are not valid task JSON instead of
                    // dropping them silently.
                    Err(e) => eprintln!("Ignoring malformed task from {}: {}", message.sender, e),
                }
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                // A failing recv may return immediately; pause before retrying so
                // a persistent failure does not spin the loop.
                std::thread::sleep(std::time::Duration::from_secs(1));
            }
        }
    }

    let _ = client.set_status("done");
}

Building and Testing

# Build the library
cargo build --release

# Run tests
cargo test

# Build examples
cargo build --examples

# Run example
cargo run --example task_worker

Performance

Direct database access:

  • send(): ~5ms
  • recv(): ~10ms per poll
  • search(): ~20ms for 1000 messages
  • Thread-safe with SQLite WAL mode

Deployment

Docker

# Build the worker inside the official Rust Alpine image.
FROM rust:1.75-alpine
# rusqlite's "bundled" feature compiles SQLite from C source, which needs the
# musl C headers; the Alpine Rust image does not include them.
RUN apk add --no-cache musl-dev
WORKDIR /app
COPY . .
RUN cargo build --release
# Assumes the crate builds a binary target named `worker`.
CMD ["./target/release/worker"]

Systemd

[Unit]
Description=a2a Rust Worker
After=network.target

[Service]
Type=simple
User=a2a
ExecStart=/opt/a2a/worker
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

See Also