Skip to content

Commit 30d3c25

Browse files
committed
Fix TableView's existing key-value will never be updated
1 parent 0e1ed3b commit 30d3c25

8 files changed

Lines changed: 62 additions & 31 deletions

lib/ClientImpl.cc

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,11 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
222222
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
223223
CreateProducerCallback callback, ProducerImplBasePtr producer) {
224224
if (result == ResultOk) {
225-
auto pair = producers_.emplace(producer.get(), producer);
226-
if (!pair.second) {
227-
auto existingProducer = pair.first->second.lock();
225+
auto address = producer.get();
226+
auto existingProducer = producers_.create(address, producer);
227+
if (existingProducer) {
228228
LOG_ERROR("Unexpected existing producer at the same address: "
229-
<< pair.first->first << ", producer: "
230-
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
229+
<< address << ", producer: " << existingProducer.value().lock()->getProducerName());
231230
callback(ResultUnknownError, {});
232231
return;
233232
}
@@ -311,12 +310,11 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
311310
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
312311
auto consumer = weakConsumerPtr.lock();
313312
if (consumer) {
314-
auto pair = consumers_.emplace(consumer.get(), consumer);
315-
if (!pair.second) {
316-
auto existingConsumer = pair.first->second.lock();
313+
auto address = consumer.get();
314+
auto existingConsumer = consumers_.create(address, consumer);
315+
if (existingConsumer) {
317316
LOG_ERROR("Unexpected existing consumer at the same address: "
318-
<< pair.first->first
319-
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
317+
<< address << ", consumer: " << existingConsumer.value().lock()->getName());
320318
}
321319
} else {
322320
LOG_ERROR("Unexpected case: the consumer is somehow expired");
@@ -512,12 +510,11 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
512510
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
513511
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
514512
if (result == ResultOk) {
515-
auto pair = consumers_.emplace(consumer.get(), consumer);
516-
if (!pair.second) {
517-
auto existingConsumer = pair.first->second.lock();
513+
auto address = consumer.get();
514+
auto existingConsumer = consumers_.create(address, consumer);
515+
if (existingConsumer) {
518516
LOG_ERROR("Unexpected existing consumer at the same address: "
519-
<< pair.first->first
520-
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
517+
<< address << ", consumer: " << existingConsumer.value().lock()->getName());
521518
callback(ResultUnknownError, {});
522519
return;
523520
}

lib/ConsumerImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
619619
return;
620620
}
621621
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
622-
possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
622+
possibleSendToDeadLetterTopicMessages_.update(m.getMessageId(), std::vector<Message>{m});
623623
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
624624
redeliverUnacknowledgedMessages({m.getMessageId()});
625625
increaseAvailablePermits(cnx);
@@ -786,7 +786,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
786786
}
787787

788788
if (!possibleToDeadLetter.empty()) {
789-
possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
789+
possibleSendToDeadLetterTopicMessages_.update(batchedMessage.getMessageId(), possibleToDeadLetter);
790790
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
791791
redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
792792
}

lib/MultiTopicsConsumerImpl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
260260
consumer->getConsumerCreatedFuture().addListener(std::bind(
261261
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
262262
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
263-
consumers_.emplace(topicName->toString(), consumer);
263+
consumers_.update(topicName->toString(), consumer);
264264
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
265265
consumer->start();
266266

@@ -287,7 +287,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
287287
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
288288
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
289289
consumer->setPartitionIndex(i);
290-
consumers_.emplace(topicPartitionName, consumer);
290+
consumers_.update(topicPartitionName, consumer);
291291
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
292292
consumer->start();
293293
}
@@ -1063,7 +1063,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
10631063
});
10641064
consumer->setPartitionIndex(partitionIndex);
10651065
consumer->start();
1066-
consumers_.emplace(topicPartitionName, consumer);
1066+
consumers_.update(topicPartitionName, consumer);
10671067
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
10681068
<< " consumerSize: " << consumers_.size());
10691069
}

lib/SynchronizedHashMap.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,22 @@ class SynchronizedHashMap {
5959
}
6060
}
6161

62-
template <typename... Args>
63-
std::pair<Iterator, bool> emplace(Args&&... args) {
62+
// Create a new key-value pair if the key does not exist.
63+
// Return boost::none if the key already exists or the existing value.
64+
OptValue create(const K& key, const V& value) {
6465
Lock lock(mutex_);
65-
return data_.emplace(std::forward<Args>(args)...);
66+
auto pair = data_.emplace(key, value);
67+
if (pair.second) {
68+
return boost::none;
69+
} else {
70+
return pair.first->second;
71+
}
72+
}
73+
74+
// Update the key with a new value no matter if the key exists.
75+
void update(const K& key, const V& value) {
76+
Lock lock(mutex_);
77+
data_[key] = value;
6678
}
6779

6880
void forEach(std::function<void(const K&, const V&)> f) const {

lib/TableViewImpl.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ void TableViewImpl::handleMessage(const Message& msg) {
104104
if (msg.getLength() == 0) {
105105
data_.remove(msg.getPartitionKey());
106106
} else {
107-
data_.emplace(msg.getPartitionKey(), value);
107+
data_.update(msg.getPartitionKey(), value);
108108
}
109109

110110
Lock lock(listenersMutex_);
@@ -167,4 +167,4 @@ void TableViewImpl::readTailMessage() {
167167
});
168168
}
169169

170-
} // namespace pulsar
170+
} // namespace pulsar

