diff --git a/func/internal/podcachemanager.go b/func/internal/podcachemanager.go
index f7359de5c..2d5382fa7 100644
--- a/func/internal/podcachemanager.go
+++ b/func/internal/podcachemanager.go
@@ -97,7 +97,8 @@ func (pcm *podCacheManager) redistributeLoad(image string, fn *functionInfo, con
 	for _, ch := range connections {
 		bestPodIndex, _ := pcm.findBestPod(fn)
 		if bestPodIndex != -1 {
-			pod := pcm.functions[image].pods[bestPodIndex]
+			pod := &pcm.functions[image].pods[bestPodIndex]
+			pod.concurrentEvaluations.Add(1)
 			if pod.podData != nil {
 				pod.SendResponse(ch, nil)
 			} else {
@@ -216,12 +217,29 @@ func (pcm *podCacheManager) podCacheManager(ctx context.Context) {
 				return pod.podData != nil && pod.podKey != nil && *pod.podKey == evict.podKey
 			})
 			if idx != -1 {
-				klog.Infof("Evicting dead pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
-				pcm.DeletePodWithServiceInBackgroundByObjectKey(*fn.pods[idx].podData)
-				fn.pods = slices.Delete(fn.pods, idx, idx+1)
-			} else {
-				// Best-effort cleanup of any other stale entries for this image.
-				pcm.removeUnhealthyPods(fn, false)
+				// Look the pod up in k8s before evicting; the Get is bounded to 15s so API-server issues cannot stall the event loop.
+				k8sPod := &corev1.Pod{}
+				getCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
+				err := pcm.podManager.kubeClient.Get(getCtx, *fn.pods[idx].podKey, k8sPod)
+				cancel()
+				switch {
+				case apierrors.IsNotFound(err):
+					klog.Infof("Evicting missing pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
+					if fn.pods[idx].grpcConnection != nil {
+						fn.pods[idx].grpcConnection.Close()
+					}
+					fn.pods = slices.Delete(fn.pods, idx, idx+1)
+				case err != nil:
+					// Transient API error — keep the pod in cache rather than evicting a healthy pod.
+					klog.Warningf("Failed to confirm pod health for %s/%s; keeping it in cache: %v", evict.podKey.Namespace, evict.podKey.Name, err)
+				case k8sPod.Status.Phase != corev1.PodRunning || k8sPod.DeletionTimestamp != nil:
+					klog.Infof("Evicting dead pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
+					if fn.pods[idx].grpcConnection != nil {
+						fn.pods[idx].grpcConnection.Close()
+					}
+					pcm.DeletePodInBackground(k8sPod)
+					fn.pods = slices.Delete(fn.pods, idx, idx+1)
+				}
 			}
 			if evict.doneCh != nil {
 				close(evict.doneCh)
@@ -314,6 +332,15 @@ func (pcm *podCacheManager) retrieveFunctionPods(ctx context.Context) error {
 			if len(fn.pods) < pcm.maxParallelPodsPerFunction && pod.Status.Phase == corev1.PodRunning {
 				pData, err := pcm.podManager.createPodData(ctx, serviceKey, podKey, image)
 				if err == nil {
+					// Verify gRPC is reachable before adding to cache
+					if !pcm.podManager.skipGrpcReadyCheck {
+						if grpcErr := pcm.podManager.waitForGrpcReady(ctx, pData.grpcConnection); grpcErr != nil {
+							klog.Warningf("retrieved pod %s/%s for %s but gRPC not ready, deleting: %v", pod.Namespace, pod.Name, image, grpcErr)
+							pData.grpcConnection.Close()
+							pcm.DeletePodInBackground(&pod)
+							continue
+						}
+					}
 					klog.Infof("retrieved function evaluator pod %s/%s for %s", pod.Namespace, pod.Name, image)
 					fn.pods = append(fn.pods, NewPodInfo(nil))
 					pcm.podManager.podReadyCh <- &podReadyResponse{
diff --git a/func/internal/podcachemanager_eventloop_test.go b/func/internal/podcachemanager_eventloop_test.go
index e90eb42c3..85ca98a7e 100644
--- a/func/internal/podcachemanager_eventloop_test.go
+++ b/func/internal/podcachemanager_eventloop_test.go
@@ -60,6 +60,7 @@ func newTestEventLoopPCM(kubeClient client.Client) (*podCacheManager, chan *conn
 			podReadyCh:       readyCh,
 			podReadyTimeout:  2 * time.Second,
 			managerNamespace: defaultNamespace,
+			skipGrpcReadyCheck: true,
 		},
 	}
 	return pcm, reqCh, readyCh, evictCh
@@ -321,128 +322,138 @@ func TestRetrieveFunctionPods_EmptyPodList(t *testing.T) {
assert.Empty(t, pcm.functions) } -func TestEventLoop_EvictionRemovesPodByKey(t *testing.T) { - podKey := client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace} - serviceKey := client.ObjectKey{Name: "evict-svc", Namespace: defaultNamespace} - serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix - address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) - conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - - k8sPod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - } - k8sSvc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "evict-svc", Namespace: defaultNamespace}, - } - - kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build() - pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient) - - readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) - pcm.functions["test-image"] = &functionInfo{ - pods: []functionPodInfo{readyPod}, - } - - go pcm.podCacheManager(t.Context()) - - // Send eviction for the specific pod - doneCh := make(chan struct{}) - evictCh <- &podEvictionRequest{ - image: "test-image", - podKey: podKey, - doneCh: doneCh, - } - - select { - case <-doneCh: - case <-time.After(5 * time.Second): - t.Fatal("eviction did not complete") - } - - // Verify pod was removed from cache - fn := pcm.functions["test-image"] - assert.Empty(t, fn.pods, "evicted pod should be removed from cache") - - // Verify k8s pod was deleted (background delete) - deadline := time.Now().Add(2 * time.Second) - for { - var pod corev1.Pod - err := kubeClient.Get(t.Context(), podKey, &pod) - if apierrors.IsNotFound(err) { - break - } - if time.Now().After(deadline) { - assert.True(t, apierrors.IsNotFound(err), "k8s pod should be deleted") - break - } - time.Sleep(20 * time.Millisecond) - } -} - -func TestEventLoop_EvictionUnknownImage(t *testing.T) { - 
kubeClient := fake.NewClientBuilder().Build() - pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient) - - go pcm.podCacheManager(t.Context()) - - // Send eviction for an image not in the cache - doneCh := make(chan struct{}) - evictCh <- &podEvictionRequest{ - image: "unknown-image", - podKey: client.ObjectKey{Name: "no-pod", Namespace: defaultNamespace}, - doneCh: doneCh, - } - - select { - case <-doneCh: - // doneCh closed even for unknown image — no hang - case <-time.After(5 * time.Second): - t.Fatal("eviction for unknown image should still close doneCh") - } -} - -func TestEventLoop_EvictionPodKeyNotFound(t *testing.T) { - // Pod in cache has a different key than the eviction request - podKey := client.ObjectKey{Name: "real-pod", Namespace: defaultNamespace} - serviceKey := client.ObjectKey{Name: "real-svc", Namespace: defaultNamespace} - serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix - address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) - conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - - k8sPod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "real-pod", Namespace: defaultNamespace}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - } - k8sSvc := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "real-svc", Namespace: defaultNamespace}, - } - - kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build() - pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient) - - readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) - pcm.functions["test-image"] = &functionInfo{ - pods: []functionPodInfo{readyPod}, - } - - go pcm.podCacheManager(t.Context()) - - // Send eviction with a non-matching podKey → falls back to removeUnhealthyPods - doneCh := make(chan struct{}) - evictCh <- &podEvictionRequest{ - image: "test-image", - podKey: client.ObjectKey{Name: "wrong-pod", Namespace: defaultNamespace}, - doneCh: doneCh, +func 
TestEventLoop_Eviction(t *testing.T) { + now := metav1.Now() + + tests := []struct { + name string + k8sObjects []client.Object + evictImage string + evictPodKey client.ObjectKey + expectRemoved bool + expectMessage string + }{ + { + name: "pod not found in k8s is removed from cache", + k8sObjects: nil, + evictImage: "test-image", + evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}, + expectRemoved: true, + }, + { + name: "pod in Failed state is removed from cache", + k8sObjects: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace}, + Status: corev1.PodStatus{Phase: corev1.PodFailed}, + }, + }, + evictImage: "test-image", + evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}, + expectRemoved: true, + }, + { + name: "pod with DeletionTimestamp is removed from cache", + k8sObjects: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "evict-pod", + Namespace: defaultNamespace, + DeletionTimestamp: &now, + Finalizers: []string{"test-finalizer"}, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + }, + evictImage: "test-image", + evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}, + expectRemoved: true, + }, + { + name: "healthy Running pod is kept in cache", + k8sObjects: []client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + }, + evictImage: "test-image", + evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}, + expectRemoved: false, + }, + { + name: "unknown image closes doneCh without error", + k8sObjects: nil, + evictImage: "unknown-image", + evictPodKey: client.ObjectKey{Name: "no-pod", Namespace: defaultNamespace}, + expectRemoved: false, // no function entry, nothing to remove + }, + { + name: "non-matching podKey is a no-op", + k8sObjects: 
[]client.Object{ + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "real-pod", Namespace: defaultNamespace}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + }, + evictImage: "test-image", + evictPodKey: client.ObjectKey{Name: "wrong-pod", Namespace: defaultNamespace}, + expectRemoved: false, + }, } - select { - case <-doneCh: - case <-time.After(5 * time.Second): - t.Fatal("eviction with non-matching podKey should still close doneCh") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + podKey := client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace} + serviceKey := client.ObjectKey{Name: "evict-svc", Namespace: defaultNamespace} + serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix + address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort) + conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if !assert.NoError(t, err) { + return + } + t.Cleanup(func() { _ = conn.Close() }) + + builder := fake.NewClientBuilder() + if len(tt.k8sObjects) > 0 { + builder = builder.WithObjects(tt.k8sObjects...) 
+ } + kubeClient := builder.Build() + pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient) + + // Only set up cache entry if the eviction targets "test-image" + if tt.evictImage == "test-image" { + readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0) + pcm.functions["test-image"] = &functionInfo{ + pods: []functionPodInfo{readyPod}, + } + } + + go pcm.podCacheManager(t.Context()) + + doneCh := make(chan struct{}) + evictCh <- &podEvictionRequest{ + image: tt.evictImage, + podKey: tt.evictPodKey, + doneCh: doneCh, + } + + select { + case <-doneCh: + case <-time.After(5 * time.Second): + t.Fatal("eviction did not complete") + } + + fn := pcm.functions["test-image"] + if tt.expectRemoved { + if fn != nil { + assert.Empty(t, fn.pods, "pod should be removed from cache") + } + } else { + if fn != nil { + assert.Len(t, fn.pods, 1, "pod should remain in cache") + } + } + }) } - - // Pod should still be in cache (it's healthy, removeUnhealthyPods won't remove it) - fn := pcm.functions["test-image"] - assert.Len(t, fn.pods, 1, "healthy pod should remain in cache when podKey doesn't match") } diff --git a/func/internal/podevaluator.go b/func/internal/podevaluator.go index 5085d50c0..4a5c1b449 100644 --- a/func/internal/podevaluator.go +++ b/func/internal/podevaluator.go @@ -236,14 +236,10 @@ func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.Eva } }() - // First attempt: fail fast if pod is dead (no WaitForReady). - // Retries: use WaitForReady since eviction cleaned stale pods and - // the retry may get a newly-created pod that is still starting. - var callOpts []grpc.CallOption - if attempt > 0 { - callOpts = append(callOpts, grpc.WaitForReady(true)) - } - resp, err := evaluator.NewFunctionEvaluatorClient(pod.grpcConnection).EvaluateFunction(ctx, req, callOpts...) + // Pod is guaranteed to have an active gRPC connection (verified + // during pod readiness via waitForGrpcReady). 
Unavailable means
+		// the pod died after being connected.
+		resp, err := evaluator.NewFunctionEvaluatorClient(pod.grpcConnection).EvaluateFunction(ctx, req)
 		if err != nil {
 			// Retry only on Unavailable — indicates the pod is dead/unreachable:
 			// connection refused (pod deleted), connection reset (pod crashed),
diff --git a/func/internal/podevaluator_podcachemanager_test.go b/func/internal/podevaluator_podcachemanager_test.go
index 1ebc547ad..f5defdcec 100644
--- a/func/internal/podevaluator_podcachemanager_test.go
+++ b/func/internal/podevaluator_podcachemanager_test.go
@@ -336,6 +336,7 @@ func TestPodCacheManager(t *testing.T) {
 				managerNamespace:        defaultNamespace,
 				enablePrivateRegistries: false,
 				podReadyCh:              podReadyCh,
+				skipGrpcReadyCheck:      true,
 			}
 
 			for k, v := range defaultImageMetadataCache {
diff --git a/func/internal/podevaluator_podmanager_test.go b/func/internal/podevaluator_podmanager_test.go
index 2a5832ac6..226affa30 100644
--- a/func/internal/podevaluator_podmanager_test.go
+++ b/func/internal/podevaluator_podmanager_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 	"gopkg.in/yaml.v3"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -608,6 +609,7 @@ func TestPodManager(t *testing.T) {
 				enablePrivateRegistriesTls: false,
 
 				tlsSecretPath: "/var/tmp/tls-secret/",
+				skipGrpcReadyCheck: true,
 			}
 
 			for k, v := range tt.imageMetadataCache {
@@ -852,3 +854,98 @@ func deepCopyObject(in, out interface{}) {
 		panic(err)
 	}
 }
+
+func TestWaitForGrpcReady_Success(t *testing.T) {
+	// Start a real gRPC server
+	addr, cleanup := startFakeEvalServer(t, func(_ context.Context, _ *pb.EvaluateFunctionRequest) (*pb.EvaluateFunctionResponse, error) {
+		return &pb.EvaluateFunctionResponse{ResourceList: []byte("ok")}, nil
+	})
+	defer cleanup()
+
+	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	require.NoError(t, err)
+	defer conn.Close()
+
+	pm := &podManager{podReadyTimeout: 5 * time.Second}
+	err = pm.waitForGrpcReady(context.Background(), conn)
+	assert.NoError(t, err, "should connect to running server")
+}
+
+func TestWaitForGrpcReady_Timeout(t *testing.T) {
+	// Connect to an address where nothing is listening
+	conn, err := grpc.NewClient("127.0.0.1:1", grpc.WithTransportCredentials(insecure.NewCredentials()))
+	require.NoError(t, err)
+	defer conn.Close()
+
+	pm := &podManager{podReadyTimeout: 500 * time.Millisecond}
+	err = pm.waitForGrpcReady(context.Background(), conn)
+	require.Error(t, err, "should timeout on unreachable server") // require: err.Error() below would panic on nil
+	assert.Contains(t, err.Error(), "did not become ready")
+}
+
+func TestGetFuncEvalPodClient_WaitForGrpcReadyFailure(t *testing.T) {
+	const testNs = "test-ns"
+	const testImage = "test-fn-image"
+	const podName = "test-fn-image-1-abcd1234"
+	const serviceName = podName
+
+	// Service exists but the DNS name (serviceName.namespace.svc.cluster.local:9446)
+	// won't resolve in test — waitForGrpcReady will timeout trying to connect.
+	k8sPod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      podName,
+			Namespace: testNs,
+			Labels:    map[string]string{krmFunctionImageLabel: serviceName},
+		},
+		Status: corev1.PodStatus{
+			Phase: corev1.PodRunning,
+			PodIP: "127.0.0.1",
+			Conditions: []corev1.PodCondition{
+				{Type: corev1.PodReady, Status: corev1.ConditionTrue},
+			},
+		},
+	}
+	k8sSvc := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: testNs},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{{Port: 9446}},
+		},
+	}
+	k8sEndpoint := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: testNs},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{{IP: "127.0.0.1"}},
+				Ports:     []corev1.EndpointPort{{Port: 9446}},
+			},
+		},
+	}
+
+	kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc, k8sEndpoint).Build()
+
+	podReadyCh := make(chan *podReadyResponse, 1)
+	pm := &podManager{
+		kubeClient:         kubeClient,
+		namespace:          testNs,
+		wrapperServerImage: defaultWrapperServerImage,
+		imageMetadataCache: sync.Map{},
+		podReadyCh:         podReadyCh,
+		podReadyTimeout:    500 * time.Millisecond, // short timeout
+		managerNamespace:   testNs,
+		maxGrpcMessageSize: 4 * 1024 * 1024,
+		skipGrpcReadyCheck: false,
+	}
+
+	serviceKey := client.ObjectKey{Name: serviceName, Namespace: testNs}
+	podKey := client.ObjectKey{Name: podName, Namespace: testNs}
+
+	pData, err := pm.createPodData(context.Background(), serviceKey, podKey, testImage)
+	require.NoError(t, err)
+	require.NotNil(t, pData.grpcConnection)
+	defer pData.grpcConnection.Close() // deferred so the connection is released on every exit path
+
+	// waitForGrpcReady should fail — the service DNS name won't resolve in this unit test, so the connection never becomes READY
+	err = pm.waitForGrpcReady(context.Background(), pData.grpcConnection)
+	require.Error(t, err, "waitForGrpcReady should fail on unreachable server")
+	assert.Contains(t, err.Error(), "did not become ready")
+}
diff --git a/func/internal/podmanager.go
b/func/internal/podmanager.go
index c77f33fdd..542fb76c5 100644
--- a/func/internal/podmanager.go
+++ b/func/internal/podmanager.go
@@ -38,6 +38,7 @@ import (
 	configapi "github.com/kptdev/porch/api/porchconfig/v1alpha1"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials/insecure"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -219,6 +220,8 @@ type podManager struct {
 	imageResolver runneroptions.ImageResolveFunc
 	// tagResolver is used to resolve the tag of the given image
 	tagResolver runtime.TagResolver
+	// skipGrpcReadyCheck disables the gRPC readiness verification during pod creation (for testing)
+	skipGrpcReadyCheck bool
 }
 
 type digestAndEntrypoint struct {
@@ -257,6 +260,31 @@ func (pm *podManager) createPodData(ctx context.Context, serviceKey client.Objec
 	return podData, err
 }
 
+// waitForGrpcReady waits until the gRPC connection reaches READY, triggering a
+// connection attempt whenever the channel is observed IDLE. The wait is bounded
+// by pm.podReadyTimeout (and by ctx). It returns an error if that deadline
+// passes or if the connection is shut down before becoming ready.
+func (pm *podManager) waitForGrpcReady(ctx context.Context, cc *grpc.ClientConn) error {
+	ctx, cancel := context.WithTimeout(ctx, pm.podReadyTimeout)
+	defer cancel()
+	for {
+		state := cc.GetState()
+		switch state {
+		case connectivity.Ready:
+			return nil
+		case connectivity.Shutdown:
+			return fmt.Errorf("gRPC connection shut down before becoming ready")
+		case connectivity.Idle:
+			// Connect only acts on an idle channel, so call it every time IDLE is
+			// seen rather than once up front; otherwise a later IDLE would wait out the timeout.
+			cc.Connect()
+		}
+		if !cc.WaitForStateChange(ctx, state) {
+			return fmt.Errorf("gRPC connection did not become ready (last state: %v): %w", state, ctx.Err())
+		}
+	}
+}
+
 // getFuncEvalPodClient ensures there is a pod running and ready for the image.
 // It will send it to the podReadyCh channel when the pod is ready. ttl is the
 // time-to-live period for the pod. If useGenerateName is false, it will try to
@@ -285,7 +313,18 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, po
 		}
 		serviceKey := client.ObjectKeyFromObject(serviceTemplate)
 		podData, err = pm.createPodData(ctx, serviceKey, podKey, image)
-		return podData, err
+		if err != nil {
+			return podData, err
+		}
+		// Verify the gRPC server is actually accepting connections before marking ready.
+		if !pm.skipGrpcReadyCheck {
+			if err := pm.waitForGrpcReady(ctx, podData.grpcConnection); err != nil {
+				podData.grpcConnection.Close()
+				podData.grpcConnection = nil
+				return podData, fmt.Errorf("gRPC server not ready for pod %s/%s: %w", podKey.Namespace, podKey.Name, err)
+			}
+		}
+		return podData, nil
 	}()
 
 	pm.podReadyCh <- &podReadyResponse{