Skip to content

Commit 42916df

Browse files
committed
Fix KubernetesExecutor scheduler crash when pod_override is queued in-cluster
When the scheduler runs in-cluster, kubernetes-python-client v36 changed model constructors to use Configuration.get_default_copy() instead of Configuration() (kubernetes-client/python#2532, OpenAPI Generator v6.6.0). So every V1Pod built after load_incluster_config() captures the global in-cluster Configuration, whose refresh_api_key_hook is a local closure (InClusterConfigLoader._set_config.<locals>._refresh_api_key). pickle cannot serialize a local closure, so putting a task's pod_override V1Pod on the executor's multiprocessing queue raises PicklingError and crashes the scheduler in a loop. This affects any in-cluster KubernetesExecutor deployment where a task sets a V1Pod pod_override, independent of the Airflow version; pinning the client below 36 is not viable because 35.x has a separate no_proxy regression. Reset local_vars_configuration to a fresh Configuration() on the pod_override (recursively) before queuing -- exactly what v35 model constructors produced. It carries no in-cluster auth hook so the pod is picklable, while keeping client_side_validation so the worker-side reconcile_pods setters still work. (Setting it to None instead breaks reconcile: model setters dereference self.local_vars_configuration.client_side_validation.) The pod keeps its V1Pod type through the queue, so run_next is unchanged.
1 parent 11f6228 commit 42916df

2 files changed

Lines changed: 167 additions & 0 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from typing import TYPE_CHECKING, Any
3737

3838
from deprecated import deprecated
39+
from kubernetes.client import Configuration
3940
from kubernetes.dynamic import DynamicClient
4041
from sqlalchemy import select
4142

@@ -73,6 +74,37 @@
7374
)
7475

7576

77+
def _reset_local_vars_configuration(obj: Any) -> None:
78+
"""
79+
Reset ``local_vars_configuration`` to a fresh ``Configuration`` on every kubernetes model object.
80+
81+
kubernetes-python-client v36 changed model constructors to call
82+
``Configuration.get_default_copy()`` instead of ``Configuration()``, so every model object
83+
created after ``load_incluster_config()`` runs captures the global ``Configuration``, which holds
84+
an unpicklable ``refresh_api_key_hook`` closure -- pickling such a pod onto the multiprocessing
85+
queue raises ``PicklingError`` and crashes the scheduler.
86+
87+
Replace it with a fresh ``Configuration()`` (exactly what v35 model constructors used): it carries
88+
no in-cluster auth hook so it is picklable, while still providing ``client_side_validation`` so the
89+
worker-side ``PodGenerator.reconcile_pods`` setters keep working. Setting it to ``None`` instead
90+
would break reconcile -- model setters dereference
91+
``self.local_vars_configuration.client_side_validation``. The field is only API-client state used
92+
within the same process; workers reinitialize their own kube config, so nothing is lost. No-op on v35.
93+
"""
94+
if obj is None or isinstance(obj, (str, int, float, bool)):
95+
return
96+
if hasattr(obj, "openapi_types"):
97+
obj.local_vars_configuration = Configuration()
98+
for attr in obj.openapi_types:
99+
_reset_local_vars_configuration(getattr(obj, attr, None))
100+
elif isinstance(obj, dict):
101+
for v in obj.values():
102+
_reset_local_vars_configuration(v)
103+
elif isinstance(obj, (list, tuple)):
104+
for item in obj:
105+
_reset_local_vars_configuration(item)
106+
107+
76108
class KubernetesExecutor(BaseExecutor):
77109
"""Executor for Kubernetes."""
78110

