Having NoneType error in multiple pipeline build using TFX and Airflow #1

@Engineer1999

I am using TFX and Airflow to build image classification and segmentation pipelines, replicating the repo by Jan Marcel Kezmann (Link). Running classification_dag.py in dags/classification_pipeline and segmentation_dag.py in dags/segmentation_pipeline fails with a runtime error. The log below is from the Transform task of segmentation_dag.

[2023-01-23, 10:55:37 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_component.py", line 76, in _airflow_component_launcher
    launcher.launch()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
    copy.deepcopy(execution_decision.exec_properties))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 74, in _run_executor
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 586, in Do
    TransformProcessor().Transform(label_inputs, label_outputs, status_file)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1145, in Transform
    make_beam_pipeline_fn)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/tfx/components/transform/executor.py", line 1526, in _RunBeamImpl
    output_path=dataset.materialize_output_path))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result = self.run()
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/pipeline.py", line 574, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
    options)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 443, in run_stages
    runner_execution_context, bundle_context_manager, bundle_input)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 776, in _execute_bundle
    bundle_manager))
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1000, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/bhargavpatel/anaconda3/envs/ic_pipeline/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1393, in process_bundle
    for ix, part in enumerate(input.partition(self._num_workers)):
AttributeError: 'NoneType' object has no attribute 'partition'
[2023-01-23, 10:55:37 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=segmentation_dag, task_id=Transform, execution_date=20230123T052420, start_date=20230123T052509, end_date=20230123T052537
[2023-01-23, 10:55:37 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 43 for task Transform ('NoneType' object has no attribute 'partition'; 42847)
[2023-01-23, 10:55:37 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2023-01-23, 10:55:37 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

The Airflow DAG is shown below.

[Image: Segmentation Dag]

I tried several versions of Airflow and TFX. I suspect a version incompatibility, because the traceback passes through the Airflow operator and the executor of the TFX Transform component. Before running the DAG, I compiled each module individually and found no errors.
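To see which library each frame of the traceback belongs to, the frames can be grouped by their top-level package under site-packages. This is only a rough sketch: `frames_by_package` is a hypothetical helper, not part of TFX, Airflow, or Beam, and the `/env/...` paths in the sample are shortened stand-ins for the real paths in the log above.

```python
import re

# Matches one frame header: File "<path>", line <n>, in <function>
FRAME_RE = re.compile(r'File "([^"]+)", line (\d+), in (\S+)')


def frames_by_package(traceback_text):
    """Return (package, file name, line, function) per frame, outermost first."""
    frames = []
    for path, line, func in FRAME_RE.findall(traceback_text):
        # The top-level package is the path component right after site-packages/.
        _, sep, tail = path.partition("site-packages/")
        package = tail.split("/")[0] if sep else "<unknown>"
        frames.append((package, path.rsplit("/", 1)[-1], int(line), func))
    return frames


# Two frames taken from the log above, with the environment path shortened
# to a hypothetical /env prefix for readability.
sample = (
    'File "/env/lib/python3.7/site-packages/tfx/components/transform/executor.py", '
    'line 586, in Do TransformProcessor().Transform(...) '
    'File "/env/lib/python3.7/site-packages/apache_beam/runners/portability/'
    'fn_api_runner/fn_runner.py", line 1393, in process_bundle for ix, part in ...'
)

for frame in frames_by_package(sample):
    print(frame)
```

Applied to the full log, the last frame before the AttributeError is in apache_beam (fn_runner.py, line 1393), so the installed Apache Beam version may be worth checking alongside Airflow and TFX.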

Versions

TFX: 1.12.0
TensorFlow: 2.11.0
Airflow: 2.3.0
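The versions above do not include Apache Beam, which the traceback also passes through. A minimal sketch for collecting all four versions from the active environment follows. It assumes the PyPI distribution names `tfx`, `tensorflow`, `apache-airflow`, and `apache-beam`, and, on Python 3.7, that the `importlib_metadata` backport is installed. `installed_versions` is a hypothetical helper name.

```python
try:
    from importlib import metadata as importlib_metadata  # Python 3.8+
except ImportError:
    # Python 3.7: assumes the importlib_metadata backport is installed.
    import importlib_metadata

# Distribution names as published on PyPI (assumed to match this environment).
PACKAGES = ["tfx", "tensorflow", "apache-airflow", "apache-beam"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = importlib_metadata.version(name)
        except importlib_metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(PACKAGES).items():
    print(f"{name}: {version or 'not installed'}")
```

Running this inside the same conda environment that Airflow uses (ic_pipeline in the log) shows the versions the failing task actually sees.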
