Skip to content

Commit e91a469

Browse files
ksdhansHans de Ruiter
andauthored
Add ability to use BEGIN IMMEDIATE and BEGIN EXCLUSIVE (#2488)
Co-authored-by: Hans de Ruiter <hans@keasigmadelta.com>
1 parent 2fd1587 commit e91a469

8 files changed

Lines changed: 314 additions & 49 deletions

File tree

orm_lib/inc/drogon/orm/DbClient.h

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ using ExceptionCallback = std::function<void(const DrogonDbException &)>;
4343
class Transaction;
4444
class DbClient;
4545

46+
/// Transaction locking mode.
47+
enum class TransactionType
48+
{
49+
Deferred, ///< BEGIN — lock acquired on first write (default)
50+
Immediate, ///< BEGIN IMMEDIATE — write lock acquired upfront (SQLite only)
51+
Exclusive, ///< BEGIN EXCLUSIVE — exclusive lock acquired upfront (SQLite
52+
///< only)
53+
};
54+
4655
namespace internal
4756
{
4857
#ifdef __cpp_impl_coroutine
@@ -73,14 +82,18 @@ struct [[nodiscard]] SqlAwaiter : public CallbackAwaiter<Result>
7382
struct [[nodiscard]] TransactionAwaiter
7483
: public CallbackAwaiter<std::shared_ptr<Transaction> >
7584
{
76-
explicit TransactionAwaiter(DbClient *client) : client_(client)
85+
explicit TransactionAwaiter(
86+
DbClient *client,
87+
TransactionType transType = TransactionType::Deferred)
88+
: client_(client), transType_(transType)
7789
{
7890
}
7991

8092
void await_suspend(std::coroutine_handle<> handle);
8193

8294
private:
8395
DbClient *client_;
96+
TransactionType transType_;
8497
};
8598

8699
#endif
@@ -269,7 +282,16 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable
269282
*/
270283
virtual std::shared_ptr<Transaction> newTransaction(
271284
const std::function<void(bool)> &commitCallback =
272-
std::function<void(bool)>()) noexcept(false) = 0;
285+
std::function<void(bool)>(),
286+
TransactionType transType =
287+
TransactionType::Deferred) noexcept(false) = 0;
288+
289+
/// Convenience overload: create a transaction with a specific locking mode.
290+
std::shared_ptr<Transaction> newTransaction(
291+
TransactionType transType) noexcept(false)
292+
{
293+
return newTransaction(std::function<void(bool)>(), transType);
294+
}
273295

274296
/// Create a transaction object in asynchronous mode.
275297
/**
@@ -278,12 +300,24 @@ class DROGON_EXPORT DbClient : public trantor::NonCopyable
278300
*/
279301
virtual void newTransactionAsync(
280302
const std::function<void(const std::shared_ptr<Transaction> &)>
281-
&callback) = 0;
303+
&callback,
304+
TransactionType transType = TransactionType::Deferred) = 0;
305+
306+
/// Convenience overload: create an async transaction with a specific
307+
/// locking mode, with transType as the first argument.
308+
void newTransactionAsync(
309+
TransactionType transType,
310+
const std::function<void(const std::shared_ptr<Transaction> &)>
311+
&callback)
312+
{
313+
newTransactionAsync(callback, transType);
314+
}
282315

283316
#ifdef __cpp_impl_coroutine
284-
orm::internal::TransactionAwaiter newTransactionCoro()
317+
orm::internal::TransactionAwaiter newTransactionCoro(
318+
TransactionType transType = TransactionType::Deferred)
285319
{
286-
return orm::internal::TransactionAwaiter(this);
320+
return orm::internal::TransactionAwaiter(this, transType);
287321
}
288322
#endif
289323

@@ -408,7 +442,8 @@ inline void internal::TransactionAwaiter::await_suspend(
408442
else
409443
setValue(transaction);
410444
handle.resume();
411-
});
445+
},
446+
transType_);
412447
}
413448
#endif
414449

orm_lib/src/DbClientImpl.cc

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ void DbClientImpl::execSql(
197197
}
198198

