Skip to content

Commit 5a1ab0f

Browse files
committed
Refactor PartitionsSaxHandler to use boost::json
Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com>
1 parent d6b6721 commit 5a1ab0f

File tree

8 files changed

+279
-158
lines changed

8 files changed

+279
-158
lines changed

olp-cpp-sdk-dataservice-read/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,21 @@ set(DESCRIPTION "C++ API library for reading OLP data")
2121
file(GLOB_RECURSE INC "include/*.h*")
2222
file(GLOB_RECURSE SRC "src/*.*")
2323

24+
find_package(Boost REQUIRED)
25+
2426
add_library(${PROJECT_NAME}
2527
${SRC}
2628
${INC})
2729

30+
target_compile_definitions(${PROJECT_NAME}
31+
PRIVATE
32+
BOOST_ALL_NO_LIB
33+
BOOST_JSON_NO_LIB)
34+
2835
target_include_directories(${PROJECT_NAME}
2936
PUBLIC
3037
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
38+
$<BUILD_INTERFACE:${Boost_INCLUDE_DIR}>
3139
$<INSTALL_INTERFACE:include>
3240
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src>
3341
PRIVATE ${olp-cpp-sdk-core_INCLUDE_DIRS})

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,20 @@ RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
3131
return read_buffer_[count_];
3232
}
3333

