Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions func/internal/podcachemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (pcm *podCacheManager) redistributeLoad(image string, fn *functionInfo, con
for _, ch := range connections {
bestPodIndex, _ := pcm.findBestPod(fn)
if bestPodIndex != -1 {
pod := pcm.functions[image].pods[bestPodIndex]
pod := &pcm.functions[image].pods[bestPodIndex]
pod.concurrentEvaluations.Add(1)
if pod.podData != nil {
pod.SendResponse(ch, nil)
} else {
Expand Down Expand Up @@ -216,12 +217,29 @@ func (pcm *podCacheManager) podCacheManager(ctx context.Context) {
return pod.podData != nil && pod.podKey != nil && *pod.podKey == evict.podKey
})
if idx != -1 {
klog.Infof("Evicting dead pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
pcm.DeletePodWithServiceInBackgroundByObjectKey(*fn.pods[idx].podData)
fn.pods = slices.Delete(fn.pods, idx, idx+1)
} else {
// Best-effort cleanup of any other stale entries for this image.
pcm.removeUnhealthyPods(fn, false)
// Check if the pod still exists and is healthy in k8s.
// Use a bounded context to avoid blocking the event loop on API-server issues.
k8sPod := &corev1.Pod{}
getCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
err := pcm.podManager.kubeClient.Get(getCtx, *fn.pods[idx].podKey, k8sPod)
cancel()
if apierrors.IsNotFound(err) {
klog.Infof("Evicting missing pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
if fn.pods[idx].grpcConnection != nil {
fn.pods[idx].grpcConnection.Close()
}
fn.pods = slices.Delete(fn.pods, idx, idx+1)
} else if err != nil {
// Transient API error — keep the pod in cache rather than evicting a healthy pod.
klog.Warningf("Failed to confirm pod health for %s/%s; keeping it in cache: %v", evict.podKey.Namespace, evict.podKey.Name, err)
} else if k8sPod.Status.Phase != corev1.PodRunning || k8sPod.DeletionTimestamp != nil {
klog.Infof("Evicting dead pod %s from cache for image %s (Unavailable)", evict.podKey.Name, evict.image)
if fn.pods[idx].grpcConnection != nil {
fn.pods[idx].grpcConnection.Close()
}
pcm.DeletePodInBackground(k8sPod)
fn.pods = slices.Delete(fn.pods, idx, idx+1)
}
Comment on lines +235 to +242

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question on this, if the Pod is Running, don't we end up leaving the same dead grpcConnection in the cache?

@kushnaidu kushnaidu Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the connection will be dead; gRPC will eventually time out and let go of it. We don't want to close it here because doing so might break other RPC calls that are in flight in parallel. This can happen especially when the liveness probe takes the pod down under high load. In addition, the GC will clean it up on its next scan interval if the pod is dead.

}
if evict.doneCh != nil {
close(evict.doneCh)
Expand Down Expand Up @@ -314,6 +332,15 @@ func (pcm *podCacheManager) retrieveFunctionPods(ctx context.Context) error {
if len(fn.pods) < pcm.maxParallelPodsPerFunction && pod.Status.Phase == corev1.PodRunning {
pData, err := pcm.podManager.createPodData(ctx, serviceKey, podKey, image)
if err == nil {
// Verify gRPC is reachable before adding to cache
if !pcm.podManager.skipGrpcReadyCheck {
if grpcErr := pcm.podManager.waitForGrpcReady(ctx, pData.grpcConnection); grpcErr != nil {
klog.Warningf("retrieved pod %s/%s for %s but gRPC not ready, deleting: %v", pod.Namespace, pod.Name, image, grpcErr)
pData.grpcConnection.Close()
pcm.DeletePodInBackground(&pod)
continue
}
}
klog.Infof("retrieved function evaluator pod %s/%s for %s", pod.Namespace, pod.Name, image)
fn.pods = append(fn.pods, NewPodInfo(nil))
pcm.podManager.podReadyCh <- &podReadyResponse{
Expand Down
253 changes: 132 additions & 121 deletions func/internal/podcachemanager_eventloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func newTestEventLoopPCM(kubeClient client.Client) (*podCacheManager, chan *conn
podReadyCh: readyCh,
podReadyTimeout: 2 * time.Second,
managerNamespace: defaultNamespace,
skipGrpcReadyCheck: true,
},
}
return pcm, reqCh, readyCh, evictCh
Expand Down Expand Up @@ -321,128 +322,138 @@ func TestRetrieveFunctionPods_EmptyPodList(t *testing.T) {
assert.Empty(t, pcm.functions)
}

// TestEventLoop_EvictionRemovesPodByKey checks that an eviction request whose
// podKey matches a cached pod removes that pod from the cache and that the
// backing Kubernetes pod is subsequently deleted.
func TestEventLoop_EvictionRemovesPodByKey(t *testing.T) {
	podKey := client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}
	serviceKey := client.ObjectKey{Name: "evict-svc", Namespace: defaultNamespace}
	serviceURL := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
	address := net.JoinHostPort(serviceURL, defaultWrapperServerPort)

	// Do not ignore the constructor error: a nil connection would only
	// surface later as an unrelated failure inside the event loop.
	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if !assert.NoError(t, err) {
		return
	}
	// Release the client connection when the test ends. The returned error is
	// deliberately dropped: the connection may already have been closed.
	t.Cleanup(func() { _ = conn.Close() })

	k8sPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	}
	k8sSvc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "evict-svc", Namespace: defaultNamespace},
	}

	kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build()
	pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient)

	// Seed the cache with a single pod entry for the image under test.
	readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
	pcm.functions["test-image"] = &functionInfo{
		pods: []functionPodInfo{readyPod},
	}

	go pcm.podCacheManager(t.Context())

	// Send eviction for the specific pod
	doneCh := make(chan struct{})
	evictCh <- &podEvictionRequest{
		image:  "test-image",
		podKey: podKey,
		doneCh: doneCh,
	}

	select {
	case <-doneCh:
	case <-time.After(5 * time.Second):
		t.Fatal("eviction did not complete")
	}

	// Verify pod was removed from cache
	fn := pcm.functions["test-image"]
	assert.Empty(t, fn.pods, "evicted pod should be removed from cache")

	// Verify k8s pod was deleted (background delete). The deletion is
	// asynchronous, so poll until the pod is gone or the deadline passes.
	deadline := time.Now().Add(2 * time.Second)
	for {
		var pod corev1.Pod
		getErr := kubeClient.Get(t.Context(), podKey, &pod)
		if apierrors.IsNotFound(getErr) {
			break
		}
		if time.Now().After(deadline) {
			assert.True(t, apierrors.IsNotFound(getErr), "k8s pod should be deleted")
			break
		}
		time.Sleep(20 * time.Millisecond)
	}
}

func TestEventLoop_EvictionUnknownImage(t *testing.T) {
kubeClient := fake.NewClientBuilder().Build()
pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient)

go pcm.podCacheManager(t.Context())

// Send eviction for an image not in the cache
doneCh := make(chan struct{})
evictCh <- &podEvictionRequest{
image: "unknown-image",
podKey: client.ObjectKey{Name: "no-pod", Namespace: defaultNamespace},
doneCh: doneCh,
}

select {
case <-doneCh:
// doneCh closed even for unknown image — no hang
case <-time.After(5 * time.Second):
t.Fatal("eviction for unknown image should still close doneCh")
}
}

func TestEventLoop_EvictionPodKeyNotFound(t *testing.T) {
// Pod in cache has a different key than the eviction request
podKey := client.ObjectKey{Name: "real-pod", Namespace: defaultNamespace}
serviceKey := client.ObjectKey{Name: "real-svc", Namespace: defaultNamespace}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, _ := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))

k8sPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "real-pod", Namespace: defaultNamespace},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
}
k8sSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "real-svc", Namespace: defaultNamespace},
}

