Skip to content

Commit 7d67ec4

Browse files
authored
Merge branch 'apache:main' into fix/issue-528
2 parents 9b05df1 + 69afa1a commit 7d67ec4

34 files changed

Lines changed: 1645 additions & 247 deletions

.github/workflows/ci-build-binary-artifacts.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ jobs:
5050
uses: actions/checkout@v3
5151

5252
- name: Set up QEMU
53-
uses: docker/setup-qemu-action@v2
53+
uses: docker/setup-qemu-action@ce360397dd3f832beb865e1373c09c0e9f86d70a
5454

5555
- name: Package Pulsar source
5656
run: build-support/generate-source-archive.sh
5757

58-
- uses: docker/setup-buildx-action@v2
58+
- uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd
5959

6060
- name: Build dependencies Docker image
61-
uses: docker/build-push-action@v3
61+
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294
6262
with:
6363
context: ./pkg/${{matrix.pkg.type}}
6464
load: true

.github/workflows/ci-pr-validation.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,15 +395,15 @@ jobs:
395395
uses: actions/checkout@v3
396396

397397
- name: Set up QEMU
398-
uses: docker/setup-qemu-action@v2
398+
uses: docker/setup-qemu-action@ce360397dd3f832beb865e1373c09c0e9f86d70a
399399

400400
- name: Package Pulsar source
401401
run: build-support/generate-source-archive.sh
402402

403-
- uses: docker/setup-buildx-action@v2
403+
- uses: docker/setup-buildx-action@4d04d5d9486b7bd6fa91e7baf45bbb4f8b9deedd
404404

405405
- name: Build dependencies Docker image
406-
uses: docker/build-push-action@v3
406+
uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294
407407
with:
408408
context: ./pkg/${{matrix.pkg.type}}
409409
load: true

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ else() # GCC or Clang are mostly compatible:
9999
add_compile_options(-Wall -Wformat-security -Wvla -Werror)
100100
# Turn off certain warnings that are too much pain for too little gain:
101101
add_compile_options(-Wno-sign-compare -Wno-deprecated-declarations -Wno-error=cpp)
102-
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR APPLE)
102+
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
103103
add_compile_options(-msse4.2 -mpclmul)
104104
endif()
105105
# Options unique to Clang or GCC:
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_AUTO_CLUSTER_FAILOVER_H_
20+
#define PULSAR_AUTO_CLUSTER_FAILOVER_H_
21+
22+
#include <pulsar/ServiceInfoProvider.h>
23+
24+
#include <chrono>
25+
#include <cstdint>
26+
#include <functional>
27+
#include <memory>
28+
#include <vector>
29+
30+
namespace pulsar {
31+
32+
class Client;
33+
class AutoClusterFailoverImpl;
34+
35+
class PULSAR_PUBLIC AutoClusterFailover final : public ServiceInfoProvider {
36+
public:
37+
struct Config {
38+
const ServiceInfo primary;
39+
const std::vector<ServiceInfo> secondary;
40+
std::chrono::milliseconds checkInterval{5000}; // 5 seconds
41+
uint32_t failoverThreshold{1};
42+
uint32_t switchBackThreshold{1};
43+
44+
Config(ServiceInfo primary, std::vector<ServiceInfo> secondary)
45+
: primary(std::move(primary)), secondary(std::move(secondary)) {}
46+
};
47+
48+
/**
49+
* Builder helps create an AutoClusterFailover configuration.
50+
*
51+
* Example:
52+
* ServiceInfo primary{...};
53+
* std::vector<ServiceInfo> secondaries{...};
54+
* AutoClusterFailover provider = AutoClusterFailover::Builder(primary, secondaries)
55+
* .withCheckInterval(std::chrono::seconds(5))
56+
* .withFailoverThreshold(3)
57+
* .withSwitchBackThreshold(3)
58+
* .build();
59+
*
60+
* Notes:
61+
* - primary: the preferred cluster to use when available.
62+
* - secondary: ordered list of fallback clusters.
63+
* - checkInterval: frequency of health probes.
64+
* - failoverThreshold: the number of consecutive failed probes required before switching away from
65+
* the current cluster.
66+
* - switchBackThreshold: the number of consecutive successful probes to the primary required before
67+
* switching back from a secondary while that secondary remains available. If the active secondary
68+
* becomes unavailable and the primary is available, the implementation may switch back to the
69+
* primary immediately, regardless of this threshold.
70+
*/
71+
class Builder {
72+
public:
73+
Builder(ServiceInfo primary, std::vector<ServiceInfo> secondary)
74+
: config_(std::move(primary), std::move(secondary)) {}
75+
76+
// Set how frequently probes run against the active cluster(s). Default: 5 seconds.
77+
Builder& withCheckInterval(std::chrono::milliseconds interval) {
78+
config_.checkInterval = interval;
79+
return *this;
80+
}
81+
82+
// Set the number of consecutive failed probes required before attempting failover. Default: 1.
83+
Builder& withFailoverThreshold(uint32_t threshold) {
84+
config_.failoverThreshold = threshold;
85+
return *this;
86+
}
87+
88+
// Set the number of consecutive successful primary probes required before switching back from a
89+
// healthy secondary. If the active secondary becomes unavailable and the primary is available,
90+
// the implementation may switch back immediately regardless of this threshold. Default: 1.
91+
Builder& withSwitchBackThreshold(uint32_t threshold) {
92+
config_.switchBackThreshold = threshold;
93+
return *this;
94+
}
95+
96+
AutoClusterFailover build() { return AutoClusterFailover(std::move(config_)); }
97+
98+
private:
99+
Config config_;
100+
};
101+
102+
explicit AutoClusterFailover(Config&& config);
103+
104+
~AutoClusterFailover() final;
105+
106+
ServiceInfo initialServiceInfo() final;
107+
108+
void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) final;
109+
110+
private:
111+
std::shared_ptr<AutoClusterFailoverImpl> impl_;
112+
};
113+
114+
} // namespace pulsar
115+
116+
#endif

