Skip to content

Commit c03274d

Browse files
committed
Address Gemini review comment & update CHANGES.md
1 parent b09fdc1 commit c03274d

3 files changed

Lines changed: 23 additions & 0 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
## I/Os
6666

6767
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
68+
* DebeziumIO (Java): added `OffsetRetainer` interface and `FileSystemOffsetRetainer` implementation to persist and restore CDC offsets across pipeline restarts, and exposed `withStartOffset` / `withOffsetRetainer` on `DebeziumIO.Read` and the cross-language `ReadBuilder` ([#28248](https://github.com/apache/beam/issues/28248)).
6869

6970
## New Features / Improvements
7071

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
138138
Map<String, Object> startOffsetMap = new HashMap<>();
139139
for (String property : configuration.startOffset) {
140140
String[] parts = property.split("=", 2);
141+
if (parts.length != 2) {
142+
throw new IllegalArgumentException(
143+
"Invalid startOffset entry: \""
144+
+ property
145+
+ "\". Expected format is \"key=value\".");
146+
}
141147
String key = parts[0];
142148
String value = parts[1];
143149
try {

sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.debezium.connector.mysql.MySqlConnector;
2828
import io.debezium.connector.mysql.MySqlConnectorConfig;
2929
import java.io.Serializable;
30+
import java.util.Arrays;
3031
import java.util.HashMap;
3132
import java.util.Map;
3233
import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
@@ -231,4 +232,19 @@ public void testRetainerFallsBackToWithStartOffsetWhenLoadReturnsNull() throws E
231232
KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null);
232233
assertEquals(explicitOffset, restriction.offset);
233234
}
235+
236+
@Test
237+
public void testBuildExternalThrowsOnMalformedStartOffsetEntry() {
238+
DebeziumTransformRegistrar.ReadBuilder.Configuration config =
239+
new DebeziumTransformRegistrar.ReadBuilder.Configuration();
240+
config.setUsername("user");
241+
config.setPassword("pass");
242+
config.setHost("localhost");
243+
config.setPort("3306");
244+
config.setConnectorClass("MySQL");
245+
config.setStartOffset(Arrays.asList("lsn=100", "no-equals-sign"));
246+
assertThrows(
247+
IllegalArgumentException.class,
248+
() -> new DebeziumTransformRegistrar.ReadBuilder().buildExternal(config));
249+
}
234250
}

0 commit comments

Comments
 (0)