Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/services/nodes/managers_distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip — they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
Expand Down
37 changes: 37 additions & 0 deletions core/services/nodes/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/mudler/LocalAI/core/services/advisorylock"
grpcclient "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/xlog"
"github.com/nats-io/nats.go"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -206,12 +208,47 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
}
continue
}

// ErrNoResponders means the node has no active NATS subscription for
// this subject. Either its connection dropped, or it's the wrong
// node type entirely. Mark unhealthy so the health monitor's
// heartbeat-only pass doesn't immediately flip it back — and so
// ListDuePendingBackendOps (which filters by status=healthy) stops
// picking the row until the node genuinely recovers.
if errors.Is(applyErr, nats.ErrNoResponders) {
xlog.Warn("Reconciler: no NATS responders — marking node unhealthy",
"op", op.Op, "backend", op.Backend, "node", op.NodeID)
_ = rc.registry.MarkUnhealthy(ctx, op.NodeID)
}

// Dead-letter cap: after maxAttempts the row is the reconciler
// equivalent of a poison message. Delete it loudly so the queue
// doesn't churn NATS every tick forever — operators can re-issue
// the op from the UI if they still want it applied.
if op.Attempts+1 >= maxPendingBackendOpAttempts {
xlog.Error("Reconciler: abandoning pending backend op after max attempts",
"op", op.Op, "backend", op.Backend, "node", op.NodeID,
"attempts", op.Attempts+1, "last_error", applyErr)
if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil {
xlog.Warn("Reconciler: failed to delete abandoned op row", "id", op.ID, "error", err)
}
continue
}

_ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error())
xlog.Warn("Reconciler: pending backend op retry failed",
"op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr)
}
}

// maxPendingBackendOpAttempts caps how many times the reconciler retries a
// failing row before dead-lettering it (the row is deleted loudly rather
// than retried forever). Ten attempts at exponential backoff (reportedly
// 30s growing to a 15m cap — schedule lives in the registry's retry
// bookkeeping; confirm there if it changes) is >1h of wall-clock patience —
// well past any transient worker restart or network blip. Poisoned rows
// beyond that are almost certainly structural (wrong node type,
// non-existent gallery entry) and no amount of further retrying will help.
const maxPendingBackendOpAttempts = 10

// probeLoadedModels gRPC-health-checks model addresses that the DB says are
// loaded. If a model's backend process is gone (OOM, crash, manual restart)
// we remove the row so ghosts don't linger. Only probes rows older than
Expand Down
26 changes: 26 additions & 0 deletions core/services/nodes/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,30 @@ var _ = Describe("ReplicaReconciler — state reconciliation", func() {
Expect(row.NextRetryAt).To(BeTemporally(">", before))
})
})

// Verifies the one-shot cleanup migration run by NewNodeRegistry: queue rows
// that can never drain — ops targeted at agent-type workers, or rows with an
// empty backend name — are deleted when the registry is re-instantiated,
// while well-formed rows survive untouched.
Describe("NewNodeRegistry malformed-row pruning", func() {
	// NOTE: the description previously claimed "non-existent nodes" were
	// covered, but no row for a missing node is created here; it now names
	// exactly what the spec exercises (agent nodes + empty backend names).
	It("drops queue rows for agent nodes and empty backend names on startup", func() {
		// One agent-type and one backend-type node; only the latter is a
		// legitimate target for backend lifecycle ops.
		agent := &BackendNode{Name: "agent-1", NodeType: NodeTypeAgent, Address: "x"}
		Expect(registry.Register(context.Background(), agent, true)).To(Succeed())
		backend := &BackendNode{Name: "backend-1", NodeType: NodeTypeBackend, Address: "y"}
		Expect(registry.Register(context.Background(), backend, true)).To(Succeed())

		// Three rows: one for a valid backend node (should survive),
		// one for an agent node (pruned), one for an empty backend name
		// on the valid node (pruned).
		Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "foo", OpBackendInstall, nil)).To(Succeed())
		Expect(registry.UpsertPendingBackendOp(context.Background(), agent.ID, "foo", OpBackendInstall, nil)).To(Succeed())
		Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "", OpBackendInstall, nil)).To(Succeed())

		// Re-instantiating the registry runs the cleanup migration.
		_, err := NewNodeRegistry(db)
		Expect(err).ToNot(HaveOccurred())

		// Only the well-formed row (backend-type node, non-empty backend
		// name) should remain after pruning.
		var rows []PendingBackendOp
		Expect(db.Find(&rows).Error).To(Succeed())
		Expect(rows).To(HaveLen(1))
		Expect(rows[0].NodeID).To(Equal(backend.ID))
		Expect(rows[0].Backend).To(Equal("foo"))
	})
})
})
24 changes: 24 additions & 0 deletions core/services/nodes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) {
}); err != nil {
return nil, fmt.Errorf("migrating node tables: %w", err)
}

// One-shot cleanup of queue rows that can never drain: ops targeted at
// agent workers (wrong subscription set), at non-existent nodes, or with
// an empty backend name. The guard in enqueueAndDrainBackendOp prevents
// new ones from being written, but rows persisted by earlier versions
// keep the reconciler busy retrying a permanently-failing NATS request
// every 30s. Guarded by the same migration advisory lock so only one
// frontend runs it.
_ = advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error {
res := db.Exec(`
DELETE FROM pending_backend_ops
WHERE backend = ''
OR node_id NOT IN (SELECT id FROM backend_nodes WHERE node_type = ? OR node_type = '')
`, NodeTypeBackend)
if res.Error != nil {
xlog.Warn("Failed to prune malformed pending_backend_ops rows", "error", res.Error)
return res.Error
}
if res.RowsAffected > 0 {
xlog.Info("Pruned pending_backend_ops rows (wrong node type or empty backend)", "count", res.RowsAffected)
}
return nil
})

return &NodeRegistry{db: db}, nil
}

Expand Down
Loading