Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions core/application/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
DB: authDB,
})

// Create ReplicaReconciler for auto-scaling model replicas
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
// RegistrationToken feed the state-reconciliation passes: pending op
// drain uses the adapter, and model health probes use the token to auth
// against workers' gRPC HealthCheck.
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
Adapter: remoteUnloader,
RegistrationToken: cfg.Distributed.RegistrationToken,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
ProbeStaleAfter: 2 * time.Minute,
})

// Create ModelRouterAdapter to wire into ModelLoader
Expand Down
7 changes: 6 additions & 1 deletion core/application/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) {
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
// instance runs periodic checks (avoids duplicate upgrades across replicas).
if len(options.BackendGalleries) > 0 {
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB())
// Pass a lazy getter for the backend manager so the checker always
// uses the active one — DistributedBackendManager is swapped in above
// and asks workers for their installed backends, which is what
// upgrade detection needs in distributed mode.
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
application.upgradeChecker = uc
go uc.Run(options.Context)
}
Expand Down
53 changes: 39 additions & 14 deletions core/application/upgrade_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/advisorylock"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
Expand All @@ -26,6 +27,12 @@ type UpgradeChecker struct {
galleries []config.Gallery
systemState *system.SystemState
db *gorm.DB // non-nil in distributed mode
// backendManagerFn lazily returns the current backend manager (may be
// swapped from Local to Distributed after startup). Pulled through each
// check so the UpgradeChecker uses whichever is active. In distributed
// mode this ensures CheckUpgrades asks workers instead of the (empty)
// frontend filesystem — fixing the bug where upgrades never surfaced.
backendManagerFn func() galleryop.BackendManager

checkInterval time.Duration
stop chan struct{}
Expand All @@ -40,18 +47,22 @@ type UpgradeChecker struct {
// NewUpgradeChecker creates a new UpgradeChecker service.
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
// (uses advisory locks so only one instance runs periodic checks).
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker {
// backendManagerFn is optional; when set, CheckUpgrades is routed through
// the active backend manager — required in distributed mode so the check
// aggregates from workers rather than the empty frontend filesystem.
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker {
return &UpgradeChecker{
appConfig: appConfig,
modelLoader: ml,
galleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
db: db,
checkInterval: 6 * time.Hour,
stop: make(chan struct{}),
done: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
lastUpgrades: make(map[string]gallery.UpgradeInfo),
appConfig: appConfig,
modelLoader: ml,
galleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
db: db,
backendManagerFn: backendManagerFn,
checkInterval: 6 * time.Hour,
stop: make(chan struct{}),
done: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
lastUpgrades: make(map[string]gallery.UpgradeInfo),
}
}

Expand All @@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
func (uc *UpgradeChecker) Run(ctx context.Context) {
defer close(uc.done)

// Initial delay: don't slow down startup
// Initial delay: don't slow down startup. Short enough that operators
// don't stare at an empty upgrade banner for long; long enough that
// workers have registered and reported their installed backends.
initialDelay := 10 * time.Second
select {
case <-ctx.Done():
return
case <-uc.stop:
return
case <-time.After(30 * time.Second):
case <-time.After(initialDelay):
}

// First check always runs locally (to warm the cache on this instance)
Expand Down Expand Up @@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo
}

func (uc *UpgradeChecker) runCheck(ctx context.Context) {
upgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
var (
upgrades map[string]gallery.UpgradeInfo
err error
)
if uc.backendManagerFn != nil {
if bm := uc.backendManagerFn(); bm != nil {
upgrades, err = bm.CheckUpgrades(ctx)
}
}
if upgrades == nil && err == nil {
upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
}

uc.mu.Lock()
uc.lastCheckTime = time.Now()
Expand Down
3 changes: 3 additions & 0 deletions core/cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() {
if b.Metadata != nil {
info.InstalledAt = b.Metadata.InstalledAt
info.GalleryURL = b.Metadata.GalleryURL
info.Version = b.Metadata.Version
info.URI = b.Metadata.URI
info.Digest = b.Metadata.Digest
}
infos = append(infos, info)
}
Expand Down
17 changes: 17 additions & 0 deletions core/gallery/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,23 @@ type SystemBackend struct {
Metadata *BackendMetadata
UpgradeAvailable bool `json:"upgrade_available,omitempty"`
AvailableVersion string `json:"available_version,omitempty"`
// Nodes holds per-node attribution in distributed mode. Empty in single-node.
// Each entry describes a node that has this backend installed, with the
// version/digest it reports. Lets the UI surface drift and per-node status.
Nodes []NodeBackendRef `json:"nodes,omitempty"`
}

// NodeBackendRef describes one node's view of an installed backend. Used both
// for per-node attribution in the UI and for drift detection during upgrade
// checks (a cluster with mismatched versions/digests is flagged upgradeable).
type NodeBackendRef struct {
	NodeID      string `json:"node_id"`                // identifier of the reporting node
	NodeName    string `json:"node_name"`              // display name of the reporting node
	NodeStatus  string `json:"node_status"`            // healthy | unhealthy | offline | draining | pending
	Version     string `json:"version,omitempty"`      // backend version as reported by this node, if known
	Digest      string `json:"digest,omitempty"`       // image digest as reported by this node, if known
	URI         string `json:"uri,omitempty"`          // source URI the node reports for this backend
	InstalledAt string `json:"installed_at,omitempty"` // install time as reported by the node (string form)
}

type SystemBackends map[string]SystemBackend
Expand Down
132 changes: 108 additions & 24 deletions core/gallery/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,43 @@ type UpgradeInfo struct {
AvailableVersion string `json:"available_version"`
InstalledDigest string `json:"installed_digest,omitempty"`
AvailableDigest string `json:"available_digest,omitempty"`
// NodeDrift lists nodes whose installed version or digest differs from
// the cluster majority. Non-empty means the cluster has diverged and an
// upgrade will realign it. Empty in single-node mode.
NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
}

// CheckBackendUpgrades compares installed backends against gallery entries
// and returns a map of backend names to UpgradeInfo for those that have
// newer versions or different OCI digests available.
// NodeDriftInfo describes one node that disagrees with the cluster majority
// on which version/digest of a backend is installed. Emitted by the upgrade
// check so callers can show which specific nodes have diverged.
type NodeDriftInfo struct {
	NodeID   string `json:"node_id"`           // identifier of the drifting node
	NodeName string `json:"node_name"`         // display name of the drifting node
	Version  string `json:"version,omitempty"` // the (non-majority) version this node reports
	Digest   string `json:"digest,omitempty"`  // the (non-majority) digest this node reports
}

// CheckBackendUpgrades is the single-node entrypoint. Distributed callers use
// CheckUpgradesAgainst directly with their aggregated SystemBackends.
func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) {
galleryBackends, err := AvailableBackends(galleries, systemState)
installed, err := ListSystemBackends(systemState)
if err != nil {
return nil, fmt.Errorf("failed to list available backends: %w", err)
return nil, fmt.Errorf("failed to list installed backends: %w", err)
}
return CheckUpgradesAgainst(ctx, galleries, systemState, installed)
}

installedBackends, err := ListSystemBackends(systemState)
// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against
// the gallery. Fixes the distributed-mode bug where the old code passed the
// frontend's (empty) local filesystem through ListSystemBackends and so never
// surfaced any upgrades.
//
// Cluster drift policy: if a backend's per-node versions/digests disagree, the
// row is flagged upgradeable regardless of whether any node matches the gallery
// — next Upgrade All realigns the cluster. NodeDrift lists the outliers.
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
galleryBackends, err := AvailableBackends(galleries, systemState)
if err != nil {
return nil, fmt.Errorf("failed to list installed backends: %w", err)
return nil, fmt.Errorf("failed to list available backends: %w", err)
}

result := make(map[string]UpgradeInfo)
Expand All @@ -57,56 +80,117 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
}

installedVersion := installed.Metadata.Version
installedDigest := installed.Metadata.Digest
galleryVersion := galleryEntry.Version

// If both sides have versions, compare them
// Detect cluster drift: does every node report the same version+digest?
// In single-node mode this stays empty (Nodes is nil).
majority, drift := summarizeNodeDrift(installed.Nodes)
if majority.version != "" {
installedVersion = majority.version
}
if majority.digest != "" {
installedDigest = majority.digest
}

makeInfo := func(info UpgradeInfo) UpgradeInfo {
info.NodeDrift = drift
return info
}

// If versions are available on both sides, they're the source of truth.
if galleryVersion != "" && installedVersion != "" {
if galleryVersion != installedVersion {
result[installed.Metadata.Name] = UpgradeInfo{
if galleryVersion != installedVersion || len(drift) > 0 {
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: installedVersion,
AvailableVersion: galleryVersion,
}
})
}
// Versions match — no upgrade needed
continue
}

// Gallery has a version but installed doesn't — this happens for backends
// installed before version tracking was added. Flag as upgradeable so
// users can re-install to pick up version metadata.
// Gallery has a version but installed doesn't — backends installed before
// version tracking was added. Flag as upgradeable to pick up metadata.
if galleryVersion != "" && installedVersion == "" {
result[installed.Metadata.Name] = UpgradeInfo{
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: "",
AvailableVersion: galleryVersion,
}
})
continue
}

