Skip to content

Commit bd47dc9

Browse files
authored
Use component coder when handling nullable coder in prism. (#34615)
* Invoke component coder in nullable coder. * Add a check to ensure the correct number of components in other coders. * Re-enable a previously failed java flatten test.
1 parent 61cf27f commit bd47dc9

2 files changed

Lines changed: 9 additions & 5 deletions

File tree

runners/prism/java/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,6 @@ def sickbayTests = [
110110
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
111111

112112
// Java side dying during execution.
113-
// https://github.com/apache/beam/issues/32930
114-
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
115113
// Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
116114
// Likely due to prism't coder changes.
117115
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',

sdks/go/pkg/beam/runners/prism/internal/coders.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
250250
ioutilx.ReadN(r, int(l))
251251
}
252252
case urns.CoderNullable:
253+
ccids := c.GetComponentCoderIds()
254+
if len(ccids) != 1 {
255+
panic(fmt.Sprintf("Nullable coder must have only one component: %s", prototext.Format(c)))
256+
}
257+
ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
253258
return func(r io.Reader) {
254259
b, _ := ioutilx.ReadN(r, 1)
255260
if len(b) == 0 {
@@ -260,8 +265,7 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
260265
if prefix == 0 {
261266
return
262267
}
263-
l, _ := coder.DecodeVarInt(r)
264-
ioutilx.ReadN(r, int(l))
268+
ed(r)
265269
}
266270
case urns.CoderVarInt:
267271
return func(r io.Reader) {
@@ -277,14 +281,16 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
277281
}
278282
case urns.CoderIterable:
279283
ccids := c.GetComponentCoderIds()
284+
if len(ccids) != 1 {
285+
panic(fmt.Sprintf("Iterable coder must have only one component: %s", prototext.Format(c)))
286+
}
280287
ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
281288
return func(r io.Reader) {
282289
l, _ := coder.DecodeInt32(r)
283290
for i := int32(0); i < l; i++ {
284291
ed(r)
285292
}
286293
}
287-
288294
case urns.CoderKV:
289295
ccids := c.GetComponentCoderIds()
290296
if len(ccids) != 2 {

0 commit comments

Comments
 (0)