Skip to content

Commit e3bf582

Browse files
authored
fix: ReaderListenerProxy will make a segfault (#376)
1 parent 81ab43d commit e3bf582

2 files changed

Lines changed: 54 additions & 11 deletions

File tree

src/Reader.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,19 @@ struct ReaderListenerProxyData {
6060
void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) {
6161
Napi::Object msg = Message::NewInstance({}, data->cMessage);
6262
Reader *reader = data->reader;
63-
64-
Napi::Value ret = jsCallback.Call({msg, reader->Value()});
65-
if (ret.IsPromise()) {
66-
Napi::Promise promise = ret.As<Napi::Promise>();
67-
Napi::Value thenValue = promise.Get("then");
68-
if (thenValue.IsFunction()) {
69-
Napi::Function then = thenValue.As<Napi::Function>();
70-
Napi::Function callback =
71-
Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
72-
then.Call(promise, {callback});
73-
return;
63+
// `reader` might be null in certain cases, segmentation fault might happend without this null check.
64+
if (reader) {
65+
Napi::Value ret = jsCallback.Call({msg, reader->Value()});
66+
if (ret.IsPromise()) {
67+
Napi::Promise promise = ret.As<Napi::Promise>();
68+
Napi::Value thenValue = promise.Get("then");
69+
if (thenValue.IsFunction()) {
70+
Napi::Function then = thenValue.As<Napi::Function>();
71+
Napi::Function callback =
72+
Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); });
73+
then.Call(promise, {callback});
74+
return;
75+
}
7476
}
7577
}
7678
data->callback();

tests/reader.test.js

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,5 +130,46 @@ const baseUrl = 'http://localhost:8080';
130130
await reader.close();
131131
await client.close();
132132
});
133+
134+
test('Reader should not throw segmentation fault when create and close', async () => {
135+
const NUM_ITS = 1000;
136+
const its = Array.from({ length: NUM_ITS }, (_, i) => i);
137+
138+
const client = new Pulsar.Client({
139+
serviceUrl: 'pulsar://localhost:6650',
140+
});
141+
142+
const producer = await client.createProducer({
143+
topic: 'persistent://public/default/my-topic',
144+
sendTimeoutMs: 30000,
145+
batchingEnabled: true,
146+
});
147+
148+
// Send messages
149+
for (let i = 0; i < 10; i += 1) {
150+
const msg = `my-message-${i}`;
151+
producer.send({
152+
data: Buffer.from(msg),
153+
});
154+
console.log(`Sent message: ${msg}`);
155+
}
156+
await producer.flush();
157+
158+
await Promise.all(
159+
its.map(async () => {
160+
const reader = await client.createReader({
161+
topic: 'persistent://public/default/my-topic',
162+
startMessageId: Pulsar.MessageId.earliest(),
163+
listener: (message) => {
164+
console.log(message.getData().toString());
165+
},
166+
});
167+
await reader.close();
168+
}),
169+
);
170+
await producer.close();
171+
await client.close();
172+
expect(true).toBe(true);
173+
});
133174
});
134175
})();

0 commit comments

Comments
 (0)