@@ -419,6 +419,82 @@ def test_not_adopt_unassigned_task(self, mock_kube_client):
419419 assert not mock_kube_client .patch_namespaced_pod .called
420420 assert pod_ids == {"foobar" : {}}
421421
422+ @mock .patch ('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task' )
423+ @mock .patch ('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods' )
424+ def test_try_adopt_task_instances (self , mock_adopt_completed_pods , mock_adopt_launched_task ):
425+ executor = self .kubernetes_executor
426+ executor .scheduler_job_id = "10"
427+ mock_ti = mock .MagicMock (queued_by_job_id = "1" , external_executor_id = "1" , dag_id = "dag" , task_id = "task" )
428+ pod = k8s .V1Pod (metadata = k8s .V1ObjectMeta (name = "foo" , labels = {"dag_id" : "dag" , "task_id" : "task" }))
429+ pod_id = create_pod_id (dag_id = "dag" , task_id = "task" )
430+ mock_kube_client = mock .MagicMock ()
431+ mock_kube_client .list_namespaced_pod .return_value .items = [pod ]
432+ executor .kube_client = mock_kube_client
433+
434+ # First adoption
435+ executor .try_adopt_task_instances ([mock_ti ])
436+ mock_kube_client .list_namespaced_pod .assert_called_once_with (
437+ namespace = 'default' , label_selector = 'airflow-worker=1'
438+ )
439+ mock_adopt_launched_task .assert_called_once_with (mock_kube_client , pod , {pod_id : mock_ti })
440+ mock_adopt_completed_pods .assert_called_once ()
441+ # We aren't checking the return value of `try_adopt_task_instances` because it relies on
442+ # `adopt_launched_task` mutating its arg. This should be refactored, but not right now.
443+
444+ # Second adoption (queued_by_job_id and external_executor_id no longer match)
445+ mock_kube_client .reset_mock ()
446+ mock_adopt_launched_task .reset_mock ()
447+ mock_adopt_completed_pods .reset_mock ()
448+
449+ mock_ti .queued_by_job_id = "10" # scheduler_job would have updated this after the first adoption
450+ executor .scheduler_job_id = "20"
451+
452+ executor .try_adopt_task_instances ([mock_ti ])
453+ mock_kube_client .list_namespaced_pod .assert_called_once_with (
454+ namespace = 'default' , label_selector = 'airflow-worker=10'
455+ )
456+ mock_adopt_launched_task .assert_called_once_with (mock_kube_client , pod , {pod_id : mock_ti })
457+ mock_adopt_completed_pods .assert_called_once ()
458+
459+ @mock .patch ('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods' )
460+ def test_try_adopt_task_instances_multiple_scheduler_ids (self , mock_adopt_completed_pods ):
461+ """We try to find pods only once per scheduler id"""
462+ executor = self .kubernetes_executor
463+ mock_kube_client = mock .MagicMock ()
464+ executor .kube_client = mock_kube_client
465+
466+ mock_tis = [
467+ mock .MagicMock (queued_by_job_id = "10" , external_executor_id = "1" , dag_id = "dag" , task_id = "task" ),
468+ mock .MagicMock (queued_by_job_id = "40" , external_executor_id = "1" , dag_id = "dag" , task_id = "task2" ),
469+ mock .MagicMock (queued_by_job_id = "40" , external_executor_id = "1" , dag_id = "dag" , task_id = "task3" ),
470+ ]
471+
472+ executor .try_adopt_task_instances (mock_tis )
473+ assert mock_kube_client .list_namespaced_pod .call_count == 2
474+ mock_kube_client .list_namespaced_pod .assert_has_calls (
475+ [
476+ mock .call (namespace = 'default' , label_selector = 'airflow-worker=10' ),
477+ mock .call (namespace = 'default' , label_selector = 'airflow-worker=40' ),
478+ ],
479+ any_order = True ,
480+ )
481+
482+ @mock .patch ('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task' )
483+ @mock .patch ('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods' )
484+ def test_try_adopt_task_instances_no_matching_pods (
485+ self , mock_adopt_completed_pods , mock_adopt_launched_task
486+ ):
487+ executor = self .kubernetes_executor
488+ mock_ti = mock .MagicMock (queued_by_job_id = "1" , external_executor_id = "1" , dag_id = "dag" , task_id = "task" )
489+ mock_kube_client = mock .MagicMock ()
490+ mock_kube_client .list_namespaced_pod .return_value .items = []
491+ executor .kube_client = mock_kube_client
492+
493+ tis_to_flush = executor .try_adopt_task_instances ([mock_ti ])
494+ assert tis_to_flush == [mock_ti ]
495+ mock_adopt_launched_task .assert_not_called ()
496+ mock_adopt_completed_pods .assert_called_once ()
497+
422498
423499class TestKubernetesJobWatcher (unittest .TestCase ):
424500 def setUp (self ):
0 commit comments