Skip to content

Commit 8fca50d

Browse files
committed
Fix test
1 parent 8c8142c commit 8fca50d

11 files changed

Lines changed: 445 additions & 12 deletions

File tree

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink;
20+
21+
import org.apache.paimon.data.BinaryString;
22+
import org.apache.paimon.data.Decimal;
23+
import org.apache.paimon.data.InternalArray;
24+
import org.apache.paimon.data.InternalMap;
25+
import org.apache.paimon.data.InternalRow;
26+
import org.apache.paimon.data.Timestamp;
27+
28+
import org.apache.flink.table.data.ArrayData;
29+
import org.apache.flink.table.data.DecimalData;
30+
import org.apache.flink.table.data.MapData;
31+
import org.apache.flink.table.data.RawValueData;
32+
import org.apache.flink.table.data.RowData;
33+
import org.apache.flink.table.data.StringData;
34+
import org.apache.flink.table.data.TimestampData;
35+
import org.apache.flink.types.RowKind;
36+
import org.apache.flink.types.variant.BinaryVariant;
37+
import org.apache.flink.types.variant.Variant;
38+
39+
import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
40+
41+
/** Convert to Flink row data with blob. */
42+
public class FlinkRowDataWithBlob implements RowData {
43+
44+
private final int blobField;
45+
private InternalRow row;
46+
47+
public FlinkRowDataWithBlob(InternalRow row, int blobField) {
48+
this.row = row;
49+
this.blobField = blobField;
50+
}
51+
52+
public FlinkRowDataWithBlob replace(InternalRow row) {
53+
this.row = row;
54+
return this;
55+
}
56+
57+
@Override
58+
public int getArity() {
59+
return row.getFieldCount();
60+
}
61+
62+
@Override
63+
public RowKind getRowKind() {
64+
return toFlinkRowKind(row.getRowKind());
65+
}
66+
67+
@Override
68+
public void setRowKind(RowKind kind) {
69+
row.setRowKind(fromFlinkRowKind(kind));
70+
}
71+
72+
@Override
73+
public boolean isNullAt(int pos) {
74+
return row.isNullAt(pos);
75+
}
76+
77+
@Override
78+
public boolean getBoolean(int pos) {
79+
return row.getBoolean(pos);
80+
}
81+
82+
@Override
83+
public byte getByte(int pos) {
84+
return row.getByte(pos);
85+
}
86+
87+
@Override
88+
public short getShort(int pos) {
89+
return row.getShort(pos);
90+
}
91+
92+
@Override
93+
public int getInt(int pos) {
94+
return row.getInt(pos);
95+
}
96+
97+
@Override
98+
public long getLong(int pos) {
99+
return row.getLong(pos);
100+
}
101+
102+
@Override
103+
public float getFloat(int pos) {
104+
return row.getFloat(pos);
105+
}
106+
107+
@Override
108+
public double getDouble(int pos) {
109+
return row.getDouble(pos);
110+
}
111+
112+
@Override
113+
public StringData getString(int pos) {
114+
return toFlinkString(row.getString(pos));
115+
}
116+
117+
@Override
118+
public DecimalData getDecimal(int pos, int precision, int scale) {
119+
return toFlinkDecimal(row.getDecimal(pos, precision, scale));
120+
}
121+
122+
@Override
123+
public TimestampData getTimestamp(int pos, int precision) {
124+
return toFlinkTimestamp(row.getTimestamp(pos, precision));
125+
}
126+
127+
@Override
128+
public <T> RawValueData<T> getRawValue(int pos) {
129+
throw new UnsupportedOperationException();
130+
}
131+
132+
@Override
133+
public byte[] getBinary(int pos) {
134+
135+
return pos == blobField ? row.getBlob(pos).toData() : row.getBinary(pos);
136+
}
137+
138+
@Override
139+
public ArrayData getArray(int pos) {
140+
return new FlinkArrayData(row.getArray(pos));
141+
}
142+
143+
@Override
144+
public MapData getMap(int pos) {
145+
return new FlinkMapData(row.getMap(pos));
146+
}
147+
148+
@Override
149+
public RowData getRow(int pos, int numFields) {
150+
return new FlinkRowData(row.getRow(pos, numFields));
151+
}
152+
153+
public Variant getVariant(int pos) {
154+
org.apache.paimon.data.variant.Variant variant = row.getVariant(pos);
155+
return new BinaryVariant(variant.value(), variant.metadata());
156+
}
157+
158+
private static class FlinkArrayData implements ArrayData {
159+
160+
private final InternalArray array;
161+
162+
private FlinkArrayData(InternalArray array) {
163+
this.array = array;
164+
}
165+
166+
@Override
167+
public int size() {
168+
return array.size();
169+
}
170+
171+
@Override
172+
public boolean isNullAt(int pos) {
173+
return array.isNullAt(pos);
174+
}
175+
176+
@Override
177+
public boolean getBoolean(int pos) {
178+
return array.getBoolean(pos);
179+
}
180+
181+
@Override
182+
public byte getByte(int pos) {
183+
return array.getByte(pos);
184+
}
185+
186+
@Override
187+
public short getShort(int pos) {
188+
return array.getShort(pos);
189+
}
190+
191+
@Override
192+
public int getInt(int pos) {
193+
return array.getInt(pos);
194+
}
195+
196+
@Override
197+
public long getLong(int pos) {
198+
return array.getLong(pos);
199+
}
200+
201+
@Override
202+
public float getFloat(int pos) {
203+
return array.getFloat(pos);
204+
}
205+
206+
@Override
207+
public double getDouble(int pos) {
208+
return array.getDouble(pos);
209+
}
210+
211+
@Override
212+
public StringData getString(int pos) {
213+
return toFlinkString(array.getString(pos));
214+
}
215+
216+
@Override
217+
public DecimalData getDecimal(int pos, int precision, int scale) {
218+
return toFlinkDecimal(array.getDecimal(pos, precision, scale));
219+
}
220+
221+
@Override
222+
public TimestampData getTimestamp(int pos, int precision) {
223+
return toFlinkTimestamp(array.getTimestamp(pos, precision));
224+
}
225+
226+
@Override
227+
public <T> RawValueData<T> getRawValue(int pos) {
228+
throw new UnsupportedOperationException();
229+
}
230+
231+
public Variant getVariant(int pos) {
232+
org.apache.paimon.data.variant.Variant variant = array.getVariant(pos);
233+
return new BinaryVariant(variant.value(), variant.metadata());
234+
}
235+
236+
@Override
237+
public byte[] getBinary(int pos) {
238+
return array.getBinary(pos);
239+
}
240+
241+
@Override
242+
public ArrayData getArray(int pos) {
243+
return new FlinkArrayData(array.getArray(pos));
244+
}
245+
246+
@Override
247+
public MapData getMap(int pos) {
248+
return new FlinkMapData(array.getMap(pos));
249+
}
250+
251+
@Override
252+
public RowData getRow(int pos, int numFields) {
253+
return new FlinkRowData(array.getRow(pos, numFields));
254+
}
255+
256+
@Override
257+
public boolean[] toBooleanArray() {
258+
return array.toBooleanArray();
259+
}
260+
261+
@Override
262+
public byte[] toByteArray() {
263+
return array.toByteArray();
264+
}
265+
266+
@Override
267+
public short[] toShortArray() {
268+
return array.toShortArray();
269+
}
270+
271+
@Override
272+
public int[] toIntArray() {
273+
return array.toIntArray();
274+
}
275+
276+
@Override
277+
public long[] toLongArray() {
278+
return array.toLongArray();
279+
}
280+
281+
@Override
282+
public float[] toFloatArray() {
283+
return array.toFloatArray();
284+
}
285+
286+
@Override
287+
public double[] toDoubleArray() {
288+
return array.toDoubleArray();
289+
}
290+
}
291+
292+
private static class FlinkMapData implements MapData {
293+
294+
private final InternalMap map;
295+
296+
private FlinkMapData(InternalMap map) {
297+
this.map = map;
298+
}
299+
300+
@Override
301+
public int size() {
302+
return map.size();
303+
}
304+
305+
@Override
306+
public ArrayData keyArray() {
307+
return new FlinkArrayData(map.keyArray());
308+
}
309+
310+
@Override
311+
public ArrayData valueArray() {
312+
return new FlinkArrayData(map.valueArray());
313+
}
314+
}
315+
316+
public static StringData toFlinkString(BinaryString str) {
317+
return StringData.fromBytes(str.toBytes());
318+
}
319+
320+
public static TimestampData toFlinkTimestamp(Timestamp timestamp) {
321+
return TimestampData.fromEpochMillis(
322+
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
323+
}
324+
325+
public static DecimalData toFlinkDecimal(Decimal decimal) {
326+
if (decimal.isCompact()) {
327+
return DecimalData.fromUnscaledLong(
328+
decimal.toUnscaledLong(), decimal.precision(), decimal.scale());
329+
} else {
330+
return DecimalData.fromBigDecimal(
331+
decimal.toBigDecimal(), decimal.precision(), decimal.scale());
332+
}
333+
}
334+
335+
public static RowKind toFlinkRowKind(org.apache.paimon.types.RowKind rowKind) {
336+
switch (rowKind) {
337+
case INSERT:
338+
return RowKind.INSERT;
339+
case UPDATE_BEFORE:
340+
return RowKind.UPDATE_BEFORE;
341+
case UPDATE_AFTER:
342+
return RowKind.UPDATE_AFTER;
343+
case DELETE:
344+
return RowKind.DELETE;
345+
default:
346+
throw new UnsupportedOperationException();
347+
}
348+
}
349+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,16 @@ public FileStoreSourceReader(
5050
FileStoreSourceReaderMetrics metrics,
5151
IOManager ioManager,
5252
@Nullable Long limit,
53-
@Nullable NestedProjectedRowData rowData) {
53+
@Nullable NestedProjectedRowData rowData,
54+
@Nullable Integer blobField) {
5455
// limiter is created in SourceReader, it can be shared in all split readers
5556
super(
5657
() ->
5758
new FileStoreSourceSplitReader(
5859
tableRead.withIOManager(ioManager),
5960
RecordLimiter.create(limit),
60-
metrics),
61+
metrics,
62+
blobField),
6163
(element, output, state) ->
6264
FlinkRecordsWithSplitIds.emitRecord(
6365
readerContext, element, output, state, metrics, rowData),

0 commit comments

Comments
 (0)