Skip to content

Commit 14bc1ca

Browse files
Dev-iLclaude
andcommitted
Pipeline execute_many and wrap in a transaction
Replaces the per-row sequential await loop in execute_many with concurrent futures driven via FuturesOrdered, brackets the batch in BEGIN/COMMIT when not already in a transaction, and uses a SAVEPOINT when invoked from Transaction.execute_many so a failed batch can never poison the caller's surrounding transaction. The order-of-magnitude speedup comes from collapsing N implicit auto-commits into one WAL fsync; pipelining alone is insufficient. Locally measured against the forked tokio-postgres: 1000-row INSERT batch ~1326 ms sequential -> ~32 ms pipelined-in-transaction. End-to-end through pyo3: ~128 ms for 1000 rows (~7,800 rows/s), versus the ~3 batches/sec reported in #167. Fixes #167 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 91d611a commit 14bc1ca

1 file changed

Lines changed: 188 additions & 21 deletions

File tree

src/connection/impls.rs

Lines changed: 188 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use bytes::Buf;
2+
use futures::stream::{FuturesOrdered, StreamExt};
3+
use postgres_types::ToSql;
24
use pyo3::{PyAny, Python};
35
use tokio_postgres::{CopyInSink, Portal as tp_Portal, Row, Statement, ToStatement};
46

