Commit b73bcef

oleiman and claude committed
sr/seq_writer: fix retry crash in soft delete
do_delete_subject_impermanent had a latent ordering bug: get_versions(include_deleted::no) was called BEFORE is_subject_deleted(), so on retry after a write collision where the subject was already soft-deleted (by the winning writer), get_versions would throw subject_not_found, which propagated as HTTP 404 to the client. Fix by checking is_subject_deleted first.

A cached version list (held in a std::optional<delete_version_cache> member, protected by _write_sem) preserves the pre-delete version list across retries, since after a subject-level soft delete all versions are marked deleted and the store can no longer distinguish individually-deleted versions from those deleted as part of the subject-level operation.

This bug was never observed on the kafka::client transport because the full Kafka protocol round-trip (SASL, request queuing, acks) acts as a natural pacer between different nodes' writes, making write collisions rare. The RPC transport's lower latency tightens the timing window enough to trigger collisions routinely in integration tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent 3b85e1a commit b73bcef

4 files changed: 167 additions & 4 deletions
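
To make the ordering bug concrete before reading the diffs, here is a minimal, self-contained C++ sketch. The toy_store type and its methods are simplified stand-ins for the schema registry store, not the real API; only the ordering of the two calls is faithful to the fix.

// Toy model of the ordering bug; simplified stand-ins, not the real
// schema registry types. Build with: g++ -std=c++17 toy.cc
#include <cassert>
#include <stdexcept>
#include <vector>

struct toy_store {
    std::vector<int> versions{1}; // one schema version, soft-deleted below
    bool subject_deleted{true};   // the winning writer's delete has applied

    // Mirrors get_versions(): with include_deleted=false, a fully
    // soft-deleted subject looks like it does not exist at all.
    std::vector<int> get_versions(bool include_deleted) const {
        if (subject_deleted && !include_deleted) {
            throw std::runtime_error("subject_not_found"); // -> HTTP 404
        }
        return versions;
    }
    bool is_subject_deleted() const { return subject_deleted; }
};

int main() {
    toy_store store;

    // Buggy ordering: get_versions(include_deleted=false) runs before the
    // deleted check, so the retry throws subject_not_found.
    bool threw = false;
    try {
        (void)store.get_versions(/*include_deleted=*/false);
    } catch (const std::runtime_error&) {
        threw = true;
    }
    assert(threw);

    // Fixed ordering: check is_subject_deleted() first and fall back to
    // include_deleted=true, so the retry still returns a version list.
    std::vector<int> result;
    if (store.is_subject_deleted()) {
        result = store.get_versions(/*include_deleted=*/true);
    } else {
        result = store.get_versions(/*include_deleted=*/false);
    }
    assert(result.size() == 1);
}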

src/v/pandaproxy/schema_registry/seq_writer.cc

Lines changed: 23 additions & 4 deletions
@@ -586,13 +586,31 @@ seq_writer::do_delete_subject_impermanent(
   context_subject sub, model::offset write_at) {
     co_await check_mutable(sub.ctx, sub.sub);
 
+    // On retry after a write collision, the subject may already be
+    // soft-deleted (by the collided write or a concurrent writer). Return
+    // the cached version list from the first attempt, since the store no
+    // longer distinguishes individually-deleted versions after a
+    // subject-level soft delete.
+    if (co_await _store.is_subject_deleted(sub)) {
+        if (
+          _delete_versions_cache.has_value()
+          && _delete_versions_cache->sub == sub) {
+            auto versions = std::move(_delete_versions_cache->versions);
+            _delete_versions_cache.reset();
+            co_return versions;
+        }
+        // Subject was already deleted before our first attempt.
+        co_return co_await _store.get_versions(sub, include_deleted::yes);
+    }
+
     // Grab the versions before they're gone.
     auto versions = co_await _store.get_versions(sub, include_deleted::no);
 
-    // Inspect the subject to see if its already deleted
-    if (co_await _store.is_subject_deleted(sub)) {
-        co_return std::make_optional(std::move(versions));
-    }
+    // Cache versions for potential retry — after a subject-level soft
+    // delete all versions are marked deleted and the pre-delete list
+    // cannot be reconstructed from the store. Tagged with subject so
+    // stale entries from a prior delete of a different subject are ignored.
+    _delete_versions_cache.emplace(delete_version_cache{sub, versions.copy()});
 
     // Check that the subject is not referenced
     if (co_await _store.is_referenced(sub, std::nullopt)) {
@@ -624,6 +642,7 @@ seq_writer::do_delete_subject_impermanent(
     }
 
     if (co_await produce_and_apply(write_at, std::move(rb).build())) {
+        _delete_versions_cache.reset();
         co_return versions;
     } else {
         // Pass up a None, our caller's cue to retry
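
The retry driver itself is not part of this diff. Assuming, per the transport doc comment in the test below, a loop in which a nullopt return means "write collision, catch up and retry", a toy sketch of the cache lifecycle across one collision looks like this (all types are simplified stand-ins for the real seq_writer state):

// Toy of the cache lifecycle across a collision retry; the loop shape is
// an assumption based on the test's doc comment, not the real driver.
#include <cassert>
#include <optional>
#include <vector>

struct toy_writer {
    bool subject_deleted{false};           // store's view of the subject
    std::optional<std::vector<int>> cache; // ~ _delete_versions_cache
    int produce_calls{0};

    // One attempt; nullopt means a write collision, caller retries.
    std::optional<std::vector<int>> attempt() {
        if (subject_deleted) {             // fixed ordering: check first
            auto hit = std::move(cache);   // consume the cached list
            cache.reset();
            return hit.value_or(std::vector<int>{});
        }
        std::vector<int> versions{1};      // get_versions(include_deleted::no)
        cache = versions;                  // cache before producing
        ++produce_calls;
        // Simulated collision on the first produce: the winning writer's
        // delete is applied when the caller catches up before retrying.
        if (produce_calls == 1) {
            subject_deleted = true;
            return std::nullopt;
        }
        cache.reset();                     // success: drop the cache
        return versions;
    }
};

int main() {
    toy_writer w;
    std::optional<std::vector<int>> result;
    while (!(result = w.attempt()).has_value()) {
        // a read_sync()-equivalent is folded into attempt() above
    }
    assert(result->size() == 1);  // pre-delete list survives the retry
    assert(w.produce_calls == 1); // the retry never produced again
}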

src/v/pandaproxy/schema_registry/seq_writer.h

Lines changed: 9 additions & 0 deletions
@@ -225,6 +225,15 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
     /// Shard 0 only: Serialize write operations.
     ssx::semaphore _write_sem{1, "pproxy/schema-write"};
 
+    /// Shard 0 only: cached version list for delete_subject_impermanent
+    /// retries. Protected by _write_sem. Tagged with subject so stale
+    /// entries from a previous delete of a different subject are ignored.
+    struct delete_version_cache {
+        context_subject sub;
+        chunked_vector<schema_version> versions;
+    };
+    std::optional<delete_version_cache> _delete_versions_cache;
+
     // ======================
     // End of Shard 0 state
 };
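
Why the subject tag on the cache matters: if a delete of one subject collides and a later delete targets a different subject, the stale entry must not be returned. A toy check of that guard, with std::string and std::vector<int> standing in for context_subject and chunked_vector<schema_version>:

// Toy check of the subject tag; simplified stand-in types only.
#include <cassert>
#include <optional>
#include <string>
#include <vector>

struct delete_version_cache_toy {
    std::string sub;
    std::vector<int> versions;
};

// Honor the cache only if it was populated for the same subject; a stale
// entry left over from a prior delete of another subject is ignored.
std::optional<std::vector<int>> take_cached(
  std::optional<delete_version_cache_toy>& cache, const std::string& sub) {
    if (cache.has_value() && cache->sub == sub) {
        auto versions = std::move(cache->versions);
        cache.reset();
        return versions;
    }
    return std::nullopt;
}

int main() {
    std::optional<delete_version_cache_toy> cache;
    cache.emplace(delete_version_cache_toy{"subject-a", {1, 2}});

    assert(!take_cached(cache, "subject-b").has_value()); // stale: ignored
    auto hit = take_cached(cache, "subject-a");           // same subject: hit
    assert(hit.has_value() && hit->size() == 2);
    assert(!cache.has_value());                           // consumed on hit
}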

src/v/pandaproxy/schema_registry/test/BUILD

Lines changed: 1 addition & 0 deletions
@@ -216,6 +216,7 @@ redpanda_cc_btest(
         ":utils",
         "//src/v/model",
         "//src/v/pandaproxy/schema_registry:core",
+        "//src/v/storage:record_batch_builder",
         "//src/v/test_utils:seastar_boost",
         "@boost//:test",
         "@seastar",

src/v/pandaproxy/schema_registry/test/consume_to_store.cc

Lines changed: 134 additions & 0 deletions
@@ -299,3 +299,137 @@ SEASTAR_THREAD_TEST_CASE(test_writes_disabled) {
           return e.code() == pps::error_code::writes_disabled;
       });
 }
+
+/// Transport that simulates a write collision during soft delete.
+///
+/// Every write to the _schemas topic is tagged with the offset the writer
+/// expects to land at. If another node writes first, the offset doesn't
+/// match — a "write collision" — and the operation retries. Before
+/// retrying, read_sync() catches up on the topic, consuming the winning
+/// writer's record into the store.
+///
+/// This transport drives that sequence:
+///   1. Initial read_sync(): HWM=1, _loaded_offset already 0 → no-op
+///   2. First produce (the delete): returns wrong offset → collision
+///   3. Retry read_sync(): HWM=2, consumes the competing delete batch
+///   4. Retry attempt: subject already deleted → returns cached versions
+class colliding_transport final : public pps::transport {
+public:
+    explicit colliding_transport(
+      pps::context_subject sub, model::offset collision_offset)
+      : _collision_offset(collision_offset) {
+        // Build the competing writer's delete batch at the collision offset.
+        storage::record_batch_builder rb{
+          model::record_batch_type::raft_data, collision_offset};
+        rb.add_raw_kv(
+          to_json_iobuf(
+            pps::delete_subject_key{
+              .seq{collision_offset}, .node{model::node_id{99}}, .sub{sub}}),
+          to_json_iobuf(pps::delete_subject_value{.sub{sub}}));
+        _competing_batch = std::move(rb).build();
+    }
+
+    ss::future<> stop() final { return ss::now(); }
+    ss::future<cluster::errc> create_topic(
+      model::topic_namespace_view,
+      int32_t,
+      cluster::topic_properties,
+      std::optional<int16_t>) final {
+        throw std::runtime_error(
+          "colliding_transport::create_topic not implemented");
+    }
+    bool has_ephemeral_credentials() const final { return false; }
+
+    ss::future<pps::produce_result> produce(model::record_batch) override {
+        ++_produce_calls;
+        if (_produce_calls == 1) {
+            // Collision: return an offset that doesn't match write_at.
+            co_return pps::produce_result{
+              .base_offset = _collision_offset + model::offset{1}};
+        }
+        // The retry should never produce — it finds the subject
+        // already deleted and returns the cached version list.
+        throw std::runtime_error("unexpected second produce call");
+    }
+
+    ss::future<model::offset> get_high_watermark() override {
+        if (_produce_calls == 0) {
+            // Before collision: only the schema record at offset 0.
+            co_return model::offset{1};
+        }
+        // After collision: schema at 0, competing delete at 1.
+        co_return model::offset{2};
+    }
+
+    ss::future<> consume_range(
+      model::offset start,
+      model::offset end,
+      ss::noncopyable_function<ss::future<>(model::record_batch)> consumer)
+      override {
+        if (
+          _competing_batch.has_value() && start <= _collision_offset
+          && _collision_offset < end) {
+            co_await consumer(std::move(*_competing_batch));
+            _competing_batch.reset();
+        }
+    }
+
+    int produce_calls() const { return _produce_calls; }
+
+private:
+    std::optional<model::record_batch> _competing_batch;
+    model::offset _collision_offset;
+    int _produce_calls{0};
+};
+
+SEASTAR_THREAD_TEST_CASE(test_delete_subject_write_collision_retry) {
+    pps::enable_qualified_subjects::set_local(true);
+    auto reset_flag = ss::defer(
+      [] { pps::enable_qualified_subjects::reset_local(); });
+
+    // Store setup: insert a schema so there's something to delete.
+    pps::sharded_store store;
+    store.start(pps::is_mutable::yes, ss::default_smp_service_group()).get();
+    auto stop_store = ss::defer([&store]() { store.stop().get(); });
+
+    const auto version = pps::schema_version{1};
+    store
+      .upsert(
+        pps::seq_marker{
+          .seq = model::offset{0},
+          .node = model::node_id{0},
+          .version = version,
+          .key_type = pps::seq_marker_key_type::schema},
+        pps::subject_schema{subject0, int_def0.share()},
+        id0,
+        version,
+        pps::is_deleted::no)
+      .get();
+
+    // The competing delete will land at offset 1 (the next available).
+    colliding_transport transport(subject0, model::offset{1});
+
+    ss::sharded<pps::seq_writer> seq;
+    seq
+      .start(
+        model::node_id{0},
+        ss::default_smp_service_group(),
+        &transport,
+        std::reference_wrapper(store),
+        ss::sharded_parameter(
+          [] { return std::make_unique<sequence_state_checker_test>(); }))
+      .get();
+    auto stop_seq = ss::defer([&seq]() { seq.stop().get(); });
+
+    // Advance _loaded_offset past the schema record.
+    seq.local().advance_offset(model::offset{0}).get();
+
+    // First produce collides. The retry's read_sync consumes the
+    // competing delete batch, finds the subject already soft-deleted,
+    // and returns the cached version list from the first attempt.
+    auto versions = seq.local().delete_subject_impermanent(subject0).get();
+
+    BOOST_REQUIRE_EQUAL(versions.size(), 1);
+    BOOST_CHECK_EQUAL(versions[0], version);
+    BOOST_CHECK_EQUAL(transport.produce_calls(), 1);
+}