include/pulsar/Message.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
159159
*/
160160
bool hasOrderingKey() const;
161161

162+
/**
163+
* Check if the message has a null value.
164+
*
165+
* Messages with null values are used as tombstones on compacted topics
166+
* to delete the message for a specific key.
167+
*
168+
* @return true if the message has a null value (tombstone)
169+
* false if the message has actual payload data
170+
*/
171+
bool hasNullValue() const;
172+
162173
/**
163174
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
164175
* producer
@@ -197,6 +208,11 @@ class PULSAR_PUBLIC Message {
197208
*/
198209
const std::string& getSchemaVersion() const;
199210

211+
/**
212+
* Set the schema version of the message.
213+
*/
214+
void setSchemaVersion(const std::string& schemaVersion);
215+
200216
/**
201217
* Get the producer name which produced this message.
202218
*

include/pulsar/MessageBuilder.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
156156
*/
157157
MessageBuilder& disableReplication(bool flag);
158158

159+
/**
160+
* Mark the message as having a null value.
161+
*
162+
* This is used for messages on compacted topics where a null value
163+
* acts as a tombstone for a specific key, removing the message from
164+
* the compacted view.
165+
*
166+
* @return the message builder instance
167+
*/
168+
MessageBuilder& setNullValue();
169+
159170
/**
160171
* create a empty message, with no properties or data
161172
*

include/pulsar/c/message.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
127127
*/
128128
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
129129

130+
/**
131+
* Mark the message as having a null value.
132+
*
133+
* This is used for messages on compacted topics where a null value
134+
* acts as a tombstone for a specific key, removing the message from
135+
* the compacted view.
136+
*/
137+
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);
138+
130139
/// Accessor for built messages
131140

132141
/**
@@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
221230
*/
222231
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
223232

233+
/**
234+
* Check if the message has a null value.
235+
*
236+
* Messages with null values are used as tombstones on compacted topics
237+
* to delete the message for a specific key.
238+
*
239+
* @return 1 if the message has a null value, 0 otherwise
240+
*/
241+
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);
242+
224243
#ifdef __cplusplus
225244
}
226245
#endif

0 commit comments

Comments
 (0)