Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/apphost/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/cryptopunkscc/astrald/astral"
)

// Conn wraps a net.Conn with the astral Query that opened it and the direction
// of the connection; direction determines which identity is local vs remote.
type Conn struct {
net.Conn
query *astral.Query
Expand Down Expand Up @@ -37,6 +39,8 @@ func (conn Conn) LocalIdentity() *astral.Identity {
return conn.query.Target
}

// RemoteAddr overrides net.Conn.RemoteAddr to return the peer's astral identity
// as the address rather than a TCP/socket address.
func (conn Conn) RemoteAddr() net.Addr {
return Addr{address: conn.RemoteIdentity().String()}
}
2 changes: 2 additions & 0 deletions lib/apphost/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/cryptopunkscc/astrald/mod/apphost"
)

// Host represents an authenticated session with an apphost node, providing
// message framing via an embedded Channel and query routing over the IPC conn.
type Host struct {
*channel.Channel
conn *ipc.Conn
Expand Down
6 changes: 6 additions & 0 deletions lib/apphost/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/cryptopunkscc/astrald/mod/apphost"
)

// Router manages connections to an apphost endpoint, caching resolved identities
// across calls and handling context-driven query cancellation.
type Router struct {
endpoint string
token string
Expand Down Expand Up @@ -67,6 +69,8 @@ func (router *Router) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery) (
return host.RouteQuery(q, ctx.Zone(), ctx.Filters())
}

