Skip to content

Commit 0e1f181

Browse files
authored
feat(transaction): fix commit fail (#14)
1 parent 3dfdc54 commit 0e1f181

7 files changed

Lines changed: 52 additions & 66 deletions

File tree

EfCore.BulkOperations.API/Repositories/ProductRepository.cs

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
using System.Data;
2-
using System.Data.Common;
31
using EfCore.BulkOperations.API.Models;
42
using Microsoft.EntityFrameworkCore;
5-
using Microsoft.EntityFrameworkCore.Storage;
63

74
namespace EfCore.BulkOperations.API.Repositories;
85

@@ -57,14 +54,9 @@ public async Task<int> BulkMergeProducts(List<Product> products)
5754

5855
public async Task SyncDataThenCommit(List<Product> list1, List<Product> list2)
5956
{
60-
IDbContextTransaction? transaction = null;
61-
DbConnection? connection = null;
6257
try
6358
{
64-
connection = dbContext.Database.GetDbConnection();
65-
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
66-
transaction = await dbContext.Database.BeginTransactionAsync();
67-
var dbTransaction = transaction.GetDbTransaction();
59+
var dbTransaction = await dbContext.BeginTransactionAsync();
6860

6961
await dbContext.BulkInsertAsync(
7062
list1,
@@ -76,29 +68,20 @@ await dbContext.BulkInsertAsync(
7668
dbTransaction);
7769
await dbContext.BulkInsertAsync(list2, null, dbTransaction);
7870

79-
await transaction.CommitAsync();
71+
await dbContext.CommitAsync();
8072
}
8173
catch (Exception)
8274
{
83-
if (transaction is not null) await transaction.RollbackAsync();
75+
await dbContext.RollbackAsync();
8476
throw;
8577
}
86-
finally
87-
{
88-
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
89-
}
9078
}
9179

9280
public async Task SyncDataThenRollback(Product item1, List<Product> list2, List<Product> list3)
9381
{
94-
IDbContextTransaction? transaction = null;
95-
DbConnection? connection = null;
9682
try
9783
{
98-
connection = dbContext.Database.GetDbConnection();
99-
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
100-
transaction = await dbContext.Database.BeginTransactionAsync();
101-
var dbTransaction = transaction.GetDbTransaction();
84+
var dbTransaction = await dbContext.BeginTransactionAsync();
10285

10386
await dbContext.Products.AddAsync(item1);
10487
await dbContext.SaveChangesAsync();
@@ -109,12 +92,8 @@ public async Task SyncDataThenRollback(Product item1, List<Product> list2, List<
10992
}
11093
catch (Exception)
11194
{
112-
if (transaction is not null) await transaction.RollbackAsync();
95+
await dbContext.RollbackAsync();
11396
throw;
11497
}
115-
finally
116-
{
117-
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
118-
}
11998
}
12099
}

EfCore.BulkOperations/BulkCommand.cs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -323,12 +323,7 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
323323
var colIndex = 0;
324324
columns.ToList().ForEach(column =>
325325
{
326-
var value = type.GetProperty(column.RefName)?.GetValue(row);
327-
if (column.ValueConverter is not null)
328-
value = column.ValueConverter.ConvertToProvider(value);
329-
330-
var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
331-
list.Add(new SqlParameter(paramName, value));
326+
var paramName = ProcessParameter(type, column, row, rowIndex, colIndex, list);
332327
sql.Append($"{paramName} AS `{column.Name}`, ");
333328
colIndex++;
334329
});
@@ -357,12 +352,7 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
357352
var colIndex = 0;
358353
columns.ToList().ForEach(column =>
359354
{
360-
var value = type.GetProperty(column.RefName)?.GetValue(row);
361-
if (column.ValueConverter is not null)
362-
value = column.ValueConverter.ConvertToProvider(value);
363-
364-
var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
365-
list.Add(new SqlParameter(paramName, value));
355+
var paramName = ProcessParameter(type, column, row, rowIndex, colIndex, list);
366356
sql.Append($"{paramName}, ");
367357
colIndex++;
368358
});
@@ -373,4 +363,16 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
373363
sql.Remove(sql.Length - 2, 1);
374364
return new TempTable(sql, parameters);
375365
}
366+
367+
private static string ProcessParameter<T>(Type type, ColumnInfo column, T row, int rowIndex, int colIndex,
368+
List<SqlParameter> list)
369+
{
370+
var value = type.GetProperty(column.RefName)?.GetValue(row);
371+
if (column.ValueConverter is not null)
372+
value = column.ValueConverter.ConvertToProvider(value);
373+
374+
var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
375+
list.Add(new SqlParameter(paramName, value));
376+
return paramName;
377+
}
376378
}

EfCore.BulkOperations/DbContextExtensions.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Data.Common;
22
using Microsoft.EntityFrameworkCore;
3+
using Microsoft.EntityFrameworkCore.Storage;
34

45
namespace EfCore.BulkOperations;
56

