diff --git a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 3f2477a384..620abbcc08 100644 --- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -24,6 +24,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashMap; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -76,7 +78,7 @@ private enum State { protected JdbcUtils.TableDefinition tableDefinition; // for flush - private Deque<Record<T>> incomingList; + private LinkedBlockingDeque<Record<T>> incomingList; private AtomicBoolean isFlushing; private int batchSize; private int maxQueueSize; @@ -84,7 +86,6 @@ private enum State { private ScheduledFuture scheduledFlushTask; private SinkContext sinkContext; private Properties connectionProperties; - private volatile boolean queueFullLogged = false; private final AtomicReference<State> state = new AtomicReference<>(State.OPEN); @Override @@ -127,7 +128,11 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc } // maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior) log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ?
maxQueueSize : "unbounded"); - incomingList = new LinkedList<>(); + if (maxQueueSize > 0) { + incomingList = new LinkedBlockingDeque<>(maxQueueSize); + } else { + incomingList = new LinkedBlockingDeque<>(); + } isFlushing = new AtomicBoolean(false); flushExecutor = Executors.newScheduledThreadPool(1); @@ -172,6 +177,10 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) { @Override public void close() throws Exception { state.set(State.CLOSED); + // Fail any records still in the queue + List<Record<T>> remaining = new ArrayList<>(); + incomingList.drainTo(remaining); + remaining.forEach(Record::fail); if (scheduledFlushTask != null) { scheduledFlushTask.cancel(false); scheduledFlushTask = null; @@ -211,31 +220,30 @@ public void write(Record<T> record) throws Exception { record.fail(); return; } - int number = 0; - boolean shouldFail = false; - boolean shouldLogQueueFull = false; - int queueSizeSnapshot = 0; - synchronized (incomingList) { - if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) { - if (!queueFullLogged) { - queueFullLogged = true; - shouldLogQueueFull = true; - queueSizeSnapshot = incomingList.size(); - } - shouldFail = true; - } else { - incomingList.add(record); - number = incomingList.size(); - } + + // Block until space is available or timeout expires. + // Blocking the Pulsar IO thread IS the back-pressure: it stops + // message consumption for this sink's subscription. 
+ boolean accepted; + try { + accepted = incomingList.offer(record, 1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + record.fail(); + return; } - if (shouldFail) { - if (shouldLogQueueFull) { - log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure", - queueSizeSnapshot, maxQueueSize); + + if (!accepted) { + if (state.get() != State.OPEN) { + log.warn("Sink became {} while waiting for queue space, failing record", state.get()); + } else { + log.warn("Queue still full after timeout (capacity: {}), failing record", maxQueueSize); } record.fail(); return; } + + int number = incomingList.size(); if (batchSize > 0 && number >= batchSize) { if (log.isDebugEnabled()) { log.debug("flushing by batches, hit batch size {}", batchSize); @@ -287,108 +295,99 @@ private void flush() { } if (!isFlushing.compareAndSet(false, true)) { if (log.isDebugEnabled()) { - synchronized (incomingList) { - log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); - } + log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); } return; } - boolean needAnotherRound; - final Deque<Record<T>> swapList = new LinkedList<>(); - - synchronized (incomingList) { - if (incomingList.isEmpty()) { - isFlushing.set(false); - return; - } - if (log.isDebugEnabled()) { - log.debug("Starting flush, queue size: {}", incomingList.size()); - } - final int actualBatchSize = batchSize > 0 ? 
Math.min(incomingList.size(), batchSize) : - incomingList.size(); - for (int i = 0; i < actualBatchSize; i++) { - swapList.add(incomingList.removeFirst()); - } - needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize; - } - long start = System.nanoTime(); - - int count = 0; - try { - ensureConnection(); - - PreparedStatement currentBatch = null; - final List mutations = swapList - .stream() - .map(this::createMutation) - .collect(Collectors.toList()); - // bind each record value - PreparedStatement statement; - for (Mutation mutation : mutations) { - switch (mutation.getType()) { - case DELETE: - statement = deleteStatement; - break; - case UPDATE: - statement = updateStatement; - break; - case INSERT: - statement = insertStatement; - break; - case UPSERT: - statement = upsertStatement; - break; - default: - String msg = String.format( - "Unsupported action %s, can be one of %s, or not set which indicate %s", - mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT); - throw new IllegalArgumentException(msg); + try { + boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + final Deque> swapList = new LinkedList<>(); + try { + int drainSize = batchSize > 0 ? 
batchSize : Integer.MAX_VALUE; + incomingList.drainTo(swapList, drainSize); + if (swapList.isEmpty()) { + return; } - bindValue(statement, mutation); - count += 1; - if (jdbcSinkConfig.isUseJdbcBatch()) { - if (currentBatch != null && statement != currentBatch) { - internalFlushBatch(swapList, currentBatch, count, start); - start = System.nanoTime(); + if (log.isDebugEnabled()) { + log.debug("Starting flush, drained {} records, queue remaining: {}", + swapList.size(), incomingList.size()); + } + + long start = System.nanoTime(); + int count = 0; + + ensureConnection(); + + PreparedStatement currentBatch = null; + final List mutations = swapList + .stream() + .map(this::createMutation) + .collect(Collectors.toList()); + // bind each record value + PreparedStatement statement; + for (Mutation mutation : mutations) { + switch (mutation.getType()) { + case DELETE: + statement = deleteStatement; + break; + case UPDATE: + statement = updateStatement; + break; + case INSERT: + statement = insertStatement; + break; + case UPSERT: + statement = upsertStatement; + break; + default: + String msg = String.format( + "Unsupported action %s, can be one of %s, or not set which indicate %s", + mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT); + throw new IllegalArgumentException(msg); } - statement.addBatch(); - currentBatch = statement; - } else { - statement.execute(); - if (!jdbcSinkConfig.isUseTransactions()) { - swapList.removeFirst().ack(); + bindValue(statement, mutation); + count += 1; + if (jdbcSinkConfig.isUseJdbcBatch()) { + if (currentBatch != null && statement != currentBatch) { + internalFlushBatch(swapList, currentBatch, count, start); + start = System.nanoTime(); + } + statement.addBatch(); + currentBatch = statement; + } else { + statement.execute(); + if (!jdbcSinkConfig.isUseTransactions()) { + swapList.removeFirst().ack(); + } } } - } - if (jdbcSinkConfig.isUseJdbcBatch()) { - internalFlushBatch(swapList, currentBatch, count, 
start); - } else { - internalFlush(swapList); - } - queueFullLogged = false; - } catch (Exception e) { - log.error("Got exception {} after {} ms, failing {} messages", - e.getMessage(), - (System.nanoTime() - start) / 1000 / 1000, - swapList.size(), - e); - swapList.forEach(Record::fail); - try { - if (jdbcSinkConfig.isUseTransactions()) { - connection.rollback(); + if (jdbcSinkConfig.isUseJdbcBatch()) { + internalFlushBatch(swapList, currentBatch, count, start); + } else { + internalFlush(swapList); } - } catch (Exception ex) { - log.error("Failed to rollback transaction", ex); + continueLoop = batchSize > 0 && incomingList.size() >= batchSize; + } catch (Exception e) { + log.error("Got exception {} failing {} messages", + e.getMessage(), swapList.size(), e); + swapList.forEach(Record::fail); + try { + if (jdbcSinkConfig.isUseTransactions()) { + connection.rollback(); + } + } catch (Exception ex) { + log.error("Failed to rollback transaction", ex); + } + fatal(e); } - fatal(e); } - + } finally { isFlushing.set(false); - if (needAnotherRound) { - flush(); - } + } } private void ensureConnection() throws Exception {