Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java_version: [11]
java_version: [21]
os: [ubuntu-latest]

steps:
Expand Down
61 changes: 61 additions & 0 deletions ebean-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,67 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.5.3</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>

<plugin> <!-- Multi-Release with 21 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifestEntries>
<Multi-Release>true</Multi-Release>
</manifestEntries>
</archive>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.14.0</version>
<executions>
<!-- Compile for base version Java 11 -->
<execution>
<id>base</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<release>11</release>
<compileSourceRoots>
<compileSourceRoot>${project.basedir}/src/main/java</compileSourceRoot>
</compileSourceRoots>
</configuration>
</execution>
<!-- Compile for Java 21 -->
<execution>
<id>java21</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<release>21</release>
<compileSourceRoots>
<compileSourceRoot>${project.basedir}/src/main/java21</compileSourceRoot>
</compileSourceRoots>
<outputDirectory>${project.build.outputDirectory}/META-INF/versions/21</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import java.io.PrintWriter;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -81,6 +84,8 @@ final class ConnectionPool implements DataSourcePool {
private final PooledConnectionQueue queue;
private Timer heartBeatTimer;
private int heartbeatPoolExhaustedCount;
private final ExecutorService executor;

/**
* Used to find and close() leaked connections. Leaked connections are
* thought to be busy but have not been used for some time. Each time a
Expand Down Expand Up @@ -134,6 +139,7 @@ final class ConnectionPool implements DataSourcePool {
init();
}
this.nextTrimTime = System.currentTimeMillis() + trimPoolFreqMillis;
this.executor = ExecutorFactory.newExecutor();
}

private void init() {
Expand Down Expand Up @@ -649,15 +655,23 @@ public void offline() {
shutdownPool(false, false);
}

private void shutdownPool(boolean closeBusyConnections, boolean fromHook) {
stopHeartBeatIfRunning();
PoolStatus status = queue.shutdown(closeBusyConnections);
dataSourceUp.set(false);
if (fromHook) {
Log.info("DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem);
} else {
Log.info("DataSource [{0}] shutdown {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]", name, status, pscHit, pscMiss, pscPut, pscRem);
removeShutdownHook();
/**
 * Shut the pool down while holding the heartbeat lock so no heartbeat
 * runs concurrently with the teardown. A full shutdown also stops the
 * background close executor; a shutdown not triggered by the JVM exit
 * hook additionally removes the registered shutdown hook.
 */
private void shutdownPool(boolean fullShutdown, boolean fromHook) {
  heartbeatLock.lock();
  try {
    stopHeartBeatIfRunning();
    final PoolStatus status = queue.shutdown(fullShutdown);
    dataSourceUp.set(false);
    if (fullShutdown) {
      shutdownExecutor();
    }
    // pick the log message matching the shutdown trigger
    final String pattern = fromHook
      ? "DataSource [{0}] shutdown on JVM exit {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]"
      : "DataSource [{0}] shutdown {1} psc[hit:{2} miss:{3} put:{4} rem:{5}]";
    Log.info(pattern, name, status, pscHit, pscMiss, pscPut, pscRem);
    if (!fromHook) {
      // normal shutdown: the hook is no longer needed
      removeShutdownHook();
    }
  } finally {
    heartbeatLock.unlock();
  }
}

Expand Down Expand Up @@ -718,6 +732,61 @@ private void stopHeartBeatIfRunning() {
}
}

/**
 * Runnable that performs the actual close of a pooled connection so the
 * potentially slow/blocking close can run on the pool's executor rather
 * than the caller's thread.
 */
private static final class AsyncCloser implements Runnable {
  // the connection to close fully
  final PooledConnection pc;
  // whether close errors should be logged
  final boolean logErrors;

  private AsyncCloser(PooledConnection pc, boolean logErrors) {
    this.pc = pc;
    this.logErrors = logErrors;
  }

  @Override
  public void run() {
    // performs network IO and may block - see doCloseConnection
    pc.doCloseConnection(logErrors);
  }

  @Override
  public String toString() {
    // delegate so pending tasks log as their underlying connection
    return pc.toString();
  }
}

/**
 * Closes the connection in the background as it may be slow or block.
 */
void closeConnectionFullyAsync(PooledConnection pc, boolean logErrors) {
  if (executor.isShutdown()) {
    // executor already stopped - close on the calling thread (outside lock)
    pc.doCloseConnection(logErrors);
    return;
  }
  try {
    executor.submit(new AsyncCloser(pc, logErrors));
  } catch (RejectedExecutionException e) {
    // submission raced with executor shutdown - fall back to a
    // synchronous close on the calling thread
    Log.trace("DataSource [{0}] closing connection synchronously", name);
    pc.doCloseConnection(logErrors);
  }
}