kubeClient := fake.NewClientBuilder().WithObjects(k8sPod, k8sSvc).Build()
pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient)

readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
pcm.functions["test-image"] = &functionInfo{
pods: []functionPodInfo{readyPod},
}

go pcm.podCacheManager(t.Context())

// Send eviction with a non-matching podKey → falls back to removeUnhealthyPods
doneCh := make(chan struct{})
evictCh <- &podEvictionRequest{
image: "test-image",
podKey: client.ObjectKey{Name: "wrong-pod", Namespace: defaultNamespace},
doneCh: doneCh,
func TestEventLoop_Eviction(t *testing.T) {
now := metav1.Now()

tests := []struct {
name string
k8sObjects []client.Object
evictImage string
evictPodKey client.ObjectKey
expectRemoved bool
expectMessage string
}{
{
name: "pod not found in k8s is removed from cache",
k8sObjects: nil,
evictImage: "test-image",
evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace},
expectRemoved: true,
},
{
name: "pod in Failed state is removed from cache",
k8sObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace},
Status: corev1.PodStatus{Phase: corev1.PodFailed},
},
},
evictImage: "test-image",
evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace},
expectRemoved: true,
},
{
name: "pod with DeletionTimestamp is removed from cache",
k8sObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "evict-pod",
Namespace: defaultNamespace,
DeletionTimestamp: &now,
Finalizers: []string{"test-finalizer"},
},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
},
evictImage: "test-image",
evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace},
expectRemoved: true,
},
{
name: "healthy Running pod is kept in cache",
k8sObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "evict-pod", Namespace: defaultNamespace},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
},
evictImage: "test-image",
evictPodKey: client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace},
expectRemoved: false,
},
{
name: "unknown image closes doneCh without error",
k8sObjects: nil,
evictImage: "unknown-image",
evictPodKey: client.ObjectKey{Name: "no-pod", Namespace: defaultNamespace},
expectRemoved: false, // no function entry, nothing to remove
},
{
name: "non-matching podKey is a no-op",
k8sObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "real-pod", Namespace: defaultNamespace},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
},
evictImage: "test-image",
evictPodKey: client.ObjectKey{Name: "wrong-pod", Namespace: defaultNamespace},
expectRemoved: false,
},
}

