Skip to content

Commit e90a574

Browse files
committed
[Test] 为SR字段顺序写入功能添加单元测试
- StarRocksTypeConvertTest: 验证 datetime→TIMESTAMP 和 date→DATE 类型映射 - PostgresCDCBuilderTest: 覆盖 composeJdbcProperties 的边界场景(null/空/单参数/多参数) - CreateCDCSourceOperationTest: 通过反射测试 setSinkTable()(匹配、不匹配、带schema前缀、driver加载列、首个匹配优先) - TableTest: 覆盖新增 sinkTable 字段(默认null、存取、隔离性、重置)
1 parent 4400e48 commit e90a574

4 files changed

Lines changed: 445 additions & 0 deletions

File tree

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.cdc.postgres;
21+
22+
import static org.mockito.Mockito.when;
23+
24+
import org.dinky.data.model.FlinkCDCConfig;
25+
26+
import java.util.Collections;
27+
import java.util.LinkedHashMap;
28+
import java.util.Map;
29+
30+
import org.junit.Assert;
31+
import org.junit.Before;
32+
import org.junit.Test;
33+
import org.mockito.Mock;
34+
import org.mockito.MockitoAnnotations;
35+
36+
/**
37+
* Tests for {@link PostgresCDCBuilder#parseMetaDataConfig()} and the private
38+
* {@code composeJdbcProperties} helper introduced in this commit.
39+
*/
40+
public class PostgresCDCBuilderTest {
41+
42+
@Mock
43+
private FlinkCDCConfig config;
44+
45+
private PostgresCDCBuilder builder;
46+
47+
@Before
48+
public void setUp() {
49+
MockitoAnnotations.initMocks(this);
50+
builder = new PostgresCDCBuilder(config);
51+
when(config.getHostname()).thenReturn("localhost");
52+
when(config.getPort()).thenReturn(5432);
53+
when(config.getDatabase()).thenReturn("testdb");
54+
when(config.getUsername()).thenReturn("user");
55+
when(config.getPassword()).thenReturn("pass");
56+
}
57+
58+
/** No jdbc properties -> URL should not contain '?' */
59+
@Test
60+
public void testParseMetaDataConfig_noJdbcProperties() {
61+
when(config.getJdbc()).thenReturn(null);
62+
63+
Map<String, String> result = builder.parseMetaDataConfig();
64+
65+
String url = result.get("url");
66+
Assert.assertNotNull(url);
67+
Assert.assertTrue("URL should start with jdbc:postgres://", url.startsWith("jdbc:postgres://"));
68+
Assert.assertFalse("URL should not contain '?' when jdbc props are empty", url.contains("?"));
69+
}
70+
71+
/** Empty jdbc map -> same as null, no query string */
72+
@Test
73+
public void testParseMetaDataConfig_emptyJdbcProperties() {
74+
when(config.getJdbc()).thenReturn(Collections.emptyMap());
75+
76+
Map<String, String> result = builder.parseMetaDataConfig();
77+
78+
String url = result.get("url");
79+
Assert.assertFalse("URL should not contain '?' for empty jdbc map", url.contains("?"));
80+
}
81+
82+
/** Single jdbc property -> ?key=value */
83+
@Test
84+
public void testParseMetaDataConfig_singleJdbcProperty() {
85+
Map<String, String> jdbc = Collections.singletonMap("ssl", "true");
86+
when(config.getJdbc()).thenReturn(jdbc);
87+
88+
Map<String, String> result = builder.parseMetaDataConfig();
89+
90+
String url = result.get("url");
91+
Assert.assertTrue("URL should contain '?ssl=true'", url.contains("?ssl=true"));
92+
// Must not end with trailing '&'
93+
Assert.assertFalse("URL must not end with '&'", url.endsWith("&"));
94+
}
95+
96+
/** Multiple jdbc properties -> all encoded, no trailing '&' */
97+
@Test
98+
public void testParseMetaDataConfig_multipleJdbcProperties() {
99+
Map<String, String> jdbc = new LinkedHashMap<>();
100+
jdbc.put("ssl", "true");
101+
jdbc.put("sslmode", "require");
102+
when(config.getJdbc()).thenReturn(jdbc);
103+
104+
Map<String, String> result = builder.parseMetaDataConfig();
105+
106+
String url = result.get("url");
107+
Assert.assertTrue("URL should contain ssl param", url.contains("ssl=true"));
108+
Assert.assertTrue("URL should contain sslmode param", url.contains("sslmode=require"));
109+
Assert.assertFalse("URL must not end with '&'", url.endsWith("&"));
110+
// Exactly one '?' in the query string part
111+
Assert.assertEquals("URL should have exactly one '?'", 1, url.chars().filter(c -> c == '?').count());
112+
}
113+
114+
/** Host / port embedded correctly in URL */
115+
@Test
116+
public void testParseMetaDataConfig_urlContainsHostAndPort() {
117+
when(config.getJdbc()).thenReturn(null);
118+
119+
Map<String, String> result = builder.parseMetaDataConfig();
120+
121+
String url = result.get("url");
122+
Assert.assertTrue("URL should contain host", url.contains("localhost"));
123+
Assert.assertTrue("URL should contain port", url.contains("5432"));
124+
}
125+
}

