Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,21 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
_clients.emplace_back(client);

// find first unique_ptr with nullptr and reuse it
bool reused = false;
for (auto &c : _clients) {
if (c.get() == nullptr) {
c.reset(client);
reused = true;
break;
}
}

if (!reused) {
_clients.emplace_back(client);
}

if (_connectcb) {
_connectcb(client);
}
Expand All @@ -387,7 +401,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
#endif
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
if (i->get() == client) {
_clients.erase(i);
i->reset(); // reset the unique_ptr but do not remove the list entry yet to keep other iterators valid
break;
}
}
Expand All @@ -402,7 +416,7 @@ void AsyncEventSource::close() {
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
/**
* @brief: Fix self-deadlock by using recursive_mutex instead.
* Due to c->close() shall call the callback function _onDisconnect()
Expand All @@ -413,6 +427,17 @@ void AsyncEventSource::close() {
}
}

void AsyncEventSource::cleanup() {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif

// resize the list to remove unique_ptr with nullptr
_clients.remove_if([](const std::unique_ptr<AsyncEventSourceClient> &c) {
return c.get() == nullptr;
});
}

// pmb fix
size_t AsyncEventSource::avgPacketsWaiting() const {
size_t aql = 0;
Expand All @@ -421,7 +446,7 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
aql += c->packetsWaiting();
++nConnectedClients;
}
Expand All @@ -437,7 +462,7 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
size_t hits = 0;
size_t miss = 0;
for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
if (c->write(shared_msg)) {
++hits;
} else {
Expand All @@ -454,7 +479,7 @@ size_t AsyncEventSource::count() const {
#endif
size_t n_clients{0};
for (const auto &i : _clients) {
if (i->connected()) {
if (i.get() != nullptr && i->connected()) {
++n_clients;
}
}
Expand All @@ -476,7 +501,7 @@ void AsyncEventSource::_adjust_inflight_window() {
if (clientCount) {
size_t inflight = SSE_MAX_INFLIGH / clientCount;
for (const auto &c : _clients) {
if (c->connected()) {
if (c.get() != nullptr && c->connected()) {
c->set_max_inflight_bytes(inflight);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ class AsyncEventSource : public AsyncWebHandler {
// close all connected clients
void close();

// Cleanup internal resources.
// Has to be called periodically in the loop
void cleanup();

/**
* @brief set on-connect callback for the client
* used to deliver messages to client on first connect
Expand Down
Loading