Skip to content

Commit edc75e4

Browse files
committed
Fix #364 Introduce cleanup() function
This function is similar to `cleanupClients(N)` for WebSocket except that it only cleanups internal resources. This "book-keeping" function has to be called in the loop(). This is a non-elegant attempt to fix issue #364
1 parent 4c86b44 commit edc75e4

2 files changed

Lines changed: 48 additions & 17 deletions

File tree

src/AsyncEventSource.cpp

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
360360
#ifdef ESP32
361361
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
362362
#endif
363-
_clients.emplace_back(client);
363+
364+
// find first unique_ptr with nullptr and reuse it
365+
bool reused = false;
366+
for (auto &c : _clients) {
367+
if (c.get() == nullptr) {
368+
c.reset(client);
369+
reused = true;
370+
break;
371+
}
372+
}
373+
374+
if (!reused) {
375+
_clients.emplace_back(client);
376+
}
377+
364378
if (_connectcb) {
365379
_connectcb(client);
366380
}
@@ -377,7 +391,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
377391
#endif
378392
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
379393
if (i->get() == client) {
380-
_clients.erase(i);
394+
i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
381395
break;
382396
}
383397
}
@@ -392,7 +406,7 @@ void AsyncEventSource::close() {
392406
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
393407
#endif
394408
for (const auto &c : _clients) {
395-
if (c->connected()) {
409+
if (c.get() != nullptr && c->connected()) {
396410
/**
397411
* @brief: Fix self-deadlock by using recursive_mutex instead.
398412
* Due to c->close() shall call the callback function _onDisconnect()
@@ -403,24 +417,31 @@ void AsyncEventSource::close() {
403417
}
404418
}
405419

420+
void AsyncEventSource::cleanup() {
421+
#ifdef ESP32
422+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
423+
#endif
424+
425+
// resize the list to remove unique_ptr with nullptr
426+
_clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
427+
return c.get() == nullptr;
428+
});
429+
}
430+
406431
// pmb fix
407432
size_t AsyncEventSource::avgPacketsWaiting() const {
408433
size_t aql = 0;
409434
uint32_t nConnectedClients = 0;
410435
#ifdef ESP32
411436
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
412437
#endif
413-
if (!_clients.size()) {
414-
return 0;
415-
}
416-
417438
for (const auto &c : _clients) {
418-
if (c->connected()) {
439+
if (c.get() != nullptr && c->connected()) {
419440
aql += c->packetsWaiting();
420441
++nConnectedClients;
421442
}
422443
}
423-
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
444+
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
424445
}
425446

426447
AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
@@ -431,10 +452,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
431452
size_t hits = 0;
432453
size_t miss = 0;
433454
for (const auto &c : _clients) {
434-
if (c->write(shared_msg)) {
435-
++hits;
436-
} else {
437-
++miss;
455+
if (c.get() != nullptr && c->connected()) {
456+
if (c->write(shared_msg)) {
457+
++hits;
458+
} else {
459+
++miss;
460+
}
438461
}
439462
}
440463
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
@@ -446,7 +469,7 @@ size_t AsyncEventSource::count() const {
446469
#endif
447470
size_t n_clients{0};
448471
for (const auto &i : _clients) {
449-
if (i->connected()) {
472+
if (i.get() != nullptr && i->connected()) {
450473
++n_clients;
451474
}
452475
}
@@ -462,11 +485,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
462485
request->send(new AsyncEventSourceResponse(this));
463486
}
464487

488+
// list iteration protected by caller's lock
465489
void AsyncEventSource::_adjust_inflight_window() {
466-
if (_clients.size()) {
467-
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
490+
const size_t clientCount = count();
491+
if (clientCount) {
492+
size_t inflight = SSE_MAX_INFLIGH / clientCount;
468493
for (const auto &c : _clients) {
469-
c->set_max_inflight_bytes(inflight);
494+
if (c.get() != nullptr && c->connected()) {
495+
c->set_max_inflight_bytes(inflight);
496+
}
470497
}
471498
// Serial.printf("adjusted inflight to: %u\n", inflight);
472499
}

src/AsyncEventSource.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ class AsyncEventSource : public AsyncWebHandler {
272272
// close all connected clients
273273
void close();
274274

275+
// Cleanup internal resources.
276+
// Has to be called periodically in the loop
277+
void cleanup();
278+
275279
/**
276280
* @brief set on-connect callback for the client
277281
* used to deliver messages to client on first connect

0 commit comments

Comments
 (0)