Skip to content

Commit a68d335

Browse files
committed
Add a pull-style streaming select API
pg_clickhouse needs to consume select results one block at a time, but clickhouse-cpp only exposes a callback-driven select path today. That forces downstream users to layer coroutines or connection resets on top of the client when they need pull-style iteration. Add BeginSelect(), ReceiveSelectBlock(), and EndSelect() to mirror the existing multi-step insert workflow. The implementation reuses the existing query and packet handling code, keeps Query callbacks active for progress, profile, and log packets, and drains canceled queries so connections remain reusable. Add integration tests that cover full streaming iteration, preserved Query callbacks, early cleanup, end-of-stream reuse, and exception cleanup with subsequent reuse.
1 parent e22153c commit a68d335

3 files changed

Lines changed: 335 additions & 26 deletions

File tree

clickhouse/client.cpp

Lines changed: 195 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "client.h"
2+
#include "clickhouse/error_codes.h"
23
#include "clickhouse/version.h"
34
#include "protocol.h"
45

@@ -157,6 +158,12 @@ class Client::Impl {
157158

158159
void SelectWithExternalData(Query query, const ExternalTables& external_tables);
159160

161+
void BeginSelect(const Query& query);
162+
163+
std::optional<Block> ReceiveSelectBlock();
164+
165+
void EndSelect();
166+
160167
void SendCancel();
161168

162169
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
@@ -208,6 +215,14 @@ class Client::Impl {
208215

209216
void InitializeStreams(std::unique_ptr<SocketBase>&& socket);
210217

218+
void EnsureIdle(const char* action) const;
219+
220+
uint64_t DrainQueryResponse(const char* context);
221+
222+
void ResetSelectState();
223+
224+
std::optional<Block> TakeSelectBlock();
225+
211226
inline size_t GetConnectionAttempts() const
212227
{
213228
return options_.endpoints.size() * options_.send_retries;
@@ -258,7 +273,12 @@ class Client::Impl {
258273

259274
ServerInfo server_info_;
260275

261-
bool inserting_;
276+
bool inserting_ = false;
277+
bool selecting_ = false;
278+
bool discarding_select_data_ = false;
279+
bool select_finished_ = false;
280+
std::optional<Block> select_block_;
281+
std::unique_ptr<Query> select_query_;
262282
};
263283

264284
ClientOptions modifyClientOptions(ClientOptions opts)
@@ -289,16 +309,19 @@ Client::Impl::Impl(const ClientOptions& opts,
289309
}
290310

291311
Client::Impl::~Impl() {
312+
try {
313+
EndSelect();
314+
} catch (...) {
315+
}
316+
292317
try {
293318
EndInsert();
294319
} catch (...) {
295320
}
296321
}
297322

298323
void Client::Impl::ExecuteQuery(Query query) {
299-
if (inserting_) {
300-
throw ValidationError("cannot execute query while inserting");
301-
}
324+
EnsureIdle("execute query");
302325

303326
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
304327

@@ -315,9 +338,7 @@ void Client::Impl::ExecuteQuery(Query query) {
315338

316339

317340
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
318-
if (inserting_) {
319-
throw ValidationError("cannot execute query while inserting");
320-
}
341+
EnsureIdle("execute query");
321342

322343
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
323344
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
@@ -338,6 +359,89 @@ void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& ext
338359
}
339360
}
340361

362+
void Client::Impl::BeginSelect(const Query& query) {
363+
EnsureIdle("begin select");
364+
365+
if (options_.ping_before_query) {
366+
RetryGuard([this]() { Ping(); });
367+
}
368+
369+
select_query_ = std::make_unique<Query>(query);
370+
select_block_.reset();
371+
select_finished_ = false;
372+
selecting_ = true;
373+
events_ = select_query_.get();
374+
375+
try {
376+
SendQuery(*select_query_);
377+
} catch (...) {
378+
ResetSelectState();
379+
throw;
380+
}
381+
}
382+
383+
std::optional<Block> Client::Impl::ReceiveSelectBlock() {
384+
if (!selecting_) {
385+
throw ValidationError("illegal call to ReceiveSelectBlock without first calling BeginSelect");
386+
}
387+
388+
if (auto block = TakeSelectBlock()) {
389+
return block;
390+
}
391+
392+
if (select_finished_) {
393+
return std::nullopt;
394+
}
395+
396+
uint64_t server_packet = 0;
397+
try {
398+
while (ReceivePacket(&server_packet)) {
399+
if (auto block = TakeSelectBlock()) {
400+
return block;
401+
}
402+
}
403+
} catch (...) {
404+
select_finished_ = true;
405+
throw;
406+
}
407+
408+
if (server_packet == ServerCodes::EndOfStream || server_packet == ServerCodes::Exception) {
409+
select_finished_ = true;
410+
return std::nullopt;
411+
}
412+
413+
select_finished_ = true;
414+
throw ProtocolError(std::string{"unexpected packet from server while receiving select block, expected Data, EndOfStream or Exception, got: "}
415+
+ (server_packet ? std::to_string(server_packet) : "nothing"));
416+
}
417+
418+
void Client::Impl::EndSelect() {
419+
if (!selecting_) {
420+
return;
421+
}
422+
423+
if (select_finished_) {
424+
ResetSelectState();
425+
return;
426+
}
427+
428+
try {
429+
discarding_select_data_ = true;
430+
SendCancel();
431+
DrainQueryResponse("receiving end of query");
432+
} catch (const ServerException& e) {
433+
if (e.GetCode() != ErrorCodes::QUERY_WAS_CANCELLED) {
434+
ResetSelectState();
435+
throw;
436+
}
437+
} catch (...) {
438+
ResetSelectState();
439+
throw;
440+
}
441+
442+
ResetSelectState();
443+
}
444+
341445
void Client::Impl::SendBlockData(const Block& block) {
342446
if (compression_ == CompressionState::Enable) {
343447
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
@@ -382,9 +486,7 @@ std::string NameToQueryString(const std::string &input)
382486
}
383487

384488
void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
385-
if (inserting_) {
386-
throw ValidationError("cannot execute query while inserting, use SendInsertData instead");
387-
}
489+
EnsureIdle("insert");
388490

389491
if (options_.ping_before_query) {
390492
RetryGuard([this]() { Ping(); });
@@ -420,9 +522,7 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
420522
}
421523

422524
Block Client::Impl::BeginInsert(Query query) {
423-
if (inserting_) {
424-
throw ValidationError("cannot execute query while inserting");
425-
}
525+
EnsureIdle("begin insert");
426526

427527
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
428528

@@ -469,23 +569,12 @@ void Client::Impl::EndInsert() {
469569
SendData(Block());
470570

471571
// Wait for EOS.
472-
uint64_t eos_packet{0};
473-
while (ReceivePacket(&eos_packet)) {
474-
;
475-
}
476-
477-
if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
478-
&& eos_packet != ServerCodes::Log && options_.rethrow_exceptions) {
479-
throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
480-
+ (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
481-
}
572+
DrainQueryResponse("receiving end of query");
482573
inserting_ = false;
483574
}
484575

485576
void Client::Impl::Ping() {
486-
if (inserting_) {
487-
throw ValidationError("cannot execute query while inserting");
488-
}
577+
EnsureIdle("ping");
489578

490579
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
491580
output_->Flush();
@@ -501,6 +590,7 @@ void Client::Impl::Ping() {
501590
void Client::Impl::ResetConnection() {
502591
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
503592
inserting_ = false;
593+
ResetSelectState();
504594

505595
if (!Handshake()) {
506596
throw ProtocolError("fail to connect to " + options_.host);
@@ -813,6 +903,17 @@ bool Client::Impl::ReceiveData() {
813903
}
814904
}
815905

906+
if (selecting_) {
907+
if (discarding_select_data_) {
908+
return true;
909+
}
910+
if (select_block_) {
911+
throw ProtocolError("received unexpected data packet while previous select block is still pending");
912+
}
913+
select_block_.emplace(std::move(block));
914+
return true;
915+
}
916+
816917
if (events_) {
817918
events_->OnData(block);
818919
if (!events_->OnDataCancelable(block)) {
@@ -876,6 +977,25 @@ void Client::Impl::SendCancel() {
876977
output_->Flush();
877978
}
878979

980+
void Client::Impl::ResetSelectState() {
981+
select_block_.reset();
982+
discarding_select_data_ = false;
983+
select_finished_ = false;
984+
selecting_ = false;
985+
events_ = nullptr;
986+
select_query_.reset();
987+
}
988+
989+
std::optional<Block> Client::Impl::TakeSelectBlock() {
990+
if (!select_block_) {
991+
return std::nullopt;
992+
}
993+
994+
Block block = std::move(*select_block_);
995+
select_block_.reset();
996+
return block;
997+
}
998+
879999
void Client::Impl::SendQuery(const Query& query, bool finalize) {
8801000
WireFormat::WriteUInt64(*output_, ClientCodes::Query);
8811001
WireFormat::WriteString(*output_, query.GetQueryID());
@@ -1047,6 +1167,31 @@ void Client::Impl::InitializeStreams(std::unique_ptr<SocketBase>&& socket) {
10471167
std::swap(socket, socket_);
10481168
}
10491169

1170+
void Client::Impl::EnsureIdle(const char* action) const {
1171+
if (inserting_) {
1172+
throw ValidationError(std::string("cannot ") + action + " while inserting");
1173+
}
1174+
if (selecting_) {
1175+
throw ValidationError(std::string("cannot ") + action + " while selecting");
1176+
}
1177+
}
1178+
1179+
uint64_t Client::Impl::DrainQueryResponse(const char* context) {
1180+
uint64_t terminal_packet = 0;
1181+
while (ReceivePacket(&terminal_packet)) {
1182+
;
1183+
}
1184+
1185+
if (terminal_packet != ServerCodes::EndOfStream && terminal_packet != ServerCodes::Exception
1186+
&& terminal_packet != ServerCodes::Log && options_.rethrow_exceptions) {
1187+
throw ProtocolError(std::string{"unexpected packet from server while "} + context
1188+
+ ", expected Exception, EndOfStream or Log, got: "
1189+
+ (terminal_packet ? std::to_string(terminal_packet) : "nothing"));
1190+
}
1191+
1192+
return terminal_packet;
1193+
}
1194+
10501195
bool Client::Impl::SendHello() {
10511196
WireFormat::WriteUInt64(*output_, ClientCodes::Hello);
10521197
WireFormat::WriteString(*output_, std::string(CLIENT_NAME));
@@ -1196,6 +1341,30 @@ void Client::Select(const Query& query) {
11961341
Execute(query);
11971342
}
11981343

1344+
void Client::BeginSelect(const Query& query) {
1345+
impl_->BeginSelect(query);
1346+
}
1347+
1348+
void Client::BeginSelect(const char* query) {
1349+
impl_->BeginSelect(Query(query));
1350+
}
1351+
1352+
void Client::BeginSelect(const std::string& query) {
1353+
impl_->BeginSelect(Query(query));
1354+
}
1355+
1356+
void Client::BeginSelect(const std::string& query, const std::string& query_id) {
1357+
impl_->BeginSelect(Query(query, query_id));
1358+
}
1359+
1360+
std::optional<Block> Client::ReceiveSelectBlock() {
1361+
return impl_->ReceiveSelectBlock();
1362+
}
1363+
1364+
void Client::EndSelect() {
1365+
impl_->EndSelect();
1366+
}
1367+
11991368
void Client::SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb) {
12001369
impl_->SelectWithExternalData(Query(query).OnData(std::move(cb)), external_tables);
12011370
}

clickhouse/client.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,18 @@ class Client {
270270
/// Alias for Execute.
271271
void Select(const Query& query);
272272

273+
/// Start a select query and consume result blocks with ReceiveSelectBlock.
274+
void BeginSelect(const Query& query);
275+
void BeginSelect(const char* query);
276+
void BeginSelect(const std::string& query);
277+
void BeginSelect(const std::string& query, const std::string& query_id);
278+
279+
/// Receive the next block for a select session started by BeginSelect.
280+
std::optional<Block> ReceiveSelectBlock();
281+
282+
/// End a select session started by BeginSelect.
283+
void EndSelect();
284+
273285
/// Intends for insert block of data into a table \p table_name.
274286
void Insert(const std::string& table_name, const Block& block);
275287
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);

0 commit comments

Comments
 (0)