Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/inference/models/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,27 @@ func (m *Manager) handleGetModel(w http.ResponseWriter, r *http.Request) {
}
}

// ResolveModelID translates a model reference into the corresponding model ID.
//
// Line feeds and carriage returns are removed from modelRef up front so that
// the value can be logged without allowing log forgery; the cleaned reference
// is also what gets looked up. When the lookup fails, or the model's ID cannot
// be obtained, a warning is logged and the cleaned reference is returned in
// place of an ID.
func (m *Manager) ResolveModelID(modelRef string) string {
	// Strip LF, then CR, to prevent log forgery.
	ref := strings.ReplaceAll(strings.ReplaceAll(modelRef, "\n", ""), "\r", "")

	resolved, lookupErr := m.GetModel(ref)
	if lookupErr != nil {
		m.log.Warnf("Failed to resolve model ref %s to ID: %v", ref, lookupErr)
		return ref
	}

	id, idErr := resolved.ID()
	if idErr != nil {
		m.log.Warnf("Failed to get model ID for ref %s: %v", ref, idErr)
		return ref
	}
	return id
}

func getLocalModel(m *Manager, name string) (*Model, error) {
if m.distributionClient == nil {
return nil, errors.New("model distribution service unavailable")
Expand Down
119 changes: 64 additions & 55 deletions pkg/inference/scheduling/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@
type runnerKey struct {
// backend is the backend associated with the runner.
backend string
// model is the model associated with the runner.
model string
// modelID is the ID (digest) of the model associated with the runner.
modelID string
// mode is the operation mode associated with the runner.
mode inference.BackendMode
}

// runnerInfo holds information about a runner including its slot and the original model reference used to load it.
// It is the value type of the loader's runners map, which is keyed by runnerKey.
type runnerInfo struct {
// slot is the slot index where the runner is stored. The same index is used
// to look up the runner's entry in the loader's slots, references,
// allocations, and timestamps slices.
slot int
// modelRef is the original model reference (tag) used to load the runner.
// It is kept alongside the ID-based key so that log messages and backend
// status reports can show the name the runner was requested under.
modelRef string
}

// loader manages the loading and unloading of backend runners. It regulates
// active backends in a manner that avoids exhausting system resources. Loaders
// assume that all of their backends have been installed, so no load requests
Expand Down Expand Up @@ -80,7 +88,7 @@
// polling. Each signaling channel should be buffered (with size 1).
waiters map[chan<- struct{}]bool
// runners maps runner keys to their runner info (slot index and original model reference).
runners map[runnerKey]int
runners map[runnerKey]runnerInfo
// slots maps slot indices to associated runners. A slot is considered free
// if the runner value in it is nil.
slots []*runner
Expand Down Expand Up @@ -151,7 +159,7 @@
guard: make(chan struct{}, 1),
availableMemory: totalMemory,
waiters: make(map[chan<- struct{}]bool),
runners: make(map[runnerKey]int, nSlots),
runners: make(map[runnerKey]runnerInfo, nSlots),
slots: make([]*runner, nSlots),
references: make([]uint, nSlots),
allocations: make([]uint64, nSlots),
Expand Down Expand Up @@ -196,24 +204,24 @@
// lock. It returns the number of remaining runners.
func (l *loader) evict(idleOnly bool) int {
now := time.Now()
for r, slot := range l.runners {
unused := l.references[slot] == 0
idle := unused && now.Sub(l.timestamps[slot]) > l.runnerIdleTimeout
for r, runnerInfo := range l.runners {
unused := l.references[runnerInfo.slot] == 0
idle := unused && now.Sub(l.timestamps[runnerInfo.slot]) > l.runnerIdleTimeout
defunct := false
select {
case <-l.slots[slot].done:
case <-l.slots[runnerInfo.slot].done:
defunct = true
default:
}
if unused && (!idleOnly || idle || defunct) {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.model, r.mode,
l.log.Infof("Evicting %s backend runner with model %s (%s) in %s mode",
r.backend, r.modelID, runnerInfo.modelRef, r.mode,
Comment thread Dismissed
)
l.slots[slot].terminate()
l.slots[slot] = nil
l.availableMemory += l.allocations[slot]
l.allocations[slot] = 0
l.timestamps[slot] = time.Time{}
l.slots[runnerInfo.slot].terminate()
l.slots[runnerInfo.slot] = nil
l.availableMemory += l.allocations[runnerInfo.slot]
l.allocations[runnerInfo.slot] = 0
l.timestamps[runnerInfo.slot] = time.Time{}
delete(l.runners, r)
}
}
Expand All @@ -224,17 +232,17 @@
// It returns the number of remaining runners.
func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int {
allBackends := backend == ""
for r, slot := range l.runners {
unused := l.references[slot] == 0
if unused && (allBackends || r.backend == backend) && r.model == model && r.mode == mode {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.model, r.mode,
for r, runnerInfo := range l.runners {
unused := l.references[runnerInfo.slot] == 0
if unused && (allBackends || r.backend == backend) && r.modelID == model && r.mode == mode {
l.log.Infof("Evicting %s backend runner with model %s (%s) in %s mode",
r.backend, r.modelID, runnerInfo.modelRef, r.mode,
Comment thread Dismissed
)
l.slots[slot].terminate()
l.slots[slot] = nil
l.availableMemory += l.allocations[slot]
l.allocations[slot] = 0
l.timestamps[slot] = time.Time{}
l.slots[runnerInfo.slot].terminate()
l.slots[runnerInfo.slot] = nil
l.availableMemory += l.allocations[runnerInfo.slot]
l.allocations[runnerInfo.slot] = 0
l.timestamps[runnerInfo.slot] = time.Time{}
delete(l.runners, r)
}
}
Expand All @@ -254,11 +262,12 @@
return l.evict(false)
} else {
for _, model := range unload.Models {
modelID := l.modelManager.ResolveModelID(model)
delete(l.runnerConfigs, runnerKey{unload.Backend, model, inference.BackendModeCompletion})
// Evict both, completion and embedding models. We should consider
// accepting a mode parameter in unload requests.
l.evictRunner(unload.Backend, model, inference.BackendModeCompletion)
l.evictRunner(unload.Backend, model, inference.BackendModeEmbedding)
l.evictRunner(unload.Backend, modelID, inference.BackendModeCompletion)
l.evictRunner(unload.Backend, modelID, inference.BackendModeEmbedding)
}
return len(l.runners)
}
Expand All @@ -282,15 +291,15 @@
func (l *loader) idleCheckDuration() time.Duration {
// Compute the oldest usage time for any idle runner.
var oldest time.Time
for _, slot := range l.runners {
for _, runnerInfo := range l.runners {
select {
case <-l.slots[slot].done:
case <-l.slots[runnerInfo.slot].done:
// Check immediately if a runner is defunct
return 0
default:
}
if l.references[slot] == 0 {
timestamp := l.timestamps[slot]
if l.references[runnerInfo.slot] == 0 {
timestamp := l.timestamps[runnerInfo.slot]
if oldest.IsZero() || timestamp.Before(oldest) {
oldest = timestamp
}
Expand Down Expand Up @@ -378,10 +387,10 @@
}
}

// load allocates a runner using the specified backend and model. If allocated,
// load allocates a runner using the specified backend and modelID. If allocated,
// it should be released by the caller using the release mechanism (once the
// runner is no longer needed).
func (l *loader) load(ctx context.Context, backendName, model string, mode inference.BackendMode) (*runner, error) {
func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string, mode inference.BackendMode) (*runner, error) {
// Grab the backend.
backend, ok := l.backends[backendName]
if !ok {
Expand Down Expand Up @@ -426,20 +435,20 @@
}

// See if we can satisfy the request with an existing runner.
existing, ok := l.runners[runnerKey{backendName, model, mode}]
existing, ok := l.runners[runnerKey{backendName, modelID, mode}]
if ok {
select {
case <-l.slots[existing].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, model)
if l.references[existing] == 0 {
l.evictRunner(backendName, model, mode)
case <-l.slots[existing.slot].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, existing.modelRef)
if l.references[existing.slot] == 0 {
l.evictRunner(backendName, modelID, mode)
} else {
goto WaitForChange
}
default:
l.references[existing] += 1
l.timestamps[existing] = time.Time{}
return l.slots[existing], nil
l.references[existing.slot] += 1
l.timestamps[existing.slot] = time.Time{}
return l.slots[existing.slot], nil
}
}

Expand All @@ -462,15 +471,15 @@
// If we've identified a slot, then we're ready to start a runner.
if slot >= 0 {
var runnerConfig *inference.BackendConfiguration
if rc, ok := l.runnerConfigs[runnerKey{backendName, model, mode}]; ok {
if rc, ok := l.runnerConfigs[runnerKey{backendName, modelID, mode}]; ok {
runnerConfig = &rc
}
// Create the runner.
l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, model, mode)
runner, err := run(l.log, backend, model, mode, slot, runnerConfig, l.openAIRecorder)
l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, modelID, mode)
runner, err := run(l.log, backend, modelID, mode, slot, runnerConfig, l.openAIRecorder)
if err != nil {
l.log.Warnf("Unable to start %s backend runner with model %s in %s mode: %v",
backendName, model, mode, err,
backendName, modelID, mode, err,
)
return nil, fmt.Errorf("unable to start runner: %w", err)
}
Expand All @@ -484,14 +493,14 @@
if err := runner.wait(ctx); err != nil {
runner.terminate()
l.log.Warnf("Initialization for %s backend runner with model %s in %s mode failed: %v",
backendName, model, mode, err,
backendName, modelID, mode, err,
)
return nil, fmt.Errorf("error waiting for runner to be ready: %w", err)
}

// Perform registration and return the runner.
l.availableMemory -= memory
l.runners[runnerKey{backendName, model, mode}] = slot
l.runners[runnerKey{backendName, modelID, mode}] = runnerInfo{slot, modelRef}
l.slots[slot] = runner
l.references[slot] = 1
l.allocations[slot] = memory
Expand Down Expand Up @@ -523,17 +532,17 @@
slot := l.runners[runnerKey{runner.backend.Name(), runner.model, runner.mode}]

// Decrement the runner's reference count.
l.references[slot] -= 1
l.references[slot.slot] -= 1

// If the runner's reference count is now zero, then check if it is still
// active, and record now as its idle start time and signal the idle
// checker.
if l.references[slot] == 0 {
if l.references[slot.slot] == 0 {
select {
case <-runner.done:
l.evictRunner(runner.backend.Name(), runner.model, runner.mode)
default:
l.timestamps[slot] = time.Now()
l.timestamps[slot.slot] = time.Now()
select {
case l.idleCheck <- struct{}{}:
default:
Expand All @@ -545,22 +554,22 @@
l.broadcast()
}

func (l *loader) setRunnerConfig(ctx context.Context, backendName, model string, mode inference.BackendMode, runnerConfig inference.BackendConfiguration) error {
func (l *loader) setRunnerConfig(ctx context.Context, backendName, modelID string, mode inference.BackendMode, runnerConfig inference.BackendConfiguration) error {
l.lock(ctx)
defer l.unlock()

runnerId := runnerKey{backendName, model, mode}
runnerId := runnerKey{backendName, modelID, mode}

// If the configuration hasn't changed, then just return.
if existingConfig, ok := l.runnerConfigs[runnerId]; ok && reflect.DeepEqual(runnerConfig, existingConfig) {
l.log.Infof("Configuration for %s runner for model %s unchanged", backendName, model)
l.log.Infof("Configuration for %s runner for modelID %s unchanged", backendName, modelID)
return nil
}

// If there's an active runner whose configuration we want to override, then
// try evicting it (because it may not be in use).
if _, ok := l.runners[runnerId]; ok {
l.evictRunner(backendName, model, mode)
l.evictRunner(backendName, modelID, mode)
}

// If there's still an active runner, then we can't (or at least
Expand All @@ -569,7 +578,7 @@
return errRunnerAlreadyActive
}

l.log.Infof("Configuring %s runner for %s", backendName, model)
l.log.Infof("Configuring %s runner for %s", backendName, modelID)
l.runnerConfigs[runnerId] = runnerConfig
return nil
}
36 changes: 19 additions & 17 deletions pkg/inference/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
allowedOrigins []string,
tracker *metrics.Tracker,
) *Scheduler {
openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"))
openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"), modelManager)

// Create the scheduler.
s := &Scheduler{
Expand Down Expand Up @@ -238,8 +238,10 @@
s.tracker.TrackModel(model, r.UserAgent())
}

modelID := s.modelManager.ResolveModelID(request.Model)

// Request a runner to execute the request and defer its release.
runner, err := s.loader.load(r.Context(), backend.Name(), request.Model, backendMode)
runner, err := s.loader.load(r.Context(), backend.Name(), modelID, request.Model, backendMode)
if err != nil {
http.Error(w, fmt.Errorf("unable to load runner: %w", err).Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -295,17 +297,17 @@

result := make([]BackendStatus, 0, len(s.loader.runners))

for key, slot := range s.loader.runners {
if s.loader.slots[slot] != nil {
for key, runnerInfo := range s.loader.runners {
if s.loader.slots[runnerInfo.slot] != nil {
status := BackendStatus{
BackendName: key.backend,
ModelName: key.model,
ModelName: runnerInfo.modelRef,
Mode: key.mode.String(),
LastUsed: time.Time{},
}

if s.loader.references[slot] == 0 {
status.LastUsed = s.loader.timestamps[slot]
if s.loader.references[runnerInfo.slot] == 0 {
status.LastUsed = s.loader.timestamps[runnerInfo.slot]
}

result = append(result, status)
Expand Down Expand Up @@ -414,9 +416,9 @@
// Configure is called by compose for each model.
s.tracker.TrackModel(model, r.UserAgent())
}

if err := s.loader.setRunnerConfig(r.Context(), backend.Name(), configureRequest.Model, inference.BackendModeCompletion, runnerConfig); err != nil {
s.log.Warnf("Failed to configure %s runner for %s: %s", backend.Name(), configureRequest.Model, err)
modelID := s.modelManager.ResolveModelID(configureRequest.Model)
if err := s.loader.setRunnerConfig(r.Context(), backend.Name(), modelID, inference.BackendModeCompletion, runnerConfig); err != nil {
s.log.Warnf("Failed to configure %s runner for %s (%s): %s", backend.Name(), configureRequest.Model, modelID, err)
Comment thread Dismissed
if errors.Is(err, errRunnerAlreadyActive) {
http.Error(w, err.Error(), http.StatusConflict)
} else {
Expand All @@ -442,14 +444,14 @@
// Find the runner slot for this backend/model combination
key := runnerKey{
backend: backend.BackendName,
model: backend.ModelName,
modelID: backend.ModelName,
mode: parseBackendMode(backend.Mode),
}

if slot, exists := s.loader.runners[key]; exists {
socket, err := RunnerSocketPath(slot)
if runnerInfo, exists := s.loader.runners[key]; exists {
socket, err := RunnerSocketPath(runnerInfo.slot)
if err != nil {
s.log.Warnf("Failed to get socket path for runner %s/%s: %v", backend.BackendName, backend.ModelName, err)
s.log.Warnf("Failed to get socket path for runner %s/%s (%s): %v", backend.BackendName, backend.ModelName, key.modelID, err)
continue
}

Expand Down Expand Up @@ -480,13 +482,13 @@
// Find the runner slot for this backend/model combination
key := runnerKey{
backend: backend.BackendName,
model: backend.ModelName,
modelID: backend.ModelName,
mode: parseBackendMode(backend.Mode),
}

if slot, exists := s.loader.runners[key]; exists {
if runnerInfo, exists := s.loader.runners[key]; exists {
// Use the RunnerSocketPath function to get the socket path
return RunnerSocketPath(slot)
return RunnerSocketPath(runnerInfo.slot)
}
}
}
Expand Down
Loading