/**
 * Stop the background close executor, giving in-flight connection closes
 * up to 5 seconds to complete before force-terminating. Any tasks still
 * pending after {@code shutdownNow()} are logged as unclosed connections.
 */
private void shutdownExecutor() {
  executor.shutdown();
  try {
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
      Log.warn("DataSource [{0}] on shutdown, timeout waiting for connections to close", name);
    }
  } catch (InterruptedException ie) {
    Log.warn("DataSource [{0}] on shutdown, interrupted closing connections", name, ie);
    // restore the interrupt status so callers up the stack can observe it
    Thread.currentThread().interrupt();
  }
  final var pendingTasks = executor.shutdownNow();
  if (!pendingTasks.isEmpty()) {
    Log.warn("DataSource [{0}] on shutdown, {1} pending connections were not closed", name, pendingTasks.size());
  }
}

/**
* Return the default autoCommit setting for the pool.
*/
@Override
public boolean isAutoCommit() {
return autoCommit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.ebean.datasource.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Factory for the pool's background-close executor.
 * <p>
 * Base (Java 11) variant of this Multi-Release JAR class: uses a single
 * platform thread. The {@code META-INF/versions/21} variant overrides this
 * with a virtual-thread-per-task executor.
 */
final class ExecutorFactory {

  // utility class - not instantiable
  private ExecutorFactory() {
  }

  /**
   * Return a new single-threaded executor for closing connections in the background.
   */
  static ExecutorService newExecutor() {
    return Executors.newSingleThreadExecutor();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.sql.*;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down Expand Up @@ -234,52 +235,75 @@ void closeConnectionFully(boolean logErrors) {
if (Log.isLoggable(System.Logger.Level.TRACE)) {
Log.trace("Closing Connection[{0}] reason[{1}], pstmtStats: {2}", name, closeReason, pstmtCache.description());
}
if (pool != null) {
pool.pstmtCacheMetrics(pstmtCache);
if (pool == null) {
return; // this can happen in tests only.
}
pool.pstmtCacheMetrics(pstmtCache);
pool.closeConnectionFullyAsync(this, logErrors);
}

/**
* this method performs network IO and may block
*/
void doCloseConnection(boolean logErrors) {
pool.dec();
long start = System.nanoTime();
try {
if (connection.isClosed()) {
// Typically, the JDBC Driver has its own JVM shutdown hook and already
// closed the connections in our DataSource pool so making this DEBUG level
Log.trace("Closing Connection[{0}] that is already closed?", name);
return;
try {
if (connection.isClosed()) {
// Typically, the JDBC Driver has its own JVM shutdown hook and already
// closed the connections in our DataSource pool so making this DEBUG level
Log.trace("Closing Connection[{0}] that is already closed?", name);
return;
}
} catch (SQLException ex) {
if (logErrors) {
Log.error("Error checking if connection [" + name + "] is closed", ex);
}
}
} catch (SQLException ex) {
if (logErrors) {
Log.error("Error checking if connection [" + name + "] is closed", ex);
try {
clearPreparedStatementCache();
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Error when closing connection Statements", ex);
}
}
try {
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
// so try to do a rollback.
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Could not perform rollback", ex);
}
}
try {
connection.close();
} catch (SQLException ex) {
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
}
}
} finally {
long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (millis > 500) {
Log.warn("Closing connection [" + fullDescription() + "] took an unexpected long time of " + millis + " ms");
}
}
}

void clearPreparedStatementCache() throws SQLException {
lock.lock();
try {
for (ExtendedPreparedStatement ps : pstmtCache.values()) {
ps.closeDestroy();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Error when closing connection Statements", ex);
}

} finally {
lock.unlock();
}
try {
// DB2 (and some other DBMS) may have uncommitted changes and do not allow close
// so try to do a rollback.
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (SQLException ex) {
if (logErrors) {
Log.warn("Could not perform rollback", ex);
}
}
try {
connection.close();
pool.dec();
} catch (SQLException ex) {
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.ebean.datasource.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Factory for the pool's background-close executor.
 * <p>
 * Java 21 Multi-Release JAR variant (packaged under
 * {@code META-INF/versions/21}): uses a virtual-thread-per-task executor
 * so blocking connection closes do not tie up a platform thread.
 */
final class ExecutorFactory {

  // utility class - not instantiable
  private ExecutorFactory() {
  }

  /**
   * Return a new executor that runs each close task on its own virtual thread.
   */
  static ExecutorService newExecutor() {
    return Executors.newVirtualThreadPerTaskExecutor();
  }
}
Loading
Loading