34+
boost::json::string_view RapidJsonByteStream::ReadView() {
35+
if (ReadEmpty()) {
36+
SwapBuffers();
37+
}
38+
auto terminator_it =
39+
std::find(read_buffer_.begin() + count_, read_buffer_.end(), '\0');
40+
auto begin = read_buffer_.begin() + count_;
41+
boost::core::basic_string_view<char>::size_type size =
42+
std::distance(begin, terminator_it);
43+
count_ += size;
44+
full_count_ += size;
45+
return {&*begin, size};
46+
}
47+
3448
RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
3549
if (ReadEmpty()) {
3650
SwapBuffers();
@@ -41,12 +55,6 @@ RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
4155

4256
size_t RapidJsonByteStream::Tell() const { return full_count_; }
4357

44-
// Not implemented
45-
char* RapidJsonByteStream::PutBegin() { return 0; }
46-
void RapidJsonByteStream::Put(char) {}
47-
void RapidJsonByteStream::Flush() {}
48-
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
49-
5058
bool RapidJsonByteStream::ReadEmpty() const {
5159
return count_ == read_buffer_.size();
5260
}

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525

2626
#include <olp/core/client/ApiError.h>
2727
#include <olp/core/porting/optional.h>
28+
#include <boost/json/string_view.hpp>
2829

2930
namespace olp {
3031
namespace dataservice {
@@ -43,15 +44,12 @@ class RapidJsonByteStream {
4344
/// character.
4445
Ch Take();
4546

47+
/// Return the view of current read buffer until the end of first \0 character
48+
boost::json::string_view ReadView();
49+
4650
/// Get the current read cursor.
4751
size_t Tell() const;
4852

49-
/// Not needed for reading.
50-
char* PutBegin();
51-
void Put(char);
52-
void Flush();
53-
size_t PutEnd(char*);
54-
5553
bool ReadEmpty() const;
5654
bool WriteEmpty() const;
5755

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2024 HERE Europe B.V.
2+
* Copyright (C) 2019-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,8 @@
2626
#include <olp/core/client/Condition.h>
2727
#include <olp/core/logging/Log.h>
2828
#include <boost/functional/hash.hpp>
29+
#include <boost/json/basic_parser_impl.hpp>
30+
#include <boost/json/src.hpp>
2931
#include "CatalogRepository.h"
3032
#include "generated/api/MetadataApi.h"
3133
#include "generated/api/QueryApi.h"
@@ -648,37 +650,45 @@ client::ApiNoResponse PartitionsRepository::ParsePartitionsStream(
648650
const std::shared_ptr<AsyncJsonStream>& async_stream,
649651
const PartitionsStreamCallback& partition_callback,
650652
client::CancellationContext context) {
651-
rapidjson::ParseResult parse_result;
653+
auto parse_result =
654+
boost::json::make_error_code(boost::json::error::incomplete);
652655

653656
// We must perform at least one attempt to parse.
654657
do {
655-
rapidjson::Reader reader;
656-
auto partitions_handler =
657-
std::make_shared<repository::PartitionsSaxHandler>(partition_callback);
658-
659-
auto reader_cancellation_token = client::CancellationToken([=]() {
660-
partitions_handler->Abort();
661-
async_stream->CloseStream(client::ApiError::Cancelled());
662-
});
663-
664-
if (!context.ExecuteOrCancelled(
665-
[=]() { return reader_cancellation_token; })) {
658+
auto parser =
659+
std::make_shared<boost::json::basic_parser<PartitionsSaxHandler>>(
660+
boost::json::parse_options{}, partition_callback);
661+
662+
auto reader_cancellation_token =
663+
client::CancellationToken([parser, &async_stream]() {
664+
// partitions_handler->Abort();
665+
parser->handler().Abort();
666+
async_stream->CloseStream(client::ApiError::Cancelled());
667+
});
668+
669+
if (!context.ExecuteOrCancelled([reader_cancellation_token]() {
670+
return reader_cancellation_token;
671+
})) {
666672
return client::ApiError::Cancelled();
667673
}
668674

669675
auto json_stream = async_stream->GetCurrentStream();
670676

671-
parse_result = reader.Parse<rapidjson::kParseIterativeFlag>(
672-
*json_stream, *partitions_handler);
677+
while (json_stream->Peek() != '\0') {
678+
auto view = json_stream->ReadView();
679+
if (parser->write_some(true, view.data(), view.size(), parse_result)) {
680+
parse_result = {};
681+
}
682+
}
673683
// Retry to parse the stream until it's closed.
674684
} while (!async_stream->IsClosed());
675685

676686
auto error = async_stream->GetError();
677687

678688
if (error) {
679689
return {*error};
680-
} else if (!parse_result) {
681-
return client::ApiError(parse_result.Code(), "Parsing error");
690+
} else if (parse_result.failed()) {
691+
return client::ApiError(parse_result.value(), "Parsing error");
682692
} else {
683693
return client::ApiNoResult{};
684694
}

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsSaxHandler.cpp

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,46 +35,45 @@ constexpr unsigned long long int HashStringToInt(
3535
PartitionsSaxHandler::PartitionsSaxHandler(PartitionCallback partition_callback)
3636
: partition_callback_(std::move(partition_callback)) {}
3737

38-
bool PartitionsSaxHandler::StartObject() {
38+
bool PartitionsSaxHandler::on_object_begin(boost::json::error_code& ec) {
3939
if (state_ == State::kWaitForRootObject) {
4040
state_ = State::kWaitForRootPartitions;
41-
return continue_parsing_;
41+
return CanContinue(ec);
4242
}
4343

4444
if (state_ != State::kWaitForNextPartition) {
45-
return false;
45+
return NotSupported(ec);
4646
}
4747

4848
state_ = State::kProcessingAttribute;
4949

50-
return continue_parsing_;
50+
return CanContinue(ec);
5151
}
52-
53-
bool PartitionsSaxHandler::String(const char* str, unsigned int length, bool) {
52+
bool PartitionsSaxHandler::String(const std::string& str, error_code& ec) {
5453
switch (state_) {
5554
case State::kProcessingAttribute:
56-
state_ = ProcessNextAttribute(str, length);
57-
return continue_parsing_;
55+
state_ = ProcessNextAttribute(str);
56+
return CanContinue(ec);
5857

59-
case State::kWaitForRootPartitions:
60-
if (HashStringToInt("partitions") == HashStringToInt(str)) {
58+
case State::kWaitForRootPartitions: {
59+
if (HashStringToInt("partitions") == HashStringToInt(str.c_str())) {
6160
state_ = State::kWaitPartitionsArray;
62-
return continue_parsing_;
63-
} else {
64-
return false;
61+
return CanContinue(ec);
6562
}
63+
return NotSupported(ec);
64+
}
6665

6766
case State::kParsingPartitionName:
68-
partition_.SetPartition(std::string(str, length));
67+
partition_.SetPartition(str);
6968
break;
7069
case State::kParsingDataHandle:
71-
partition_.SetDataHandle(std::string(str, length));
70+
partition_.SetDataHandle(str);
7271
break;
7372
case State::kParsingChecksum:
74-
partition_.SetChecksum(std::string(str, length));
73+
partition_.SetChecksum(str);
7574
break;
7675
case State::kParsingCrc:
77-
partition_.SetCrc(std::string(str, length));
76+
partition_.SetCrc(str);
7877
break;
7978
case State::kParsingIgnoreAttribute:
8079
break;
@@ -92,72 +91,72 @@ bool PartitionsSaxHandler::String(const char* str, unsigned int length, bool) {
9291

9392
state_ = State::kProcessingAttribute;
9493

95-
return continue_parsing_;
94+
return CanContinue(ec);
9695
}
9796

98-
bool PartitionsSaxHandler::Uint(unsigned int value) {
97+
bool PartitionsSaxHandler::on_int64(const int64_t value, string_view,
98+
error_code& ec) {
9999
if (state_ == State::kParsingVersion) {
100100
partition_.SetVersion(value);
101101
} else if (state_ == State::kParsingDataSize) {
102102
partition_.SetDataSize(value);
103103
} else if (state_ == State::kParsingCompressedDataSize) {
104104
partition_.SetCompressedDataSize(value);
105105
} else {
106-
return false;
106+
return NotSupported(ec);
107107
}
108108

109109
state_ = State::kProcessingAttribute;
110-
return continue_parsing_;
110+
return CanContinue(ec);
111111
}
112112

113-
bool PartitionsSaxHandler::EndObject(unsigned int) {
113+
bool PartitionsSaxHandler::on_object_end(std::size_t, error_code& ec) {
114114
if (state_ == State::kWaitForRootObjectEnd) {
115115
state_ = State::kParsingComplete;
116-
return true; // complete
116+
return CanContinue(ec); // complete
117117
}
118118

119119
if (state_ != State::kProcessingAttribute) {
120-
return false;
120+
return NotSupported(ec);
121121
}
122122

123123
if (partition_.GetDataHandle().empty() || partition_.GetPartition().empty()) {
124-
return false; // partition is not valid
124+
return NotSupported(ec); // partition is not valid
125125
}
126126

127127
partition_callback_(std::move(partition_));
128128

129129
state_ = State::kWaitForNextPartition;
130130

131-
return continue_parsing_;
131+
return CanContinue(ec);
132132
}
133133

134-
bool PartitionsSaxHandler::StartArray() {
134+
bool PartitionsSaxHandler::on_array_begin(boost::json::error_code& ec) {
135135
// We expect only a single array in whol response
136136
if (state_ != State::kWaitPartitionsArray) {
137-
return false;
137+
return NotSupported(ec);
138138
}
139139

140140
state_ = State::kWaitForNextPartition;
141141

142-
return continue_parsing_;
142+
return CanContinue(ec);
143143
}
144144

145-
bool PartitionsSaxHandler::EndArray(unsigned int) {
145+
bool PartitionsSaxHandler::on_array_end(std::size_t,
146+
boost::json::error_code& ec) {
146147
if (state_ != State::kWaitForNextPartition) {
147-
return false;
148+
return NotSupported(ec);
148149
}
149150

150151
state_ = State::kWaitForRootObjectEnd;
151-
return continue_parsing_;
152+
return CanContinue(ec);
152153
}
153154

154-
bool PartitionsSaxHandler::Default() { return false; }
155-
156155
void PartitionsSaxHandler::Abort() { continue_parsing_.store(false); }
157156

158157
PartitionsSaxHandler::State PartitionsSaxHandler::ProcessNextAttribute(
159-
const char* name, unsigned int /*length*/) {
160-
switch (HashStringToInt(name)) {
158+
const std::string& name) {
159+
switch (HashStringToInt(name.c_str())) {
161160
case HashStringToInt("dataHandle"):
162161
return State::kParsingDataHandle;
163162
case HashStringToInt("partition"):

0 commit comments

Comments
 (0)