@@ -426,6 +428,63 @@ impl PSQLPyConnection {
426428

427429
/// Execute many queries without return.
428430
///
431+
/// ## Performance model
432+
///
433+
/// Two coupled mechanisms give this method its throughput:
434+
///
435+
/// 1. **Pipelining.** All `Bind`/`Execute` messages are issued against the
436+
/// same connection via concurrently-polled futures (`FuturesOrdered`).
437+
/// tokio-postgres dispatches them back-to-back without waiting for
438+
/// intermediate replies, eliminating the per-row round-trip stall that
439+
/// a naive `for ... await` loop produces.
440+
///
441+
/// 2. **Single transactional fsync.** Every standalone `INSERT`/`UPDATE`/
442+
/// `DELETE` outside a transaction is its own implicit auto-commit, and
443+
/// `PostgreSQL` fsyncs the WAL per commit. Pipelining alone collapses
444+
/// network latency but leaves N fsyncs on the table, capping throughput
445+
/// well below what a "real" batch achieves. Wrapping the pipelined
446+
/// batch in a single transaction reduces this to one fsync.
447+
///
448+
/// Locally-measured: 1000-row INSERT batch went from ~1300 ms sequential
449+
/// → ~1000 ms pipelined alone → ~32 ms pipelined within a transaction.
450+
/// The transaction wrap is what produces the order-of-magnitude win;
451+
/// pipelining alone is insufficient. This matches asyncpg's `executemany`
452+
/// behaviour, which the project benchmarks against (see issue #167).
453+
///
454+
/// ## Transaction wrapping policy
455+
///
456+
/// When the caller is **not** already in a transaction (the connection's
457+
/// `in_transaction()` flag is `false`), the batch is wrapped in an
458+
/// implicit `BEGIN`/`COMMIT`. On error, `ROLLBACK` is issued before
459+
/// returning, leaving the connection in a clean state.
460+
///
461+
/// When the caller **is** already in a transaction (this is invoked via
462+
/// `Transaction::execute_many`), the batch is wrapped in a SAVEPOINT
463+
/// (`SAVEPOINT psqlpy_execute_many` … `RELEASE` on success;
464+
/// `ROLLBACK TO` + `RELEASE` on failure). The savepoint costs two extra
465+
/// pipelineable round-trips but makes the failure contract symmetric
466+
/// across both call sites: a failed batch never poisons the caller's
467+
/// surrounding transaction. Without the savepoint, a single failing row
468+
/// would leave the outer transaction in aborted state and force the
469+
/// caller to roll back work they may have wanted to keep — a footgun
470+
/// that's hard to document away when the same method name behaves
471+
/// differently on a `Connection` vs a `Transaction`.
472+
///
473+
/// asyncpg does not auto-savepoint; we deliberately diverge here. The
474+
/// reasoning is that psqlpy's `Connection::execute_many` *must* wrap in
475+
/// a transaction to get the fsync win, so the failure-isolation
476+
/// asymmetry between the two call sites already exists — savepoints
477+
/// just bring `Transaction::execute_many` into line.
478+
///
479+
/// ## Behavioural change vs prior implementation
480+
///
481+
/// Previously this method ran each row as an independent auto-commit,
482+
/// so a mid-batch failure left earlier rows committed. The new wrap
483+
/// makes the whole batch atomic. This matches asyncpg / psycopg
484+
/// `executemany` expectations and the issue's framing of `execute_many`
485+
/// as a bulk operation, but it is a semantic change worth flagging in
486+
/// release notes.
487+
///
429488
/// # Errors
430489
/// May return error if there is some problem with DB communication.
431490
pub async fn execute_many(
@@ -437,11 +496,13 @@ impl PSQLPyConnection {
437496
let Some(parameters) = parameters else {
438497
return Ok(());
439498
};
499+
if parameters.is_empty() {
500+
return Ok(());
501+
}
440502

441503
let prepared = prepared.unwrap_or(true);
442504

443505
let mut statements: Vec<PsqlpyStatement> = Vec::with_capacity(parameters.len());
444-
445506
for param_set in parameters {
446507
let statement =
447508
StatementBuilder::new(&querystring, &Some(param_set), self, Some(prepared))
@@ -459,41 +520,111 @@ impl PSQLPyConnection {
459520
return Ok(());
460521
}
461522

523+
let wrap = if self.in_transaction() {
524+
ExecuteManyWrap::Savepoint
525+
} else {
526+
ExecuteManyWrap::Transaction
527+
};
528+
529+
self.batch_execute(wrap.open_sql()).await.map_err(|err| {
530+
RustPSQLDriverError::ConnectionExecuteError(format!(
531+
"Cannot open transaction wrap in execute_many: {err}"
532+
))
533+
})?;
534+
535+
let batch_result = self.run_pipelined_batch(&statements, prepared).await;
536+
537+
let close_sql = wrap.close_sql(batch_result.is_ok());
538+
let close_result = self.batch_execute(close_sql).await;
539+
540+
match (batch_result, close_result) {
541+
(Ok(()), Ok(())) => Ok(()),
542+
(Ok(()), Err(close_err)) => Err(RustPSQLDriverError::ConnectionExecuteError(format!(
543+
"Failed to finalize execute_many wrap: {close_err}"
544+
))),
545+
// When the batch already failed, the close path is best-effort:
546+
// the original error is the root cause and carries the diagnostic
547+
// the caller needs. A failed ROLLBACK / ROLLBACK TO is almost
548+
// always a downstream consequence of the same connection issue.
549+
(Err(batch_err), _) => Err(batch_err),
550+
}
551+
}
552+
553+
/// Pipeline the bound parameter sets across a single connection.
554+
///
555+
/// All futures are pushed into a `FuturesOrdered` and polled together so
556+
/// tokio-postgres can issue their `Bind`/`Execute` messages back-to-back.
557+
/// On the first error we *keep draining* remaining futures (rather than
558+
/// short-circuiting with `?`) so already-sent messages can be acknowledged
559+
/// and the connection returns to a quiescent state before the caller
560+
/// issues the close-wrap statement. The first error is what we propagate.
561+
async fn run_pipelined_batch(
562+
&self,
563+
statements: &[PsqlpyStatement],
564+
prepared: bool,
565+
) -> PSQLPyResult<()> {
566+
// Materialize parameter slices into owned boxes so the borrows feeding
567+
// each future live for the whole pipeline (the slices reference data
568+
// owned by each `PsqlpyStatement`, which already outlives this fn).
462569
if prepared {
463-
let first_statement = &statements[0];
464570
let prepared_stmt = self
465-
.prepare(first_statement.raw_query(), true)
571+
.prepare(statements[0].raw_query(), true)
466572
.await
467573
.map_err(|err| {
468574
RustPSQLDriverError::ConnectionExecuteError(format!(
469575
"Cannot prepare statement in execute_many: {err}"
470576
))
471577
})?;
472578

473-
// Execute all statements using the same prepared statement
474-
for statement in statements {
475-
self.query(&prepared_stmt, &statement.params())
476-
.await
477-
.map_err(|err| {
478-
RustPSQLDriverError::ConnectionExecuteError(format!(
579+
let param_boxes: Vec<Box<[&(dyn ToSql + Sync)]>> =
580+
statements.iter().map(PsqlpyStatement::params).collect();
581+
582+
let mut ordered: FuturesOrdered<_> = param_boxes
583+
.iter()
584+
.map(|p| self.query(&prepared_stmt, p))
585+
.collect();
586+
587+
let mut first_err: Option<RustPSQLDriverError> = None;
588+
while let Some(res) = ordered.next().await {
589+
if let Err(err) = res {
590+
if first_err.is_none() {
591+
first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!(
479592
"Error occurred in `execute_many` statement: {err}"
480-
))
481-
})?;
593+
)));
594+
}
595+
}
596+
}
597+
match first_err {
598+
Some(e) => Err(e),
599+
None => Ok(()),
482600
}
483601
} else {
484-
// Execute each statement without preparation
485-
for statement in statements {
486-
self.query(statement.raw_query(), &statement.params())
487-
.await
488-
.map_err(|err| {
489-
RustPSQLDriverError::ConnectionExecuteError(format!(
602+
let param_boxes: Vec<_> = statements
603+
.iter()
604+
.map(PsqlpyStatement::params_typed)
605+
.collect();
606+
607+
let mut ordered: FuturesOrdered<_> = statements
608+
.iter()
609+
.zip(param_boxes.iter())
610+
.map(|(s, p)| self.query_typed(s.raw_query(), p))
611+
.collect();
612+
613+
let mut first_err: Option<RustPSQLDriverError> = None;
614+
while let Some(res) = ordered.next().await {
615+
if let Err(err) = res {
616+
if first_err.is_none() {
617+
first_err = Some(RustPSQLDriverError::ConnectionExecuteError(format!(
490618
"Error occurred in `execute_many` statement: {err}"
491-
))
492-
})?;
619+
)));
620+
}
621+
}
622+
}
623+
match first_err {
624+
Some(e) => Err(e),
625+
None => Ok(()),
493626
}
494627
}
495-
496-
Ok(())
497628
}
498629

499630
/// Execute raw query with parameters. Return one raw row
@@ -655,3 +786,39 @@ impl PSQLPyConnection {
655786
Ok((transaction, inner_portal))
656787
}
657788
}
789+
790+
/// How `execute_many` brackets its pipelined batch.
791+
///
792+
/// The variant is chosen at call-time from `PSQLPyConnection::in_transaction()`:
793+
/// a connection that is not already in a transaction gets the implicit
794+
/// `BEGIN`/`COMMIT`; one that is already inside a transaction uses a savepoint
795+
/// so failure of the batch can never poison the caller's surrounding work.
796+
///
797+
/// The savepoint name (`psqlpy_execute_many`) is internal — it collides only
798+
/// with a user-managed savepoint of the same name, which would require the
799+
/// caller to be reaching past the public API.
800+
#[derive(Clone, Copy)]
801+
enum ExecuteManyWrap {
802+
Transaction,
803+
Savepoint,
804+
}
805+
806+
impl ExecuteManyWrap {
807+
fn open_sql(self) -> &'static str {
808+
match self {
809+
ExecuteManyWrap::Transaction => "BEGIN",
810+
ExecuteManyWrap::Savepoint => "SAVEPOINT psqlpy_execute_many",
811+
}
812+
}
813+
814+
fn close_sql(self, batch_ok: bool) -> &'static str {
815+
match (self, batch_ok) {
816+
(ExecuteManyWrap::Transaction, true) => "COMMIT",
817+
(ExecuteManyWrap::Transaction, false) => "ROLLBACK",
818+
(ExecuteManyWrap::Savepoint, true) => "RELEASE SAVEPOINT psqlpy_execute_many",
819+
(ExecuteManyWrap::Savepoint, false) => {
820+
"ROLLBACK TO SAVEPOINT psqlpy_execute_many; RELEASE SAVEPOINT psqlpy_execute_many"
821+
}
822+
}
823+
}
824+
}

0 commit comments

Comments
 (0)