Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,20 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

_messageQueue.emplace_back(message, len);
if (_client) {
_messageQueue.emplace_back(message, len);
} else {
_messageQueue.clear();
return false;
}

/*
throttle queue run
if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
_runQueue();
}

Expand All @@ -235,15 +240,20 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

_messageQueue.emplace_back(std::move(msg));
if (_client) {
_messageQueue.emplace_back(std::move(msg));
} else {
_messageQueue.clear();
return false;
}

/*
throttle queue run
if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
*/
if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
_runQueue();
}
return true;
Expand Down Expand Up @@ -334,7 +344,7 @@ void AsyncEventSourceClient::_runQueue() {
}

// flush socket
if (total_bytes_written) {
if (_client && total_bytes_written) {
_client->send();
}
}
Expand Down Expand Up @@ -410,17 +420,13 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
if (!_clients.size()) {
return 0;
}

for (const auto &c : _clients) {
if (c->connected()) {
aql += c->packetsWaiting();
++nConnectedClients;
}
}
return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up
}

AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
Expand All @@ -431,10 +437,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
size_t hits = 0;
size_t miss = 0;
for (const auto &c : _clients) {
if (c->write(shared_msg)) {
++hits;
} else {
++miss;
if (c->connected()) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need a client-pointer-is-not-null check here? (and all the other c->connected() pattern usages?)

Copy link
Copy Markdown
Member Author

@mathieucarbou mathieucarbou Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR at least because c is the unique ptr of a AsyncEventSourceClient object wrapping the _client pointer. c->connected() si checking for a null _client ptr behind. c is not supposed to be null.

This PR is just some code cleanup and null checks.

I would rather discuss how to correctly protect the iteration over the AsyncEventSourceClient list in #370 which was opened for that goal.

if (c->write(shared_msg)) {
++hits;
} else {
++miss;
}
}
}
return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
Expand Down Expand Up @@ -462,11 +470,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
request->send(new AsyncEventSourceResponse(this));
}

// list iteration protected by caller's lock
void AsyncEventSource::_adjust_inflight_window() {
if (_clients.size()) {
size_t inflight = SSE_MAX_INFLIGH / _clients.size();
const size_t clientCount = count();
if (clientCount) {
size_t inflight = SSE_MAX_INFLIGH / clientCount;
Comment thread
mathieucarbou marked this conversation as resolved.
for (const auto &c : _clients) {
c->set_max_inflight_bytes(inflight);
if (c->connected()) {
c->set_max_inflight_bytes(inflight);
}
}
// Serial.printf("adjusted inflight to: %u\n", inflight);
}
Expand Down
Loading