Skip to content

Commit f9995da

Browse files
authored
[fix][cpp-client] Pass seek error to callback when SEEK command fails (#549)
1 parent 69afa1a commit f9995da

3 files changed

Lines changed: 74 additions & 10 deletions

File tree

.github/workflows/ci-pr-validation.yaml

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,25 @@ jobs:
9999
key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
100100
restore-keys: vcpkg-${{ runner.os }}-
101101

102+
- name: Restore vcpkg downloads cache
103+
uses: actions/cache@v4
104+
with:
105+
path: vcpkg/downloads
106+
key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
107+
restore-keys: vcpkg-downloads-${{ runner.os }}-
108+
102109
- name: Build the project
103110
run: |
104-
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
105-
cmake --build build -j8
111+
for attempt in 1 2 3; do
112+
echo "Build attempt $attempt/3"
113+
if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON && cmake --build build -j8; then
114+
exit 0
115+
fi
116+
echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
117+
sleep 90
118+
done
119+
echo "All build attempts failed"
120+
exit 1
106121
107122
- name: Tidy check
108123
run: |
@@ -137,10 +152,25 @@ jobs:
137152
key: vcpkg-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
138153
restore-keys: vcpkg-${{ runner.os }}-
139154

155+
- name: Restore vcpkg downloads cache
156+
uses: actions/cache@v4
157+
with:
158+
path: vcpkg/downloads
159+
key: vcpkg-downloads-${{ runner.os }}-${{ hashFiles('vcpkg.json') }}
160+
restore-keys: vcpkg-downloads-${{ runner.os }}-
161+
140162
- name: Build core libraries
141163
run: |
142-
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF
143-
cmake --build build -j8
164+
for attempt in 1 2 3; do
165+
echo "Build attempt $attempt/3"
166+
if cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF && cmake --build build -j8; then
167+
exit 0
168+
fi
169+
echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
170+
sleep 90
171+
done
172+
echo "All build attempts failed"
173+
exit 1
144174
145175
- name: Check formatting
146176
run: |
@@ -164,8 +194,16 @@ jobs:
164194

165195
- name: Build with Boost.Asio
166196
run: |
167-
cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON
168-
cmake --build build-boost-asio -j8
197+
for attempt in 1 2 3; do
198+
echo "Build Boost.Asio attempt $attempt/3"
199+
if cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON && cmake --build build-boost-asio -j8; then
200+
exit 0
201+
fi
202+
echo "Attempt $attempt failed (e.g. vcpkg download 502), retrying in 90s..."
203+
sleep 90
204+
done
205+
echo "All build attempts failed"
206+
exit 1
169207
170208
- name: Build perf tools
171209
run: |

lib/ConsumerImpl.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,7 +1835,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
18351835
if (result == ResultOk) {
18361836
LockGuard lock(mutex_);
18371837
if (getCnx().expired() || reconnectionPending_) {
1838-
// It's during reconnection, complete the seek future after connection is established
1838+
// Reconnection path: delay the seek callback until connectionOpened. clearReceiveQueue()
1839+
// and handleCreateConsumer() (which clears incomingMessages_ under the lock) run before
1840+
// the seek callback is invoked, so hasMessageAvailable() after seek sees cleared state.
18391841
seekStatus_ = SeekStatus::COMPLETED;
18401842
LOG_INFO(getName() << "Delay the seek future until the reconnection is done");
18411843
} else {
@@ -1860,9 +1862,8 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
18601862
LockGuard lock{mutex_};
18611863
seekStatus_ = SeekStatus::NOT_STARTED;
18621864
lastSeekArg_ = previousLastSeekArg;
1863-
executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
1864-
callback(ResultOk);
1865-
});
1865+
executor_->postWork([self, callback{std::exchange(seekCallback_, std::nullopt).value()},
1866+
result]() { callback(result); });
18661867
}
18671868
});
18681869
}

tests/ConsumerSeekTest.cc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,31 @@ TEST_F(ConsumerSeekTest, testReconnectionSlow) {
258258
client.close();
259259
}
260260

261+
TEST_F(ConsumerSeekTest, testSeekFailureIsPropagated) {
262+
using namespace std::chrono_literals;
263+
264+
Client client(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(1));
265+
Consumer consumer;
266+
ASSERT_EQ(ResultOk, client.subscribe("testSeekFailureIsPropagated", "sub", consumer));
267+
268+
auto connection = *PulsarFriend::getConnections(client).begin();
269+
auto mockServer = std::make_shared<MockServer>(connection);
270+
connection->attachMockServer(mockServer);
271+
mockServer->setRequestDelay({{"SEEK", 5000}});
272+
273+
std::promise<Result> promise;
274+
auto future = promise.get_future();
275+
consumer.seekAsync(MessageId::earliest(), [&promise](Result result) { promise.set_value(result); });
276+
277+
// Cancel the mocked SEEK success so request completes with timeout.
278+
ASSERT_GE(mockServer->close(), 1);
279+
280+
ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
281+
ASSERT_EQ(future.get(), ResultTimeout);
282+
283+
client.close();
284+
}
285+
261286
INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
262287

263288
} // namespace pulsar

0 commit comments

Comments
 (0)