Skip to content

Commit 6f4ac21

Browse files
authored
[feat][io] implement pip-297 for jdbc sinks (#25195)
1 parent e160b1a commit 6f4ac21

2 files changed

Lines changed: 97 additions & 2 deletions

File tree

pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.ScheduledExecutorService;
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicReference;
3839
import java.util.function.Function;
3940
import java.util.stream.Collectors;
4041
import lombok.AllArgsConstructor;
@@ -50,6 +51,11 @@
5051
*/
5152
@Slf4j
5253
public abstract class JdbcAbstractSink<T> implements Sink<T> {
54+
55+
private enum State {
56+
OPEN, FAILED, CLOSED
57+
}
58+
5359
// ----- Runtime fields
5460
protected JdbcSinkConfig jdbcSinkConfig;
5561
@Getter
@@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
7379
private AtomicBoolean isFlushing;
7480
private int batchSize;
7581
private ScheduledExecutorService flushExecutor;
82+
private SinkContext sinkContext;
83+
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
7684

7785
@Override
7886
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
87+
this.sinkContext = sinkContext;
7988
jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
8089
jdbcSinkConfig.validate();
8190

@@ -148,6 +157,7 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
148157

149158
@Override
150159
public void close() throws Exception {
160+
state.set(State.CLOSED);
151161
if (flushExecutor != null) {
152162
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
153163
flushExecutor.shutdown();
@@ -310,8 +320,9 @@ private void flush() {
310320
connection.rollback();
311321
}
312322
} catch (Exception ex) {
313-
throw new RuntimeException(ex);
323+
log.error("Failed to rollback transaction", ex);
314324
}
325+
fatal(e);
315326
}
316327

317328
isFlushing.set(false);
@@ -385,4 +396,16 @@ private static boolean isBatchItemFailed(int returnCode) {
385396
return true;
386397
}
387398

399+
/**
400+
* Signal a fatal exception to the framework.
401+
* This will cause the function instance to terminate properly.
402+
*
403+
* @param e the fatal exception
404+
*/
405+
private void fatal(Exception e) {
406+
if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) {
407+
sinkContext.fatal(e);
408+
}
409+
}
410+
388411
}

pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818
*/
1919
package org.apache.pulsar.io.jdbc;
2020

21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.doThrow;
2224
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.verify;
2326
import static org.mockito.Mockito.when;
2427
import com.google.common.collect.ImmutableMap;
2528
import com.google.common.collect.Maps;
29+
import java.sql.PreparedStatement;
30+
import java.sql.SQLException;
2631
import java.util.Arrays;
2732
import java.util.HashMap;
2833
import java.util.List;
@@ -56,6 +61,7 @@
5661
import org.apache.pulsar.common.schema.SchemaType;
5762
import org.apache.pulsar.functions.api.Record;
5863
import org.apache.pulsar.functions.source.PulsarRecord;
64+
import org.apache.pulsar.io.core.SinkContext;
5965
import org.awaitility.Awaitility;
6066
import org.testng.Assert;
6167
import org.testng.annotations.AfterMethod;
@@ -133,7 +139,9 @@ protected void configure(Map<String, Object> configuration) {
133139

134140
@AfterMethod(alwaysRun = true)
135141
public void tearDown() throws Exception {
136-
jdbcSink.close();
142+
if (jdbcSink != null) {
143+
jdbcSink.close();
144+
}
137145
sqliteUtils.tearDown();
138146
}
139147

@@ -860,6 +868,70 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti
860868
}
861869
}
862870

871+
/**
872+
* Test that fatal() is called when an unrecoverable exception occurs during flush.
873+
* This verifies the PIP-297 implementation for proper termination of the sink.
874+
*
875+
* The test works by:
876+
* 1. Opening the sink with a valid table (so open() succeeds)
877+
* 2. Using reflection to replace the insertStatement with a mock that throws SQLException
878+
* 3. Writing a record to trigger flush
879+
* 4. Verifying that fatal() was called with the exception
880+
*/
881+
@Test
882+
public void testFatalCalledOnFlushException() throws Exception {
883+
jdbcSink.close();
884+
jdbcSink = null;
885+
886+
String jdbcUrl = sqliteUtils.sqliteUri();
887+
Map<String, Object> conf = Maps.newHashMap();
888+
conf.put("jdbcUrl", jdbcUrl);
889+
conf.put("tableName", tableName); // Use valid table so open() succeeds
890+
conf.put("key", "field3");
891+
conf.put("nonKey", "field1,field2");
892+
conf.put("batchSize", 1);
893+
894+
SinkContext mockSinkContext = mock(SinkContext.class);
895+
AtomicReference<Throwable> fatalException = new AtomicReference<>();
896+
doAnswer(invocation -> {
897+
fatalException.set(invocation.getArgument(0));
898+
return null;
899+
}).when(mockSinkContext).fatal(any(Throwable.class));
900+
901+
SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink();
902+
try {
903+
sinkWithContext.open(conf, mockSinkContext);
904+
905+
// Create a mock PreparedStatement that throws SQLException on execute()
906+
PreparedStatement mockStatement = mock(PreparedStatement.class);
907+
SQLException simulatedException = new SQLException("Simulated database connection failure");
908+
doThrow(simulatedException).when(mockStatement).execute();
909+
doThrow(simulatedException).when(mockStatement).executeBatch();
910+
911+
// Use reflection to replace the insertStatement with our mock
912+
FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true);
913+
914+
Foo insertObj = new Foo("f1", "f2", 1);
915+
Map<String, String> props = Maps.newHashMap();
916+
props.put("ACTION", "INSERT");
917+
CompletableFuture<Boolean> future = new CompletableFuture<>();
918+
sinkWithContext.write(createMockFooRecord(insertObj, props, future));
919+
920+
// Wait for the flush to complete and fail
921+
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
922+
verify(mockSinkContext).fatal(any(Throwable.class));
923+
Assert.assertNotNull(fatalException.get());
924+
Assert.assertTrue(fatalException.get() instanceof SQLException);
925+
Assert.assertEquals(fatalException.get().getMessage(), "Simulated database connection failure");
926+
});
927+
928+
// Verify the record was failed (not acked)
929+
Assert.assertFalse(future.get(1, TimeUnit.SECONDS));
930+
} finally {
931+
sinkWithContext.close();
932+
}
933+
}
934+
863935
@SuppressWarnings("unchecked")
864936
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
865937
CompletableFuture<Boolean> future) {

0 commit comments

Comments
 (0)