Skip to content

Commit 90b30f1

Browse files
asopov-hererustam-gamidov-here
authored andcommitted
Refactor PartitionsSaxHandler to use boost::json (#1670)
Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com>
1 parent 29f591d commit 90b30f1

File tree

8 files changed

+290
-177
lines changed

8 files changed

+290
-177
lines changed

olp-cpp-sdk-dataservice-read/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,24 @@ set(DESCRIPTION "C++ API library for reading OLP data")
2121
file(GLOB_RECURSE INC "include/*.h*")
2222
file(GLOB_RECURSE SRC "src/*.*")
2323

24+
25+
find_package(Boost REQUIRED)
26+
2427
add_library(${PROJECT_NAME}
2528
${SRC}
2629
${INC})
2730

31+
target_compile_definitions(${PROJECT_NAME}
32+
PRIVATE
33+
BOOST_ALL_NO_LIB
34+
BOOST_JSON_NO_LIB)
35+
2836
target_include_directories(${PROJECT_NAME}
2937
PUBLIC
3038
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
3139
$<INSTALL_INTERFACE:include>
3240
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src>
41+
$<BUILD_INTERFACE:${Boost_INCLUDE_DIR}>
3342
PRIVATE ${olp-cpp-sdk-core_INCLUDE_DIRS})
3443

3544
target_link_libraries(${PROJECT_NAME}

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,38 +24,43 @@ namespace dataservice {
2424
namespace read {
2525
namespace repository {
2626

27-
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
27+
JsonByteStream::Ch JsonByteStream::Peek() {
2828
if (ReadEmpty()) {
2929
SwapBuffers();
3030
}
3131
return read_buffer_[count_];
3232
}
3333

34-
RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
34+
boost::json::string_view JsonByteStream::ReadView() {
35+
if (ReadEmpty()) {
36+
SwapBuffers();
37+
}
38+
auto begin = read_buffer_.begin() + count_;
39+
auto terminator_it = std::find(begin, read_buffer_.end(), '\0');
40+
boost::json::string_view::size_type size =
41+
std::distance(begin, terminator_it);
42+
count_ += size;
43+
full_count_ += size;
44+
return {&*begin, size};
45+
}
46+
47+
JsonByteStream::Ch JsonByteStream::Take() {
3548
if (ReadEmpty()) {
3649
SwapBuffers();
3750
}
3851
full_count_++;
3952
return read_buffer_[count_++];
4053
}
4154

42-
size_t RapidJsonByteStream::Tell() const { return full_count_; }
43-
44-
// Not implemented
45-
char* RapidJsonByteStream::PutBegin() { return 0; }
46-
void RapidJsonByteStream::Put(char) {}
47-
void RapidJsonByteStream::Flush() {}
48-
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
55+
size_t JsonByteStream::Tell() const { return full_count_; }
4956

50-
bool RapidJsonByteStream::ReadEmpty() const {
51-
return count_ == read_buffer_.size();
52-
}
53-
bool RapidJsonByteStream::WriteEmpty() const {
57+
bool JsonByteStream::ReadEmpty() const { return count_ == read_buffer_.size(); }
58+
bool JsonByteStream::WriteEmpty() const {
5459
std::unique_lock<std::mutex> lock(mutex_);
5560
return write_buffer_.empty();
5661
}
5762

58-
void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
63+
void JsonByteStream::AppendContent(const char* content, size_t length) {
5964
std::unique_lock<std::mutex> lock(mutex_);
6065

6166
const auto buffer_size = write_buffer_.size();
@@ -65,7 +70,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
6570
cv_.notify_one();
6671
}
6772

68-
void RapidJsonByteStream::SwapBuffers() {
73+
void JsonByteStream::SwapBuffers() {
6974
std::unique_lock<std::mutex> lock(mutex_);
7075
cv_.wait(lock, [&]() { return !write_buffer_.empty(); });
7176
std::swap(read_buffer_, write_buffer_);
@@ -74,10 +79,9 @@ void RapidJsonByteStream::SwapBuffers() {
7479
}
7580

7681
AsyncJsonStream::AsyncJsonStream()
77-
: current_stream_(std::make_shared<RapidJsonByteStream>()),
78-
closed_{false} {}
82+
: current_stream_(std::make_shared<JsonByteStream>()), closed_{false} {}
7983

80-
std::shared_ptr<RapidJsonByteStream> AsyncJsonStream::GetCurrentStream() const {
84+
std::shared_ptr<JsonByteStream> AsyncJsonStream::GetCurrentStream() const {
8185
std::unique_lock<std::mutex> lock(mutex_);
8286
return current_stream_;
8387
}
@@ -96,7 +100,7 @@ void AsyncJsonStream::ResetStream(const char* content, size_t length) {
96100
return;
97101
}
98102
current_stream_->AppendContent("\0", 1);
99-
current_stream_ = std::make_shared<RapidJsonByteStream>();
103+
current_stream_ = std::make_shared<JsonByteStream>();
100104
current_stream_->AppendContent(content, length);
101105
}
102106

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,14 +25,15 @@
2525

2626
#include <olp/core/client/ApiError.h>
2727
#include <olp/core/porting/optional.h>
28+
#include <boost/json/string_view.hpp>
2829

2930
namespace olp {
3031
namespace dataservice {
3132
namespace read {
3233
namespace repository {
3334

3435
/// Json byte stream class. Implements rapidjson input stream concept.
35-
class RapidJsonByteStream {
36+
class JsonByteStream {
3637
public:
3738
typedef char Ch;
3839

@@ -43,15 +44,12 @@ class RapidJsonByteStream {
4344
/// character.
4445
Ch Take();
4546

47+
/// Return the view of current read buffer until the end of first \0 character
48+
boost::json::string_view ReadView();
49+
4650
/// Get the current read cursor.
4751
size_t Tell() const;
4852

49-
/// Not needed for reading.
50-
char* PutBegin();
51-
void Put(char);
52-
void Flush();
53-
size_t PutEnd(char*);
54-
5553
bool ReadEmpty() const;
5654
bool WriteEmpty() const;
5755

@@ -72,7 +70,7 @@ class AsyncJsonStream {
7270
public:
7371
AsyncJsonStream();
7472

75-
std::shared_ptr<RapidJsonByteStream> GetCurrentStream() const;
73+
std::shared_ptr<JsonByteStream> GetCurrentStream() const;
7674

7775
void AppendContent(const char* content, size_t length);
7876

@@ -86,7 +84,7 @@ class AsyncJsonStream {
8684

8785
private:
8886
mutable std::mutex mutex_;
89-
std::shared_ptr<RapidJsonByteStream> current_stream_;
87+
std::shared_ptr<JsonByteStream> current_stream_;
9088
porting::optional<client::ApiError> error_;
9189
bool closed_;
9290
};

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <olp/core/client/Condition.h>
2828
#include <olp/core/logging/Log.h>
2929
#include <boost/functional/hash.hpp>
30+
#include <boost/json/basic_parser_impl.hpp>
3031
#include "CatalogRepository.h"
3132
#include "generated/api/MetadataApi.h"
3233
#include "generated/api/QueryApi.h"
@@ -709,37 +710,44 @@ client::ApiNoResponse PartitionsRepository::ParsePartitionsStream(
709710
const std::shared_ptr<AsyncJsonStream>& async_stream,
710711
const PartitionsStreamCallback& partition_callback,
711712
client::CancellationContext context) {
712-
rapidjson::ParseResult parse_result;
713+
auto parse_result =
714+
boost::json::make_error_code(boost::json::error::incomplete);
713715

714716
// We must perform at least one attempt to parse.
715717
do {
716-
rapidjson::Reader reader;
717-
auto partitions_handler =
718-
std::make_shared<repository::PartitionsSaxHandler>(partition_callback);
719-
720-
auto reader_cancellation_token = client::CancellationToken([=]() {
721-
partitions_handler->Abort();
722-
async_stream->CloseStream(client::ApiError::Cancelled());
723-
});
724-
725-
if (!context.ExecuteOrCancelled(
726-
[=]() { return reader_cancellation_token; })) {
718+
auto parser =
719+
std::make_shared<boost::json::basic_parser<PartitionsSaxHandler>>(
720+
boost::json::parse_options{}, partition_callback);
721+
722+
auto reader_cancellation_token =
723+
client::CancellationToken([parser, &async_stream]() {
724+
parser->handler().Abort();
725+
async_stream->CloseStream(client::ApiError::Cancelled());
726+
});
727+
728+
if (!context.ExecuteOrCancelled([reader_cancellation_token]() {
729+
return reader_cancellation_token;
730+
})) {
727731
return client::ApiError::Cancelled();
728732
}
729733

730734
auto json_stream = async_stream->GetCurrentStream();
731735

732-
parse_result = reader.Parse<rapidjson::kParseIterativeFlag>(
733-
*json_stream, *partitions_handler);
736+
while (json_stream->Peek() != '\0') {
737+
auto view = json_stream->ReadView();
738+
if (parser->write_some(true, view.data(), view.size(), parse_result)) {
739+
parse_result = {};
740+
}
741+
}
734742
// Retry to parse the stream until it's closed.
735743
} while (!async_stream->IsClosed());
736744

737745
auto error = async_stream->GetError();
738746

739747
if (error) {
740748
return {*error};
741-
} else if (!parse_result) {
742-
return client::ApiError(parse_result.Code(), "Parsing error");
749+
} else if (parse_result.failed()) {
750+
return client::ApiError(parse_result.value(), "Parsing error");
743751
} else {
744752
return client::ApiNoResult{};
745753
}

0 commit comments

Comments
 (0)