Skip to content

Commit cf283c9

Browse files
committed
Fix tx issue
1 parent 5c81f21 commit cf283c9

1 file changed

Lines changed: 27 additions & 27 deletions

File tree

  • crates/vespertide-macro/src

crates/vespertide-macro/src/lib.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use vespertide_loader::{
1111
load_config_or_default, load_migrations_at_compile_time, load_models_at_compile_time,
1212
};
1313
use vespertide_planner::apply_action;
14-
use vespertide_query::{DatabaseBackend, build_plan_queries};
14+
use vespertide_query::{build_plan_queries, DatabaseBackend};
1515

1616
struct MacroInput {
1717
pool: Expr,
@@ -90,12 +90,7 @@ pub(crate) fn build_migration_block(
9090

9191
// Generate version guard and SQL execution block
9292
let block = quote! {
93-
if version < #version {
94-
// Begin transaction
95-
let txn = __pool.begin().await.map_err(|e| {
96-
::vespertide::MigrationError::DatabaseError(format!("Failed to begin transaction: {}", e))
97-
})?;
98-
93+
if __version < #version {
9994
// Select SQL statements based on backend
10095
let sqls: &[&str] = match backend {
10196
sea_orm::DatabaseBackend::Postgres => &[#(#pg_sqls),*],
@@ -108,24 +103,18 @@ pub(crate) fn build_migration_block(
108103
for sql in sqls {
109104
if !sql.is_empty() {
110105
let stmt = sea_orm::Statement::from_string(backend, *sql);
111-
txn.execute_raw(stmt).await.map_err(|e| {
106+
__txn.execute_raw(stmt).await.map_err(|e| {
112107
::vespertide::MigrationError::DatabaseError(format!("Failed to execute SQL '{}': {}", sql, e))
113108
})?;
114109
}
115110
}
116111

117112
// Insert version record for this migration
118-
let q = if matches!(backend, sea_orm::DatabaseBackend::MySql) { '`' } else { '"' };
119-
let insert_sql = format!("INSERT INTO {q}{}{q} (version) VALUES ({})", version_table, #version);
113+
let insert_sql = format!("INSERT INTO {q}{}{q} (version) VALUES ({})", __version_table, #version);
120114
let stmt = sea_orm::Statement::from_string(backend, insert_sql);
121-
txn.execute_raw(stmt).await.map_err(|e| {
115+
__txn.execute_raw(stmt).await.map_err(|e| {
122116
::vespertide::MigrationError::DatabaseError(format!("Failed to insert version: {}", e))
123117
})?;
124-
125-
// Commit transaction
126-
txn.commit().await.map_err(|e| {
127-
::vespertide::MigrationError::DatabaseError(format!("Failed to commit transaction: {}", e))
128-
})?;
129118
}
130119
};
131120

@@ -141,36 +130,47 @@ pub(crate) fn generate_migration_code(
141130
quote! {
142131
async {
143132
use sea_orm::{ConnectionTrait, TransactionTrait};
144-
let __pool = #pool;
145-
let version_table = #version_table;
133+
let __pool = &#pool;
134+
let __version_table = #version_table;
146135
let backend = __pool.get_database_backend();
147-
148-
// Create version table if it does not exist
149-
// Table structure: version (INTEGER PRIMARY KEY), created_at (timestamp)
150136
let q = if matches!(backend, sea_orm::DatabaseBackend::MySql) { '`' } else { '"' };
137+
138+
// Create version table if it does not exist (outside transaction)
151139
let create_table_sql = format!(
152140
"CREATE TABLE IF NOT EXISTS {q}{}{q} (version INTEGER PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)",
153-
version_table
141+
__version_table
154142
);
155143
let stmt = sea_orm::Statement::from_string(backend, create_table_sql);
156144
__pool.execute_raw(stmt).await.map_err(|e| {
157145
::vespertide::MigrationError::DatabaseError(format!("Failed to create version table: {}", e))
158146
})?;
159147

160-
// Read current maximum version (latest applied migration)
161-
let select_sql = format!("SELECT MAX(version) as version FROM {q}{}{q}", version_table);
148+
// Single transaction for the entire migration process.
149+
// This prevents race conditions when multiple connections exist
150+
// (e.g. SQLite with max_connections > 1).
151+
let __txn = __pool.begin().await.map_err(|e| {
152+
::vespertide::MigrationError::DatabaseError(format!("Failed to begin transaction: {}", e))
153+
})?;
154+
155+
// Read current maximum version inside the transaction (holds lock)
156+
let select_sql = format!("SELECT MAX(version) as version FROM {q}{}{q}", __version_table);
162157
let stmt = sea_orm::Statement::from_string(backend, select_sql);
163-
let version_result = __pool.query_one_raw(stmt).await.map_err(|e| {
158+
let version_result = __txn.query_one_raw(stmt).await.map_err(|e| {
164159
::vespertide::MigrationError::DatabaseError(format!("Failed to read version: {}", e))
165160
})?;
166161

167-
let mut version = version_result
162+
let __version = version_result
168163
.and_then(|row| row.try_get::<i32>("", "version").ok())
169164
.unwrap_or(0) as u32;
170165

171-
// Execute each migration block
166+
// Execute each migration block within the same transaction
172167
#(#migration_blocks)*
173168

169+
// Commit the entire migration
170+
__txn.commit().await.map_err(|e| {
171+
::vespertide::MigrationError::DatabaseError(format!("Failed to commit transaction: {}", e))
172+
})?;
173+
174174
Ok::<(), ::vespertide::MigrationError>(())
175175
}
176176
}

0 commit comments

Comments
 (0)