Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.10.9
## 0.10.0

### Breaking Changes

Expand Down Expand Up @@ -39,6 +39,8 @@

### Bug Fixes

- **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. (https://github.com/ClickHouse/clickhouse-java/issues/2690)

- **[client-v2]** Fixed inconsistent use of `executionTimeout` parameter in `Client` component. The timeout was previously set in milliseconds but mistakenly retrieved and used in seconds in some places. Now it correctly uses milliseconds consistently. (https://github.com/ClickHouse/clickhouse-java/issues/2358)

## 0.9.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public CommandSettings use(Session session) {
super.use(session);
return this;
}

@Override
public CommandSettings clearSession() {
super.clearSession();
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public InsertSettings use(Session session) {
return this;
}

public InsertSettings clearSession() {
settings.clearSession();
return this;
}

public int getInputStreamCopyBufferSize() {
return this.inputStreamCopyBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public CommonSettings use(Session session) {
return this;
}

public void clearSession() {
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_ID));
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_CHECK));
resetOption(ClientConfigProperties.serverSetting(ClickHouseHttpProto.QPARAM_SESSION_TIMEOUT));
// Do not clean `session_timezone` setting because it is not related to session management and used to
// set timezone for consequent queries in some multi-user applications.
}

/**
* Operation id. Used internally to register new operation.
* Should not be called directly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public QuerySettings use(Session session) {
return this;
}

public QuerySettings clearSession() {
Comment thread
chernser marked this conversation as resolved.
settings.clearSession();
return this;
}

/**
* Read buffer is used for reading data from a server. Size is in bytes.
* Minimal value is {@value MINIMAL_READ_BUFFER_SIZE} bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,32 @@ void testQuerySettingsSpecific() throws Exception {
Assert.assertEquals(settings.getSessionTimeout().intValue(), 45);
Assert.assertEquals(settings.getSessionTimezone(), "Europe/Berlin");
}

{
final QuerySettings settings = new QuerySettings();
settings.setSessionId("session-clear-1");
settings.setSessionCheck(true);
settings.setSessionTimeout(60);
settings.setSessionTimezone("America/New_York");
Assert.assertNotNull(settings.getSessionId());
Assert.assertNotNull(settings.getSessionCheck());
Assert.assertNotNull(settings.getSessionTimeout());
Assert.assertNotNull(settings.getSessionTimezone());

settings.clearSession();

Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id");
Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check");
Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout");
// session_timezone is not session-management state; it is preserved across clearSession().
Assert.assertEquals(settings.getSessionTimezone(), "America/New_York",
"clearSession() must not remove session_timezone");

// Non-session settings are unaffected.
settings.setDatabase("db1");
settings.clearSession();
Assert.assertEquals(settings.getDatabase(), "db1");
}
}

@Test
Expand Down Expand Up @@ -232,5 +258,31 @@ public void testInsertSettingsSpecific() throws Exception {
Assert.assertEquals(settings.getSessionTimeout().intValue(), 50);
Assert.assertEquals(settings.getSessionTimezone(), "Europe/Paris");
}

{
final InsertSettings settings = new InsertSettings();
settings.setSessionId("session-clear-2");
settings.setSessionCheck(true);
settings.setSessionTimeout(90);
settings.setSessionTimezone("Asia/Tokyo");
Assert.assertNotNull(settings.getSessionId());
Assert.assertNotNull(settings.getSessionCheck());
Assert.assertNotNull(settings.getSessionTimeout());
Assert.assertNotNull(settings.getSessionTimezone());

settings.clearSession();

Assert.assertNull(settings.getSessionId(), "clearSession() must remove session_id");
Assert.assertNull(settings.getSessionCheck(), "clearSession() must remove session_check");
Assert.assertNull(settings.getSessionTimeout(), "clearSession() must remove session_timeout");
// session_timezone is not session-management state; it is preserved across clearSession().
Assert.assertEquals(settings.getSessionTimezone(), "Asia/Tokyo",
"clearSession() must not remove session_timezone");

// Non-session settings are unaffected.
settings.setDatabase("db2");
settings.clearSession();
Assert.assertEquals(settings.getDatabase(), "db2");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,12 @@ public void cancel() throws SQLException {
return;
}

// KILL QUERY must not run inside the same session as the query being canceled otherwise it will
// cause "Session is locked by a concurrent client" (SESSION_IS_LOCKED) error.
QuerySettings cancelSettings = QuerySettings.merge(getLocalSettings(), new QuerySettings()).clearSession();
try (QueryResponse response = connection.getClient().query(String.format("KILL QUERY%sWHERE query_id = '%s'",
connection.onCluster ? " ON CLUSTER " + SQLUtils.enquoteIdentifier(connection.cluster, true) + ' ' : ' ',
lastQueryId), connection.getDefaultQuerySettings()).get()){
lastQueryId), cancelSettings).get()){
Comment thread
chernser marked this conversation as resolved.
LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId());
} catch (Exception e) {
throw new SQLException(e);
Expand Down
129 changes: 129 additions & 0 deletions jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -711,6 +713,133 @@ public void testConcurrentCancel() throws Exception {
}
}

/**
* Waits until the given query id appears in {@code system.processes}, so we know the operation has actually
* started on the server before attempting to cancel it. Uses a dedicated connection (no session) to observe.
*/
private boolean waitForQueryToStart(String queryId, int timeoutSeconds) throws Exception {
if (queryId == null) {
return false;
}
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
while (System.currentTimeMillis() < deadline) {
try (Connection conn = getJdbcConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT count() FROM system.processes WHERE query_id = '" + queryId + "'")) {

if (rs.next() && rs.getLong(1) > 0) {
return true;
}
}
Thread.sleep(200);
}
return false;
}

private String waitForQueryId(StatementImpl stmt, int timeoutSeconds) throws Exception {
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
while (System.currentTimeMillis() < deadline) {
String queryId = stmt.getLastQueryId();
if (queryId != null && !queryId.isEmpty()) {
return queryId;
}
Thread.sleep(50);
}
return null;
}

@Test(groups = {"integration"})
public void testCancelQueryWithSession() throws Exception {
if (isCloud()) {
throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally");
}

// Regression test for #2690: cancelling a query that runs inside a session must not fail with
// "Session is locked by a concurrent client" (SESSION_IS_LOCKED). The KILL QUERY request issued by
// cancel() must not carry the session id of the query being cancelled.
String sessionId = "test-session-" + UUID.randomUUID();
try (Connection conn = getJdbcConnection()) {
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
stmt.getLocalSettings().setSessionId(sessionId);
stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test

final AtomicReference<Throwable> threadError = new AtomicReference<>();
final CountDownLatch started = new CountDownLatch(1);
Thread worker = new Thread(() -> {
started.countDown();
// Long-running query that only completes when killed.
try (ResultSet rs = stmt.executeQuery("SELECT count() FROM system.numbers_mt")) {
rs.next();
} catch (Throwable t) {
System.out.println("Error: " + t.getMessage());
threadError.set(t);
}
});
worker.start();
started.await();

String queryId = waitForQueryId(stmt, 15);
assertNotNull(queryId, "Query id was not assigned in time");
assertTrue(waitForQueryToStart(queryId, 15), "Query did not start on the server in time");

// Cancel from the main thread - must not throw SESSION_IS_LOCKED.
stmt.cancel();

worker.join(TimeUnit.SECONDS.toMillis(20));
assertFalse(worker.isAlive(), "Query was not cancelled and is still running");
}
}
}

@Test(groups = {"integration"})
public void testCancelInsertWithSession() throws Exception {
if (isCloud()) {
throw new SkipException("Cloud + HTTP doesn't work well. Enough to test locally");
}
// Regression test for #2690 covering a long-running INSERT executed inside a session.
String tableName = getDatabase() + ".cancel_insert_with_session";
String sessionId = "test-session-" + UUID.randomUUID();
try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) {
try (Statement setup = conn.createStatement()) {
setup.execute("DROP TABLE IF EXISTS " + tableName);
setup.execute("CREATE TABLE " + tableName + " (num UInt64) ENGINE = MergeTree ORDER BY ()");
}

try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
stmt.getLocalSettings().setSessionId(sessionId);
stmt.setQueryTimeout(30); // safety net so a failed cancel cannot hang the test

final AtomicReference<Throwable> threadError = new AtomicReference<>();
final CountDownLatch started = new CountDownLatch(1);
Thread worker = new Thread(() -> {
started.countDown();
// Long-running insert that only completes when killed.
try {
stmt.executeUpdate("INSERT INTO " + tableName + " SELECT number FROM system.numbers_mt");
} catch (Throwable t) {
threadError.set(t);
}
});
worker.start();
started.await();

String queryId = waitForQueryId(stmt, 15);
assertNotNull(queryId, "Query id was not assigned in time");
assertTrue(waitForQueryToStart(queryId, 15), "Insert did not start on the server in time");

// Cancel from the main thread - must not throw SESSION_IS_LOCKED.
stmt.cancel();

worker.join(TimeUnit.SECONDS.toMillis(20));
assertFalse(worker.isAlive(), "Insert was not cancelled and is still running");
} finally {
try (Statement cleanup = conn.createStatement()) {
cleanup.execute("DROP TABLE IF EXISTS " + tableName);
}
}
}
}

@Test(groups = {"integration"})
public void testTextFormatInResponse() throws Exception {
try (Connection conn = getJdbcConnection();
Expand Down
Loading