Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,23 @@ public Connection borrowConnection(final long timeout, final TimeUnit unit) thro
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

// Get valid connection
final Connection availableConnection = getAvailableConnection();
if (availableConnection == null) {
return waitForConnection(timeout, unit);
Connection availableConnection;
try {
// acquire lock before attempting to obtain an available connection and potentially waiting for one
// to prevent race conditions with other threads that are releasing or creating new connections
// otherwise signal may be called before await which would result in timeouts
waitLock.lock();
availableConnection = getAvailableConnection();
if (availableConnection == null) {
availableConnection = waitForConnection(timeout, unit);
}
} finally {
waitLock.unlock();
}

if (logger.isDebugEnabled())
if (logger.isDebugEnabled()) {
logger.debug("Borrowed {} on {} with {}", availableConnection.getConnectionInfo(), host, Thread.currentThread());
}
return availableConnection;
}

Expand Down Expand Up @@ -287,6 +297,9 @@ private void considerNewConnection() {
}

private void newConnection() {
if (logger.isTraceEnabled()) {
logger.trace("Scheduling creation of new connection for connection pool {} using scheduler {}", this.getPoolInfo(), cluster.connectionScheduler());
}
cluster.connectionScheduler().submit(() -> {
// seems like this should be decremented first because if addConnectionIfUnderMaximum fails there is
// nothing that wants to decrement this number and so it leaves things in a state where you could
Expand Down Expand Up @@ -487,6 +500,7 @@ private boolean tryReconnect(final Host h) {

private void announceAvailableConnection() {
logger.debug("Announce connection available on {}", host);
logConnectionPoolStatus();

waitLock.lock();
try {
Expand Down Expand Up @@ -523,23 +537,23 @@ private Connection getAvailableConnection() {

return available;
}

private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
Comment thread
Cole-Greer marked this conversation as resolved.
logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
logConnectionPoolStatus();

waitLock.lock();
waiter.incrementAndGet();
try {
hasAvailableConnection.await(timeout, unit);
} finally {
waiter.decrementAndGet();
waitLock.unlock();
}
}

private void announceAllAvailableConnection() {
logger.debug("Announce to all connection available on {}", host);

logConnectionPoolStatus();

waitLock.lock();
try {
hasAvailableConnection.signalAll();
Expand Down Expand Up @@ -602,6 +616,12 @@ private void appendConnections(final StringBuilder sb, final Connection connecti
}
}

private void logConnectionPoolStatus() {
if (logger.isTraceEnabled()) {
logger.trace("Connection Pool status: {} ", this.getPoolInfo());
}
}

@Override
public String toString() {
return poolLabel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public static void main(final String[] args) {
final boolean suppressStackTraces = Boolean.parseBoolean(options.getOrDefault("suppressStackTraces", "false").toString());

final boolean exercise = Boolean.parseBoolean(options.getOrDefault("exercise", "false").toString());
final String script = options.getOrDefault("script", "1+1").toString();
final String script = options.getOrDefault("script", "g.inject(1)").toString();

final Cluster cluster = Cluster.build(host)
.maxConnectionPoolSize(maxConnectionPoolSize)
Expand Down