Skip to content

Commit 078ab64

Browse files
feat: Adding a connection pool to reduce the time spent opening connection to Postgres (#4851)
<!-- Thanks for sending a pull request (PR)! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: [Contributing to Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md) 2. Ensure you have added or run the appropriate tests for your PR 3. If the PR is work in progress, mark it a draft on GitHub. 4. Please write your PR title to summarize what this PR proposes, we are following Conventional Commits style for PR titles as well. 5. Be sure to keep the PR description updated to reflect all changes. --> ### What changes were proposed in this PR? <!-- Please clarify what changes you are proposing. The purpose of this section is to outline the changes. Here are some tips for you: 1. If you propose a new API, clarify the use case for a new API. 2. If you fix a bug, you can clarify why it is a bug. 3. If it is a refactoring, clarify what has been changed. 3. It would be helpful to include a before-and-after comparison using screenshots or GIFs. 4. Please consider writing useful notes for better and faster reviews. --> This PR adds a HikariCP connection pool to SqlServer.scala so that jOOQ queries borrow pre-authenticated connections from a pool instead of opening a new TCP connection and performing SCRAM-SHA-256 authentication on every database call. ### What changed in SqlServer.scala: - Added a HikariConfig with maximumPoolSize=10, minimumIdle=2, connectionTimeout=30s, idleTimeout=10min, maxLifetime=30min (chosen to stay below typical PostgreSQL idle and load-balancer connection-reaping windows) - Initialized a HikariDataSource and built the jOOQ DSLContext via DSL.using(dataSource, SQL_DIALECT) so queries draw from the pool - Added a close() method to shut down the pool on application or test teardown, and updated clearInstance() to call it, so test classes that replace the singleton don't leak pool threads - Added a documented warning against caching the DSLContext in a val or lazy val. MockTexeraDB swaps the singleton between test classes, and a cached context would hold a stale reference to a dead pool ### Any related issues, documentation, discussions? <!-- Please use this section to link other resources if not mentioned already. 1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves #1234` or `Closes #1234`. If it is only related, simply mention the issue number. 2. If there is design documentation, please add the link. 3. If there is a discussion in the mailing list, please add the link. --> closes: #4852 ### How was this PR tested? <!-- If tests were added, say they were added here. Or simply mention that if the PR is tested with existing test cases. Make sure to include/update test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future. If tests were not added, please describe why they were not added and/or why it was difficult to add. --> - Existing unit and integration tests pass — the pool-backed DSLContext is a drop-in replacement for the previous direct-driver context - Verified the MockTexeraDB flow still works across test classes by running the full test suite and confirming no "Connection refused" failures from stale contexts ### Was this PR authored or co-authored using generative AI tooling? <!-- If generative AI tooling has been used in the process of authoring this PR, please include the phrase: 'Generated-by: ' followed by the name of the tool and its version. If no, write 'No'. Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details. --> Co-authored with Claude opus 4.6 (Anthropic) in compliance with ASF --------- Co-authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
1 parent 7879c2a commit 078ab64

10 files changed

Lines changed: 273 additions & 19 deletions

File tree

access-control-service/LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ Scala/Java jars:
247247
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
248248
- com.typesafe.config-1.4.6.jar
249249
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
250+
- com.zaxxer.HikariCP-5.1.0.jar
250251
- io.dropwizard.dropwizard-auth-4.0.7.jar
251252
- io.dropwizard.dropwizard-configuration-4.0.7.jar
252253
- io.dropwizard.dropwizard-core-4.0.7.jar

amber/LICENSE-binary-java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ Scala/Java jars:
291291
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
292292
- com.typesafe.ssl-config-core_2.13-0.6.1.jar
293293
- com.univocity.univocity-parsers-2.9.1.jar
294+
- com.zaxxer.HikariCP-5.1.0.jar
294295
- commons-beanutils.commons-beanutils-1.9.4.jar
295296
- commons-cli.commons-cli-1.2.jar
296297
- commons-codec.commons-codec-1.17.1.jar

common/dao/build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,5 @@ libraryDependencies ++= Seq(
175175

176176
libraryDependencies ++= Seq(
177177
"org.postgresql" % "postgresql" % "42.7.10",
178+
"com.zaxxer" % "HikariCP" % "5.1.0"
178179
)

common/dao/src/main/scala/org/apache/texera/dao/SqlServer.scala

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,59 +19,73 @@
1919

2020
package org.apache.texera.dao
2121

22+
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
2223
import org.jooq.impl.DSL
2324
import org.jooq.{DSLContext, SQLDialect}
24-
import org.postgresql.ds.PGSimpleDataSource
2525

2626
/**
2727
* SqlServer class that manages a connection to a PostgreSQL database using jOOQ.
2828
*
29+
* Uses a HikariCP connection pool so that every jOOQ query borrows a pre-authenticated
30+
* connection from the pool rather than opening a new TCP + SCRAM handshake each time.
31+
*
2932
* WARNING: Do not cache the DSLContext returned by `createDSLContext()` in a val or lazy val.
3033
* During testing, `MockTexeraDB` replaces the SqlServer instance between test classes.
3134
* A cached DSLContext will hold a stale reference to a dead database connection from a previous test class,
3235
* causing "Connection refused" errors when tests run together.
3336
* Use `def` to ensure the connection is looked up each time.
3437
*
35-
* @param url The database connection URL.
36-
* @param user The username for authenticating with the database.
38+
* @param url The JDBC connection URL.
39+
* @param user The username for authenticating with the database.
3740
* @param password The password for authenticating with the database.
3841
*/
3942
class SqlServer private (url: String, user: String, password: String) {
4043
val SQL_DIALECT: SQLDialect = SQLDialect.POSTGRES
41-
private val dataSource: PGSimpleDataSource = new PGSimpleDataSource()
42-
var context: DSLContext = {
43-
dataSource.setUrl(url)
44-
dataSource.setUser(user)
45-
dataSource.setPassword(password)
46-
dataSource.setConnectTimeout(5)
47-
DSL.using(dataSource, SQL_DIALECT)
44+
45+
private val hikariConfig: HikariConfig = {
46+
val cfg = new HikariConfig()
47+
cfg.setJdbcUrl(url)
48+
cfg.setUsername(user)
49+
cfg.setPassword(password)
50+
cfg.setPoolName("texera-hikari")
51+
cfg.setMaximumPoolSize(10)
52+
cfg.setMinimumIdle(2)
53+
// How long a caller waits for a connection before throwing (ms)
54+
cfg.setConnectionTimeout(30000)
55+
// How long an idle connection stays in the pool before being retired (ms)
56+
cfg.setIdleTimeout(600000)
57+
// Maximum lifetime of any connection in the pool (ms); must be < PostgreSQL's idle timeout
58+
cfg.setMaxLifetime(1800000)
59+
cfg
4860
}
4961

62+
private val dataSource: HikariDataSource = new HikariDataSource(hikariConfig)
63+
64+
var context: DSLContext = DSL.using(dataSource, SQL_DIALECT)
65+
5066
def createDSLContext(): DSLContext = context
5167

5268
def replaceDSLContext(newContext: DSLContext): Unit = {
5369
context = newContext
5470
}
71+
72+
def close(): Unit = {
73+
if (!dataSource.isClosed) dataSource.close()
74+
}
5575
}
5676

5777
object SqlServer {
5878
private var instance: Option[SqlServer] = None
5979

6080
def initConnection(url: String, user: String, password: String): Unit = {
61-
if (instance.isEmpty) {
62-
val server = new SqlServer(url, user, password)
63-
instance = Some(server)
64-
}
81+
instance.foreach(_.close())
82+
instance = Some(new SqlServer(url, user, password))
6583
}
6684

6785
def getInstance(): SqlServer = {
6886
instance.get
6987
}
7088

71-
def clearInstance(): Unit = {
72-
instance = None
73-
}
74-
7589
/**
7690
* A utility function for create a transaction block using given sql context
7791
* @param dsl the sql context

common/dao/src/test/scala/org/apache/texera/dao/MockTexeraDB.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ trait MockTexeraDB {
6767
value.close()
6868
dbInstance = None
6969
dslContext = None
70-
SqlServer.clearInstance()
7170
case None =>
7271
// do nothing
7372
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.texera.dao
21+
22+
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
23+
import org.jooq.impl.DSL
24+
import org.scalatest.flatspec.AnyFlatSpec
25+
import org.scalatest.matchers.should.Matchers
26+
import org.scalatest.{BeforeAndAfterAll}
27+
28+
class SqlServerSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll with MockTexeraDB {
29+
30+
override def beforeAll(): Unit = initializeDBAndReplaceDSLContext()
31+
override def afterAll(): Unit = shutdownDB()
32+
33+
// -------------------------------------------------------------------------
34+
// SqlServer.withTransaction
35+
//
36+
// getDSLContext is backed by the embedded Postgres DataSource, so each
37+
// top-level query borrows a connection from the pool. withTransaction
38+
// binds a single connection for the duration of the block, making rollback
39+
// and commit behaviour fully observable.
40+
// -------------------------------------------------------------------------
41+
42+
"SqlServer.withTransaction" should "return the value produced by the block" in {
43+
val result = SqlServer.withTransaction(getDSLContext) { _ => 42 }
44+
result shouldBe 42
45+
}
46+
47+
it should "commit the block's work so subsequent queries observe the changes" in {
48+
// SELECT 1 is a lightweight live query; completing without error confirms
49+
// the transaction committed and the connection was returned cleanly.
50+
val result = SqlServer.withTransaction(getDSLContext) { ctx =>
51+
ctx.selectOne().fetchOne().value1()
52+
}
53+
result shouldBe 1
54+
}
55+
56+
it should "re-throw the exception when the block throws" in {
57+
val boom = new RuntimeException("intentional failure")
58+
val thrown = intercept[RuntimeException] {
59+
SqlServer.withTransaction(getDSLContext) { _ => throw boom }
60+
}
61+
thrown.getMessage should include("intentional failure")
62+
}
63+
64+
it should "roll back all DML in the block when an exception is thrown" in {
65+
// A permanent (non-TEMP) table is used so every connection from the pool
66+
// can see it; TEMP tables are session-scoped and would be invisible across
67+
// pool connections.
68+
val dsl = getDSLContext
69+
dsl.execute("CREATE TABLE IF NOT EXISTS _txn_rollback_test (v INT)")
70+
try {
71+
intercept[RuntimeException] {
72+
SqlServer.withTransaction(dsl) { ctx =>
73+
ctx.execute("INSERT INTO _txn_rollback_test VALUES (99)")
74+
throw new RuntimeException("force rollback")
75+
}
76+
}
77+
// The INSERT was inside the rolled-back transaction, so the table must
78+
// still be empty.
79+
dsl.fetchCount(DSL.table(DSL.name("_txn_rollback_test"))) shouldBe 0
80+
} finally {
81+
dsl.execute("DROP TABLE IF EXISTS _txn_rollback_test")
82+
}
83+
}
84+
85+
it should "support nested return types beyond Int" in {
86+
val result = SqlServer.withTransaction(getDSLContext) { ctx =>
87+
ctx.selectOne().fetchOne().value1().toString
88+
}
89+
result shouldBe "1"
90+
}
91+
92+
// -------------------------------------------------------------------------
93+
// HikariCP pool lifecycle and configuration
94+
//
95+
// These tests create their own HikariDataSource against the embedded Postgres
96+
// instance so they can drive the pool directly, independently of the
97+
// DSLContext replacement that MockTexeraDB applies for its own queries.
98+
// -------------------------------------------------------------------------
99+
100+
private def buildPool(
101+
maxSize: Int = 5,
102+
minIdle: Int = 1,
103+
poolName: String = "spec-pool"
104+
): HikariDataSource = {
105+
// Use the default "postgres" database so no schema setup is needed.
106+
val jdbcUrl = getDBInstance.getJdbcUrl("postgres", "postgres")
107+
val cfg = new HikariConfig()
108+
cfg.setJdbcUrl(jdbcUrl)
109+
cfg.setUsername("postgres")
110+
cfg.setPassword("")
111+
cfg.setPoolName(poolName)
112+
cfg.setMaximumPoolSize(maxSize)
113+
cfg.setMinimumIdle(minIdle)
114+
cfg.setConnectionTimeout(5000)
115+
new HikariDataSource(cfg)
116+
}
117+
118+
"HikariCP pool" should "provide a usable connection that can execute queries" in {
119+
val ds = buildPool()
120+
try {
121+
val conn = ds.getConnection
122+
try {
123+
val rs = conn.prepareStatement("SELECT 1").executeQuery()
124+
rs.next() shouldBe true
125+
rs.getInt(1) shouldBe 1
126+
} finally conn.close()
127+
} finally ds.close()
128+
}
129+
130+
it should "apply the configured pool name" in {
131+
val ds = buildPool(poolName = "my-named-pool")
132+
try {
133+
ds.getHikariConfigMXBean.getPoolName shouldBe "my-named-pool"
134+
} finally ds.close()
135+
}
136+
137+
it should "apply the configured maximum pool size" in {
138+
val ds = buildPool(maxSize = 7)
139+
try {
140+
ds.getHikariConfigMXBean.getMaximumPoolSize shouldBe 7
141+
} finally ds.close()
142+
}
143+
144+
it should "apply the configured minimum idle connections" in {
145+
val ds = buildPool(minIdle = 2)
146+
try {
147+
ds.getHikariConfigMXBean.getMinimumIdle shouldBe 2
148+
} finally ds.close()
149+
}
150+
151+
it should "count a borrowed connection as active" in {
152+
val ds = buildPool()
153+
try {
154+
val conn = ds.getConnection
155+
try {
156+
ds.getHikariPoolMXBean.getActiveConnections should be >= 1
157+
} finally conn.close()
158+
} finally ds.close()
159+
}
160+
161+
it should "decrement active count and increment idle count once a connection is returned" in {
162+
val ds = buildPool()
163+
try {
164+
val conn = ds.getConnection
165+
conn.close()
166+
ds.getHikariPoolMXBean.getActiveConnections shouldBe 0
167+
ds.getHikariPoolMXBean.getIdleConnections should be >= 1
168+
} finally ds.close()
169+
}
170+
171+
it should "allow up to the maximum pool size connections to be borrowed concurrently" in {
172+
val ds = buildPool(maxSize = 3)
173+
try {
174+
val c1 = ds.getConnection
175+
val c2 = ds.getConnection
176+
val c3 = ds.getConnection
177+
ds.getHikariPoolMXBean.getActiveConnections shouldBe 3
178+
c1.close(); c2.close(); c3.close()
179+
} finally ds.close()
180+
}
181+
182+
it should "report isClosed as false while open and true after close" in {
183+
val ds = buildPool()
184+
ds.isClosed shouldBe false
185+
ds.close()
186+
ds.isClosed shouldBe true
187+
}
188+
189+
it should "reject getConnection after the pool has been closed" in {
190+
val ds = buildPool()
191+
ds.close()
192+
// HikariCP throws an SQLException (wrapped as RuntimeException by the pool)
193+
// when a caller tries to borrow from a closed pool.
194+
assertThrows[Exception](ds.getConnection)
195+
}
196+
197+
// -------------------------------------------------------------------------
198+
// SqlServer.close()
199+
//
200+
// The instance's private HikariDataSource is the only resource that needs
201+
// explicit release; close() guards it against double-close. These tests
202+
// construct a fresh SqlServer via initConnection (the only public entry
203+
// point — the class constructor is private) and assert against the
204+
// underlying pool via reflection, which avoids broadening the class API
205+
// just to make this branch observable.
206+
// -------------------------------------------------------------------------
207+
208+
private def datasourceOf(instance: SqlServer): HikariDataSource = {
209+
val field = classOf[SqlServer].getDeclaredField("dataSource")
210+
field.setAccessible(true)
211+
field.get(instance).asInstanceOf[HikariDataSource]
212+
}
213+
214+
"SqlServer.close" should "shut down the underlying HikariDataSource and be idempotent" in {
215+
val jdbcUrl = getDBInstance.getJdbcUrl("postgres", "postgres")
216+
// Replaces the singleton — initConnection internally calls close() on the
217+
// prior instance, which is itself an exercise of the same path. The trait
218+
// holds its own DSLContext separately, so other tests' database access is
219+
// unaffected by this replacement.
220+
SqlServer.initConnection(jdbcUrl, "postgres", "")
221+
val instance = SqlServer.getInstance()
222+
val ds = datasourceOf(instance)
223+
224+
ds.isClosed shouldBe false
225+
instance.close()
226+
ds.isClosed shouldBe true
227+
228+
// Second close() must take the `dataSource.isClosed` branch and return
229+
// without throwing. Calling Hikari's close() twice would itself be safe
230+
// today, but the guard is what this assertion pins.
231+
noException should be thrownBy instance.close()
232+
ds.isClosed shouldBe true
233+
}
234+
}

computing-unit-managing-service/LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ Scala/Java jars:
274274
- com.typesafe.play.play-functional_2.13-2.10.6.jar
275275
- com.typesafe.play.play-json_2.13-2.10.6.jar
276276
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
277+
- com.zaxxer.HikariCP-5.1.0.jar
277278
- commons-beanutils.commons-beanutils-1.9.4.jar
278279
- commons-cli.commons-cli-1.2.jar
279280
- commons-codec.commons-codec-1.17.1.jar

config-service/LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ Scala/Java jars:
247247
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
248248
- com.typesafe.config-1.4.6.jar
249249
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
250+
- com.zaxxer.HikariCP-5.1.0.jar
250251
- io.dropwizard.dropwizard-auth-4.0.7.jar
251252
- io.dropwizard.dropwizard-configuration-4.0.7.jar
252253
- io.dropwizard.dropwizard-core-4.0.7.jar

file-service/LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ Scala/Java jars:
268268
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
269269
- com.typesafe.config-1.4.6.jar
270270
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
271+
- com.zaxxer.HikariCP-5.1.0.jar
271272
- commons-beanutils.commons-beanutils-1.9.4.jar
272273
- commons-cli.commons-cli-1.2.jar
273274
- commons-codec.commons-codec-1.17.1.jar

workflow-compiling-service/LICENSE-binary

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ Scala/Java jars:
269269
- com.thesamet.scalapb.scalapb-runtime_2.13-0.11.20.jar
270270
- com.typesafe.config-1.4.6.jar
271271
- com.typesafe.scala-logging.scala-logging_2.13-3.9.5.jar
272+
- com.zaxxer.HikariCP-5.1.0.jar
272273
- com.univocity.univocity-parsers-2.9.1.jar
273274
- commons-beanutils.commons-beanutils-1.9.4.jar
274275
- commons-cli.commons-cli-1.2.jar

0 commit comments

Comments
 (0)