Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions scheduler/pkg/server/pipeline_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,32 @@ func (s *SchedulerServer) sendPipelineEvents(event *coordinator.PipelineEventMsg
return
}

// if deletion process was triggered, we remove the pipeline from envoy
// When the old version is being terminated during a pipeline update, guard both the
// envoy route removal and the pipeline-gw delete message with an IsLatestVersion check.
// The pipeline-gw stores pipelines by name (not version), so sending a delete for the
// old version would call DeletePipeline(name), wiping the new version's Kafka consumer.
// See: https://github.com/SeldonIO/seldon-core/issues/7072
//
// Note: the envoy update (sendPipelineStreamsEventMsg) and the pipeline-gw messaging
// (sendPipelineEventsToStreamWithTimestamp) are separate subsystems. The envoy guard is
// applied here, before the switch, because sendPipelineStreamsEventMsg is not gated on
// pipeline-gw stream availability. The pipeline-gw delete guard is applied inside the
// switch case below, where the delete message is actually constructed and sent.
isLatestVersion := true
if pv.State.PipelineGwStatus == pipeline.PipelineTerminate {
s.sendPipelineStreamsEventMsg(
&coordinator.PipelineEventMsg{PipelineName: pv.Name}, []string{},
)
var err error
isLatestVersion, err = s.pipelineHandler.IsLatestVersion(pv.Name, pv.Version, pv.UID)
if err != nil {
logger.WithError(err).Warnf("Failed to check if pipeline %s version %d is latest, proceeding with full termination", pv.Name, pv.Version)
isLatestVersion = true
}
if isLatestVersion {
s.sendPipelineStreamsEventMsg(
&coordinator.PipelineEventMsg{PipelineName: pv.Name}, []string{},
)
} else {
logger.Debugf("Pipeline %s version %d is not the latest, skipping envoy route removal to avoid disrupting the newer active version", pv.Name, pv.Version)
}
}

if len(pipelineGwStreams) == 0 && pv.State.PipelineGwStatus != pipeline.PipelineTerminated {
Expand Down Expand Up @@ -531,6 +552,25 @@ func (s *SchedulerServer) sendPipelineEvents(event *coordinator.PipelineEventMsg
status = s.createPipelineCreationMessage(pv)
s.sendPipelineEventsToStreamWithTimestamp(event, status, pipelineGwStreams)
case pipeline.PipelineTerminate:
if !isLatestVersion {
// A newer version is already handling traffic. The pipeline-gw loads pipelines
// by name, so sending a delete would call DeletePipeline(name) which uses the
// same key as the new version — wiping its Kafka consumer. Skip the delete and
// mark the old version as terminated directly.
// See: https://github.com/SeldonIO/seldon-core/issues/7072
logger.Debugf("Pipeline %s version %d is not the latest, skipping pipeline-gw delete to preserve newer version's Kafka consumer", pv.Name, pv.Version)
if err := s.pipelineHandler.SetPipelineGwPipelineState(
pv.Name,
pv.Version,
pv.UID,
pipeline.PipelineTerminated,
"terminated as older version during pipeline update",
util.SourcePipelineStatusEvent,
); err != nil {
logger.WithError(err).Errorf("Failed to set pipeline gw state to terminated for %s", pv.String())
}
return
}
logger.Debug("Pipeline is being terminated, sending deletion message")
if err := s.pipelineHandler.SetPipelineGwPipelineState(
pv.Name,
Expand Down
263 changes: 263 additions & 0 deletions scheduler/pkg/server/pipeline_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package server
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1046,3 +1047,265 @@ func TestPipelineGwRebalance(t *testing.T) {
})
}
}

