Skip to content

Commit 83e0549

Browse files
authored
Refactor PartitionsSaxHandler to use boost::json (#1670)
* Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> * Refactor PartitionsSaxHandler to use boost::json Previously it was based on rapidjson The tests are adjusted accordingly. Relates-To: OCMAM-448 Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com> --------- Signed-off-by: Alexander Sopov <ext-alexander.sopov@here.com>
1 parent d6b6721 commit 83e0549

8 files changed

Lines changed: 291 additions & 178 deletions

File tree

olp-cpp-sdk-dataservice-read/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,24 @@ set(DESCRIPTION "C++ API library for reading OLP data")
2121
file(GLOB_RECURSE INC "include/*.h*")
2222
file(GLOB_RECURSE SRC "src/*.*")
2323

24+
25+
find_package(Boost REQUIRED)
26+
2427
add_library(${PROJECT_NAME}
2528
${SRC}
2629
${INC})
2730

31+
target_compile_definitions(${PROJECT_NAME}
32+
PRIVATE
33+
BOOST_ALL_NO_LIB
34+
BOOST_JSON_NO_LIB)
35+
2836
target_include_directories(${PROJECT_NAME}
2937
PUBLIC
3038
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
3139
$<INSTALL_INTERFACE:include>
3240
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src>
41+
$<BUILD_INTERFACE:${Boost_INCLUDE_DIR}>
3342
PRIVATE ${olp-cpp-sdk-core_INCLUDE_DIRS})
3443

3544
target_link_libraries(${PROJECT_NAME}

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,38 +24,43 @@ namespace dataservice {
2424
namespace read {
2525
namespace repository {
2626

27-
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
27+
JsonByteStream::Ch JsonByteStream::Peek() {
2828
if (ReadEmpty()) {
2929
SwapBuffers();
3030
}
3131
return read_buffer_[count_];
3232
}
3333

34-
RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
34+
boost::json::string_view JsonByteStream::ReadView() {
35+
if (ReadEmpty()) {
36+
SwapBuffers();
37+
}
38+
auto begin = read_buffer_.begin() + count_;
39+
auto terminator_it = std::find(begin, read_buffer_.end(), '\0');
40+
boost::json::string_view::size_type size =
41+
std::distance(begin, terminator_it);
42+
count_ += size;
43+
full_count_ += size;
44+
return {&*begin, size};
45+
}
46+
47+
JsonByteStream::Ch JsonByteStream::Take() {
3548
if (ReadEmpty()) {
3649
SwapBuffers();
3750
}
3851
full_count_++;
3952
return read_buffer_[count_++];
4053
}
4154

42-
size_t RapidJsonByteStream::Tell() const { return full_count_; }
43-
44-
// Not implemented
45-
char* RapidJsonByteStream::PutBegin() { return 0; }
46-
void RapidJsonByteStream::Put(char) {}
47-
void RapidJsonByteStream::Flush() {}
48-
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
55+
size_t JsonByteStream::Tell() const { return full_count_; }
4956

50-
bool RapidJsonByteStream::ReadEmpty() const {
51-
return count_ == read_buffer_.size();
52-
}
53-
bool RapidJsonByteStream::WriteEmpty() const {
57+
bool JsonByteStream::ReadEmpty() const { return count_ == read_buffer_.size(); }
58+
bool JsonByteStream::WriteEmpty() const {
5459
std::unique_lock<std::mutex> lock(mutex_);
5560
return write_buffer_.empty();
5661
}
5762

58-
void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
63+
void JsonByteStream::AppendContent(const char* content, size_t length) {
5964
std::unique_lock<std::mutex> lock(mutex_);
6065

6166
const auto buffer_size = write_buffer_.size();
@@ -65,7 +70,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
6570
cv_.notify_one();
6671
}
6772

68-
void RapidJsonByteStream::SwapBuffers() {
73+
void JsonByteStream::SwapBuffers() {
6974
std::unique_lock<std::mutex> lock(mutex_);
7075
cv_.wait(lock, [&]() { return !write_buffer_.empty(); });
7176
std::swap(read_buffer_, write_buffer_);
@@ -74,10 +79,9 @@ void RapidJsonByteStream::SwapBuffers() {
7479
}
7580

7681
AsyncJsonStream::AsyncJsonStream()
77-
: current_stream_(std::make_shared<RapidJsonByteStream>()),
78-
closed_{false} {}
82+
: current_stream_(std::make_shared<JsonByteStream>()), closed_{false} {}
7983

80-
std::shared_ptr<RapidJsonByteStream> AsyncJsonStream::GetCurrentStream() const {
84+
std::shared_ptr<JsonByteStream> AsyncJsonStream::GetCurrentStream() const {
8185
std::unique_lock<std::mutex> lock(mutex_);
8286
return current_stream_;
8387
}
@@ -96,7 +100,7 @@ void AsyncJsonStream::ResetStream(const char* content, size_t length) {
96100
return;
97101
}
98102
current_stream_->AppendContent("\0", 1);
99-
current_stream_ = std::make_shared<RapidJsonByteStream>();
103+
current_stream_ = std::make_shared<JsonByteStream>();
100104
current_stream_->AppendContent(content, length);
101105
}
102106

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023-2025 HERE Europe B.V.
2+
* Copyright (C) 2023-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,14 +25,15 @@
2525

2626
#include <olp/core/client/ApiError.h>
2727
#include <olp/core/porting/optional.h>
28+
#include <boost/json/string_view.hpp>
2829

2930
namespace olp {
3031
namespace dataservice {
3132
namespace read {
3233
namespace repository {
3334

3435
/// Json byte stream class. Implements rapidjson input stream concept.
35-
class RapidJsonByteStream {
36+
class JsonByteStream {
3637
public:
3738
typedef char Ch;
3839

@@ -43,15 +44,12 @@ class RapidJsonByteStream {
4344
/// character.
4445
Ch Take();
4546

47+
/// Return the view of current read buffer until the end of first \0 character
48+
boost::json::string_view ReadView();
49+
4650
/// Get the current read cursor.
4751
size_t Tell() const;
4852

49-
/// Not needed for reading.
50-
char* PutBegin();
51-
void Put(char);
52-
void Flush();
53-
size_t PutEnd(char*);
54-
5553
bool ReadEmpty() const;
5654
bool WriteEmpty() const;
5755

@@ -72,7 +70,7 @@ class AsyncJsonStream {
7270
public:
7371
AsyncJsonStream();
7472

75-
std::shared_ptr<RapidJsonByteStream> GetCurrentStream() const;
73+
std::shared_ptr<JsonByteStream> GetCurrentStream() const;
7674

7775
void AppendContent(const char* content, size_t length);
7876

@@ -86,7 +84,7 @@ class AsyncJsonStream {
8684

8785
private:
8886
mutable std::mutex mutex_;
89-
std::shared_ptr<RapidJsonByteStream> current_stream_;
87+
std::shared_ptr<JsonByteStream> current_stream_;
9088
porting::optional<client::ApiError> error_;
9189
bool closed_;
9290
};

olp-cpp-sdk-dataservice-read/src/repositories/PartitionsRepository.cpp

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2019-2024 HERE Europe B.V.
2+
* Copyright (C) 2019-2026 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
#include <olp/core/client/Condition.h>
2727
#include <olp/core/logging/Log.h>
2828
#include <boost/functional/hash.hpp>
29+
#include <boost/json/basic_parser_impl.hpp>
2930
#include "CatalogRepository.h"
3031
#include "generated/api/MetadataApi.h"
3132
#include "generated/api/QueryApi.h"
@@ -648,37 +649,44 @@ client::ApiNoResponse PartitionsRepository::ParsePartitionsStream(
648649
const std::shared_ptr<AsyncJsonStream>& async_stream,
649650
const PartitionsStreamCallback& partition_callback,
650651
client::CancellationContext context) {
651-
rapidjson::ParseResult parse_result;
652+
auto parse_result =
653+
boost::json::make_error_code(boost::json::error::incomplete);
652654

653655
// We must perform at least one attempt to parse.
654656
do {
655-
rapidjson::Reader reader;
656-
auto partitions_handler =
657-
std::make_shared<repository::PartitionsSaxHandler>(partition_callback);
658-
659-
auto reader_cancellation_token = client::CancellationToken([=]() {
660-
partitions_handler->Abort();
661-
async_stream->CloseStream(client::ApiError::Cancelled());
662-
});
663-
664-
if (!context.ExecuteOrCancelled(
665-
[=]() { return reader_cancellation_token; })) {
657+
auto parser =
658+
std::make_shared<boost::json::basic_parser<PartitionsSaxHandler>>(
659+
boost::json::parse_options{}, partition_callback);
660+
661+
auto reader_cancellation_token =
662+
client::CancellationToken([parser, &async_stream]() {
663+
parser->handler().Abort();
664+
async_stream->CloseStream(client::ApiError::Cancelled());
665+
});
666+
667+
if (!context.ExecuteOrCancelled([reader_cancellation_token]() {
668+
return reader_cancellation_token;
669+
})) {
666670
return client::ApiError::Cancelled();
667671
}
668672

669673
auto json_stream = async_stream->GetCurrentStream();
670674

671-
parse_result = reader.Parse<rapidjson::kParseIterativeFlag>(
672-
*json_stream, *partitions_handler);
675+
while (json_stream->Peek() != '\0') {
676+
auto view = json_stream->ReadView();
677+
if (parser->write_some(true, view.data(), view.size(), parse_result)) {
678+
parse_result = {};
679+
}
680+
}
673681
// Retry to parse the stream until it's closed.
674682
} while (!async_stream->IsClosed());
675683

676684
auto error = async_stream->GetError();
677685

678686
if (error) {
679687
return {*error};
680-
} else if (!parse_result) {
681-
return client::ApiError(parse_result.Code(), "Parsing error");
688+
} else if (parse_result.failed()) {
689+
return client::ApiError(parse_result.value(), "Parsing error");
682690
} else {
683691
return client::ApiNoResult{};
684692
}

0 commit comments

Comments
 (0)