Skip to content

Commit b09fdc1

Browse files
committed
Adjust to review comments and add unit tests
1 parent 8e5e307 commit b09fdc1

3 files changed

Lines changed: 176 additions & 15 deletions

File tree

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
152152
if (configuration.offsetStoragePath != null) {
153153
readTransform =
154154
readTransform.withOffsetRetainer(
155-
new FileSystemOffsetRetainer(configuration.offsetStoragePath));
155+
FileSystemOffsetRetainer.of(configuration.offsetStoragePath));
156156
}
157157

158158
return readTransform;

sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.nio.channels.Channels;
2727
import java.nio.channels.ReadableByteChannel;
2828
import java.nio.channels.WritableByteChannel;
29+
import java.util.Collections;
2930
import java.util.Map;
3031
import org.apache.beam.sdk.io.FileSystems;
3132
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -51,7 +52,7 @@
5152
* DebeziumIO.read()
5253
* .withConnectorConfiguration(config)
5354
* .withOffsetRetainer(
54-
* new FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
55+
* FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
5556
* .withFormatFunction(myMapper);
5657
* }</pre>
5758
*
@@ -60,12 +61,13 @@
6061
* <pre>{@code
6162
* DebeziumIO.read()
6263
* .withConnectorConfiguration(config)
63-
* .withOffsetRetainer(new FileSystemOffsetRetainer("/tmp/debezium-offset.json"))
64+
* .withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
6465
* .withFormatFunction(myMapper);
6566
* }</pre>
6667
*
67-
* <p><b>Note:</b> writes are not atomic. If the pipeline is killed mid-write, the offset file may
68-
* be corrupt. In that case, delete the file and the connector will restart from the beginning.
68+
* <p><b>Note:</b> writes are performed atomically: the offset is first written to a {@code .tmp}
69+
* sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset
70+
* intact.
6971
*/
7072
public class FileSystemOffsetRetainer implements OffsetRetainer {
7173

@@ -77,10 +79,18 @@ public class FileSystemOffsetRetainer implements OffsetRetainer {
7779
// ObjectMapper is thread-safe after configuration and does not need to be serialised.
7880
private transient @Nullable ObjectMapper objectMapper;
7981

80-
public FileSystemOffsetRetainer(String path) {
82+
// Tracks the last successfully saved offset so repeated identical saves are skipped.
83+
private transient @Nullable Map<String, Object> lastSavedOffset;
84+
85+
private FileSystemOffsetRetainer(String path) {
8186
this.path = path;
8287
}
8388

89+
/** Creates a new {@code FileSystemOffsetRetainer} that stores the offset at {@code path}. */
90+
public static FileSystemOffsetRetainer of(String path) {
91+
return new FileSystemOffsetRetainer(path);
92+
}
93+
8494
private ObjectMapper mapper() {
8595
if (objectMapper == null) {
8696
objectMapper = new ObjectMapper();
@@ -89,8 +99,9 @@ private ObjectMapper mapper() {
8999
}
90100

91101
/**
92-
* Reads the offset JSON file and returns its contents, or {@code null} if the file does not exist
93-
* or cannot be read.
102+
* Reads the offset JSON file and returns its contents, or {@code null} if the file does not yet
103+
* exist (first run). Throws {@link RuntimeException} if the file exists but cannot be read, to
104+
* prevent silently reprocessing data from the beginning.
94105
*/
95106
@Override
96107
public @Nullable Map<String, Object> loadOffset() {
@@ -106,24 +117,43 @@ private ObjectMapper mapper() {
106117
LOG.info("OffsetRetainer: no offset file found at {}; starting from the beginning.", path);
107118
return null;
108119
} catch (IOException e) {
109-
LOG.warn(
110-
"OffsetRetainer: failed to load offset from {}; starting from the beginning.", path, e);
111-
return null;
120+
throw new RuntimeException(
121+
"OffsetRetainer: failed to read offset from "
122+
+ path
123+
+ ". "
124+
+ "Delete the file to restart from the beginning.",
125+
e);
112126
}
113127
}
114128

115129
/**
116-
* Serialises {@code offset} to JSON and writes it to the configured path, overwriting any
117-
* existing file. Errors are logged as warnings and swallowed so the pipeline continues.
130+
* Serialises {@code offset} to JSON and writes it atomically to the configured path.
131+
*
132+
* <p>If the offset is identical to the last successfully written one, the write is skipped to
133+
* avoid unnecessary I/O on every checkpoint.
134+
*
135+
* <p>Otherwise the data is first written to a {@code .tmp} sibling file and then renamed to the
136+
* final path, so a mid-write crash leaves the previous offset intact.
137+
*
138+
* <p>Errors are logged as warnings and swallowed so the pipeline continues.
118139
*/
119140
@Override
120141
public void saveOffset(Map<String, Object> offset) {
142+
if (offset.equals(lastSavedOffset)) {
143+
LOG.debug("OffsetRetainer: offset unchanged, skipping write to {}", path);
144+
return;
145+
}
146+
String tmpPath = path + ".tmp";
121147
try {
122-
ResourceId resourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
123-
try (WritableByteChannel channel = FileSystems.create(resourceId, "application/json");
148+
ResourceId tmpResourceId = FileSystems.matchNewResource(tmpPath, /* isDirectory= */ false);
149+
try (WritableByteChannel channel = FileSystems.create(tmpResourceId, "application/json");
124150
OutputStream stream = Channels.newOutputStream(channel)) {
125151
mapper().writeValue(stream, offset);
126152
}
153+
ResourceId finalResourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false);
154+
FileSystems.rename(
155+
Collections.singletonList(tmpResourceId), Collections.singletonList(finalResourceId));
156+
lastSavedOffset = offset;
127157
LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset);
128158
} catch (IOException e) {
129159
LOG.warn(
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.io.debezium;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertNull;
24+
import static org.junit.Assert.assertThrows;
25+
import static org.junit.Assert.assertTrue;
26+
27+
import java.io.ByteArrayInputStream;
28+
import java.io.ByteArrayOutputStream;
29+
import java.io.File;
30+
import java.io.ObjectInputStream;
31+
import java.io.ObjectOutputStream;
32+
import java.nio.charset.StandardCharsets;
33+
import java.nio.file.Files;
34+
import java.nio.file.Paths;
35+
import java.util.Map;
36+
import org.apache.beam.sdk.io.FileSystems;
37+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
38+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
39+
import org.junit.Before;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
import org.junit.rules.TemporaryFolder;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.JUnit4;
45+
46+
/** Unit tests for {@link FileSystemOffsetRetainer}. */
47+
@RunWith(JUnit4.class)
48+
public class FileSystemOffsetRetainerTest {
49+
50+
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
51+
52+
@Before
53+
public void setUp() {
54+
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
55+
}
56+
57+
@Test
58+
public void testLoadOffsetReturnNullWhenFileIsMissing() {
59+
String path = tmpFolder.getRoot().getAbsolutePath() + "/nonexistent.json";
60+
assertNull(FileSystemOffsetRetainer.of(path).loadOffset());
61+
}
62+
63+
@Test
64+
public void testLoadOffsetThrowsWhenFileIsUnreadable() throws Exception {
65+
String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
66+
FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
67+
retainer.saveOffset(ImmutableMap.of("lsn", "100"));
68+
69+
// Corrupt the file so JSON parsing fails.
70+
Files.newBufferedWriter(Paths.get(path), StandardCharsets.UTF_8).close(); // truncate to empty
71+
assertThrows(RuntimeException.class, retainer::loadOffset);
72+
}
73+
74+
@Test
75+
public void testSaveAndLoadOffsetRoundTrip() {
76+
String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
77+
FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
78+
79+
retainer.saveOffset(ImmutableMap.of("file", "binlog.000001", "pos", "156"));
80+
81+
Map<String, Object> loaded = retainer.loadOffset();
82+
assertNotNull(loaded);
83+
assertEquals("binlog.000001", loaded.get("file"));
84+
assertEquals("156", loaded.get("pos"));
85+
}
86+
87+
@Test
88+
public void testSaveOffsetSkipsWriteWhenOffsetUnchanged() throws Exception {
89+
String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
90+
FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
91+
Map<String, Object> offset = ImmutableMap.of("lsn", "100");
92+
93+
retainer.saveOffset(offset);
94+
long modifiedAfterFirstSave = new File(path).lastModified();
95+
96+
// Second call with the same offset should not touch the file.
97+
Thread.sleep(10); // ensure mtime would differ if a write occurred
98+
retainer.saveOffset(offset);
99+
assertEquals(modifiedAfterFirstSave, new File(path).lastModified());
100+
}
101+
102+
@Test
103+
public void testSaveOffsetLeavesNoTmpFile() {
104+
String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
105+
FileSystemOffsetRetainer.of(path).saveOffset(ImmutableMap.of("lsn", "28160840"));
106+
107+
assertTrue("Final offset file should exist", new File(path).exists());
108+
assertFalse("Temp file should not remain after rename", new File(path + ".tmp").exists());
109+
}
110+
111+
@Test
112+
public void testSerializedRetainerCanLoadAfterDeserialization() throws Exception {
113+
String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
114+
FileSystemOffsetRetainer original = FileSystemOffsetRetainer.of(path);
115+
original.saveOffset(ImmutableMap.of("lsn", "12345"));
116+
117+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
118+
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
119+
oos.writeObject(original);
120+
}
121+
FileSystemOffsetRetainer deserialized;
122+
try (ObjectInputStream ois =
123+
new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
124+
deserialized = (FileSystemOffsetRetainer) ois.readObject();
125+
}
126+
127+
Map<String, Object> loaded = deserialized.loadOffset();
128+
assertNotNull(loaded);
129+
assertEquals("12345", loaded.get("lsn"));
130+
}
131+
}

0 commit comments

Comments
 (0)