// Fall back to OCI digest comparison when versions are unavailable
// Fall back to OCI digest comparison when versions are unavailable.
if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)
if err != nil {
xlog.Warn("Failed to get remote OCI digest for upgrade check", "backend", installed.Metadata.Name, "error", err)
continue
}
// If we have a stored digest, compare; otherwise any remote digest
// means we can't confirm we're up to date — flag as upgradeable
if installed.Metadata.Digest == "" || remoteDigest != installed.Metadata.Digest {
result[installed.Metadata.Name] = UpgradeInfo{
// means we can't confirm we're up to date — flag as upgradeable.
if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 {
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledDigest: installed.Metadata.Digest,
InstalledDigest: installedDigest,
AvailableDigest: remoteDigest,
}
})
}
} else if len(drift) > 0 {
// No version/digest path but nodes disagree — still worth flagging.
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: installedVersion,
InstalledDigest: installedDigest,
})
}
// No version info and non-OCI URI — cannot determine, skip
}

return result, nil
}

// summarizeNodeDrift reduces the per-node (version, digest) reports for one
// backend to the most common pair — the cluster "majority" — plus the list of
// nodes that deviate from it. An empty nodes slice (single-node mode) yields
// zero values and a nil drift list. Ties are broken in favor of the pair that
// first reaches the winning count, so the result is deterministic for a given
// node ordering; a unanimous cluster produces no drift entries.
func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) {
	if len(nodes) == 0 {
		return majority, nil
	}

	// Tally identical (version, digest) pairs while tracking the leader.
	type pair struct{ version, digest string }
	tally := make(map[pair]int, len(nodes))
	var best pair
	bestCount := 0
	for _, node := range nodes {
		p := pair{node.Version, node.Digest}
		tally[p]++
		if c := tally[p]; c > bestCount {
			bestCount = c
			best = p
		}
	}

	majority.version, majority.digest = best.version, best.digest

	// A single distinct pair means every node agrees — nothing drifted.
	if len(tally) == 1 {
		return majority, nil
	}

	// Collect the outliers in the order the nodes were supplied.
	for _, node := range nodes {
		if node.Version != majority.version || node.Digest != majority.digest {
			drift = append(drift, NodeDriftInfo{
				NodeID:   node.NodeID,
				NodeName: node.NodeName,
				Version:  node.Version,
				Digest:   node.Digest,
			})
		}
	}
	return majority, drift
}

// UpgradeBackend upgrades a single backend to the latest gallery version using
// an atomic swap with backup-based rollback on failure.
func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error {
Expand Down
Loading
Loading