diff --git a/src/v/cloud_storage/partition_manifest.cc b/src/v/cloud_storage/partition_manifest.cc
index b3d693e9a6848..06a934318f1d9 100644
--- a/src/v/cloud_storage/partition_manifest.cc
+++ b/src/v/cloud_storage/partition_manifest.cc
@@ -489,18 +489,14 @@ bool partition_manifest::contains(const segment_name& name) const {
     return _segments.contains(maybe_key->base_offset);
 }
 
-bool partition_manifest::segment_with_offset_range_exists(
-  model::offset base, model::offset committed) const {
-    if (auto iter = find(base); iter != end()) {
-        const auto expected_committed
-          = _segments.get_committed_offset_column().at_index(iter.index());
-
-        // false when committed offset doesn't match
-        return committed == *expected_committed;
-    } else {
-        // base offset doesn't match any segment
+bool partition_manifest::segment_with_same_identity_exists(
+  const partition_manifest::value& meta) const {
+    auto iter = find(meta.base_offset);
+    if (iter == end() || iter->committed_offset != meta.committed_offset) {
         return false;
     }
+    return generate_remote_segment_name(*iter)
+      == generate_remote_segment_name(meta);
 }
 
 void partition_manifest::delete_replaced_segments() { _replaced.clear(); }
@@ -2768,12 +2764,10 @@ void partition_manifest::process_anomalies(
         }
 
         if (meta.committed_offset >= get_start_offset()) {
-            // The segment might have been missing because it was merged with
-            // something else. If the offset range doesn't match a segment
-            // exactly, discard the anomaly. Only segments from the STM
-            // manifest may be merged/reuploaded.
-            return !segment_with_offset_range_exists(
-              meta.base_offset, meta.committed_offset);
+            // The segment might have been missing because it was merged
+            // with something else or replaced by a compacted reupload.
+            // Only segments from the STM manifest may be merged/reuploaded.
+            return !segment_with_same_identity_exists(meta);
         } else {
             // Segment belongs to the archive. No reuploads are done here.
             return false;
@@ -2790,10 +2784,11 @@ void partition_manifest::process_anomalies(
         }
 
        if (anomaly_meta.at.committed_offset >= get_start_offset()) {
-            // Similarly to the missing segment case, if the boundaries of the
-            // segment where the anomaly was detected changed, drop it.
-            return !segment_with_offset_range_exists(
-              anomaly_meta.at.base_offset, anomaly_meta.at.committed_offset);
+            // Similarly to the missing segment case, if the boundaries
+            // of the segment where the anomaly was detected changed or
+            // the segment was replaced (e.g., compacted reupload),
+            // drop the anomaly.
+            return !segment_with_same_identity_exists(anomaly_meta.at);
         } else {
             return false;
         }
diff --git a/src/v/cloud_storage/partition_manifest.h b/src/v/cloud_storage/partition_manifest.h
index 7cc10c2d8b3a0..9a11712a58b41 100644
--- a/src/v/cloud_storage/partition_manifest.h
+++ b/src/v/cloud_storage/partition_manifest.h
@@ -323,10 +323,9 @@ class partition_manifest : public base_manifest {
     bool contains(const key& key) const;
     bool contains(const segment_name& name) const;
 
-    /// Check if the provided offset range matches any segment in the manifest
-    /// exactly.
-    bool segment_with_offset_range_exists(
-      model::offset base, model::offset committed) const;
+    /// Check if the manifest contains a segment with the same identity
+    /// (offset range, size, term) as the provided metadata.
+    bool segment_with_same_identity_exists(const value& meta) const;
 
     struct add_segment_meta_result {
         // size in bytes of the segment(s) that has been replaced by this
diff --git a/src/v/cloud_storage/tests/anomalies_detector_test.cc b/src/v/cloud_storage/tests/anomalies_detector_test.cc
index b3cd0a9687bb0..6d32f22ef8f4d 100644
--- a/src/v/cloud_storage/tests/anomalies_detector_test.cc
+++ b/src/v/cloud_storage/tests/anomalies_detector_test.cc
@@ -992,6 +992,218 @@ FIXTURE_TEST(test_filtering_of_segment_merge, bucket_view_fixture) {
     BOOST_REQUIRE_EQUAL(filtered_anomalies, filtered_from_partials);
 }
 
+FIXTURE_TEST(test_filtering_of_compacted_reupload, bucket_view_fixture) {
+    /*
+     * Test for the race between scrubber and compacted segment reupload:
+     * 1. Scrubber downloads stm manifest and records segment S as missing
+     * 2. Compacted reupload replaces S with S' at the same offset range
+     *    but with a different size
+     * 3. GC deletes the original segment S from cloud storage
+     * 4. process_anomalies must recognise S as a false positive: the
+     *    manifest entry for that offset range now refers to S'
+     *
+     * Before the fix, process_anomalies only checked that the offset range
+     * matched a segment and kept the anomaly since S' has the same offsets.
+     * The fix compares generate_remote_segment_name(), which encodes
+     * size_bytes, so a reupload with a different size drops the anomaly.
+     */
+    constexpr std::string_view stm_man = R"json(
+{
+    "version": 3,
+    "namespace": "kafka",
+    "topic": "panda-topic",
+    "partition": 0,
+    "revision": 1,
+    "start_offset": 0,
+    "last_offset": 29,
+    "insync_offset": 100,
+    "segments": {
+        "0-1-v1.log": {
+            "size_bytes": 1024,
+            "base_offset": 0,
+            "committed_offset": 9,
+            "base_timestamp": 1000,
+            "max_timestamp": 1000,
+            "delta_offset": 0,
+            "delta_offset_end": 2,
+            "ntp_revision": 1,
+            "archiver_term": 1,
+            "segment_term": 1,
+            "sname_format": 2
+        },
+        "10-1-v1.log": {
+            "size_bytes": 1024,
+            "base_offset": 10,
+            "committed_offset": 19,
+            "base_timestamp": 1000,
+            "max_timestamp": 1000,
+            "delta_offset": 2,
+            "delta_offset_end": 4,
+            "ntp_revision": 1,
+            "archiver_term": 1,
+            "segment_term": 1,
+            "sname_format": 2
+        },
+        "20-1-v1.log": {
+            "size_bytes": 1024,
+            "base_offset": 20,
+            "committed_offset": 29,
+            "base_timestamp": 1000,
+            "max_timestamp": 1000,
+            "delta_offset": 4,
+            "delta_offset_end": 6,
+            "ntp_revision": 1,
+            "archiver_term": 1,
+            "segment_term": 1,
+            "sname_format": 2
+        }
+    }
+}
+)json";
+
+    init_view(stm_man, {});
+
+    // Target the middle segment for the reupload scenario.
+    const auto original_seg = *std::next(get_stm_manifest().begin());
+    BOOST_REQUIRE_EQUAL(original_seg.base_offset, model::offset{10});
+
+    // Remove the original segment from cloud storage so the detector
+    // reports it as missing.
+    remove_segment(get_stm_manifest(), original_seg);
+
+    const auto result = run_detector(archival::run_quota_t{100});
+    BOOST_REQUIRE_EQUAL(result.status, cloud_storage::scrub_status::full);
+    BOOST_REQUIRE(result.detected.has_value());
+    BOOST_REQUIRE_EQUAL(result.detected.missing_segments.size(), 1);
+
+    // Simulate a compacted reupload: replace the segment with one at the
+    // same offset range but smaller size (compaction removed tombstones).
+    cloud_storage::segment_meta compacted_seg = original_seg;
+    compacted_seg.size_bytes = original_seg.size_bytes / 2;
+    compacted_seg.is_compacted = true;
+
+    BOOST_REQUIRE(
+      get_stm_manifest_mut().safe_segment_meta_to_add(compacted_seg));
+    BOOST_REQUIRE(get_stm_manifest_mut().add(compacted_seg));
+
+    // Verify the segment name actually differs (the name encodes size for
+    // sname_format v2/v3).
+    BOOST_REQUIRE_NE(
+      cloud_storage::partition_manifest::generate_remote_segment_name(
+        original_seg),
+      cloud_storage::partition_manifest::generate_remote_segment_name(
+        compacted_seg));
+
+    // process_anomalies should filter out the missing segment because the
+    // manifest entry now refers to a different (compacted) segment.
+    get_stm_manifest_mut().process_anomalies(
+      model::timestamp::now(),
+      result.last_scrubbed_offset,
+      result.status,
+      result.detected);
+
+    const auto& filtered = get_stm_manifest().detected_anomalies();
+    BOOST_REQUIRE_EQUAL(filtered.missing_segments.size(), 0);
+    BOOST_REQUIRE(!filtered.has_value());
+}
+
+FIXTURE_TEST(
+  test_filtering_of_metadata_anomaly_after_compacted_reupload,
+  bucket_view_fixture) {
+    /*
+     * Same race as test_filtering_of_compacted_reupload but for metadata
+     * anomalies instead of missing segments:
+     * 1. Scrubber sees segments S0 [0-9] and S1 [10-19] with a
+     *    non-monotonic delta offset → records a segment_metadata_anomaly
+     *    on S1
+     * 2. Compacted reupload replaces S1 with S1' (same offset range,
+     *    different size)
+     * 3. process_anomalies must filter the anomaly because the segment
+     *    identity changed (generate_remote_segment_name encodes size)
+     */
+    constexpr std::string_view stm_man = R"json(
+{
+    "version": 3,
+    "namespace": "kafka",
+    "topic": "panda-topic",
+    "partition": 0,
+    "revision": 1,
+    "start_offset": 0,
+    "last_offset": 19,
+    "insync_offset": 100,
+    "segments": {
+        "0-1-v1.log": {
+            "size_bytes": 1024,
+            "base_offset": 0,
+            "committed_offset": 9,
+            "base_timestamp": 1000,
+            "max_timestamp": 1000,
+            "delta_offset": 4,
+            "delta_offset_end": 6,
+            "ntp_revision": 1,
+            "archiver_term": 1,
+            "segment_term": 1,
+            "sname_format": 2
+        },
+        "10-1-v1.log": {
+            "size_bytes": 1024,
+            "base_offset": 10,
+            "committed_offset": 19,
+            "base_timestamp": 1000,
+            "max_timestamp": 1000,
+            "delta_offset": 2,
+            "delta_offset_end": 4,
+            "ntp_revision": 1,
+            "archiver_term": 1,
+            "segment_term": 1,
+            "sname_format": 2
+        }
+    }
+}
+)json";
+
+    init_view(stm_man, {});
+
+    // S1 has delta_offset=2 while S0 has delta_offset=4 → non-monotonic.
+    const auto result = run_detector(archival::run_quota_t{100});
+    BOOST_REQUIRE_EQUAL(result.status, cloud_storage::scrub_status::full);
+    BOOST_REQUIRE(result.detected.has_value());
+    BOOST_REQUIRE_EQUAL(result.detected.segment_metadata_anomalies.size(), 1);
+
+    const auto& anomaly = *result.detected.segment_metadata_anomalies.begin();
+    BOOST_REQUIRE_EQUAL(
+      anomaly.type, cloud_storage::anomaly_type::non_monotonical_delta);
+
+    // Simulate compacted reupload of S1: same offset range and deltas
+    // (compaction doesn't change delta offsets), but smaller size.
+    const auto original_seg = *std::next(get_stm_manifest().begin());
+    BOOST_REQUIRE_EQUAL(original_seg.base_offset, model::offset{10});
+
+    cloud_storage::segment_meta compacted_seg = original_seg;
+    compacted_seg.size_bytes = original_seg.size_bytes / 2;
+    compacted_seg.is_compacted = true;
+
+    // In production the archiver adds replacement segments without the
+    // safe_segment_meta_to_add gate, so call add() directly.
+    BOOST_REQUIRE(get_stm_manifest_mut().add(compacted_seg));
+
+    BOOST_REQUIRE_NE(
+      cloud_storage::partition_manifest::generate_remote_segment_name(
+        original_seg),
+      cloud_storage::partition_manifest::generate_remote_segment_name(
+        compacted_seg));
+
+    get_stm_manifest_mut().process_anomalies(
+      model::timestamp::now(),
+      result.last_scrubbed_offset,
+      result.status,
+      result.detected);
+
+    const auto& filtered = get_stm_manifest().detected_anomalies();
+    BOOST_REQUIRE_EQUAL(filtered.segment_metadata_anomalies.size(), 0);
+    BOOST_REQUIRE(!filtered.has_value());
+}
+
 FIXTURE_TEST(test_filtering_of_archive_segments, bucket_view_fixture) {
     /*
      * This test deletes two segment objects from the cloud: one in the STM