Skip to content

Commit 5483ca9

Browse files
grishaf and cursoragent authored
feat: support null value messages (tombstones) for compacted topics (#304)
Add support for sending and detecting null value messages, which are used as tombstones on compacted topics to delete entries for specific keys. This wraps the C++ client's `MessageBuilder::setNullValue()` and `Message::hasNullValue()` APIs added in pulsar-client-cpp#563.

Changes:
- Bump pulsar-cpp dependency to 4.2.0
- Add pybind11 bindings for `set_null_value` and `has_null_value`
- Allow `Producer.send(None)` to produce a null value message
- Add `Message.has_null_value()` to detect tombstone messages
- Skip schema encoding when content is `None` (mirrors Java client)
- Add integration tests for null values, compaction, and table view

Requires pulsar-client-cpp >= 4.2.0 (not yet released).

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 0fc51a0 commit 5483ca9

3 files changed

Lines changed: 170 additions & 5 deletions

File tree

pulsar/__init__.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,19 @@ def producer_name(self) -> str:
366366
"""
367367
return self._message.producer_name()
368368

369+
def has_null_value(self) -> bool:
    """
    Check if the message has a null value (tombstone).

    Messages with null values are used on compacted topics to delete
    the entry for a specific key.

    A null value message still reports an empty payload from ``data()``,
    so this method is the way to tell a tombstone apart from a message
    that was sent with an empty ``bytes`` payload.

    Returns
    ----------
    True if the message has a null value, False otherwise.
    """
    # Delegates to the native message wrapper (Message::hasNullValue binding).
    return self._message.has_null_value()
381+
369382
def encryption_context(self) -> EncryptionContext | None:
370383
"""
371384
Get the encryption context for this message or None if it's not encrypted.
@@ -1693,7 +1706,9 @@ def send(self, content,
16931706
----------
16941707
16951708
content:
1696-
A ``bytes`` object with the message payload.
1709+
A ``bytes`` object with the message payload, or ``None`` to send a null value
1710+
message (tombstone). Null value messages are used on compacted topics to delete
1711+
the entry for a specific key.
16971712
properties: optional
16981713
A dict of application-defined string properties.
16991714
partition_key: optional
@@ -1775,7 +1790,9 @@ def callback(res, msg_id):
17751790
----------
17761791
17771792
content
1778-
A `bytes` object with the message payload.
1793+
A ``bytes`` object with the message payload, or ``None`` to send a null value
1794+
message (tombstone). Null value messages are used on compacted topics to delete
1795+
the entry for a specific key.
17791796
callback
17801797
A callback that is invoked once the message has been acknowledged by the broker.
17811798
properties: optional
@@ -1823,9 +1840,12 @@ def close(self):
18231840
def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
18241841
replication_clusters, disable_replication, event_timestamp,
18251842
deliver_at, deliver_after):
1826-
data = self._schema.encode(content)
1843+
if content is not None:
1844+
data = self._schema.encode(content)
1845+
_check_type(bytes, data, 'data')
1846+
else:
1847+
data = None
18271848

1828-
_check_type(bytes, data, 'data')
18291849
_check_type_or_none(dict, properties, 'properties')
18301850
_check_type_or_none(str, partition_key, 'partition_key')
18311851
_check_type_or_none(str, ordering_key, 'ordering_key')
@@ -1837,7 +1857,10 @@ def _build_msg(self, content, properties, partition_key, ordering_key, sequence_
18371857
_check_type_or_none(timedelta, deliver_after, 'deliver_after')
18381858

18391859
mb = _pulsar.MessageBuilder()
1840-
mb.content(data)
1860+
if data is not None:
1861+
mb.content(data)
1862+
else:
1863+
mb.set_null_value()
18411864
if properties:
18421865
for k, v in properties.items():
18431866
mb.property(k, v)

src/message.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void export_message(py::module_& m) {
4646
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
4747
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
4848
.def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
49+
.def("set_null_value", &MessageBuilder::setNullValue, return_value_policy::reference)
4950
.def("build", &MessageBuilder::build);
5051

5152
class_<MessageId>(m, "MessageId")
@@ -121,6 +122,7 @@ void export_message(py::module_& m) {
121122
.def("int_schema_version", &Message::getLongSchemaVersion)
122123
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy)
123124
.def("producer_name", &Message::getProducerName, return_value_policy::copy)
125+
.def("has_null_value", &Message::hasNullValue)
124126
.def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference);
125127

126128
MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,

tests/pulsar_test.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,6 +2140,146 @@ def router(msg: pulsar.Message, num_partitions: int):
21402140

21412141
client.close()
21422142

2143+
def test_null_value_message(self):
    """A null value message is delivered in order and flagged as a tombstone.

    Sends a regular message, a ``None`` message and another regular message,
    then checks that only the middle one reports ``has_null_value()`` and
    that its payload reads back as empty bytes.
    """
    client = Client(self.serviceUrl)
    try:
        topic = f"null-value-{uuid.uuid4()}"
        # Batching is disabled so every send() is published as its own message.
        producer = client.create_producer(topic, batching_enabled=False)
        consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

        producer.send(b"not null", partition_key="k1")
        producer.send(None, partition_key="k2")
        producer.send(b"also not null", partition_key="k3")

        msg1 = consumer.receive(TM)
        self.assertEqual(msg1.data(), b"not null")
        self.assertFalse(msg1.has_null_value())

        msg2 = consumer.receive(TM)
        self.assertTrue(msg2.has_null_value())
        self.assertEqual(msg2.data(), b"")

        msg3 = consumer.receive(TM)
        self.assertEqual(msg3.data(), b"also not null")
        self.assertFalse(msg3.has_null_value())

        consumer.close()
        producer.close()
    finally:
        # Always release the native client, even when an assertion above fails,
        # so a failing test does not leak connections into the rest of the suite.
        client.close()
2168+
2169+
def test_null_value_vs_empty_bytes(self):
    """An empty ``bytes`` payload is not reported as a null value.

    Both messages read back as empty data, so ``has_null_value()`` is the
    only signal that separates ``send(b"")`` from ``send(None)``.
    """
    client = Client(self.serviceUrl)
    try:
        topic = f"null-vs-empty-{uuid.uuid4()}"
        producer = client.create_producer(topic, batching_enabled=False)
        consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

        producer.send(b"", partition_key="k1")
        producer.send(None, partition_key="k2")

        empty_msg = consumer.receive(TM)
        self.assertFalse(empty_msg.has_null_value())
        self.assertEqual(empty_msg.data(), b"")

        null_msg = consumer.receive(TM)
        self.assertTrue(null_msg.has_null_value())

        consumer.close()
        producer.close()
    finally:
        # Close the client on every exit path so a failed assertion cannot leak it.
        client.close()
2188+
2189+
def test_null_value_compaction(self):
    """Keys whose latest message is a tombstone disappear after compaction.

    Publishes values for four keys, tombstones two of them, triggers topic
    compaction through the admin REST endpoint and then reads the compacted
    view: only the two keys without a tombstone must remain.
    """
    # Upper bound for the compaction status poll; without it a broker that
    # never leaves RUNNING would hang the whole test suite.
    compaction_timeout_s = 30.0

    client = Client(self.serviceUrl)
    try:
        topic = f"null-compact-{uuid.uuid4()}"
        producer = client.create_producer(topic, batching_enabled=False)

        # NOTE(review): presumably this creates the subscription before any
        # message is published so the later re-subscribe reads from the start
        # of the topic — confirm against broker subscription semantics.
        consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
        consumer.close()

        # key1: value then tombstone -> removed after compaction
        producer.send(b"hello-1", partition_key="key1")
        producer.send(None, partition_key="key1")

        # key2: value only -> survives
        producer.send(b"hello-2", partition_key="key2")

        # key3: value then tombstone -> removed
        producer.send(b"hello-3", partition_key="key3")
        producer.send(None, partition_key="key3")

        # key4: value only -> survives
        producer.send(b"hello-4", partition_key="key4")
        producer.close()

        url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
        doHttpPut(url, "")

        deadline = time.monotonic() + compaction_timeout_s
        while True:
            status = doHttpGet(url).decode("utf-8")
            if "RUNNING" not in status:
                # assertIn reports the actual status body on failure.
                self.assertIn("SUCCESS", status)
                break
            if time.monotonic() >= deadline:
                self.fail("compaction still RUNNING after %.0fs: %s"
                          % (compaction_timeout_s, status))
            time.sleep(0.2)

        # The compacted ledger cursor update is async on the broker side,
        # wait for it to be persisted before reading.
        time.sleep(1.0)

        consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
        messages = []
        # Drain the subscription; a receive timeout marks the end of the backlog.
        while True:
            try:
                messages.append(consumer.receive(2000))
            except pulsar.Timeout:
                break

        keys = [m.partition_key() for m in messages]
        self.assertIn("key2", keys)
        self.assertIn("key4", keys)
        self.assertNotIn("key1", keys)
        self.assertNotIn("key3", keys)
        self.assertEqual(len(messages), 2)

        consumer.close()
    finally:
        # Release the native client even if an assertion or the poll fails.
        client.close()
2244+
2245+
def test_null_value_table_view(self):
    """A tombstone removes the key from a table view.

    The table view first exposes the value for ``key1``; after a null value
    message is sent for the same key, ``tv.get("key1")`` must become ``None``.
    """
    client = Client(self.serviceUrl)
    try:
        topic = f"null-tv-{uuid.uuid4()}"
        producer = client.create_producer(topic, batching_enabled=False)

        producer.send(b"hello", partition_key="key1")

        tv = client.create_table_view(topic)
        self.assertEqual(tv.get("key1"), b"hello")

        producer.send(None, partition_key="key1")
        # The table view is updated asynchronously: poll for up to ~5 seconds
        # (50 x 0.1s) before asserting that the key is gone.
        for _ in range(50):
            if tv.get("key1") is None:
                break
            time.sleep(0.1)
        self.assertIsNone(tv.get("key1"))

        tv.close()
        producer.close()
    finally:
        # Close the client on every exit path so a failed assertion cannot leak it.
        client.close()
2265+
2266+
def test_null_value_with_properties(self):
    """A null value message still carries its properties and partition key."""
    client = Client(self.serviceUrl)
    try:
        topic = f"null-props-{uuid.uuid4()}"
        producer = client.create_producer(topic, batching_enabled=False)
        consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

        producer.send(None, partition_key="k1", properties={"action": "delete"})

        msg = consumer.receive(TM)
        self.assertTrue(msg.has_null_value())
        self.assertEqual(msg.properties(), {"action": "delete"})
        self.assertEqual(msg.partition_key(), "k1")

        consumer.close()
        producer.close()
    finally:
        # Close the client on every exit path so a failed assertion cannot leak it.
        client.close()
2282+
21432283

21442284
if __name__ == "__main__":
21452285
main()

0 commit comments

Comments
 (0)