Skip to content

Improve hash partition for columns contains Struct/list/map#63152

Open
laysfire wants to merge 4 commits into
ray-project:masterfrom
laysfire:improve_hash_partition_performance
Open

Improve hash partition for columns contains Struct/list/map#63152
laysfire wants to merge 4 commits into
ray-project:masterfrom
laysfire:improve_hash_partition_performance

Conversation

@laysfire
Copy link
Copy Markdown
Contributor

@laysfire laysfire commented May 6, 2026

Description

This pr is to improve hash partition performance when table contains pandas can't handle types by moving retrieve table columns operations out of loop.
Use the following script to verify:

import time
import pyarrow as pa
from ray.data._internal.arrow_ops.transform_pyarrow import hash_partition

idx = list(range(50000000))
ints = [[i]for i in range(50000000)]
t = pa.Table.from_pydict(
   {
       "idx": pa.array(idx),
       "ints": pa.array(ints),
   }
)

start = time.time()
hash_partition(t, hash_cols=["idx", "ints"], num_partitions=10)
end = time.time()
print(end - start)

The test result is:

CPU spec Code Version Time consumed
Apple M4 original 66s
Apple M4 optimized 18s

Related issues

Link related issues: "Closes #62550", Cant' reopen after force push, recreate new pr.

Signed-off-by: yifan.xie <xyfabcd@163.com>
@laysfire laysfire requested a review from a team as a code owner May 6, 2026 03:45
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request optimizes the row-by-row hashing logic in _hash_partition by replacing manual column indexing with a more efficient enumerate(zip(*table.columns)) approach. This change improves code readability and potentially performance when processing unhashable PyArrow columns. I have no feedback to provide as there are no review comments.

@laysfire
Copy link
Copy Markdown
Contributor Author

laysfire commented May 6, 2026

@owenowenisme hi, could you help review?

@ray-gardener ray-gardener Bot added data Ray Data-related issues community-contribution Contributed by the community labels May 6, 2026
@owenowenisme owenowenisme self-assigned this May 7, 2026
Copy link
Copy Markdown
Member

@owenowenisme owenowenisme left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great improvement! Could you add comments explaining this improvement in the code?

@owenowenisme owenowenisme added the go add ONLY when ready to merge, run all tests label May 16, 2026
Signed-off-by: yifan.xie <xyfabcd@163.com>
goutamvenkat-anyscale pushed a commit that referenced this pull request Jun 1, 2026
## Description
Optimize `_convert_arrow_table_to_examples` in `tfrecords_datasink.py`
by hoisting per-column lookups and schema validation out of the row
loop, and iterating columns in lockstep with `zip(*columns)`.

The old code called `arrow_table[name][i]` per row × column, paying
Python-level `__getitem__` dispatch on every element. The new code uses
`ChunkedArray.__iter__` (a C-level loop) for row-by-row scalars, and
moves the `name in schema_dict` validation to a one-shot up-front check.

Same optimization pattern as #63152.

## Benchmark

End-to-end TFRecord write (convert → SerializeToString → CRC32C →
write-to-buffer), 5 runs each, best time reported. macOS, Apple Silicon,
pyarrow 16.

| Case | Before | After | Speedup |
|---|---|---|---|
| Mixed scalars (200k × 3) | 4.55s | 3.71s | **1.22x** |
| Wide table (50k × 20) | 5.55s | 4.34s | **1.28x** |
| List-of-int (200k × 2) | 3.39s | 3.00s | **1.13x** |
| Long strings (50k × 2) | 0.82s | 0.73s | **1.12x** |
| With tf_schema (200k × 2) | 4.79s | 4.16s | **1.15x** |

**12–28% faster end-to-end**, with wider tables seeing the largest wins
(hoisting `arrow_table[name]` per column matters more when there are
more columns).

## Correctness

Output is byte-identical to the previous implementation. Verified by
comparing `SerializeToString()` bytes across scalar columns, list-of-int
columns, multi-chunk inputs, empty tables, null-containing columns, and
`tf_schema` validation. The existing tests in
`python/ray/data/tests/datasource/test_tfrecords.py` exercise these
cases and should continue to pass unchanged.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 2, 2026

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions Bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests stale The issue is stale. It will be closed within 7 days unless there are further conversation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants