Skip to content

Commit 7d0b74b

Browse files
authored
[to dev/1.3] C++ client: backport thread-safe SessionPool (#17800) (#17817)
* C++ client: add thread-safe SessionPool, enable RPC compression, and harden buffers (#17800) * Wire RPC compression flag through Session to its connections The enableRPCCompression option set via Session::open(bool) or the session builder was never propagated to SessionConnection, whose flag was hardcoded to false, so the compact Thrift protocol never took effect. Thread the flag from the builder/open() into both the data SessionConnection and the node-discovery NodesSupplier client so compression actually applies. * Use snprintf for Tablet bounds-check error messages Tablet::addValue and the OBJECT-value overload formatted out-of-range diagnostics with sprintf into a fixed 100-byte stack buffer, risking an overflow. Switch to snprintf bounded by sizeof(buffer) and cast the size_t arguments to long to match the %ld format. * Append big-endian bytes in MyStringBuffer instead of overwriting On big-endian hosts MyStringBuffer::putOrderedByte used str.assign, which replaced the whole buffer with each numeric write and corrupted previously serialized content. Use str.append so bytes accumulate, matching the little-endian path. * Add thread-safe SessionPool to the C++ client Introduce SessionPool and SessionPoolBuilder so multiple threads can share a bounded set of connections without external locking. A single Session is not safe to use concurrently, so the pool lends each Session to one borrower at a time via an RAII PooledSession handle and reclaims it on scope exit. Sessions are created outside the lock to avoid blocking other borrowers during the handshake, and getSession() blocks up to a configurable timeout when the pool is exhausted. Query results are returned as a PooledSessionDataSet that keeps the Session leased until the result set is fully read, since SessionDataSet lazily fetches further blocks over the same connection. Connections that raise IoTDBConnectionException are evicted rather than recycled. Add integration tests covering basic borrow/insert/query, concurrent writers, and exhaustion-timeout behavior. * Reject zero maxSize in SessionPool instead of clamping to 1 Address review feedback: maxSize is size_t, so a non-positive check reduces to == 0 (and "<= 0" would be a tautological-comparison warning under -Wall). Rather than silently clamping an invalid 0 to 1, fail fast by throwing IoTDBException so the misuse surfaces at construction time. * Tolerate missing timeseries in SessionPool test cleanup The pre-test cleanup deleted root.test.pool.* timeseries unconditionally, which threw 508 (does not exist) on a fresh database and failed the new [sessionPool] cases. Ignore that error since the cleanup is best-effort. * Revert "Wire RPC compression flag through Session to its connections" This reverts commit 2f35cc5. Honoring the compression flag makes the client negotiate the compact Thrift protocol, which the binary-only IoTDB server used by the C++ integration tests cannot speak, breaking the pre-existing ts_session_open_with_compression smoke test (it had only passed because the flag was a no-op). Compression needs a compact-protocol-enabled test server, so it will be reintroduced in a dedicated PR with the matching server-side test support. SessionPool keeps its compression option for forward compatibility; it is currently a no-op, as the rest of the client has always been. * fix format * Discard SessionPool session if pool closed during construction Address review feedback: acquire() releases the lock while building a new connection, so a concurrent close() could set closed_ after the slot was reserved, and the freshly opened session would still be handed out from a closed pool. Re-check closed_ under the lock after construction; if the pool was closed meanwhile, release the slot, tear the session down outside the lock, and throw instead of returning it. * C++ client: adapt SessionPool to dev/1.3 Session/builder APIs The cherry-picked SessionPool targeted master's expanded AbstractSessionBuilder (DEFAULT_* constants plus nodeUrls/connectTimeoutMs/useSSL/trustCertFilePath fields), none of which exist on dev/1.3, so the C++ client failed to compile. Adapt only the newly added SessionPool.h/.cpp (no existing dev/1.3 interface or implementation is changed): - Replace AbstractSessionBuilder::DEFAULT_* with dev/1.3's literal defaults. - Keep connectTimeoutMs (still applied via Session::open()). - Drop the multi-node (nodeUrls) constructor and SSL options, since dev/1.3's Session(AbstractSessionBuilder*) wires neither and would silently ignore them.
1 parent cb8fd4c commit 7d0b74b

5 files changed

Lines changed: 758 additions & 5 deletions

File tree

iotdb-client/client-cpp/src/main/Common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ const char* MyStringBuffer::getOrderedByte(size_t len) {
384384

385385
void MyStringBuffer::putOrderedByte(char* buf, int len) {
386386
if (isBigEndian) {
387-
str.assign(buf, len);
387+
str.append(buf, len);
388388
}
389389
else {
390390
for (int i = len - 1; i > -1; i--) {

iotdb-client/client-cpp/src/main/Session.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <thread>
3232
#include <stdexcept>
3333
#include <cstdlib>
34+
#include <cstdio>
3435
#include <future>
3536
#include <boost/date_time/gregorian/gregorian.hpp>
3637
#include <thrift/protocol/TBinaryProtocol.h>
@@ -272,15 +273,17 @@ class Tablet {
272273
void addValue(size_t schemaId, size_t rowIndex, const T& value) {
273274
if (schemaId >= schemas.size()) {
274275
char tmpStr[100];
275-
sprintf(tmpStr, "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.",
276-
schemaId, schemas.size());
276+
snprintf(tmpStr, sizeof(tmpStr),
277+
"Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.",
278+
(long)schemaId, (long)schemas.size());
277279
throw std::out_of_range(tmpStr);
278280
}
279281

280282
if (rowIndex >= rowSize) {
281283
char tmpStr[100];
282-
sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", rowIndex,
283-
rowSize);
284+
snprintf(tmpStr, sizeof(tmpStr),
285+
"Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.",
286+
(long)rowIndex, (long)rowSize);
284287
throw std::out_of_range(tmpStr);
285288
}
286289

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "SessionPool.h"
21+
22+
void PooledSession::reset() {
23+
if (session_ && pool_ != nullptr) {
24+
pool_->putBack(session_, broken_);
25+
}
26+
pool_ = nullptr;
27+
session_ = nullptr;
28+
broken_ = false;
29+
}
30+
31+
SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password,
32+
size_t maxSize)
33+
: host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)),
34+
password_(std::move(password)), maxSize_(maxSize) {
35+
if (maxSize_ == 0) {
36+
throw IoTDBException("SessionPool maxSize must be greater than 0.");
37+
}
38+
}
39+
40+
SessionPool::~SessionPool() {
41+
try {
42+
close();
43+
} catch (const std::exception& e) {
44+
log_debug(std::string("SessionPool::~SessionPool(), ") + e.what());
45+
}
46+
}
47+
48+
SessionPool& SessionPool::setFetchSize(int fetchSize) {
49+
fetchSize_ = fetchSize;
50+
return *this;
51+
}
52+
53+
SessionPool& SessionPool::setZoneId(std::string zoneId) {
54+
zoneId_ = std::move(zoneId);
55+
return *this;
56+
}
57+
58+
SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) {
59+
sqlDialect_ = std::move(sqlDialect);
60+
return *this;
61+
}
62+
63+
SessionPool& SessionPool::setDatabase(std::string database) {
64+
database_ = std::move(database);
65+
return *this;
66+
}
67+
68+
SessionPool& SessionPool::setEnableRedirection(bool enable) {
69+
enableRedirection_ = enable;
70+
return *this;
71+
}
72+
73+
SessionPool& SessionPool::setEnableAutoFetch(bool enable) {
74+
enableAutoFetch_ = enable;
75+
return *this;
76+
}
77+
78+
SessionPool& SessionPool::setEnableRPCCompression(bool enable) {
79+
enableRPCCompression_ = enable;
80+
return *this;
81+
}
82+
83+
SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) {
84+
connectTimeoutMs_ = connectTimeoutMs;
85+
return *this;
86+
}
87+
88+
SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) {
89+
waitTimeoutMs_ = timeoutMs;
90+
return *this;
91+
}
92+
93+
std::shared_ptr<Session> SessionPool::constructNewSession() {
94+
AbstractSessionBuilder builder;
95+
builder.host = host_;
96+
builder.rpcPort = rpcPort_;
97+
builder.username = username_;
98+
builder.password = password_;
99+
builder.zoneId = zoneId_;
100+
builder.fetchSize = fetchSize_;
101+
builder.sqlDialect = sqlDialect_;
102+
builder.database = database_;
103+
builder.enableAutoFetch = enableAutoFetch_;
104+
builder.enableRedirections = enableRedirection_;
105+
builder.enableRPCCompression = enableRPCCompression_;
106+
107+
auto session = std::make_shared<Session>(&builder);
108+
session->open(enableRPCCompression_, connectTimeoutMs_);
109+
return session;
110+
}
111+
112+
std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) {
113+
const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_;
114+
std::unique_lock<std::mutex> lock(mutex_);
115+
const auto deadline =
116+
std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout);
117+
118+
while (true) {
119+
if (closed_) {
120+
throw IoTDBException("SessionPool is closed.");
121+
}
122+
if (!idleQueue_.empty()) {
123+
auto session = idleQueue_.front();
124+
idleQueue_.pop_front();
125+
return session;
126+
}
127+
if (size_ < maxSize_) {
128+
// Reserve a slot, then build the connection outside the lock so other
129+
// borrowers are not blocked by network/handshake latency.
130+
++size_;
131+
lock.unlock();
132+
std::shared_ptr<Session> session;
133+
try {
134+
session = constructNewSession();
135+
} catch (...) {
136+
lock.lock();
137+
--size_;
138+
cv_.notify_one();
139+
throw;
140+
}
141+
lock.lock();
142+
if (closed_) {
143+
// The pool was closed while this session was being built; do not hand it
144+
// out. Release its slot and let it be torn down outside the lock.
145+
--size_;
146+
lock.unlock();
147+
throw IoTDBException("SessionPool is closed.");
148+
}
149+
return session;
150+
}
151+
152+
// Pool exhausted: wait for a Session to be returned.
153+
if (effectiveTimeout <= 0) {
154+
cv_.wait(lock);
155+
} else {
156+
if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() &&
157+
size_ >= maxSize_ && !closed_) {
158+
throw IoTDBException(
159+
"Wait to get session timeout in SessionPool, maxSize=" + std::to_string(maxSize_) +
160+
", waitTimeoutMs=" + std::to_string(effectiveTimeout) + ".");
161+
}
162+
}
163+
}
164+
}
165+
166+
void SessionPool::putBack(const std::shared_ptr<Session>& session, bool broken) {
167+
std::lock_guard<std::mutex> lock(mutex_);
168+
if (broken || closed_) {
169+
// Drop the Session and free its slot so a healthy replacement can be created
170+
// on demand. The caller (PooledSession::reset) still holds the last reference
171+
// and tears the connection down after we return, i.e. outside this lock.
172+
--size_;
173+
} else {
174+
idleQueue_.push_back(session);
175+
}
176+
cv_.notify_one();
177+
}
178+
179+
PooledSession SessionPool::getSession() {
180+
return getSession(waitTimeoutMs_);
181+
}
182+
183+
PooledSession SessionPool::getSession(int64_t timeoutMs) {
184+
return PooledSession(this, acquire(timeoutMs));
185+
}
186+
187+
void SessionPool::insertTablet(Tablet& tablet, bool sorted) {
188+
execute([&](Session& s) { s.insertTablet(tablet, sorted); });
189+
}
190+
191+
void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) {
192+
execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); });
193+
}
194+
195+
void SessionPool::insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool sorted) {
196+
execute([&](Session& s) { s.insertTablets(tablets, sorted); });
197+
}
198+
199+
void SessionPool::insertRecord(const std::string& deviceId, int64_t time,
200+
const std::vector<std::string>& measurements,
201+
const std::vector<std::string>& values) {
202+
execute([&](Session& s) { s.insertRecord(deviceId, time, measurements, values); });
203+
}
204+
205+
void SessionPool::insertRecords(const std::vector<std::string>& deviceIds,
206+
const std::vector<int64_t>& times,
207+
const std::vector<std::vector<std::string>>& measurementsList,
208+
const std::vector<std::vector<std::string>>& valuesList) {
209+
execute([&](Session& s) { s.insertRecords(deviceIds, times, measurementsList, valuesList); });
210+
}
211+
212+
void SessionPool::executeNonQueryStatement(const std::string& sql) {
213+
execute([&](Session& s) { s.executeNonQueryStatement(sql); });
214+
}
215+
216+
PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql) {
217+
PooledSession lease = getSession();
218+
try {
219+
auto dataSet = lease->executeQueryStatement(sql);
220+
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
221+
} catch (const IoTDBConnectionException&) {
222+
lease.markBroken();
223+
throw;
224+
}
225+
}
226+
227+
PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql,
228+
int64_t timeoutInMs) {
229+
PooledSession lease = getSession();
230+
try {
231+
auto dataSet = lease->executeQueryStatement(sql, timeoutInMs);
232+
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
233+
} catch (const IoTDBConnectionException&) {
234+
lease.markBroken();
235+
throw;
236+
}
237+
}
238+
239+
void SessionPool::close() {
240+
std::deque<std::shared_ptr<Session>> toClose;
241+
{
242+
std::lock_guard<std::mutex> lock(mutex_);
243+
if (closed_) {
244+
return;
245+
}
246+
closed_ = true;
247+
toClose.swap(idleQueue_);
248+
size_ -= toClose.size();
249+
}
250+
cv_.notify_all();
251+
// Sessions destructed here (outside the lock) close their connections.
252+
toClose.clear();
253+
}
254+
255+
size_t SessionPool::activeCount() {
256+
std::lock_guard<std::mutex> lock(mutex_);
257+
return size_ - idleQueue_.size();
258+
}

0 commit comments

Comments
 (0)