Skip to content

Commit 05ad2a0

Browse files
yuzelinJingsongLi
authored andcommitted
[core] Fix that cannot read binlog table with projection (#6417)
1 parent 018d61b commit 05ad2a0

2 files changed

Lines changed: 21 additions & 1 deletion

File tree

paimon-core/src/main/java/org/apache/paimon/table/system/BinlogTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
131131
(row1, row2) ->
132132
new AuditLogRow(
133133
readProjection, convertToArray(row1, row2, fieldGetters)),
134-
wrapped.rowType());
134+
this.wrappedReadType);
135135
} else {
136136
return dataRead.createReader(split)
137137
.transform(

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,26 @@ public void testBinlogTableStreamRead() throws Exception {
11721172
iterator.close();
11731173
}
11741174

1175+
@Test
1176+
public void testBinlogTableStreamReadWithProjection() throws Exception {
1177+
sql(
1178+
"CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
1179+
+ "'bucket' = '2')");
1180+
BlockingIterator<Row, Row> iterator =
1181+
streamSqlBlockIter(
1182+
"SELECT rowkind, a FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
1183+
sql("INSERT INTO T VALUES (1, 2)");
1184+
sql("INSERT INTO T VALUES (1, 3)");
1185+
sql("INSERT INTO T VALUES (2, 2)");
1186+
List<Row> rows = iterator.collect(3);
1187+
assertThat(rows)
1188+
.containsExactly(
1189+
Row.of("+I", new Integer[] {1}),
1190+
Row.of("+U", new Integer[] {1, 1}),
1191+
Row.of("+I", new Integer[] {2}));
1192+
iterator.close();
1193+
}
1194+
11751195
@Test
11761196
public void testBinlogTableBatchRead() throws Exception {
11771197
sql(

0 commit comments

Comments
 (0)