Skip to content

Commit d57e833

Browse files
committed
[DebeziumIO] Implement offsetRetainer for Kafka (#28248)
1 parent 3197d88 commit d57e833

8 files changed

Lines changed: 554 additions & 5 deletions

File tree

sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type readFromDebeziumSchema struct {
7777
MaxNumberOfRecords *int64
7878
MaxTimeToRun *int64
7979
ConnectionProperties []string
80+
StartOffset []string
81+
OffsetStoragePath *string
8082
}
8183

8284
type debeziumConfig struct {
@@ -149,6 +151,44 @@ func ConnectionProperties(cp []string) readOption {
149151
}
150152
}
151153

154+
// StartOffset specifies the offset from which the connector should resume consuming
155+
// changes. Each entry must be a "key=value" string, where numeric values are encoded
156+
// as their decimal string representation.
157+
//
158+
// Example for PostgreSQL:
159+
//
160+
// debeziumio.StartOffset([]string{"lsn=28160840"})
161+
//
162+
// Example for MySQL:
163+
//
164+
// debeziumio.StartOffset([]string{"file=binlog.000001", "pos=156"})
165+
//
166+
// Obtain the offset from the output of a previous pipeline run. Numeric values such
167+
// as LSN or binlog position are automatically parsed to Long on the Java side.
168+
func StartOffset(offset []string) readOption {
169+
return func(cfg *debeziumConfig) {
170+
cfg.readSchema.StartOffset = offset
171+
}
172+
}
173+
174+
// OffsetStoragePath sets a path where the connector offset is automatically saved after each
175+
// checkpoint and loaded on pipeline startup, allowing the pipeline to resume from where it
176+
// left off without any manual offset management.
177+
//
178+
// The path can be on any filesystem supported by the active Beam runner
179+
// (local disk, GCS, S3, etc.).
180+
//
181+
// Example:
182+
//
183+
// debeziumio.OffsetStoragePath("gs://my-bucket/debezium/orders-offset.json")
184+
//
185+
// When set, takes precedence over StartOffset.
186+
func OffsetStoragePath(path string) readOption {
187+
return func(cfg *debeziumConfig) {
188+
cfg.readSchema.OffsetStoragePath = &path
189+
}
190+
}
191+
152192
// ExpansionAddr sets the expansion service address to use for DebeziumIO cross-language transform.
153193
func ExpansionAddr(expansionAddr string) readOption {
154194
return func(cfg *debeziumConfig) {

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
144144

145145
abstract @Nullable Long getPollingTimeout();
146146

147+
abstract @Nullable Map<String, Object> getStartOffset();
148+
149+
abstract @Nullable OffsetRetainer getOffsetRetainer();
150+
147151
abstract @Nullable Coder<T> getCoder();
148152

149153
abstract Builder<T> toBuilder();
@@ -162,6 +166,10 @@ abstract static class Builder<T> {
162166

163167
abstract Builder<T> setPollingTimeout(Long miliseconds);
164168

169+
abstract Builder<T> setStartOffset(Map<String, Object> startOffset);
170+
171+
abstract Builder<T> setOffsetRetainer(OffsetRetainer retainer);
172+
165173
abstract Read<T> build();
166174
}
167175

@@ -230,6 +238,74 @@ public Read<T> withPollingTimeout(Long miliseconds) {
230238
return toBuilder().setPollingTimeout(miliseconds).build();
231239
}
232240

241+
/**
242+
* Sets a starting offset so the connector resumes consuming changes from a previously seen
243+
* position rather than from the beginning of the change stream.
244+
*
245+
* <p>The offset format is connector-specific. You can capture the current offset for each
246+
* processed record inside your {@link SourceRecordMapper} via {@link
247+
* org.apache.kafka.connect.source.SourceRecord#sourceOffset()} and persist it externally (for
248+
* example in Cloud Storage, a database, or a local file). On the next pipeline run, pass the
249+
* last saved offset here.
250+
*
251+
* <p>Example (PostgreSQL):
252+
*
253+
* <pre>{@code
254+
* // Capture the offset inside the SourceRecordMapper:
255+
* Map<String, Object> offset = sourceRecord.sourceOffset();
256+
* // Persist 'offset' externally, then on restart:
257+
* DebeziumIO.read()
258+
* .withConnectorConfiguration(config)
259+
* .withStartOffset(savedOffset)
260+
* .withFormatFunction(myMapper);
261+
* }</pre>
262+
*
263+
* @param startOffset A map representing the resumption point, as returned by {@code
264+
* SourceRecord#sourceOffset()}.
265+
* @return PTransform {@link #read}
266+
*/
267+
public Read<T> withStartOffset(Map<String, Object> startOffset) {
268+
checkArgument(startOffset != null, "startOffset can not be null");
269+
return toBuilder().setStartOffset(startOffset).build();
270+
}
271+
272+
/**
273+
* Sets an {@link OffsetRetainer} that automatically saves and restores the connector offset,
274+
* allowing the pipeline to resume from where it left off after a restart without any manual
275+
* offset management.
276+
*
277+
* <p>When a retainer is configured:
278+
*
279+
* <ol>
280+
* <li>At pipeline startup, {@link OffsetRetainer#loadOffset()} is called. If a saved offset
281+
* is found, the connector resumes from that position; otherwise it starts from the
282+
* beginning of the change stream.
283+
* <li>After each successful checkpoint ({@code task.commit()}), {@link
284+
* OffsetRetainer#saveOffset(Map)} is called with the latest committed offset.
285+
* </ol>
286+
*
287+
* <p>The built-in {@link FileSystemOffsetRetainer} persists the offset as a JSON file on any
288+
* Beam-compatible filesystem (local, GCS, S3, etc.):
289+
*
290+
* <pre>{@code
291+
* DebeziumIO.read()
292+
* .withConnectorConfiguration(config)
293+
* .withOffsetRetainer(
294+
* new FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
295+
* .withFormatFunction(myMapper);
296+
* }</pre>
297+
*
298+
* <p>When both a retainer and {@link #withStartOffset(Map)} are set, the retainer takes
299+
* precedence. Use {@link #withStartOffset(Map)} alone for a one-time manual override.
300+
*
301+
* @param retainer The {@link OffsetRetainer} to use for loading and saving offsets.
302+
* @return PTransform {@link #read}
303+
*/
304+
public Read<T> withOffsetRetainer(OffsetRetainer retainer) {
305+
checkArgument(retainer != null, "retainer can not be null");
306+
return toBuilder().setOffsetRetainer(retainer).build();
307+
}
308+
233309
protected Schema getRecordSchema() {
234310
KafkaSourceConsumerFn<T> fn =
235311
new KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), this);

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.io.debezium;
1919

2020
import com.google.auto.service.AutoService;
21+
import java.util.HashMap;
2122
import java.util.List;
2223
import java.util.Map;
2324
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
@@ -78,6 +79,8 @@ public static class Configuration extends CrossLanguageConfiguration {
7879
private @Nullable List<String> connectionProperties;
7980
private @Nullable Long maxNumberOfRecords;
8081
private @Nullable Long maxTimeToRun;
82+
private @Nullable List<String> startOffset;
83+
private @Nullable String offsetStoragePath;
8184

8285
public void setConnectionProperties(@Nullable List<String> connectionProperties) {
8386
this.connectionProperties = connectionProperties;
@@ -90,6 +93,14 @@ public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
9093
public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
9194
this.maxTimeToRun = maxTimeToRun;
9295
}
96+
97+
// Offset to resume from, supplied as "key=value" strings; numeric values are
// parsed to Long when the transform is built (see buildExternal).
public void setStartOffset(@Nullable List<String> startOffset) {
this.startOffset = startOffset;
}

// Filesystem path where a FileSystemOffsetRetainer persists the offset; when set,
// it takes precedence over startOffset.
public void setOffsetStoragePath(@Nullable String offsetStoragePath) {
this.offsetStoragePath = offsetStoragePath;
}
93104
}
94105

95106
@Override
@@ -123,6 +134,27 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
123134
readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
124135
}
125136

137+
if (configuration.startOffset != null) {
138+
Map<String, Object> startOffsetMap = new HashMap<>();
139+
for (String property : configuration.startOffset) {
140+
String[] parts = property.split("=", 2);
141+
String key = parts[0];
142+
String value = parts[1];
143+
try {
144+
startOffsetMap.put(key, Long.parseLong(value));
145+
} catch (NumberFormatException e) {
146+
startOffsetMap.put(key, value);
147+
}
148+
}
149+
readTransform = readTransform.withStartOffset(startOffsetMap);
150+
}
151+
152+
if (configuration.offsetStoragePath != null) {
153+
readTransform =
154+
readTransform.withOffsetRetainer(
155+
new FileSystemOffsetRetainer(configuration.offsetStoragePath));
156+
}
157+
126158
return readTransform;
127159
}
128160
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.io.debezium;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OffsetRetainer} that persists the Debezium connector offset as a JSON file using Beam's
 * {@link FileSystems} abstraction.
 *
 * <p>The {@code path} argument can point to any filesystem supported by the active Beam runner,
 * including local disk, Google Cloud Storage, Amazon S3, and others.
 *
 * <p>On every {@code task.commit()}, the latest offset is serialised to JSON and written to the
 * given path (overwriting the previous file). On pipeline startup the file is read back and the
 * connector resumes from the stored position. If the file does not yet exist the connector starts
 * from the beginning of the change stream.
 *
 * <p>Example — resume from GCS:
 *
 * <pre>{@code
 * DebeziumIO.read()
 *     .withConnectorConfiguration(config)
 *     .withOffsetRetainer(
 *         new FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
 *     .withFormatFunction(myMapper);
 * }</pre>
 *
 * <p>Example — local filesystem (useful for testing):
 *
 * <pre>{@code
 * DebeziumIO.read()
 *     .withConnectorConfiguration(config)
 *     .withOffsetRetainer(new FileSystemOffsetRetainer("/tmp/debezium-offset.json"))
 *     .withFormatFunction(myMapper);
 * }</pre>
 *
 * <p><b>Note:</b> writes are not atomic. If the pipeline is killed mid-write, the offset file may
 * be corrupt. In that case, delete the file and the connector will restart from the beginning.
 */
public class FileSystemOffsetRetainer implements OffsetRetainer {

  private static final Logger LOG = LoggerFactory.getLogger(FileSystemOffsetRetainer.class);

  // Explicit type argument rather than the diamond: '<>' on an anonymous subclass is a
  // Java 9+ language feature, while Beam SDKs still target Java 8 compatibility. The
  // anonymous subclass is required so Jackson can recover the full generic signature.
  private static final TypeReference<Map<String, Object>> MAP_TYPE =
      new TypeReference<Map<String, Object>>() {};

  // ObjectMapper is thread-safe once configured. A single shared instance replaces the
  // previous racy lazy initialization of a transient field (loadOffset/saveOffset may be
  // invoked from different worker threads), and static state is simply re-created per JVM,
  // so nothing extra is shipped when the retainer itself is serialised.
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  /** Location of the offset JSON file, resolvable by Beam {@link FileSystems}. */
  private final String path;

  public FileSystemOffsetRetainer(String path) {
    this.path = path;
  }

  /**
   * Reads the offset JSON file and returns its contents, or {@code null} if the file does not
   * exist or cannot be read.
   */
  @Override
  public @Nullable Map<String, Object> loadOffset() {
    try {
      ResourceId resourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
      try (ReadableByteChannel channel = FileSystems.open(resourceId);
          InputStream stream = Channels.newInputStream(channel)) {
        Map<String, Object> offset = OBJECT_MAPPER.readValue(stream, MAP_TYPE);
        LOG.info("OffsetRetainer: loaded offset from {}: {}", path, offset);
        return offset;
      }
    } catch (FileNotFoundException e) {
      // Expected on the very first run: no offset has been saved yet.
      LOG.info("OffsetRetainer: no offset file found at {}; starting from the beginning.", path);
      return null;
    } catch (IOException e) {
      // Deliberately non-fatal: a missing/corrupt offset only costs a replay from the start.
      LOG.warn(
          "OffsetRetainer: failed to load offset from {}; starting from the beginning.", path, e);
      return null;
    }
  }

  /**
   * Serialises {@code offset} to JSON and writes it to the configured path, overwriting any
   * existing file. Errors are logged as warnings and swallowed so the pipeline continues.
   */
  @Override
  public void saveOffset(Map<String, Object> offset) {
    try {
      ResourceId resourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
      try (WritableByteChannel channel = FileSystems.create(resourceId, "application/json");
          OutputStream stream = Channels.newOutputStream(channel)) {
        OBJECT_MAPPER.writeValue(stream, offset);
      }
      LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset);
    } catch (IOException e) {
      // Deliberately non-fatal: losing one save only affects resumption, not correctness.
      LOG.warn(
          "OffsetRetainer: failed to save offset to {}."
              + " The offset will be lost if the pipeline restarts.",
          path,
          e);
    }
  }
}

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,21 @@ public class KafkaSourceConsumerFn<T> extends DoFn<Map<String, String>, T> {
122122
@GetInitialRestriction
123123
public OffsetHolder getInitialRestriction(@Element Map<String, String> unused)
124124
throws IOException {
125-
return new OffsetHolder(null, null, null, spec.getMaxNumberOfRecords(), spec.getMaxTimeToRun());
125+
Map<String, Object> initialOffset = null;
126+
127+
// Retainer takes precedence: it reflects the most recently committed position.
128+
OffsetRetainer retainer = spec.getOffsetRetainer();
129+
if (retainer != null) {
130+
initialOffset = retainer.loadOffset();
131+
}
132+
133+
// Fall back to the explicit one-time override when the retainer has no saved offset.
134+
if (initialOffset == null) {
135+
initialOffset = spec.getStartOffset();
136+
}
137+
138+
return new OffsetHolder(
139+
initialOffset, null, null, spec.getMaxNumberOfRecords(), spec.getMaxTimeToRun());
126140
}
127141

128142
@NewTracker
@@ -284,6 +298,16 @@ public ProcessContinuation process(
284298
receiver.outputWithTimestamp(json, recordInstant);
285299
}
286300
task.commit();
301+
302+
// Persist the offset after every successful commit so the pipeline can resume
303+
// from this position on restart.
304+
OffsetRetainer retainer = spec.getOffsetRetainer();
305+
@SuppressWarnings("unchecked")
306+
Map<String, Object> committedOffset =
307+
(Map<String, Object>) tracker.currentRestriction().offset;
308+
if (retainer != null && committedOffset != null) {
309+
retainer.saveOffset(committedOffset);
310+
}
287311
}
288312
} catch (Exception ex) {
289313
throw new RuntimeException("Error occurred when consuming changes from Database. ", ex);

0 commit comments

Comments
 (0)