199199
void DbClientImpl::newTransactionAsync(
200-
const std::function<void(const std::shared_ptr<Transaction> &)> &callback)
200+
const std::function<void(const std::shared_ptr<Transaction> &)> &callback,
201+
TransactionType transType)
201202
{
202203
DbConnectionPtr conn;
203204
{
@@ -231,7 +232,7 @@ void DbClientImpl::newTransactionAsync(
231232
iter != transCallbacks_.end();
232233
++iter)
233234
{
234-
if (cbPtr == *iter)
235+
if (cbPtr == iter->first)
235236
{
236237
transCallbacks_.erase(iter);
237238
break;
@@ -251,24 +252,29 @@ void DbClientImpl::newTransactionAsync(
251252
(*newCallbackPtr) = callbackPtr;
252253
timeoutFlagPtr->runTimer();
253254
}
254-
transCallbacks_.push_back(callbackPtr);
255+
transCallbacks_.push_back({callbackPtr, transType});
255256
}
256257
}
257258
if (conn)
258259
{
259260
makeTrans(conn,
260261
std::function<void(const std::shared_ptr<Transaction> &)>(
261-
callback));
262+
callback),
263+
transType);
262264
}
263265
}
264266

265267
void DbClientImpl::makeTrans(
266268
const DbConnectionPtr &conn,
267-
std::function<void(const std::shared_ptr<Transaction> &)> &&callback)
269+
std::function<void(const std::shared_ptr<Transaction> &)> &&callback,
270+
TransactionType transType)
268271
{
269272
std::weak_ptr<DbClientImpl> weakThis = shared_from_this();
270273
auto trans = std::make_shared<TransactionImpl>(
271-
type_, conn, std::function<void(bool)>(), [weakThis, conn]() {
274+
type_,
275+
conn,
276+
std::function<void(bool)>(),
277+
[weakThis, conn]() {
272278
auto thisPtr = weakThis.lock();
273279
if (!thisPtr)
274280
return;
@@ -306,7 +312,8 @@ void DbClientImpl::makeTrans(
306312
});
307313
thisPtr->handleNewTask(conn);
308314
});
309-
});
315+
},
316+
transType);
310317
trans->doBegin();
311318
if (timeout_ > 0.0)
312319
{
@@ -317,13 +324,16 @@ void DbClientImpl::makeTrans(
317324
}
318325

319326
std::shared_ptr<Transaction> DbClientImpl::newTransaction(
320-
const std::function<void(bool)> &commitCallback) noexcept(false)
327+
const std::function<void(bool)> &commitCallback,
328+
TransactionType transType) noexcept(false)
321329
{
322330
std::promise<std::shared_ptr<Transaction>> pro;
323331
auto f = pro.get_future();
324-
newTransactionAsync([&pro](const std::shared_ptr<Transaction> &trans) {
325-
pro.set_value(trans);
326-
});
332+
newTransactionAsync(
333+
[&pro](const std::shared_ptr<Transaction> &trans) {
334+
pro.set_value(trans);
335+
},
336+
transType);
327337
auto trans = f.get();
328338
if (!trans)
329339
{
@@ -336,12 +346,15 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(
336346
void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
337347
{
338348
std::function<void(const std::shared_ptr<Transaction> &)> transCallback;
349+
TransactionType transType{TransactionType::Deferred};
339350
std::shared_ptr<SqlCmd> cmd;
340351
{
341352
std::lock_guard<std::mutex> guard(connectionsMutex_);
342353
if (!transCallbacks_.empty())
343354
{
344-
transCallback = std::move(*(transCallbacks_.front()));
355+
auto &entry = transCallbacks_.front();
356+
transCallback = std::move(*entry.first);
357+
transType = entry.second;
345358
transCallbacks_.pop_front();
346359
}
347360
else if (!sqlCmdBuffer_.empty())
@@ -358,7 +371,7 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
358371
}
359372
if (transCallback)
360373
{
361-
makeTrans(connPtr, std::move(transCallback));
374+
makeTrans(connPtr, std::move(transCallback), transType);
362375
return;
363376
}
364377
if (cmd)

orm_lib/src/DbClientImpl.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,13 @@ class DbClientImpl : public DbClient,
5252
&&exceptCallback) override;
5353
std::shared_ptr<Transaction> newTransaction(
5454
const std::function<void(bool)> &commitCallback =
55-
std::function<void(bool)>()) noexcept(false) override;
55+
std::function<void(bool)>(),
56+
TransactionType transType =
57+
TransactionType::Deferred) noexcept(false) override;
5658
void newTransactionAsync(
5759
const std::function<void(const std::shared_ptr<Transaction> &)>
58-
&callback) override;
60+
&callback,
61+
TransactionType transType = TransactionType::Deferred) override;
5962
bool hasAvailableConnections() const noexcept override;
6063

6164
void setTimeout(double timeout) override
@@ -78,16 +81,19 @@ class DbClientImpl : public DbClient,
7881

7982
void makeTrans(
8083
const DbConnectionPtr &conn,
81-
std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
84+
std::function<void(const std::shared_ptr<Transaction> &)> &&callback,
85+
TransactionType transType = TransactionType::Deferred);
8286

8387
mutable std::mutex connectionsMutex_;
8488
std::unordered_set<DbConnectionPtr> connections_;
8589
std::unordered_set<DbConnectionPtr> readyConnections_;
8690
std::unordered_set<DbConnectionPtr> busyConnections_;
8791

88-
std::list<std::shared_ptr<
89-
std::function<void(const std::shared_ptr<Transaction> &)>>>
90-
transCallbacks_;
92+
using TransCallbackEntry =
93+
std::pair<std::shared_ptr<std::function<void(
94+
const std::shared_ptr<Transaction> &)>>,
95+
TransactionType>;
96+
std::list<TransCallbackEntry> transCallbacks_;
9197

9298
std::deque<std::shared_ptr<SqlCmd>> sqlCmdBuffer_;
9399

orm_lib/src/DbClientLockFree.cc

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ void DbClientLockFree::execSql(
230230
}
231231

232232
std::shared_ptr<Transaction> DbClientLockFree::newTransaction(
233-
const std::function<void(bool)> &) noexcept(false)
233+
const std::function<void(bool)> &,
234+
TransactionType) noexcept(false)
234235
{
235236
// Don't support transaction;
236237
LOG_ERROR
@@ -241,7 +242,8 @@ std::shared_ptr<Transaction> DbClientLockFree::newTransaction(
241242
}
242243

243244
void DbClientLockFree::newTransactionAsync(
244-
const std::function<void(const std::shared_ptr<Transaction> &)> &callback)
245+
const std::function<void(const std::shared_ptr<Transaction> &)> &callback,
246+
TransactionType transType)
245247
{
246248
loop_->assertInLoopThread();
247249
for (auto &conn : connections_)
@@ -250,7 +252,8 @@ void DbClientLockFree::newTransactionAsync(
250252
{
251253
makeTrans(conn,
252254
std::function<void(const std::shared_ptr<Transaction> &)>(
253-
callback));
255+
callback),
256+
transType);
254257
return;
255258
}
256259
}
@@ -272,7 +275,7 @@ void DbClientLockFree::newTransactionAsync(
272275
iter != transCallbacks_.end();
273276
++iter)
274277
{
275-
if (cbPtr == *iter)
278+
if (cbPtr == iter->first)
276279
{
277280
transCallbacks_.erase(iter);
278281
break;
@@ -292,16 +295,20 @@ void DbClientLockFree::newTransactionAsync(
292295
*newCallbackPtr = callbackPtr;
293296
timeoutFlagPtr->runTimer();
294297
}
295-
transCallbacks_.push_back(callbackPtr);
298+
transCallbacks_.push_back({callbackPtr, transType});
296299
}
297300

298301
void DbClientLockFree::makeTrans(
299302
const DbConnectionPtr &conn,
300-
std::function<void(const std::shared_ptr<Transaction> &)> &&callback)
303+
std::function<void(const std::shared_ptr<Transaction> &)> &&callback,
304+
TransactionType transType)
301305
{
302306
std::weak_ptr<DbClientLockFree> weakThis = shared_from_this();
303307
auto trans = std::make_shared<TransactionImpl>(
304-
type_, conn, std::function<void(bool)>(), [weakThis, conn]() {
308+
type_,
309+
conn,
310+
std::function<void(bool)>(),
311+
[weakThis, conn]() {
305312
auto thisPtr = weakThis.lock();
306313
if (!thisPtr)
307314
return;
@@ -312,9 +319,11 @@ void DbClientLockFree::makeTrans(
312319
}
313320
if (!thisPtr->transCallbacks_.empty())
314321
{
315-
auto callback = std::move(thisPtr->transCallbacks_.front());
322+
auto &entry = thisPtr->transCallbacks_.front();
323+
auto nextCallback = std::move(*entry.first);
324+
auto nextType = entry.second;
316325
thisPtr->transCallbacks_.pop_front();
317-
thisPtr->makeTrans(conn, std::move(*callback));
326+
thisPtr->makeTrans(conn, std::move(nextCallback), nextType);
318327
return;
319328
}
320329

@@ -342,7 +351,8 @@ void DbClientLockFree::makeTrans(
342351
break;
343352
}
344353
}
345-
});
354+
},
355+
transType);
346356
transSet_.insert(conn);
347357
trans->doBegin();
348358
if (timeout_ > 0.0)
@@ -360,9 +370,11 @@ void DbClientLockFree::handleNewTask(const DbConnectionPtr &conn)
360370

