sink: optimize causality slot mapping #4903
Conversation
📝 Walkthrough

The slot-selection mechanism in the causality package is optimized by preselecting the mapping function at creation time: bitmask operations for power-of-two slot counts, modulo arithmetic otherwise. Hash-ordering logic is refactored, and new test and benchmark suites are added to validate behavior and measure performance.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
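The preselection described in the walkthrough can be sketched as follows; the names `newSlotIndexFunc` and `isPowerOfTwo` are illustrative stand-ins, not necessarily the PR's actual identifiers.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of two:
// such numbers have exactly one bit set, so n & (n-1) clears it to zero.
func isPowerOfTwo(n uint64) bool {
	return n > 0 && n&(n-1) == 0
}

// newSlotIndexFunc picks the slot-mapping function once, at creation time:
// a bitmask when numSlots is a power of two, plain modulo otherwise.
// The hot path then pays only for the cheaper of the two operations.
func newSlotIndexFunc(numSlots uint64) func(uint64) uint64 {
	if isPowerOfTwo(numSlots) {
		mask := numSlots - 1
		return func(hash uint64) uint64 { return hash & mask }
	}
	return func(hash uint64) uint64 { return hash % numSlots }
}

func main() {
	getSlot := newSlotIndexFunc(16)
	fmt.Println(getSlot(19)) // 19 & 15 = 3
	getSlot = newSlotIndexFunc(6)
	fmt.Println(getSlot(19)) // 19 % 6 = 1
}
```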
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 warnings
@wk989898: The following test failed.
Code Review
This pull request optimizes the causality slot mapping by introducing a specialized mapping function that uses bitmasking for power-of-two slot counts, improving performance on the hot path. It also includes comprehensive benchmarks and unit tests for the new mapping logic. However, a critical regression was found in the sortHashes function, which removed the deduplication logic previously present in sortAndDedupHashes. This omission could lead to self-dependency panics in the Node structure when duplicate hashes are processed.
```diff
 	sort.Slice(hashes, func(i, j int) bool {
-		return hashes[i]%numSlots < hashes[j]%numSlots
+		return getSlot(hashes[i]) < getSlot(hashes[j])
 	})

-	// Dedup hashes
-	last := hashes[0]
-	j := 1
-	for i, hash := range hashes {
-		if i == 0 {
-			// skip first one, start checking duplication from 2nd one
-			continue
-		}
-		if hash == last {
-			continue
-		}
-		last = hash
-		hashes[j] = hash
-		j++
-	}
-	hashes = hashes[:j]

 	return hashes
```
The `sortHashes` function is missing the deduplication logic that was present in the original `sortAndDedupHashes`. If the input `hashes` contains duplicate values, the `Add` method will attempt to add the same node to the dependency graph multiple times, causing it to depend on itself and triggering a panic in `Node.dependOn`. Additionally, the sort should use the hash value as a secondary key to ensure identical hashes are adjacent for deduplication.

```go
sort.Slice(hashes, func(i, j int) bool {
	si, sj := getSlot(hashes[i]), getSlot(hashes[j])
	if si != sj {
		return si < sj
	}
	return hashes[i] < hashes[j]
})
if len(hashes) < 2 {
	return hashes
}
j := 0
for i := 1; i < len(hashes); i++ {
	if hashes[i] != hashes[j] {
		j++
		hashes[j] = hashes[i]
	}
}
return hashes[:j+1]
```
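As a standalone check of the suggested fix, the snippet above can be wrapped in an illustrative helper; here `getSlot` is stood in by a modulo-8 closure rather than the package's real method.

```go
package main

import (
	"fmt"
	"sort"
)

// sortAndDedupHashes reproduces the suggested fix as a free function for
// illustration: sort by slot with the hash as secondary key, then compact
// adjacent duplicates in place.
func sortAndDedupHashes(hashes []uint64, getSlot func(uint64) uint64) []uint64 {
	sort.Slice(hashes, func(i, j int) bool {
		si, sj := getSlot(hashes[i]), getSlot(hashes[j])
		if si != sj {
			return si < sj
		}
		return hashes[i] < hashes[j]
	})
	if len(hashes) < 2 {
		return hashes
	}
	j := 0
	for i := 1; i < len(hashes); i++ {
		if hashes[i] != hashes[j] {
			j++
			hashes[j] = hashes[i]
		}
	}
	return hashes[:j+1]
}

func main() {
	getSlot := func(h uint64) uint64 { return h % 8 }
	// 17 and 9 share slot 1; the duplicate 17 must collapse to one entry.
	fmt.Println(sortAndDedupHashes([]uint64{17, 9, 17, 3}, getSlot)) // [9 17 3]
}
```

Without the secondary hash key, equal hashes in the same slot are not guaranteed to be adjacent after `sort.Slice` (which is not stable), so the compaction pass could miss duplicates.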
Actionable comments posted: 1
🧹 Nitpick comments (3)
`downstreamadapter/sink/mysql/causality/slot.go` (1)

170-180: Optional: precompute slot indices to avoid O(N log N) closure calls in the comparator.

`sort.Slice`'s comparator invokes `getSlot` twice per comparison. For the hot-path slice lengths seen in the benchmarks (up to ~1024), precomputing the slot index once per element and sorting indices (or pairs) typically gives a meaningful speedup and also removes the indirect call from the comparator. Not required; the bench numbers in `slot_benchmark_test.go` should tell whether it is worth doing.

`downstreamadapter/sink/mysql/causality/slot_test.go` (1)

22-49: Consider also covering the zero-slot panic and the non-power-of-two mask path.

Expected values are correct, but the new `log.Panic("causality slot count must be positive")` branch in `NewSlots` and the `isPowerOfTwo` false path (e.g. `NewSlots(7)`) are not exercised here. A small `require.Panics(...)` test plus a non-power-of-two `getSlot` case would lock in the selection logic going forward.

🧪 Proposed additional test

```go
func TestNewSlotIndexFunc(t *testing.T) {
	t.Parallel()

	powerOfTwoSlots := NewSlots(16)
	require.Equal(t, uint64(3), powerOfTwoSlots.getSlot(19))
	require.Equal(t, uint64(15), powerOfTwoSlots.getSlot(31))

	nonPowerOfTwoSlots := NewSlots(6)
	require.Equal(t, uint64(1), nonPowerOfTwoSlots.getSlot(19))
	require.Equal(t, uint64(5), nonPowerOfTwoSlots.getSlot(11))
}

func TestNewSlotsZeroPanics(t *testing.T) {
	t.Parallel()
	require.Panics(t, func() { NewSlots(0) })
}
```

`downstreamadapter/sink/mysql/causality/slot_benchmark_test.go` (1)

28-74: Indexing assumes `len(hashes)` is a power of two.

`hashes[i&(len(hashes)-1)]` only wraps correctly because `makeBenchmarkSlotHashes(4096)` happens to return a power-of-two length. If the 4096 constant is ever tweaked (e.g. to 1000) the wrap math is silently wrong. Consider either asserting the length in the helper or using `i%len(hashes)`; branch-predicted modulo on a constant-per-iteration divisor is fine in benchmark harness code.
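The precompute suggestion from the first nitpick can be sketched as below; the pair type and function names are illustrative, and `getSlot` is modeled as a passed-in closure rather than the package's real method.

```go
package main

import (
	"fmt"
	"sort"
)

// hashSlot pairs a hash with its precomputed slot index so the sort
// comparator never calls the mapping function itself.
type hashSlot struct {
	hash uint64
	slot uint64
}

// sortHashesBySlot sorts hashes by slot index (hash value as tie-breaker),
// calling getSlot exactly once per element instead of twice per comparison.
func sortHashesBySlot(hashes []uint64, getSlot func(uint64) uint64) []uint64 {
	pairs := make([]hashSlot, len(hashes))
	for i, h := range hashes {
		pairs[i] = hashSlot{hash: h, slot: getSlot(h)}
	}
	sort.Slice(pairs, func(i, j int) bool {
		if pairs[i].slot != pairs[j].slot {
			return pairs[i].slot < pairs[j].slot
		}
		return pairs[i].hash < pairs[j].hash
	})
	for i, p := range pairs {
		hashes[i] = p.hash
	}
	return hashes
}

func main() {
	getSlot := func(h uint64) uint64 { return h & 15 } // 16 slots
	// Slots: 19→3, 31→15, 16→0, 3→3; sorted by (slot, hash).
	fmt.Println(sortHashesBySlot([]uint64{19, 31, 16, 3}, getSlot)) // [16 3 19 31]
}
```

The trade-off is one temporary slice allocation per call, which is why the review defers the decision to the benchmark numbers.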
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/mysql/causality/slot_benchmark_test.go`:
- Around line 176-191: dedupSortedHashes currently dereferences hashes[0]
without checking length and uses a range loop with an i==0 continue; fix by
returning early for len(hashes) <= 1 to avoid panic, then iterate from index 1
(e.g., for i := 1; i < len(hashes); i++) comparing hashes[i] to last, compacting
into hashes[j] as before; keep the function signature and return hashes[:j]
unchanged so callers of dedupSortedHashes and any future copies won't panic on
empty input.
---
Nitpick comments:
In `@downstreamadapter/sink/mysql/causality/slot_benchmark_test.go`:
- Around line 28-74: The benchmark currently indexes the hashes slice using
bitmask wrapping (hashes[i&(len(hashes)-1)]) inside BenchmarkSlotIndex which
only works when makeBenchmarkSlotHashes(...) returns a power-of-two length;
update the benchmark to avoid the silent bug by either (A) adding an explicit
runtime assertion in makeBenchmarkSlotHashes that len(hashes) is a power of two
(so callers like BenchmarkSlotIndex get a clear failure), or (B) change all
occurrences inside BenchmarkSlotIndex to use safe wrapping via i%len(hashes)
(e.g., replace hashes[i&(len(hashes)-1)] with hashes[i%len(hashes)]); reference
symbols: makeBenchmarkSlotHashes, BenchmarkSlotIndex, and the inner loops that
currently use i&(len(hashes)-1).
In `@downstreamadapter/sink/mysql/causality/slot_test.go`:
- Around line 22-49: Add tests to cover the panic and non-power-of-two
branching: add a require.Panics(...) that calls NewSlots(0) to exercise the
log.Panic("causality slot count must be positive") branch, and add another case
that constructs a Slots with a non-power-of-two value (e.g. NewSlots(7)) and
asserts the expected getSlot result to exercise the isPowerOfTwo == false path
(same target function: NewSlots and method getSlot; you can reuse existing
patterns from TestNewSlotIndexFunc).
In `@downstreamadapter/sink/mysql/causality/slot.go`:
- Around line 170-180: The comparator for sort.Slice in sortHashes calls getSlot
repeatedly; precompute each element's slot once to avoid O(N log N) closure
calls by creating a temporary slice of structs or pairs (e.g., {hash uint64,
slot int}) populated by calling getSlot(hash) for every hash, sort that
temporary slice by the precomputed slot (using sort.Slice), then build and
return a slice of hashes from the sorted pairs; update function sortHashes and
reference getSlot only during the precompute step.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e5aead7b-ea05-4dcb-b179-ca6f670d99b5
📒 Files selected for processing (4)
- downstreamadapter/sink/mysql/causality/slot.go
- downstreamadapter/sink/mysql/causality/slot_benchmark_test.go
- downstreamadapter/sink/mysql/causality/slot_test.go
- downstreamadapter/sink/mysql/sink.go
```go
func dedupSortedHashes(hashes []uint64) []uint64 {
	last := hashes[0]
	j := 1
	for i, hash := range hashes {
		if i == 0 {
			continue
		}
		if hash == last {
			continue
		}
		last = hash
		hashes[j] = hash
		j++
	}
	return hashes[:j]
}
```
`dedupSortedHashes` has a latent panic on empty input and an awkward loop.

`hashes[0]` is accessed before any length check; this is currently safe because both callers guard `len == 0`, but a copy of this helper elsewhere would panic. The `for i, hash := range hashes { if i == 0 { continue } ... }` shape is also harder to read than a plain index loop.
♻️ Proposed cleanup

```diff
 func dedupSortedHashes(hashes []uint64) []uint64 {
-	last := hashes[0]
-	j := 1
-	for i, hash := range hashes {
-		if i == 0 {
-			continue
-		}
-		if hash == last {
-			continue
-		}
-		last = hash
-		hashes[j] = hash
-		j++
-	}
-	return hashes[:j]
+	if len(hashes) == 0 {
+		return hashes
+	}
+	j := 1
+	for i := 1; i < len(hashes); i++ {
+		if hashes[i] != hashes[j-1] {
+			hashes[j] = hashes[i]
+			j++
+		}
+	}
+	return hashes[:j]
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/sink/mysql/causality/slot_benchmark_test.go` around lines
176 - 191, dedupSortedHashes currently dereferences hashes[0] without checking
length and uses a range loop with an i==0 continue; fix by returning early for
len(hashes) <= 1 to avoid panic, then iterate from index 1 (e.g., for i := 1; i
< len(hashes); i++) comparing hashes[i] to last, compacting into hashes[j] as
before; keep the function signature and return hashes[:j] unchanged so callers
of dedupSortedHashes and any future copies won't panic on empty input.
What problem does this PR solve?
Issue Number: ref #4582
What is changed and how it works?
This pull request optimizes the causality slot mapping by introducing a specialized mapping function that uses bitmasking for power-of-two slot counts, improving performance on the hot path.
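The hot-path win rests on the identity `h % n == h & (n-1)` whenever `n` is a power of two: the low bits of `h` are exactly the remainder. A quick standalone check (illustrative code, not from the PR):

```go
package main

import "fmt"

func main() {
	// For power-of-two n, h % n equals h & (n-1) for every h.
	for _, n := range []uint64{2, 4, 16, 1024} {
		mask := n - 1
		ok := true
		for h := uint64(0); h < 100000; h++ {
			if h%n != h&mask {
				ok = false
				break
			}
		}
		fmt.Printf("n=%d identity holds: %v\n", n, ok)
	}
	// Counterexample for a non-power-of-two slot count:
	fmt.Println(uint64(11)%6, uint64(11)&5) // 5 1
}
```

This is why the mapper must be selected per slot count: the bitmask is only valid on the power-of-two path, and the modulo fallback is kept for everything else.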
Check List
Tests
Unit test
Integration test
Manual test (add detailed scripts or steps below)
The selected power-of-two slot mapper reduces slot index cost from 8.39 ns/op to 3.46 ns/op, about 2.4x faster than variable modulo. In `sortHashes`, keeping dedup while only switching the mapper improves the benchmark by about 1.4x to 1.8x depending on hash count.

| Benchmark | old_modulo_dedup | selected_mapper_dedup | selected_mapper_no_dedup |
| --- | --- | --- | --- |
| hashes_8 unique | 545.240 ns/op | 366.320 ns/op (1.49x) | 340.500 ns/op (1.60x) |
| hashes_8 with_duplicates | 489.960 ns/op | 345.460 ns/op (1.42x) | 336.380 ns/op (1.46x) |
| hashes_64 unique | 8108.800 ns/op | 4721.400 ns/op (1.72x) | 4533.600 ns/op (1.79x) |
| hashes_64 with_duplicates | 7490.400 ns/op | 4355.200 ns/op (1.72x) | 4202.000 ns/op (1.78x) |
| hashes_1024 unique | 266482.000 ns/op | 149083.600 ns/op (1.79x) | 144516.600 ns/op (1.84x) |
| hashes_1024 with_duplicates | 250754.600 ns/op | 138317.200 ns/op (1.81x) | 136423.000 ns/op (1.84x) |
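A minimal harness along these lines can reproduce the mapper comparison with `testing.Benchmark`; the constants and names are illustrative, and absolute numbers will differ by machine.

```go
package main

import (
	"fmt"
	"testing"
)

func main() {
	// Pseudo-random hashes via a multiplicative mix; 4096 is a power of
	// two so the i&4095 wrap below is valid.
	hashes := make([]uint64, 4096)
	for i := range hashes {
		hashes[i] = uint64(i) * 2654435761
	}
	const numSlots = 16384
	mask := uint64(numSlots - 1)

	// Variable modulo: the divisor is not known to the compiler here.
	mod := testing.Benchmark(func(b *testing.B) {
		var sink uint64
		for i := 0; i < b.N; i++ {
			sink += hashes[i&4095] % numSlots
		}
		_ = sink
	})
	// Bitmask path used when the slot count is a power of two.
	bit := testing.Benchmark(func(b *testing.B) {
		var sink uint64
		for i := 0; i < b.N; i++ {
			sink += hashes[i&4095] & mask
		}
		_ = sink
	})
	fmt.Println("modulo: ", mod)
	fmt.Println("bitmask:", bit)
}
```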
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note