Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ set(THREADS_PREFER_PTHREAD_FLAG TRUE)
find_package(Threads REQUIRED)
MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT})

if (NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 17)
endif ()

# Compiler specific configuration:
# https://stackoverflow.com/questions/10046114/in-cmake-how-can-i-test-if-the-compiler-is-clang
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
Expand Down Expand Up @@ -112,9 +116,6 @@ set(AUTOGEN_DIR ${PROJECT_BINARY_DIR}/generated)
file(MAKE_DIRECTORY ${AUTOGEN_DIR})

if (INTEGRATE_VCPKG)
if (NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 11)
endif ()
set(CMAKE_C_STANDARD 11)
set(Boost_NO_BOOST_CMAKE ON)
find_package(Boost REQUIRED)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ cmake -B build -DINTEGRATE_VCPKG=ON
cmake --build build -j8
```

> - Before 4.0.0, C++11 is required.
> - Since 4.0.0, C++17 is required.

The 1st step will download vcpkg and then install all dependencies according to the version description in [vcpkg.json](./vcpkg.json). The 2nd step will build the Pulsar C++ libraries under `./build/lib/`, where `./build` is the CMake build directory.

> You can also add the CMAKE_TOOLCHAIN_FILE option if your system already have vcpkg installed.
Expand Down
5 changes: 2 additions & 3 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
auto weakSelf = weak_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
if (self && !ec) {
if (auto self = weakSelf.lock(); self && !ec) {
auto consumer = consumer_.lock();
if (!consumer || consumer->isClosingOrClosed()) {
return;
Expand Down
19 changes: 9 additions & 10 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>

#include <boost/optional.hpp>
#include <fstream>

#include "AsioDefines.h"
Expand Down Expand Up @@ -1127,19 +1126,19 @@ void ClientConnection::sendPendingCommands() {

if (--pendingWriteOperations_ > 0) {
assert(!pendingWriteBuffers_.empty());
boost::any any = pendingWriteBuffers_.front();
auto any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();

auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
SharedBuffer buffer = std::any_cast<SharedBuffer>(any);
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler(
[this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));

auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
auto args = std::any_cast<std::shared_ptr<SendArguments>>(any);
BaseCommand outgoingCmd;
PairSharedBuffer buffer =
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
Expand Down Expand Up @@ -1702,9 +1701,9 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
data.schemaVersion = producerSuccess.schema_version();
}
if (producerSuccess.has_topic_epoch()) {
data.topicEpoch = boost::make_optional(producerSuccess.topic_epoch());
data.topicEpoch = std::make_optional(producerSuccess.topic_epoch());
} else {
data.topicEpoch = boost::none;
data.topicEpoch = {};
}
requestData.promise.setValue(data);
cancelTimer(*requestData.timer);
Expand Down Expand Up @@ -1805,7 +1804,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co
}
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer) {
if (tlsSocket_) {
if (closeProducer.has_assignedbrokerserviceurltls()) {
Expand All @@ -1814,10 +1813,10 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeProducer.has_assignedbrokerserviceurl()) {
return closeProducer.assignedbrokerserviceurl();
}
return boost::none;
return {};
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseConsumer& closeConsumer) {
if (tlsSocket_) {
if (closeConsumer.has_assignedbrokerserviceurltls()) {
Expand All @@ -1826,7 +1825,7 @@ boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
} else if (closeConsumer.has_assignedbrokerserviceurl()) {
return closeConsumer.assignedbrokerserviceurl();
}
return boost::none;
return {};
}

void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
Expand Down
17 changes: 7 additions & 10 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#endif
Comment thread
BewareMyPower marked this conversation as resolved.
#include <boost/any.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <functional>
#include <memory>
Expand All @@ -53,6 +51,9 @@
#include "SharedBuffer.h"
#include "TimeUtils.h"
#include "UtilAllocator.h"

using std::optional;

namespace pulsar {

class PulsarFriend;
Expand Down Expand Up @@ -108,7 +109,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
boost::optional<uint64_t> topicEpoch;
optional<uint64_t> topicEpoch;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down Expand Up @@ -141,10 +142,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ConnectionPool& pool, size_t poolIndex);
~ClientConnection();

#if __cplusplus < 201703L
std::weak_ptr<ClientConnection> weak_from_this() noexcept { return shared_from_this(); }
#endif

/*
* starts tcp connect_async
* @return future<ConnectionPtr> which is not yet set
Expand Down Expand Up @@ -378,7 +375,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::unique_lock<std::mutex> Lock;

// Pending buffers to write on the socket
std::deque<boost::any> pendingWriteBuffers_;
std::deque<std::any> pendingWriteBuffers_;
int pendingWriteOperations_ = 0;

SharedBuffer outgoingBuffer_;
Expand Down Expand Up @@ -426,8 +423,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
// This method must be called when `mutex_` is held
void unsafeRemovePendingRequest(long requestId);
Expand Down
5 changes: 3 additions & 2 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "OpSendMsg.h"
#include "PulsarApi.pb.h"
#include "Url.h"
#include "boost/throw_exception.hpp"
Comment thread
BewareMyPower marked this conversation as resolved.
#include "checksum/ChecksumProvider.h"

using namespace pulsar;
Expand Down Expand Up @@ -329,7 +330,7 @@ SharedBuffer Commands::newAuthResponse(const AuthenticationPtr& authentication,
SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
Expand Down Expand Up @@ -416,7 +417,7 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
Expand Down
23 changes: 13 additions & 10 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

#include <boost/optional.hpp>
#include <optional>
#include <set>

#include "ProtoApiEnums.h"
Expand All @@ -41,6 +41,7 @@ class MessageIdImpl;
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
class BitSet;
struct SendArguments;
using std::optional;

namespace proto {
class BaseCommand;
Expand Down Expand Up @@ -102,14 +103,16 @@ class Commands {
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType,
const SendArguments& args);

static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId, bool readCompacted,
const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties, const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition, bool replicateSubscriptionState,
const KeySharedPolicy& keySharedPolicy, int priorityLevel = 0);
static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription,
uint64_t consumerId, uint64_t requestId,
CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, optional<MessageId> startMessageId,
bool readCompacted, const std::map<std::string, std::string>& metadata,
const std::map<std::string, std::string>& subscriptionProperties,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
bool replicateSubscriptionState, const KeySharedPolicy& keySharedPolicy,
int priorityLevel = 0);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand All @@ -118,7 +121,7 @@ class Commands {
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
bool userProvidedProducerName, bool encrypted,
ProducerAccessMode accessMode, boost::optional<uint64_t> topicEpoch,
ProducerAccessMode accessMode, optional<uint64_t> topicEpoch,
const std::string& initialSubscriptionName);

static SharedBuffer newAck(uint64_t consumerId, int64_t ledgerId, int64_t entryId, const BitSet& ackSet,
Expand Down
Loading
Loading