Skip to content

Commit 5f178e7

Browse files
harangozoppeterh-wob
authored andcommitted
[fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure
The JDBC sink's internal queue (incomingList) is unbounded. When the database connection drops, executeBatch() hangs until the TCP socket times out. During this period, isFlushing stays true, preventing any draining, while write() continues accepting records without limit. This causes OutOfMemoryError in production. This commit fixes 4 issues: 1. Bounded internal queue: write() now rejects records when queue exceeds maxQueueSize (configurable, defaults to 10x batchSize), applying Pulsar-level back-pressure via negative acknowledgment. 2. State check in write(): records are failed immediately when the sink state is not OPEN (after fatal() or close()). 3. Connection validation and reconnection: ensureConnection() validates the JDBC connection before each flush and reconnects automatically on failure, allowing recovery from transient database outages. 4. Scheduled flush cancellation: fatal() and close() now cancel the periodic flush task to prevent repeated failures on a broken connection. Fixes apache/pulsar#25030
1 parent 314b9de commit 5f178e7

3 files changed

Lines changed: 235 additions & 5 deletions

File tree

jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Properties;
3434
import java.util.concurrent.Executors;
3535
import java.util.concurrent.ScheduledExecutorService;
36+
import java.util.concurrent.ScheduledFuture;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicReference;
@@ -78,8 +79,12 @@ private enum State {
7879
private Deque<Record<T>> incomingList;
7980
private AtomicBoolean isFlushing;
8081
private int batchSize;
82+
private int maxQueueSize;
8183
private ScheduledExecutorService flushExecutor;
84+
private ScheduledFuture<?> scheduledFlushTask;
8285
private SinkContext sinkContext;
86+
private Properties connectionProperties;
87+
private volatile boolean queueFullLogged = false;
8388
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
8489

8590
@Override
@@ -93,17 +98,17 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
9398
throw new IllegalArgumentException("Required jdbc Url not set.");
9499
}
95100

96-
Properties properties = new Properties();
101+
connectionProperties = new Properties();
97102
String username = jdbcSinkConfig.getUserName();
98103
String password = jdbcSinkConfig.getPassword();
99104
if (username != null) {
100-
properties.setProperty("user", username);
105+
connectionProperties.setProperty("user", username);
101106
}
102107
if (password != null) {
103-
properties.setProperty("password", password);
108+
connectionProperties.setProperty("password", password);
104109
}
105110

106-
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
111+
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
107112
connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
108113
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
109114

@@ -114,12 +119,20 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
114119

115120
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
116121
batchSize = jdbcSinkConfig.getBatchSize();
122+
maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
123+
if (maxQueueSize == 0) {
124+
// Auto-size: default to 10x batch size
125+
maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000;
126+
}
127+
// maxQueueSize < 0 (e.g. -1) means unbounded (legacy behavior)
128+
log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded");
117129
incomingList = new LinkedList<>();
118130
isFlushing = new AtomicBoolean(false);
119131

120132
flushExecutor = Executors.newScheduledThreadPool(1);
121133
if (timeoutMs > 0) {
122-
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
134+
scheduledFlushTask = flushExecutor.scheduleAtFixedRate(
135+
this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
123136
}
124137
}
125138

@@ -158,6 +171,10 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
158171
@Override
159172
public void close() throws Exception {
160173
state.set(State.CLOSED);
174+
if (scheduledFlushTask != null) {
175+
scheduledFlushTask.cancel(false);
176+
scheduledFlushTask = null;
177+
}
161178
if (flushExecutor != null) {
162179
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
163180
flushExecutor.shutdown();
@@ -188,8 +205,22 @@ public void close() throws Exception {
188205

189206
@Override
190207
public void write(Record<T> record) throws Exception {
208+
if (state.get() != State.OPEN) {
209+
log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
210+
record.fail();
211+
return;
212+
}
191213
int number;
192214
synchronized (incomingList) {
215+
if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
216+
if (!queueFullLogged) {
217+
log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
218+
incomingList.size(), maxQueueSize);
219+
queueFullLogged = true;
220+
}
221+
record.fail();
222+
return;
223+
}
193224
incomingList.add(record);
194225
number = incomingList.size();
195226
}
@@ -239,6 +270,9 @@ protected enum MutationType {
239270

240271

241272
private void flush() {
273+
if (state.get() == State.CLOSED) {
274+
return;
275+
}
242276
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
243277
boolean needAnotherRound;
244278
final Deque<Record<T>> swapList = new LinkedList<>();
@@ -259,6 +293,8 @@ private void flush() {
259293

260294
int count = 0;
261295
try {
296+
ensureConnection();
297+
262298
PreparedStatement currentBatch = null;
263299
final List<Mutation> mutations = swapList
264300
.stream()
@@ -308,6 +344,7 @@ private void flush() {
308344
} else {
309345
internalFlush(swapList);
310346
}
347+
queueFullLogged = false;
311348
} catch (Exception e) {
312349
log.error("Got exception {} after {} ms, failing {} messages",
313350
e.getMessage(),
@@ -336,6 +373,38 @@ private void flush() {
336373
}
337374
}
338375

376+
private void ensureConnection() throws Exception {
377+
try {
378+
if (connection != null && connection.isValid(2)) {
379+
return;
380+
}
381+
} catch (SQLException e) {
382+
log.warn("Connection validation failed: {}", e.getMessage());
383+
}
384+
385+
log.info("JDBC connection is invalid, attempting to reconnect to: {}", jdbcUrl);
386+
closeConnectionQuietly();
387+
388+
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
389+
connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
390+
391+
tableId = JdbcUtils.getTableId(connection, tableName);
392+
initStatement();
393+
394+
log.info("Successfully reconnected to: {}", jdbcUrl);
395+
}
396+
397+
private void closeConnectionQuietly() {
398+
if (connection != null) {
399+
try {
400+
connection.close();
401+
} catch (Exception e) {
402+
log.debug("Error closing stale connection", e);
403+
}
404+
connection = null;
405+
}
406+
}
407+
339408
private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
340409
if (jdbcSinkConfig.isUseTransactions()) {
341410
connection.commit();
@@ -404,6 +473,10 @@ private static boolean isBatchItemFailed(int returnCode) {
404473
*/
405474
private void fatal(Exception e) {
406475
if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) {
476+
log.error("Fatal error in JDBC sink, signaling framework for shutdown", e);
477+
if (scheduledFlushTask != null) {
478+
scheduledFlushTask.cancel(false);
479+
}
407480
sinkContext.fatal(e);
408481
}
409482
}

jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
130130
)
131131
private NullValueAction nullValueAction = NullValueAction.FAIL;
132132

133+
@FieldDoc(
134+
required = false,
135+
defaultValue = "0",
136+
help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
137+
+ "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
138+
+ "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
139+
+ "database connection is slow or broken. "
140+
+ "A value of 0 (default) auto-sizes to batchSize * 10. "
141+
+ "A value of -1 disables the limit (unbounded, legacy behavior)."
142+
)
143+
private int maxQueueSize = 0;
144+
133145
public enum InsertMode {
134146
INSERT,
135147
UPSERT,

jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,151 @@ public void testFatalCalledOnFlushException() throws Exception {
932932
}
933933
}
934934

935+
/**
936+
* Test that write() rejects records when the sink is in FAILED state.
937+
* After fatal() is called, records should be failed immediately instead of queuing.
938+
*/
939+
@Test
940+
public void testWriteRejectsRecordsAfterFatal() throws Exception {
941+
jdbcSink.close();
942+
jdbcSink = null;
943+
944+
String jdbcUrl = sqliteUtils.sqliteUri();
945+
Map<String, Object> conf = Maps.newHashMap();
946+
conf.put("jdbcUrl", jdbcUrl);
947+
conf.put("tableName", tableName);
948+
conf.put("key", "field3");
949+
conf.put("nonKey", "field1,field2");
950+
conf.put("batchSize", 1);
951+
952+
SinkContext mockSinkContext = mock(SinkContext.class);
953+
SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink();
954+
try {
955+
sinkWithContext.open(conf, mockSinkContext);
956+
957+
// Force a fatal error by replacing insertStatement with a mock that throws
958+
PreparedStatement mockStatement = mock(PreparedStatement.class);
959+
doThrow(new SQLException("Connection lost")).when(mockStatement).execute();
960+
FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true);
961+
962+
// Write first record to trigger fatal
963+
Foo obj1 = new Foo("f1", "f2", 10);
964+
CompletableFuture<Boolean> future1 = new CompletableFuture<>();
965+
sinkWithContext.write(createMockFooRecord(obj1, Maps.newHashMap(), future1));
966+
967+
// Wait for fatal to be called
968+
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
969+
verify(mockSinkContext).fatal(any(Throwable.class)));
970+
971+
// Now write another record — it should be failed immediately
972+
Foo obj2 = new Foo("f3", "f4", 11);
973+
CompletableFuture<Boolean> future2 = new CompletableFuture<>();
974+
sinkWithContext.write(createMockFooRecord(obj2, Maps.newHashMap(), future2));
975+
976+
// Record should be failed (false), not acked (true)
977+
Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
978+
} finally {
979+
sinkWithContext.close();
980+
}
981+
}
982+
983+
/**
984+
* Test that the bounded queue applies back-pressure by failing records when full.
985+
*/
986+
@Test
987+
public void testBoundedQueueBackPressure() throws Exception {
988+
jdbcSink.close();
989+
jdbcSink = null;
990+
991+
String jdbcUrl = sqliteUtils.sqliteUri();
992+
Map<String, Object> conf = Maps.newHashMap();
993+
conf.put("jdbcUrl", jdbcUrl);
994+
conf.put("tableName", tableName);
995+
conf.put("key", "field3");
996+
conf.put("nonKey", "field1,field2");
997+
// Large batch size so flush is not triggered by writes
998+
conf.put("batchSize", 1000);
999+
// No time-based flush
1000+
conf.put("timeoutMs", 0);
1001+
// Small queue to test back-pressure
1002+
conf.put("maxQueueSize", 5);
1003+
1004+
SqliteJdbcAutoSchemaSink boundedSink = new SqliteJdbcAutoSchemaSink();
1005+
try {
1006+
boundedSink.open(conf, null);
1007+
1008+
// Fill the queue to capacity
1009+
List<CompletableFuture<Boolean>> futures = new java.util.ArrayList<>();
1010+
for (int i = 0; i < 5; i++) {
1011+
CompletableFuture<Boolean> f = new CompletableFuture<>();
1012+
futures.add(f);
1013+
boundedSink.write(createMockFooRecord(new Foo("f1", "f2", i + 100), Maps.newHashMap(), f));
1014+
}
1015+
1016+
// Next write should be rejected due to queue full
1017+
CompletableFuture<Boolean> overflowFuture = new CompletableFuture<>();
1018+
boundedSink.write(createMockFooRecord(new Foo("overflow", "val", 999), Maps.newHashMap(), overflowFuture));
1019+
1020+
// The overflow record should be failed immediately
1021+
Assert.assertFalse(overflowFuture.get(1, TimeUnit.SECONDS));
1022+
} finally {
1023+
boundedSink.close();
1024+
}
1025+
}
1026+
1027+
/**
1028+
* Test that ensureConnection() reconnects when the existing connection becomes invalid.
1029+
* Simulates a database going away and coming back by closing the connection mid-flight,
1030+
* then verifying the sink recovers and processes the next batch successfully.
1031+
*/
1032+
@Test
1033+
public void testReconnectOnBrokenConnection() throws Exception {
1034+
jdbcSink.close();
1035+
jdbcSink = null;
1036+
1037+
String jdbcUrl = sqliteUtils.sqliteUri();
1038+
Map<String, Object> conf = Maps.newHashMap();
1039+
conf.put("jdbcUrl", jdbcUrl);
1040+
conf.put("tableName", tableName);
1041+
conf.put("key", "field3");
1042+
conf.put("nonKey", "field1,field2");
1043+
conf.put("batchSize", 1);
1044+
1045+
SqliteJdbcAutoSchemaSink reconnectSink = new SqliteJdbcAutoSchemaSink();
1046+
try {
1047+
reconnectSink.open(conf, null);
1048+
1049+
// First write should succeed — connection is healthy
1050+
Foo obj1 = new Foo("reconnect1", "val1", 50);
1051+
CompletableFuture<Boolean> future1 = new CompletableFuture<>();
1052+
reconnectSink.write(createMockFooRecord(obj1, Maps.newHashMap(), future1));
1053+
Assert.assertTrue(future1.get(5, TimeUnit.SECONDS));
1054+
1055+
// Verify record was persisted
1056+
int count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=50", (rs) -> {
1057+
Assert.assertEquals(rs.getString(1), "reconnect1");
1058+
});
1059+
Assert.assertEquals(count, 1);
1060+
1061+
// Now break the connection by closing it behind the sink's back
1062+
reconnectSink.getConnection().close();
1063+
1064+
// Next write should trigger ensureConnection() → reconnect → succeed
1065+
Foo obj2 = new Foo("reconnect2", "val2", 51);
1066+
CompletableFuture<Boolean> future2 = new CompletableFuture<>();
1067+
reconnectSink.write(createMockFooRecord(obj2, Maps.newHashMap(), future2));
1068+
Assert.assertTrue(future2.get(5, TimeUnit.SECONDS));
1069+
1070+
// Verify second record was also persisted after reconnect
1071+
count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE field3=51", (rs) -> {
1072+
Assert.assertEquals(rs.getString(1), "reconnect2");
1073+
});
1074+
Assert.assertEquals(count, 1);
1075+
} finally {
1076+
reconnectSink.close();
1077+
}
1078+
}
1079+
9351080
@SuppressWarnings("unchecked")
9361081
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
9371082
CompletableFuture<Boolean> future) {

0 commit comments

Comments
 (0)