@@ -1045,5 +1045,130 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
10451045 client.close ();
10461046}
10471047
1048+ TEST (ReaderTest, testReadCompactedWithNullValue) {
1049+ Client client (serviceUrl);
1050+
1051+ const std::string topicName =
1052+ " persistent://public/default/testReadCompactedWithNullValue-" + std::to_string (time (nullptr ));
1053+
1054+ Producer producer;
1055+ ASSERT_EQ (ResultOk, client.createProducer (topicName, producer));
1056+
1057+ // Send messages with keys
1058+ ASSERT_EQ (ResultOk,
1059+ producer.send (MessageBuilder ().setPartitionKey (" key1" ).setContent (" value1" ).build ()));
1060+ ASSERT_EQ (ResultOk,
1061+ producer.send (MessageBuilder ().setPartitionKey (" key2" ).setContent (" value2" ).build ()));
1062+ ASSERT_EQ (ResultOk,
1063+ producer.send (MessageBuilder ().setPartitionKey (" key3" ).setContent (" value3" ).build ()));
1064+
1065+ // Send a tombstone (null value) for key2
1066+ auto tombstone = MessageBuilder ().setPartitionKey (" key2" ).setNullValue ().build ();
1067+ ASSERT_TRUE (tombstone.hasNullValue ());
1068+ ASSERT_EQ (tombstone.getLength (), 0 );
1069+ ASSERT_EQ (ResultOk, producer.send (tombstone));
1070+
1071+ // Update key1 with a new value
1072+ ASSERT_EQ (ResultOk,
1073+ producer.send (MessageBuilder ().setPartitionKey (" key1" ).setContent (" value1-updated" ).build ()));
1074+
1075+ // Trigger compaction via admin API
1076+ {
1077+ std::string compactUrl =
1078+ adminUrl + " admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
1079+ std::to_string (time (nullptr )) + " /compaction" ;
1080+ // Note: Compaction is async, we just trigger it
1081+ makePutRequest (compactUrl, " " );
1082+ }
1083+
1084+ // Create a reader with readCompacted enabled
1085+ ReaderConfiguration readerConf;
1086+ readerConf.setReadCompacted (true );
1087+ Reader reader;
1088+ ASSERT_EQ (ResultOk, client.createReader (topicName, MessageId::earliest (), readerConf, reader));
1089+
1090+ // Read all messages and verify we can detect null values
1091+ std::map<std::string, std::string> keyValues;
1092+ std::set<std::string> nullValueKeys;
1093+
1094+ for (int i = 0 ; i < 10 ; i++) {
1095+ bool hasMessageAvailable = false ;
1096+ ASSERT_EQ (ResultOk, reader.hasMessageAvailable (hasMessageAvailable));
1097+ if (!hasMessageAvailable) {
1098+ break ;
1099+ }
1100+
1101+ Message msg;
1102+ Result res = reader.readNext (msg, 3000 );
1103+ if (res != ResultOk) {
1104+ break ;
1105+ }
1106+
1107+ std::string key = msg.getPartitionKey ();
1108+ if (msg.hasNullValue ()) {
1109+ nullValueKeys.insert (key);
1110+ LOG_INFO (" Received null value (tombstone) for key: " << key);
1111+ } else {
1112+ keyValues[key] = msg.getDataAsString ();
1113+ LOG_INFO (" Received message for key: " << key << " , value: " << msg.getDataAsString ());
1114+ }
1115+ }
1116+
1117+ // Verify we received the tombstone for key2
1118+ // Note: Without compaction completing, we see all messages including the tombstone
1119+ // After compaction, we would only see the latest value for each key
1120+ ASSERT_TRUE (nullValueKeys.count (" key2" ) > 0 || keyValues.count (" key2" ) == 0 )
1121+ << " key2 should either have a null value or be absent after compaction" ;
1122+
1123+ producer.close ();
1124+ reader.close ();
1125+ client.close ();
1126+ }
1127+
1128+ TEST (ReaderTest, testNullValueMessageProperties) {
1129+ Client client (serviceUrl);
1130+
1131+ const std::string topicName =
1132+ " persistent://public/default/testNullValueMessageProperties-" + std::to_string (time (nullptr ));
1133+
1134+ Producer producer;
1135+ ASSERT_EQ (ResultOk, client.createProducer (topicName, producer));
1136+
1137+ // Send a null value message with properties
1138+ auto tombstone = MessageBuilder ()
1139+ .setPartitionKey (" user-123" )
1140+ .setNullValue ()
1141+ .setProperty (" reason" , " account-deleted" )
1142+ .setProperty (" deleted-by" , " admin" )
1143+ .build ();
1144+
1145+ ASSERT_TRUE (tombstone.hasNullValue ());
1146+ ASSERT_EQ (tombstone.getPartitionKey (), " user-123" );
1147+ ASSERT_EQ (tombstone.getProperty (" reason" ), " account-deleted" );
1148+ ASSERT_EQ (tombstone.getProperty (" deleted-by" ), " admin" );
1149+ ASSERT_EQ (tombstone.getLength (), 0 );
1150+
1151+ ASSERT_EQ (ResultOk, producer.send (tombstone));
1152+
1153+ // Create a reader and verify the message
1154+ ReaderConfiguration readerConf;
1155+ Reader reader;
1156+ ASSERT_EQ (ResultOk, client.createReader (topicName, MessageId::earliest (), readerConf, reader));
1157+
1158+ Message msg;
1159+ ASSERT_EQ (ResultOk, reader.readNext (msg, 5000 ));
1160+
1161+ // Verify all properties are preserved
1162+ ASSERT_TRUE (msg.hasNullValue ());
1163+ ASSERT_EQ (msg.getPartitionKey (), " user-123" );
1164+ ASSERT_EQ (msg.getProperty (" reason" ), " account-deleted" );
1165+ ASSERT_EQ (msg.getProperty (" deleted-by" ), " admin" );
1166+ ASSERT_EQ (msg.getLength (), 0 );
1167+
1168+ producer.close ();
1169+ reader.close ();
1170+ client.close ();
1171+ }
1172+
10481173INSTANTIATE_TEST_SUITE_P (Pulsar, ReaderTest, ::testing::Values(true , false ));
10491174INSTANTIATE_TEST_SUITE_P (Pulsar, ReaderSeekTest, ::testing::Values(true , false ));
0 commit comments