|
26 | 26 | import org.apache.flink.api.common.serialization.DeserializationSchema; |
27 | 27 | import org.apache.flink.api.common.typeinfo.TypeInformation; |
28 | 28 | import org.apache.flink.api.java.typeutils.RowTypeInfo; |
29 | | -import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; |
30 | 29 | import org.apache.flink.types.Row; |
31 | 30 | import org.apache.flink.util.Collector; |
32 | | -import org.apache.kafka.clients.consumer.ConsumerRecord; |
33 | 31 |
|
| 32 | +import java.io.IOException; |
34 | 33 | import java.util.List; |
35 | 34 |
|
36 | 35 | @Internal |
37 | | -public class PbKafkaDeserializationSchema implements KafkaDeserializationSchema<Row> { |
| 36 | +public class PbDeserializationSchema implements DeserializationSchema<Row> { |
38 | 37 | private static final long serialVersionUID = -2556547991095476394L; |
39 | 38 | private final PbBytesParser parser; |
40 | 39 | private final RowTypeInfo rowTypeInfo; |
41 | 40 | private final int arity; |
42 | 41 |
|
43 | | - public PbKafkaDeserializationSchema(BitSailConfiguration jobConf) throws Exception { |
| 42 | + public PbDeserializationSchema(BitSailConfiguration jobConf) throws Exception { |
44 | 43 | this.parser = new PbBytesParser(jobConf); |
45 | 44 |
|
46 | 45 | List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields(); |
@@ -77,23 +76,21 @@ private PrimitiveColumnTypeInfo<?> getColumnTypeInfo(Descriptors.FieldDescriptor |
77 | 76 |
|
78 | 77 | @Override |
79 | 78 | public void open(DeserializationSchema.InitializationContext context) throws Exception { |
80 | | - KafkaDeserializationSchema.super.open(context); |
81 | 79 | } |
82 | 80 |
|
83 | 81 | @Override |
84 | | - public boolean isEndOfStream(Row row) { |
85 | | - return false; |
| 82 | + public Row deserialize(byte[] value) throws IOException { |
| 83 | + return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo); |
86 | 84 | } |
87 | 85 |
|
88 | 86 | @Override |
89 | | - public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { |
90 | | - byte[] value = record.value(); |
91 | | - return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo); |
| 87 | + public void deserialize(byte[] value, Collector<Row> out) throws IOException { |
| 88 | + out.collect(deserialize(value)); |
92 | 89 | } |
93 | 90 |
|
94 | 91 | @Override |
95 | | - public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<Row> out) throws Exception { |
96 | | - out.collect(deserialize(message)); |
| 92 | + public boolean isEndOfStream(Row row) { |
| 93 | + return false; |
97 | 94 | } |
98 | 95 |
|
99 | 96 | @Override |
|
0 commit comments