// TestPipelineUpdateDoesNotClearEnvoyRoutes is a regression test for
// https://github.com/SeldonIO/seldon-core/issues/7072.
//
// Two versions of the same pipeline are added and version 1's pipeline-gw state is then
// moved to PipelineTerminate. Every pipeline-streams event published on the hub after that
// point is captured, and the test fails if any event for the pipeline carries an empty
// stream list: such an event would drop the pipeline from envoy routing ("no healthy
// upstream") even though version 2 is the latest version.
func TestPipelineUpdateDoesNotClearEnvoyRoutes(t *testing.T) {
	const (
		pipelineName = "test-pipeline"
		settleTime   = 500 * time.Millisecond

		routesClearedMsg = "envoy routes for test-pipeline were incorrectly cleared while version 2 is active " +
			"(see https://github.com/SeldonIO/seldon-core/issues/7072)"
	)

	g := NewGomegaWithT(t)
	s, hub := createTestScheduler(t)
	ctx := context.Background()

	// Record every envoy route update published on the hub. The slice is only ever touched
	// while holding capturedMu, both by the handler and by the test body.
	var (
		capturedMu sync.Mutex
		captured   []coordinator.PipelineStreamsEventMsg
	)
	record := func(ev coordinator.PipelineStreamsEventMsg) {
		capturedMu.Lock()
		defer capturedMu.Unlock()
		captured = append(captured, ev)
	}
	hub.RegisterPipelineStreamsEventHandler("test-envoy-capture", 100, log.New(), record)

	// Register a plain operator (non-gateway) subscriber so that sendPipelineEvents has a
	// stream to write status updates to.
	operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx)
	operatorSub := &PipelineSubscription{
		name:   "operator",
		stream: operatorStream,
		fin:    make(chan bool),
	}
	s.pipelineEventStream.mu.Lock()
	s.pipelineEventStream.streams[operatorStream] = operatorSub
	s.pipelineEventStream.mu.Unlock()

	// Version 1 of the pipeline.
	v1 := &pb.Pipeline{
		Name:  pipelineName,
		Uid:   "uid-v1",
		Steps: []*pb.PipelineStep{{Name: "step1"}},
	}
	g.Expect(s.pipelineHandler.AddPipeline(v1)).To(BeNil())

	// Version 2: same name with a new UID and an extra step, i.e. a spec update.
	v2 := &pb.Pipeline{
		Name: pipelineName,
		Uid:  "uid-v2",
		Steps: []*pb.PipelineStep{
			{Name: "step1"},
			{Name: "step2", Inputs: []string{"step1.outputs"}},
		},
	}
	g.Expect(s.pipelineHandler.AddPipeline(v2)).To(BeNil())

	// Give the events triggered by the two AddPipeline calls time to propagate.
	time.Sleep(settleTime)

	// Sanity check: version 2 must be reported as the latest version.
	v2IsLatest, latestErr := s.pipelineHandler.IsLatestVersion(pipelineName, 2, "uid-v2")
	g.Expect(latestErr).To(BeNil())
	g.Expect(v2IsLatest).To(BeTrue())

	// Discard everything captured so far; only events caused by the termination of
	// version 1 are relevant to the assertion below.
	capturedMu.Lock()
	captured = nil
	capturedMu.Unlock()

	// Mark version 1's pipeline-gw state as PipelineTerminate. In production this is done by
	// terminatePipelineGwOldUnterminatedPipelinesIfNeeded once version 2 becomes ready; here
	// SetPipelineGwPipelineState is called directly, which sets the state and publishes the
	// event that sendPipelineEvents processes for version 1.
	terminateErr := s.pipelineHandler.SetPipelineGwPipelineState(
		pipelineName, 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-v2-ready",
	)
	g.Expect(terminateErr).To(BeNil())

	// Observation window for the termination event to travel through the event hub.
	time.Sleep(settleTime)

	// No event for this pipeline may carry an empty stream list: that would remove the
	// pipeline from envoy routing while its newer version is healthy.
	capturedMu.Lock()
	defer capturedMu.Unlock()
	for _, ev := range captured {
		if ev.PipelineName != pipelineName {
			continue
		}
		g.Expect(ev.StreamNames).ToNot(BeEmpty(), routesClearedMsg)
	}
}

