Skip to content

Commit c08683e

Browse files
committed
Return set to vector for clients to be processed ensuring we run them in order
1 parent 490ee84 commit c08683e

6 files changed

Lines changed: 17 additions & 13 deletions

File tree

src/AsyncWorkQueue.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void AsyncWorkQueue::WorkerThreadMain()
5656
listRelease(vars.clients_pending_asyncwrite);
5757

5858
std::unique_lock<fastlock> lockf(serverTL->lockPendingWrite);
59-
serverTL->setclientsProcess.clear();
59+
serverTL->vecclientsProcess.clear();
6060
serverTL->clients_pending_write.clear();
6161
std::atomic_thread_fence(std::memory_order_seq_cst);
6262
}

src/blocked.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ void unblockClient(client *c) {
209209
listDelNode(g_pserver->paused_clients,c->paused_list_node);
210210
c->paused_list_node = NULL;
211211
} else if (c->btype == BLOCKED_STORAGE) {
212-
serverTL->setclientsProcess.insert(c);
212+
serverTL->vecclientsProcess.push_back(c);
213213
} else {
214214
serverPanic("Unknown btype in unblockClient().");
215215
}

src/db.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3142,8 +3142,11 @@ void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
31423142
if (fBulk)
31433143
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
31443144

3145-
if (sizeHint > 0 && aeThreadOwnsLock())
3145+
if (sizeHint > 0) {
3146+
aeAcquireLock();
31463147
dictExpand(m_dictChanged, sizeHint, false);
3148+
aeReleaseLock();
3149+
}
31473150
}
31483151

31493152
void redisDbPersistentData::removeAllCachedValues()
@@ -3362,8 +3365,9 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
33623365
auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, veckeys.data(), veckeys.size());
33633366
if (tok != nullptr) {
33643367
for (client *c : setcBlocked) {
3365-
if (!(c->flags & CLIENT_BLOCKED))
3368+
if (!(c->flags & CLIENT_BLOCKED)) {
33663369
blockClient(c, BLOCKED_STORAGE);
3370+
}
33673371
}
33683372
tok->setc = std::move(setcBlocked);
33693373
tok->type = StorageToken::TokenType::SingleRead;
@@ -3429,6 +3433,6 @@ void redisDbPersistentData::processStorageToken(StorageToken *tok) {
34293433
if (c->flags & CLIENT_BLOCKED)
34303434
unblockClient(c);
34313435
else
3432-
serverTL->setclientsProcess.insert(c);
3436+
serverTL->vecclientsProcess.push_back(c);
34333437
}
34343438
}

src/networking.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,7 +1626,7 @@ void unlinkClient(client *c) {
16261626
c->fPendingAsyncWrite = FALSE;
16271627
}
16281628

1629-
serverTL->setclientsProcess.erase(c);
1629+
serverTL->vecclientsProcess.erase(std::remove(serverTL->vecclientsProcess.begin(), serverTL->vecclientsProcess.end(), c), serverTL->vecclientsProcess.end());
16301630
serverTL->setclientsPrefetch.erase(c);
16311631

16321632
/* Clear the tracking status. */
@@ -2822,7 +2822,7 @@ void readQueryFromClient(connection *conn) {
28222822
}
28232823
c->vecqueuedcmd.clear();
28242824
} else {
2825-
serverTL->setclientsProcess.insert(c);
2825+
serverTL->vecclientsProcess.push_back(c);
28262826
}
28272827
}
28282828
}
@@ -2839,10 +2839,10 @@ void processClients()
28392839
{
28402840
serverAssert(GlobalLocksAcquired());
28412841

2842-
// Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
2843-
while (!serverTL->setclientsProcess.empty()) {
2844-
client *c = *serverTL->setclientsProcess.begin();
2845-
serverTL->setclientsProcess.erase(serverTL->setclientsProcess.begin());
2842+
// Note that this function is reentrant and vecclientsProcess may be modified by code called from processInputBuffer
2843+
while (!serverTL->vecclientsProcess.empty()) {
2844+
client *c = *serverTL->vecclientsProcess.begin();
2845+
serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());
28462846

28472847
/* There is more data in the client input buffer, continue parsing it
28482848
* in case to check if there is a full command to execute. */

src/server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2847,7 +2847,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
28472847
g_pserver->db[0]->prefetchKeysFlash(serverTL->setclientsPrefetch);
28482848
for (client *c : serverTL->setclientsPrefetch) {
28492849
if (!(c->flags & CLIENT_BLOCKED))
2850-
serverTL->setclientsProcess.insert(c);
2850+
serverTL->vecclientsProcess.push_back(c);
28512851
}
28522852
serverTL->setclientsPrefetch.clear();
28532853
}

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2230,7 +2230,7 @@ struct redisServerThreadVars {
22302230

22312231
int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */
22322232
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
2233-
std::unordered_set<client*> setclientsProcess;
2233+
std::vector<client*> vecclientsProcess;
22342234
std::unordered_set<client*> setclientsPrefetch;
22352235
std::unordered_set<StorageToken*> setStorageTokensProcess;
22362236
dictAsyncRehashCtl *rehashCtl = nullptr;

0 commit comments

Comments
 (0)