Skip to content

Commit c68a96e

Browse files
committed
Fix #364 Introduce for SSE cleanupClients(N) function
This function is similar to `cleanupClients(N)` for WebSocket: - It allows to cleanup an oldest SSE client if the connected client count is more than N - it resize the client list to remove empty entries following client disconnect This "book-keeping" function has to be called in the loop(). This is a non-elegant attempt to fix issue #364
1 parent 4c86b44 commit c68a96e

2 files changed

Lines changed: 42 additions & 16 deletions

File tree

src/AsyncEventSource.cpp

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
377377
#endif
378378
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
379379
if (i->get() == client) {
380-
_clients.erase(i);
380+
i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
381381
break;
382382
}
383383
}
@@ -392,7 +392,7 @@ void AsyncEventSource::close() {
392392
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
393393
#endif
394394
for (const auto &c : _clients) {
395-
if (c->connected()) {
395+
if (c.get() != nullptr && c->connected()) {
396396
/**
397397
* @brief: Fix self-deadlock by using recursive_mutex instead.
398398
* Due to c->close() shall call the callback function _onDisconnect()
@@ -403,24 +403,38 @@ void AsyncEventSource::close() {
403403
}
404404
}
405405

406+
void AsyncEventSource::cleanupClients(uint32_t maxClients) {
407+
#ifdef ESP32
408+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
409+
#endif
410+
411+
// first resize the list to remove unique_ptr with nullptr
412+
_clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
413+
return c.get() == nullptr;
414+
});
415+
416+
// then close one old client if we have more than maxClients
417+
// no need to close more: cleanupClients() is expected to be called periodically
418+
// and next invocation will clean up the empty unique_ptr once teh associated client disconnect callback is invoked
419+
if (count() > maxClients) {
420+
_clients.front()->close();
421+
}
422+
}
423+
406424
// pmb fix
407425
size_t AsyncEventSource::avgPacketsWaiting() const {
408426
size_t aql = 0;
409427
uint32_t nConnectedClients = 0;
410428
#ifdef ESP32
411429
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
412430
#endif
413-
if (!_clients.size()) {
414-
return 0;
415-
}
416-
417431
for (const auto &c : _clients) {
418-
if (c->connected()) {
432+
if (c.get() != nullptr && c->connected()) {
419433
aql += c->packetsWaiting();
420434
++nConnectedClients;
421435
}
422436
}
423-
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
437+
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
424438
}
425439

426440
AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
@@ -431,10 +445,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
431445
size_t hits = 0;
432446
size_t miss = 0;
433447
for (const auto &c : _clients) {
434-
if (c->write(shared_msg)) {
435-
++hits;
436-
} else {
437-
++miss;
448+
if (c.get() != nullptr && c->connected()) {
449+
if (c->write(shared_msg)) {
450+
++hits;
451+
} else {
452+
++miss;
453+
}
438454
}
439455
}
440456
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
@@ -446,7 +462,7 @@ size_t AsyncEventSource::count() const {
446462
#endif
447463
size_t n_clients{0};
448464
for (const auto &i : _clients) {
449-
if (i->connected()) {
465+
if (i.get() != nullptr && i->connected()) {
450466
++n_clients;
451467
}
452468
}
@@ -462,11 +478,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
462478
request->send(new AsyncEventSourceResponse(this));
463479
}
464480

481+
// list iteration protected by caller's lock
465482
void AsyncEventSource::_adjust_inflight_window() {
466-
if (_clients.size()) {
467-
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
483+
const size_t clientCount = count();
484+
if (clientCount) {
485+
size_t inflight = SSE_MAX_INFLIGH / clientCount;
468486
for (const auto &c : _clients) {
469-
c->set_max_inflight_bytes(inflight);
487+
if (c.get() != nullptr && c->connected()) {
488+
c->set_max_inflight_bytes(inflight);
489+
}
470490
}
471491
// Serial.printf("adjusted inflight to: %u\n", inflight);
472492
}

src/AsyncEventSource.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
#include <memory>
4848
#include <utility>
4949

50+
#ifndef DEFAULT_MAX_SSE_CLIENTS
51+
#define DEFAULT_MAX_SSE_CLIENTS UINT32_MAX
52+
#endif
53+
5054
class AsyncEventSource;
5155
class AsyncEventSourceResponse;
5256
class AsyncEventSourceClient;
@@ -272,6 +276,8 @@ class AsyncEventSource : public AsyncWebHandler {
272276
// close all connected clients
273277
void close();
274278

279+
void cleanupClients(uint32_t maxClients = DEFAULT_MAX_SSE_CLIENTS);
280+
275281
/**
276282
* @brief set on-connect callback for the client
277283
* used to deliver messages to client on first connect

0 commit comments

Comments
 (0)