What happened?
Hello,
I am trying to read 50 CSV files, each with 3 columns, in an Apache Beam v2.67.0 job running on Cloud Dataflow. Previously, I used ReadFromText followed by a lambda function that split each text line into 3 fields. I tried switching to beam.io.textio.ReadFromCsv (to handle quoted fields, for example), but the job now fails intermittently with the following error:
My understanding is:
ReadFromCsv reads the data through the pandas API, which produces one or more pandas DataFrames. It then converts each DataFrame into elements of a Beam PCollection. In my case, it seems that an empty DataFrame is sometimes produced, and the conversion step does not handle it well.
The following code seems to be the issue, because batch_length is 0 and the expression becomes a division by zero: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/opcounters.py#L221
mean_element_size = self.producer_batch_converter.estimate_byte_size(
    windowed_batch.values) / batch_length
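To illustrate the suspected failure mode, here is a minimal standalone sketch. It is not Beam's code and not a proposed patch: `estimate_byte_size`, `mean_element_size_unguarded`, and `mean_element_size_guarded` are hypothetical names, and a plain list of strings stands in for the batch. It only shows that the same "total size / batch length" shape raises on an empty batch, and what a guard could look like.

```python
from typing import Optional, Sequence


def estimate_byte_size(batch: Sequence[str]) -> int:
    """Hypothetical stand-in for the batch converter's size estimate."""
    return sum(len(item.encode("utf-8")) for item in batch)


def mean_element_size_unguarded(batch: Sequence[str]) -> float:
    # Same shape as the quoted expression: total size divided by batch length.
    # Raises ZeroDivisionError when the batch is empty.
    return estimate_byte_size(batch) / len(batch)


def mean_element_size_guarded(batch: Sequence[str]) -> Optional[float]:
    # Assumption: skipping the size update is acceptable for an empty batch.
    batch_length = len(batch)
    if batch_length == 0:
        return None
    return estimate_byte_size(batch) / batch_length
```

With an empty batch, the unguarded version raises ZeroDivisionError, while the guarded version returns None so the caller can skip the update.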
The files are not empty, of course: each one has around a million lines.
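As a possible stopgap, the earlier ReadFromText approach could be kept, with the line-splitting lambda replaced by Python's standard csv module so that quoted fields are handled. This is only a sketch: `parse_csv_line` is a hypothetical helper name, and it assumes no field contains an embedded newline, since ReadFromText splits records on line boundaries.

```python
import csv
from typing import List


def parse_csv_line(line: str) -> List[str]:
    """Split one CSV line into fields, honoring quoted fields."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    return next(csv.reader([line]))
```

In a pipeline this would be applied after ReadFromText with something like beam.Map(parse_csv_line), using ReadFromText's skip_header_lines parameter if the files have a header row.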
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components