Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mod/nodes/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ func New(targetID *astral.Identity, client *astrald.Client) *Client {
return &Client{astral: client, targetID: targetID}
}

// Default returns the shared package-level client, lazily creating it on first use.
func Default() *Client {
if defaultClient == nil {
defaultClient = New(nil, astrald.Default())
}
return defaultClient
}

// WithTarget returns a copy retargeted at the given identity; the receiver is unchanged.
func (client *Client) WithTarget(target *astral.Identity) *Client {
return &Client{astral: client.astral, targetID: target}
}
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/frames/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"github.com/cryptopunkscc/astrald/astral"
)

// Frame is a message exchanged over a node link.
type Frame interface {
astral.Object
fmt.Stringer
}

// FrameTypes maps frame opcodes to object types by slice index.
// note: order is the wire opcode; never reorder or remove entries.
var FrameTypes = []string{
"nodes.frames.ping",
"nodes.frames.query",
Expand Down
5 changes: 5 additions & 0 deletions mod/nodes/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
MaxDataFrameSize = 8192
)

// Module manages encrypted links between nodes: establishing them, resolving
// peer endpoints, and tracking liveness.
type Module interface {
EstablishInboundLink(ctx context.Context, conn exonet.Conn) error
EstablishOutboundLink(ctx context.Context, remoteID *astral.Identity, conn exonet.Conn) (Link, error)
Expand Down Expand Up @@ -66,10 +68,13 @@ type Link interface {
Done() <-chan struct{}
}

// EndpointResolver resolves the network endpoints at which an identity can be reached.
type EndpointResolver interface {
ResolveEndpoints(*astral.Context, *astral.Identity) (<-chan *EndpointWithTTL, error)
}

// LinkStrategy drives one approach to establishing a link (e.g. direct, NAT, Tor);
// Done closes once the strategy has finished, whether or not it produced a link.
type LinkStrategy interface {
Name() string
Signal(ctx *astral.Context)
Expand Down
1 change: 1 addition & 0 deletions mod/nodes/src/authorizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/cryptopunkscc/astrald/mod/nodes"
)

// AuthorizeRelayFor grants relaying only when the actor relays for its own identity.
func (mod *Module) AuthorizeRelayFor(ctx *astral.Context, a *nodes.RelayForAction) bool {
return a.Actor().IsEqual(a.ForID)
}
4 changes: 4 additions & 0 deletions mod/nodes/src/basic_link_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var _ nodes.LinkStrategy = &BasicLinkStrategy{}

func (s *BasicLinkStrategy) Name() string { return nodes.StrategyBasic }

// Signal starts a dialing round in the background; concurrent calls while a round
// is still active are ignored. The first successful link wins, the rest are closed.
func (s *BasicLinkStrategy) Signal(ctx *astral.Context) {
s.mu.Lock()
if s.activeDone != nil {
Expand Down Expand Up @@ -101,6 +103,8 @@ func (s *BasicLinkStrategy) Signal(ctx *astral.Context) {
}()
}

// Done returns a channel closed when the active round finishes; a closed channel
// is returned immediately when no round is running.
func (s *BasicLinkStrategy) Done() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Deps struct {
Events events.Module
}

// LoadDependencies injects deps and registers the "linked" directory filter and
// the relay-for authorizer.
func (mod *Module) LoadDependencies(*astral.Context) (err error) {
err = core.Inject(mod.node, &mod.Deps)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions mod/nodes/src/endpoint_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
nodescli "github.com/cryptopunkscc/astrald/mod/nodes/client"
)

// ResolveEndpoints fans out to all registered resolvers and merges their results
// into one channel, closed once every resolver finishes.
func (mod *Module) ResolveEndpoints(ctx *astral.Context, nodeID *astral.Identity) (_ <-chan *nodes.EndpointWithTTL, err error) {
var ch = make(chan *nodes.EndpointWithTTL)

Expand Down Expand Up @@ -62,6 +64,8 @@ func (mod *Module) runResolver(ctx *astral.Context, r nodes.EndpointResolver, no
}
}

// UpdateNodeEndpoints asks a remote resolver node for identity's endpoints and
// stores them locally. Per-endpoint store errors are logged, not returned.
func (mod *Module) UpdateNodeEndpoints(ctx *astral.Context, resolver *astral.Identity, identity *astral.Identity) error {
client := nodescli.New(resolver, astrald.Default())

Expand Down
4 changes: 4 additions & 0 deletions mod/nodes/src/input_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func NewInputBuffer(size int, onRead func(int)) *InputBuffer {
return &InputBuffer{size: size, onRead: onRead, shut: make(chan struct{}), done: make(chan struct{})}
}

// Push appends a whole chunk. All-or-nothing: rejects with ErrBufferOverflow rather than storing a partial chunk.
func (b *InputBuffer) Push(p []byte) error {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -49,6 +50,7 @@ func (b *InputBuffer) Push(p []byte) error {
return nil
}

// Read never blocks: on an empty open buffer it returns *ErrBufferEmpty carrying a wake channel; on an empty closed buffer it returns io.EOF.
func (b *InputBuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -101,10 +103,12 @@ func (b *InputBuffer) IsEmpty() bool {
return b.used == 0
}

// Closed is closed when Close is called, before remaining data is drained.
func (b *InputBuffer) Closed() <-chan struct{} {
return b.shut
}

// Done is closed once the buffer is both closed and fully drained.
func (b *InputBuffer) Done() <-chan struct{} {
return b.done
}
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Ping struct {
pong chan struct{}
}

// Link is a live mux channel to a peer with its own ping/keepalive loop and pressure tracking; closes once on first error.
type Link struct {
*channel.Channel
mux *Mux
Expand Down Expand Up @@ -51,6 +52,7 @@ func (s *Link) RemoteIdentity() *astral.Identity {
return s.conn.RemoteIdentity()
}

// CloseWithError records err as the link's cause of death; only the first call's error is kept.
func (s *Link) CloseWithError(err error) error {
if err == nil {
err = errors.New("link closed")
Expand Down Expand Up @@ -142,6 +144,7 @@ func (s *Link) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.Wri
return s.mux.RouteQuery(ctx, q, w)
}

// Ping sends one ping and blocks for the pong. Only one ping may be in flight; concurrent calls error.
func (s *Link) Ping() (time.Duration, error) {
p := &Ping{
nonce: astral.NewNonce(),
Expand Down
4 changes: 4 additions & 0 deletions mod/nodes/src/link_negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type muxLinkNegotiator struct {
ch *channel.Channel
}

// NegotiateOutbound reads the peer's feature list, selects mux2, and waits for the link nonce; fails if mux2 is absent or rejected.
func (n *muxLinkNegotiator) NegotiateOutbound() (*Link, error) {
var count *astral.Uint16
if err := n.ch.Switch(channel.Expect(&count), channel.PassErrors); err != nil {
Expand Down Expand Up @@ -71,6 +72,7 @@ func (n *muxLinkNegotiator) NegotiateOutbound() (*Link, error) {
return link, nil
}

// NegotiateInbound offers mux2, confirms the peer selected it, and assigns the link nonce.
func (n *muxLinkNegotiator) NegotiateInbound() (*Link, error) {
if err := n.ch.Send(astral.NewUint16(1)); err != nil {
return nil, fmt.Errorf("send feature count: %w", err)
Expand Down Expand Up @@ -108,6 +110,7 @@ func (n *muxLinkNegotiator) NegotiateInbound() (*Link, error) {
return link, nil
}

// EstablishOutboundLink runs the noise handshake and mux negotiation over conn, then registers the link; closes conn on any error.
func (mod *Module) EstablishOutboundLink(ctx context.Context, remoteID *astral.Identity, conn exonet.Conn) (_ nodes.Link, err error) {
defer func() {
if err != nil {
Expand Down Expand Up @@ -142,6 +145,7 @@ func (mod *Module) EstablishOutboundLink(ctx context.Context, remoteID *astral.I
return link, nil
}

// EstablishInboundLink runs the inbound noise handshake and mux negotiation over conn, then registers the link; closes conn on any error.
func (mod *Module) EstablishInboundLink(ctx context.Context, conn exonet.Conn) (err error) {
defer func() {
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/link_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewLinkPool(mod *Module) *LinkPool {
}
}

// AddLink registers link, wires its mux router, emits a created event, and spawns a goroutine that removes it and closes its sessions when it dies.
func (pool *LinkPool) AddLink(link *Link) error {
dir := "in"
netName := "unknown network"
Expand Down Expand Up @@ -158,6 +159,7 @@ func (pool *LinkPool) getOrCreateNodeLinker(target *astral.Identity) *NodeLinker
return linker
}

// RetrieveLink returns a matching existing link immediately unless ForceNew is set; otherwise it activates linking strategies and delivers the first link that matches, ctx cancellation, or ErrLinkNotProduced.
func (pool *LinkPool) RetrieveLink(
ctx *astral.Context,
target *astral.Identity,
Expand Down
1 change: 1 addition & 0 deletions mod/nodes/src/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Module struct {
privateKey *crypto.PrivateKey
}

// Run schedules the endpoint-cleanup task and blocks until ctx is cancelled.
func (mod *Module) Run(ctx *astral.Context) error {
mod.ctx = ctx.IncludeZone(astral.ZoneNetwork)
<-mod.Deps.Scheduler.Ready()
Expand Down
4 changes: 4 additions & 0 deletions mod/nodes/src/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cryptopunkscc/astrald/sig"
)

// Mux multiplexes many query sessions over one link channel, dispatching inbound frames and tracking per-session state.
type Mux struct {
mod *Module
ch *channel.Channel
Expand Down Expand Up @@ -66,6 +67,7 @@ func (m *Mux) SendMigrateFrame(nonce astral.Nonce) error {
return m.ch.Send(&frames.Migrate{Nonce: nonce})
}

// SetRouter installs the router once; subsequent calls are ignored. Unblocks waitRouter.
func (m *Mux) SetRouter(r astral.Router) {
if !m.routerOnce.CompareAndSwap(false, true) {
return
Expand All @@ -74,6 +76,7 @@ func (m *Mux) SetRouter(r astral.Router) {
close(m.routerSet)
}

// RouteQuery opens a session and sends a query (or relay query) frame, then blocks for the peer's routing result or ctx cancellation; on accept it pumps the session into w.
func (m *Mux) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.WriteCloser) (_ io.WriteCloser, err error) {
sourceID := m.RemoteIdentity()
if q.Caller.IsEqual(ctx.Identity()) {
Expand Down Expand Up @@ -124,6 +127,7 @@ func (m *Mux) RouteQuery(ctx *astral.Context, q *astral.InFlightQuery, w io.Writ
}
}

// Handle dispatches one inbound frame; query frames are handled in their own goroutine, the rest inline on the read loop.
func (m *Mux) Handle(obj astral.Object) error {
frame, ok := obj.(frames.Frame)
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/mux_session_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *muxSessionReader) Close() error {
return nil
}

// Push appends to the current buffer, falling through to the queued next buffer if the current one is already closed (mid-migration).
func (r *muxSessionReader) Push(p []byte) error {
r.cond.L.Lock()
buf := r.buf
Expand Down Expand Up @@ -83,6 +84,7 @@ func (r *muxSessionReader) Resume() {
r.cond.Broadcast()
}

// Advance closes the current buffer and, once it has drained, promotes the queued next buffer; no-op if none is queued.
func (r *muxSessionReader) Advance() {
r.cond.L.Lock()
defer r.cond.L.Unlock()
Expand All @@ -97,6 +99,7 @@ func (r *muxSessionReader) Advance() {
}
}

// Read blocks while paused or empty, transparently switching to the next buffer at EOF, so migration is invisible to the caller.
func (r *muxSessionReader) Read(p []byte) (n int, err error) {
for {
r.cond.L.Lock()
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/mux_session_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (w *muxSessionWriter) Buf() *OutputBuffer {
return w.buf
}

// SwapBuf installs a new buffer and reset func for the new link, closing the old buffer.
func (w *muxSessionWriter) SwapBuf(buf *OutputBuffer, reset func()) {
w.cond.L.Lock()
old := w.buf
Expand All @@ -41,6 +42,7 @@ func (w *muxSessionWriter) SwapBuf(buf *OutputBuffer, reset func()) {
}
}

// Close sends a Reset frame to the peer and closes the buffer; idempotent. Use PeerClose to skip the Reset.
func (w *muxSessionWriter) Close() error {
w.cond.L.Lock()
if w.closed {
Expand Down Expand Up @@ -94,6 +96,7 @@ func (w *muxSessionWriter) Resume() {
w.cond.Broadcast()
}

// Write blocks while paused and retries when the buffer runs out of flow-control credit, until all of p is written or the writer closes.
func (w *muxSessionWriter) Write(p []byte) (int, error) {
total := 0

Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/nat_link_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ nodes.LinkStrategy = &NATLinkStrategy{}

func (s *NATLinkStrategy) Name() string { return nodes.StrategyNAT }

// Signal starts a NAT-traversal attempt in the background; a no-op if one is already running.
func (s *NATLinkStrategy) Signal(ctx *astral.Context) {
s.mu.Lock()
if s.done != nil {
Expand Down Expand Up @@ -183,6 +184,7 @@ func (s *NATLinkStrategy) signalDone() {
}
}

// Done returns a channel closed when the current attempt finishes; already closed when no attempt is running.
func (s *NATLinkStrategy) Done() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions mod/nodes/src/node_linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewNodeLinker(mod *Module, target *astral.Identity) *NodeLinker {
return linker
}

// Activate signals the named strategies and returns a channel closed once all of them finish; an already-closed channel if none match.
func (linker *NodeLinker) Activate(ctx *astral.Context, strategies []string) <-chan struct{} {
var doneChannels []<-chan struct{}
for _, strategy := range strategies {
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/object_describer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/cryptopunkscc/astrald/sig"
)

// DescribeObject fans out to every provider returned by FindObject and merges their
// descriptors into the result channel. Filtered providers are skipped. Requires the
// network zone. The channel closes once all providers are exhausted.
func (mod *Module) DescribeObject(ctx *astral.Context, objectID *astral.ObjectID) (<-chan *objects.Descriptor, error) {
if !ctx.Zone().Is(astral.ZoneNetwork) {
return nil, astral.ErrZoneExcluded
Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/object_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/cryptopunkscc/astrald/astral"
)

// FindObject returns providers for an object from the local search cache only; it
// performs no live network lookup. The channel is already closed on return.
func (mod *Module) FindObject(ctx *astral.Context, id *astral.ObjectID) (<-chan *astral.Identity, error) {
out := make(chan *astral.Identity, 1)
defer close(out)
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/object_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/cryptopunkscc/astrald/mod/utp"
)

// ReceiveObject dispatches an inbound object by type, accepting observed-endpoint
// messages and reacting to link events with connectivity upgrades and endpoint
// refreshes. Unhandled types are ignored without accepting the drop.
func (mod *Module) ReceiveObject(drop objects.Drop) error {
switch object := drop.Object().(type) {
case *nodes.ObservedEndpointMessage:
Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/op_add_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type opAddEndpointArgs struct {
Out string `query:"optional"`
}

// OpAddEndpoint parses "network:address" and registers it for the identity with a
// fixed ~90-day TTL.
func (mod *Module) OpAddEndpoint(_ *astral.Context, q *routing.IncomingQuery, args opAddEndpointArgs) (err error) {
chunks := strings.SplitN(args.Endpoint, ":", 2)
if len(chunks) != 2 {
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/op_migrate_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type opMigrateSessionArgs struct {
Out string `query:"optional"`
}

// OpMigrateSession moves an open session onto another link. With Start set it drives
// the migration directly; otherwise it acts as the responder, exchanging ready/switched/
// resume/done signals with the initiator. Validates session and link state before starting.
func (mod *Module) OpMigrateSession(ctx *astral.Context, q *routing.IncomingQuery, args opMigrateSessionArgs) error {
ch := channel.New(q.AcceptRaw(), channel.WithOutputFormat(args.Out))
defer ch.Close()
Expand Down
3 changes: 3 additions & 0 deletions mod/nodes/src/op_new_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type opNewLinkArgs struct {
Out string
}

// OpNewLink establishes a link to the target, either to a specific endpoint or via the
// given (or all) strategies. The link is built by a scheduled task; the query is accepted
// before it completes so creation may outlast the query timeout. Failures map to reject codes.
func (mod *Module) OpNewLink(ctx *astral.Context, q *routing.IncomingQuery, args opNewLinkArgs) (err error) {
target, err := mod.Dir.ResolveIdentity(args.Target)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions mod/nodes/src/output_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func NewOutputBuffer(write func([]byte) error) *OutputBuffer {
return &OutputBuffer{write: write}
}

// Write never blocks: with no available space it returns *ErrBufferEmpty whose channel
// closes once Grow adds space. It may consume only part of p, bounded by available space.
func (b *OutputBuffer) Write(p []byte) (int, error) {
b.mu.Lock()
if b.closed {
Expand Down
Loading