// TestPipelineUpdateOldVersionDeleteNotSentToGateway checks that terminating the old
// version of an updated pipeline does NOT forward a delete message to the pipeline gateway.
//
// The pipeline-gw keys pipelines by name, so a delete for the old version would call
// DeletePipeline(name) in its KafkaManager and remove the entry that the new version now
// owns, wiping the new version's Kafka consumer even though envoy routes stay intact.
// See: https://github.com/SeldonIO/seldon-core/issues/7072
func TestPipelineUpdateOldVersionDeleteNotSentToGateway(t *testing.T) {
	const (
		pipelineName = "test-pipeline"
		gatewayName  = "pipelinegw-1"
		settleTime   = 500 * time.Millisecond

		unexpectedDeleteMsg = "pipeline gateway received a delete message for the old pipeline version — " +
			"this would wipe the new version's Kafka consumer " +
			"(see https://github.com/SeldonIO/seldon-core/issues/7072)"
	)

	g := NewGomegaWithT(t)
	s, _ := createTestScheduler(t)
	ctx := context.Background()

	// Operator (non-gateway) subscriber.
	operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx)
	operatorSub := &PipelineSubscription{
		name:   "operator",
		stream: operatorStream,
		fin:    make(chan bool),
	}
	s.pipelineEventStream.mu.Lock()
	s.pipelineEventStream.streams[operatorStream] = operatorSub
	s.pipelineEventStream.mu.Unlock()

	// Pipeline gateway subscriber, also made known to the gateway load balancer.
	pipelineGwStream := newStubPipelineStatusServer(20, 5*time.Millisecond, ctx)
	gatewaySub := &PipelineSubscription{
		name:              gatewayName,
		stream:            pipelineGwStream,
		isPipelineGateway: true,
		fin:               make(chan bool),
	}
	s.pipelineEventStream.mu.Lock()
	s.pipelineEventStream.streams[pipelineGwStream] = gatewaySub
	s.pipelineEventStream.namesToIps[gatewayName] = "10.0.0.1"
	s.pipelineEventStream.mu.Unlock()
	s.pipelineGWLoadBalancer.AddServer(gatewayName)

	// Version 1 of the pipeline.
	v1 := &pb.Pipeline{
		Name:  pipelineName,
		Uid:   "uid-v1",
		Steps: []*pb.PipelineStep{{Name: "step1"}},
	}
	g.Expect(s.pipelineHandler.AddPipeline(v1)).To(BeNil())

	// Version 2: same name with a new UID and an extra step, i.e. a spec update.
	v2 := &pb.Pipeline{
		Name: pipelineName,
		Uid:  "uid-v2",
		Steps: []*pb.PipelineStep{
			{Name: "step1"},
			{Name: "step2", Inputs: []string{"step1.outputs"}},
		},
	}
	g.Expect(s.pipelineHandler.AddPipeline(v2)).To(BeNil())

	// Wait for the creation events to settle, then throw away whatever the gateway stream
	// has buffered so that only messages caused by v1's termination are inspected below.
	gwMsgs := pipelineGwStream.msgs
	time.Sleep(settleTime)
	for len(gwMsgs) > 0 {
		<-gwMsgs
	}

	// Mark version 1's pipeline-gw state as PipelineTerminate; in production this happens
	// when version 2 becomes ready.
	terminateErr := s.pipelineHandler.SetPipelineGwPipelineState(
		pipelineName, 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-v2-ready",
	)
	g.Expect(terminateErr).To(BeNil())

	// Observation window for the termination event to travel through the event hub.
	time.Sleep(settleTime)

	// None of the messages that reached the gateway may be a PipelineDelete: the gateway
	// would react by deleting the name-keyed entry that version 2 now owns.
	for len(gwMsgs) > 0 {
		received := <-gwMsgs
		g.Expect(received.Operation).ToNot(
			Equal(pb.PipelineStatusResponse_PipelineDelete),
			unexpectedDeleteMsg,
		)
	}
}

