Skip to content

Commit ff3a07b

Browse files
committed
Consolidate parsing.
1 parent f330e97 commit ff3a07b

3 files changed

Lines changed: 55 additions & 144 deletions

File tree

libs/server-sent-events/src/curl_client.cpp

Lines changed: 14 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,9 @@ bool CurlClient::SetupCurlOptions(CURL* curl,
232232
headers = curl_slist_append(headers, header.c_str());
233233
}
234234

235-
// Add Last-Event-ID if we have one
235+
// Add Last-Event-ID if we have one from previous connection
236236
if (context.last_event_id && !context.last_event_id->empty()) {
237-
std::string last_event_header =
238-
"Last-Event-ID: " + *context.last_event_id;
237+
std::string last_event_header = "Last-Event-ID: " + *context.last_event_id;
239238
headers = curl_slist_append(headers, last_event_header.c_str());
240239
}
241240

@@ -322,7 +321,7 @@ int CurlClient::ProgressCallback(void* clientp,
322321
context->last_progress_time = now;
323322
} else {
324323
// No new data - check if we've exceeded the timeout
325-
auto elapsed = std::chrono::duration_cast<
324+
const auto elapsed = std::chrono::duration_cast<
326325
std::chrono::milliseconds>(
327326
now - context->last_progress_time);
328327

@@ -369,128 +368,17 @@ size_t CurlClient::WriteCallback(const char* data,
369368
return 0; // Abort the transfer
370369
}
371370

372-
// Parse SSE data
373-
std::string_view body(data, total_size);
374-
375-
// Parse stream into lines
376-
size_t i = 0;
377-
while (i < body.size()) {
378-
// Find next line delimiter
379-
const size_t delimiter_pos = body.find_first_of("\r\n", i);
380-
const size_t append_size = (delimiter_pos == std::string::npos)
381-
? (body.size() - i)
382-
: (delimiter_pos - i);
383-
384-
// Append to buffered line
385-
if (context->buffered_line.has_value()) {
386-
context->buffered_line->append(body.substr(i, append_size));
387-
} else {
388-
context->buffered_line = std::string(body.substr(i, append_size));
389-
}
390-
391-
i += append_size;
392-
393-
if (i >= body.size()) {
394-
break;
395-
}
396-
397-
// Handle line delimiters
398-
if (body[i] == '\r') {
399-
context->complete_lines.push_back(*context->buffered_line);
400-
context->buffered_line.reset();
401-
context->begin_CR = true;
402-
i++;
403-
} else if (body[i] == '\n') {
404-
if (context->begin_CR) {
405-
context->begin_CR = false;
406-
} else {
407-
context->complete_lines.push_back(*context->buffered_line);
408-
context->buffered_line.reset();
409-
}
410-
i++;
411-
}
412-
}
413-
414-
// Parse completed lines into events
415-
while (!context->complete_lines.empty()) {
416-
std::string line = std::move(context->complete_lines.front());
417-
context->complete_lines.pop_front();
418-
419-
if (line.empty()) {
420-
// Empty line indicates end of event
421-
if (context->current_event) {
422-
// Trim trailing newline from data
423-
if (!context->current_event->data.empty() &&
424-
context->current_event->data.back() == '\n') {
425-
context->current_event->data.pop_back();
426-
}
427-
428-
// Update last_event_id_ only when dispatching a completed event
429-
if (context->current_event->id) {
430-
context->last_event_id = context->current_event->id;
431-
}
432-
433-
// Dispatch event on executor thread
434-
auto event_data = context->current_event->data;
435-
auto event_type = context->current_event->type.empty()
436-
? "message"
437-
: context->current_event->type;
438-
auto event_id = context->current_event->id;
439-
context->receive(Event(
440-
std::move(event_type),
441-
std::move(event_data),
442-
std::move(event_id)));
443-
444-
context->current_event.reset();
445-
}
446-
continue;
447-
}
448-
449-
// Parse field
450-
const size_t colon_pos = line.find(':');
451-
if (colon_pos == 0) {
452-
// Comment line, dispatch it
453-
std::string comment = line.substr(1);
454-
455-
context->receive(Event("comment", comment));
456-
continue;
457-
}
458-
459-
std::string field_name;
460-
std::string field_value;
461-
462-
if (colon_pos == std::string::npos) {
463-
field_name = line;
464-
field_value = "";
465-
} else {
466-
field_name = line.substr(0, colon_pos);
467-
field_value = line.substr(colon_pos + 1);
468-
469-
// Remove leading space from value if present
470-
if (!field_value.empty() && field_value[0] == ' ') {
471-
field_value = field_value.substr(1);
472-
}
473-
}
474-
475-
// Initialize event if needed
476-
if (!context->current_event) {
477-
context->current_event.emplace(detail::Event{});
478-
context->current_event->id = context->last_event_id;
371+
// Set up the event receiver callback for the parser
372+
context->parser_body->on_event([context](Event event) {
373+
// Track last event ID for reconnection
374+
if (event.id()) {
375+
context->last_event_id = event.id();
479376
}
377+
context->receive(std::move(event));
378+
});
480379

481-
// Handle field
482-
if (field_name == "event") {
483-
context->current_event->type = field_value;
484-
} else if (field_name == "data") {
485-
context->current_event->data += field_value;
486-
context->current_event->data += '\n';
487-
} else if (field_name == "id") {
488-
if (field_value.find('\0') == std::string::npos) {
489-
context->current_event->id = field_value;
490-
}
491-
}
492-
// retry field is ignored for now
493-
}
380+
const std::string_view data_view(data, total_size);
381+
context->parser_reader->put(data_view);
494382

495383
return total_size;
496384
}
@@ -525,11 +413,8 @@ void CurlClient::PerformRequestWithMulti(
525413
return;
526414
}
527415

528-
// Clear parser state for new connection
529-
context->buffered_line.reset();
530-
context->complete_lines.clear();
531-
context->current_event.reset();
532-
context->begin_CR = false;
416+
// Initialize parser for new connection (last_event_id is tracked separately)
417+
context->init_parser();
533418

534419
CURL* curl = curl_easy_init();
535420
if (!curl) {

libs/server-sent-events/src/curl_client.hpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,18 @@ class CurlClient final : public Client,
7373
std::optional<Callbacks> callbacks_;
7474

7575
public:
76-
// SSE parser state
77-
std::optional<std::string> buffered_line;
78-
std::deque<std::string> complete_lines;
79-
bool begin_CR;
76+
// SSE parser using common parser from parser.hpp
77+
using ParserBody = detail::EventBody<std::function<void(Event)>>;
78+
std::unique_ptr<typename ParserBody::value_type> parser_body;
79+
std::unique_ptr<typename ParserBody::reader> parser_reader;
80+
81+
// Track last event ID for reconnection (separate from parser state)
82+
std::optional<std::string> last_event_id;
8083

8184
// Progress tracking for read timeout
8285
std::chrono::steady_clock::time_point last_progress_time;
8386
curl_off_t last_download_amount;
8487

85-
86-
std::optional<std::string> last_event_id;
87-
std::optional<detail::Event> current_event;
88-
8988
const http::request<http::string_body> req;
9089
const std::string url;
9190
const std::optional<std::chrono::milliseconds> connect_timeout;
@@ -182,11 +181,7 @@ class CurlClient final : public Client,
182181
bool skip_verify_peer
183182
) : shutting_down_(false),
184183
curl_socket_(CURL_SOCKET_BAD),
185-
buffered_line(std::nullopt),
186-
begin_CR(false),
187184
last_download_amount(0),
188-
last_event_id(std::nullopt),
189-
current_event(std::nullopt),
190185
req(std::move(req)),
191186
url(std::move(url)),
192187
connect_timeout(connect_timeout),
@@ -196,6 +191,12 @@ class CurlClient final : public Client,
196191
proxy_url(std::move(proxy_url)),
197192
skip_verify_peer(skip_verify_peer) {
198193
}
194+
195+
void init_parser() {
196+
parser_body = std::make_unique<typename ParserBody::value_type>();
197+
parser_reader = std::make_unique<typename ParserBody::reader>(*parser_body);
198+
parser_reader->init();
199+
}
199200
};
200201

