@@ -199,8 +199,8 @@ SSEClient::SSEClient(
199199 req->on_response ([&, silenceTimeout, reportIgnoredLines](const ng_client::response& res) {
200200 requestSent.release ();
201201 res.on_data ([&, silenceTimeout, reportIgnoredLines](const uint8_t * data, std::size_t len) {
202- // not a production-ready code. In real-life condition the data received in one callback might probably be incomplete
203- parseEvents (std::string ( reinterpret_cast < const char *>(data), len), eventWatcher, reportIgnoredLines);
202+ dataBuffer. append ( std::string ( reinterpret_cast < const char *>( data), len));
203+ parseEvents (eventWatcher, reportIgnoredLines);
204204 t.expires_from_now (silenceTimeout);
205205 });
206206 });
@@ -217,30 +217,39 @@ SSEClient::SSEClient(
217217 });
218218}
219219
220- void SSEClient::parseEvents (const std::string& msg, const RestconfNotificationWatcher& eventWatcher, const ReportIgnoredLines reportIgnoredLines)
220+ void SSEClient::parseEvents (const RestconfNotificationWatcher& eventWatcher, const ReportIgnoredLines reportIgnoredLines)
221221{
222222 static const std::string dataPrefix = " data:" ;
223223 static const std::string ignorePrefix = " :" ;
224224
225- std::istringstream iss (msg);
226- std::string line;
227- std::string event;
228-
229- while (std::getline (iss, line)) {
230- if (line.starts_with (ignorePrefix) && reportIgnoredLines == ReportIgnoredLines::Yes) {
231- eventWatcher.commentEvent (line);
232- } else if (line.starts_with (ignorePrefix)) {
233- continue ;
234- } else if (line.starts_with (dataPrefix)) {
235- event += line.substr (dataPrefix.size ());
236- } else if (line.empty () && !event.empty ()) {
237- eventWatcher.dataEvent (event);
238- event.clear ();
239- } else if (line.empty ()) {
240- continue ;
241- } else {
242- CAPTURE (msg);
243- FAIL (" Unprefixed response" );
225+ std::size_t pos = 0 ;
226+ constexpr auto EVENT_SEPARATOR = " \n\n " ; // FIXME: Not a production-ready code; does not deal with all possible newline combinations of CR and LF
227+
228+ while ((pos = dataBuffer.find (EVENT_SEPARATOR)) != std::string::npos) {
229+ // extract event
230+ auto rawEvent = dataBuffer.substr (0 , pos + std::char_traits<char >::length (EVENT_SEPARATOR));
231+ std::istringstream stream (rawEvent);
232+ dataBuffer.erase (0 , pos + std::char_traits<char >::length (EVENT_SEPARATOR));
233+
234+ // split on newlines
235+ std::string line;
236+ std::string event;
237+ while (std::getline (stream, line)) {
238+ if (line.starts_with (ignorePrefix) && reportIgnoredLines == ReportIgnoredLines::Yes) {
239+ eventWatcher.commentEvent (line);
240+ } else if (line.starts_with (ignorePrefix)) {
241+ continue ;
242+ } else if (line.starts_with (dataPrefix)) {
243+ event += line.substr (dataPrefix.size ());
244+ } else if (line.empty () && !event.empty ()) {
245+ eventWatcher.dataEvent (event);
246+ event.clear ();
247+ } else if (line.empty ()) {
248+ continue ;
249+ } else {
250+ CAPTURE (rawEvent);
251+ FAIL (" Unprefixed response" );
252+ }
244253 }
245254 }
246255}
0 commit comments