Commit 82d24d7
[Prism] Fix SDF splits and checkpoints in ElementManager.ReturnResiduals
Fixes a bug where splittable DoFn (SDF) splits and checkpoints lost their deferred
residual restrictions, causing the pipeline to terminate prematurely and downstream
assertions to fail because of missing elements (e.g., test_register_finalizations).
Root Cause:
- The previous patch in v2 fixed double-counting of livePending by removing the
decoding and rescheduling of split residuals (stage.AddPending and em.addPending)
from ReturnResiduals(), assuming stage.splitBundle() had already placed them back.
- That assumption holds for normal non-SDF channel splits, where stage.splitBundle already
puts the unprocessed original elements back.
- However, when a splittable DoFn (SDF) checkpoints itself, the active element splits on
its restriction rather than on unprocessed channel elements. In this case, the original
remaining elements (res) in splitBundle() have a length of 0, but the SDK worker
returns a new restriction in residuals.Data (e.g., unprocessedElements has length 1).
- Because the previous patch removed rescheduling from ReturnResiduals entirely, this new
residual restriction was lost and never added back to the pending queue.
Solution:
- Inside ReturnResiduals(), we compute the number of original elements remaining in
the bundle: originalRemainingCount := len(completed.es) - firstRsIndex.
- We compare the number of returned residuals, len(unprocessedElements), against originalRemainingCount.
- If len(unprocessedElements) > originalRemainingCount, the difference is the number of new SDF
residual restrictions. We add ONLY these new residuals back to the stage pending
heap and increment em.livePending by this difference.
- This preserves the fix for normal channel splits (preventing double-counting) while
ensuring SDF checkpoint residuals are correctly scheduled.
- Also adds detailed slog.Info logging to track livePending state changes
in addPending, ReturnResiduals, and splitBundle.
1 parent 76e656e commit 82d24d7
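The counting rule described under Solution can be sketched in isolation as follows. This is a minimal illustration, not the code from this commit: newResidualCount is a hypothetical helper, and its integer parameters stand in for len(completed.es), firstRsIndex, and len(unprocessedElements). It only computes how many residuals to reschedule and leaves out the heap and livePending bookkeeping.

```go
package main

import "fmt"

// newResidualCount is a hypothetical helper (not part of Prism) that returns how
// many returned residuals are new SDF restrictions rather than original bundle
// elements that splitBundle has already put back.
//
// completedLen stands in for len(completed.es), firstRsIndex for the index of the
// first unprocessed element, and returned for len(unprocessedElements).
func newResidualCount(completedLen, firstRsIndex, returned int) int {
	originalRemainingCount := completedLen - firstRsIndex
	if originalRemainingCount < 0 {
		// Defensive clamp; an assumption of this sketch, not taken from the commit.
		originalRemainingCount = 0
	}
	if returned > originalRemainingCount {
		// The surplus is what would be added to the pending heap and to livePending.
		return returned - originalRemainingCount
	}
	// Everything returned was already rescheduled; adding it again would double-count.
	return 0
}

func main() {
	// Channel split: 5 elements, split at index 3, the 2 unprocessed elements come back.
	fmt.Println(newResidualCount(5, 3, 2)) // prints 0
	// SDF checkpoint: no original elements remain, one new residual restriction comes back.
	fmt.Println(newResidualCount(1, 1, 1)) // prints 1
}
```

Under these assumptions, the channel-split case reschedules nothing extra, while the SDF checkpoint case reschedules exactly the one new restriction.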
1 file changed
Lines changed: 17 additions & 5 deletions