Skip to content

Commit 4726c2a

Browse files
committed
[beam-tools] #358 add KryoSerializer for StreamElement
1 parent b550bb6 commit 4726c2a

8 files changed

Lines changed: 307 additions & 46 deletions

File tree

beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,6 @@ Trigger getTrigger() {
11981198
return windowingStrategy.getTrigger();
11991199
}
12001200

1201-
@SuppressWarnings("unchecked")
12021201
private static void registerCoders(CoderRegistry registry, Repository repo) {
12031202
// FIXME: need to get rid of this fallback
12041203
KryoCoder<Object> coder =
@@ -1210,7 +1209,7 @@ private static void registerCoders(CoderRegistry registry, Repository repo) {
12101209
kryo.addDefaultSerializer(UnboundedSource.class, KryoSerializableSerializer.class),
12111210
kryo ->
12121211
kryo.addDefaultSerializer(BoundedSource.class, KryoSerializableSerializer.class),
1213-
BeamStream::addDefaultSerializersForGroovyTypes,
1212+
kryo -> addDefaultSerializersForGroovyTypes(kryo, repo),
12141213
kryo -> BeamStream.registerCodersForSchemes(kryo, repo),
12151214
kryo -> BeamStream.registerCommonTypes(kryo, repo),
12161215
kryo -> kryo.setRegistrationRequired(true));
@@ -1222,9 +1221,10 @@ private static void registerCoders(CoderRegistry registry, Repository repo) {
12221221
registry.registerCoderForClass(Pair.class, PairCoder.of(coder, coder));
12231222
}
12241223

1225-
private static void addDefaultSerializersForGroovyTypes(Kryo kryo) {
1224+
private static void addDefaultSerializersForGroovyTypes(Kryo kryo, Repository repo) {
12261225
kryo.addDefaultSerializer(Tuple.class, TupleSerializer.class);
12271226
kryo.addDefaultSerializer(GStringImpl.class, GStringSerializer.class);
1227+
kryo.addDefaultSerializer(StreamElement.class, new StreamElementSerializer(repo.asFactory()));
12281228
}
12291229

12301230
private static void registerCodersForSchemes(Kryo kryo, Repository repo) {

beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/StreamConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ static StreamConfig of(BeamDataOperator beam) {
4141
return new StreamConfig(((ConfigRepository) beam.getRepository()));
4242
}
4343

44+
static StreamConfig of(Repository repo) {
45+
return new StreamConfig((ConfigRepository) repo);
46+
}
47+
4448
@Getter private final int collectPort;
4549
@Getter private final String collectHostname;
4650
private final RepositoryFactory repositoryFactory;
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2017-2026 O2 Czech Republic, a.s.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package cz.o2.proxima.beam.tools.groovy;
17+
18+
import cz.o2.proxima.core.repository.AttributeDescriptor;
19+
import cz.o2.proxima.core.repository.EntityDescriptor;
20+
import cz.o2.proxima.core.repository.Repository;
21+
import cz.o2.proxima.core.repository.RepositoryFactory;
22+
import cz.o2.proxima.core.storage.StreamElement;
23+
import java.io.Serializable;
24+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.Kryo;
25+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.Serializer;
26+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.io.Input;
27+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.io.Output;
28+
29+
public class StreamElementSerializer extends Serializer<StreamElement> implements Serializable {
30+
31+
private final RepositoryFactory factory;
32+
private transient Repository repoInstance = null;
33+
34+
public StreamElementSerializer(RepositoryFactory factory) {
35+
this.factory = factory;
36+
}
37+
38+
@Override
39+
public void write(Kryo kryo, Output output, StreamElement element) {
40+
if (element.hasSequentialId()) {
41+
output.writeBoolean(true);
42+
output.writeVarLong(element.getSequentialId(), true);
43+
} else {
44+
output.writeBoolean(false);
45+
output.writeString(element.getUuid());
46+
}
47+
output.writeString(element.getEntityDescriptor().getName());
48+
output.writeString(element.getKey());
49+
output.writeString(element.getAttribute());
50+
output.writeVarLong(element.getStamp(), true);
51+
output.writeVarInt(element.isDelete() ? -1 : element.getValue().length, false);
52+
if (!element.isDelete()) {
53+
output.writeBytes(element.getValue());
54+
}
55+
}
56+
57+
@Override
58+
public StreamElement read(Kryo kryo, Input input, Class<? extends StreamElement> aClass) {
59+
final Repository repo = repo();
60+
final long seqId;
61+
final String uuid;
62+
if (input.readBoolean()) {
63+
seqId = input.readVarLong(true);
64+
uuid = null;
65+
} else {
66+
seqId = -1;
67+
uuid = input.readString();
68+
}
69+
final String entity = input.readString();
70+
final String key = input.readString();
71+
final String attribute = input.readString();
72+
final long stamp = input.readVarLong(true);
73+
final EntityDescriptor entityDesc = repo.getEntity(entity);
74+
final AttributeDescriptor<?> attrDesc = entityDesc.getAttribute(attribute);
75+
int len = input.readVarInt(false);
76+
if (len < 0) {
77+
if (seqId >= 0) {
78+
return StreamElement.delete(entityDesc, attrDesc, seqId, key, attribute, stamp);
79+
}
80+
return StreamElement.delete(entityDesc, attrDesc, uuid, key, attribute, stamp);
81+
}
82+
if (seqId >= 0) {
83+
return StreamElement.upsert(
84+
entityDesc, attrDesc, seqId, key, attribute, stamp, input.readBytes(len));
85+
}
86+
return StreamElement.upsert(
87+
entityDesc, attrDesc, uuid, key, attribute, stamp, input.readBytes(len));
88+
}
89+
90+
private Repository repo() {
91+
if (repoInstance == null) {
92+
repoInstance = factory.apply();
93+
}
94+
return repoInstance;
95+
}
96+
}

beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamStreamTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.concurrent.locks.LockSupport;
7373
import java.util.function.Function;
7474
import java.util.stream.Collectors;
75+
import javax.annotation.Nullable;
7576
import lombok.extern.slf4j.Slf4j;
7677
import org.apache.beam.runners.direct.DirectOptions;
7778
import org.apache.beam.runners.direct.DirectRunner;
@@ -119,7 +120,7 @@
119120
public class BeamStreamTest extends StreamTest {
120121

121122
private final transient Repository repo =
122-
Repository.ofTest(ConfigFactory.load("test-reference.conf"));
123+
Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
123124
transient BeamDataOperator op;
124125

125126
@Parameters
@@ -152,7 +153,7 @@ static TestStreamProvider provider(boolean stream, Class<? extends PipelineRunne
152153
return new TestStreamProvider() {
153154
@SuppressWarnings("unchecked")
154155
@Override
155-
public <T> Stream<T> of(List<T> values) {
156+
public <T> Stream<T> of(List<T> values, @Nullable Repository repo) {
156157
Set<Class<?>> classes = values.stream().map(Object::getClass).collect(Collectors.toSet());
157158

158159
Preconditions.checkArgument(
@@ -162,7 +163,7 @@ public <T> Stream<T> of(List<T> values) {
162163

163164
return injectTypeOf(
164165
new BeamStream<>(
165-
StreamConfig.empty(),
166+
repo == null ? StreamConfig.empty() : StreamConfig.of(repo),
166167
true,
167168
registeringTypes(
168169
PCollectionProvider.boundedOrUnbounded(
@@ -213,7 +214,7 @@ static <T> TestStream<T> asTestStream(List<T> values) {
213214

214215
static <T> BeamStream<T> injectTypeOf(BeamStream<T> delegate) {
215216
return new BeamStream<T>(
216-
StreamConfig.empty(),
217+
delegate.config,
217218
delegate.isBounded(),
218219
delegate.collection,
219220
WindowingStrategy.globalDefault(),
@@ -248,7 +249,7 @@ <X> BeamStream<X> descendant(Function<Pipeline, PCollection<X>> factory) {
248249

249250
static <T> BeamWindowedStream<T> injectTypeOf(BeamWindowedStream<T> delegate) {
250251
return new BeamWindowedStream<T>(
251-
StreamConfig.empty(),
252+
delegate.config,
252253
delegate.isBounded(),
253254
delegate.getCollection(),
254255
delegate.getWindowingStrategy(),
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2017-2026 O2 Czech Republic, a.s.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package cz.o2.proxima.beam.tools.groovy;
17+
18+
import static org.junit.Assert.assertEquals;
19+
20+
import cz.o2.proxima.core.repository.AttributeDescriptor;
21+
import cz.o2.proxima.core.repository.EntityDescriptor;
22+
import cz.o2.proxima.core.repository.Repository;
23+
import cz.o2.proxima.core.storage.StreamElement;
24+
import cz.o2.proxima.typesafe.config.ConfigFactory;
25+
import java.io.ByteArrayInputStream;
26+
import java.io.ByteArrayOutputStream;
27+
import java.io.IOException;
28+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.io.Input;
29+
import org.apache.beam.repackaged.kryo.com.esotericsoftware.kryo.io.Output;
30+
import org.junit.Test;
31+
32+
public class StreamElementSerializerTest {
33+
34+
@Test
35+
public void testSerialization() throws IOException {
36+
Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
37+
StreamElementSerializer serializer = new StreamElementSerializer(repo.asFactory());
38+
EntityDescriptor event = repo.getEntity("event");
39+
EntityDescriptor gateway = repo.getEntity("gateway");
40+
AttributeDescriptor<byte[]> data = event.getAttribute("data");
41+
AttributeDescriptor<Object> rule = gateway.getAttribute("rule.*");
42+
43+
{
44+
StreamElement el =
45+
StreamElement.upsert(
46+
event,
47+
data,
48+
"uuid",
49+
"key",
50+
data.getName(),
51+
System.currentTimeMillis(),
52+
new byte[] {1, 2, 3});
53+
checkSerialization(serializer, el);
54+
}
55+
{
56+
StreamElement el =
57+
StreamElement.upsert(
58+
event,
59+
data,
60+
1,
61+
"key",
62+
data.getName(),
63+
System.currentTimeMillis(),
64+
new byte[] {1, 2, 3});
65+
checkSerialization(serializer, el);
66+
}
67+
{
68+
StreamElement el =
69+
StreamElement.delete(
70+
event, data, "uuid", "key", data.getName(), System.currentTimeMillis());
71+
checkSerialization(serializer, el);
72+
}
73+
{
74+
StreamElement el =
75+
StreamElement.delete(event, data, 1, "key", data.getName(), System.currentTimeMillis());
76+
checkSerialization(serializer, el);
77+
}
78+
{
79+
StreamElement el =
80+
StreamElement.upsert(
81+
gateway,
82+
rule,
83+
"uuid",
84+
"key",
85+
"rule.1",
86+
System.currentTimeMillis(),
87+
new byte[] {1, 2, 3});
88+
checkSerialization(serializer, el);
89+
}
90+
{
91+
StreamElement el =
92+
StreamElement.upsert(
93+
gateway, rule, 1, "key", "rule.1", System.currentTimeMillis(), new byte[] {1, 2, 3});
94+
checkSerialization(serializer, el);
95+
}
96+
{
97+
StreamElement el =
98+
StreamElement.delete(gateway, rule, "uuid", "key", "rule.1", System.currentTimeMillis());
99+
checkSerialization(serializer, el);
100+
}
101+
{
102+
StreamElement el =
103+
StreamElement.delete(gateway, rule, 1, "key", "rule.1", System.currentTimeMillis());
104+
checkSerialization(serializer, el);
105+
}
106+
}
107+
108+
private static void checkSerialization(StreamElementSerializer serializer, StreamElement el)
109+
throws IOException {
110+
byte[] serialized;
111+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
112+
Output output = new Output(baos)) {
113+
serializer.write(null, output, el);
114+
output.flush();
115+
serialized = baos.toByteArray();
116+
}
117+
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
118+
Input input = new Input(bais)) {
119+
assertEquals(serializer.read(null, input, null), el);
120+
}
121+
}
122+
}

tools/src/test/java/cz/o2/proxima/tools/groovy/AbstractStreamTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*/
1616
package cz.o2.proxima.tools.groovy;
1717

18+
import cz.o2.proxima.core.repository.Repository;
1819
import java.io.Serializable;
1920
import java.util.Arrays;
21+
import java.util.List;
2022
import java.util.stream.Collectors;
23+
import javax.annotation.Nullable;
2124

2225
/** Base class for tests of all stream classes. */
2326
abstract class AbstractStreamTest implements Serializable {
@@ -32,6 +35,14 @@ protected AbstractStreamTest(TestStreamProvider provider) {
3235

3336
@SafeVarargs
3437
final <T> Stream<T> stream(T... items) {
35-
return provider.of(Arrays.stream(items).collect(Collectors.toList()));
38+
return stream(Arrays.stream(items).collect(Collectors.toList()));
39+
}
40+
41+
final <T> Stream<T> stream(List<T> items) {
42+
return stream(items, null);
43+
}
44+
45+
final <T> Stream<T> stream(List<T> items, @Nullable Repository repo) {
46+
return provider.of(items, repo);
3647
}
3748
}

0 commit comments

Comments
 (0)