// GuestID returns the authenticated guest identity, connecting to the host to
// resolve it on first call; returns nil if the connection or auth fails.
func (router *Router) GuestID() *astral.Identity {
if router.guestID != nil {
return router.guestID
Expand All @@ -81,6 +85,8 @@ func (router *Router) GuestID() *astral.Identity {
return router.guestID
}

// HostID returns the host node's identity, connecting to resolve it on first
// call; returns nil if the connection fails.
func (router *Router) HostID() *astral.Identity {
if router.hostID != nil {
return router.hostID
Expand Down
2 changes: 2 additions & 0 deletions lib/apps/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/cryptopunkscc/astrald/mod/apphost"
)

// Handler accepts inbound IPC queries from an apphost-registered endpoint.
// Close or context cancellation terminates all blocking calls.
type Handler struct {
listener net.Listener
ipcToken astral.Nonce
Expand Down
2 changes: 2 additions & 0 deletions lib/apps/object_describer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type objectDescribeArgs struct {
Out string `query:"optional"`
}

// WithObjectDescriber mounts the objects.describe IPC op and registers the app as a Describer with the node.
// All provided describers are fanned out concurrently per describe request.
func WithObjectDescriber(describers ...objects.Describer) ServeOption {
return func(cfg *serveConfig) error {
if len(describers) == 0 {
Expand Down
2 changes: 2 additions & 0 deletions lib/apps/object_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type objectFindArgs struct {
Out string `query:"optional"`
}

// WithObjectFinder mounts the objects.find IPC op and registers the app as a Finder with the node.
// All provided finders are fanned out concurrently per find request.
func WithObjectFinder(finders ...objects.Finder) ServeOption {
return func(cfg *serveConfig) error {
if len(finders) == 0 {
Expand Down
2 changes: 2 additions & 0 deletions lib/apps/object_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type objectSearchArgs struct {
Out string `query:"optional"`
}

// WithObjectSearcher mounts the objects.search IPC op and registers the app as a Searcher with the node.
// All provided searchers are fanned out concurrently per search request.
func WithObjectSearcher(searchers ...objects.Searcher) ServeOption {
return func(cfg *serveConfig) error {
if len(searchers) == 0 {
Expand Down
1 change: 1 addition & 0 deletions lib/apps/pending_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cryptopunkscc/astrald/mod/apphost"
)

// PendingQuery holds an unresolved inbound query; exactly one of Accept, Reject, RejectWithCode, Skip, or Close must be called.
type PendingQuery struct {
conn net.Conn
query *astral.Query
Expand Down
7 changes: 6 additions & 1 deletion lib/apps/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ type regRequest struct {
done chan error
}

// RegistrationHook is called after every successful (re)connect and handler re-registration.
// Errors abort the reconnect cycle and trigger a reconnect attempt.
type RegistrationHook func(ctx *astral.Context) error

// RegistrationHookRegistrar is an optional extension of Registrar for components that support
// post-registration lifecycle hooks.
type RegistrationHookRegistrar interface {
AddRegistrationHooks(hooks ...RegistrationHook)
}
Expand Down Expand Up @@ -72,6 +76,8 @@ func WithEvents(e AppRegistrarEvents) AppRegistrarOption {
return func(s *AppRegistrar) { s.events = e }
}

// WithRegistrarRegistrationHooks adds RegistrationHooks at construction time.
// Use WithRegistrationHooks (a ServeOption) to add hooks via Serve/ServeWith instead.
func WithRegistrarRegistrationHooks(hooks ...RegistrationHook) AppRegistrarOption {
return func(s *AppRegistrar) { s.AddRegistrationHooks(hooks...) }
}
Expand All @@ -93,7 +99,6 @@ func NewAppRegistrar(ctx *astral.Context, opts ...AppRegistrarOption) *AppRegist
return s
}

// NewDefaultAppRegistrar creates an AppRegistrar with default options and starts its run loop.
func NewDefaultAppRegistrar(ctx *astral.Context, opts ...AppRegistrarOption) *AppRegistrar {
return NewAppRegistrar(ctx, opts...)
}
Expand Down
1 change: 1 addition & 0 deletions lib/apps/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func WithRegistrationHook(hook RegistrationHook) ServeOption {
return WithRegistrationHooks(hook)
}

// WithRegistrationHooks adds hooks that run after each successful (re)registration with the node.
func WithRegistrationHooks(hooks ...RegistrationHook) ServeOption {
return func(cfg *serveConfig) error {
for _, hook := range hooks {
Expand Down
2 changes: 2 additions & 0 deletions lib/arl/arl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func New(caller *astral.Identity, target *astral.Identity, query string) *ARL {
return &ARL{Caller: caller, Target: target, Query: query}
}

// Split parses a raw ARL string of the form [caller@][target:]query into its three components.
func Split(s string) (caller, target, query string) {
matches := callerExp.FindStringSubmatch(s)
if len(matches) > 0 {
Expand All @@ -40,6 +41,7 @@ func Split(s string) (caller, target, query string) {
return
}

// Parse parses an ARL string (with or without the astral:// scheme) into an ARL; if resolver is non-nil it is used to resolve identity strings, otherwise raw key parsing is attempted.
func Parse(s string, resolver dir.Resolver) (arl *ARL, err error) {
if after, found := strings.CutPrefix(s, "astral://"); found {
s = after
Expand Down
4 changes: 4 additions & 0 deletions lib/astrald/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/cryptopunkscc/astrald/lib/query"
)

// Client wraps a Router with an optional fixed target identity, used to direct queries
// without the caller having to supply the target on every call.
type Client struct {
Router
targetID *astral.Identity
Expand All @@ -30,6 +32,7 @@ func SetDefault(client *Client) {
defaultClient = client
}

// Query routes an outbound query to client.targetID using the client's own guest identity as the caller.
func (client *Client) Query(ctx *astral.Context, method string, args any) (astral.Conn, error) {
return client.RouteQuery(ctx, astral.Launch(query.New(client.GuestID(), client.targetID, method, args)))
}
Expand All @@ -51,6 +54,7 @@ func QueryChannel(ctx *astral.Context, method string, args any, cfg ...channel.C
return Default().QueryChannel(ctx, method, args, cfg...)
}

// WithTarget returns a shallow copy of the client with the target identity set; the original is not modified.
func (client *Client) WithTarget(identity *astral.Identity) *Client {
c := *client
c.targetID = identity
Expand Down
2 changes: 2 additions & 0 deletions lib/astrald/conn_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/cryptopunkscc/astrald/astral"
)

// ConnMonitor wraps an astral.Conn to track transferred bytes and fire optional callbacks on close or I/O errors.
// Callbacks are invoked synchronously at the point of the error or close call.
type ConnMonitor struct {
OnClose func()
OnReadError func(error)
Expand Down
2 changes: 2 additions & 0 deletions lib/astrald/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package astrald

import "github.com/cryptopunkscc/astrald/astral"

// NewContext returns a context pre-populated with the default client's guest identity and ZoneAll,
// suitable for outbound queries without additional configuration.
func NewContext() *astral.Context {
return astral.
NewContext(nil).
Expand Down
2 changes: 2 additions & 0 deletions lib/astrald/gate_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ReadyGate interface {
Ready() <-chan struct{}
}

// GateRouter wraps a Router and blocks every outbound query until the gate signals ready.
type GateRouter struct {
Router
gate ReadyGate
Expand All @@ -19,6 +20,7 @@ func NewGateRouter(r Router, g ReadyGate) *GateRouter {
return &GateRouter{Router: r, gate: g}
}

// RouteQuery blocks until the gate is open or ctx is cancelled, then delegates to the inner router.
func (gr *GateRouter) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery) (astral.Conn, error) {
select {
case <-gr.gate.Ready():
Expand Down
5 changes: 5 additions & 0 deletions lib/astrald/retry_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/cryptopunkscc/astrald/sig"
)

// RetryRouter wraps a Router and retries queries that fail with ErrNodeUnavailable,
// waiting between attempts according to the provided sig.Retry policy.
// maxAttempts == 0 means unlimited retries.
type RetryRouter struct {
Router
retry *sig.Retry
Expand All @@ -28,6 +31,8 @@ func NewNoRetryRouter(r Router) *RetryRouter {
return &RetryRouter{Router: r, maxAttempts: 1}
}

// RouteQuery retries the query on ErrNodeUnavailable; any other error is returned immediately.
// On success the retry policy is reset so the next call starts fresh.
func (rr *RetryRouter) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery) (astral.Conn, error) {
for attempt := 0; ; attempt++ {
conn, err := rr.Router.RouteQuery(ctx, q)
Expand Down
2 changes: 2 additions & 0 deletions lib/astrald/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/cryptopunkscc/astrald/lib/apphost"
)

// Router is the core transport abstraction: it routes an in-flight query to its destination
// and exposes the local guest and host identities used to build queries.
type Router interface {
RouteQuery(*astral.Context, *astral.InFlightQuery) (astral.Conn, error)
GuestID() *astral.Identity
Expand Down
1 change: 1 addition & 0 deletions lib/ipc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipc

import "net"

// Conn wraps a net.Conn with the IPC protocol and address used to establish it.
type Conn struct {
net.Conn
protocol string
Expand Down
3 changes: 3 additions & 0 deletions lib/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Dial(target string) (conn *Conn, err error) {
return DialContext(context.Background(), target)
}

// DialContext connects to target using the format "proto:addr", where proto is one of tcp, unix, memu, or memb.
func DialContext(ctx context.Context, target string) (conn *Conn, err error) {
parts := strings.SplitN(target, ":", 2)
if len(parts) < 2 {
Expand Down Expand Up @@ -55,6 +56,7 @@ func DialContext(ctx context.Context, target string) (conn *Conn, err error) {
return
}

// Listen binds to the given "proto:addr" IPC address; for unix sockets it expands "~/" and removes a stale socket file before retrying.
func Listen(ipcAddress string) (net.Listener, error) {
var protocol, address string

Expand Down Expand Up @@ -100,6 +102,7 @@ func Listen(ipcAddress string) (net.Listener, error) {
}
}

// ListenAny opens a listener on a system-assigned ephemeral address for the given protocol, useful when the caller does not care which address is used.
func ListenAny(protocol string) (net.Listener, error) {
switch protocol {
case "tcp":
Expand Down
3 changes: 3 additions & 0 deletions lib/query/args_to_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package query

import "strings"

// ArgsToMap converts a flag-style argument slice to a map. Arguments prefixed
// with "-" are treated as named keys consuming the next element as their value;
// unprefixed arguments are stored under DefaultArgKey.
func ArgsToMap(args []string) (params map[string]string) {
params = make(map[string]string)

Expand Down
1 change: 1 addition & 0 deletions lib/query/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewConn(localID *astral.Identity, remoteID *astral.Identity, w io.WriteClos
}
}

// Read closes the connection on any read error, ensuring the write side is also torn down.
func (s *Conn) Read(p []byte) (n int, err error) {
n, err = s.Reader.Read(p)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions lib/query/editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ func Edit(s any) *Editor {
return edit(s, false)
}

// EditCamel returns an Editor for s, preserving the original field name casing
// instead of converting to snake_case as Edit does.
func EditCamel(args any) *Editor {
return edit(args, true)
}

// EditValue returns an Editor from an already-obtained reflect.Value; useful
// when the caller already holds a reflected struct pointer.
func EditValue(v reflect.Value) *Editor {
return editValue(v, false)
}
Expand Down Expand Up @@ -143,6 +147,8 @@ func (editor *Editor) Field(name string) (*FieldEditor, error) {
return nil, ErrFieldNotFound
}

// SetMany applies a map of values to the editor's fields, silently skipping
// unknown keys; returns an error only on type-conversion failures.
func (editor *Editor) SetMany(vals map[string]string) error {
for key, value := range vals {
err := editor.Set(key, value)
Expand All @@ -157,6 +163,8 @@ func (editor *Editor) SetMany(vals map[string]string) error {
return nil
}

// SetArgs parses a "-key value" argument slice into the editor's fields,
// returning unconsumed (non-flag) args and an error for any unknown flag.
func (editor *Editor) SetArgs(args []string) (unparsed []string, err error) {
var i = 0

Expand Down
2 changes: 2 additions & 0 deletions lib/query/field_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type FieldTag struct {
Other map[string]string
}

// ParseTag parses a semicolon-separated struct tag value of the form
// "key:<name>;skip;required" into a FieldTag.
func ParseTag(tag string) *FieldTag {
var fieldTag = FieldTag{Other: make(map[string]string)}

Expand Down
1 change: 1 addition & 0 deletions lib/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cryptopunkscc/astrald/astral"
)

// DefaultArgKey is the map key used for positional (non-flagged) arguments.
const DefaultArgKey = "arg"
const maxQueryTimeout = 60 * time.Second

Expand Down
2 changes: 2 additions & 0 deletions lib/query/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func RouteInFlight(ctx *astral.Context, r astral.Router, q *astral.InFlightQuery
return NewConn(q.Caller, q.Target, target, pipeReader, true), err
}

// Route routes a Query and wraps the resulting connection in a channel.Channel
// for structured framed I/O, unlike RouteInFlight which returns a raw Conn.
func Route(ctx *astral.Context, r astral.Router, q *astral.Query) (*channel.Channel, error) {
conn, err := RouteInFlight(ctx, r, astral.Launch(q))
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions lib/routing/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/cryptopunkscc/astrald/lib/query"
)

// App is a self-describing ScopeRouter that automatically exposes a ".spec"
// op listing all registered operations.
type App struct {
*ScopeRouter
}
Expand All @@ -19,6 +21,8 @@ type SpecArgs struct {
Out string
}

// NewApp wraps s in an OpRouter, mounts it in a ScopeRouter, and injects a
// ".spec" op that streams the full operation manifest to callers.
func NewApp(s any) *App {
ops := NewOpRouter()
ops.AddStruct(s)
Expand All @@ -30,6 +34,7 @@ func NewApp(s any) *App {
return &app
}

// Spec streams all OpSpec entries for the app and terminates with an EOS object.
func (app *App) Spec(_ *astral.Context, query *IncomingQuery, args SpecArgs) error {
ch := query.Accept(channel.WithFormats(args.In, args.Out))
defer ch.Close()
Expand All @@ -48,6 +53,8 @@ func (app *App) Add(scope string, s any) {
app.ScopeRouter.Add(scope, NewOpRouter(s))
}

// Run routes args[0] as an op name (with remaining args as query params) against
// the app and bridges the resulting connection to stdin/stdout.
func (app *App) Run(ctx *astral.Context, args []string) error {
if len(args) == 0 {
return fmt.Errorf("missing command")
Expand Down
1 change: 1 addition & 0 deletions lib/routing/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewConn(localID *astral.Identity, remoteID *astral.Identity, w io.WriteClos
}
}

// Read closes the connection on any read error to ensure cleanup propagates.
func (s *Conn) Read(p []byte) (n int, err error) {
n, err = s.Reader.Read(p)
if err != nil {
Expand Down
Loading