perf: [EXPERIMENTAL] cache and broadcast serialized plans across partitions#3244
Closed
andygrove wants to merge 3 commits into
Closed
perf: [EXPERIMENTAL] cache and broadcast serialized plans across partitions#3244andygrove wants to merge 3 commits into
andygrove wants to merge 3 commits into
Conversation
Replace ByteArrayOutputStream with direct CodedOutputStream serialization to eliminate unnecessary allocations during query plan serialization. This optimization: - Pre-allocates exact buffer size using getSerializedSize() - Eliminates ByteArrayOutputStream's internal buffer resizing - Removes defensive array copying from toByteArray() - Applies to 5 hot paths called per-partition during query execution For a query with 1000 partitions, this eliminates 5000+ unnecessary allocations and array copies, significantly reducing GC pressure. Changes: - operators.scala: getCometIterator() and convertBlock() - CometNativeWriteExec.scala: serializedPlanOpt() and doExecute() - ParquetFilters.scala: createNativeFilters()
Serialize native query plans once and broadcast to all executors, avoiding repeated protobuf serialization for each partition. This optimization: - Adds serializePlan() method to serialize an Operator once - Adds getCometIterator() overload accepting pre-serialized bytes - Updates getNativeLimitRDD to broadcast the serialized plan - Updates CometTakeOrderedAndProjectExec to broadcast the topK plan For a query with 1000 partitions across 10 executors, this reduces plan serialization from 1000x to 1x, and plan transfer from 1000x to 10x (once per executor via broadcast). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
hsiang-c
reviewed
Jan 22, 2026
| CometExecUtils | ||
| .getTopKNativePlan(child.output, sortOrder, child, limit) | ||
| .get) | ||
| val broadcastTopK = sparkContext.broadcast(serializedTopK) |
Contributor
There was a problem hiding this comment.
Do we need to unpersist() it after use?
Member
Author
There was a problem hiding this comment.
I think we can rely on Spark to GC?
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3244 +/- ##
============================================
+ Coverage 56.12% 60.11% +3.98%
- Complexity 976 1429 +453
============================================
Files 119 170 +51
Lines 11743 15805 +4062
Branches 2251 2606 +355
============================================
+ Hits 6591 9501 +2910
- Misses 4012 4983 +971
- Partials 1140 1321 +181 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Member
Author
|
I'm not sure if this change is worth it, now that we only do the serde once per stage. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #1204
Summary
Changes
serializePlan()method to serialize anOperatoroncegetCometIterator()overload accepting pre-serialized bytesgetNativeLimitRDDto broadcast the serialized planCometTakeOrderedAndProjectExecto broadcast the topK planImpact
For a query with 1000 partitions across 10 executors:
Test plan
🤖 Generated with Claude Code