Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 212 additions & 9 deletions custom/integration/settings_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@
#include "integration/settings_controller.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <fstream>
#include <functional>
#include <mutex>
#include <queue>
#include <new>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#if !defined(ESP_PLATFORM)
# include <condition_variable>
# include <mutex>
# include <queue>
# include <thread>
#endif

#include "../app_trace.h"
#include "hal/hal.h"
#include "settings_core/app_cfg.h"
Expand All @@ -35,6 +40,10 @@
# include "diag/diag.h"
# include "esp_err.h"
# include "esp_wifi.h"
# include "freertos/FreeRTOS.h"
# include "freertos/queue.h"
# include "freertos/semphr.h"
# include "freertos/task.h"
# include "net_sntp/net_sntp.h"
# include "ota_update/ota_update.h"
#endif
Expand All @@ -54,6 +63,12 @@ namespace custom::integration

constexpr const char* kDefaultUpdateBaseUrl = "https://updates.m5stack.com/tab5";

#if defined(ESP_PLATFORM)
constexpr UBaseType_t kTaskQueueLength = 8U;
constexpr uint32_t kWorkerTaskStackSize = 4096U;
constexpr UBaseType_t kWorkerTaskPriority = 5U;
#endif

std::string timestamp_string()
{
std::time_t now = std::time(nullptr);
Expand Down Expand Up @@ -115,7 +130,11 @@ namespace custom::integration
std::string current_variant_id() const;

void enqueue_task(std::function<void()> task);
#if defined(ESP_PLATFORM)
static void WorkerTaskEntry(void* arg);
#else
void worker_loop();
#endif
void refresh_all_connections();
void perform_connection_test(const std::string& tester_id);
void test_wifi_connection();
Expand Down Expand Up @@ -148,12 +167,19 @@ namespace custom::integration
bool diag_running_ = false;
#endif

std::atomic<bool> running_{false};
#if defined(ESP_PLATFORM)
TaskHandle_t worker_task_ = nullptr;
QueueHandle_t task_queue_ = nullptr;
SemaphoreHandle_t refresh_semaphore_ = nullptr;
std::atomic<bool> worker_active_{false};
#else
std::thread worker_thread_;
std::mutex mutex_;
std::condition_variable cv_;
bool running_ = false;
std::queue<std::function<void()>> tasks_;
std::chrono::steady_clock::time_point next_refresh_;
#endif

std::string backup_path_;
std::string logs_path_;
Expand All @@ -171,22 +197,107 @@ namespace custom::integration
logs_path_ = "m5tab5_logs.txt";
#endif

running_ = true;
#if defined(ESP_PLATFORM)
running_.store(true);
refresh_semaphore_ = xSemaphoreCreateBinary();
if (refresh_semaphore_ == nullptr)
{
APP_LOG_ERROR(kTag, "Failed to create refresh semaphore");
running_.store(false);
}

if (running_.load())
{
task_queue_ = xQueueCreate(kTaskQueueLength, sizeof(std::function<void()>*));
if (task_queue_ == nullptr)
{
APP_LOG_ERROR(kTag, "Failed to create task queue");
running_.store(false);
}
}

if (running_.load())
{
if (xTaskCreate(&SettingsController::Impl::WorkerTaskEntry,
"settings_ctrl",
kWorkerTaskStackSize,
this,
kWorkerTaskPriority,
&worker_task_)
== pdPASS)
{
worker_active_.store(true);
}
else
{
APP_LOG_ERROR(kTag, "Failed to start worker task");
worker_task_ = nullptr;
running_.store(false);
}
}

if (!running_.load())
{
if (task_queue_ != nullptr)
{
vQueueDelete(task_queue_);
task_queue_ = nullptr;
}
if (refresh_semaphore_ != nullptr)
{
vSemaphoreDelete(refresh_semaphore_);
refresh_semaphore_ = nullptr;
}
worker_active_.store(false);
}
#else
running_.store(true);
next_refresh_ = std::chrono::steady_clock::now() + kRefreshInterval;
worker_thread_ = std::thread(&SettingsController::Impl::worker_loop, this);
#endif
}

