We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents a88014d + 574ea60 commit 6249cb9Copy full SHA for 6249cb9
1 file changed
src/framework/KafkaManager.cpp
@@ -107,7 +107,16 @@ namespace OpenWifi {
107
NewMessage.partition(0);
108
NewMessage.payload(Msg->Payload());
109
Producer.produce(NewMessage);
110
- Producer.poll((std::chrono::milliseconds) 0);
+ if (Queue_.size() < 100) {
111
+ // use flush when internal queue is lightly loaded, i.e. flush after each
112
+ // message
113
+ Producer.flush();
114
+ }
115
+ else {
116
+ // use poll when internal queue is loaded to allow messages to be sent in
117
+ // batches
118
+ Producer.poll((std::chrono::milliseconds) 0);
119
120
}
121
} catch (const cppkafka::HandleException &E) {
122
poco_warning(Logger_,
0 commit comments