// TestPipelineDeleteLatestVersionSentToGateway verifies that the IsLatestVersion guard
// introduced for issue #7072 does NOT block delete messages for normal pipeline deletion
// (i.e. when the pipeline being terminated is itself the latest version). This ensures the
// guard only suppresses deletes for old versions during pipeline updates, and does not
// interfere with the normal pipeline deletion flow.
//
// The final check is a positive one (a delete message must arrive), so it polls the gateway
// stream with Eventually instead of sleeping for a fixed interval and inspecting the stream
// once. The test therefore finishes as soon as the message shows up and tolerates slow
// schedulers up to deleteTimeout.
func TestPipelineDeleteLatestVersionSentToGateway(t *testing.T) {
	const (
		// settleTime lets the creation events propagate before the stream is drained.
		settleTime = 500 * time.Millisecond
		// deleteTimeout bounds how long the test waits for the delete message.
		deleteTimeout = 2 * time.Second
		// pollInterval is how often the gateway stream is re-checked while waiting.
		pollInterval = 10 * time.Millisecond
	)

	g := NewGomegaWithT(t)
	ctx := context.Background()

	s, _ := createTestScheduler(t)

	// Register an operator (non-gateway) stream.
	operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx)
	s.pipelineEventStream.mu.Lock()
	s.pipelineEventStream.streams[operatorStream] = &PipelineSubscription{
		name:   "operator",
		stream: operatorStream,
		fin:    make(chan bool),
	}
	s.pipelineEventStream.mu.Unlock()

	// Register a pipeline gateway stream.
	pipelineGwStream := newStubPipelineStatusServer(20, 5*time.Millisecond, ctx)
	s.pipelineEventStream.mu.Lock()
	s.pipelineEventStream.streams[pipelineGwStream] = &PipelineSubscription{
		name:              "pipelinegw-1",
		stream:            pipelineGwStream,
		isPipelineGateway: true,
		fin:               make(chan bool),
	}
	s.pipelineEventStream.namesToIps["pipelinegw-1"] = "10.0.0.1"
	s.pipelineEventStream.mu.Unlock()
	s.pipelineGWLoadBalancer.AddServer("pipelinegw-1")

	// Add only a single pipeline version (no update — this is the only and latest version).
	err := s.pipelineHandler.AddPipeline(&pb.Pipeline{
		Name:  "test-pipeline",
		Uid:   "uid-v1",
		Steps: []*pb.PipelineStep{{Name: "step1"}},
	})
	g.Expect(err).To(BeNil())

	// Confirm it is the latest (and only) version.
	isLatest, err := s.pipelineHandler.IsLatestVersion("test-pipeline", 1, "uid-v1")
	g.Expect(err).To(BeNil())
	g.Expect(isLatest).To(BeTrue())

	// Let initial creation events settle, then drain the gateway stream.
	time.Sleep(settleTime)
	for len(pipelineGwStream.msgs) > 0 {
		<-pipelineGwStream.msgs
	}

	// Trigger termination of the latest (and only) version — this is the normal deletion path,
	// not a pipeline update. The guard should NOT suppress the delete message here.
	err = s.pipelineHandler.SetPipelineGwPipelineState(
		"test-pipeline", 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-pipeline-delete",
	)
	g.Expect(err).To(BeNil())

	// sawDelete consumes whatever is currently buffered on the gateway stream without
	// blocking and reports whether a PipelineDelete was among the consumed messages.
	sawDelete := func() bool {
		for {
			select {
			case msg, ok := <-pipelineGwStream.msgs:
				if !ok {
					// Closed stream: nothing more can arrive.
					return false
				}
				if msg.Operation == pb.PipelineStatusResponse_PipelineDelete {
					return true
				}
			default:
				return false
			}
		}
	}

	// Assert: the gateway received a PipelineDelete message. Suppressing it here would
	// leave the pipeline's Kafka consumer alive in the pipeline-gw even after deletion.
	g.Eventually(sawDelete, deleteTimeout, pollInterval).Should(BeTrue(),
		"pipeline gateway did not receive a delete message for the latest pipeline version — "+
			"normal pipeline deletion requires the delete to be forwarded to the gateway",
	)
}