SettingsController::Impl::~Impl()
{
#if defined(ESP_PLATFORM)
running_.store(false);
if (refresh_semaphore_ != nullptr)
{
xSemaphoreGive(refresh_semaphore_);
}

while (worker_active_.load())
{
vTaskDelay(pdMS_TO_TICKS(10));
}

if (task_queue_ != nullptr)
{
std::function<void()>* pending = nullptr;
while (xQueueReceive(task_queue_, &pending, 0) == pdTRUE)
{
delete pending;
}
vQueueDelete(task_queue_);
task_queue_ = nullptr;
}

if (refresh_semaphore_ != nullptr)
{
vSemaphoreDelete(refresh_semaphore_);
refresh_semaphore_ = nullptr;
}
#else
{
std::lock_guard<std::mutex> lock(mutex_);
running_ = false;
running_.store(false);
}
cv_.notify_all();
if (worker_thread_.joinable())
{
worker_thread_.join();
}
#endif

#if defined(ESP_PLATFORM)
if (diag_running_)
Expand Down Expand Up @@ -637,17 +748,44 @@ namespace custom::integration

void SettingsController::Impl::enqueue_task(std::function<void()> task)
{
#if defined(ESP_PLATFORM)
if (!running_.load() || task_queue_ == nullptr)
{
return;
}

auto* heap_task = new (std::nothrow) std::function<void()>(std::move(task));
if (heap_task == nullptr)
{
APP_LOG_WARN(kTag, "Failed to allocate task");
return;
}

if (xQueueSend(task_queue_, &heap_task, 0) != pdPASS)
{
APP_LOG_WARN(kTag, "Task queue full");
delete heap_task;
return;
Comment on lines 748 to +768

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Avoid dropping queued settings tasks when queue fills

The new FreeRTOS worker uses a queue with a fixed length of 8 and xQueueSend(..., 0) rejects enqueues when the queue is full. When a burst of UI actions enqueues more than 8 lambdas (for example moving the brightness slider or toggling multiple options quickly), later tasks are silently discarded after logging "Task queue full". Previously the std::queue path buffered every request, so the final user action was eventually executed. With this change, a user’s last action can be lost and the device ends up in a state that doesn’t match the UI. Consider either blocking until space is available or draining/overwriting older tasks so that the most recent command is processed.

Useful? React with 👍 / 👎.

}

if (refresh_semaphore_ != nullptr)
{
xSemaphoreGive(refresh_semaphore_);
}
#else
{
std::lock_guard<std::mutex> lock(mutex_);
tasks_.push(std::move(task));
}
cv_.notify_all();
#endif
}

#if !defined(ESP_PLATFORM)
void SettingsController::Impl::worker_loop()
{
std::unique_lock<std::mutex> lock(mutex_);
while (running_)
while (running_.load())
{
if (!tasks_.empty())
{
Expand All @@ -670,9 +808,74 @@ namespace custom::integration
continue;
}

cv_.wait_until(lock, next_refresh_, [this]() { return !running_ || !tasks_.empty(); });
cv_.wait_until(
lock, next_refresh_, [this]() { return !running_.load() || !tasks_.empty(); });
}
}
#else
void SettingsController::Impl::WorkerTaskEntry(void* arg)
{
auto* self = static_cast<SettingsController::Impl*>(arg);
if (self == nullptr)
{
vTaskDelete(nullptr);
return;
}

if (self->task_queue_ == nullptr || self->refresh_semaphore_ == nullptr)
{
self->running_.store(false);
self->worker_active_.store(false);
self->worker_task_ = nullptr;
vTaskDelete(nullptr);
return;
}

self->worker_active_.store(true);

const uint32_t refresh_ms = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(kRefreshInterval).count());
const TickType_t refresh_ticks = std::max<TickType_t>(1, pdMS_TO_TICKS(refresh_ms));

while (self->running_.load())
{
std::function<void()>* task_ptr = nullptr;
while (self->running_.load()
&& xQueueReceive(self->task_queue_, &task_ptr, 0) == pdTRUE)
{
if (task_ptr == nullptr)
{
self->running_.store(false);
break;
}

(*task_ptr)();
delete task_ptr;
task_ptr = nullptr;
}

if (!self->running_.load())
{
break;
}

if (xSemaphoreTake(self->refresh_semaphore_, refresh_ticks) == pdFALSE)
{
self->refresh_all_connections();
}
}

std::function<void()>* leftover = nullptr;
while (xQueueReceive(self->task_queue_, &leftover, 0) == pdTRUE)
{
delete leftover;
}

self->worker_active_.store(false);
self->worker_task_ = nullptr;
vTaskDelete(nullptr);
}
#endif

void SettingsController::Impl::refresh_all_connections()
{
Expand Down
Loading