@@ -83,4 +84,24 @@ public static async Task<int> BulkMergeAsync<T>(this DbContext dbContext,
8384
{
8485
return await EfCoreBulkUtils.BulkMergeAsync(dbContext, items, optionFactory, transaction, cancellationToken);
8586
}
87+
88+
public static async Task<DbTransaction> BeginTransactionAsync(this DbContext dbContext,
89+
CancellationToken cancellationToken = default)
90+
{
91+
var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken);
92+
var dbTransaction = transaction.GetDbTransaction();
93+
return dbTransaction;
94+
}
95+
96+
public static async Task CommitAsync(this DbContext dbContext, CancellationToken cancellationToken = default)
97+
{
98+
if (dbContext.Database.CurrentTransaction != null)
99+
await dbContext.Database.CurrentTransaction.CommitAsync(cancellationToken);
100+
}
101+
102+
public static async Task RollbackAsync(this DbContext dbContext, CancellationToken cancellationToken = default)
103+
{
104+
if (dbContext.Database.CurrentTransaction != null)
105+
await dbContext.Database.CurrentTransaction.RollbackAsync(cancellationToken);
106+
}
86107
}

EfCore.BulkOperations/Docs/README.md

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,21 @@ EfCore.BulkOperations utilizes local transactions within bulk processes. If you
9898
can pass an existing transaction into the bulk process.
9999

100100
```js
101-
IDbContextTransaction? transaction = null;
102-
DbConnection? connection = null;
103101
try
104102
{
105-
connection = dbContext.Database.GetDbConnection();
106-
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
107-
transaction = await dbContext.Database.BeginTransactionAsync();
108-
var dbTransaction = transaction.GetDbTransaction();
103+
var dbTransaction = dbContext.BeginTransactionAsync();
109104

110-
await dbContext.Products.AddAsync(item1);
105+
await dbContext.Products.AddAsync (item1);
111106
await dbContext.SaveChangesAsync();
112107
await dbContext.BulkInsertAsync(list2, null, dbTransaction);
113108
await dbContext.BulkInsertAsync(list3, null, dbTransaction);
114109

115-
throw new DbUpdateException("Internal Server Error");
110+
throw new DbUpdateException("Some error occurs");
111+
await dbTransaction.CommitAsync();
116112
}
117-
catch (Exception ex)
113+
catch (Exception)
118114
{
119-
if (transaction is not null) await transaction.RollbackAsync();
115+
await dbContext.RollbackAsync();
120116
throw;
121117
}
122-
finally
123-
{
124-
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
125-
}
126118
```

EfCore.BulkOperations/EfCore.BulkOperations.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<RepositoryUrl>https://github.com/hongjs/EfCore.BulkOperations</RepositoryUrl>
1111
<RepositoryType>github</RepositoryType>
1212
<PackageTags>BulkInsert,BulkUpdate,BulkDelete,BulkMerge</PackageTags>
13-
<Version>1.4.1</Version>
13+
<Version>1.5.0</Version>
1414
<PackageReadmeFile>README.md</PackageReadmeFile>
1515
<PackageProjectUrl>https://github.com/hongjs/EfCore.BulkOperations</PackageProjectUrl>
1616
<PackageLicenseUrl>https://github.com/hongjs/EfCore.BulkOperations?tab=MIT-1-ov-file</PackageLicenseUrl>

EfCore.BulkOperations/EfCoreBulkUtils.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private static async Task<int> ExecuteBatchDataAsync(
175175
CancellationToken? cancellationToken = null)
176176
{
177177
await using var command = connection.CreateCommand();
178-
if (command.Connection is null) throw new ArgumentException("Command.Connection is null");
178+
if (command.Connection is null) throw new ArgumentNullException(nameof(connection));
179179
if (dbTransaction is not null) command.Transaction = dbTransaction;
180180
command.CommandText = batch.Sql.ToString();
181181
if (commandTimeout is not null) command.CommandTimeout = commandTimeout.Value;

README.md

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,21 @@ EfCore.BulkOperations utilizes local transactions within bulk processes. If you
9898
can pass an existing transaction into the bulk process.
9999

100100
```js
101-
IDbContextTransaction? transaction = null;
102-
DbConnection? connection = null;
103101
try
104102
{
105-
connection = dbContext.Database.GetDbConnection();
106-
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
107-
transaction = await dbContext.Database.BeginTransactionAsync();
108-
var dbTransaction = transaction.GetDbTransaction();
103+
var dbTransaction = dbContext.BeginTransactionAsync();
109104

110105
await dbContext.Products.AddAsync (item1);
111106
await dbContext.SaveChangesAsync();
112107
await dbContext.BulkInsertAsync(list2, null, dbTransaction);
113108
await dbContext.BulkInsertAsync(list3, null, dbTransaction);
114109

115-
throw new DbUpdateException("Internal Server Error");
110+
throw new DbUpdateException("Some error occurs");
111+
await dbTransaction.CommitAsync();
116112
}
117113
catch (Exception)
118114
{
119-
if (transaction is not null) await transaction.RollbackAsync();
115+
await dbContext.RollbackAsync();
120116
throw;
121117
}
122-
finally
123-
{
124-
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
125-
}
126118
```

0 commit comments

Comments
 (0)