Skip to content

Commit b24494f

Browse files
committed
Change TriggerState finished bitset coder to a SentinelBitSetCoder
SentinelBitSetCoder is the same as BitSetCoder except that it encodes an empty bitset as a single-element byte array containing 0. This makes it possible to distinguish an empty finished bitset from a missing one. SentinelBitSetCoder and BitSetCoder are state compatible: each coder can successfully decode bytes encoded by the other.
1 parent b296b40 commit b24494f

File tree

4 files changed

+234
-2
lines changed

4 files changed

+234
-2
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@
6969
## New Features / Improvements
7070

7171
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
72+
* TriggerStateMachineRunner now uses SentinelBitSetCoder instead of BitSetCoder to
73+
encode the finished bitset ([#38139](https://github.com/apache/beam/pull/38139)).
74+
SentinelBitSetCoder and BitSetCoder are state compatible. Both coders can
75+
decode encoded bytes from the other coder.
7276

7377
## Breaking Changes
7478

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core.serialization;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
import java.util.BitSet;
24+
import org.apache.beam.sdk.coders.AtomicCoder;
25+
import org.apache.beam.sdk.coders.ByteArrayCoder;
26+
import org.apache.beam.sdk.coders.CoderException;
27+
28+
/**
29+
* Coder for {@link BitSet} that stores an empty bit set as a byte array with a single 0 element.
*
* <p>A non-empty bit set is written as {@link BitSet#toByteArray()} through {@link
* ByteArrayCoder}. Decoding hands the bytes read by {@link ByteArrayCoder} to {@link
* BitSet#valueOf(byte[])}, which yields an empty bit set for the single-0 sentinel.
30+
*/
31+
public class SentinelBitSetCoder extends AtomicCoder<BitSet> {
32+
private static final SentinelBitSetCoder INSTANCE = new SentinelBitSetCoder();
33+
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
34+
35+
// Private so that the shared INSTANCE returned by of() is the only way to obtain this coder.
private SentinelBitSetCoder() {}
36+
37+
/** Returns the shared {@link SentinelBitSetCoder} instance. */
public static SentinelBitSetCoder of() {
38+
return INSTANCE;
39+
}
40+
41+
/** Encodes {@code value} to {@code outStream} using the {@code NESTED} context. */
@Override
42+
public void encode(BitSet value, OutputStream outStream) throws CoderException, IOException {
43+
encode(value, outStream, Context.NESTED);
44+
}
45+
46+
/**
* Writes the bytes of {@code value} to {@code outStream} through {@link ByteArrayCoder} in the
* given {@code context}.
*
* @throws CoderException if {@code value} is null
*/
@Override
47+
public void encode(BitSet value, OutputStream outStream, Context context)
48+
throws CoderException, IOException {
49+
if (value == null) {
50+
throw new CoderException("cannot encode a null BitSet");
51+
}
52+
// An empty bit set is replaced by the sentinel {0} so the payload is never zero-length.
byte[] bytes = value.isEmpty() ? new byte[] {0} : value.toByteArray();
53+
// bytes was created just above and is not used again after this call.
BYTE_ARRAY_CODER.encodeAndOwn(bytes, outStream, context);
54+
}
55+
56+
/** Decodes a {@link BitSet} from {@code inStream} using the {@code NESTED} context. */
@Override
57+
public BitSet decode(InputStream inStream) throws CoderException, IOException {
58+
return decode(inStream, Context.NESTED);
59+
}
60+
61+
/**
* Reads a byte array from {@code inStream} through {@link ByteArrayCoder} in the given {@code
* context} and converts it with {@link BitSet#valueOf(byte[])}.
*/
@Override
62+
public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException {
63+
// No special case is needed for the sentinel: a single 0 byte has no bits set.
return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
64+
}
65+
66+
/**
* Runs the {@code verifyDeterministic} helper against {@code BYTE_ARRAY_CODER}, passing this
* coder and an explanatory message.
*/
@Override
67+
public void verifyDeterministic() throws NonDeterministicException {
68+
verifyDeterministic(
69+
this,
70+
"SentinelBitSetCoder requires its ByteArrayCoder to be deterministic.",
71+
BYTE_ARRAY_CODER);
72+
}
73+
74+
/** Always returns {@code true}. */
@Override
75+
public boolean consistentWithEquals() {
76+
return true;
77+
}
78+
}

runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.beam.runners.core.StateAccessor;
2727
import org.apache.beam.runners.core.StateTag;
2828
import org.apache.beam.runners.core.StateTags;
29-
import org.apache.beam.sdk.coders.BitSetCoder;
29+
import org.apache.beam.runners.core.serialization.SentinelBitSetCoder;
3030
import org.apache.beam.sdk.state.Timers;
3131
import org.apache.beam.sdk.state.ValueState;
3232
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -59,7 +59,7 @@
5959
public class TriggerStateMachineRunner<W extends BoundedWindow> {
6060
@VisibleForTesting
6161
public static final StateTag<ValueState<BitSet>> FINISHED_BITS_TAG =
62-
StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
62+
StateTags.makeSystemTagInternal(StateTags.value("closed", SentinelBitSetCoder.of()));
6363

6464
private final ExecutableTriggerStateMachine rootTrigger;
6565
private final TriggerStateMachineContextFactory<W> contextFactory;
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core.serialization;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.equalTo;
22+
23+
import java.util.Arrays;
24+
import java.util.BitSet;
25+
import java.util.List;
26+
import org.apache.beam.sdk.coders.BitSetCoder;
27+
import org.apache.beam.sdk.coders.Coder;
28+
import org.apache.beam.sdk.coders.Coder.Context;
29+
import org.apache.beam.sdk.coders.CoderException;
30+
import org.apache.beam.sdk.testing.CoderProperties;
31+
import org.apache.beam.sdk.util.CoderUtils;
32+
import org.apache.beam.sdk.values.TypeDescriptor;
33+
import org.junit.Rule;
34+
import org.junit.Test;
35+
import org.junit.rules.ExpectedException;
36+
import org.junit.runner.RunWith;
37+
import org.junit.runners.JUnit4;
38+
39+
/** Tests for {@link SentinelBitSetCoder}. */
40+
@RunWith(JUnit4.class)
41+
public class SentinelBitSetCoderTest {
42+
43+
private static final Coder<BitSet> TEST_CODER = SentinelBitSetCoder.of();
44+
45+
// Sample bit sets shared by the tests; the last two ({0} and {}) are both empty bit sets.
private static final List<BitSet> TEST_VALUES =
46+
Arrays.asList(
47+
BitSet.valueOf(new byte[] {0xa, 0xb, 0xc}),
48+
BitSet.valueOf(new byte[] {0xd, 0x3}),
49+
BitSet.valueOf(new byte[] {0xd, 0xe}),
50+
BitSet.valueOf(new byte[] {0}),
51+
BitSet.valueOf(new byte[] {}));
52+
53+
@Test
54+
public void testDecodeEncodeEquals() throws Exception {
55+
for (BitSet value : TEST_VALUES) {
56+
CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
57+
}
58+
}
59+
60+
@Test
61+
public void testRegisterByteSizeObserver() throws Exception {
62+
CoderProperties.testByteCount(
63+
SentinelBitSetCoder.of(), Coder.Context.OUTER, TEST_VALUES.toArray(new BitSet[] {}));
64+
65+
CoderProperties.testByteCount(
66+
SentinelBitSetCoder.of(), Coder.Context.NESTED, TEST_VALUES.toArray(new BitSet[] {}));
67+
}
68+
69+
@Test
70+
public void testStructuralValueConsistentWithEquals() throws Exception {
71+
// Checks every ordered pair of test values, including each value against itself.
for (BitSet value1 : TEST_VALUES) {
72+
for (BitSet value2 : TEST_VALUES) {
73+
CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
74+
}
75+
}
76+
}
77+
78+
/**
79+
* Generated data to check that the wire format has not changed. "CgsM" is {0xa, 0xb, 0xc}, "DQM"
80+
* is {0xd, 0x3}, "DQ4" is {0xd, 0xe}, and "AA" is {0} (the sentinel for an empty BitSet). The
* expected strings carry no base64 padding, so the sentinel appears as "AA" rather than "AA==".
* Both empty test values map to the sentinel, which is why "AA" is listed twice.
81+
*/
82+
private static final List<String> TEST_ENCODINGS =
83+
Arrays.asList("CgsM", "DQM", "DQ4", "AA", "AA");
84+
85+
@Test
86+
public void testWireFormatEncode() throws Exception {
87+
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
88+
}
89+
90+
@Rule public ExpectedException thrown = ExpectedException.none();
91+
92+
@Test
93+
public void encodeNullThrowsCoderException() throws Exception {
94+
thrown.expect(CoderException.class);
95+
thrown.expectMessage("cannot encode a null BitSet");
96+
97+
CoderUtils.encodeToBase64(TEST_CODER, null);
98+
}
99+
100+
@Test
101+
public void testEncodedTypeDescriptor() throws Exception {
102+
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(BitSet.class)));
103+
}
104+
105+
@Test
106+
public void testEmptyBitSetEncoding() throws Exception {
107+
{
108+
byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, new BitSet());
109+
// ByteArrayCoder in OUTER context encodes as is.
110+
assertThat(encoded, equalTo(new byte[] {0}));
111+
}
112+
{
113+
byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, new BitSet(), Context.NESTED);
114+
// Varint length = 1, data = 0 (the sentinel byte)
115+
assertThat(encoded, equalTo(new byte[] {1, 0}));
116+
}
117+
}
118+
119+
@Test
120+
public void testCompatibilityWithBitSetCoder() throws Exception {
121+
BitSetCoder bitSetCoder = BitSetCoder.of();
122+
SentinelBitSetCoder sentinelCoder = SentinelBitSetCoder.of();
123+
124+
for (BitSet bitset : TEST_VALUES) {
125+
for (Coder.Context context : Arrays.asList(Coder.Context.OUTER, Coder.Context.NESTED)) {
126+
// Test SentinelBitSetCoder can decode bytes encoded by BitSetCoder
127+
{
128+
byte[] encodedByBitSet = CoderUtils.encodeToByteArray(bitSetCoder, bitset, context);
129+
BitSet decodedBySentinel =
130+
CoderUtils.decodeFromByteArray(sentinelCoder, encodedByBitSet, context);
131+
assertThat(
132+
"Decoding BitSetCoder encoded value with context " + context,
133+
decodedBySentinel,
134+
equalTo(bitset));
135+
}
136+
137+
// Test BitSetCoder can decode bytes encoded by SentinelBitSetCoder
138+
{
139+
byte[] encodedBySentinel = CoderUtils.encodeToByteArray(sentinelCoder, bitset, context);
140+
BitSet decodedByBitSet =
141+
CoderUtils.decodeFromByteArray(bitSetCoder, encodedBySentinel, context);
142+
assertThat(
143+
"Decoding SentinelBitSetCoder encoded value with context " + context,
144+
decodedByBitSet,
145+
equalTo(bitset));
146+
}
147+
}
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)