@@ -233,13 +233,13 @@ unwrap_for_block_column(const chc_type * t)
233233}
234234
235235pg_noreturn static void
236- raise_chc (const chc_err * err , int sqlstate )
236+ raise_chc (const chc_err * err , int sqlstate , const char * prefix )
237237{
238238 const char * m = (err && err -> msg [0 ]) ? err -> msg : "unknown error" ;
239239
240240 ereport (ERROR ,
241241 (errcode (sqlstate ),
242- errmsg ("pg_clickhouse: %s" , m )));
242+ errmsg ("pg_clickhouse: %s%s" , prefix ? prefix : " " , m )));
243243}
244244
245245/* ===================================================================== */
@@ -470,7 +470,8 @@ ch_binary_connect(ch_connection_details * details)
470470 int rc = chc_client_init (& s -> client , & opts , & pg_chc_alloc , & s -> io , & err );
471471
472472 if (rc != CHC_OK )
473- raise_chc (& err , ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION );
473+ raise_chc (& err , ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION ,
474+ "connection error: " );
474475 }
475476 PG_CATCH ();
476477 {
@@ -946,6 +947,7 @@ struct ch_binary_insert_handle
946947 bool array_active ;
947948 size_t array_col_idx ;
948949 bool started ;
950+ bool finalized ; /* finalize_insert has run (success or raised) */
949951};
950952
951953static void
@@ -1050,7 +1052,7 @@ classify_column(ic_col * ic, const chc_type * col_t)
10501052
10511053 ereport (ERROR ,
10521054 (errcode (ERRCODE_FDW_INVALID_DATA_TYPE ),
1053- errmsg ("pg_clickhouse: unsupported column type: %.*s" ,
1055+ errmsg ("pg_clickhouse: could not prepare insert - unsupported column type: %.*s" ,
10541056 (int ) tnlen , tname ? tname : "?" )));
10551057 }
10561058}
@@ -1067,7 +1069,7 @@ recv_initial_block(struct ch_binary_state *s, ch_binary_insert_handle * h)
10671069 if (rc != CHC_OK )
10681070 {
10691071 s -> broken = true;
1070- raise_chc (& err , ERRCODE_FDW_ERROR );
1072+ raise_chc (& err , ERRCODE_FDW_ERROR , "could not prepare insert - " );
10711073 }
10721074 if (pkt .kind == CHC_PKT_EXCEPTION )
10731075 {
@@ -1083,7 +1085,7 @@ recv_initial_block(struct ch_binary_state *s, ch_binary_insert_handle * h)
10831085 chc_packet_clear (s -> client , & pkt );
10841086 ereport (ERROR ,
10851087 (errcode (ERRCODE_FDW_ERROR ),
1086- errmsg ("pg_clickhouse: %s" , msg_copy )));
1088+ errmsg ("pg_clickhouse: could not prepare insert - %s" , msg_copy )));
10871089 }
10881090 if (pkt .kind == CHC_PKT_DATA && pkt .block &&
10891091 chc_block_n_columns (pkt .block ) > 0 )
@@ -1189,7 +1191,7 @@ ch_binary_begin_insert(ch_binary_connection_t * conn, const ch_query * query,
11891191 if (rc != CHC_OK )
11901192 {
11911193 s -> broken = true;
1192- raise_chc (& err , ERRCODE_FDW_ERROR );
1194+ raise_chc (& err , ERRCODE_FDW_ERROR , "could not prepare insert - " );
11931195 }
11941196
11951197 recv_initial_block (s , h );
@@ -1262,7 +1264,7 @@ resolve_col(ch_binary_insert_handle * h, size_t col_idx, bool isnull)
12621264
12631265 ereport (ERROR ,
12641266 (errcode (ERRCODE_NOT_NULL_VIOLATION ),
1265- errmsg ("pg_clickhouse: cannot append NULL to NOT NULL %.*s column" ,
1267+ errmsg ("pg_clickhouse: could not append data to column - cannot append NULL to NOT NULL %.*s column" ,
12661268 (int ) tnlen , tname ? tname : "?" )));
12671269 }
12681270 if (!h -> array_active && c -> is_nullable )
@@ -1978,7 +1980,7 @@ ch_binary_flush_block(ch_binary_insert_handle * h)
19781980 int rc = chc_block_builder_init (& bb , & pg_chc_alloc , & err );
19791981
19801982 if (rc != CHC_OK )
1981- raise_chc (& err , ERRCODE_FDW_ERROR );
1983+ raise_chc (& err , ERRCODE_FDW_ERROR , "could not append data to column - " );
19821984
19831985 for (size_t i = 0 ; i < h -> ncols ; i ++ )
19841986 {
@@ -2101,7 +2103,8 @@ ch_binary_flush_block(ch_binary_insert_handle * h)
21012103 }
21022104 }
21032105 if (rc != CHC_OK )
2104- raise_chc (& err , ERRCODE_FDW_ERROR );
2106+ raise_chc (& err , ERRCODE_FDW_ERROR ,
2107+ "could not append data to column - " );
21052108 }
21062109
21072110 rc = chc_client_send_data (h -> client , bb , & err );
@@ -2110,7 +2113,7 @@ ch_binary_flush_block(ch_binary_insert_handle * h)
21102113 {
21112114 if (h -> state )
21122115 h -> state -> broken = true;
2113- raise_chc (& err , ERRCODE_FDW_ERROR );
2116+ raise_chc (& err , ERRCODE_FDW_ERROR , "could not insert columns - " );
21142117 }
21152118
21162119 /* Reset per-column buffers for next batch. */
@@ -2132,62 +2135,72 @@ ch_binary_flush_block(ch_binary_insert_handle * h)
21322135 MemoryContextSwitchTo (old );
21332136}
21342137
2138+ /*
2139+ * Send final empty Data + drain. May ereport on server exception or
2140+ * transport failure. Idempotent via h->finalized. Leaves h->cxt alive;
2141+ * ch_binary_release_insert deletes it.
2142+ */
21352143void
2136- ch_binary_end_insert (ch_binary_insert_handle * h )
2144+ ch_binary_finalize_insert (ch_binary_insert_handle * h )
21372145{
2138- if (!h )
2146+ if (!h || h -> finalized )
2147+ return ;
2148+
2149+ /*
2150+ * Set early so an ereport(ERROR) below still leaves h in the "do not
2151+ * touch the wire" state for the release callback.
2152+ */
2153+ h -> finalized = true;
2154+
2155+ if (!(h -> started && h -> client ))
21392156 return ;
21402157
21412158 MemoryContext old = MemoryContextSwitchTo (h -> cxt );
21422159 char * exc_msg = NULL ;
21432160 bool broke = false;
2161+ chc_err err = {0 };
2162+ int rc = chc_client_send_data (h -> client , NULL , & err );
21442163
2145- if (h -> started && h -> client )
2164+ if (rc != CHC_OK )
21462165 {
2147- chc_err err = {0 };
2148- int rc = chc_client_send_data (h -> client , NULL , & err );
2149-
2150- if (rc != CHC_OK )
2151- {
2152- broke = true;
2153- exc_msg = pstrdup (err .msg [0 ] ? err .msg : "send_data failed" );
2154- }
2155- else
2166+ broke = true;
2167+ exc_msg = pstrdup (err .msg [0 ] ? err .msg : "send_data failed" );
2168+ }
2169+ else
2170+ {
2171+ /* Drain until EOS or exception. */
2172+ for (;;)
21562173 {
2157- /* Drain until EOS or exception. */
2158- for (;;)
2159- {
2160- chc_packet pkt = {0 };
2174+ chc_packet pkt = {0 };
21612175
2162- err = (chc_err )
2163- {
2164- 0
2165- };
2166- rc = chc_client_recv_packet (h -> client , & pkt , & err );
2167- if (rc != CHC_OK )
2168- {
2169- broke = true;
2170- exc_msg = pstrdup (err .msg [0 ] ? err .msg : "recv_packet failed" );
2171- chc_packet_clear (h -> client , & pkt );
2172- break ;
2173- }
2174- if (pkt .kind == CHC_PKT_EXCEPTION )
2175- {
2176- const char * msg = "server exception" ;
2177-
2178- if (pkt .exception && pkt .exception -> display_text )
2179- msg = pkt .exception -> display_text ;
2180- else if (pkt .exception && pkt .exception -> name )
2181- msg = pkt .exception -> name ;
2182- exc_msg = pstrdup (msg );
2183- broke = true;
2184- chc_packet_clear (h -> client , & pkt );
2185- break ;
2186- }
2176+ err = (chc_err )
2177+ {
2178+ 0
2179+ };
2180+ rc = chc_client_recv_packet (h -> client , & pkt , & err );
2181+ if (rc != CHC_OK )
2182+ {
2183+ broke = true;
2184+ exc_msg = pstrdup (err .msg [0 ] ? err .msg : "recv_packet failed" );
21872185 chc_packet_clear (h -> client , & pkt );
2188- if (pkt .kind == CHC_PKT_END_OF_STREAM )
2189- break ;
2186+ break ;
2187+ }
2188+ if (pkt .kind == CHC_PKT_EXCEPTION )
2189+ {
2190+ const char * msg = "server exception" ;
2191+
2192+ if (pkt .exception && pkt .exception -> display_text )
2193+ msg = pkt .exception -> display_text ;
2194+ else if (pkt .exception && pkt .exception -> name )
2195+ msg = pkt .exception -> name ;
2196+ exc_msg = pstrdup (msg );
2197+ broke = true;
2198+ chc_packet_clear (h -> client , & pkt );
2199+ break ;
21902200 }
2201+ chc_packet_clear (h -> client , & pkt );
2202+ if (pkt .kind == CHC_PKT_END_OF_STREAM )
2203+ break ;
21912204 }
21922205 }
21932206
@@ -2198,20 +2211,33 @@ ch_binary_end_insert(ch_binary_insert_handle * h)
21982211 if (broke && h -> state )
21992212 h -> state -> broken = true;
22002213
2201- /*
2202- * Copy exc_msg into the parent context before deleting h->cxt so the
2203- * ereport message survives.
2204- */
2214+ MemoryContextSwitchTo (old );
2215+
22052216 if (exc_msg )
22062217 {
2207- MemoryContextSwitchTo ( old );
2218+ /* exc_msg lives in h->cxt; copy into parent before raising. */
22082219 char * parent_msg = pstrdup (exc_msg );
22092220
2210- MemoryContextDelete (h -> cxt );
22112221 ereport (ERROR ,
22122222 (errcode (ERRCODE_FDW_ERROR ),
2213- errmsg ("pg_clickhouse: %s" , parent_msg )));
2223+ errmsg ("pg_clickhouse: could not finish INSERT - %s" , parent_msg )));
22142224 }
2215- MemoryContextSwitchTo (old );
2225+ }
2226+
2227+ /*
2228+ * Teardown counterpart to finalize. Safe from a MemoryContext reset
2229+ * callback: never raises, never talks to the server. If finalize did not
2230+ * run (mid-query abort), flags the connection broken so it rebuilds on
2231+ * next use.
2232+ */
2233+ void
2234+ ch_binary_release_insert (ch_binary_insert_handle * h )
2235+ {
2236+ if (!h )
2237+ return ;
2238+
2239+ if (!h -> finalized && h -> started && h -> state )
2240+ h -> state -> broken = true;
2241+
22162242 MemoryContextDelete (h -> cxt );
22172243}
0 commit comments