Skip to content

Commit f65b5c8

Browse files
kevinjqliu authored and HonahX committed
Fix race condition on Table.scan with limit (#545)
1 parent d174760 commit f65b5c8

File tree

1 file changed

+4
-14
lines changed

1 file changed

+4
-14
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 4 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -938,13 +938,9 @@ def _task_to_table(
938938
projected_field_ids: Set[int],
939939
positional_deletes: Optional[List[ChunkedArray]],
940940
case_sensitive: bool,
941-
row_counts: List[int],
942941
limit: Optional[int] = None,
943942
name_mapping: Optional[NameMapping] = None,
944943
) -> Optional[pa.Table]:
945-
if limit and sum(row_counts) >= limit:
946-
return None
947-
948944
_, _, path = PyArrowFileIO.parse_location(task.file.file_path)
949945
arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
950946
with fs.open_input_file(path) as fin:
@@ -1007,11 +1003,6 @@ def _task_to_table(
10071003
if len(arrow_table) < 1:
10081004
return None
10091005

1010-
if limit is not None and sum(row_counts) >= limit:
1011-
return None
1012-
1013-
row_counts.append(len(arrow_table))
1014-
10151006
return to_requested_schema(projected_schema, file_project_schema, arrow_table)
10161007

10171008

@@ -1077,7 +1068,6 @@ def project_table(
10771068
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
10781069
}.union(extract_field_ids(bound_row_filter))
10791070

1080-
row_counts: List[int] = []
10811071
deletes_per_file = _read_all_delete_files(fs, tasks)
10821072
executor = ExecutorFactory.get_or_create()
10831073
futures = [
@@ -1090,21 +1080,21 @@ def project_table(
10901080
projected_field_ids,
10911081
deletes_per_file.get(task.file.file_path),
10921082
case_sensitive,
1093-
row_counts,
10941083
limit,
10951084
table.name_mapping(),
10961085
)
10971086
for task in tasks
10981087
]
1099-
1088+
total_row_count = 0
11001089
# for consistent ordering, we need to maintain future order
11011090
futures_index = {f: i for i, f in enumerate(futures)}
11021091
completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
11031092
for future in concurrent.futures.as_completed(futures):
11041093
completed_futures.add(future)
1105-
1094+
if table_result := future.result():
1095+
total_row_count += len(table_result)
11061096
# stop early if limit is satisfied
1107-
if limit is not None and sum(row_counts) >= limit:
1097+
if limit is not None and total_row_count >= limit:
11081098
break
11091099

11101100
# by now, we've either completed all tasks or satisfied the limit

0 commit comments

Comments
 (0)