dinky-common/src/test/java/org/dinky/data/model/TableTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,38 @@ void setUp() {
8787
flinkConfig = "#{schemaName}=schemaName, #{tableName}=tableName, #{abc}=abc, #{}=null, bcd=bcd";
8888
}
8989

90+
// ---- sinkTable field (added in this commit) ----
91+
92+
@Test
93+
void sinkTable_defaultIsNull() {
94+
assertThat(table.getSinkTable(), equalTo(null));
95+
}
96+
97+
@Test
98+
void sinkTable_setAndGet() {
99+
Table sink = new Table("sink_orders", "target_schema", null);
100+
table.setSinkTable(sink);
101+
assertThat(table.getSinkTable(), equalTo(sink));
102+
assertThat(table.getSinkTable().getName(), equalTo("sink_orders"));
103+
}
104+
105+
@Test
106+
void sinkTable_doesNotAffectSourceTableFields() {
107+
Table sink = new Table("sink_orders", "target_schema", null);
108+
table.setSinkTable(sink);
109+
// Source table fields must remain unchanged
110+
assertThat(table.getName(), equalTo("TableNameOrigin"));
111+
assertThat(table.getSchema(), equalTo("SchemaOrigin"));
112+
}
113+
114+
@Test
115+
void sinkTable_canBeResetToNull() {
116+
Table sink = new Table("sink_orders", "target_schema", null);
117+
table.setSinkTable(sink);
118+
table.setSinkTable(null);
119+
assertThat(table.getSinkTable(), equalTo(null));
120+
}
121+
90122
@Test
91123
void getFlinkDDL() {
92124
String result = table.getFlinkDDL(flinkConfig, "NewTableName");
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one or more
4+
* contributor license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright ownership.
6+
* The ASF licenses this file to You under the Apache License, Version 2.0
7+
* (the "License"); you may not use this file except in compliance with
8+
* the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
package org.dinky.trans.ddl;
21+
22+
import static org.junit.jupiter.api.Assertions.*;
23+
import static org.mockito.Mockito.*;
24+
25+
import org.dinky.cdc.SinkBuilder;
26+
import org.dinky.data.model.Table;
27+
28+
import java.lang.reflect.Method;
29+
import java.util.Arrays;
30+
import java.util.Collections;
31+
import java.util.List;
32+
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
36+
/**
37+
* Unit tests for the private static method
38+
* {@code CreateCDCSourceOperation#setSinkTable} introduced in this commit.
39+
*
40+
* <p>The method is exercised via reflection to avoid exposing it as package-visible.
41+
*/
42+
class CreateCDCSourceOperationTest {
43+
44+
private Method setSinkTableMethod;
45+
private SinkBuilder sinkBuilder;
46+
47+
@BeforeEach
48+
void setUp() throws Exception {
49+
setSinkTableMethod = CreateCDCSourceOperation.class.getDeclaredMethod(
50+
"setSinkTable", Table.class, List.class, SinkBuilder.class,
51+
org.dinky.metadata.driver.Driver.class);
52+
setSinkTableMethod.setAccessible(true);
53+
54+
sinkBuilder = mock(SinkBuilder.class);
55+
}
56+
57+
private void invoke(Table table, List<Table> sinkTables, SinkBuilder sb,
58+
org.dinky.metadata.driver.Driver driver) throws Exception {
59+
setSinkTableMethod.invoke(null, table, sinkTables, sb, driver);
60+
}
61+
62+
/**
63+
* When the sinkBuilder returns a table name that matches one of the sink tables
64+
* (plain name, no schema prefix), the source table's sinkTable is set.
65+
*/
66+
@Test
67+
void testMatchBySinkTableName_plain() throws Exception {
68+
Table sourceTable = new Table("orders", "public", null);
69+
Table sinkTable = new Table("orders", "public", null);
70+
71+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
72+
73+
invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null);
74+
75+
assertNotNull(sourceTable.getSinkTable());
76+
assertEquals("orders", sourceTable.getSinkTable().getName());
77+
}
78+
79+
/**
80+
* When the sink table carries a schema prefix (e.g. "public.orders"), the method
81+
* must split on '.' and match using only the table-name part.
82+
*/
83+
@Test
84+
void testMatchBySinkTableName_withSchemaPrefix() throws Exception {
85+
Table sourceTable = new Table("orders", "public", null);
86+
Table sinkTable = new Table("orders", "public", null);
87+
// getSchemaTableName() returns "public.orders"
88+
89+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
90+
91+
invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null);
92+
93+
assertNotNull(sourceTable.getSinkTable());
94+
}
95+
96+
/**
97+
* No matching sink table -> sinkTable stays null.
98+
*/
99+
@Test
100+
void testNoMatch_sinkTableRemainsNull() throws Exception {
101+
Table sourceTable = new Table("orders", "public", null);
102+
Table sinkTable = new Table("customers", "public", null);
103+
104+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
105+
106+
invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, null);
107+
108+
assertNull(sourceTable.getSinkTable());
109+
}
110+
111+
/**
112+
* When sinkRealDriver is provided, columns should be loaded from it.
113+
*/
114+
@Test
115+
void testMatchWithDriver_columnsAreSet() throws Exception {
116+
Table sourceTable = new Table("orders", "public", null);
117+
Table sinkTable = new Table("orders", "public", null);
118+
119+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
120+
121+
org.dinky.metadata.driver.Driver driver = mock(org.dinky.metadata.driver.Driver.class);
122+
when(driver.listColumnsSortByPK("public", "orders"))
123+
.thenReturn(Collections.emptyList());
124+
125+
invoke(sourceTable, Collections.singletonList(sinkTable), sinkBuilder, driver);
126+
127+
assertNotNull(sourceTable.getSinkTable());
128+
verify(driver).listColumnsSortByPK("public", "orders");
129+
}
130+
131+
/**
132+
* Empty sink table list -> sinkTable stays null, no exception.
133+
*/
134+
@Test
135+
void testEmptySinkTableList() throws Exception {
136+
Table sourceTable = new Table("orders", "public", null);
137+
138+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
139+
140+
invoke(sourceTable, Collections.emptyList(), sinkBuilder, null);
141+
142+
assertNull(sourceTable.getSinkTable());
143+
}
144+
145+
/**
146+
* Multiple candidates; only the first match should be used (break after first match).
147+
*/
148+
@Test
149+
void testFirstMatchWins() throws Exception {
150+
Table sourceTable = new Table("orders", "public", null);
151+
Table sinkTable1 = new Table("orders", "schema1", null);
152+
Table sinkTable2 = new Table("orders", "schema2", null);
153+
154+
when(sinkBuilder.getSinkTableName(sourceTable)).thenReturn("orders");
155+
156+
invoke(sourceTable, Arrays.asList(sinkTable1, sinkTable2), sinkBuilder, null);
157+
158+
assertNotNull(sourceTable.getSinkTable());
159+
assertEquals("schema1", sourceTable.getSinkTable().getSchema());
160+
}
161+
}

0 commit comments

Comments
 (0)