diff --git a/CHANGELOG.md b/CHANGELOG.md index c8e28090c..dd347a3a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.10.9 +## 0.10.0 ### Breaking Changes @@ -39,6 +39,8 @@ ### Bug Fixes +- **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. (https://github.com/ClickHouse/clickhouse-java/issues/2690) + - **[client-v2]** Fixed inconsistent use of `executionTimeout` parameter in `Client` component. The timeout was previously set in milliseconds but mistakenly retrieved and used in seconds in some places. Now it correctly uses milliseconds consistently. (https://github.com/ClickHouse/clickhouse-java/issues/2358) ## 0.9.8 diff --git a/client-v2/src/main/java/com/clickhouse/client/api/command/CommandSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/command/CommandSettings.java index 399a50f26..b02d98eba 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/command/CommandSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/command/CommandSettings.java @@ -33,4 +33,10 @@ public CommandSettings use(Session session) { super.use(session); return this; } + + @Override + public CommandSettings clearSession() { + super.clearSession(); + return this; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java index 840e33889..7a2001eda 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/insert/InsertSettings.java @@ -145,6 +145,11 @@ public InsertSettings use(Session session) { return this; } + public InsertSettings clearSession() { + settings.clearSession(); + return this; + } + public int getInputStreamCopyBufferSize() { return this.inputStreamCopyBufferSize; } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java index aa2403c73..e55d1429e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/CommonSettings.java @@ -139,6 +139,14 @@ public CommonSettings use(Session session) { return this; } + public void clearSession() { + resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_ID)); + resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_CHECK)); + resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_TIMEOUT)); + // Do not clean `session_timezone` setting because it is not related to session management and used to + // set timezone for consequent queries in some multi-user applications. + } + /** * Operation id. Used internally to register new operation. * Should not be called directly. diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java index 95babb1ea..9df39f407 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/QuerySettings.java @@ -139,6 +139,11 @@ public QuerySettings use(Session session) { return this; } + public QuerySettings clearSession() { + settings.clearSession(); + return this; + } + /** * Read buffer is used for reading data from a server. Size is in bytes. * Minimal value is {@value MINIMAL_READ_BUFFER_SIZE} bytes. diff --git a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java index ca4614a67..81261dc33 100644 --- a/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/SettingsTests.java @@ -143,6 +143,32 @@ void testQuerySettingsSpecific() throws Exception { Assert.assertEquals(settings.getSessionTimeout().intValue(), 45); Assert.assertEquals(settings.getSessionTimezone(), "Europe/Berlin"); } + + { + final QuerySettings settings = new QuerySettings(); + settings.setSessionId("session-clear-1"); + settings.setSessionCheck(true); + settings.setSessionTimeout(60); + settings.setSessionTimezone("America/New_York"); + Assert.assertNotNull(settings.getSessionId()); + Assert.assertNotNull(settings.getSessionCheck()); + Assert.assertNotNull(settings.getSessionTimeout()); + Assert.assertNotNull(settings.getSessionTimezone()); + + settings.clearSession(); + + Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id"); + Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check"); + Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout"); + // session_timezone is not session-management state; it is preserved across clearSession(). + Assert.assertEquals(settings.getSessionTimezone(), "America/New_York", + "clearSession() must not remove session_timezone"); + + // Non-session settings are unaffected. + settings.setDatabase("db1"); + settings.clearSession(); + Assert.assertEquals(settings.getDatabase(), "db1"); + } } @Test @@ -232,5 +258,31 @@ public void testInsertSettingsSpecific() throws Exception { Assert.assertEquals(settings.getSessionTimeout().intValue(), 50); Assert.assertEquals(settings.getSessionTimezone(), "Europe/Paris"); } + + { + final InsertSettings settings = new InsertSettings(); + settings.setSessionId("session-clear-2"); + settings.setSessionCheck(true); + settings.setSessionTimeout(90); + settings.setSessionTimezone("Asia/Tokyo"); + Assert.assertNotNull(settings.getSessionId()); + Assert.assertNotNull(settings.getSessionCheck()); + Assert.assertNotNull(settings.getSessionTimeout()); + Assert.assertNotNull(settings.getSessionTimezone()); + + settings.clearSession(); + + Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id"); + Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check"); + Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout"); + // session_timezone is not session-management state; it is preserved across clearSession(). + Assert.assertEquals(settings.getSessionTimezone(), "Asia/Tokyo", + "clearSession() must not remove session_timezone"); + + // Non-session settings are unaffected. + settings.setDatabase("db2"); + settings.clearSession(); + Assert.assertEquals(settings.getDatabase(), "db2"); + } } } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java index 2801450ed..d5c4e4233 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java @@ -333,9 +333,12 @@ public void cancel() throws SQLException { return; } + // KILL QUERY must not run inside the same session as the query being canceled otherwise it will + // cause "Session is locked by a concurrent client" (SESSION_IS_LOCKED) error. + QuerySettings cancelSettings = QuerySettings.merge(getLocalSettings(), new QuerySettings()).clearSession(); try (QueryResponse response = connection.getClient().query(String.format("KILL QUERY%sWHERE query_id = '%s'", connection.onCluster ? " ON CLUSTER " + SQLUtils.enquoteIdentifier(connection.cluster, true) + ' ' : ' ', - lastQueryId), connection.getDefaultQuerySettings()).get()){ + lastQueryId), cancelSettings).get()){ LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId()); } catch (Exception e) { throw new SQLException(e); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java index 4995caf16..eed401a14 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java @@ -29,6 +29,8 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -711,6 +713,133 @@ public void testConcurrentCancel() throws Exception { } } + /** + * Waits until the given query id appears in {@code system.processes}, so we know the operation has actually + * started on the server before attempting to cancel it. Uses a dedicated connection (no session) to observe. + */ + private boolean waitForQueryToStart(String queryId, int timeoutSeconds) throws Exception { + if (queryId == null) { + return false; + } + long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds); + while (System.currentTimeMillis() < deadline) { + try (Connection conn = getJdbcConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT count() FROM system.processes WHERE query_id = '" + queryId + "'")) { + + if (rs.next() && rs.getLong(1) > 0) { + return true; + } + } + Thread.sleep(200); + } + return false; + } + + private String waitForQueryId(StatementImpl stmt, int timeoutSeconds) throws Exception { + long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds); + while (System.currentTimeMillis() < deadline) { + String queryId = stmt.getLastQueryId(); + if (queryId != null && !queryId.isEmpty()) { + return queryId; + } + Thread.sleep(50); + } + return null; + } + + @Test(groups = {"integration"}) + public void testCancelQueryWithSession() throws Exception { + if (isCloud()) { + throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally"); + } + + // Regression test for #2690: cancelling a query that runs inside a session must not fail with + // "Session is locked by a concurrent client" (SESSION_IS_LOCKED). The KILL QUERY request issued by + // cancel() must not carry the session id of the query being cancelled. + String sessionId = "test-session-" + UUID.randomUUID(); + try (Connection conn = getJdbcConnection()) { + try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { + stmt.getLocalSettings().setSessionId(sessionId); + stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test + + final AtomicReference threadError = new AtomicReference<>(); + final CountDownLatch started = new CountDownLatch(1); + Thread worker = new Thread(() -> { + started.countDown(); + // Long-running query that only completes when killed. + try (ResultSet rs = stmt.executeQuery("SELECT count() FROM system.numbers_mt")) { + rs.next(); + } catch (Throwable t) { + System.out.println("Error: " + t.getMessage()); + threadError.set(t); + } + }); + worker.start(); + started.await(); + + String queryId = waitForQueryId(stmt, 15); + assertNotNull(queryId, "Query id was not assigned in time"); + assertTrue(waitForQueryToStart(queryId, 15), "Query did not start on the server in time"); + + // Cancel from the main thread - must not throw SESSION_IS_LOCKED. + stmt.cancel(); + + worker.join(TimeUnit.SECONDS.toMillis(20)); + assertFalse(worker.isAlive(), "Query was not cancelled and is still running"); + } + } + } + + @Test(groups = {"integration"}) + public void testCancelInsertWithSession() throws Exception { + if (isCloud()) { + throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally"); + } + // Regression test for #2690 covering a long-running INSERT executed inside a session. + String tableName = getDatabase() + ".cancel_insert_with_session"; + String sessionId = "test-session-" + UUID.randomUUID(); + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { + try (Statement setup = conn.createStatement()) { + setup.execute("DROP TABLE IF EXISTS " + tableName); + setup.execute("CREATE TABLE " + tableName + " (num UInt64) ENGINE = MergeTree ORDER BY ()"); + } + + try (StatementImpl stmt = (StatementImpl) conn.createStatement()) { + stmt.getLocalSettings().setSessionId(sessionId); + stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test + + final AtomicReference threadError = new AtomicReference<>(); + final CountDownLatch started = new CountDownLatch(1); + Thread worker = new Thread(() -> { + started.countDown(); + // Long-running insert that only completes when killed. + try { + stmt.executeUpdate("INSERT INTO " + tableName + " SELECT number FROM system.numbers_mt"); + } catch (Throwable t) { + threadError.set(t); + } + }); + worker.start(); + started.await(); + + String queryId = waitForQueryId(stmt, 15); + assertNotNull(queryId, "Query id was not assigned in time"); + assertTrue(waitForQueryToStart(queryId, 15), "Insert did not start on the server in time"); + + // Cancel from the main thread - must not throw SESSION_IS_LOCKED. + stmt.cancel(); + + worker.join(TimeUnit.SECONDS.toMillis(20)); + assertFalse(worker.isAlive(), "Insert was not cancelled and is still running"); + } finally { + try (Statement cleanup = conn.createStatement()) { + cleanup.execute("DROP TABLE IF EXISTS " + tableName); + } + } + } + } + @Test(groups = {"integration"}) public void testTextFormatInResponse() throws Exception { try (Connection conn = getJdbcConnection();