Skip to content

Commit 85e5e25

Browse files
author
tianfengyu
committed
[FLINK-39834] Fix Oracle UNISTR parsing with embedded concat operator
1 parent 7355d76 commit 85e5e25

3 files changed

Lines changed: 179 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ limitations under the License.
207207
</includes>
208208
</artifactSet>
209209
<filters>
210+
<filter>
211+
<artifact>io.debezium:debezium-connector-oracle</artifact>
212+
<excludes>
213+
<exclude>
214+
io/debezium/connector/oracle/logminer/UnistrHelper.class
215+
</exclude>
216+
</excludes>
217+
</filter>
210218
<filter>
211219
<artifact>org.apache.kafka:*</artifact>
212220
<excludes>
@@ -276,4 +284,4 @@ limitations under the License.
276284
</plugins>
277285
</build>
278286

279-
</project>
287+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.oracle.logminer;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
/**
 * A utility/helper class to support decoding Oracle Unicode String function values, {@code UNISTR}.
 *
 * <p>The input handled by {@link #convert(String)} is either a single {@code UNISTR('...')} call
 * or several such calls joined by the SQL concatenation operator {@code ||}. A {@code ||} sequence
 * that appears inside the quoted data of a call is treated as data, not as an operator.
 */
public class UnistrHelper {

    private static final String UNISTR_FUNCTION_START = "UNISTR('";
    private static final String UNISTR_FUNCTION_END = "')";
    private static final String CONCAT_OPERATOR = "||";

    /** Number of hexadecimal digits that follow the backslash in a {@code \xxxx} escape. */
    private static final int ESCAPE_HEX_DIGITS = 4;

    /**
     * Checks whether the given text is one complete {@code UNISTR('...')} function call.
     *
     * @param data the text to inspect, may be {@code null}
     * @return {@code true} if the text starts with {@code UNISTR('}, ends with {@code ')} and is
     *     long enough for the two markers not to overlap; {@code false} otherwise
     */
    public static boolean isUnistrFunction(String data) {
        // The length check rejects "UNISTR(')", where the single quote would otherwise be counted
        // as part of both the prefix and the suffix.
        return data != null
                && data.length() >= UNISTR_FUNCTION_START.length() + UNISTR_FUNCTION_END.length()
                && data.startsWith(UNISTR_FUNCTION_START)
                && data.endsWith(UNISTR_FUNCTION_END);
    }

    /**
     * Decodes every {@code UNISTR('...')} call found in the given text and joins the results.
     *
     * @param data the raw value, may be {@code null} or empty
     * @return the decoded text; the input itself (unchanged) when it is {@code null}, empty, or
     *     contains no {@code UNISTR} call at all
     * @throws NumberFormatException if a backslash escape is not followed by hexadecimal digits
     */
    public static String convert(String data) {
        if (data == null || data.isEmpty()) {
            return data;
        }

        // Multiple UNISTR function calls may be concatenated together using "||".
        // Only split on SQL concatenation operators outside UNISTR quoted data.
        final List<String> parts = tokenize(data);

        final StringBuilder result = new StringBuilder();
        boolean decodedAny = false;
        for (String part : parts) {
            if (isUnistrFunction(part)) {
                final String quotedData =
                        part.substring(
                                UNISTR_FUNCTION_START.length(),
                                part.length() - UNISTR_FUNCTION_END.length());
                result.append(decode(quotedData));
                decodedAny = true;
            } else {
                // Append only this part; appending the whole input here would duplicate it.
                result.append(part);
            }
        }

        // Without any UNISTR call there is nothing to decode: hand back the input untouched so
        // that its whitespace and operators are preserved.
        return decodedAny ? result.toString() : data;
    }