201202
public:

libs/server-sent-events/src/parser.hpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ struct EventBody<EventReceiver>::reader {
6262
std::optional<Event> event_;
6363

6464
public:
65+
// Constructor for standalone use (curl_client) - no Boost types required
66+
explicit reader(value_type& body)
67+
: body_(body),
68+
buffered_line_(),
69+
complete_lines_(),
70+
begin_CR_(false),
71+
event_() {
72+
}
73+
74+
// Constructor for Boost Beast HTTP body reader (FoxyClient)
6575
template <bool isRequest, class Fields>
6676
reader(http::header<isRequest, Fields>& h, value_type& body)
6777
: body_(body),
@@ -87,6 +97,11 @@ struct EventBody<EventReceiver>::reader {
8797
ec = {};
8898
}
8999

100+
// Simplified init for standalone use - no Boost types required
101+
void init() {
102+
// Nothing to initialize
103+
}
104+
90105
/**
91106
* Store buffers.
92107
* This is called zero or more times with parsed body octets.
@@ -104,6 +119,16 @@ struct EventBody<EventReceiver>::reader {
104119
return buffer_bytes(buffers);
105120
}
106121

122+
/**
123+
* Simplified put for standalone use - no Boost types required.
124+
* Feed data into the parser. This can be called multiple times as data arrives.
125+
* @param data The data to parse
126+
*/
127+
void put(std::string_view data) {
128+
parse_stream(data);
129+
parse_events();
130+
}
131+
107132
/**
108133
* Called when the body is complete.
109134
* @param ec Set to the error, if any occurred.
@@ -124,20 +149,20 @@ struct EventBody<EventReceiver>::reader {
124149
// Appends the body to the buffered line until reaching any of the
125150
// characters specified within the search parameter. The search parameter is
126151
// treated as an array of search characters, not as a single token.
127-
size_t append_up_to(boost::string_view body, std::string const& search) {
152+
size_t append_up_to(std::string_view body, std::string const& search) {
128153
std::size_t index = body.find_first_of(search);
129154
if (index != std::string::npos) {
130155
body.remove_suffix(body.size() - index);
131156
}
132157
if (buffered_line_.has_value()) {
133-
buffered_line_->append(body.to_string());
158+
buffered_line_->append(body);
134159
} else {
135160
buffered_line_ = std::string{body};
136161
}
137162
return index == std::string::npos ? body.size() : index;
138163
}
139164

140-
void parse_stream(boost::string_view body) {
165+
void parse_stream(std::string_view body) {
141166
size_t i = 0;
142167
while (i < body.size()) {
143168
i += this->append_up_to(body.substr(i, body.length() - i), "\r\n");

0 commit comments

Comments
 (0)