The a2a_client.go module provides a Go API for a2a messaging, enabling integration with Go-based systems, microservices, and CLIs.
# Add to your go.mod (root of this repo is the Go module)
go get github.com/javimosch/a2a-skillpackage main
import (
"fmt"
"log"
"a2a" // import the local package
)
func main() {
// Initialize client
client, err := a2a.NewClient("my-project", "alice")
if err != nil {
log.Fatal(err)
}
// Send a message
msgID, err := client.Send("bob", "Hello Bob!", nil, "")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Sent message %d\n", msgID)
// Receive messages (blocks up to 10 seconds)
messages, err := client.Recv(a2a.RecvOpts{
Wait: 10,
UnreadOnly: true,
IncludeSelf: false,
})
if err != nil {
log.Fatal(err)
}
for _, msg := range messages {
fmt.Printf("%s: %s\n", msg.Sender, msg.Body)
}
// Broadcast
client.Send("all", "Hello everyone!", nil, "")
// Mark done
client.SetStatus("done")
}Create a new client.
Send a message with optional thread ID and TTL. Set to to "all", "*", or "broadcast" for broadcast messages.
Pass ttlSeconds as nil for no expiry, or threadID as "" for no thread.
ttl := 3600
msgID, err := client.Send("bob", "Hello", &ttl, "thread-1")Backward-compatible wrapper for Send() without thread or TTL.
msgID, err := client.SendSimple("bob", "Hello")Receive messages with full options (struct-based API). Calls CleanupExpired()
and Touch() internally before fetching. Returns after finding messages, or
after opts.Wait seconds.
import "time"
msgs, err := client.Recv(a2a.RecvOpts{
Wait: 30,
UnreadOnly: true,
IncludeSelf: false,
Limit: 10,
Since: nil, // optional: pointer to float64 timestamp
})Backward-compatible wrapper for Recv() with positional args.
messages, err := client.RecvSimple(30.0, true, false, 10)| Field | Type | Default | Description |
|---|---|---|---|
Wait |
float64 |
0 |
Block up to N seconds for at least one message |
UnreadOnly |
bool |
true |
Only return unread messages |
IncludeSelf |
bool |
false |
Include messages from this agent |
Limit |
int |
0 |
Max messages to return (0 = unlimited) |
Since |
*float64 |
nil |
Filter to messages after this timestamp |
View recent messages without marking as read.
recent, err := client.Peek(50)Get registered agents.
peers, err := client.ListPeers()
for _, p := range peers {
fmt.Println(p.ID, p.Status)
}Update agent status (active/idle/done/blocked). Returns last_seen timestamp. Returns an error if the status is not one of the valid values or if the agent is not registered.
client.SetStatus("done")Check agent status. Returns nil, nil if agent is not found (matching Python's None return).
status, err := client.GetStatus("bob")
if status == nil {
// agent not found
}Full-text search using SQLite FTS5. Requires the binary to be built with
-tags fts5. Falls back to LIKE-based search if FTS5 is unavailable.
results, err := client.SearchFTS("important", 100)Search messages by substring (case-insensitive). Returns an error if the query is empty or limit is not a positive integer.
results, err := client.Search("important", 100)Get all messages in a thread.
messages, err := client.Thread("42")Get aggregated bus statistics.
stats, err := client.Stats()
fmt.Printf("Total messages: %d\n", stats.Messages)Get stats as JSON string.
jsonStr, err := client.StatsJSON()Create the database and schema. No-op if already exists. Migrates older
schemas that lack the ttl_seconds column.
client.InitProject()Register this agent on the bus. If upsert is true, updates existing
registration instead of failing. The pid parameter is a pointer to an
int — pass nil to omit the process ID, or &pidVar for a specific PID.
client.AgentID = "alice"
client.Register("planner", "Plan things", "claude", nil, true)Remove this agent from the bus.
client.Unregister()Update the agent's last_seen timestamp to now.
client.Touch()Delete messages that have exceeded their TTL. Returns count of deleted
messages. Called automatically by Recv() and Peek().
deleted, err := client.CleanupExpired()Alias for ListPeers(). Returns all registered agents.
peers, err := client.List()Set or get agent status. If newStatus is one of active, idle, done,
blocked, updates the status and returns the previous status as a pointer.
Returns nil for the previous status if the agent was not previously
registered.
old, err := client.Status("active")Check whether an agent is registered on the bus.
exists, err := client.AgentExists("bob")Block until at least count unread messages exist for this agent, or
until timeoutSec seconds elapse. Returns true if the desired message
count was reached, false if the timeout elapsed first.
found, err := client.Wait(1, 30)Delete the entire database file. All bus data is lost.
client.Clear()Returns resolved project metadata: project name, database path, and whether the database file exists.
info := client.ProjectInfo()
fmt.Println(info["db"], info["exists"])package main
import (
"encoding/json"
"fmt"
"log"
"time"
"a2a"
)
type Task struct {
ID string `json:"id"`
Work string `json:"work"`
}
func main() {
client := a2a.NewClient("production", "worker-1")
client.SetStatus("active")
for {
// Wait for task (30 second timeout)
messages, err := client.Recv(a2a.RecvOpts{Wait: 30, UnreadOnly: true, IncludeSelf: false, Limit: 1})
if err != nil {
log.Fatal(err)
}
if len(messages) == 0 {
fmt.Println("No tasks, exiting")
break
}
// Parse task
var task Task
err = json.Unmarshal([]byte(messages[0].Body), &task)
if err != nil {
continue
}
fmt.Printf("Processing task %s: %s\n", task.ID, task.Work)
// Do work
time.Sleep(1 * time.Second)
// Report completion
result := map[string]interface{}{
"task_id": task.ID,
"status": "complete",
"time": time.Now().Format(time.RFC3339),
}
resultJSON, _ := json.Marshal(result)
client.Send("coordinator", string(resultJSON), "", nil)
}
client.SetStatus("done")
}FROM golang:1.20-alpine
WORKDIR /app
COPY . .
RUN go build -o worker .
CMD ["./worker"]apiVersion: batch/v1
kind: Job
metadata:
name: a2a-worker
spec:
template:
spec:
containers:
- name: worker
image: a2a-worker:latest
volumeMounts:
- name: a2a-data
mountPath: /.a2a
volumes:
- name: a2a-data
emptyDir: {}Direct database access:
- send(): ~5ms
- recv(): ~10ms per poll
- search(): ~20ms for 1000 messages
- CLIENT_API.md — Python client
- NODE_CLIENT_API.md — Node.js client
- REST_API.md — HTTP interface