Skip to content

Commit e1e2c3b

Browse files
authored
[DebeziumIO] Implement startOffset & offset persistence for Kafka (#28248) (#37750)
* [DebeziumIO] Implement offsetRetainer for Kafka (#28248) * Adjust to review comments and add unit tests * Address Gemini review comment & update CHANGES.md
1 parent 487696c commit e1e2c3b

10 files changed

Lines changed: 738 additions & 5 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
## I/Os
6666

6767
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
68+
* DebeziumIO (Java): added `OffsetRetainer` interface and `FileSystemOffsetRetainer` implementation to persist and restore CDC offsets across pipeline restarts, and exposed `withStartOffset` / `withOffsetRetainer` on `DebeziumIO.Read` and the cross-language `ReadBuilder` ([#28248](https://github.com/apache/beam/issues/28248)).
6869

6970
## New Features / Improvements
7071

sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type readFromDebeziumSchema struct {
7777
MaxNumberOfRecords *int64
7878
MaxTimeToRun *int64
7979
ConnectionProperties []string
80+
StartOffset []string
81+
OffsetStoragePath *string
8082
}
8183

8284
type debeziumConfig struct {
@@ -149,6 +151,44 @@ func ConnectionProperties(cp []string) readOption {
149151
}
150152
}
151153

154+
// StartOffset specifies the offset from which the connector should resume consuming
155+
// changes. Each entry must be a "key=value" string, where numeric values are encoded
156+
// as their decimal string representation.
157+
//
158+
// Example for PostgreSQL:
159+
//
160+
// debeziumio.StartOffset([]string{"lsn=28160840"})
161+
//
162+
// Example for MySQL:
163+
//
164+
// debeziumio.StartOffset([]string{"file=binlog.000001", "pos=156"})
165+
//
166+
// Obtain the offset from the output of a previous pipeline run. Numeric values such
167+
// as LSN or binlog position are automatically parsed to Long on the Java side.
168+
func StartOffset(offset []string) readOption {
169+
return func(cfg *debeziumConfig) {
170+
cfg.readSchema.StartOffset = offset
171+
}
172+
}
173+
174+
// OffsetStoragePath sets a path where the connector offset is automatically saved after each
175+
// checkpoint and loaded on pipeline startup, allowing the pipeline to resume from where it
176+
// left off without any manual offset management.
177+
//
178+
// The path can be on any filesystem supported by the active Beam runner
179+
// (local disk, GCS, S3, etc.).
180+
//
181+
// Example:
182+
//
183+
// debeziumio.OffsetStoragePath("gs://my-bucket/debezium/orders-offset.json")
184+
//
185+
// When set, takes precedence over StartOffset.
186+
func OffsetStoragePath(path string) readOption {
187+
return func(cfg *debeziumConfig) {
188+
cfg.readSchema.OffsetStoragePath = &path
189+
}
190+
}
191+
152192
// ExpansionAddr sets the expansion service address to use for the DebeziumIO cross-language transform.
153193
func ExpansionAddr(expansionAddr string) readOption {
154194
return func(cfg *debeziumConfig) {

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
144144

145145
abstract @Nullable Long getPollingTimeout();
146146

147+
abstract @Nullable Map<String, Object> getStartOffset();
148+
149+
abstract @Nullable OffsetRetainer getOffsetRetainer();
150+
147151
abstract @Nullable Coder<T> getCoder();
148152

149153
abstract Builder<T> toBuilder();
@@ -162,6 +166,10 @@ abstract static class Builder<T> {
162166

163167
abstract Builder<T> setPollingTimeout(Long miliseconds);
164168

169+
abstract Builder<T> setStartOffset(Map<String, Object> startOffset);
170+
171+
abstract Builder<T> setOffsetRetainer(OffsetRetainer retainer);
172+
165173
abstract Read<T> build();
166174
}
167175

@@ -230,6 +238,74 @@ public Read<T> withPollingTimeout(Long miliseconds) {
230238
return toBuilder().setPollingTimeout(miliseconds).build();
231239
}
232240

241+
/**
242+
* Sets a starting offset so the connector resumes consuming changes from a previously seen
243+
* position rather than from the beginning of the change stream.
244+
*
245+
* <p>The offset format is connector-specific. You can capture the current offset for each
246+
* processed record inside your {@link SourceRecordMapper} via {@link
247+
* org.apache.kafka.connect.source.SourceRecord#sourceOffset()} and persist it externally (for
248+
* example in Cloud Storage, a database, or a local file). On the next pipeline run, pass the
249+
* last saved offset here.
250+
*
251+
* <p>Example (PostgreSQL):
252+
*
253+
* <pre>{@code
254+
* // Capture the offset inside the SourceRecordMapper:
255+
* Map<String, Object> offset = sourceRecord.sourceOffset();
256+
* // Persist 'offset' externally, then on restart:
257+
* DebeziumIO.read()
258+
* .withConnectorConfiguration(config)
259+
* .withStartOffset(savedOffset)
260+
* .withFormatFunction(myMapper);
261+
* }</pre>
262+
*
263+
* @param startOffset A map representing the resumption point, as returned by {@code
264+
* SourceRecord#sourceOffset()}.
265+
* @return PTransform {@link #read}
266+
*/
267+
public Read<T> withStartOffset(Map<String, Object> startOffset) {
268+
checkArgument(startOffset != null, "startOffset can not be null");
269+
return toBuilder().setStartOffset(startOffset).build();
270+
}
271+
272+
/**
273+
* Sets an {@link OffsetRetainer} that automatically saves and restores the connector offset,
274+
* allowing the pipeline to resume from where it left off after a restart without any manual
275+
* offset management.
276+
*
277+
* <p>When a retainer is configured:
278+
*
279+
* <ol>
280+
* <li>At pipeline startup, {@link OffsetRetainer#loadOffset()} is called. If a saved offset
281+
* is found, the connector resumes from that position; otherwise it starts from the
282+
* beginning of the change stream.
283+
* <li>After each successful checkpoint ({@code task.commit()}), {@link
284+
* OffsetRetainer#saveOffset(Map)} is called with the latest committed offset.
285+
* </ol>
286+
*
287+
* <p>The built-in {@link FileSystemOffsetRetainer} persists the offset as a JSON file on any
288+
* Beam-compatible filesystem (local, GCS, S3, etc.):
289+
*
290+
* <pre>{@code
291+
* DebeziumIO.read()
292+
* .withConnectorConfiguration(config)
293+
* .withOffsetRetainer(
294+
* new FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
295+
* .withFormatFunction(myMapper);
296+
* }</pre>
297+
*
298+
* <p>When both a retainer and {@link #withStartOffset(Map)} are set, the retainer takes
299+
* precedence. Use {@link #withStartOffset(Map)} alone for a one-time manual override.
300+
*
301+
* @param retainer The {@link OffsetRetainer} to use for loading and saving offsets.
302+
* @return PTransform {@link #read}
303+
*/
304+
public Read<T> withOffsetRetainer(OffsetRetainer retainer) {
305+
checkArgument(retainer != null, "retainer can not be null");
306+
return toBuilder().setOffsetRetainer(retainer).build();
307+
}
308+
233309
protected Schema getRecordSchema() {
234310
KafkaSourceConsumerFn<T> fn =
235311
new KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), this);

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.io.debezium;
1919

2020
import com.google.auto.service.AutoService;
21+
import java.util.HashMap;
2122
import java.util.List;
2223
import java.util.Map;
2324
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
@@ -78,6 +79,8 @@ public static class Configuration extends CrossLanguageConfiguration {
7879
private @Nullable List<String> connectionProperties;
7980
private @Nullable Long maxNumberOfRecords;
8081
private @Nullable Long maxTimeToRun;
82+
private @Nullable List<String> startOffset;
83+
private @Nullable String offsetStoragePath;
8184

8285
public void setConnectionProperties(@Nullable List<String> connectionProperties) {
8386
this.connectionProperties = connectionProperties;
@@ -90,6 +93,14 @@ public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
9093
public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
9194
this.maxTimeToRun = maxTimeToRun;
9295
}
96+
97+
/**
 * Sets the connector start offset received from the cross-language expansion request, as a
 * list of "key=value" strings; {@code null} when the caller did not configure one.
 */
public void setStartOffset(@Nullable List<String> startOffset) {
  this.startOffset = startOffset;
}
100+
101+
/**
 * Sets the filesystem path at which the connector offset is persisted and restored; {@code
 * null} when the caller did not configure offset storage.
 */
public void setOffsetStoragePath(@Nullable String offsetStoragePath) {
  this.offsetStoragePath = offsetStoragePath;
}
93104
}
94105

95106
@Override
@@ -123,6 +134,33 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
123134
readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
124135
}
125136

137+
if (configuration.startOffset != null) {
138+
Map<String, Object> startOffsetMap = new HashMap<>();
139+
for (String property : configuration.startOffset) {
140+
String[] parts = property.split("=", 2);
141+
if (parts.length != 2) {
142+
throw new IllegalArgumentException(
143+
"Invalid startOffset entry: \""
144+
+ property
145+
+ "\". Expected format is \"key=value\".");
146+
}
147+
String key = parts[0];
148+
String value = parts[1];
149+
try {
150+
startOffsetMap.put(key, Long.parseLong(value));
151+
} catch (NumberFormatException e) {
152+
startOffsetMap.put(key, value);
153+
}
154+
}
155+
readTransform = readTransform.withStartOffset(startOffsetMap);
156+
}
157+
158+
if (configuration.offsetStoragePath != null) {
159+
readTransform =
160+
readTransform.withOffsetRetainer(
161+
FileSystemOffsetRetainer.of(configuration.offsetStoragePath));
162+
}
163+
126164
return readTransform;
127165
}
128166
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.io.debezium;
19+
20+
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36+
37+
/**
38+
* An {@link OffsetRetainer} that persists the Debezium connector offset as a JSON file using Beam's
39+
* {@link FileSystems} abstraction.
40+
*
41+
* <p>The {@code path} argument can point to any filesystem supported by the active Beam runner,
42+
* including local disk, Google Cloud Storage, Amazon S3, and others
43+
*
44+
* <p>On every {@code task.commit()}, the latest offset is serialised to JSON and written to the
45+
* given path (overwriting the previous file). On pipeline startup the file is read back and the
46+
* connector resumes from the stored position. If the file does not yet exist the connector starts
47+
* from the beginning of the change stream.
48+
*
49+
* <p>Example — resume from GCS:
50+
*
51+
* <pre>{@code
52+
* DebeziumIO.read()
53+
* .withConnectorConfiguration(config)
54+
* .withOffsetRetainer(
55+
* FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
56+
* .withFormatFunction(myMapper);
57+
* }</pre>
58+
*
59+
* <p>Example — local filesystem (useful for testing):
60+
*
61+
* <pre>{@code
62+
* DebeziumIO.read()
63+
* .withConnectorConfiguration(config)
64+
* .withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
65+
* .withFormatFunction(myMapper);
66+
* }</pre>
67+
*
68+
* <p><b>Note:</b> writes are performed atomically: the offset is first written to a {@code .tmp}
69+
* sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset
70+
* intact.
71+
*/
72+
public class FileSystemOffsetRetainer implements OffsetRetainer {
73+
74+
private static final Logger LOG = LoggerFactory.getLogger(FileSystemOffsetRetainer.class);
75+
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
76+
77+
private final String path;
78+
79+
// ObjectMapper is thread-safe after configuration and does not need to be serialised.
80+
private transient @Nullable ObjectMapper objectMapper;
81+
82+
// Tracks the last successfully saved offset so repeated identical saves are skipped.
83+
private transient @Nullable Map<String, Object> lastSavedOffset;
84+
85+
private FileSystemOffsetRetainer(String path) {
86+
this.path = path;
87+
}
88+
89+
/** Creates a new {@code FileSystemOffsetRetainer} that stores the offset at {@code path}. */
90+
public static FileSystemOffsetRetainer of(String path) {
91+
return new FileSystemOffsetRetainer(path);
92+
}
93+
94+
private ObjectMapper mapper() {
95+
if (objectMapper == null) {
96+
objectMapper = new ObjectMapper();
97+
}
98+
return objectMapper;
99+
}
100+
101+
/**
102+
* Reads the offset JSON file and returns its contents, or {@code null} if the file does not yet
103+
* exist (first run). Throws {@link RuntimeException} if the file exists but cannot be read, to
104+
* prevent silently reprocessing data from the beginning.
105+
*/
106+
@Override
107+
public @Nullable Map<String, Object> loadOffset() {
108+
try {
109+
ResourceId resourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
110+
try (ReadableByteChannel channel = FileSystems.open(resourceId);
111+
InputStream stream = Channels.newInputStream(channel)) {
112+
Map<String, Object> offset = mapper().readValue(stream, MAP_TYPE);
113+
LOG.info("OffsetRetainer: loaded offset from {}: {}", path, offset);
114+
return offset;
115+
}
116+
} catch (FileNotFoundException e) {
117+
LOG.info("OffsetRetainer: no offset file found at {}; starting from the beginning.", path);
118+
return null;
119+
} catch (IOException e) {
120+
throw new RuntimeException(
121+
"OffsetRetainer: failed to read offset from "
122+
+ path
123+
+ ". "
124+
+ "Delete the file to restart from the beginning.",
125+
e);
126+
}
127+
}
128+
129+
/**
130+
* Serialises {@code offset} to JSON and writes it atomically to the configured path.
131+
*
132+
* <p>If the offset is identical to the last successfully written one, the write is skipped to
133+
* avoid unnecessary I/O on every checkpoint.
134+
*
135+
* <p>Otherwise the data is first written to a {@code .tmp} sibling file and then renamed to the
136+
* final path, so a mid-write crash leaves the previous offset intact.
137+
*
138+
* <p>Errors are logged as warnings and swallowed so the pipeline continues.
139+
*/
140+
@Override
141+
public void saveOffset(Map<String, Object> offset) {
142+
if (offset.equals(lastSavedOffset)) {
143+
LOG.debug("OffsetRetainer: offset unchanged, skipping write to {}", path);
144+
return;
145+
}
146+
String tmpPath = path + ".tmp";
147+
try {
148+
ResourceId tmpResourceId = FileSystems.matchNewResource(tmpPath, /* isDirectory= */ false);
149+
try (WritableByteChannel channel = FileSystems.create(tmpResourceId, "application/json");
150+
OutputStream stream = Channels.newOutputStream(channel)) {
151+
mapper().writeValue(stream, offset);
152+
}
153+
ResourceId finalResourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
154+
FileSystems.rename(
155+
Collections.singletonList(tmpResourceId), Collections.singletonList(finalResourceId));
156+
lastSavedOffset = offset;
157+
LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset);
158+
} catch (IOException e) {
159+
LOG.warn(
160+
"OffsetRetainer: failed to save offset to {}."
161+
+ " The offset will be lost if the pipeline restarts.",
162+
path,
163+
e);
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)