@@ -299,3 +299,138 @@ SEASTAR_THREAD_TEST_CASE(test_writes_disabled) {
299299 return e.code () == pps::error_code::writes_disabled;
300300 });
301301}
302+
303+ // / Transport that simulates a write collision during soft delete.
304+ // /
305+ // / Every write to the _schemas topic is tagged with the offset the writer
306+ // / expects to land at. If another node writes first, the offset doesn't
307+ // / match — a "write collision" — and the operation retries. Before
308+ // / retrying, read_sync() catches up on the topic, consuming the winning
309+ // / writer's record into the store.
310+ // /
311+ // / This transport drives that sequence:
312+ // / 1. Initial read_sync(): HWM=1, _loaded_offset already 0 → no-op
313+ // / 2. First produce (the delete): returns wrong offset → collision
314+ // / 3. Retry read_sync(): HWM=2, consumes the competing delete batch
315+ // / 4. Retry attempt: subject already deleted → returns versions including
316+ // / deletes without re-issuing produce
317+ class colliding_transport final : public pps::transport {
318+ public:
319+ explicit colliding_transport (
320+ pps::context_subject sub, model::offset collision_offset)
321+ : _collision_offset(collision_offset) {
322+ // Build the competing writer's delete batch at the collision offset.
323+ storage::record_batch_builder rb{
324+ model::record_batch_type::raft_data, collision_offset};
325+ rb.add_raw_kv (
326+ to_json_iobuf (
327+ pps::delete_subject_key{
328+ .seq {collision_offset}, .node {model::node_id{99 }}, .sub {sub}}),
329+ to_json_iobuf (pps::delete_subject_value{.sub {sub}}));
330+ _competing_batch = std::move (rb).build ();
331+ }
332+
333+ ss::future<> stop () final { return ss::now (); }
334+ ss::future<cluster::errc> create_topic (
335+ model::topic_namespace_view,
336+ int32_t ,
337+ cluster::topic_properties,
338+ std::optional<int16_t >) final {
339+ throw std::runtime_error (
340+ " colliding_transport::create_topic not implemented" );
341+ }
342+
343+ ss::future<pps::produce_result> produce (model::record_batch) override {
344+ ++_produce_calls;
345+ if (_produce_calls == 1 ) {
346+ // Collision: return an offset that doesn't match write_at.
347+ co_return pps::produce_result{
348+ .base_offset = _collision_offset + model::offset{1 }};
349+ }
350+ // The retry should never produce — it finds the subject
351+ // already deleted and returns the version list via
352+ // include_deleted::yes.
353+ throw std::runtime_error (" unexpected second produce call" );
354+ }
355+
356+ ss::future<model::offset> get_high_watermark () override {
357+ if (_produce_calls == 0 ) {
358+ // Before collision: only the schema record at offset 0.
359+ co_return model::offset{1 };
360+ }
361+ // After collision: schema at 0, competing delete at 1.
362+ co_return model::offset{2 };
363+ }
364+
365+ ss::future<> consume_range (
366+ model::offset start,
367+ model::offset end,
368+ ss::noncopyable_function<ss::future<>(model::record_batch)> consumer)
369+ override {
370+ if (
371+ _competing_batch.has_value () && start <= _collision_offset
372+ && _collision_offset < end) {
373+ co_await consumer (std::move (*_competing_batch));
374+ _competing_batch.reset ();
375+ }
376+ }
377+
378+ int produce_calls () const { return _produce_calls; }
379+
380+ private:
381+ std::optional<model::record_batch> _competing_batch;
382+ model::offset _collision_offset;
383+ int _produce_calls{0 };
384+ };
385+
386+ SEASTAR_THREAD_TEST_CASE (test_delete_subject_write_collision_retry) {
387+ pps::enable_qualified_subjects::set_local (true );
388+ auto reset_flag = ss::defer (
389+ [] { pps::enable_qualified_subjects::reset_local (); });
390+
391+ // Store setup: insert a schema so there's something to delete.
392+ pps::sharded_store store;
393+ store.start (pps::is_mutable::yes, ss::default_smp_service_group ()).get ();
394+ auto stop_store = ss::defer ([&store]() { store.stop ().get (); });
395+
396+ const auto version = pps::schema_version{1 };
397+ store
398+ .upsert (
399+ pps::seq_marker{
400+ .seq = model::offset{0 },
401+ .node = model::node_id{0 },
402+ .version = version,
403+ .key_type = pps::seq_marker_key_type::schema},
404+ pps::subject_schema{subject0, int_def0.share ()},
405+ id0,
406+ version,
407+ pps::is_deleted::no)
408+ .get ();
409+
410+ // The competing delete will land at offset 1 (the next available).
411+ colliding_transport transport (subject0, model::offset{1 });
412+
413+ ss::sharded<pps::seq_writer> seq;
414+ seq
415+ .start (
416+ model::node_id{0 },
417+ ss::default_smp_service_group (),
418+ std::ref (transport),
419+ std::reference_wrapper (store),
420+ ss::sharded_parameter (
421+ [] { return std::make_unique<sequence_state_checker_test>(); }))
422+ .get ();
423+ auto stop_seq = ss::defer ([&seq]() { seq.stop ().get (); });
424+
425+ // Advance _loaded_offset past the schema record.
426+ seq.local ().advance_offset (model::offset{0 }).get ();
427+
428+ // First produce collides. The retry's read_sync consumes the
429+ // competing delete batch, finds the subject already soft-deleted,
430+ // and returns the version list via include_deleted::yes.
431+ auto versions = seq.local ().delete_subject_impermanent (subject0).get ();
432+
433+ BOOST_REQUIRE_EQUAL (versions.size (), 1 );
434+ BOOST_CHECK_EQUAL (versions[0 ], version);
435+ BOOST_CHECK_EQUAL (transport.produce_calls (), 1 );
436+ }
0 commit comments