Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2764,11 +2764,25 @@ void partition_manifest::process_anomalies(

if (meta.committed_offset >= get_start_offset()) {
// The segment might have been missing because it was merged with
// something else. If the offset range doesn't match a segment
// exactly, discard the anomaly. Only segments from the STM
// manifest may be merged/reuploaded.
return !segment_with_offset_range_exists(
meta.base_offset, meta.committed_offset);
// something else or replaced by a compacted reupload. Only
// segments from the STM manifest may be merged/reuploaded.
auto iter = find(meta.base_offset);
if (
iter == end()
|| iter->committed_offset != meta.committed_offset) {
// Offset range no longer matches any segment in the
// manifest (merged or reuploaded with different
// boundaries). Discard the anomaly.
return true;
}
// The offset range still exists. Verify the current segment
// is the same one reported as missing, not a replacement
// (e.g., a compacted reupload with the same offset range but
// different size/term). If a different segment now covers
// the range, the old one being absent from cloud storage is
// expected.
return generate_remote_segment_name(*iter)
!= generate_remote_segment_name(meta);
} else {
// Segment belongs to the archive. No reuploads are done here.
return false;
Expand All @@ -2785,10 +2799,18 @@ void partition_manifest::process_anomalies(
}

if (anomaly_meta.at.committed_offset >= get_start_offset()) {
// Similarly to the missing segment case, if the boundaries of the
// segment where the anomaly was detected changed, drop it.
return !segment_with_offset_range_exists(
anomaly_meta.at.base_offset, anomaly_meta.at.committed_offset);
// Similarly to the missing segment case, if the boundaries
// of the segment where the anomaly was detected changed or
// the segment was replaced (e.g., compacted reupload),
// drop the anomaly.
auto iter = find(anomaly_meta.at.base_offset);
if (
iter == end()
|| iter->committed_offset != anomaly_meta.at.committed_offset) {
return true;
}
return generate_remote_segment_name(*iter)
!= generate_remote_segment_name(anomaly_meta.at);
} else {
return false;
}
Expand Down
115 changes: 115 additions & 0 deletions src/v/cloud_storage/tests/anomalies_detector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,121 @@ FIXTURE_TEST(test_filtering_of_segment_merge, bucket_view_fixture) {
BOOST_REQUIRE_EQUAL(filtered_anomalies, filtered_from_partials);
}

FIXTURE_TEST(test_filtering_of_compacted_reupload, bucket_view_fixture) {
/*
* Test for race between scrubber and compacted segment reupload:
* 1. Scrubber downloads stm manifest and records segment S as missing
* 2. Compacted reupload replaces S with S' at the same offset range
* but different size
* 3. GC deletes the original segment S from cloud storage
* 4. process_anomalies must recognise S as a false positive because
* S' now covers the range
*
* Before the fix, process_anomalies only checked offset range
* existence and would keep the anomaly since S' has the same offsets.
* The fix compares generate_remote_segment_name() which encodes
* size_bytes, so a replacement with different size is detected.
*/
// Three 10-offset segments ([0,9], [10,19], [20,29]) in sname_format 2,
// so the remote segment name is derived from the full segment_meta
// (including size_bytes) rather than just base offset + term.
constexpr std::string_view stm_man = R"json(
{
"version": 3,
"namespace": "kafka",
"topic": "panda-topic",
"partition": 0,
"revision": 1,
"start_offset": 0,
"last_offset": 29,
"insync_offset": 100,
"segments": {
"0-1-v1.log": {
"size_bytes": 1024,
"base_offset": 0,
"committed_offset": 9,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 0,
"delta_offset_end": 2,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
},
"10-1-v1.log": {
"size_bytes": 1024,
"base_offset": 10,
"committed_offset": 19,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 2,
"delta_offset_end": 4,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
},
"20-1-v1.log": {
"size_bytes": 1024,
"base_offset": 20,
"committed_offset": 29,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 4,
"delta_offset_end": 6,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
}
}
}
)json";

// No spillover (archive) manifests: the race only applies to the STM
// manifest, since only STM segments may be merged/reuploaded.
// NOTE(review): init_view presumably parses the manifest and populates
// the mock bucket with the manifest plus one object per listed segment
// — confirm against bucket_view_fixture.
init_view(stm_man, {});

// Target the middle segment for the reupload scenario.
// Copy (not reference) the meta: the manifest is mutated further down,
// which could invalidate iterators/references into it.
const auto original_seg = *std::next(get_stm_manifest().begin());
BOOST_REQUIRE_EQUAL(original_seg.base_offset, model::offset{10});

// Remove the original segment from cloud storage so the detector
// reports it as missing (step 1 of the race: scrubber sees S absent).
remove_segment(get_stm_manifest(), original_seg);

// Run a full scrub; the quota is large enough to cover all segments in
// one pass, so we expect a complete (non-partial) result.
const auto result = run_detector(archival::run_quota_t{100});
BOOST_REQUIRE_EQUAL(result.status, cloud_storage::scrub_status::full);
BOOST_REQUIRE(result.detected.has_value());
BOOST_REQUIRE_EQUAL(result.detected.missing_segments.size(), 1);

// Simulate a compacted reupload: replace the segment with one at the
// same offset range but smaller size (compaction removed tombstones).
// Steps 2-3 of the race: S' now covers [10,19] and S is already gone.
cloud_storage::segment_meta compacted_seg = original_seg;
compacted_seg.size_bytes = original_seg.size_bytes / 2;
compacted_seg.is_compacted = true;

BOOST_REQUIRE(
get_stm_manifest_mut().safe_segment_meta_to_add(compacted_seg));
BOOST_REQUIRE(get_stm_manifest_mut().add(compacted_seg));

// Verify the segment name actually differs (the name encodes size for
// sname_format v2/v3). If this ever fails, the scenario no longer
// exercises the name-comparison path under test.
BOOST_REQUIRE_NE(
cloud_storage::partition_manifest::generate_remote_segment_name(
original_seg),
cloud_storage::partition_manifest::generate_remote_segment_name(
compacted_seg));

// process_anomalies should filter out the missing segment because the
// manifest entry now refers to a different (compacted) segment.
get_stm_manifest_mut().process_anomalies(
model::timestamp::now(),
result.last_scrubbed_offset,
result.status,
result.detected);

// The anomaly set stored on the manifest must now be empty: the missing
// segment was recognised as a false positive (step 4 of the race).
const auto& filtered = get_stm_manifest().detected_anomalies();
BOOST_REQUIRE_EQUAL(filtered.missing_segments.size(), 0);
BOOST_REQUIRE(!filtered.has_value());
}

FIXTURE_TEST(test_filtering_of_archive_segments, bucket_view_fixture) {
/*
* This test deletes two segment objects from the cloud: one in the STM
Expand Down