|
20 | 20 | #include <iostream> |
21 | 21 | #include <fstream> |
22 | 22 | #include <sstream> |
| 23 | +#include <queue> |
| 24 | +#include <mutex> |
| 25 | +#include <string> |
23 | 26 | #include "../mqtt/hasp_mqtt.h" |
| 27 | + |
| 28 | +/* Deferred command queue: MQTT callback runs on Paho thread; jsonl/json handlers call LVGL |
| 29 | + * (hasp_new_object) which is not thread-safe. Queue jsonl/json for processing on main thread. |
| 30 | + * 64 is enough for burst layout + state; PC has plenty of memory. */ |
| 31 | +#define DISPATCH_DEFERRED_QUEUE_MAX 64 |
| 32 | +static std::queue<std::pair<std::string, std::string>> deferred_queue; |
| 33 | +static std::mutex deferred_mutex; |
24 | 34 | #else |
25 | 35 | #include "StringStream.h" |
26 | 36 | #include "StreamUtils.h" // for exec ReadBufferingStream |
@@ -463,6 +473,33 @@ void dispatch_topic_payload(const char* topic, const char* payload, bool update, |
463 | 473 | dispatch_command(topic, (char*)payload, update, source); // dispatch as is |
464 | 474 | } |
465 | 475 |
|
| 476 | +#if HASP_TARGET_PC |
| 477 | +void dispatch_defer_command(const char* topic, const char* payload) |
| 478 | +{ |
| 479 | + std::lock_guard<std::mutex> lock(deferred_mutex); |
| 480 | + if(deferred_queue.size() >= DISPATCH_DEFERRED_QUEUE_MAX) { |
| 481 | + (void)deferred_queue.front(); |
| 482 | + deferred_queue.pop(); // drop oldest |
| 483 | + } |
| 484 | + deferred_queue.push({std::string(topic), std::string(payload)}); |
| 485 | +} |
| 486 | + |
| 487 | +void dispatch_process_deferred(void) |
| 488 | +{ |
| 489 | + std::pair<std::string, std::string> item; |
| 490 | + for(;;) { |
| 491 | + { |
| 492 | + std::lock_guard<std::mutex> lock(deferred_mutex); |
| 493 | + if(deferred_queue.empty()) break; |
| 494 | + item = deferred_queue.front(); |
| 495 | + deferred_queue.pop(); |
| 496 | + } |
| 497 | + bool update = item.second.size() > 0; |
| 498 | + dispatch_topic_payload(item.first.c_str(), item.second.c_str(), update, TAG_MQTT); |
| 499 | + } |
| 500 | +} |
| 501 | +#endif |
| 502 | + |
466 | 503 | // void dispatch_output_group_state(uint8_t groupid, uint16_t state) |
467 | 504 | // { |
468 | 505 | // char payload[64]; |
|
0 commit comments