We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents 073856d + 8d04cbc commit bb09f91Copy full SHA for bb09f91
1 file changed
src/framework/KafkaManager.cpp
@@ -107,7 +107,7 @@ namespace OpenWifi {
107
NewMessage.partition(0);
108
NewMessage.payload(Msg->Payload());
109
Producer.produce(NewMessage);
110
- Producer.flush();
+ Producer.poll((std::chrono::milliseconds) 0);
111
}
112
} catch (const cppkafka::HandleException &E) {
113
poco_warning(Logger_,
@@ -117,6 +117,10 @@ namespace OpenWifi {
117
} catch (...) {
118
poco_error(Logger_, "std::exception");
119
120
+ if (Queue_.size() == 0) {
121
+ // message queue is empty, flush all previously sent messages
122
+ Producer.flush();
123
+ }
124
Note = Queue_.waitDequeueNotification();
125
126
poco_information(Logger_, "Stopped...");
0 commit comments