@@ -992,6 +992,218 @@ FIXTURE_TEST(test_filtering_of_segment_merge, bucket_view_fixture) {
992992 BOOST_REQUIRE_EQUAL (filtered_anomalies, filtered_from_partials);
993993}
994994
995+ FIXTURE_TEST (test_filtering_of_compacted_reupload, bucket_view_fixture) {
996+ /*
997+ * Test for race between scrubber and compacted segment reupload:
998+ * 1. Scrubber downloads stm manifest and records segment S as missing
999+ * 2. Compacted reupload replaces S with S' at the same offset range
1000+ * but different size
1001+ * 3. GC deletes the original segment S from cloud storage
1002+ * 4. process_anomalies must recognise S as a false positive because
1003+ * S' now covers the range
1004+ *
1005+ * Before the fix, process_anomalies only checked offset range
1006+ * existence and would keep the anomaly since S' has the same offsets.
1007+ * The fix compares generate_remote_segment_name() which encodes
1008+ * size_bytes, so a replacement with different size is detected.
1009+ */
1010+ constexpr std::string_view stm_man = R"json(
1011+ {
1012+ "version": 3,
1013+ "namespace": "kafka",
1014+ "topic": "panda-topic",
1015+ "partition": 0,
1016+ "revision": 1,
1017+ "start_offset": 0,
1018+ "last_offset": 29,
1019+ "insync_offset": 100,
1020+ "segments": {
1021+ "0-1-v1.log": {
1022+ "size_bytes": 1024,
1023+ "base_offset": 0,
1024+ "committed_offset": 9,
1025+ "base_timestamp": 1000,
1026+ "max_timestamp": 1000,
1027+ "delta_offset": 0,
1028+ "delta_offset_end": 2,
1029+ "ntp_revision": 1,
1030+ "archiver_term": 1,
1031+ "segment_term": 1,
1032+ "sname_format": 2
1033+ },
1034+ "10-1-v1.log": {
1035+ "size_bytes": 1024,
1036+ "base_offset": 10,
1037+ "committed_offset": 19,
1038+ "base_timestamp": 1000,
1039+ "max_timestamp": 1000,
1040+ "delta_offset": 2,
1041+ "delta_offset_end": 4,
1042+ "ntp_revision": 1,
1043+ "archiver_term": 1,
1044+ "segment_term": 1,
1045+ "sname_format": 2
1046+ },
1047+ "20-1-v1.log": {
1048+ "size_bytes": 1024,
1049+ "base_offset": 20,
1050+ "committed_offset": 29,
1051+ "base_timestamp": 1000,
1052+ "max_timestamp": 1000,
1053+ "delta_offset": 4,
1054+ "delta_offset_end": 6,
1055+ "ntp_revision": 1,
1056+ "archiver_term": 1,
1057+ "segment_term": 1,
1058+ "sname_format": 2
1059+ }
1060+ }
1061+ }
1062+ )json" ;
1063+
1064+ init_view (stm_man, {});
1065+
1066+ // Target the middle segment for the reupload scenario.
1067+ const auto original_seg = *std::next (get_stm_manifest ().begin ());
1068+ BOOST_REQUIRE_EQUAL (original_seg.base_offset , model::offset{10 });
1069+
1070+ // Remove the original segment from cloud storage so the detector
1071+ // reports it as missing.
1072+ remove_segment (get_stm_manifest (), original_seg);
1073+
1074+ const auto result = run_detector (archival::run_quota_t {100 });
1075+ BOOST_REQUIRE_EQUAL (result.status , cloud_storage::scrub_status::full);
1076+ BOOST_REQUIRE (result.detected .has_value ());
1077+ BOOST_REQUIRE_EQUAL (result.detected .missing_segments .size (), 1 );
1078+
1079+ // Simulate a compacted reupload: replace the segment with one at the
1080+ // same offset range but smaller size (compaction removed tombstones).
1081+ cloud_storage::segment_meta compacted_seg = original_seg;
1082+ compacted_seg.size_bytes = original_seg.size_bytes / 2 ;
1083+ compacted_seg.is_compacted = true ;
1084+
1085+ BOOST_REQUIRE (
1086+ get_stm_manifest_mut ().safe_segment_meta_to_add (compacted_seg));
1087+ BOOST_REQUIRE (get_stm_manifest_mut ().add (compacted_seg));
1088+
1089+ // Verify the segment name actually differs (the name encodes size for
1090+ // sname_format v2/v3).
1091+ BOOST_REQUIRE_NE (
1092+ cloud_storage::partition_manifest::generate_remote_segment_name (
1093+ original_seg),
1094+ cloud_storage::partition_manifest::generate_remote_segment_name (
1095+ compacted_seg));
1096+
1097+ // process_anomalies should filter out the missing segment because the
1098+ // manifest entry now refers to a different (compacted) segment.
1099+ get_stm_manifest_mut ().process_anomalies (
1100+ model::timestamp::now (),
1101+ result.last_scrubbed_offset ,
1102+ result.status ,
1103+ result.detected );
1104+
1105+ const auto & filtered = get_stm_manifest ().detected_anomalies ();
1106+ BOOST_REQUIRE_EQUAL (filtered.missing_segments .size (), 0 );
1107+ BOOST_REQUIRE (!filtered.has_value ());
1108+ }
1109+
1110+ FIXTURE_TEST (
1111+ test_filtering_of_metadata_anomaly_after_compacted_reupload,
1112+ bucket_view_fixture) {
1113+ /*
1114+ * Same race as test_filtering_of_compacted_reupload but for metadata
1115+ * anomalies instead of missing segments:
1116+ * 1. Scrubber sees segments S0 [0-9] and S1 [10-19] with a
1117+ * non-monotonic delta offset → records a segment_metadata_anomaly
1118+ * on S1
1119+ * 2. Compacted reupload replaces S1 with S1' (same offset range,
1120+ * different size)
1121+ * 3. process_anomalies must filter the anomaly because the segment
1122+ * identity changed (generate_remote_segment_name encodes size)
1123+ */
1124+ constexpr std::string_view stm_man = R"json(
1125+ {
1126+ "version": 3,
1127+ "namespace": "kafka",
1128+ "topic": "panda-topic",
1129+ "partition": 0,
1130+ "revision": 1,
1131+ "start_offset": 0,
1132+ "last_offset": 19,
1133+ "insync_offset": 100,
1134+ "segments": {
1135+ "0-1-v1.log": {
1136+ "size_bytes": 1024,
1137+ "base_offset": 0,
1138+ "committed_offset": 9,
1139+ "base_timestamp": 1000,
1140+ "max_timestamp": 1000,
1141+ "delta_offset": 4,
1142+ "delta_offset_end": 6,
1143+ "ntp_revision": 1,
1144+ "archiver_term": 1,
1145+ "segment_term": 1,
1146+ "sname_format": 2
1147+ },
1148+ "10-1-v1.log": {
1149+ "size_bytes": 1024,
1150+ "base_offset": 10,
1151+ "committed_offset": 19,
1152+ "base_timestamp": 1000,
1153+ "max_timestamp": 1000,
1154+ "delta_offset": 2,
1155+ "delta_offset_end": 4,
1156+ "ntp_revision": 1,
1157+ "archiver_term": 1,
1158+ "segment_term": 1,
1159+ "sname_format": 2
1160+ }
1161+ }
1162+ }
1163+ )json" ;
1164+
1165+ init_view (stm_man, {});
1166+
1167+ // S1 has delta_offset=2 while S0 has delta_offset=4 → non-monotonic.
1168+ const auto result = run_detector (archival::run_quota_t {100 });
1169+ BOOST_REQUIRE_EQUAL (result.status , cloud_storage::scrub_status::full);
1170+ BOOST_REQUIRE (result.detected .has_value ());
1171+ BOOST_REQUIRE_EQUAL (result.detected .segment_metadata_anomalies .size (), 1 );
1172+
1173+ const auto & anomaly = *result.detected .segment_metadata_anomalies .begin ();
1174+ BOOST_REQUIRE_EQUAL (
1175+ anomaly.type , cloud_storage::anomaly_type::non_monotonical_delta);
1176+
1177+ // Simulate compacted reupload of S1: same offset range and deltas
1178+ // (compaction doesn't change delta offsets), but smaller size.
1179+ const auto original_seg = *std::next (get_stm_manifest ().begin ());
1180+ BOOST_REQUIRE_EQUAL (original_seg.base_offset , model::offset{10 });
1181+
1182+ cloud_storage::segment_meta compacted_seg = original_seg;
1183+ compacted_seg.size_bytes = original_seg.size_bytes / 2 ;
1184+ compacted_seg.is_compacted = true ;
1185+
1186+ // In production the archiver adds replacement segments without the
1187+ // safe_segment_meta_to_add gate, so call add() directly.
1188+ BOOST_REQUIRE (get_stm_manifest_mut ().add (compacted_seg));
1189+
1190+ BOOST_REQUIRE_NE (
1191+ cloud_storage::partition_manifest::generate_remote_segment_name (
1192+ original_seg),
1193+ cloud_storage::partition_manifest::generate_remote_segment_name (
1194+ compacted_seg));
1195+
1196+ get_stm_manifest_mut ().process_anomalies (
1197+ model::timestamp::now (),
1198+ result.last_scrubbed_offset ,
1199+ result.status ,
1200+ result.detected );
1201+
1202+ const auto & filtered = get_stm_manifest ().detected_anomalies ();
1203+ BOOST_REQUIRE_EQUAL (filtered.segment_metadata_anomalies .size (), 0 );
1204+ BOOST_REQUIRE (!filtered.has_value ());
1205+ }
1206+
9951207FIXTURE_TEST (test_filtering_of_archive_segments, bucket_view_fixture) {
9961208 /*
9971209 * This test deletes two segment objects from the cloud: one in the STM
0 commit comments