diff --git a/scheduler/pkg/server/pipeline_status.go b/scheduler/pkg/server/pipeline_status.go index bc5b04bf20..bea5525332 100644 --- a/scheduler/pkg/server/pipeline_status.go +++ b/scheduler/pkg/server/pipeline_status.go @@ -482,11 +482,32 @@ func (s *SchedulerServer) sendPipelineEvents(event *coordinator.PipelineEventMsg return } - // if deletion process was triggered, we remove the pipeline from envoy + // When the old version is being terminated during a pipeline update, guard both the + // envoy route removal and the pipeline-gw delete message with an IsLatestVersion check. + // The pipeline-gw stores pipelines by name (not version), so sending a delete for the + // old version would call DeletePipeline(name), wiping the new version's Kafka consumer. + // See: https://github.com/SeldonIO/seldon-core/issues/7072 + // + // Note: the envoy update (sendPipelineStreamsEventMsg) and the pipeline-gw messaging + // (sendPipelineEventsToStreamWithTimestamp) are separate subsystems. The envoy guard is + // applied here, before the switch, because sendPipelineStreamsEventMsg is not gated on + // pipeline-gw stream availability. The pipeline-gw delete guard is applied inside the + // switch case below, where the delete message is actually constructed and sent. + isLatestVersion := true if pv.State.PipelineGwStatus == pipeline.PipelineTerminate { - s.sendPipelineStreamsEventMsg( - &coordinator.PipelineEventMsg{PipelineName: pv.Name}, []string{}, - ) + var err error + isLatestVersion, err = s.pipelineHandler.IsLatestVersion(pv.Name, pv.Version, pv.UID) + if err != nil { + logger.WithError(err).Warnf("Failed to check if pipeline %s version %d is latest, proceeding with full termination", pv.Name, pv.Version) + isLatestVersion = true + } + if isLatestVersion { + s.sendPipelineStreamsEventMsg( + &coordinator.PipelineEventMsg{PipelineName: pv.Name}, []string{}, + ) + } else { + logger.Debugf("Pipeline %s version %d is not the latest, skipping envoy route removal to avoid disrupting the newer active version", pv.Name, pv.Version) + } } if len(pipelineGwStreams) == 0 && pv.State.PipelineGwStatus != pipeline.PipelineTerminated { @@ -531,6 +552,25 @@ func (s *SchedulerServer) sendPipelineEvents(event *coordinator.PipelineEventMsg status = s.createPipelineCreationMessage(pv) s.sendPipelineEventsToStreamWithTimestamp(event, status, pipelineGwStreams) case pipeline.PipelineTerminate: + if !isLatestVersion { + // A newer version is already handling traffic. The pipeline-gw loads pipelines + // by name, so sending a delete would call DeletePipeline(name) which uses the + // same key as the new version — wiping its Kafka consumer. Skip the delete and + // mark the old version as terminated directly. + // See: https://github.com/SeldonIO/seldon-core/issues/7072 + logger.Debugf("Pipeline %s version %d is not the latest, skipping pipeline-gw delete to preserve newer version's Kafka consumer", pv.Name, pv.Version) + if err := s.pipelineHandler.SetPipelineGwPipelineState( + pv.Name, + pv.Version, + pv.UID, + pipeline.PipelineTerminated, + "terminated as older version during pipeline update", + util.SourcePipelineStatusEvent, + ); err != nil { + logger.WithError(err).Errorf("Failed to set pipeline gw state to terminated for %s", pv.String()) + } + return + } logger.Debug("Pipeline is being terminated, sending deletion message") if err := s.pipelineHandler.SetPipelineGwPipelineState( pv.Name, diff --git a/scheduler/pkg/server/pipeline_status_test.go b/scheduler/pkg/server/pipeline_status_test.go index 9a8c4ad9f5..9c261bc056 100644 --- a/scheduler/pkg/server/pipeline_status_test.go +++ b/scheduler/pkg/server/pipeline_status_test.go @@ -12,6 +12,7 @@ package server import ( "context" "fmt" + "sync" "testing" "time" @@ -1046,3 +1047,265 @@ func TestPipelineGwRebalance(t *testing.T) { }) } } + +// TestPipelineUpdateDoesNotClearEnvoyRoutes reproduces the race condition described in +// https://github.com/SeldonIO/seldon-core/issues/7072. +// +// When a pipeline is updated (new version created), the scheduler correctly marks the old +// version for termination. However, the old version's PipelineTerminate event was +// unconditionally clearing envoy routes — even though the new version was already healthy +// and serving traffic. This caused a window where the pipeline returned "no healthy upstream". +func TestPipelineUpdateDoesNotClearEnvoyRoutes(t *testing.T) { + g := NewGomegaWithT(t) + ctx := context.Background() + + s, hub := createTestScheduler(t) + + // Subscribe to envoy route update events so we can assert what gets published. + var streamEvents []coordinator.PipelineStreamsEventMsg + var mu sync.Mutex + hub.RegisterPipelineStreamsEventHandler( + "test-envoy-capture", 100, log.New(), + func(event coordinator.PipelineStreamsEventMsg) { + mu.Lock() + streamEvents = append(streamEvents, event) + mu.Unlock() + }, + ) + + // Add an operator (non-gateway) stream so sendPipelineEvents has somewhere to write status. + operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx) + s.pipelineEventStream.mu.Lock() + s.pipelineEventStream.streams[operatorStream] = &PipelineSubscription{ + name: "operator", + stream: operatorStream, + fin: make(chan bool), + } + s.pipelineEventStream.mu.Unlock() + + // Add pipeline version 1. + err := s.pipelineHandler.AddPipeline(&pb.Pipeline{ + Name: "test-pipeline", + Uid: "uid-v1", + Steps: []*pb.PipelineStep{{Name: "step1"}}, + }) + g.Expect(err).To(BeNil()) + + // Add pipeline version 2 (simulates a spec update — same name, new UID/steps). + err = s.pipelineHandler.AddPipeline(&pb.Pipeline{ + Name: "test-pipeline", + Uid: "uid-v2", + Steps: []*pb.PipelineStep{ + {Name: "step1"}, + {Name: "step2", Inputs: []string{"step1.outputs"}}, + }, + }) + g.Expect(err).To(BeNil()) + + // Let initial AddPipeline events propagate and settle. + time.Sleep(500 * time.Millisecond) + + // Confirm v2 is the latest version. + isLatest, err := s.pipelineHandler.IsLatestVersion("test-pipeline", 2, "uid-v2") + g.Expect(err).To(BeNil()) + g.Expect(isLatest).To(BeTrue()) + + // Reset captured events — we only care about what happens from this point on. + mu.Lock() + streamEvents = nil + mu.Unlock() + + // Simulate the scheduler marking v1's pipeline-gw state as PipelineTerminate. + // In production this is done by terminatePipelineGwOldUnterminatedPipelinesIfNeeded + // (triggered when v2 becomes ready), which sets the in-memory state and then an + // event is subsequently published that causes sendPipelineEvents to run for v1. + // We replicate that by calling SetPipelineGwPipelineState directly, which both + // sets the state and publishes an event that sendPipelineEvents will process. + err = s.pipelineHandler.SetPipelineGwPipelineState( + "test-pipeline", 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-v2-ready", + ) + g.Expect(err).To(BeNil()) + + // Allow the event to propagate through the event hub. + time.Sleep(500 * time.Millisecond) + + // Assert: no envoy update with an empty stream list was published for the pipeline. + // An empty stream list would remove the pipeline from envoy routing, causing + // "no healthy upstream" for a pipeline whose new version is already healthy. + mu.Lock() + defer mu.Unlock() + for _, event := range streamEvents { + if event.PipelineName == "test-pipeline" { + g.Expect(event.StreamNames).ToNot( + BeEmpty(), + "envoy routes for test-pipeline were incorrectly cleared while version 2 is active "+ + "(see https://github.com/SeldonIO/seldon-core/issues/7072)", + ) + } + } +} + +// TestPipelineUpdateOldVersionDeleteNotSentToGateway ensures that when a pipeline is updated +// and the old version is terminated, the delete message is NOT forwarded to the pipeline +// gateway. The pipeline-gw loads pipelines by name, so sending a delete for the old version +// calls DeletePipeline(name) in the pipeline-gw's KafkaManager — which uses the pipeline +// name as the lookup key, the same key held by the new version. This wipes the new version's +// Kafka consumer and breaks the pipeline even though envoy routes remain intact. +// See: https://github.com/SeldonIO/seldon-core/issues/7072 +func TestPipelineUpdateOldVersionDeleteNotSentToGateway(t *testing.T) { + g := NewGomegaWithT(t) + ctx := context.Background() + + s, _ := createTestScheduler(t) + + // Register an operator (non-gateway) stream. + operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx) + s.pipelineEventStream.mu.Lock() + s.pipelineEventStream.streams[operatorStream] = &PipelineSubscription{ + name: "operator", + stream: operatorStream, + fin: make(chan bool), + } + s.pipelineEventStream.mu.Unlock() + + // Register a pipeline gateway stream. + pipelineGwStream := newStubPipelineStatusServer(20, 5*time.Millisecond, ctx) + s.pipelineEventStream.mu.Lock() + s.pipelineEventStream.streams[pipelineGwStream] = &PipelineSubscription{ + name: "pipelinegw-1", + stream: pipelineGwStream, + isPipelineGateway: true, + fin: make(chan bool), + } + s.pipelineEventStream.namesToIps["pipelinegw-1"] = "10.0.0.1" + s.pipelineEventStream.mu.Unlock() + s.pipelineGWLoadBalancer.AddServer("pipelinegw-1") + + // Add pipeline version 1. + err := s.pipelineHandler.AddPipeline(&pb.Pipeline{ + Name: "test-pipeline", + Uid: "uid-v1", + Steps: []*pb.PipelineStep{{Name: "step1"}}, + }) + g.Expect(err).To(BeNil()) + + // Add pipeline version 2 (simulates a spec update — same name, new UID/steps). + err = s.pipelineHandler.AddPipeline(&pb.Pipeline{ + Name: "test-pipeline", + Uid: "uid-v2", + Steps: []*pb.PipelineStep{ + {Name: "step1"}, + {Name: "step2", Inputs: []string{"step1.outputs"}}, + }, + }) + g.Expect(err).To(BeNil()) + + // Let initial events settle, then drain all messages from the gateway stream so + // we only assert on what happens after we trigger v1's termination. + time.Sleep(500 * time.Millisecond) + for len(pipelineGwStream.msgs) > 0 { + <-pipelineGwStream.msgs + } + + // Simulate the scheduler marking v1's pipeline-gw state as PipelineTerminate. + // In production this is triggered when v2 becomes ready. + err = s.pipelineHandler.SetPipelineGwPipelineState( + "test-pipeline", 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-v2-ready", + ) + g.Expect(err).To(BeNil()) + + // Allow the event to propagate through the event hub. + time.Sleep(500 * time.Millisecond) + + // Assert: no PipelineDelete operation was sent to the pipeline gateway for the old version. + // A delete message would cause the pipeline-gw to call DeletePipeline(name), which removes + // the entry for "test-pipeline.pipeline" from its KafkaManager — the same entry now owned + // by v2. This breaks v2's inference path despite the envoy route remaining intact. + for len(pipelineGwStream.msgs) > 0 { + msg := <-pipelineGwStream.msgs + g.Expect(msg.Operation).ToNot( + Equal(pb.PipelineStatusResponse_PipelineDelete), + "pipeline gateway received a delete message for the old pipeline version — "+ + "this would wipe the new version's Kafka consumer "+ + "(see https://github.com/SeldonIO/seldon-core/issues/7072)", + ) + } +} + +// TestPipelineDeleteLatestVersionSentToGateway verifies that the IsLatestVersion guard +// introduced for issue #7072 does NOT block delete messages for normal pipeline deletion +// (i.e. when the pipeline being terminated is itself the latest version). This ensures the +// guard only suppresses deletes for old versions during pipeline updates, and does not +// interfere with the normal pipeline deletion flow. +func TestPipelineDeleteLatestVersionSentToGateway(t *testing.T) { + g := NewGomegaWithT(t) + ctx := context.Background() + + s, _ := createTestScheduler(t) + + // Register an operator (non-gateway) stream. + operatorStream := newStubPipelineStatusServer(100, 5*time.Millisecond, ctx) + s.pipelineEventStream.mu.Lock() + s.pipelineEventStream.streams[operatorStream] = &PipelineSubscription{ + name: "operator", + stream: operatorStream, + fin: make(chan bool), + } + s.pipelineEventStream.mu.Unlock() + + // Register a pipeline gateway stream. + pipelineGwStream := newStubPipelineStatusServer(20, 5*time.Millisecond, ctx) + s.pipelineEventStream.mu.Lock() + s.pipelineEventStream.streams[pipelineGwStream] = &PipelineSubscription{ + name: "pipelinegw-1", + stream: pipelineGwStream, + isPipelineGateway: true, + fin: make(chan bool), + } + s.pipelineEventStream.namesToIps["pipelinegw-1"] = "10.0.0.1" + s.pipelineEventStream.mu.Unlock() + s.pipelineGWLoadBalancer.AddServer("pipelinegw-1") + + // Add only a single pipeline version (no update — this is the only and latest version). + err := s.pipelineHandler.AddPipeline(&pb.Pipeline{ + Name: "test-pipeline", + Uid: "uid-v1", + Steps: []*pb.PipelineStep{{Name: "step1"}}, + }) + g.Expect(err).To(BeNil()) + + // Confirm it is the latest (and only) version. + isLatest, err := s.pipelineHandler.IsLatestVersion("test-pipeline", 1, "uid-v1") + g.Expect(err).To(BeNil()) + g.Expect(isLatest).To(BeTrue()) + + // Let initial creation events settle, then drain the gateway stream. + time.Sleep(500 * time.Millisecond) + for len(pipelineGwStream.msgs) > 0 { + <-pipelineGwStream.msgs + } + + // Trigger termination of the latest (and only) version — this is the normal deletion path, + // not a pipeline update. The guard should NOT suppress the delete message here. + err = s.pipelineHandler.SetPipelineGwPipelineState( + "test-pipeline", 1, "uid-v1", pipeline.PipelineTerminate, "", "simulate-pipeline-delete", + ) + g.Expect(err).To(BeNil()) + + // Allow the event to propagate through the event hub. + time.Sleep(500 * time.Millisecond) + + // Assert: the gateway received a PipelineDelete message. Suppressing it here would + // leave the pipeline's Kafka consumer alive in the pipeline-gw even after deletion. + var gotDelete bool + for len(pipelineGwStream.msgs) > 0 { + msg := <-pipelineGwStream.msgs + if msg.Operation == pb.PipelineStatusResponse_PipelineDelete { + gotDelete = true + } + } + g.Expect(gotDelete).To(BeTrue(), + "pipeline gateway did not receive a delete message for the latest pipeline version — "+ + "normal pipeline deletion requires the delete to be forwarded to the gateway", + ) +}