@@ -226,6 +258,15 @@ def execute_async(
226258
pod_template_file = executor_config.get("pod_template_file", None)
227259
else:
228260
pod_template_file = None
261+
262+
# Reset ``local_vars_configuration`` on the ``pod_override`` to a fresh ``Configuration()`` before
263+
# putting it on the multiprocessing queue. kubernetes-python-client v36 makes model constructors
264+
# capture the global in-cluster ``Configuration`` (with an unpicklable ``refresh_api_key_hook``
265+
# closure), so queuing the pod raises ``PicklingError`` and crashes the scheduler.
266+
# ``kube_executor_config`` is the same ``V1Pod`` object referenced by the workload's
267+
# ``executor_config``, so this also makes the workload picklable. No-op on v35.
268+
_reset_local_vars_configuration(kube_executor_config)
269+
229270
self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
230271
self.task_queue.put(KubernetesJob(key, command, kube_executor_config, pod_template_file))
231272
# We keep a temporary local record that we've handled this so we don't

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,132 @@ def test_pod_template_file_override_in_executor_config(
915915
finally:
916916
executor.end()
917917

918+
@staticmethod
919+
def _install_unpicklable_incluster_hook(pod):
920+
"""Simulate the in-cluster kubernetes v36 default config: an unpicklable local-closure hook
921+
on every nested model object's ``Configuration`` (``InClusterConfigLoader._set_config.<locals>.
922+
_refresh_api_key``)."""
923+
from kubernetes.client import Configuration
924+
925+
def _make_hook():
926+
def _refresh_api_key(config):
927+
return None
928+
929+
return _refresh_api_key
930+
931+
for obj in (
932+
pod,
933+
pod.metadata,
934+
pod.spec,
935+
pod.spec.containers[0],
936+
pod.spec.containers[0].resources,
937+
):
938+
cfg = Configuration()
939+
cfg.refresh_api_key_hook = _make_hook()
940+
obj.local_vars_configuration = cfg
941+
942+
def test_reset_local_vars_configuration_strips_hook_recursively(self):
943+
"""``_reset_local_vars_configuration`` must replace every nested config with a hook-free one."""
944+
import pickle
945+
946+
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import (
947+
_reset_local_vars_configuration,
948+
)
949+
950+
# Build a pod with the unpicklable in-cluster hook on every nested config; confirm it is
951+
# unpicklable first, then that the reset makes it picklable.
952+
pod = k8s.V1Pod(
953+
metadata=k8s.V1ObjectMeta(),
954+
spec=k8s.V1PodSpec(
955+
containers=[k8s.V1Container(name="base", resources=k8s.V1ResourceRequirements())]
956+
),
957+
)
958+
self._install_unpicklable_incluster_hook(pod)
959+
with pytest.raises((pickle.PicklingError, AttributeError, TypeError)):
960+
pickle.dumps(pod)
961+
962+
_reset_local_vars_configuration(pod)
963+
964+
for obj in (pod, pod.metadata, pod.spec, pod.spec.containers[0], pod.spec.containers[0].resources):
965+
assert obj.local_vars_configuration.refresh_api_key_hook is None
966+
assert obj.local_vars_configuration.client_side_validation is True
967+
# now picklable
968+
pickle.dumps(pod)
969+
970+
@pytest.mark.execution_timeout(30)
971+
@pytest.mark.skipif(
972+
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
973+
)
974+
@mock.patch(
975+
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.run_pod_async"
976+
)
977+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
978+
def test_execute_async_pod_override_picklable_and_reconcilable(
979+
self, mock_get_kube_client, mock_run_pod_async, data_file
980+
):
981+
"""Regression: a ``pod_override`` carrying an in-cluster ``Configuration`` must be both picklable
982+
and reconcilable.
983+
984+
kubernetes-python-client v36 model constructors capture the global in-cluster ``Configuration``,
985+
whose ``refresh_api_key_hook`` is a local closure -> pickling the ``KubernetesJob`` onto the
986+
multiprocessing queue crashes the scheduler. ``execute_async`` resets ``local_vars_configuration``
987+
to a fresh ``Configuration()`` so the job pickles, while keeping ``client_side_validation`` so the
988+
worker-side ``run_next``/``reconcile_pods`` still works (nulling it instead would raise
989+
``'NoneType' object has no attribute 'client_side_validation'``).
990+
"""
991+
import pickle
992+
993+
pod_override = k8s.V1Pod(
994+
metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
995+
spec=k8s.V1PodSpec(
996+
containers=[
997+
k8s.V1Container(
998+
name="base",
999+
image="airflow:3.6",
1000+
resources=k8s.V1ResourceRequirements(requests={"cpu": "100m", "memory": "384Mi"}),
1001+
)
1002+
]
1003+
),
1004+
)
1005+
self._install_unpicklable_incluster_hook(pod_override)
1006+
1007+
# Sanity: the raw pod is unpicklable (reproduces the scheduler crash pre-fix).
1008+
with pytest.raises((pickle.PicklingError, AttributeError, TypeError)):
1009+
pickle.dumps(pod_override)
1010+
1011+
executor_template_file = data_file("executor/basic_template.yaml")
1012+
mock_get_kube_client.return_value = mock.patch("kubernetes.client.CoreV1Api", autospec=True)
1013+
1014+
with conf_vars({("kubernetes_executor", "pod_template_file"): None}):
1015+
executor = self.kubernetes_executor
1016+
executor.start()
1017+
try:
1018+
executor.execute_async(
1019+
key=TaskInstanceKey("dag", "task", "run_id", 1),
1020+
queue=None,
1021+
command=["airflow", "tasks", "run", "true", "some_parameter"],
1022+
executor_config={
1023+
"pod_template_file": executor_template_file,
1024+
"pod_override": pod_override,
1025+
},
1026+
)
1027+
1028+
assert not executor.task_queue.empty()
1029+
job = executor.task_queue.get_nowait()
1030+
executor.task_queue.task_done()
1031+
1032+
# 1) The queued job pickles cleanly (this is the actual scheduler crash path).
1033+
pickle.dumps(job)
1034+
# 2) Contract preserved: kube_executor_config stays a V1Pod.
1035+
assert isinstance(job.kube_executor_config, k8s.V1Pod)
1036+
# 3) Worker-side reconcile/construct_pod succeeds (the null-config approach fails here).
1037+
self.kubernetes_executor.kube_scheduler.run_next(job)
1038+
mock_run_pod_async.assert_called_once()
1039+
built = mock_run_pod_async.call_args[0][0]
1040+
assert built.spec.containers[0].resources.requests == {"cpu": "100m", "memory": "384Mi"}
1041+
finally:
1042+
executor.end()
1043+
9181044
@pytest.mark.db_test
9191045
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
9201046
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")

0 commit comments

Comments
 (0)