tests/SynchronizedHashMapTest.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <algorithm>
2222
#include <atomic>
23+
#include <boost/optional/optional_io.hpp>
2324
#include <chrono>
2425
#include <thread>
2526
#include <vector>
@@ -100,18 +101,27 @@ TEST(SynchronizedHashMapTest, testForEach) {
100101
ASSERT_TRUE(values.empty());
101102
ASSERT_EQ(result, 1);
102103

103-
m.emplace(1, 100);
104+
ASSERT_EQ(m.create(1, 100), boost::none);
105+
ASSERT_EQ(m.create(1, 101), boost::optional<int>(100));
104106
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
105107
[&result] { result = 2; });
106108
ASSERT_EQ(values, (std::vector<int>({100})));
107109
ASSERT_EQ(result, 1);
108110

111+
m.update(1, 102);
109112
values.clear();
110-
m.emplace(2, 200);
113+
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
114+
[&result] { result = 2; });
115+
ASSERT_EQ(values, (std::vector<int>({102})));
116+
ASSERT_EQ(result, 1);
117+
118+
values.clear();
119+
ASSERT_EQ(m.create(2, 200), boost::none);
120+
ASSERT_EQ(m.create(2, 201), boost::optional<int>(200));
111121
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
112122
[&result] { result = 2; });
113123
std::sort(values.begin(), values.end());
114-
ASSERT_EQ(values, (std::vector<int>({100, 200})));
124+
ASSERT_EQ(values, (std::vector<int>({102, 200})));
115125
ASSERT_EQ(result, 1);
116126
}
117127

tests/TableViewTest.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,22 @@ TEST(TableViewTest, testSimpleTableView) {
9696

9797
// assert interfaces.
9898
std::string value;
99+
ASSERT_TRUE(tableView.containsKey("key1"));
99100
ASSERT_TRUE(tableView.getValue("key1", value));
100101
ASSERT_EQ(value, "value1");
102+
103+
// Test value update
104+
ASSERT_EQ(ResultOk,
105+
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-update").build()));
106+
ASSERT_TRUE(waitUntil(std::chrono::seconds(2), [&tableView]() {
107+
std::string value;
108+
tableView.getValue("key1", value);
109+
return value == "value1-update";
110+
}));
111+
112+
// retrieveValue will remove the key/value from the table view.
101113
ASSERT_TRUE(tableView.retrieveValue("key1", value));
102-
ASSERT_EQ(value, "value1");
114+
ASSERT_EQ(value, "value1-update");
103115
ASSERT_FALSE(tableView.containsKey("key1"));
104116
ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
105117
ASSERT_EQ(tableView.size(), 0);

tests/extensibleLM/ExtensibleLoadManagerTest.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
116116
ASSERT_EQ(sendResult, ResultOk);
117117
ASSERT_TRUE(elapsed < maxWaitTimeMs);
118118

119-
producedMsgs.emplace(i, i);
119+
producedMsgs.update(i, i);
120120
i++;
121121
}
122122
LOG_INFO("producer finished");
@@ -143,7 +143,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
143143
LOG_INFO("acked i:" << i << " " << elapsed << " ms");
144144
ASSERT_TRUE(elapsed < maxWaitTimeMs);
145145
ASSERT_EQ(ackResult, ResultOk);
146-
consumedMsgs.emplace(i, i);
146+
consumedMsgs.update(i, i);
147147
}
148148
LOG_INFO("consumer finished");
149149
};

0 commit comments

Comments
 (0)