Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 111 additions & 112 deletions jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -76,15 +78,14 @@ private enum State {
protected JdbcUtils.TableDefinition tableDefinition;

// for flush
private Deque<Record<T>> incomingList;
private LinkedBlockingDeque<Record<T>> incomingList;
private AtomicBoolean isFlushing;
private int batchSize;
private int maxQueueSize;
private ScheduledExecutorService flushExecutor;
private ScheduledFuture<?> scheduledFlushTask;
private SinkContext sinkContext;
private Properties connectionProperties;
private volatile boolean queueFullLogged = false;
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

@Override
Expand Down Expand Up @@ -127,7 +128,11 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}
// maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior)
log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded");
incomingList = new LinkedList<>();
if (maxQueueSize > 0) {
incomingList = new LinkedBlockingDeque<>(maxQueueSize);
} else {
incomingList = new LinkedBlockingDeque<>();
}
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
Expand Down Expand Up @@ -172,6 +177,10 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
@Override
public void close() throws Exception {
state.set(State.CLOSED);
// Fail any records still in the queue
List<Record<T>> remaining = new ArrayList<>();
incomingList.drainTo(remaining);
remaining.forEach(Record::fail);
if (scheduledFlushTask != null) {
scheduledFlushTask.cancel(false);
scheduledFlushTask = null;
Expand Down Expand Up @@ -211,31 +220,30 @@ public void write(Record<T> record) throws Exception {
record.fail();
return;
}
int number = 0;
boolean shouldFail = false;
boolean shouldLogQueueFull = false;
int queueSizeSnapshot = 0;
synchronized (incomingList) {
if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
if (!queueFullLogged) {
queueFullLogged = true;
shouldLogQueueFull = true;
queueSizeSnapshot = incomingList.size();
}
shouldFail = true;
} else {
incomingList.add(record);
number = incomingList.size();
}

// Block until space is available or timeout expires.
// Blocking the Pulsar IO thread IS the back-pressure: it stops
// message consumption for this sink's subscription.
boolean accepted;
try {
accepted = incomingList.offer(record, 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
record.fail();
return;
}
if (shouldFail) {
if (shouldLogQueueFull) {
log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
queueSizeSnapshot, maxQueueSize);

if (!accepted) {
if (state.get() != State.OPEN) {
log.warn("Sink became {} while waiting for queue space, failing record", state.get());
} else {
log.warn("Queue still full after timeout (capacity: {}), failing record", maxQueueSize);
}
record.fail();
return;
}

int number = incomingList.size();
if (batchSize > 0 && number >= batchSize) {
if (log.isDebugEnabled()) {
log.debug("flushing by batches, hit batch size {}", batchSize);
Expand Down Expand Up @@ -287,108 +295,99 @@ private void flush() {
}
if (!isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
synchronized (incomingList) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
return;
}
boolean needAnotherRound;
final Deque<Record<T>> swapList = new LinkedList<>();

synchronized (incomingList) {
if (incomingList.isEmpty()) {
isFlushing.set(false);
return;
}
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
incomingList.size();

for (int i = 0; i < actualBatchSize; i++) {
swapList.add(incomingList.removeFirst());
}
needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
}
long start = System.nanoTime();

int count = 0;
try {
ensureConnection();

PreparedStatement currentBatch = null;
final List<Mutation> mutations = swapList
.stream()
.map(this::createMutation)
.collect(Collectors.toList());
// bind each record value
PreparedStatement statement;
for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
statement = deleteStatement;
break;
case UPDATE:
statement = updateStatement;
break;
case INSERT:
statement = insertStatement;
break;
case UPSERT:
statement = upsertStatement;
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s, or not set which indicate %s",
mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
try {
boolean continueLoop = true;
while (continueLoop) {
continueLoop = false;
final Deque<Record<T>> swapList = new LinkedList<>();
try {
int drainSize = batchSize > 0 ? batchSize : Integer.MAX_VALUE;
incomingList.drainTo(swapList, drainSize);
if (swapList.isEmpty()) {
return;
}
bindValue(statement, mutation);
count += 1;
if (jdbcSinkConfig.isUseJdbcBatch()) {
if (currentBatch != null && statement != currentBatch) {
internalFlushBatch(swapList, currentBatch, count, start);
start = System.nanoTime();
if (log.isDebugEnabled()) {
log.debug("Starting flush, drained {} records, queue remaining: {}",
swapList.size(), incomingList.size());
}

long start = System.nanoTime();
int count = 0;

ensureConnection();

PreparedStatement currentBatch = null;
final List<Mutation> mutations = swapList
.stream()
.map(this::createMutation)
.collect(Collectors.toList());
// bind each record value
PreparedStatement statement;
for (Mutation mutation : mutations) {
switch (mutation.getType()) {
case DELETE:
statement = deleteStatement;
break;
case UPDATE:
statement = updateStatement;
break;
case INSERT:
statement = insertStatement;
break;
case UPSERT:
statement = upsertStatement;
break;
default:
String msg = String.format(
"Unsupported action %s, can be one of %s, or not set which indicate %s",
mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT);
throw new IllegalArgumentException(msg);
}
statement.addBatch();
currentBatch = statement;
} else {
statement.execute();
if (!jdbcSinkConfig.isUseTransactions()) {
swapList.removeFirst().ack();
bindValue(statement, mutation);
count += 1;
if (jdbcSinkConfig.isUseJdbcBatch()) {
if (currentBatch != null && statement != currentBatch) {
internalFlushBatch(swapList, currentBatch, count, start);
start = System.nanoTime();
}
statement.addBatch();
currentBatch = statement;
} else {
statement.execute();
if (!jdbcSinkConfig.isUseTransactions()) {
swapList.removeFirst().ack();
}
}
}
}

if (jdbcSinkConfig.isUseJdbcBatch()) {
internalFlushBatch(swapList, currentBatch, count, start);
} else {
internalFlush(swapList);
}
queueFullLogged = false;
} catch (Exception e) {
log.error("Got exception {} after {} ms, failing {} messages",
e.getMessage(),
(System.nanoTime() - start) / 1000 / 1000,
swapList.size(),
e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
connection.rollback();
if (jdbcSinkConfig.isUseJdbcBatch()) {
internalFlushBatch(swapList, currentBatch, count, start);
} else {
internalFlush(swapList);
}
} catch (Exception ex) {
log.error("Failed to rollback transaction", ex);
continueLoop = batchSize > 0 && incomingList.size() >= batchSize;
} catch (Exception e) {
log.error("Got exception {} failing {} messages",
e.getMessage(), swapList.size(), e);
swapList.forEach(Record::fail);
try {
if (jdbcSinkConfig.isUseTransactions()) {
connection.rollback();
}
} catch (Exception ex) {
log.error("Failed to rollback transaction", ex);
}
fatal(e);
}
fatal(e);
}

} finally {
isFlushing.set(false);
if (needAnotherRound) {
flush();
}
}
}

private void ensureConnection() throws Exception {
Expand Down