    /**
     * Replaces the escapes of a {@code UNISTR} argument with the characters they denote.
     *
     * <p>{@code \xxxx} (four hex digits) becomes the corresponding UTF-16 code unit and
     * {@code \\} becomes a single backslash. A backslash that is followed by fewer than four
     * characters is copied as-is.
     */
    private static String decode(String value) {
        final int length = value.length();
        final StringBuilder result = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            final char c = value.charAt(i);
            if (c != '\\') {
                result.append(c);
            } else if (i + 1 < length && value.charAt(i + 1) == '\\') {
                // Escaped backslash.
                result.append('\\');
                i++;
            } else if (i + ESCAPE_HEX_DIGITS < length) {
                // All four hex digits are available: indices i + 1 .. i + 4 inclusive.
                final String hex = value.substring(i + 1, i + 1 + ESCAPE_HEX_DIGITS);
                result.append(Character.toChars(Integer.parseInt(hex, 16)));
                i += ESCAPE_HEX_DIGITS;
            } else {
                // Truncated escape at the end of the value; keep the backslash literally.
                result.append(c);
            }
        }
        return result.toString();
    }

    /**
     * Splits the text on {@code ||} operators that lie outside of {@code UNISTR('...')} quoted
     * data. Every returned part is trimmed; a trailing empty part is not returned.
     */
    private static List<String> tokenize(String data) {
        final List<String> parts = new ArrayList<>();
        final int length = data.length();

        boolean inQuotedData = false;
        int startIndex = 0;

        for (int i = 0; i < length; i++) {
            if (stringMatches(data, i, UNISTR_FUNCTION_START)) {
                inQuotedData = true;
                // Skip the whole prefix so that its opening quote is never paired with a ')'
                // that happens to be the first character of the quoted data.
                i += UNISTR_FUNCTION_START.length() - 1;
            } else if (inQuotedData && stringMatches(data, i, UNISTR_FUNCTION_END)) {
                inQuotedData = false;
            } else if (!inQuotedData && stringMatches(data, i, CONCAT_OPERATOR)) {
                parts.add(data.substring(startIndex, i).trim());

                // Continue after the operator.
                i += CONCAT_OPERATOR.length() - 1;
                startIndex = i + 1;
            }
        }

        if (startIndex < length) {
            parts.add(data.substring(startIndex).trim());
        }

        return parts;
    }

    /** Returns whether {@code token} occurs in {@code str} starting exactly at {@code index}. */
    private static boolean stringMatches(String str, int index, String token) {
        return str.regionMatches(index, token, 0, token.length());
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.oracle.logminer;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
class UnistrHelperTest {
25+
26+
@Test
27+
void shouldConvertUnistrValues() {
28+
assertThat(UnistrHelper.convert("UNISTR('\\0412\\044B')")).isEqualTo("\u0412\u044B");
29+
assertThat(UnistrHelper.convert("UNISTR('\\0412\\044B')||UNISTR('\\0431\\0443')"))
30+
.isEqualTo("\u0412\u044B\u0431\u0443");
31+
assertThat(UnistrHelper.convert("UNISTR('\\0412\\044B') || UNISTR('\\0431\\0443')"))
32+
.isEqualTo("\u0412\u044B\u0431\u0443");
33+
}
34+
35+
@Test
36+
void shouldConvertUnistrValueWithConcatenationCharacterSequence() {
37+
assertThat(UnistrHelper.convert("UNISTR('\\4E2D\\56FD||\\6B66\\6C49')"))
38+
.isEqualTo("\u4E2D\u56FD||\u6B66\u6C49");
39+
}
40+
41+
@Test
42+
void shouldConvertUnistrValueWithEmbeddedConcatenationAndAsciiCharacters() {
43+
assertThat(
44+
UnistrHelper.convert(
45+
"UNISTR('\\592A\\5E73\\6D0B\\53CC\\514D4000||"
46+
+ "\\518D\\5236\\9020\\5DF2\\5F55\\5355||"
47+
+ "C440100VEH26071668')"))
48+
.isEqualTo(
49+
"\u592A\u5E73\u6D0B\u53CC\u514D4000||"
50+
+ "\u518D\u5236\u9020\u5DF2\u5F55\u5355||"
51+
+ "C440100VEH26071668");
52+
}
53+
54+
@Test
55+
void shouldNotDuplicateOriginalUnistrValueWhenConcatenationSequenceIsInsideFunction() {
56+
String unistr =
57+
"UNISTR('\\592A\\5E73\\6D0B\\53CC\\514D4000||"
58+
+ "\\518D\\5236\\9020\\5DF2\\5F55\\5355||"
59+
+ "C440100VEH26071668')";
60+
61+
assertThat(UnistrHelper.convert(unistr)).doesNotContain("UNISTR(");
62+
}
63+
}

0 commit comments

Comments
 (0)