@@ -161,6 +161,12 @@ class Client::Impl {
161161
162162 void Insert (const std::string& table_name, const std::string& query_id, const Block& block);
163163
164+ PreparedInsert * PrepareInsert (Query query);
165+
166+ void FinishInsert ();
167+
168+ void SendData (const Block& block);
169+
164170 void Ping ();
165171
166172 void ResetConnection ();
@@ -175,12 +181,11 @@ class Client::Impl {
175181 bool Handshake ();
176182
177183 bool ReceivePacket (uint64_t * server_packet = nullptr );
184+ bool ReceivePreparePackets (uint64_t * server_packet = nullptr );
178185
179186 void SendQuery (const Query& query, bool finalize = true );
180187 void FinalizeQuery ();
181188
182- void SendData (const Block& block);
183-
184189 void SendBlockData (const Block& block);
185190 void SendExternalData (const ExternalTables& external_tables);
186191
@@ -415,6 +420,23 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
415420 }
416421}
417422
423+ void Client::Impl::FinishInsert () {
424+ // Send empty block as marker of end of data.
425+ SendData (Block ());
426+
427+ // Wait for EOS.
428+ uint64_t eos_packet{0 };
429+ while (ReceivePacket (&eos_packet)) {
430+ ;
431+ }
432+
433+ if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
434+ && eos_packet != ServerCodes::Log && options_.rethrow_exceptions ) {
435+ throw ProtocolError (std::string{" unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: " }
436+ + (eos_packet ? std::to_string (eos_packet) : " nothing" ) + " )" );
437+ }
438+ }
439+
418440void Client::Impl::Ping () {
419441 WireFormat::WriteUInt64 (*output_, ClientCodes::Ping);
420442 output_->Flush ();
@@ -648,6 +670,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
648670 }
649671}
650672
673+ bool Client::Impl::ReceivePreparePackets (uint64_t * server_packet) {
674+ uint64_t packet_type = 0 ;
675+
676+ while (true ) {
677+ if (!WireFormat::ReadVarint64 (*input_, &packet_type)) {
678+ throw std::runtime_error (" unexpected package type " +
679+ std::to_string ((int )packet_type) + " for insert query" );
680+ }
681+ if (server_packet) {
682+ *server_packet = packet_type;
683+ }
684+
685+ switch (packet_type) {
686+ case ServerCodes::Data: {
687+ if (!ReceiveData ()) {
688+ throw ProtocolError (" can't read data packet from input stream" );
689+ }
690+ return true ;
691+ }
692+
693+ case ServerCodes::Exception: {
694+ ReceiveException ();
695+ return false ;
696+ }
697+
698+ case ServerCodes::ProfileInfo:
699+ case ServerCodes::Progress:
700+ case ServerCodes::Pong:
701+ case ServerCodes::Hello:
702+ continue ;
703+
704+ case ServerCodes::Log: {
705+ // log tag
706+ if (!WireFormat::SkipString (*input_)) {
707+ return false ;
708+ }
709+ Block block;
710+
711+ // Use uncompressed stream since log blocks usually contain only one row
712+ if (!ReadBlock (*input_, &block)) {
713+ return false ;
714+ }
715+
716+ if (events_) {
717+ events_->OnServerLog (block);
718+ }
719+ continue ;
720+ }
721+
722+ case ServerCodes::TableColumns: {
723+ // external table name
724+ if (!WireFormat::SkipString (*input_)) {
725+ return false ;
726+ }
727+
728+ // columns metadata
729+ if (!WireFormat::SkipString (*input_)) {
730+ return false ;
731+ }
732+ continue ;
733+ }
734+
735+ // No others expected.
736+ case ServerCodes::EndOfStream:
737+ case ServerCodes::ProfileEvents:
738+ default :
739+ throw UnimplementedError (" unimplemented " + std::to_string ((int )packet_type));
740+ break ;
741+ }
742+ }
743+ }
744+
651745bool Client::Impl::ReadBlock (InputStream& input, Block* block) {
652746 // Additional information about block.
653747 if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1157,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10631157 }
10641158 }
10651159 }
1066- // Connectiong with current_endpoint_ are broken.
1160+ // Connecting with current_endpoint_ are broken.
10671161 // Trying to establish with the another one from the list.
10681162 size_t connection_attempts_count = GetConnectionAttempts ();
10691163 for (size_t i = 0 ; i < connection_attempts_count;)
@@ -1085,6 +1179,41 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10851179 }
10861180}
10871181
1182+ Client::PreparedInsert * Client::Impl::PrepareInsert (Query query) {
1183+ // Arrange a query callback to extract a block that corresponds to the
1184+ // query columns.
1185+ auto block = new Block ();
1186+ query.OnData ([&block](const Block& b) {
1187+ for (Block::Iterator bi (b); bi.IsValid (); bi.Next ()) {
1188+ // Create the ClickHouse column type.
1189+ clickhouse::ColumnRef col = bi.Column ();
1190+ auto chtype = col->Type ();
1191+ if (chtype->GetCode () == Type::LowCardinality) {
1192+ chtype = col->As <ColumnLowCardinality>()->GetNestedType ();
1193+ }
1194+ block->AppendColumn (bi.Name (), clickhouse::CreateColumnByType (col->Type ()->GetName ()));
1195+ }
1196+
1197+ return true ;
1198+ });
1199+
1200+
1201+ EnsureNull en (static_cast <QueryEvents*>(&query), &events_);
1202+
1203+ if (options_.ping_before_query ) {
1204+ RetryGuard ([this ]() { Ping (); });
1205+ }
1206+
1207+ SendQuery (query.GetText ());
1208+
1209+ // Receive data packet but keep the query/connection open.
1210+ if (!ReceivePreparePackets ()) {
1211+ throw std::runtime_error (" fail to receive data packet" );
1212+ }
1213+
1214+ return new PreparedInsert (this , block);
1215+ }
1216+
10881217Client::Client (const ClientOptions& opts)
10891218 : options_(opts)
10901219 , impl_(new Impl(opts))
@@ -1149,6 +1278,14 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
11491278 impl_->Insert (table_name, query_id, block);
11501279}
11511280
1281+ Client::PreparedInsert * Client::PrepareInsert (const std::string& query) {
1282+ return impl_->PrepareInsert (Query (query));
1283+ }
1284+
1285+ Client::PreparedInsert * Client::PrepareInsert (const std::string& query, const std::string& query_id) {
1286+ return impl_->PrepareInsert (Query (query, query_id));
1287+ }
1288+
11521289void Client::Ping () {
11531290 impl_->Ping ();
11541291}
@@ -1179,4 +1316,26 @@ Client::Version Client::GetVersion() {
11791316 };
11801317}
11811318
1319+ Client::PreparedInsert::PreparedInsert (void *c, Block *b) {
1320+ client = c;
1321+ block = b;
1322+ }
1323+
1324+ Client::PreparedInsert::~PreparedInsert () {
1325+ auto c = (Client::Impl *)(client);
1326+ c->FinishInsert ();
1327+
1328+ // Do not delete client as we're effectively its child.
1329+ if (block) delete block;
1330+ }
1331+
1332+ Block * Client::PreparedInsert::GetBlock () { return block; }
1333+
1334+ void Client::PreparedInsert::Execute () {
1335+ auto c = (Client::Impl *)(client);
1336+ block->RefreshRowCount ();
1337+ c->SendData (*block);
1338+ block->Clear ();
1339+ }
1340+
11821341}
0 commit comments