diff --git a/mod/nodes/client/client.go b/mod/nodes/client/client.go index 4f4711963..bc36c54d4 100644 --- a/mod/nodes/client/client.go +++ b/mod/nodes/client/client.go @@ -20,6 +20,7 @@ func New(targetID *astral.Identity, client *astrald.Client) *Client { return &Client{astral: client, targetID: targetID} } +// Default returns the shared package-level client, lazily creating it on first use. func Default() *Client { if defaultClient == nil { defaultClient = New(nil, astrald.Default()) @@ -27,6 +28,7 @@ func Default() *Client { return defaultClient } +// WithTarget returns a copy retargeted at the given identity; the receiver is unchanged. func (client *Client) WithTarget(target *astral.Identity) *Client { return &Client{astral: client.astral, targetID: target} } diff --git a/mod/nodes/frames/frame.go b/mod/nodes/frames/frame.go index da677bf00..9f2a13e9f 100644 --- a/mod/nodes/frames/frame.go +++ b/mod/nodes/frames/frame.go @@ -6,11 +6,14 @@ import ( "github.com/cryptopunkscc/astrald/astral" ) +// Frame is a message exchanged over a node link. type Frame interface { astral.Object fmt.Stringer } +// FrameTypes maps frame opcodes to object types by slice index. +// note: order is the wire opcode; never reorder or remove entries. var FrameTypes = []string{ "nodes.frames.ping", "nodes.frames.query", diff --git a/mod/nodes/module.go b/mod/nodes/module.go index 7522f57bb..04cc3d4db 100644 --- a/mod/nodes/module.go +++ b/mod/nodes/module.go @@ -38,6 +38,8 @@ const ( MaxDataFrameSize = 8192 ) +// Module manages encrypted links between nodes: establishing them, resolving +// peer endpoints, and tracking liveness. type Module interface { EstablishInboundLink(ctx context.Context, conn exonet.Conn) error EstablishOutboundLink(ctx context.Context, remoteID *astral.Identity, conn exonet.Conn) (Link, error) @@ -66,10 +68,13 @@ type Link interface { Done() <-chan struct{} } +// EndpointResolver resolves the network endpoints at which an identity can be reached. type EndpointResolver interface { ResolveEndpoints(*astral.Context, *astral.Identity) (<-chan *EndpointWithTTL, error) } +// LinkStrategy drives one approach to establishing a link (e.g. direct, NAT, Tor); +// Done closes once the strategy has finished, whether or not it produced a link. type LinkStrategy interface { Name() string Signal(ctx *astral.Context) diff --git a/mod/nodes/src/authorizers.go b/mod/nodes/src/authorizers.go index 3a07484ca..23e316a81 100644 --- a/mod/nodes/src/authorizers.go +++ b/mod/nodes/src/authorizers.go @@ -5,6 +5,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/nodes" ) +// AuthorizeRelayFor grants relaying only when the actor relays for its own identity. func (mod *Module) AuthorizeRelayFor(ctx *astral.Context, a *nodes.RelayForAction) bool { return a.Actor().IsEqual(a.ForID) } diff --git a/mod/nodes/src/basic_link_strategy.go b/mod/nodes/src/basic_link_strategy.go index 118d19ee7..74a0aacab 100644 --- a/mod/nodes/src/basic_link_strategy.go +++ b/mod/nodes/src/basic_link_strategy.go @@ -24,6 +24,8 @@ var _ nodes.LinkStrategy = &BasicLinkStrategy{} func (s *BasicLinkStrategy) Name() string { return nodes.StrategyBasic } +// Signal starts a dialing round in the background; concurrent calls while a round +// is still active are ignored. The first successful link wins, the rest are closed. func (s *BasicLinkStrategy) Signal(ctx *astral.Context) { s.mu.Lock() if s.activeDone != nil { @@ -101,6 +103,8 @@ func (s *BasicLinkStrategy) Signal(ctx *astral.Context) { }() } +// Done returns a channel closed when the active round finishes; a closed channel +// is returned immediately when no round is running. func (s *BasicLinkStrategy) Done() <-chan struct{} { s.mu.Lock() defer s.mu.Unlock() diff --git a/mod/nodes/src/deps.go b/mod/nodes/src/deps.go index 8fbb7c001..2ee56bd3a 100644 --- a/mod/nodes/src/deps.go +++ b/mod/nodes/src/deps.go @@ -25,6 +25,8 @@ type Deps struct { Events events.Module } +// LoadDependencies injects deps and registers the "linked" directory filter and +// the relay-for authorizer. func (mod *Module) LoadDependencies(*astral.Context) (err error) { err = core.Inject(mod.node, &mod.Deps) if err != nil { diff --git a/mod/nodes/src/endpoint_resolver.go b/mod/nodes/src/endpoint_resolver.go index 11fe3e46a..abb406ece 100644 --- a/mod/nodes/src/endpoint_resolver.go +++ b/mod/nodes/src/endpoint_resolver.go @@ -10,6 +10,8 @@ import ( nodescli "github.com/cryptopunkscc/astrald/mod/nodes/client" ) +// ResolveEndpoints fans out to all registered resolvers and merges their results +// into one channel, closed once every resolver finishes. func (mod *Module) ResolveEndpoints(ctx *astral.Context, nodeID *astral.Identity) (_ <-chan *nodes.EndpointWithTTL, err error) { var ch = make(chan *nodes.EndpointWithTTL) @@ -62,6 +64,8 @@ func (mod *Module) runResolver(ctx *astral.Context, r nodes.EndpointResolver, no } } +// UpdateNodeEndpoints asks a remote resolver node for identity's endpoints and +// stores them locally. Per-endpoint store errors are logged, not returned. func (mod *Module) UpdateNodeEndpoints(ctx *astral.Context, resolver *astral.Identity, identity *astral.Identity) error { client := nodescli.New(resolver, astrald.Default()) diff --git a/mod/nodes/src/input_buffer.go b/mod/nodes/src/input_buffer.go index 0792c02c7..efdac5f8b 100644 --- a/mod/nodes/src/input_buffer.go +++ b/mod/nodes/src/input_buffer.go @@ -32,6 +32,7 @@ func NewInputBuffer(size int, onRead func(int)) *InputBuffer { return &InputBuffer{size: size, onRead: onRead, shut: make(chan struct{}), done: make(chan struct{})} } +// Push appends a whole chunk. All-or-nothing: rejects with ErrBufferOverflow rather than storing a partial chunk. func (b *InputBuffer) Push(p []byte) error { b.mu.Lock() defer b.mu.Unlock() @@ -49,6 +50,7 @@ func (b *InputBuffer) Push(p []byte) error { return nil } +// Read never blocks: on an empty open buffer it returns *ErrBufferEmpty carrying a wake channel; on an empty closed buffer it returns io.EOF. func (b *InputBuffer) Read(p []byte) (n int, err error) { b.mu.Lock() defer b.mu.Unlock() @@ -101,10 +103,12 @@ func (b *InputBuffer) IsEmpty() bool { return b.used == 0 } +// Closed is closed when Close is called, before remaining data is drained. func (b *InputBuffer) Closed() <-chan struct{} { return b.shut } +// Done is closed once the buffer is both closed and fully drained. func (b *InputBuffer) Done() <-chan struct{} { return b.done } diff --git a/mod/nodes/src/link.go b/mod/nodes/src/link.go index 745fc44cc..fa57ac8b9 100644 --- a/mod/nodes/src/link.go +++ b/mod/nodes/src/link.go @@ -21,6 +21,7 @@ type Ping struct { pong chan struct{} } +// Link is a live mux channel to a peer with its own ping/keepalive loop and pressure tracking; closes once on first error. type Link struct { *channel.Channel mux *Mux @@ -51,6 +52,7 @@ func (s *Link) RemoteIdentity() *astral.Identity { return s.conn.RemoteIdentity() } +// CloseWithError records err as the link's cause of death; only the first call's error is kept. func (s *Link) CloseWithError(err error) error { if err == nil { err = errors.New("link closed") @@ -142,6 +144,7 @@ func (s *Link) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.Wri return s.mux.RouteQuery(ctx, q, w) } +// Ping sends one ping and blocks for the pong. Only one ping may be in flight; concurrent calls error. func (s *Link) Ping() (time.Duration, error) { p := &Ping{ nonce: astral.NewNonce(), diff --git a/mod/nodes/src/link_negotiator.go b/mod/nodes/src/link_negotiator.go index 72d0a19c9..f9f21013b 100644 --- a/mod/nodes/src/link_negotiator.go +++ b/mod/nodes/src/link_negotiator.go @@ -24,6 +24,7 @@ type muxLinkNegotiator struct { ch *channel.Channel } +// NegotiateOutbound reads the peer's feature list, selects mux2, and waits for the link nonce; fails if mux2 is absent or rejected. func (n *muxLinkNegotiator) NegotiateOutbound() (*Link, error) { var count *astral.Uint16 if err := n.ch.Switch(channel.Expect(&count), channel.PassErrors); err != nil { @@ -71,6 +72,7 @@ func (n *muxLinkNegotiator) NegotiateOutbound() (*Link, error) { return link, nil } +// NegotiateInbound offers mux2, confirms the peer selected it, and assigns the link nonce. func (n *muxLinkNegotiator) NegotiateInbound() (*Link, error) { if err := n.ch.Send(astral.NewUint16(1)); err != nil { return nil, fmt.Errorf("send feature count: %w", err) @@ -108,6 +110,7 @@ func (n *muxLinkNegotiator) NegotiateInbound() (*Link, error) { return link, nil } +// EstablishOutboundLink runs the noise handshake and mux negotiation over conn, then registers the link; closes conn on any error. func (mod *Module) EstablishOutboundLink(ctx context.Context, remoteID *astral.Identity, conn exonet.Conn) (_ nodes.Link, err error) { defer func() { if err != nil { @@ -142,6 +145,7 @@ func (mod *Module) EstablishOutboundLink(ctx context.Context, remoteID *astral.I return link, nil } +// EstablishInboundLink runs the inbound noise handshake and mux negotiation over conn, then registers the link; closes conn on any error. func (mod *Module) EstablishInboundLink(ctx context.Context, conn exonet.Conn) (err error) { defer func() { if err != nil { diff --git a/mod/nodes/src/link_pool.go b/mod/nodes/src/link_pool.go index d0d8b9042..49ac97dc9 100644 --- a/mod/nodes/src/link_pool.go +++ b/mod/nodes/src/link_pool.go @@ -26,6 +26,7 @@ func NewLinkPool(mod *Module) *LinkPool { } } +// AddLink registers link, wires its mux router, emits a created event, and spawns a goroutine that removes it and closes its sessions when it dies. func (pool *LinkPool) AddLink(link *Link) error { dir := "in" netName := "unknown network" @@ -158,6 +159,7 @@ func (pool *LinkPool) getOrCreateNodeLinker(target *astral.Identity) *NodeLinker return linker } +// RetrieveLink returns a matching existing link immediately unless ForceNew is set; otherwise it activates linking strategies and delivers the first link that matches, ctx cancellation, or ErrLinkNotProduced. func (pool *LinkPool) RetrieveLink( ctx *astral.Context, target *astral.Identity, diff --git a/mod/nodes/src/module.go b/mod/nodes/src/module.go index cc11d9103..7e1b28dd3 100644 --- a/mod/nodes/src/module.go +++ b/mod/nodes/src/module.go @@ -51,6 +51,7 @@ type Module struct { privateKey *crypto.PrivateKey } +// Run schedules the endpoint-cleanup task and blocks until ctx is cancelled. func (mod *Module) Run(ctx *astral.Context) error { mod.ctx = ctx.IncludeZone(astral.ZoneNetwork) <-mod.Deps.Scheduler.Ready() diff --git a/mod/nodes/src/mux.go b/mod/nodes/src/mux.go index 977d1da09..853362168 100644 --- a/mod/nodes/src/mux.go +++ b/mod/nodes/src/mux.go @@ -16,6 +16,7 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Mux multiplexes many query sessions over one link channel, dispatching inbound frames and tracking per-session state. type Mux struct { mod *Module ch *channel.Channel @@ -66,6 +67,7 @@ func (m *Mux) SendMigrateFrame(nonce astral.Nonce) error { return m.ch.Send(&frames.Migrate{Nonce: nonce}) } +// SetRouter installs the router once; subsequent calls are ignored. Unblocks waitRouter. func (m *Mux) SetRouter(r astral.Router) { if !m.routerOnce.CompareAndSwap(false, true) { return @@ -74,6 +76,7 @@ func (m *Mux) SetRouter(r astral.Router) { close(m.routerSet) } +// RouteQuery opens a session and sends a query (or relay query) frame, then blocks for the peer's routing result or ctx cancellation; on accept it pumps the session into w. func (m *Mux) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.WriteCloser) (_ io.WriteCloser, err error) { sourceID := m.RemoteIdentity() if q.Caller.IsEqual(ctx.Identity()) { @@ -124,6 +127,7 @@ func (m *Mux) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.Writ } } +// Handle dispatches one inbound frame; query frames are handled in their own goroutine, the rest inline on the read loop. func (m *Mux) Handle(obj astral.Object) error { frame, ok := obj.(frames.Frame) if !ok { diff --git a/mod/nodes/src/mux_session_reader.go b/mod/nodes/src/mux_session_reader.go index 6c7331d10..c5973b09c 100644 --- a/mod/nodes/src/mux_session_reader.go +++ b/mod/nodes/src/mux_session_reader.go @@ -50,6 +50,7 @@ func (r *muxSessionReader) Close() error { return nil } +// Push appends to the current buffer, falling through to the queued next buffer if the current one is already closed (mid-migration). func (r *muxSessionReader) Push(p []byte) error { r.cond.L.Lock() buf := r.buf @@ -83,6 +84,7 @@ func (r *muxSessionReader) Resume() { r.cond.Broadcast() } +// Advance closes the current buffer and, once it has drained, promotes the queued next buffer; no-op if none is queued. func (r *muxSessionReader) Advance() { r.cond.L.Lock() defer r.cond.L.Unlock() @@ -97,6 +99,7 @@ func (r *muxSessionReader) Advance() { } } +// Read blocks while paused or empty, transparently switching to the next buffer at EOF, so migration is invisible to the caller. func (r *muxSessionReader) Read(p []byte) (n int, err error) { for { r.cond.L.Lock() diff --git a/mod/nodes/src/mux_session_writer.go b/mod/nodes/src/mux_session_writer.go index b9aad5036..19c2687a9 100644 --- a/mod/nodes/src/mux_session_writer.go +++ b/mod/nodes/src/mux_session_writer.go @@ -30,6 +30,7 @@ func (w *muxSessionWriter) Buf() *OutputBuffer { return w.buf } +// SwapBuf installs a new buffer and reset func for the new link, closing the old buffer. func (w *muxSessionWriter) SwapBuf(buf *OutputBuffer, reset func()) { w.cond.L.Lock() old := w.buf @@ -41,6 +42,7 @@ func (w *muxSessionWriter) SwapBuf(buf *OutputBuffer, reset func()) { } } +// Close sends a Reset frame to the peer and closes the buffer; idempotent. Use PeerClose to skip the Reset. func (w *muxSessionWriter) Close() error { w.cond.L.Lock() if w.closed { @@ -94,6 +96,7 @@ func (w *muxSessionWriter) Resume() { w.cond.Broadcast() } +// Write blocks while paused and retries when the buffer runs out of flow-control credit, until all of p is written or the writer closes. func (w *muxSessionWriter) Write(p []byte) (int, error) { total := 0 diff --git a/mod/nodes/src/nat_link_strategy.go b/mod/nodes/src/nat_link_strategy.go index 5b89c6738..d93591abb 100644 --- a/mod/nodes/src/nat_link_strategy.go +++ b/mod/nodes/src/nat_link_strategy.go @@ -29,6 +29,7 @@ var _ nodes.LinkStrategy = &NATLinkStrategy{} func (s *NATLinkStrategy) Name() string { return nodes.StrategyNAT } +// Signal starts a NAT-traversal attempt in the background; a no-op if one is already running. func (s *NATLinkStrategy) Signal(ctx *astral.Context) { s.mu.Lock() if s.done != nil { @@ -183,6 +184,7 @@ func (s *NATLinkStrategy) signalDone() { } } +// Done returns a channel closed when the current attempt finishes; already closed when no attempt is running. func (s *NATLinkStrategy) Done() <-chan struct{} { s.mu.Lock() defer s.mu.Unlock() diff --git a/mod/nodes/src/node_linker.go b/mod/nodes/src/node_linker.go index fac7adb7e..f758dcb0d 100644 --- a/mod/nodes/src/node_linker.go +++ b/mod/nodes/src/node_linker.go @@ -25,6 +25,7 @@ func NewNodeLinker(mod *Module, target *astral.Identity) *NodeLinker { return linker } +// Activate signals the named strategies and returns a channel closed once all of them finish; an already-closed channel if none match. func (linker *NodeLinker) Activate(ctx *astral.Context, strategies []string) <-chan struct{} { var doneChannels []<-chan struct{} for _, strategy := range strategies { diff --git a/mod/nodes/src/object_describer.go b/mod/nodes/src/object_describer.go index 5ce10aafe..69a3f7b43 100644 --- a/mod/nodes/src/object_describer.go +++ b/mod/nodes/src/object_describer.go @@ -9,6 +9,9 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// DescribeObject fans out to every provider returned by FindObject and merges their +// descriptors into the result channel. Filtered providers are skipped. Requires the +// network zone. The channel closes once all providers are exhausted. func (mod *Module) DescribeObject(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *objects.Descriptor, error) { if !ctx.Zone().Is(astral.ZoneNetwork) { return nil, astral.ErrZoneExcluded diff --git a/mod/nodes/src/object_finder.go b/mod/nodes/src/object_finder.go index 3861bf0f3..ecc15da57 100644 --- a/mod/nodes/src/object_finder.go +++ b/mod/nodes/src/object_finder.go @@ -4,6 +4,8 @@ import ( "github.com/cryptopunkscc/astrald/astral" ) +// FindObject returns providers for an object from the local search cache only; it +// performs no live network lookup. The channel is already closed on return. func (mod *Module) FindObject(ctx *astral.Context, id *astral.ObjectID) (<-chan *astral.Identity, error) { out := make(chan *astral.Identity, 1) defer close(out) diff --git a/mod/nodes/src/object_receiver.go b/mod/nodes/src/object_receiver.go index 6f7de8e95..71715620c 100644 --- a/mod/nodes/src/object_receiver.go +++ b/mod/nodes/src/object_receiver.go @@ -12,6 +12,9 @@ import ( "github.com/cryptopunkscc/astrald/mod/utp" ) +// ReceiveObject dispatches an inbound object by type, accepting observed-endpoint +// messages and reacting to link events with connectivity upgrades and endpoint +// refreshes. Unhandled types are ignored without accepting the drop. func (mod *Module) ReceiveObject(drop objects.Drop) error { switch object := drop.Object().(type) { case *nodes.ObservedEndpointMessage: diff --git a/mod/nodes/src/op_add_endpoint.go b/mod/nodes/src/op_add_endpoint.go index 9e1929519..aac03c88e 100644 --- a/mod/nodes/src/op_add_endpoint.go +++ b/mod/nodes/src/op_add_endpoint.go @@ -18,6 +18,8 @@ type opAddEndpointArgs struct { Out string `query:"optional"` } +// OpAddEndpoint parses "network:address" and registers it for the identity with a +// fixed ~90-day TTL. func (mod *Module) OpAddEndpoint(_ *astral.Context, q *routing.IncomingQuery, args opAddEndpointArgs) (err error) { chunks := strings.SplitN(args.Endpoint, ":", 2) if len(chunks) != 2 { diff --git a/mod/nodes/src/op_migrate_session.go b/mod/nodes/src/op_migrate_session.go index 761af7bcd..c5d39e06e 100644 --- a/mod/nodes/src/op_migrate_session.go +++ b/mod/nodes/src/op_migrate_session.go @@ -14,6 +14,9 @@ type opMigrateSessionArgs struct { Out string `query:"optional"` } +// OpMigrateSession moves an open session onto another link. With Start set it drives +// the migration directly; otherwise it acts as the responder, exchanging ready/switched/ +// resume/done signals with the initiator. Validates session and link state before starting. func (mod *Module) OpMigrateSession(ctx *astral.Context, q *routing.IncomingQuery, args opMigrateSessionArgs) error { ch := channel.New(q.AcceptRaw(), channel.WithOutputFormat(args.Out)) defer ch.Close() diff --git a/mod/nodes/src/op_new_link.go b/mod/nodes/src/op_new_link.go index 0bf4927c0..7c1457a34 100644 --- a/mod/nodes/src/op_new_link.go +++ b/mod/nodes/src/op_new_link.go @@ -17,6 +17,9 @@ type opNewLinkArgs struct { Out string } +// OpNewLink establishes a link to the target, either to a specific endpoint or via the +// given (or all) strategies. The link is built by a scheduled task; the query is accepted +// before it completes so creation may outlast the query timeout. Failures map to reject codes. func (mod *Module) OpNewLink(ctx *astral.Context, q *routing.IncomingQuery, args opNewLinkArgs) (err error) { target, err := mod.Dir.ResolveIdentity(args.Target) if err != nil { diff --git a/mod/nodes/src/output_buffer.go b/mod/nodes/src/output_buffer.go index df61a617d..211b120c1 100644 --- a/mod/nodes/src/output_buffer.go +++ b/mod/nodes/src/output_buffer.go @@ -25,6 +25,8 @@ func NewOutputBuffer(write func([]byte) error) *OutputBuffer { return &OutputBuffer{write: write} } +// Write never blocks: with no available space it returns *ErrBufferEmpty whose channel +// closes once Grow adds space. It may consume only part of p, bounded by available space. func (b *OutputBuffer) Write(p []byte) (int, error) { b.mu.Lock() if b.closed { diff --git a/mod/nodes/src/pressure.go b/mod/nodes/src/pressure.go index 565ea326b..e28a9a199 100644 --- a/mod/nodes/src/pressure.go +++ b/mod/nodes/src/pressure.go @@ -4,12 +4,17 @@ import ( "time" ) +// LinkPressureDetector tracks per-link throughput and RTT to decide when a link is +// under enough pressure to warrant upgrading to a better transport. Fed via OnBytes +// and OnRTT; IsHigh reports the current state and the onHigh callback fires on entry. type LinkPressureDetector interface { OnBytes(n int, now time.Time) OnRTT(rtt time.Duration, now time.Time) IsHigh() bool } +// LinkPressureConfig tunes the leaky-bucket throughput score and RTT EMA that combine, +// with hysteresis between Enter and Exit, into the HIGH-pressure trigger. type LinkPressureConfig struct { // LeakRate is how fast the token bucket drains in bytes/sec. // Traffic below this rate does not accumulate pressure; only sustained diff --git a/mod/nodes/src/query_router.go b/mod/nodes/src/query_router.go index ed5553401..a1a2df956 100644 --- a/mod/nodes/src/query_router.go +++ b/mod/nodes/src/query_router.go @@ -11,6 +11,9 @@ import ( "github.com/cryptopunkscc/astrald/mod/nodes" ) +// RouteQuery routes a query to its target over a link, reusing an existing one or +// retrieving a new one. On failure it falls back to relays listed in q.Extra, sending +// caller proof first when the caller differs from the context identity. Network zone only. func (mod *Module) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.WriteCloser) (rw io.WriteCloser, err error) { // check if the context allows for network queries if !ctx.Zone().Is(astral.ZoneNetwork) { diff --git a/mod/nodes/src/session_migrator.go b/mod/nodes/src/session_migrator.go index 82816b5a3..f552333a9 100644 --- a/mod/nodes/src/session_migrator.go +++ b/mod/nodes/src/session_migrator.go @@ -6,6 +6,8 @@ import ( "github.com/cryptopunkscc/astrald/mod/nodes" ) +// SessionMigrator moves an open session from one link to another. +// Call Begin, SendMigrateFrame, WaitClosed, then Complete in that order. type SessionMigrator struct { mod *Module session *session @@ -31,6 +33,8 @@ func (mod *Module) newSessionMigrator(session *session) (*SessionMigrator, error return &SessionMigrator{mod: mod, session: session, reader: reader, writer: writer}, nil } +// Begin transitions the session to migrating, pauses the writer, and swaps +// reader/writer buffers onto target. Fails unless the session is open. func (m *SessionMigrator) Begin(target *Link) error { if !m.session.swapState(stateOpen, stateMigrating) { m.mod.log.Logv(1, "session %v in state %v, cannot migrate", m.session.Nonce, m.session.getState()) @@ -58,11 +62,13 @@ func (m *SessionMigrator) Begin(target *Link) error { return nil } +// SendMigrateFrame tells the peer to migrate, sent on the old link after Begin. func (m *SessionMigrator) SendMigrateFrame() error { m.mod.log.Logv(1, "sending migrate frame for session %v on link %v", m.session.Nonce, m.oldLink.id) return m.oldLink.GetMux().SendMigrateFrame(m.session.Nonce) } +// WaitClosed blocks until the old link drains its input buffer or ctx ends. func (m *SessionMigrator) WaitClosed(ctx context.Context) error { m.mod.log.Logv(1, "waiting for old input buffer to close for session %v", m.session.Nonce) select { @@ -78,6 +84,8 @@ func (m *SessionMigrator) SetPeerBuffer(n int) { m.peerBuffer = n } +// Complete reattaches the session to the new link's mux, reopens it, and +// resumes the writer grown to the peer's advertised buffer. func (m *SessionMigrator) Complete() error { m.mod.log.Logv(1, "resuming session %v on link %v (peer buffer %v)", m.session.Nonce, m.newLink.id, m.peerBuffer) diff --git a/mod/nodes/src/tor_link_strategy.go b/mod/nodes/src/tor_link_strategy.go index a8f71880d..b01302290 100644 --- a/mod/nodes/src/tor_link_strategy.go +++ b/mod/nodes/src/tor_link_strategy.go @@ -32,6 +32,7 @@ var _ nodes.LinkStrategy = &TorLinkStrategy{} func (s *TorLinkStrategy) Name() string { return nodes.StrategyTor } +// Signal kicks off a connection attempt; idempotent while one is in flight. func (s *TorLinkStrategy) Signal(ctx *astral.Context) { s.mu.Lock() if s.done != nil { @@ -202,6 +203,8 @@ func (s *TorLinkStrategy) tryEndpoint(ctx *astral.Context, endpoint *nodes.Endpo return link } +// Done closes once the quick (foreground) phase ends; background retries may +// still be running. Returns an already-closed channel if Signal never ran. func (s *TorLinkStrategy) Done() <-chan struct{} { s.mu.Lock() defer s.mu.Unlock() diff --git a/mod/nodes/tasks.go b/mod/nodes/tasks.go index 450ea5b25..f379f6c9e 100644 --- a/mod/nodes/tasks.go +++ b/mod/nodes/tasks.go @@ -2,6 +2,7 @@ package nodes import "github.com/cryptopunkscc/astrald/mod/scheduler" +// LinkProducerTask is a scheduler task whose Result is valid only after the task completes. type LinkProducerTask interface { scheduler.Task Result() (info *LinkInfo, err error) diff --git a/mod/objects/client/describe.go b/mod/objects/client/describe.go index 2311ff151..696f86fc6 100644 --- a/mod/objects/client/describe.go +++ b/mod/objects/client/describe.go @@ -10,6 +10,8 @@ import ( _ "github.com/cryptopunkscc/astrald/mod/all/pub" ) +// Describe streams descriptors on the returned channel until EOS, then closes it. +// The error pointer is only valid once the channel is closed. func (client *Client) Describe(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *objects.Descriptor, *error) { ch, err := client.queryCh(ctx, objects.MethodDescribe, query.Args{ "id": objectID.String(), diff --git a/mod/objects/client/find.go b/mod/objects/client/find.go index 3a94379cf..5d7034e7d 100644 --- a/mod/objects/client/find.go +++ b/mod/objects/client/find.go @@ -8,6 +8,8 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Find streams identities holding the object until EOS, then closes the channel. +// Zero identities are skipped. The error pointer is only valid once the channel is closed. func (client *Client) Find(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *astral.Identity, *error) { ch, err := client.queryCh(ctx, objects.MethodFind, query.Args{ "id": objectID, diff --git a/mod/objects/client/purge.go b/mod/objects/client/purge.go index d1a636fc3..c8a5956c5 100644 --- a/mod/objects/client/purge.go +++ b/mod/objects/client/purge.go @@ -8,6 +8,8 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Purge streams the IDs of purged objects until EOS, then closes the channel. +// The error pointer is only valid once the channel is closed. func (client *Client) Purge(ctx *astral.Context, repo string) (<-chan *astral.ObjectID, *error) { ch, err := client.queryCh(ctx, objects.MethodPurge, query.Args{ "repo": repo, diff --git a/mod/objects/client/push.go b/mod/objects/client/push.go index 5b19f8fe1..4f0a113bf 100644 --- a/mod/objects/client/push.go +++ b/mod/objects/client/push.go @@ -8,6 +8,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// Push sends the object and fails with "rejected" if the node does not accept it. func (client *Client) Push(ctx *astral.Context, object astral.Object) error { ch, err := client.queryCh(ctx, objects.MethodPush, nil) if err != nil { diff --git a/mod/objects/client/read.go b/mod/objects/client/read.go index 5c7c8c6c1..b062cbf0f 100644 --- a/mod/objects/client/read.go +++ b/mod/objects/client/read.go @@ -8,6 +8,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// Read returns a stream of the object's bytes from offset; caller must Close it. func (client *Client) Read(ctx *astral.Context, objectID *astral.ObjectID, offset, limit int64) (io.ReadCloser, error) { return client.query(ctx, objects.MethodRead, query.Args{ "id": objectID, diff --git a/mod/objects/client/register_describer.go b/mod/objects/client/register_describer.go index 834f276fc..6343aa0d3 100644 --- a/mod/objects/client/register_describer.go +++ b/mod/objects/client/register_describer.go @@ -6,6 +6,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// RegisterDescriber registers the caller as a describer provider and blocks until acked. func (client *Client) RegisterDescriber(ctx *astral.Context) error { ch, err := client.queryCh(ctx, objects.MethodRegisterDescriber, nil) if err != nil { diff --git a/mod/objects/client/register_finder.go b/mod/objects/client/register_finder.go index ef703ede6..40d4b095c 100644 --- a/mod/objects/client/register_finder.go +++ b/mod/objects/client/register_finder.go @@ -6,6 +6,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// RegisterFinder registers the caller as a finder provider and blocks until acked. func (client *Client) RegisterFinder(ctx *astral.Context) error { ch, err := client.queryCh(ctx, objects.MethodRegisterFinder, nil) if err != nil { diff --git a/mod/objects/client/register_searcher.go b/mod/objects/client/register_searcher.go index f764e57a2..46a245ecf 100644 --- a/mod/objects/client/register_searcher.go +++ b/mod/objects/client/register_searcher.go @@ -6,6 +6,7 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// RegisterSearcher registers the caller as a searcher provider and blocks until acked. func (client *Client) RegisterSearcher(ctx *astral.Context) error { ch, err := client.queryCh(ctx, objects.MethodRegisterSearcher, nil) if err != nil { diff --git a/mod/objects/client/scan.go b/mod/objects/client/scan.go index 40e2be15f..c93563f59 100644 --- a/mod/objects/client/scan.go +++ b/mod/objects/client/scan.go @@ -7,6 +7,8 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// Scan streams the repo's object IDs over the returned channel; the error pointer is valid only after it closes. +// With follow, a nil ID separates the initial snapshot from subsequent live updates. func (client *Client) Scan(ctx *astral.Context, repo string, follow bool) (<-chan *astral.ObjectID, *error) { ch, err := client.queryCh(ctx, objects.MethodScan, query.Args{ "repo": repo, diff --git a/mod/objects/client/search.go b/mod/objects/client/search.go index affa9a7c8..21b53e2dc 100644 --- a/mod/objects/client/search.go +++ b/mod/objects/client/search.go @@ -8,6 +8,7 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Search streams results over the returned channel; the error pointer is valid only after it closes. func (client *Client) Search(ctx *astral.Context, q objects.SearchQuery) (<-chan *objects.SearchResult, *error) { ch, err := client.queryCh(ctx, objects.MethodSearch, query.Args{ "q": q, diff --git a/mod/objects/fs/file.go b/mod/objects/fs/file.go index b0b223009..e462295b8 100644 --- a/mod/objects/fs/file.go +++ b/mod/objects/fs/file.go @@ -14,6 +14,7 @@ type File struct { io.ReadCloser } +// Seek delegates to the underlying reader, or returns ErrUnsupported if it is not a Seeker. func (f File) Seek(offset int64, whence int) (int64, error) { if s, ok := f.ReadCloser.(io.Seeker); ok { return s.Seek(offset, whence) diff --git a/mod/objects/fs/fs.go b/mod/objects/fs/fs.go index 94c35033c..7e8efa95b 100644 --- a/mod/objects/fs/fs.go +++ b/mod/objects/fs/fs.go @@ -27,6 +27,8 @@ func (f *FS) Open(name string) (fs.File, error) { return f.OpenContext(astral.NewContext(nil).WithZone(astral.ZoneAll), name) } +// OpenContext opens an object by its ID; name must parse as an object ID, not a path. +// The read is bounded by openTimeout. func (f *FS) OpenContext(ctx *astral.Context, name string) (fs.File, error) { // parse object id objectID, err := astral.ParseID(name) diff --git a/mod/objects/load.go b/mod/objects/load.go index 6b07f60ec..3ed10e52a 100644 --- a/mod/objects/load.go +++ b/mod/objects/load.go @@ -8,6 +8,8 @@ import ( "github.com/cryptopunkscc/astrald/astral" ) +// Load reads an object from repo, decodes it, and type-asserts it to T. +// Returns ErrObjectTooLarge if the object exceeds MaxObjectSize, or an error if the decoded type is not T. func Load[T astral.Object](ctx *astral.Context, repo Repository, objectID *astral.ObjectID) (o T, err error) { if int64(objectID.Size) > MaxObjectSize { return o, ErrObjectTooLarge diff --git a/mod/objects/mem/reader.go b/mod/objects/mem/reader.go index b96a4c838..bb4462388 100644 --- a/mod/objects/mem/reader.go +++ b/mod/objects/mem/reader.go @@ -34,6 +34,7 @@ func (r *Reader) Read(p []byte) (n int, err error) { return r.r.Read(p) } +// Seek is unsupported; this reader is forward-only. func (r *Reader) Seek(offset int64, whence int) (int64, error) { return 0, errors.ErrUnsupported } diff --git a/mod/objects/mem/repository.go b/mod/objects/mem/repository.go index 96c5c40aa..48704a9f4 100644 --- a/mod/objects/mem/repository.go +++ b/mod/objects/mem/repository.go @@ -58,6 +58,7 @@ func (repo *Repository) Contains(ctx *astral.Context, objectID *astral.ObjectID) return slices.Contains(repo.objects.Keys(), objectID.String()), nil } +// Read serves only the device zone; other zones are excluded. func (repo *Repository) Read(ctx *astral.Context, objectID *astral.ObjectID, offset int64, limit int64) (objects.Reader, error) { if !ctx.Zone().Is(astral.ZoneDevice) { return nil, astral.ErrZoneExcluded @@ -76,6 +77,8 @@ func (repo *Repository) Read(ctx *astral.Context, objectID *astral.ObjectID, off return NewReader(bytes[s:e], repo), nil } +// Scan streams existing object IDs, then closes unless follow is set. +// When following, a nil sentinel separates the initial set from live additions. func (repo *Repository) Scan(ctx *astral.Context, follow bool) (<-chan *astral.ObjectID, error) { ch := make(chan *astral.ObjectID) diff --git a/mod/objects/mem/writer.go b/mod/objects/mem/writer.go index d263cadb7..dda32a98b 100644 --- a/mod/objects/mem/writer.go +++ b/mod/objects/mem/writer.go @@ -35,6 +35,8 @@ func (w *Writer) Write(p []byte) (n int, err error) { return n, err } +// Commit resolves the buffered bytes to an object ID and stores them. +// Idempotent: only the first call succeeds; later calls return ErrClosedPipe. func (w *Writer) Commit() (*astral.ObjectID, error) { if !w.closed.CompareAndSwap(false, true) { return nil, objects.ErrClosedPipe diff --git a/mod/objects/module.go b/mod/objects/module.go index ad4ff68d6..30e498ab7 100644 --- a/mod/objects/module.go +++ b/mod/objects/module.go @@ -110,6 +110,7 @@ type Receiver interface { ReceiveObject(Drop) error } +// Drop is a received object pending a decision; Accept resolves it, persisting when save is true. type Drop interface { SenderID() *astral.Identity Object() astral.Object @@ -124,6 +125,8 @@ type Holder interface { HoldObject(*astral.ObjectID) bool } +// IsOffsetLimitValid reports whether the offset/limit window fits within the object. +// A limit of 0 is treated as a valid zero-length window, not "read to end". func IsOffsetLimitValid(objectID *astral.ObjectID, offset int64, limit int64) bool { // offset has to be non-negative and cannot be larger than the object if offset < 0 || offset > int64(objectID.Size) { diff --git a/mod/objects/nil_repository.go b/mod/objects/nil_repository.go index d54978a30..462e2b0da 100644 --- a/mod/objects/nil_repository.go +++ b/mod/objects/nil_repository.go @@ -6,6 +6,7 @@ import ( "io" ) +// NilRepository is a Repository stub whose every operation returns ErrUnsupported. type NilRepository struct { } diff --git a/mod/objects/read_seeker.go b/mod/objects/read_seeker.go index a0f8c67f8..dee552211 100644 --- a/mod/objects/read_seeker.go +++ b/mod/objects/read_seeker.go @@ -7,6 +7,8 @@ import ( "sync" ) +// ReadSeeker adapts a repository's offset-based Read into an io.ReadSeeker. +// note: Seek reopens the underlying reader at the new position rather than seeking in place. type ReadSeeker struct { readerID *astral.Identity objectID *astral.ObjectID @@ -43,6 +45,7 @@ func (r *ReadSeeker) Read(p []byte) (n int, err error) { return n, err } +// Seek reopens the underlying reader at the resolved position; it does not validate against object bounds. func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) { r.mu.Lock() defer r.mu.Unlock() diff --git a/mod/objects/search.go b/mod/objects/search.go index cd2fe1e54..3070c9ebd 100644 --- a/mod/objects/search.go +++ b/mod/objects/search.go @@ -9,6 +9,7 @@ type Searcher interface { SearchObject(ctx *astral.Context, query SearchQuery) (<-chan *SearchResult, error) } +// SearchPreprocessor is a hook that mutates a Search in place before it runs. type SearchPreprocessor interface { PreprocessSearch(*Search) } diff --git a/mod/objects/source_identifier.go b/mod/objects/source_identifier.go index 436794aa3..c60d6686a 100644 --- a/mod/objects/source_identifier.go +++ b/mod/objects/source_identifier.go @@ -8,6 +8,9 @@ type SourceIdentifier interface { SourceIdentity() *astral.Identity } +// SourceIdentity extracts the source identity from v if it implements SourceIdentifier. +// The bool reports whether v implements the interface; err is non-nil only when it does +// but yields a nil or invalid identity. func SourceIdentity(v any) (*astral.Identity, bool, error) { source, ok := v.(SourceIdentifier) if !ok { diff --git a/mod/objects/src/deps.go b/mod/objects/src/deps.go index 8ad3e6d23..002f2f29f 100644 --- a/mod/objects/src/deps.go +++ b/mod/objects/src/deps.go @@ -6,6 +6,9 @@ import ( "github.com/cryptopunkscc/astrald/mod/objects" ) +// LoadDependencies injects core deps, then scans every other loaded module and +// auto-registers it under whichever objects extension interfaces it implements +// (Describer, Searcher, Finder, etc). func (mod *Module) LoadDependencies(*astral.Context) (err error) { err = core.Inject(mod.node, &mod.Deps) if err != nil { diff --git a/mod/objects/src/describe.go b/mod/objects/src/describe.go index 22d55d300..2adcc23dd 100644 --- a/mod/objects/src/describe.go +++ b/mod/objects/src/describe.go @@ -8,6 +8,9 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Describe fans the query out to all registered describers in parallel and +// merges their descriptors onto one channel, closed when every describer is +// done or ctx is cancelled. func (mod *Module) Describe(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *objects.Descriptor, error) { var results = make(chan *objects.Descriptor) @@ -50,6 +53,8 @@ func (mod *Module) Describe(ctx *astral.Context, objectID *astral.ObjectID) (<-c return results, nil } +// AddDescriber registers a describer, skipping it if one with the same source +// identity is already registered. func (mod *Module) AddDescriber(describer objects.Describer) error { source, ok, err := objects.SourceIdentity(describer) if err != nil { diff --git a/mod/objects/src/drop.go b/mod/objects/src/drop.go index 09d4663dc..80fc7108c 100644 --- a/mod/objects/src/drop.go +++ b/mod/objects/src/drop.go @@ -26,6 +26,8 @@ func (drop *Drop) Object() astral.Object { return drop.object } +// Accept marks the drop accepted and, if save is true, persists the object to +// the target repo. Persistence is idempotent: a second save call is a no-op. func (drop *Drop) Accept(save bool) error { drop.accepted.Set(true) if !save { diff --git a/mod/objects/src/external_describer.go b/mod/objects/src/external_describer.go index 5e3f69fb6..70db9fe23 100644 --- a/mod/objects/src/external_describer.go +++ b/mod/objects/src/external_describer.go @@ -32,6 +32,9 @@ func NewExternalDescriber(mod *Module, id *astral.Identity) *ExternalDescriber { func (d *ExternalDescriber) SourceIdentity() *astral.Identity { return d.id } +// DescribeObject queries the remote peer and relays its descriptors, stamping +// each with the peer's identity. The stream runs under a per-call timeout and +// closes when it ends, errors, or the timeout fires. func (d *ExternalDescriber) DescribeObject(ctx *astral.Context, id *astral.ObjectID) (<-chan *objects.Descriptor, error) { ctx, cancel := ctx.WithTimeout(d.timeout) diff --git a/mod/objects/src/external_finder.go b/mod/objects/src/external_finder.go index a675b97b5..cf3633be4 100644 --- a/mod/objects/src/external_finder.go +++ b/mod/objects/src/external_finder.go @@ -31,6 +31,9 @@ func NewExternalFinder(mod *Module, id *astral.Identity) *ExternalFinder { func (f *ExternalFinder) SourceIdentity() *astral.Identity { return f.id } +// FindObject queries the remote peer and relays the provider identities it +// returns. The stream runs under a per-call timeout and closes when it ends, +// errors, or the timeout fires. func (f *ExternalFinder) FindObject(ctx *astral.Context, id *astral.ObjectID) (<-chan *astral.Identity, error) { providerCtx, cancel := ctx.WithTimeout(f.timeout) diff --git a/mod/objects/src/external_searcher.go b/mod/objects/src/external_searcher.go index df6a500b6..5d76a04ce 100644 --- a/mod/objects/src/external_searcher.go +++ b/mod/objects/src/external_searcher.go @@ -32,6 +32,9 @@ func NewExternalSearcher(mod *Module, id *astral.Identity) *ExternalSearcher { func (s *ExternalSearcher) SourceIdentity() *astral.Identity { return s.id } +// SearchObject runs the query against the remote peer and relays its results, +// stamping each with the peer's identity. The stream runs under a per-call +// timeout and closes when it ends, errors, or the timeout fires. func (s *ExternalSearcher) SearchObject(ctx *astral.Context, q objects.SearchQuery) (<-chan *objects.SearchResult, error) { providerCtx, cancel := ctx.WithTimeout(s.timeout) in, errPtr := s.client.Search(providerCtx, q) diff --git a/mod/objects/src/find.go b/mod/objects/src/find.go index abd73f52a..591f49115 100644 --- a/mod/objects/src/find.go +++ b/mod/objects/src/find.go @@ -8,6 +8,9 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Find fans the query out to all registered finders in parallel and merges the +// provider identities they return onto one channel, closed once every finder is +// done or ctx is cancelled. func (mod *Module) Find(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *astral.Identity, error) { finders := mod.finders.Clone() results := make(chan *astral.Identity) @@ -50,6 +53,8 @@ func (mod *Module) Find(ctx *astral.Context, objectID *astral.ObjectID) (<-chan return results, nil } +// AddFinder registers a finder, skipping it if one with the same source +// identity is already registered. func (mod *Module) AddFinder(finder objects.Finder) error { source, ok, err := objects.SourceIdentity(finder) if err != nil { diff --git a/mod/objects/src/module.go b/mod/objects/src/module.go index 7f4336a02..e6b01e8c8 100644 --- a/mod/objects/src/module.go +++ b/mod/objects/src/module.go @@ -54,6 +54,8 @@ type Module struct { objectsReadsJournal *objectsReadsJournal } +// Run blocks until ctx is cancelled, then flushes any pending object reads +// before returning. func (mod *Module) Run(ctx *astral.Context) error { mod.ctx = ctx @@ -67,6 +69,9 @@ func (mod *Module) Run(ctx *astral.Context) error { return nil } +// Load reads and decodes an object from repo. Data that isn't a valid astral +// object is returned as an *astral.Blob rather than an error. Marks the read in +// the reads journal and tracks the type for decoded objects. func (mod *Module) Load(ctx *astral.Context, repo objects.Repository, objectID *astral.ObjectID) (astral.Object, error) { // read the object data r, err := repo.Read(ctx, objectID, 0, 0) @@ -154,6 +159,9 @@ func (mod *Module) GetType(ctx *astral.Context, objectID *astral.ObjectID) (obje return t.String(), nil } +// Probe reads only the object's header and reports its type, MIME, source repo, +// and read latency without loading the full payload. Tracks the type when the +// object carries a valid astral stamp. func (mod *Module) Probe(ctx *astral.Context, repo objects.Repository, objectID *astral.ObjectID) (probe *objects.Probe, err error) { probe = &objects.Probe{} diff --git a/mod/objects/src/network_reader.go b/mod/objects/src/network_reader.go index 99a9ed72a..71643b0b6 100644 --- a/mod/objects/src/network_reader.go +++ b/mod/objects/src/network_reader.go @@ -9,6 +9,8 @@ import ( "github.com/cryptopunkscc/astrald/lib/query" ) +// NetworkReader streams an object from a remote provider. Seeking reopens the +// underlying connection at a new offset rather than seeking in place. type NetworkReader struct { mod *Module objectID *astral.ObjectID @@ -25,6 +27,8 @@ func (r *NetworkReader) Read(p []byte) (n int, err error) { return n, err } +// Seek closes the current network read and reopens it at the resolved offset by +// issuing a fresh read query to the provider. Returns 0, not the new position. func (r *NetworkReader) Seek(offset int64, whence int) (int64, error) { r.ReadCloser.Close() diff --git a/mod/objects/src/op_contains.go b/mod/objects/src/op_contains.go index 7ce29a7da..83b942d68 100644 --- a/mod/objects/src/op_contains.go +++ b/mod/objects/src/op_contains.go @@ -13,6 +13,8 @@ type opContainsArgs struct { Out string `query:"optional"` } +// OpContains reports whether a repository holds an object. With the ID arg it +// answers once; otherwise it streams a Bool per ObjectID read from the channel. func (mod *Module) OpContains(ctx *astral.Context, q *routing.IncomingQuery, args opContainsArgs) (err error) { ctx = ctx.WithIdentity(q.Caller()) diff --git a/mod/objects/src/op_delete.go b/mod/objects/src/op_delete.go index d2d85c0fc..3a6f2c8e5 100644 --- a/mod/objects/src/op_delete.go +++ b/mod/objects/src/op_delete.go @@ -13,6 +13,9 @@ type opDeleteArgs struct { Zone *astral.Zone `query:"optional"` } +// OpDelete deletes one object (ID arg) or a stream of objects read from the +// channel. Requires an explicit repository — there is no default, to avoid +// accidental deletion. func (mod *Module) OpDelete(ctx *astral.Context, q *routing.IncomingQuery, args opDeleteArgs) (err error) { // prepare the context ctx = ctx.WithIdentity(q.Caller()) diff --git a/mod/objects/src/op_describe.go b/mod/objects/src/op_describe.go index d9ae2ecc6..19d1e95af 100644 --- a/mod/objects/src/op_describe.go +++ b/mod/objects/src/op_describe.go @@ -19,6 +19,8 @@ type opDescribeArgs struct { Except *string `query:"optional"` } +// OpDescribe streams an object's descriptors, filtered by the Only/Except type +// lists, and terminates the stream with an EOS marker. func (mod *Module) OpDescribe(ctx *astral.Context, q *routing.IncomingQuery, args opDescribeArgs) (err error) { ctx, cancel := ctx.WithIdentity(q.Caller()).IncludeZone(args.Zone).WithTimeout(time.Minute) defer cancel() diff --git a/mod/objects/src/op_echo.go b/mod/objects/src/op_echo.go index 4d1a8ce26..cefb6ca58 100644 --- a/mod/objects/src/op_echo.go +++ b/mod/objects/src/op_echo.go @@ -20,6 +20,9 @@ type opEchoArgs struct { Out string `query:"optional"` } +// OpEcho relays received objects back, optionally filtered by Only/Except and +// stopped on the Stop type. Strict mode fails on objects whose blueprint isn't +// registered; lenient mode passes them through unparsed. func (mod *Module) OpEcho(ctx *astral.Context, q *routing.IncomingQuery, args opEchoArgs) (err error) { // prepare lists var only, except []string diff --git a/mod/objects/src/op_probe.go b/mod/objects/src/op_probe.go index 020d1cd65..ea5361874 100644 --- a/mod/objects/src/op_probe.go +++ b/mod/objects/src/op_probe.go @@ -13,6 +13,8 @@ type opProbeArgs struct { Out string `query:"optional"` } +// OpProbe probes a single object when args.ID is set, otherwise streams probes +// for ObjectIDs received over the channel until EOS. func (mod *Module) OpProbe(ctx *astral.Context, q *routing.IncomingQuery, args opProbeArgs) (err error) { ch := channel.New(q.AcceptRaw(), channel.WithFormats(args.In, args.Out)) defer ch.Close() diff --git a/mod/objects/src/op_purge.go b/mod/objects/src/op_purge.go index cbac2a15f..9146ddd00 100644 --- a/mod/objects/src/op_purge.go +++ b/mod/objects/src/op_purge.go @@ -12,6 +12,8 @@ type opPurgeArgs struct { Zone *astral.Zone `query:"optional"` } +// OpPurge deletes unheld objects from a repository, streaming each purged ObjectID +// then a final error or EOS. Defaults to ZoneAll when no zone is given. func (mod *Module) OpPurge(ctx *astral.Context, q *routing.IncomingQuery, args opPurgeArgs) error { ctx = ctx.WithIdentity(q.Caller()) if args.Zone == nil { diff --git a/mod/objects/src/op_push.go b/mod/objects/src/op_push.go index 6a81abeff..6a9699f68 100644 --- a/mod/objects/src/op_push.go +++ b/mod/objects/src/op_push.go @@ -13,6 +13,8 @@ type opPushArgs struct { Out string `query:"optional"` } +// OpPush receives pushed objects from the caller and replies with a Bool +// per object indicating whether it was accepted. func (mod *Module) OpPush(ctx *astral.Context, q *routing.IncomingQuery, args opPushArgs) (err error) { ch := channel.New(q.AcceptRaw(), channel.WithFormats(args.In, args.Out)) defer ch.Close() diff --git a/mod/objects/src/op_read.go b/mod/objects/src/op_read.go index 0f6d53a5c..1b47d073d 100644 --- a/mod/objects/src/op_read.go +++ b/mod/objects/src/op_read.go @@ -17,6 +17,8 @@ type opReadArgs struct { Repo string `query:"optional"` } +// OpRead authorizes the caller, then streams raw object bytes over the accepted +// connection. Records the access in the reads journal, which feeds purge ordering. func (mod *Module) OpRead(ctx *astral.Context, q *routing.IncomingQuery, args opReadArgs) (err error) { ctx = ctx.IncludeZone(args.Zone) repo := mod.ReadDefault() diff --git a/mod/objects/src/op_register_describer.go b/mod/objects/src/op_register_describer.go index 1702d4774..e543128f9 100644 --- a/mod/objects/src/op_register_describer.go +++ b/mod/objects/src/op_register_describer.go @@ -12,6 +12,8 @@ type opRegisterDescriberArgs struct { Out string `query:"optional"` } +// OpRegisterDescriber registers the caller as an external describer. +// Rejects network-origin callers and self-registration by the node. func (mod *Module) OpRegisterDescriber(ctx *astral.Context, q *routing.IncomingQuery, args opRegisterDescriberArgs) error { // Keep this local for now; extract shared external registration validation once the API settles. if q.Origin() == astral.OriginNetwork { diff --git a/mod/objects/src/op_register_finder.go b/mod/objects/src/op_register_finder.go index b49100e03..8e0182832 100644 --- a/mod/objects/src/op_register_finder.go +++ b/mod/objects/src/op_register_finder.go @@ -12,6 +12,8 @@ type opRegisterFinderArgs struct { Out string `query:"optional"` } +// OpRegisterFinder registers the caller as an external finder. +// Rejects network-origin callers and self-registration by the node. func (mod *Module) OpRegisterFinder(ctx *astral.Context, q *routing.IncomingQuery, args opRegisterFinderArgs) error { // Keep this local for now; extract shared external registration validation once the API settles. if q.Origin() == astral.OriginNetwork { diff --git a/mod/objects/src/op_register_searcher.go b/mod/objects/src/op_register_searcher.go index 87b2de069..1dbb1f749 100644 --- a/mod/objects/src/op_register_searcher.go +++ b/mod/objects/src/op_register_searcher.go @@ -12,6 +12,8 @@ type opRegisterSearcherArgs struct { Out string `query:"optional"` } +// OpRegisterSearcher registers the caller as an external searcher. +// Rejects network-origin callers and self-registration by the node. func (mod *Module) OpRegisterSearcher(ctx *astral.Context, q *routing.IncomingQuery, args opRegisterSearcherArgs) error { // Keep this local for now; extract shared external registration validation once the API settles. if q.Origin() == astral.OriginNetwork { diff --git a/mod/objects/src/op_search.go b/mod/objects/src/op_search.go index 48c4b7f92..cc41ad2b7 100644 --- a/mod/objects/src/op_search.go +++ b/mod/objects/src/op_search.go @@ -18,6 +18,8 @@ type SearchArgs struct { Out string `query:"optional"` } +// OpSearch streams matches for the query, deduplicated by ObjectID and +// optionally filtered to objects the named repo contains. Bounded to one minute. func (mod *Module) OpSearch(ctx *astral.Context, q *routing.IncomingQuery, args SearchArgs) (err error) { ctx, cancel := ctx.WithIdentity(q.Caller()).IncludeZone(args.Zone).WithTimeout(time.Minute) defer cancel() diff --git a/mod/objects/src/receive.go b/mod/objects/src/receive.go index 3344f3f2c..ae998b8ba 100644 --- a/mod/objects/src/receive.go +++ b/mod/objects/src/receive.go @@ -15,6 +15,7 @@ func (mod *Module) AddReceiver(receiver objects.Receiver) error { return mod.receivers.Add(receiver) } +// Receive dispatches obj to all registered receivers; returns an error if no receiver accepts it. func (mod *Module) Receive(obj astral.Object, source *astral.Identity) (err error) { if source.IsZero() { source = mod.node.Identity() diff --git a/mod/objects/src/repo_group.go b/mod/objects/src/repo_group.go index 0b23d3812..5d87c6d66 100644 --- a/mod/objects/src/repo_group.go +++ b/mod/objects/src/repo_group.go @@ -10,6 +10,7 @@ import ( ) type RepoGroup struct { + // Concurrent makes Read race all members and return the first hit; otherwise members are tried in order. Concurrent bool mod *Module label string @@ -65,6 +66,8 @@ func (group *RepoGroup) Contains(ctx *astral.Context, objectID *astral.ObjectID) return false, nil } +// Scan merges the object streams of all members. +// With follow, a nil sentinel is emitted once every member has drained its backlog, then live updates continue. func (group *RepoGroup) Scan(ctx *astral.Context, follow bool) (<-chan *astral.ObjectID, error) { ch := make(chan *astral.ObjectID) diff --git a/mod/objects/src/repositories.go b/mod/objects/src/repositories.go index e3a4bee11..000e48e1f 100644 --- a/mod/objects/src/repositories.go +++ b/mod/objects/src/repositories.go @@ -21,6 +21,7 @@ func (mod *Module) GetRepository(name string) (repo objects.Repository) { return } +// RemoveRepository removes the named repository, drops it from every group, and fires its AfterRemoved callback if implemented. func (mod *Module) RemoveRepository(name string) error { if len(name) == 0 { return errors.New("name is empty") diff --git a/mod/objects/src/search.go b/mod/objects/src/search.go index ba7da38f7..7df4195a1 100644 --- a/mod/objects/src/search.go +++ b/mod/objects/src/search.go @@ -10,6 +10,8 @@ import ( "github.com/cryptopunkscc/astrald/sig" ) +// Search runs all local searchers, and network searchers when the context zone permits, merging their results into one channel. +// The channel closes once every searcher finishes; the returned error reports only setup failures. func (mod *Module) Search(ctx *astral.Context, query objects.SearchQuery) (<-chan *objects.SearchResult, error) { search := &objects.Search{ CallerID: ctx.Identity(), @@ -105,6 +107,7 @@ func (mod *Module) Search(ctx *astral.Context, query objects.SearchQuery) (<-cha return results, nil } +// AddSearcher registers a searcher, deduplicating by source identity so each source is added at most once. func (mod *Module) AddSearcher(searcher objects.Searcher) error { source, ok, err := objects.SourceIdentity(searcher) if err != nil {