35 changes: 15 additions & 20 deletions src/v/cloud_storage/partition_manifest.cc
@@ -489,18 +489,14 @@ bool partition_manifest::contains(const segment_name& name) const {
return _segments.contains(maybe_key->base_offset);
}

bool partition_manifest::segment_with_offset_range_exists(
model::offset base, model::offset committed) const {
if (auto iter = find(base); iter != end()) {
const auto expected_committed
= _segments.get_committed_offset_column().at_index(iter.index());

// false when committed offset doesn't match
return committed == *expected_committed;
} else {
// base offset doesn't match any segment
bool partition_manifest::segment_with_same_identity_exists(
const partition_manifest::value& meta) const {
auto iter = find(meta.base_offset);
if (iter == end() || iter->committed_offset != meta.committed_offset) {
return false;
}
return generate_remote_segment_name(*iter)
== generate_remote_segment_name(meta);
}

void partition_manifest::delete_replaced_segments() { _replaced.clear(); }
@@ -2768,12 +2764,10 @@ void partition_manifest::process_anomalies(
}

if (meta.committed_offset >= get_start_offset()) {
// The segment might have been missing because it was merged with
// something else. If the offset range doesn't match a segment
// exactly, discard the anomaly. Only segments from the STM
// manifest may be merged/reuploaded.
return !segment_with_offset_range_exists(
meta.base_offset, meta.committed_offset);
// The segment might have been missing because it was merged
// with something else or replaced by a compacted reupload.
// Only segments from the STM manifest may be merged/reuploaded.
return !segment_with_same_identity_exists(meta);
} else {
// Segment belongs to the archive. No reuploads are done here.
return false;
@@ -2790,10 +2784,11 @@
}

if (anomaly_meta.at.committed_offset >= get_start_offset()) {
// Similarly to the missing segment case, if the boundaries of the
// segment where the anomaly was detected changed, drop it.
return !segment_with_offset_range_exists(
anomaly_meta.at.base_offset, anomaly_meta.at.committed_offset);
// Similarly to the missing segment case, if the boundaries
// of the segment where the anomaly was detected changed or
// the segment was replaced (e.g., compacted reupload),
// drop the anomaly.
return !segment_with_same_identity_exists(anomaly_meta.at);
} else {
return false;
}
7 changes: 3 additions & 4 deletions src/v/cloud_storage/partition_manifest.h
@@ -323,10 +323,9 @@ class partition_manifest : public base_manifest {
bool contains(const key& key) const;
bool contains(const segment_name& name) const;

/// Check if the provided offset range matches any segment in the manifest
/// exactly.
bool segment_with_offset_range_exists(
model::offset base, model::offset committed) const;
/// Check if the manifest contains a segment with the same identity
/// (offset range, size, term) as the provided metadata.
bool segment_with_same_identity_exists(const value& meta) const;

struct add_segment_meta_result {
// size in bytes of the segment(s) that has been replaced by this
212 changes: 212 additions & 0 deletions src/v/cloud_storage/tests/anomalies_detector_test.cc
@@ -992,6 +992,218 @@ FIXTURE_TEST(test_filtering_of_segment_merge, bucket_view_fixture) {
BOOST_REQUIRE_EQUAL(filtered_anomalies, filtered_from_partials);
}

FIXTURE_TEST(test_filtering_of_compacted_reupload, bucket_view_fixture) {
/*
* Test for race between scrubber and compacted segment reupload:
* 1. Scrubber downloads stm manifest and records segment S as missing
* 2. Compacted reupload replaces S with S' at the same offset range
* but different size
* 3. GC deletes the original segment S from cloud storage
* 4. process_anomalies must recognise S as a false positive because
* S' now covers the range
*
* Before the fix, process_anomalies only checked offset range
* existence and would keep the anomaly since S' has the same offsets.
* The fix compares generate_remote_segment_name() which encodes
* size_bytes, so a replacement with different size is detected.
*/
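
// Illustration only, assuming the usual sname_format v2/v3 remote name
// layout "<base_offset>-<committed_offset>-<size_bytes>-<segment_term>-v1.log":
// the original segment would be named roughly "10-19-1024-1-v1.log" while
// the compacted replacement would be "10-19-512-1-v1.log", so the identity
// check sees two different names even though the offset range is identical.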
constexpr std::string_view stm_man = R"json(
{
"version": 3,
"namespace": "kafka",
"topic": "panda-topic",
"partition": 0,
"revision": 1,
"start_offset": 0,
"last_offset": 29,
"insync_offset": 100,
"segments": {
"0-1-v1.log": {
"size_bytes": 1024,
"base_offset": 0,
"committed_offset": 9,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 0,
"delta_offset_end": 2,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
},
"10-1-v1.log": {
"size_bytes": 1024,
"base_offset": 10,
"committed_offset": 19,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 2,
"delta_offset_end": 4,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
},
"20-1-v1.log": {
"size_bytes": 1024,
"base_offset": 20,
"committed_offset": 29,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 4,
"delta_offset_end": 6,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
}
}
}
)json";

init_view(stm_man, {});

// Target the middle segment for the reupload scenario.
const auto original_seg = *std::next(get_stm_manifest().begin());
BOOST_REQUIRE_EQUAL(original_seg.base_offset, model::offset{10});

// Remove the original segment from cloud storage so the detector
// reports it as missing.
remove_segment(get_stm_manifest(), original_seg);

const auto result = run_detector(archival::run_quota_t{100});
BOOST_REQUIRE_EQUAL(result.status, cloud_storage::scrub_status::full);
BOOST_REQUIRE(result.detected.has_value());
BOOST_REQUIRE_EQUAL(result.detected.missing_segments.size(), 1);

// Simulate a compacted reupload: replace the segment with one at the
// same offset range but smaller size (compaction removed tombstones).
cloud_storage::segment_meta compacted_seg = original_seg;
compacted_seg.size_bytes = original_seg.size_bytes / 2;
compacted_seg.is_compacted = true;

BOOST_REQUIRE(
get_stm_manifest_mut().safe_segment_meta_to_add(compacted_seg));
BOOST_REQUIRE(get_stm_manifest_mut().add(compacted_seg));

// Verify the segment name actually differs (the name encodes size for
// sname_format v2/v3).
BOOST_REQUIRE_NE(
cloud_storage::partition_manifest::generate_remote_segment_name(
original_seg),
cloud_storage::partition_manifest::generate_remote_segment_name(
compacted_seg));

// process_anomalies should filter out the missing segment because the
// manifest entry now refers to a different (compacted) segment.
get_stm_manifest_mut().process_anomalies(
model::timestamp::now(),
result.last_scrubbed_offset,
result.status,
result.detected);

const auto& filtered = get_stm_manifest().detected_anomalies();
BOOST_REQUIRE_EQUAL(filtered.missing_segments.size(), 0);
BOOST_REQUIRE(!filtered.has_value());
}

FIXTURE_TEST(
test_filtering_of_metadata_anomaly_after_compacted_reupload,
bucket_view_fixture) {
/*
* Same race as test_filtering_of_compacted_reupload but for metadata
* anomalies instead of missing segments:
* 1. Scrubber sees segments S0 [0-9] and S1 [10-19] with a
* non-monotonic delta offset → records a segment_metadata_anomaly
* on S1
* 2. Compacted reupload replaces S1 with S1' (same offset range,
* different size)
* 3. process_anomalies must filter the anomaly because the segment
* identity changed (generate_remote_segment_name encodes size)
*/
constexpr std::string_view stm_man = R"json(
{
"version": 3,
"namespace": "kafka",
"topic": "panda-topic",
"partition": 0,
"revision": 1,
"start_offset": 0,
"last_offset": 19,
"insync_offset": 100,
"segments": {
"0-1-v1.log": {
"size_bytes": 1024,
"base_offset": 0,
"committed_offset": 9,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 4,
"delta_offset_end": 6,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
},
"10-1-v1.log": {
"size_bytes": 1024,
"base_offset": 10,
"committed_offset": 19,
"base_timestamp": 1000,
"max_timestamp": 1000,
"delta_offset": 2,
"delta_offset_end": 4,
"ntp_revision": 1,
"archiver_term": 1,
"segment_term": 1,
"sname_format": 2
}
}
}
)json";

init_view(stm_man, {});

// S1 has delta_offset=2 while S0 has delta_offset=4 → non-monotonic.
const auto result = run_detector(archival::run_quota_t{100});
BOOST_REQUIRE_EQUAL(result.status, cloud_storage::scrub_status::full);
BOOST_REQUIRE(result.detected.has_value());
BOOST_REQUIRE_EQUAL(result.detected.segment_metadata_anomalies.size(), 1);

const auto& anomaly = *result.detected.segment_metadata_anomalies.begin();
BOOST_REQUIRE_EQUAL(
anomaly.type, cloud_storage::anomaly_type::non_monotonical_delta);

// Simulate compacted reupload of S1: same offset range and deltas
// (compaction doesn't change delta offsets), but smaller size.
const auto original_seg = *std::next(get_stm_manifest().begin());
BOOST_REQUIRE_EQUAL(original_seg.base_offset, model::offset{10});

cloud_storage::segment_meta compacted_seg = original_seg;
compacted_seg.size_bytes = original_seg.size_bytes / 2;
compacted_seg.is_compacted = true;

// In production the archiver adds replacement segments without the
// safe_segment_meta_to_add gate, so call add() directly.
BOOST_REQUIRE(get_stm_manifest_mut().add(compacted_seg));

BOOST_REQUIRE_NE(
cloud_storage::partition_manifest::generate_remote_segment_name(
original_seg),
cloud_storage::partition_manifest::generate_remote_segment_name(
compacted_seg));

get_stm_manifest_mut().process_anomalies(
model::timestamp::now(),
result.last_scrubbed_offset,
result.status,
result.detected);

const auto& filtered = get_stm_manifest().detected_anomalies();
BOOST_REQUIRE_EQUAL(filtered.segment_metadata_anomalies.size(), 0);
BOOST_REQUIRE(!filtered.has_value());
}

FIXTURE_TEST(test_filtering_of_archive_segments, bucket_view_fixture) {
/*
* This test deletes two segment objects from the cloud: one in the STM