Skip to content

Commit d03bcff

Browse files
authored
implement new session constructor (#16256)
* implement new session constructor * cpp client session constructor
1 parent 3446a31 commit d03bcff

14 files changed

Lines changed: 570 additions & 63 deletions
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <iostream>
21+
#include <memory>
22+
#include <string>
23+
#include <vector>
24+
25+
#include "Session.h"
26+
#include "SessionBuilder.h"
27+
#include "SessionDataSet.h"
28+
#include "TableSessionBuilder.h"
29+
30+
namespace {
31+
32+
void RunTreeExample() {
33+
try {
34+
std::vector<std::string> node_urls = {
35+
"127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"};
36+
37+
auto builder = std::make_shared<SessionBuilder>();
38+
auto session = std::shared_ptr<Session>(
39+
builder->username("root")
40+
->password("root")
41+
->nodeUrls(node_urls)
42+
->build());
43+
44+
session->open();
45+
if (!session->checkTimeseriesExists("root.test.d1.s1")) {
46+
session->createTimeseries("root.test.d1.s1", TSDataType::INT64,
47+
TSEncoding::RLE, CompressionType::SNAPPY);
48+
}
49+
session->deleteTimeseries("root.test.d1.s1");
50+
session->close();
51+
} catch (const std::exception& e) {
52+
std::cout << "Caught exception: " << e.what() << std::endl;
53+
}
54+
}
55+
56+
void RunTableExample() {
57+
try {
58+
std::vector<std::string> node_urls = {
59+
"127.0.0.1:6669", "127.0.0.1:6668", "127.0.0.1:6667"};
60+
61+
auto builder = std::make_shared<TableSessionBuilder>();
62+
auto session = std::shared_ptr<TableSession>(
63+
builder->username("root")
64+
->password("root")
65+
->nodeUrls(node_urls)
66+
->build());
67+
68+
session->open();
69+
70+
session->executeNonQueryStatement("DROP DATABASE IF EXISTS db1");
71+
session->executeNonQueryStatement("CREATE DATABASE db1");
72+
session->executeNonQueryStatement("DROP DATABASE IF EXISTS db2");
73+
session->executeNonQueryStatement("CREATE DATABASE db2");
74+
75+
session->close();
76+
} catch (const std::exception& e) {
77+
std::cout << "Caught exception: " << e.what() << std::endl;
78+
}
79+
}
80+
81+
82+
// Example: continuously write/query data so you can manually stop a node
83+
// to test client failover behavior.
84+
void RunResilienceExample() {
85+
try {
86+
std::vector<std::string> node_urls = {
87+
"127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"};
88+
89+
auto builder = std::make_shared<SessionBuilder>();
90+
auto session = std::shared_ptr<Session>(
91+
builder->username("root")
92+
->password("root")
93+
->nodeUrls(node_urls)
94+
->build());
95+
96+
session->open();
97+
98+
if (!session->checkTimeseriesExists("root.resilience.d1.s1")) {
99+
session->createTimeseries("root.resilience.d1.s1", TSDataType::INT64,
100+
TSEncoding::RLE, CompressionType::SNAPPY);
101+
}
102+
103+
std::cout << "Starting resilience test. "
104+
"Stop one node manually to see failover handling..."
105+
<< std::endl;
106+
107+
for (int i = 0; i < 60; ++i) { // run ~60 seconds
108+
int64_t timestamp = std::chrono::system_clock::now().time_since_epoch() /
109+
std::chrono::milliseconds(1);
110+
std::string value = to_string(i);
111+
const char* value_cstr = value.c_str();
112+
113+
try {
114+
session->insertRecord("root.resilience.d1", timestamp,
115+
{"s1"}, {TSDataType::INT64},
116+
{const_cast<char*>(value_cstr)});
117+
std::cout << "[Insert] ts=" << timestamp << ", value=" << value
118+
<< std::endl;
119+
120+
auto dataset = session->executeQueryStatement(
121+
"SELECT s1 FROM root.resilience.d1 LIMIT 1");
122+
std::cout << "[Query] Got dataset: "
123+
<< (dataset ? "Success" : "Null") << std::endl;
124+
125+
} catch (const std::exception& e) {
126+
std::cout << "Caught exception during resilience loop: " << e.what()
127+
<< std::endl;
128+
}
129+
130+
std::this_thread::sleep_for(std::chrono::seconds(1));
131+
}
132+
133+
session->close();
134+
} catch (const std::exception& e) {
135+
std::cout << "Caught exception in RunResilienceExample: " << e.what()
136+
<< std::endl;
137+
}
138+
}
139+
140+
} // namespace
141+
142+
int main() {
143+
//RunTreeExample();
144+
//RunTableExample();
145+
RunResilienceExample();
146+
return 0;
147+
}

iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@ class AbstractSessionBuilder {
3030
std::string password = "root";
3131
std::string zoneId = "";
3232
int fetchSize = 10000;
33+
int connectTimeoutMs = 3 * 1000;
34+
int maxRetries = 3;
35+
int retryDelayMs = 500;
3336
std::string sqlDialect = "tree";
3437
std::string database = "";
3538
bool enableAutoFetch = true;
3639
bool enableRedirections = true;
3740
bool enableRPCCompression = false;
41+
std::vector<std::string> nodeUrls;
3842
};
3943

4044
#endif // IOTDB_ABSTRACTSESSIONBUILDER_H

iotdb-client/client-cpp/src/main/Common.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,3 +454,47 @@ const std::vector<char>& BitMap::getByteArray() const {
454454
size_t BitMap::getSize() const {
455455
return this->size;
456456
}
457+
458+
const std::string UrlUtils::PORT_SEPARATOR = ":";
459+
const std::string UrlUtils::ABB_COLON = "[";
460+
461+
TEndPoint UrlUtils::parseTEndPointIpv4AndIpv6Url(const std::string& endPointUrl) {
462+
TEndPoint endPoint;
463+
464+
// Return default TEndPoint if input is empty
465+
if (endPointUrl.empty()) {
466+
return endPoint;
467+
}
468+
469+
size_t portSeparatorPos = endPointUrl.find_last_of(PORT_SEPARATOR);
470+
471+
// If no port separator found, treat entire string as IP
472+
if (portSeparatorPos == std::string::npos) {
473+
endPoint.__set_ip(endPointUrl);
474+
return endPoint;
475+
}
476+
477+
// Extract port part
478+
std::string portStr = endPointUrl.substr(portSeparatorPos + 1);
479+
480+
// Extract IP part
481+
std::string ip = endPointUrl.substr(0, portSeparatorPos);
482+
483+
// Handle IPv6 addresses with brackets
484+
if (ip.find(ABB_COLON) != std::string::npos) {
485+
// Remove surrounding square brackets for IPv6
486+
if (ip.size() >= 2 && ip.front() == '[' && ip.back() == ']') {
487+
ip = ip.substr(1, ip.size() - 2);
488+
}
489+
}
490+
491+
try {
492+
int port = std::stoi(portStr);
493+
endPoint.__set_ip(ip);
494+
endPoint.__set_port(port);
495+
} catch (const std::exception& e) {
496+
endPoint.__set_ip(endPointUrl);
497+
}
498+
499+
return endPoint;
500+
}

iotdb-client/client-cpp/src/main/Common.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,5 +481,23 @@ class RpcUtils {
481481
static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(const TSStatus& status);
482482
};
483483

484+
class UrlUtils {
485+
private:
486+
static const std::string PORT_SEPARATOR;
487+
static const std::string ABB_COLON;
488+
489+
UrlUtils() = delete;
490+
~UrlUtils() = delete;
491+
492+
public:
493+
/**
494+
* Parse TEndPoint from a given TEndPointUrl
495+
* example:[D80:0000:0000:0000:ABAA:0000:00C2:0002]:22227
496+
*
497+
* @param endPointUrl ip:port
498+
* @return TEndPoint with default values if parse error
499+
*/
500+
static TEndPoint parseTEndPointIpv4AndIpv6Url(const std::string& endPointUrl);
501+
};
484502

485503
#endif

iotdb-client/client-cpp/src/main/NodesSupplier.cpp

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <utility>
2525

2626
const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
27+
const std::string NodesSupplier::RUNNING_STATUS = "Running";
2728
const std::string NodesSupplier::STATUS_COLUMN_NAME = "Status";
2829
const std::string NodesSupplier::IP_COLUMN_NAME = "RpcAddress";
2930
const std::string NodesSupplier::PORT_COLUMN_NAME = "RpcPort";
@@ -137,6 +138,7 @@ void NodesSupplier::deduplicateEndpoints() {
137138

138139
void NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval) {
139140
isRunning_ = true;
141+
refreshEndpointList();
140142
refreshThread_ = std::thread([this, interval] {
141143
while (isRunning_) {
142144
refreshEndpointList();
@@ -149,60 +151,66 @@ void NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval) {
149151
}
150152

151153
std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
154+
for (const auto& endpoint : endpoints_) {
152155
try {
153-
if (client_ == nullptr) {
154-
client_ = std::make_shared<ThriftConnection>(selectionPolicy_(endpoints_));
155-
client_->init(userName_, password_, enableRPCCompression_, zoneId_, version);
156+
if (client_ == nullptr) {
157+
client_ = std::make_shared<ThriftConnection>(endpoint);
158+
client_->init(userName_, password_, enableRPCCompression_, zoneId_, version);
159+
}
160+
161+
auto sessionDataSet = client_->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
162+
163+
uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
164+
auto columnNames = sessionDataSet->getColumnNames();
165+
for (uint32_t i = 0; i < columnNames.size(); i++) {
166+
if (columnNames[i] == IP_COLUMN_NAME) {
167+
columnAddrIdx = i;
168+
} else if (columnNames[i] == PORT_COLUMN_NAME) {
169+
columnPortIdx = i;
170+
} else if (columnNames[i] == STATUS_COLUMN_NAME) {
171+
columnStatusIdx = i;
156172
}
173+
}
157174

158-
auto sessionDataSet = client_->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
159-
160-
uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
161-
auto columnNames = sessionDataSet->getColumnNames();
162-
for (uint32_t i = 0; i < columnNames.size(); i++) {
163-
if (columnNames[i] == IP_COLUMN_NAME) {
164-
columnAddrIdx = i;
165-
} else if (columnNames[i] == PORT_COLUMN_NAME) {
166-
columnPortIdx = i;
167-
} else if (columnNames[i] == STATUS_COLUMN_NAME) {
168-
columnStatusIdx = i;
169-
}
170-
}
175+
if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == -1) {
176+
throw IoTDBException("Required columns not found in query result.");
177+
}
171178

172-
if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == -1) {
173-
throw IoTDBException("Required columns not found in query result.");
174-
}
179+
std::vector<TEndPoint> ret;
180+
while (sessionDataSet->hasNext()) {
181+
auto record = sessionDataSet->next();
182+
std::string ip;
183+
int32_t port = 0;
184+
std::string status;
175185

176-
std::vector<TEndPoint> ret;
177-
while (sessionDataSet->hasNext()) {
178-
auto record = sessionDataSet->next();
179-
std::string ip;
180-
int32_t port;
181-
std::string status;
182-
if (record->fields.at(columnAddrIdx).stringV.is_initialized()) {
183-
ip = record->fields.at(columnAddrIdx).stringV.value();
184-
}
185-
if (record->fields.at(columnPortIdx).intV.is_initialized()) {
186-
port = record->fields.at(columnPortIdx).intV.value();
187-
}
188-
if (record->fields.at(columnStatusIdx).stringV.is_initialized()) {
189-
status = record->fields.at(columnStatusIdx).stringV.value();
190-
}
191-
if (ip == "0.0.0.0" || status == REMOVING_STATUS) {
192-
log_warn("Skipping invalid node: " + ip + ":" + to_string(port));
193-
continue;
194-
}
195-
TEndPoint endpoint;
196-
endpoint.ip = ip;
197-
endpoint.port = port;
198-
ret.emplace_back(endpoint);
186+
if (record->fields.at(columnAddrIdx).stringV.is_initialized()) {
187+
ip = record->fields.at(columnAddrIdx).stringV.value();
188+
}
189+
if (record->fields.at(columnPortIdx).intV.is_initialized()) {
190+
port = record->fields.at(columnPortIdx).intV.value();
191+
}
192+
if (record->fields.at(columnStatusIdx).stringV.is_initialized()) {
193+
status = record->fields.at(columnStatusIdx).stringV.value();
199194
}
200195

201-
return ret;
202-
} catch (const IoTDBException& e) {
203-
client_.reset();
204-
throw IoTDBException(std::string("NodesSupplier::fetchLatestEndpoints failed: ") + e.what());
196+
if (ip == "0.0.0.0" || status != RUNNING_STATUS) {
197+
log_warn("Skipping invalid node: " + ip + ":" + std::to_string(port));
198+
continue;
199+
}
200+
TEndPoint newEndpoint;
201+
newEndpoint.ip = ip;
202+
newEndpoint.port = port;
203+
ret.emplace_back(newEndpoint);
204+
}
205+
return ret; // success
206+
} catch (const std::exception& e) {
207+
log_warn("Failed to fetch endpoints from " + endpoint.ip + ":" +
208+
std::to_string(endpoint.port) + " , error=" + e.what());
209+
client_.reset(); // reset client before retrying next endpoint
210+
continue; // try next endpoint
205211
}
212+
}
213+
throw IoTDBException("NodesSupplier::fetchLatestEndpoints failed: all nodes unreachable.");
206214
}
207215

208216
void NodesSupplier::refreshEndpointList() {

iotdb-client/client-cpp/src/main/NodesSupplier.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class StaticNodesSupplier : public INodesSupplier {
6565
class NodesSupplier : public INodesSupplier {
6666
public:
6767
static const std::string SHOW_DATA_NODES_COMMAND;
68+
static const std::string RUNNING_STATUS;
6869
static const std::string STATUS_COLUMN_NAME;
6970
static const std::string IP_COLUMN_NAME;
7071
static const std::string PORT_COLUMN_NAME;

0 commit comments

Comments
 (0)