select {
case <-doneCh:
case <-time.After(5 * time.Second):
t.Fatal("eviction with non-matching podKey should still close doneCh")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podKey := client.ObjectKey{Name: "evict-pod", Namespace: defaultNamespace}
serviceKey := client.ObjectKey{Name: "evict-svc", Namespace: defaultNamespace}
serviceUrl := serviceKey.Name + "." + serviceKey.Namespace + serviceDnsNameSuffix
address := net.JoinHostPort(serviceUrl, defaultWrapperServerPort)
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if !assert.NoError(t, err) {
return
}
t.Cleanup(func() { _ = conn.Close() })

builder := fake.NewClientBuilder()
if len(tt.k8sObjects) > 0 {
builder = builder.WithObjects(tt.k8sObjects...)
}
kubeClient := builder.Build()
pcm, _, _, evictCh := newTestEventLoopPCM(kubeClient)

// Only set up cache entry if the eviction targets "test-image"
if tt.evictImage == "test-image" {
readyPod := makeReadyPodInfo("test-image", podKey, serviceKey, conn, 0)
pcm.functions["test-image"] = &functionInfo{
pods: []functionPodInfo{readyPod},
}
}

go pcm.podCacheManager(t.Context())

doneCh := make(chan struct{})
evictCh <- &podEvictionRequest{
image: tt.evictImage,
podKey: tt.evictPodKey,
doneCh: doneCh,
}

select {
case <-doneCh:
case <-time.After(5 * time.Second):
t.Fatal("eviction did not complete")
}

fn := pcm.functions["test-image"]
if tt.expectRemoved {
if fn != nil {
assert.Empty(t, fn.pods, "pod should be removed from cache")
}
} else {
if fn != nil {
assert.Len(t, fn.pods, 1, "pod should remain in cache")
}
}
})
}

// Pod should still be in cache (it's healthy, removeUnhealthyPods won't remove it)
fn := pcm.functions["test-image"]
assert.Len(t, fn.pods, 1, "healthy pod should remain in cache when podKey doesn't match")
}
12 changes: 4 additions & 8 deletions func/internal/podevaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,10 @@ func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.Eva
}
}()

// First attempt: fail fast if pod is dead (no WaitForReady).
// Retries: use WaitForReady since eviction cleaned stale pods and
// the retry may get a newly-created pod that is still starting.
var callOpts []grpc.CallOption
if attempt > 0 {
callOpts = append(callOpts, grpc.WaitForReady(true))
}
resp, err := evaluator.NewFunctionEvaluatorClient(pod.grpcConnection).EvaluateFunction(ctx, req, callOpts...)
// Pod is guaranteed to have an active gRPC connection (verified
// during pod readiness via waitForGrpcReady). Unavailable means
// the pod died after being connected.
resp, err := evaluator.NewFunctionEvaluatorClient(pod.grpcConnection).EvaluateFunction(ctx, req)
Comment thread
kushnaidu marked this conversation as resolved.
if err != nil {
// Retry only on Unavailable — indicates the pod is dead/unreachable:
// connection refused (pod deleted), connection reset (pod crashed),
Expand Down
1 change: 1 addition & 0 deletions func/internal/podevaluator_podcachemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func TestPodCacheManager(t *testing.T) {
managerNamespace: defaultNamespace,
enablePrivateRegistries: false,
podReadyCh: podReadyCh,
skipGrpcReadyCheck: true,
}

for k, v := range defaultImageMetadataCache {
Expand Down
Loading
Loading