361371
if (!transCallbacks_.empty())
362372
{
363-
auto callback = std::move(transCallbacks_.front());
373+
auto &entry = transCallbacks_.front();
374+
auto callback = std::move(*entry.first);
375+
auto transType = entry.second;
364376
transCallbacks_.pop_front();
365-
makeTrans(conn, std::move(*callback));
377+
makeTrans(conn, std::move(callback), transType);
366378
return;
367379
}
368380

orm_lib/src/DbClientLockFree.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,13 @@ class DbClientLockFree : public DbClient,
5555
&&exceptCallback) override;
5656
std::shared_ptr<Transaction> newTransaction(
5757
const std::function<void(bool)> &commitCallback =
58-
std::function<void(bool)>()) noexcept(false) override;
58+
std::function<void(bool)>(),
59+
TransactionType transType =
60+
TransactionType::Deferred) noexcept(false) override;
5961
void newTransactionAsync(
6062
const std::function<void(const std::shared_ptr<Transaction> &)>
61-
&callback) override;
63+
&callback,
64+
TransactionType transType = TransactionType::Deferred) override;
6265
bool hasAvailableConnections() const noexcept override;
6366

6467
void setTimeout(double timeout) override
@@ -78,15 +81,18 @@ class DbClientLockFree : public DbClient,
7881
std::unordered_set<DbConnectionPtr> transSet_;
7982
std::deque<std::shared_ptr<SqlCmd>> sqlCmdBuffer_;
8083

81-
std::list<std::shared_ptr<
82-
std::function<void(const std::shared_ptr<Transaction> &)>>>
83-
transCallbacks_;
84+
using TransCallbackEntry =
85+
std::pair<std::shared_ptr<std::function<void(
86+
const std::shared_ptr<Transaction> &)>>,
87+
TransactionType>;
88+
std::list<TransCallbackEntry> transCallbacks_;
8489

8590
double timeout_{-1.0};
8691

8792
void makeTrans(
8893
const DbConnectionPtr &conn,
89-
std::function<void(const std::shared_ptr<Transaction> &)> &&callback);
94+
std::function<void(const std::shared_ptr<Transaction> &)> &&callback,
95+
TransactionType transType = TransactionType::Deferred);
9096
void execSqlWithTimeout(
9197
const char *sql,
9298
size_t sqlLength,

0 commit comments

Comments
 (0)