@@ -299,3 +299,139 @@ SEASTAR_THREAD_TEST_CASE(test_writes_disabled) {
299299 return e.code () == pps::error_code::writes_disabled;
300300 });
301301}
302+
303+ // / Transport that simulates a write collision during soft delete.
304+ // /
305+ // / Every write to the _schemas topic is tagged with the offset the writer
306+ // / expects to land at. If another node writes first, the offset doesn't
307+ // / match — a "write collision" — and the operation retries. Before
308+ // / retrying, read_sync() catches up on the topic, consuming the winning
309+ // / writer's record into the store.
310+ // /
311+ // / This transport drives that sequence:
312+ // / 1. Initial read_sync(): HWM=1, _loaded_offset already 0 → no-op
313+ // / 2. First produce (the delete): returns wrong offset → collision
314+ // / 3. Retry read_sync(): HWM=2, consumes the competing delete batch
315+ // / 4. Retry attempt: subject already deleted → returns versions including
316+ // / deletes without re-issuing produce
317+ class colliding_transport final : public pps::transport {
318+ public:
319+ explicit colliding_transport (
320+ pps::context_subject sub, model::offset collision_offset)
321+ : _collision_offset(collision_offset) {
322+ // Build the competing writer's delete batch at the collision offset.
323+ storage::record_batch_builder rb{
324+ model::record_batch_type::raft_data, collision_offset};
325+ rb.add_raw_kv (
326+ to_json_iobuf (
327+ pps::delete_subject_key{
328+ .seq {collision_offset}, .node {model::node_id{99 }}, .sub {sub}}),
329+ to_json_iobuf (pps::delete_subject_value{.sub {sub}}));
330+ _competing_batch = std::move (rb).build ();
331+ }
332+
333+ ss::future<> stop () final { return ss::now (); }
334+ ss::future<cluster::errc> create_topic (
335+ model::topic_namespace_view,
336+ int32_t ,
337+ cluster::topic_properties,
338+ std::optional<int16_t >) final {
339+ throw std::runtime_error (
340+ " colliding_transport::create_topic not implemented" );
341+ }
342+ bool has_ephemeral_credentials () const final { return false ; }
343+
344+ ss::future<pps::produce_result> produce (model::record_batch) override {
345+ ++_produce_calls;
346+ if (_produce_calls == 1 ) {
347+ // Collision: return an offset that doesn't match write_at.
348+ co_return pps::produce_result{
349+ .base_offset = _collision_offset + model::offset{1 }};
350+ }
351+ // The retry should never produce — it finds the subject
352+ // already deleted and returns the version list via
353+ // include_deleted::yes.
354+ throw std::runtime_error (" unexpected second produce call" );
355+ }
356+
357+ ss::future<model::offset> get_high_watermark () override {
358+ if (_produce_calls == 0 ) {
359+ // Before collision: only the schema record at offset 0.
360+ co_return model::offset{1 };
361+ }
362+ // After collision: schema at 0, competing delete at 1.
363+ co_return model::offset{2 };
364+ }
365+
366+ ss::future<> consume_range (
367+ model::offset start,
368+ model::offset end,
369+ ss::noncopyable_function<ss::future<>(model::record_batch)> consumer)
370+ override {
371+ if (
372+ _competing_batch.has_value () && start <= _collision_offset
373+ && _collision_offset < end) {
374+ co_await consumer (std::move (*_competing_batch));
375+ _competing_batch.reset ();
376+ }
377+ }
378+
379+ int produce_calls () const { return _produce_calls; }
380+
381+ private:
382+ std::optional<model::record_batch> _competing_batch;
383+ model::offset _collision_offset;
384+ int _produce_calls{0 };
385+ };
386+
387+ SEASTAR_THREAD_TEST_CASE (test_delete_subject_write_collision_retry) {
388+ pps::enable_qualified_subjects::set_local (true );
389+ auto reset_flag = ss::defer (
390+ [] { pps::enable_qualified_subjects::reset_local (); });
391+
392+ // Store setup: insert a schema so there's something to delete.
393+ pps::sharded_store store;
394+ store.start (pps::is_mutable::yes, ss::default_smp_service_group ()).get ();
395+ auto stop_store = ss::defer ([&store]() { store.stop ().get (); });
396+
397+ const auto version = pps::schema_version{1 };
398+ store
399+ .upsert (
400+ pps::seq_marker{
401+ .seq = model::offset{0 },
402+ .node = model::node_id{0 },
403+ .version = version,
404+ .key_type = pps::seq_marker_key_type::schema},
405+ pps::subject_schema{subject0, int_def0.share ()},
406+ id0,
407+ version,
408+ pps::is_deleted::no)
409+ .get ();
410+
411+ // The competing delete will land at offset 1 (the next available).
412+ colliding_transport transport (subject0, model::offset{1 });
413+
414+ ss::sharded<pps::seq_writer> seq;
415+ seq
416+ .start (
417+ model::node_id{0 },
418+ ss::default_smp_service_group (),
419+ &transport,
420+ std::reference_wrapper (store),
421+ ss::sharded_parameter (
422+ [] { return std::make_unique<sequence_state_checker_test>(); }))
423+ .get ();
424+ auto stop_seq = ss::defer ([&seq]() { seq.stop ().get (); });
425+
426+ // Advance _loaded_offset past the schema record.
427+ seq.local ().advance_offset (model::offset{0 }).get ();
428+
429+ // First produce collides. The retry's read_sync consumes the
430+ // competing delete batch, finds the subject already soft-deleted,
431+ // and returns the version list via include_deleted::yes.
432+ auto versions = seq.local ().delete_subject_impermanent (subject0).get ();
433+
434+ BOOST_REQUIRE_EQUAL (versions.size (), 1 );
435+ BOOST_CHECK_EQUAL (versions[0 ], version);
436+ BOOST_CHECK_EQUAL (transport.produce_calls (), 1 );
437+ }
0 commit comments