5656#define FLOAT8LABEL "float8"
5757#define FLOAT8OID 701
5858
59+ #define FIN_DISCARD 0x1
60+ #define FIN_CLOSE 0x2
61+ #define FIN_ABORT 0x4
5962
6063
61- static int pgsql_stmt_dtor (pdo_stmt_t * stmt )
64+
65+ static void pgsql_stmt_finish (pdo_pgsql_stmt * S , int fin_mode )
6266{
63- pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
64- bool server_obj_usable = !Z_ISUNDEF (stmt -> database_object_handle )
65- && IS_OBJ_VALID (EG (objects_store ).object_buckets [Z_OBJ_HANDLE (stmt -> database_object_handle )])
66- && !(OBJ_FLAGS (Z_OBJ (stmt -> database_object_handle )) & IS_OBJ_FREE_CALLED );
67+ if (S -> is_running_unbuffered && S -> result && (fin_mode & FIN_ABORT )) {
68+ PGcancel * cancel = PQgetCancel (S -> H -> server );
69+ char errbuf [256 ];
70+ PQcancel (cancel , errbuf , 256 );
71+ PQfreeCancel (cancel );
72+ S -> is_running_unbuffered = false;
73+ }
6774
6875 if (S -> result ) {
6976 /* free the resource */
7077 PQclear (S -> result );
7178 S -> result = NULL ;
7279 }
7380
74- if (S -> stmt_name ) {
75- if (S -> is_prepared && server_obj_usable ) {
76- pdo_pgsql_db_handle * H = S -> H ;
77- char * q = NULL ;
78- PGresult * res ;
81+ if (S -> is_running_unbuffered ) {
82+ /* https://postgresql.org/docs/current/libpq-async.html:
83+ * "PQsendQuery cannot be called again until PQgetResult has returned NULL"
84+ * And as all single-row functions are connection-wise instead of statement-wise,
85+ * any new single-row query has to make sure no preceding one is still running.
86+ */
87+ // @todo Implement !(fin_mode & FIN_DISCARD)
88+ // instead of discarding results we could store them to their statement
89+ // so that their fetch() will get them (albeit not in lazy mode anymore).
90+ while ((S -> result = PQgetResult (S -> H -> server ))) {
91+ PQclear (S -> result );
92+ S -> result = NULL ;
93+ }
94+ S -> is_running_unbuffered = false;
95+ }
7996
80- spprintf (& q , 0 , "DEALLOCATE %s" , S -> stmt_name );
81- res = PQexec (H -> server , q );
82- efree (q );
83- if (res ) {
84- PQclear (res );
85- }
97+ if (S -> stmt_name && S -> is_prepared && (fin_mode & FIN_CLOSE )) {
98+ char * q = NULL ;
99+ PGresult * res ;
100+
101+ spprintf (& q , 0 , "DEALLOCATE %s" , S -> stmt_name );
102+ res = PQexec (S -> H -> server , q );
103+ efree (q );
104+ if (res ) {
105+ PQclear (res );
106+ }
107+
108+ S -> is_prepared = false;
109+ if (S -> H -> running_stmt == S ) {
110+ S -> H -> running_stmt = NULL ;
86111 }
112+ }
113+ }
114+
115+ static int pgsql_stmt_dtor (pdo_stmt_t * stmt )
116+ {
117+ pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
118+ bool server_obj_usable = !Z_ISUNDEF (stmt -> database_object_handle )
119+ && IS_OBJ_VALID (EG (objects_store ).object_buckets [Z_OBJ_HANDLE (stmt -> database_object_handle )])
120+ && !(OBJ_FLAGS (Z_OBJ (stmt -> database_object_handle )) & IS_OBJ_FREE_CALLED );
121+
122+ pgsql_stmt_finish (S , FIN_DISCARD |(server_obj_usable ? FIN_CLOSE |FIN_ABORT : 0 ));
123+
124+ if (S -> stmt_name ) {
87125 efree (S -> stmt_name );
88126 S -> stmt_name = NULL ;
89127 }
@@ -137,14 +175,20 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
137175 pdo_pgsql_stmt * S = (pdo_pgsql_stmt * )stmt -> driver_data ;
138176 pdo_pgsql_db_handle * H = S -> H ;
139177 ExecStatusType status ;
178+ int dispatch_result = 1 ;
140179
141180 bool in_trans = stmt -> dbh -> methods -> in_transaction (stmt -> dbh );
142181
143- /* ensure that we free any previous unfetched results */
144- if (S -> result ) {
145- PQclear (S -> result );
146- S -> result = NULL ;
182+ /* in unbuffered mode, finish any running statement: libpq explicitely prohibits this
183+ * and returns a PGRES_FATAL_ERROR when PQgetResult gets called for stmt 2 if DEALLOCATE
184+ * was called for stmt 1 inbetween
185+ * (maybe it will change with pipeline mode in libpq 14?) */
186+ if (S -> is_unbuffered && H -> running_stmt ) {
187+ pgsql_stmt_finish (H -> running_stmt , FIN_CLOSE );
188+ H -> running_stmt = NULL ;
147189 }
190+ /* ensure that we free any previous unfetched results */
191+ pgsql_stmt_finish (S , 0 );
148192
149193 S -> current_row = 0 ;
150194
@@ -219,6 +263,16 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
219263 }
220264 }
221265 }
266+ if (S -> is_unbuffered ) {
267+ dispatch_result = PQsendQueryPrepared (H -> server , S -> stmt_name ,
268+ stmt -> bound_params ?
269+ zend_hash_num_elements (stmt -> bound_params ) :
270+ 0 ,
271+ (const char * * )S -> param_values ,
272+ S -> param_lengths ,
273+ S -> param_formats ,
274+ 0 );
275+ } else {
222276 S -> result = PQexecPrepared (H -> server , S -> stmt_name ,
223277 stmt -> bound_params ?
224278 zend_hash_num_elements (stmt -> bound_params ) :
@@ -227,22 +281,54 @@ static int pgsql_stmt_execute(pdo_stmt_t *stmt)
227281 S -> param_lengths ,
228282 S -> param_formats ,
229283 0 );
284+ }
230285 } else if (stmt -> supports_placeholders == PDO_PLACEHOLDER_NAMED ) {
231286 /* execute query with parameters */
287+ if (S -> is_unbuffered ) {
288+ dispatch_result = PQsendQueryParams (H -> server , ZSTR_VAL (S -> query ),
289+ stmt -> bound_params ? zend_hash_num_elements (stmt -> bound_params ) : 0 ,
290+ S -> param_types ,
291+ (const char * * )S -> param_values ,
292+ S -> param_lengths ,
293+ S -> param_formats ,
294+ 0 );
295+ } else {
232296 S -> result = PQexecParams (H -> server , ZSTR_VAL (S -> query ),
233297 stmt -> bound_params ? zend_hash_num_elements (stmt -> bound_params ) : 0 ,
234298 S -> param_types ,
235299 (const char * * )S -> param_values ,
236300 S -> param_lengths ,
237301 S -> param_formats ,
238302 0 );
303+ }
239304 } else {
240305 /* execute plain query (with embedded parameters) */
306+ if (S -> is_unbuffered ) {
307+ dispatch_result = PQsendQuery (H -> server , ZSTR_VAL (stmt -> active_query_string ));
308+ } else {
241309 S -> result = PQexec (H -> server , ZSTR_VAL (stmt -> active_query_string ));
310+ }
311+ }
312+
313+ H -> running_stmt = S ;
314+
315+ if (S -> is_unbuffered ) {
316+ if (!dispatch_result ) {
317+ pdo_pgsql_error_stmt (stmt , 0 , NULL );
318+ H -> running_stmt = NULL ;
319+ return 0 ;
320+ }
321+ S -> is_running_unbuffered = true;
322+ (void )PQsetSingleRowMode (H -> server );
323+ /* no matter if it returns 0: PQ then transparently fallbacks to full result fetching */
324+
325+ /* try a first fetch to at least have column names and so on */
326+ S -> result = PQgetResult (S -> H -> server );
242327 }
328+
243329 status = PQresultStatus (S -> result );
244330
245- if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK ) {
331+ if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE ) {
246332 pdo_pgsql_error_stmt (stmt , status , pdo_pgsql_sqlstate (S -> result ));
247333 return 0 ;
248334 }
@@ -464,6 +550,34 @@ static int pgsql_stmt_fetch(pdo_stmt_t *stmt,
464550 return 0 ;
465551 }
466552 } else {
553+ if (S -> is_running_unbuffered && S -> current_row >= stmt -> row_count ) {
554+ ExecStatusType status ;
555+
556+ /* @todo in unbuffered mode, PQ allows multiple queries to be passed:
557+ * column_count should be recomputed on each iteration */
558+
559+ if (S -> result ) {
560+ PQclear (S -> result );
561+ S -> result = NULL ;
562+ }
563+
564+ S -> result = PQgetResult (S -> H -> server );
565+ status = PQresultStatus (S -> result );
566+
567+ if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK && status != PGRES_SINGLE_TUPLE ) {
568+ pdo_pgsql_error_stmt (stmt , status , pdo_pgsql_sqlstate (S -> result ));
569+ return 0 ;
570+ }
571+
572+ stmt -> row_count = (zend_long )PQntuples (S -> result );
573+ S -> current_row = 0 ;
574+
575+ if (!stmt -> row_count ) {
576+ S -> is_running_unbuffered = false;
577+ /* libpq requires looping until getResult returns null */
578+ pgsql_stmt_finish (S , 0 );
579+ }
580+ }
467581 if (S -> current_row < stmt -> row_count ) {
468582 S -> current_row ++ ;
469583 return 1 ;
0 commit comments