Skip to content

Commit b8c75de

Browse files
committed
Fix message might lost when use listener (#406)
* Fix message might lost when use listener * Remove todo * code format * Remove consumer if null judge (cherry picked from commit b17a2e1)
1 parent 7f1cbf4 commit b8c75de

2 files changed

Lines changed: 25 additions & 37 deletions

File tree

src/Consumer.cc

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -77,32 +77,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe
7777
Napi::Object msg = Message::NewInstance({}, data->cMessage);
7878
Consumer *consumer = data->consumer;
7979

80-
// `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
81-
// need to handle this rare case in future.
82-
if (consumer) {
83-
Napi::Value ret;
84-
try {
85-
ret = jsCallback.Call({msg, consumer->Value()});
86-
} catch (std::exception &exception) {
87-
logMessageListenerError(consumer, exception.what());
88-
}
80+
Napi::Value ret;
81+
try {
82+
ret = jsCallback.Call({msg, consumer->Value()});
83+
} catch (std::exception &exception) {
84+
logMessageListenerError(consumer, exception.what());
85+
}
8986

90-
if (ret.IsPromise()) {
91-
Napi::Promise promise = ret.As<Napi::Promise>();
92-
Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
87+
if (ret.IsPromise()) {
88+
Napi::Promise promise = ret.As<Napi::Promise>();
89+
Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
9390

94-
ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) {
95-
Napi::Error error = info[0].As<Napi::Error>();
96-
logMessageListenerError(consumer, error.what());
97-
})});
91+
ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) {
92+
Napi::Error error = info[0].As<Napi::Error>();
93+
logMessageListenerError(consumer, error.what());
94+
})});
9895

99-
promise = ret.As<Napi::Promise>();
100-
Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
96+
promise = ret.As<Napi::Promise>();
97+
Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
10198

102-
finallyFunc.Call(
103-
promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })});
104-
return;
105-
}
99+
finallyFunc.Call(
100+
promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })});
101+
return;
106102
}
107103
data->callback();
108104
}
@@ -111,7 +107,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag
111107
std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
112108
MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx;
113109

114-
Consumer *consumer = (Consumer *)listenerCallback->consumer;
110+
Consumer *consumer = static_cast<Consumer *>(listenerCallback->consumerFuture.get());
115111

116112
if (listenerCallback->callback.Acquire() != napi_ok) {
117113
return;
@@ -135,7 +131,7 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) {
135131
}
136132

137133
if (listener != nullptr) {
138-
listener->consumer = this;
134+
listener->consumerPromise.set_value(this);
139135
// If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS
140136
// code.
141137
this->Ref();
@@ -168,23 +164,13 @@ struct ConsumerNewInstanceContext {
168164
auto cConsumer = std::shared_ptr<pulsar_consumer_t>(rawConsumer, pulsar_consumer_free);
169165
auto listener = consumerConfig->GetListenerCallback();
170166

171-
if (listener) {
172-
// pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer
173-
pulsar_consumer_pause_message_listener(cConsumer.get());
174-
}
175-
176167
deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) {
177168
Napi::Object obj = Consumer::constructor.New({});
178169
Consumer *consumer = Consumer::Unwrap(obj);
179170

180171
consumer->SetCConsumer(cConsumer);
181172
consumer->SetListenerCallback(listener);
182173

183-
if (listener) {
184-
// resume to enable MessageListener function callback
185-
resume_message_listener(cConsumer.get());
186-
}
187-
188174
return obj;
189175
});
190176
}

src/MessageListener.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@
2121
#define MESSAGELISTENER_H
2222

2323
#include <napi.h>
24+
#include <future>
2425

2526
struct MessageListenerCallback {
2627
Napi::ThreadSafeFunction callback;
2728

28-
// Using consumer as void* since the ListenerCallback is shared between Config and Consumer.
29-
void *consumer;
29+
// Use future store consumer point, because need ensure sync.
30+
std::promise<void *> consumerPromise;
31+
std::shared_future<void *> consumerFuture;
3032

31-
MessageListenerCallback() : consumer(nullptr) {}
33+
MessageListenerCallback() : consumerPromise(), consumerFuture(consumerPromise.get_future()) {}
3234
};
3335

3436
#endif

0 commit comments

Comments
 (0)