File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -107,7 +107,7 @@ namespace OpenWifi {
107107 NewMessage.partition (0 );
108108 NewMessage.payload (Msg->Payload ());
109109 Producer.produce (NewMessage);
110- Producer.flush ( );
110+ Producer.poll ((std::chrono::milliseconds) 0 );
111111 }
112112 } catch (const cppkafka::HandleException &E) {
113113 poco_warning (Logger_,
@@ -117,8 +117,13 @@ namespace OpenWifi {
117117 } catch (...) {
118118 poco_error (Logger_, " std::exception" );
119119 }
120+ if (Queue_.size () == 0 ) {
121+ // message queue is empty, flush all previously sent messages
122+ Producer.flush ();
123+ }
120124 Note = Queue_.waitDequeueNotification ();
121125 }
126+ Producer.flush ();
122127 poco_information (Logger_, " Stopped..." );
123128 }
124129
@@ -324,4 +329,4 @@ namespace OpenWifi {
324329 partitions.front ().get_partition ()));
325330 }
326331
327- } // namespace OpenWifi
332+ } // namespace OpenWifi
You can’t perform that action at this time.
0 commit comments