From 57b5058ae364de92cafac9b39680b836a1e55761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 22:33:41 +0200 Subject: [PATCH 01/10] Return append results - Replace append-condition exceptions with explicit success and failure result values so callers can branch without relying on control flow. - Return append status from the SQL append function to distinguish normal condition failures from serialization failures. - Retry serialization failures in the repository and raise a library-level error only after retry exhaustion. - Update adapter contracts, benchmarks, documentation, and tests for the new append result API. --- README.md | 54 +++++++------ db/schema/0.1.0.sql | 11 ++- lib/benchmark/append_non_conflicting_tags.rb | 12 +-- .../concurrent_append_conflicting_tags.rb | 19 +++-- .../concurrent_append_non_conflicting_tags.rb | 15 ++-- lib/en57.rb | 4 +- lib/en57/active_record_adapter.rb | 2 + lib/en57/event_store.rb | 1 - lib/en57/pg_adapter.rb | 16 ++-- lib/en57/repository.rb | 35 ++++++--- lib/en57/sequel_adapter.rb | 2 + test/pg_regress/expected/001_schema.out | 11 ++- test/test_active_record_adapter.rb | 7 ++ test/test_event_store.rb | 17 ++-- test/test_factories.rb | 3 +- test/test_integration.rb | 76 +++++++++--------- test/test_migrator.rb | 3 +- test/test_pg_adapter.rb | 30 +++++-- test/test_repository.rb | 78 ++++++++++++++----- test/test_sequel_adapter.rb | 6 ++ test/test_stress.rb | 6 +- 21 files changed, 272 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index 645589a..9cd066c 100644 --- a/README.md +++ b/README.md @@ -109,18 +109,21 @@ Example: consume credits only once per account. ```ruby account_scope = event_store.read.with_tag("account:x") -begin - event_store.append( - [ - En57::Event.new( - type: "CreditsUsed", - data: { amount: 100 }, - tags: ["account:x"], - ), - ], - fail_if: account_scope.of_type("CreditsUsed"), - ) -rescue En57::AppendConditionViolated +result = event_store.append( + [ + En57::Event.new( + type: "CreditsUsed", + data: { amount: 100 }, + tags: ["account:x"], + ), + ], + fail_if: account_scope.of_type("CreditsUsed"), +) + +case result +in En57::Success + # credits consumed +in En57::Failure # lost the race; another writer already consumed credits end ``` @@ -144,18 +147,21 @@ Example: ensure no event exists with this email tag before writing. ```ruby email_tag = "email:alice@example.com" -begin - event_store.append( - [ - En57::Event.new( - type: "UserRegistered", - data: { name: "Alice" }, - tags: [email_tag], - ), - ], - fail_if: event_store.read.with_tag(email_tag), - ) -rescue En57::AppendConditionViolated +result = event_store.append( + [ + En57::Event.new( + type: "UserRegistered", + data: { name: "Alice" }, + tags: [email_tag], + ), + ], + fail_if: event_store.read.with_tag(email_tag), +) + +case result +in En57::Success + # user registered +in En57::Failure # email already used end ``` diff --git a/db/schema/0.1.0.sql b/db/schema/0.1.0.sql index 3ec3595..1e3e6e6 100644 --- a/db/schema/0.1.0.sql +++ b/db/schema/0.1.0.sql @@ -26,8 +26,12 @@ CREATE TYPE en57.event AS ( tags text[] ); +CREATE TYPE en57.append_result AS ( + status text +); + CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) - RETURNS void + RETURNS en57.append_result LANGUAGE plpgsql SET enable_seqscan = OFF AS $$ @@ -67,7 +71,7 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; + RETURN ROW ('append_condition_violated')::en57.append_result; END IF; ELSE IF EXISTS ( @@ -79,7 +83,7 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; + RETURN ROW ('append_condition_violated')::en57.append_result; END IF; END IF; END LOOP; @@ -98,6 +102,7 @@ SELECT FROM unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); + RETURN ROW ('success')::en57.append_result; END; $$; diff --git a/lib/benchmark/append_non_conflicting_tags.rb b/lib/benchmark/append_non_conflicting_tags.rb index 0622a0e..038cff4 100644 --- a/lib/benchmark/append_non_conflicting_tags.rb +++ b/lib/benchmark/append_non_conflicting_tags.rb @@ -23,11 +23,13 @@ def call(measure, retries, run_id) events = @batch_size.times.map { Event.new(type:, tags:) } measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - retry + loop do + case @event_store.append(events, fail_if: scope.after(position = 0)) + in Success + break + in Failure + retries.call + end end end end diff --git a/lib/benchmark/concurrent_append_conflicting_tags.rb b/lib/benchmark/concurrent_append_conflicting_tags.rb index 4da0f6a..8e610fe 100644 --- a/lib/benchmark/concurrent_append_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_conflicting_tags.rb @@ -24,14 +24,19 @@ def call(measure, retries, run_id) barrier.wait measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - scope.each_with_position do |_event, event_position| - position = event_position + loop do + case @event_store.append( + events, + fail_if: scope.after(position = 0), + ) + in Success + break + in Failure + retries.call + scope.each_with_position do |_event, event_position| + position = event_position + end end - retry end end end diff --git a/lib/benchmark/concurrent_append_non_conflicting_tags.rb b/lib/benchmark/concurrent_append_non_conflicting_tags.rb index abe2d62..54c94be 100644 --- a/lib/benchmark/concurrent_append_non_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_non_conflicting_tags.rb @@ -28,11 +28,16 @@ def call(measure, retries, _run_id) barrier.wait measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - retry + loop do + case @event_store.append( + events, + fail_if: scope.after(position = 0), + ) + in Success + break + in Failure + retries.call + end end end end diff --git a/lib/en57.rb b/lib/en57.rb index 42ff441..8a46d09 100644 --- a/lib/en57.rb +++ b/lib/en57.rb @@ -14,7 +14,9 @@ require_relative "en57/configuration" module En57 - AppendConditionViolated = Class.new(StandardError) + Success = Data.define + Failure = Data.define + SerializationError = Class.new(StandardError) def self.configuration = Configuration.instance diff --git a/lib/en57/active_record_adapter.rb b/lib/en57/active_record_adapter.rb index 078ca91..3925e9a 100644 --- a/lib/en57/active_record_adapter.rb +++ b/lib/en57/active_record_adapter.rb @@ -19,6 +19,8 @@ def with_transaction(&block) = run_transaction({}, &block) def with_serializable_transaction(&block) = run_transaction({ isolation: :serializable }, &block) + def serialization_error = ActiveRecord::SerializationFailure + private def run_transaction(options) diff --git a/lib/en57/event_store.rb b/lib/en57/event_store.rb index b29238e..d475f39 100644 --- a/lib/en57/event_store.rb +++ b/lib/en57/event_store.rb @@ -8,7 +8,6 @@ def initialize(repository) def append(events, fail_if: EmptyScope.new) @repository.append(events, fail_if: fail_if.to_query) - self end def read diff --git a/lib/en57/pg_adapter.rb b/lib/en57/pg_adapter.rb index 2140ede..4b5e83e 100644 --- a/lib/en57/pg_adapter.rb +++ b/lib/en57/pg_adapter.rb @@ -19,18 +19,22 @@ def with_transaction(&block) = run_transaction("BEGIN", &block) def with_serializable_transaction(&block) = run_transaction("BEGIN ISOLATION LEVEL SERIALIZABLE", &block) + def serialization_error = PG::TRSerializationFailure + private def run_transaction(begin_statement) with_connection do |connection| connection.exec(begin_statement) - begin - yield connection - rescue StandardError - connection.exec("ROLLBACK") - raise - end + result = + begin + yield connection + rescue StandardError + connection.exec("ROLLBACK") + raise + end connection.exec("COMMIT") + result end end diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index c5e4fb4..0c3ccf5 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -32,23 +32,40 @@ def append(events, fail_if:) :fail_if_events_match ] = fail_if_events_match unless fail_if_events_match.empty? - statement = "SELECT en57.append_events($1::en57.event[], $2::jsonb)" + statement = + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)" params = [ @array_encoder.encode(event_records), JSON.generate(append_condition), ] - if fail_if_events_match.empty? - @adapter.with_transaction do |connection| - connection.exec_params(statement, params) + attempts_remaining = 3 + begin + row = + if fail_if_events_match.empty? + @adapter.with_transaction do |connection| + connection.exec_params(statement, params) + end + else + @adapter.with_serializable_transaction do |connection| + connection.exec_params(statement, params) + end + end + + case row.first.fetch("status") + when "success" + Success.new + when "append_condition_violated" + Failure.new end - else - @adapter.with_serializable_transaction do |connection| - connection.exec_params(statement, params) + rescue @adapter.serialization_error + if attempts_remaining.positive? + attempts_remaining -= 1 + retry end + + raise SerializationError end - rescue PG::RaiseException, PG::TRSerializationFailure - raise AppendConditionViolated end def read(query) diff --git a/lib/en57/sequel_adapter.rb b/lib/en57/sequel_adapter.rb index daa48be..89d6419 100644 --- a/lib/en57/sequel_adapter.rb +++ b/lib/en57/sequel_adapter.rb @@ -18,6 +18,8 @@ def with_transaction(&block) = run_transaction({}, &block) def with_serializable_transaction(&block) = run_transaction({ isolation: :serializable }, &block) + def serialization_error = PG::TRSerializationFailure + private def run_transaction(options) diff --git a/test/pg_regress/expected/001_schema.out b/test/pg_regress/expected/001_schema.out index 3ec3595..1e3e6e6 100644 --- a/test/pg_regress/expected/001_schema.out +++ b/test/pg_regress/expected/001_schema.out @@ -26,8 +26,12 @@ CREATE TYPE en57.event AS ( tags text[] ); +CREATE TYPE en57.append_result AS ( + status text +); + CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) - RETURNS void + RETURNS en57.append_result LANGUAGE plpgsql SET enable_seqscan = OFF AS $$ @@ -67,7 +71,7 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; + RETURN ROW ('append_condition_violated')::en57.append_result; END IF; ELSE IF EXISTS ( @@ -79,7 +83,7 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; + RETURN ROW ('append_condition_violated')::en57.append_result; END IF; END IF; END LOOP; @@ -98,6 +102,7 @@ SELECT FROM unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); + RETURN ROW ('success')::en57.append_result; END; $$; diff --git a/test/test_active_record_adapter.rb b/test/test_active_record_adapter.rb index 70ccc99..3a45a0b 100644 --- a/test/test_active_record_adapter.rb +++ b/test/test_active_record_adapter.rb @@ -48,6 +48,13 @@ def test_with_serializable_transaction_wraps_block_in_transaction end end + def test_serialization_error_returns_active_record_serialization_failure + with_mock_adapter do |_pool, _connection, _raw_connection, adapter| + assert_same ActiveRecord::SerializationFailure, + adapter.serialization_error + end + end + def test_with_serializable_transaction_reraises_block_errors error = RuntimeError.new("boom") diff --git a/test/test_event_store.rb b/test/test_event_store.rb index 0270029..0f92989 100644 --- a/test/test_event_store.rb +++ b/test/test_event_store.rb @@ -10,7 +10,7 @@ def test_append_event event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, nil, [[event]], fail_if: Query.all) + repository.expect(:append, Success.new, [[event]], fail_if: Query.all) EventStore.new(repository).append([event]) end @@ -29,15 +29,13 @@ def test_read_returns_scope_for_query_all end end - def test_return_self_from_append + def test_returns_success_from_append event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, nil, [[event]], fail_if: Query.all) + repository.expect(:append, Success.new, [[event]], fail_if: Query.all) - event_store = EventStore.new(repository) - - assert_equal(event_store, event_store.append([event])) + assert_equal(Success.new, EventStore.new(repository).append([event])) end end @@ -47,7 +45,12 @@ def test_append_accepts_scope_for_fail_if with_repository do |repository| event_store = EventStore.new(repository) fail_if = event_store.read.with_tag("order_id:123") - repository.expect(:append, nil, [[event]], fail_if: fail_if.to_query) + repository.expect( + :append, + Success.new, + [[event]], + fail_if: fail_if.to_query, + ) event_store.append([event], fail_if:) end diff --git a/test/test_factories.rb b/test/test_factories.rb index d7f0e6d..e352a0c 100644 --- a/test/test_factories.rb +++ b/test/test_factories.rb @@ -44,7 +44,8 @@ def test_event_store_does_not_conflict_with_public_schema_tables def assert_round_trip(event_store) event = Event.new(type: "FactoryTested") - assert_equal [event], event_store.append([event]).read.each.to_a + assert_equal Success.new, event_store.append([event]) + assert_equal [event], event_store.read.each.to_a end end end diff --git a/test/test_integration.rb b/test/test_integration.rb index 2fea4df..90429de 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -24,7 +24,8 @@ class TestIntegration < IntegrationTest ), ] - assert_equal(events, event_store.append(events).read.each.to_a) + assert_equal(Success.new, event_store.append(events)) + assert_equal(events, event_store.read.each.to_a) end end @@ -35,9 +36,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] + assert_equal(Success.new, event_store.append(events)) assert_equal( events.map.with_index(1) { |event, position| [event, position] }, - event_store.append(events).read.each_with_position.to_a, + event_store.read.each_with_position.to_a, ) end end @@ -45,26 +47,30 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_fail_if_and_no_matches_appends_events" do with_event_store(factory) do |event_store| event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append( - [event], - fail_if: event_store.read.of_type("PriceChanged"), + assert_equal( + Success.new, + event_store.append( + [event], + fail_if: event_store.read.of_type("PriceChanged"), + ), ) assert_equal([event], event_store.read.each.to_a) end end - define_method "test_#{name}_append_with_fail_if_and_matches_raises_append_condition_violated" do + define_method "test_#{name}_append_with_fail_if_and_matches_returns_failure" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal(Success.new, event_store.append([existing_event])) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new, event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced"), - ) - end + ), + ) assert_equal([existing_event], event_store.read.each.to_a) end @@ -73,10 +79,13 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_after_ignores_matches_at_or_before_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) - event_store.append( - [Event.new(id: ids[1], type: "ShipmentScheduled")], - fail_if: event_store.read.of_type("OrderPlaced").after(1), + assert_equal(Success.new, event_store.append([existing_event])) + assert_equal( + Success.new, + event_store.append( + [Event.new(id: ids[1], type: "ShipmentScheduled")], + fail_if: event_store.read.of_type("OrderPlaced").after(1), + ), ) assert_equal( @@ -86,17 +95,18 @@ class TestIntegration < IntegrationTest end end - define_method "test_#{name}_append_with_after_raises_if_match_is_after_cutoff" do + define_method "test_#{name}_append_with_after_returns_failure_if_match_is_after_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal(Success.new, event_store.append([existing_event])) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new, event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced").after(0), - ) - end + ), + ) assert_equal([existing_event], event_store.read.each.to_a) end @@ -105,7 +115,7 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_duplicate_id_raises_unique_violation" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal(Success.new, event_store.append([existing_event])) assert_raises(PG::UniqueViolation) do event_store.append( @@ -122,7 +132,8 @@ class TestIntegration < IntegrationTest event = Event.new(id: ids[0], type: "OrderPlaced", tags: ["order_id:123"]) - assert_equal([event], event_store.append([event]).read.each.to_a) + assert_equal(Success.new, event_store.append([event])) + assert_equal([event], event_store.read.each.to_a) end end @@ -133,10 +144,8 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] - assert_equal( - events.drop(1), - event_store.append(events).read.after(1).each.to_a, - ) + assert_equal(Success.new, event_store.append(events)) + assert_equal(events.drop(1), event_store.read.after(1).each.to_a) end end @@ -155,10 +164,10 @@ class TestIntegration < IntegrationTest ), ] + assert_equal(Success.new, event_store.append(events)) assert_equal( events.take(1), event_store - .append(events) .read .with_tag("order_id:123", "tenant_id:acme") .each @@ -174,9 +183,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] + assert_equal(Success.new, event_store.append(events)) assert_equal( events.take(1), - event_store.append(events).read.of_type("OrderPlaced").each.to_a, + event_store.read.of_type("OrderPlaced").each.to_a, ) end end @@ -189,14 +199,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "OrderCancelled"), ] + assert_equal(Success.new, event_store.append(events)) assert_equal( events.drop(1), - event_store - .append(events) - .read - .of_type("OrderPlaced", "OrderCancelled") - .each - .to_a, + event_store.read.of_type("OrderPlaced", "OrderCancelled").each.to_a, ) end end @@ -209,10 +215,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "PriceChanged", tags: ["order_id:123"]), ] + assert_equal(Success.new, event_store.append(events)) assert_equal( events.take(1), event_store - .append(events) .read .of_type("OrderPlaced") .with_tag("order_id:123") @@ -235,7 +241,7 @@ class TestIntegration < IntegrationTest tags: ["order_id:123"], ), ] - event_store.append(events) + assert_equal(Success.new, event_store.append(events)) orders = event_store.read.of_type("OrderPlaced").with_tag("order_id:123") diff --git a/test/test_migrator.rb b/test/test_migrator.rb index 952057d..f06468d 100644 --- a/test/test_migrator.rb +++ b/test/test_migrator.rb @@ -55,7 +55,8 @@ def test_migrate_installs_schema_used_by_event_store ), ) - assert_equal [event], event_store.append([event]).read.each.to_a + assert_equal Success.new, event_store.append([event]) + assert_equal [event], event_store.read.each.to_a ensure connection&.close end diff --git a/test/test_pg_adapter.rb b/test/test_pg_adapter.rb index e2968dd..8ab560b 100644 --- a/test/test_pg_adapter.rb +++ b/test/test_pg_adapter.rb @@ -24,6 +24,10 @@ def test_for_pool_uses_connection_pool assert_same connection, adapter.with_connection { |conn| conn } end + def test_for_connection_returns_adapter + assert_instance_of(PgAdapter, PgAdapter.for_connection(Object.new)) + end + def test_for_connection_synchronizes_access connection = Object.new adapter = PgAdapter.for_connection(connection) @@ -60,10 +64,12 @@ def test_with_serializable_transaction_commits_on_success ) connection.expect(:exec, nil, ["COMMIT"]) - adapter.with_serializable_transaction do |conn| - assert_equal :written, - conn.exec_params("SELECT en57.append_events()", []) - end + assert_equal( + :written, + adapter.with_serializable_transaction do |conn| + conn.exec_params("SELECT en57.append_events()", []) + end, + ) end end @@ -94,6 +100,12 @@ def test_with_serializable_transaction_rolls_back_on_failure assert_same error, raised end + def test_serialization_error_returns_pg_serialization_failure + with_mock_adapter do |_connection, adapter| + assert_same PG::TRSerializationFailure, adapter.serialization_error + end + end + def test_with_transaction_commits_on_success with_mock_adapter do |connection, adapter| connection.expect(:exec, nil, ["BEGIN"]) @@ -104,10 +116,12 @@ def test_with_transaction_commits_on_success ) connection.expect(:exec, nil, ["COMMIT"]) - adapter.with_transaction do |conn| - assert_equal :written, - conn.exec_params("SELECT en57.append_events()", []) - end + assert_equal( + :written, + adapter.with_transaction do |conn| + conn.exec_params("SELECT en57.append_events()", []) + end, + ) end end diff --git a/test/test_repository.rb b/test/test_repository.rb index 8225692..79d4957 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -34,9 +34,9 @@ def test_append_without_fail_if_uses_plain_transaction connection.expect(:exec, nil, ["BEGIN"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -78,9 +78,9 @@ def test_append_persists_empty_event_data_as_null connection.expect(:exec, nil, ["BEGIN"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -101,9 +101,9 @@ def test_append_passes_fail_if_and_after_conditions connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", [ array_encoder.encode([]), '{"fail_if_events_match":[{"types":["OrderPlaced"],"after":42}]}', @@ -134,15 +134,15 @@ def test_append_passes_fail_if_and_after_conditions def test_append_rolls_back_transaction_on_pg_failure with_connection do |connection| connection.expect(:exec, nil, ["BEGIN"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) do |sql, params| assert_equal( - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", sql, ) assert_equal([array_encoder.encode([]), "{}"], params) raise PG::Error, "boom" end + connection.expect(:exec, nil, ["ROLLBACK"]) assert_raises(PG::Error) do Repository.new( @@ -156,8 +156,8 @@ def test_append_rolls_back_transaction_on_pg_failure def test_append_rolls_back_transaction_on_failure with_connection do |connection| connection.expect(:exec, nil, ["BEGIN"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) { raise RuntimeError, "boom" } + connection.expect(:exec, nil, ["ROLLBACK"]) assert_raises(RuntimeError) do Repository.new( @@ -513,35 +513,63 @@ def test_read_events_filtered_by_type end end - def test_append_raises_append_condition_violated_from_pg_error_sqlstate + def test_append_returns_failure_when_sql_status_is_append_condition_violated with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) - connection.expect(:exec, nil, ["ROLLBACK"]) - connection.expect(:exec_params, nil) { raise(PG::RaiseException.new) } + connection.expect(:exec_params, failure_result, append_args) + connection.expect(:exec, nil, ["COMMIT"]) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new, Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria) - end + ).append([], fail_if: fail_if_with_criteria), + ) end end - def test_append_raises_append_condition_violated_from_serialization_failure_result_sqlstate + def test_append_retries_on_serialization_error_and_succeeds with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) do raise PG::TRSerializationFailure.new end + connection.expect(:exec, nil, ["ROLLBACK"]) + connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) + connection.expect(:exec_params, success_result, append_args) + connection.expect(:exec, nil, ["COMMIT"]) + + assert_equal( + Success.new, + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append([], fail_if: fail_if_with_criteria), + ) + end + end + + def test_append_raises_serialization_error_after_four_attempts + attempts = 0 + + with_connection do |connection| + 4.times do + connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) + connection.expect(:exec_params, nil) do + attempts += 1 + raise PG::TRSerializationFailure.new + end + connection.expect(:exec, nil, ["ROLLBACK"]) + end - assert_raises(AppendConditionViolated) do + assert_raises(SerializationError) do Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, ).append([], fail_if: fail_if_with_criteria) end + assert_equal(4, attempts) end end @@ -560,6 +588,20 @@ def array_encoder = @array_encoder ||= PG::TextEncoder::Array.new def record_encoder = @record_encoder ||= PG::TextEncoder::Record.new + def success_result = [{ "status" => "success" }] + + def failure_result = [{ "status" => "append_condition_violated" }] + + def append_args + [ + "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + [ + array_encoder.encode([]), + '{"fail_if_events_match":[{"types":["OrderPlaced"]}]}', + ], + ] + end + def fail_if_with_criteria Query.new( criteria: [Query::Criteria.new(types: ["OrderPlaced"], tags: [])], diff --git a/test/test_sequel_adapter.rb b/test/test_sequel_adapter.rb index fca176b..e07c7d1 100644 --- a/test/test_sequel_adapter.rb +++ b/test/test_sequel_adapter.rb @@ -73,6 +73,12 @@ def test_with_serializable_transaction_unwraps_pg_error_subclasses end end + def test_serialization_error_returns_pg_serialization_failure + with_mock_adapter do |_database, _connection, adapter| + assert_same PG::TRSerializationFailure, adapter.serialization_error + end + end + def test_with_transaction_synchronizes_inside_transaction with_mock_adapter do |database, connection, adapter| database.expect(:transaction, :committed) { |&block| block.call } diff --git a/test/test_stress.rb b/test/test_stress.rb index ff35966..2ed88ec 100644 --- a/test/test_stress.rb +++ b/test/test_stress.rb @@ -42,14 +42,16 @@ class TestStress < IntegrationTest ], fail_if: account_scope.of_type("CreditsUsed"), ) - rescue AppendConditionViolated => e + rescue SerializationError => e e end end + results = threads.map(&:value) + assert_equal(1, results.select { Success === it }.size) assert_equal( (concurrency - 1), - threads.map(&:value).select { AppendConditionViolated === it }.size, + results.select { Failure === it || SerializationError === it }.size, ) assert_equal( 1, From 5efefe91b64706fff90ecf5df071d4a6f94e2fd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 22:40:09 +0200 Subject: [PATCH 02/10] Rename append retry error - Use a domain-specific error name that describes the actual failure: append retries were exhausted after serialization conflicts. --- lib/en57.rb | 2 +- lib/en57/repository.rb | 2 +- test/test_repository.rb | 2 +- test/test_stress.rb | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/en57.rb b/lib/en57.rb index 8a46d09..8ef0864 100644 --- a/lib/en57.rb +++ b/lib/en57.rb @@ -16,7 +16,7 @@ module En57 Success = Data.define Failure = Data.define - SerializationError = Class.new(StandardError) + AppendRetriesExhausted = Class.new(StandardError) def self.configuration = Configuration.instance diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 0c3ccf5..0d4c72e 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -64,7 +64,7 @@ def append(events, fail_if:) retry end - raise SerializationError + raise AppendRetriesExhausted end end diff --git a/test/test_repository.rb b/test/test_repository.rb index 79d4957..cf6355a 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -563,7 +563,7 @@ def test_append_raises_serialization_error_after_four_attempts connection.expect(:exec, nil, ["ROLLBACK"]) end - assert_raises(SerializationError) do + assert_raises(AppendRetriesExhausted) do Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, diff --git a/test/test_stress.rb b/test/test_stress.rb index 2ed88ec..6441ecb 100644 --- a/test/test_stress.rb +++ b/test/test_stress.rb @@ -42,7 +42,7 @@ class TestStress < IntegrationTest ], fail_if: account_scope.of_type("CreditsUsed"), ) - rescue SerializationError => e + rescue AppendRetriesExhausted => e e end end @@ -51,7 +51,7 @@ class TestStress < IntegrationTest assert_equal(1, results.select { Success === it }.size) assert_equal( (concurrency - 1), - results.select { Failure === it || SerializationError === it }.size, + results.select { Failure === it || AppendRetriesExhausted === it }.size, ) assert_equal( 1, From 006801bf16afc4d06a18ca77cacc3ccefe17669a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 22:42:36 +0200 Subject: [PATCH 03/10] Configure append retries - Make append serialization retries configurable so applications can tune contention handling for their workload. - Raise the default retry count to 9 to allow ten total attempts before surfacing an append-retries-exhausted error. --- lib/en57/configuration.rb | 3 ++- lib/en57/repository.rb | 2 +- test/test_en57.rb | 10 +++++++++- test/test_repository.rb | 37 ++++++++++++++++++++++++++++++++++--- test/test_stress.rb | 4 +++- 5 files changed, 49 insertions(+), 7 deletions(-) diff --git a/lib/en57/configuration.rb b/lib/en57/configuration.rb index 1670cbf..00aa196 100644 --- a/lib/en57/configuration.rb +++ b/lib/en57/configuration.rb @@ -6,9 +6,10 @@ module En57 class Configuration include Singleton - attr_accessor :serializer + attr_accessor :append_retries, :serializer def initialize + @append_retries = 9 @serializer = JsonSerializer.new end end diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 0d4c72e..0d1b02c 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -39,7 +39,7 @@ def append(events, fail_if:) JSON.generate(append_condition), ] - attempts_remaining = 3 + attempts_remaining = En57.configuration.append_retries begin row = if fail_if_events_match.empty? diff --git a/test/test_en57.rb b/test/test_en57.rb index 3a0f030..eadbeb8 100644 --- a/test/test_en57.rb +++ b/test/test_en57.rb @@ -24,6 +24,10 @@ def test_requiring_en57_without_active_record_skips_active_record_adapter RUBY end + def test_configuration_default_append_retries + assert_equal 9, En57.configuration.append_retries + end + def test_configuration_default_serializer assert_kind_of JsonSerializer, En57.configuration.serializer end @@ -31,8 +35,12 @@ def test_configuration_default_serializer def test_configure with_empty_configuration do serializer = Object.new - En57.configure { |c| c.serializer = serializer } + En57.configure do |c| + c.append_retries = 2 + c.serializer = serializer + end + assert_equal 2, En57.configuration.append_retries assert_equal serializer, En57.configuration.serializer end end diff --git a/test/test_repository.rb b/test/test_repository.rb index cf6355a..3aefccf 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -550,11 +550,11 @@ def test_append_retries_on_serialization_error_and_succeeds end end - def test_append_raises_serialization_error_after_four_attempts + def test_append_raises_serialization_error_after_default_retries attempts = 0 with_connection do |connection| - 4.times do + 10.times do connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) connection.expect(:exec_params, nil) do attempts += 1 @@ -569,10 +569,41 @@ def test_append_raises_serialization_error_after_four_attempts JsonSerializer.new, ).append([], fail_if: fail_if_with_criteria) end - assert_equal(4, attempts) + assert_equal(10, attempts) end end + def test_append_raises_serialization_error_after_configured_retries + attempts = 0 + + En57 + .configuration + .stub(:append_retries, 1) do + with_connection do |connection| + 2.times do + connection.expect( + :exec, + nil, + ["BEGIN ISOLATION LEVEL SERIALIZABLE"], + ) + connection.expect(:exec_params, nil) do + attempts += 1 + raise PG::TRSerializationFailure.new + end + connection.expect(:exec, nil, ["ROLLBACK"]) + end + + assert_raises(AppendRetriesExhausted) do + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append([], fail_if: fail_if_with_criteria) + end + assert_equal(2, attempts) + end + end + end + private def ids = @ids ||= Hash.new { |h, k| h[k] = SecureRandom.uuid_v7 } diff --git a/test/test_stress.rb b/test/test_stress.rb index 6441ecb..9d9eb5e 100644 --- a/test/test_stress.rb +++ b/test/test_stress.rb @@ -51,7 +51,9 @@ class TestStress < IntegrationTest assert_equal(1, results.select { Success === it }.size) assert_equal( (concurrency - 1), - results.select { Failure === it || AppendRetriesExhausted === it }.size, + results + .select { Failure === it || AppendRetriesExhausted === it } + .size, ) assert_equal( 1, From d80ea2b6f8a13dea0b17340de63758d2d0ad539f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 22:55:21 +0200 Subject: [PATCH 04/10] Return append position - Include the last appended event position in successful append results so callers can continue from the write boundary without issuing a read. - Return the position from the SQL append result alongside the status to keep the Ruby result derived from the database write. - Preserve nil for successful empty appends because no event position is created in that case. --- README.md | 8 ++-- db/schema/0.1.0.sql | 49 ++++++++++++++++--------- lib/en57.rb | 2 +- lib/en57/repository.rb | 4 +- test/pg_regress/expected/001_schema.out | 49 ++++++++++++++++--------- test/test_event_store.rb | 21 +++++++++-- test/test_factories.rb | 2 +- test/test_integration.rb | 42 +++++++++++++-------- test/test_migrator.rb | 2 +- test/test_repository.rb | 49 +++++++++++++------------ 10 files changed, 143 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 9cd066c..d32c00d 100644 --- a/README.md +++ b/README.md @@ -121,8 +121,8 @@ result = event_store.append( ) case result -in En57::Success - # credits consumed +in En57::Success(position:) + # credits consumed at event position in En57::Failure # lost the race; another writer already consumed credits end @@ -159,8 +159,8 @@ result = event_store.append( ) case result -in En57::Success - # user registered +in En57::Success(position:) + # user registered at event position in En57::Failure # email already used end diff --git a/db/schema/0.1.0.sql b/db/schema/0.1.0.sql index 1e3e6e6..67c05f1 100644 --- a/db/schema/0.1.0.sql +++ b/db/schema/0.1.0.sql @@ -27,7 +27,8 @@ CREATE TYPE en57.event AS ( ); CREATE TYPE en57.append_result AS ( - status text + status text, + "position" bigint ); CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) @@ -43,6 +44,7 @@ DECLARE req_types text[]; req_tags text[]; req_after bigint; + appended_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -71,7 +73,8 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated')::en57.append_result; + RETURN ROW ('append_condition_violated', + NULL)::en57.append_result; END IF; ELSE IF EXISTS ( @@ -83,26 +86,38 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated')::en57.append_result; + RETURN ROW ('append_condition_violated', + NULL)::en57.append_result; END IF; END IF; END LOOP; + WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) -SELECT - e.id, - e.type, - e.data, - e.meta -FROM - unnest(new_events) AS e; -INSERT INTO en57.tags (event_id, value) -SELECT - e.id, - t.value -FROM - unnest(new_events) AS e + SELECT + e.id, + e.type, + e.data, + e.meta + FROM + unnest(new_events) AS e + RETURNING + "position" +) + SELECT + max("position") + FROM + inserted_events + INTO + appended_position; + INSERT INTO en57.tags (event_id, value) + SELECT + e.id, + t.value + FROM + unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); - RETURN ROW ('success')::en57.append_result; + RETURN ROW ('success', + appended_position)::en57.append_result; END; $$; diff --git a/lib/en57.rb b/lib/en57.rb index 8ef0864..171dcc0 100644 --- a/lib/en57.rb +++ b/lib/en57.rb @@ -14,7 +14,7 @@ require_relative "en57/configuration" module En57 - Success = Data.define + Success = Data.define(:position) Failure = Data.define AppendRetriesExhausted = Class.new(StandardError) diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 0d1b02c..2e5ab05 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -33,7 +33,7 @@ def append(events, fail_if:) ] = fail_if_events_match unless fail_if_events_match.empty? statement = - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)" + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)" params = [ @array_encoder.encode(event_records), JSON.generate(append_condition), @@ -54,7 +54,7 @@ def append(events, fail_if:) case row.first.fetch("status") when "success" - Success.new + Success.new(position: row.first.fetch("position")&.then { Integer(it) }) when "append_condition_violated" Failure.new end diff --git a/test/pg_regress/expected/001_schema.out b/test/pg_regress/expected/001_schema.out index 1e3e6e6..67c05f1 100644 --- a/test/pg_regress/expected/001_schema.out +++ b/test/pg_regress/expected/001_schema.out @@ -27,7 +27,8 @@ CREATE TYPE en57.event AS ( ); CREATE TYPE en57.append_result AS ( - status text + status text, + "position" bigint ); CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) @@ -43,6 +44,7 @@ DECLARE req_types text[]; req_tags text[]; req_after bigint; + appended_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -71,7 +73,8 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated')::en57.append_result; + RETURN ROW ('append_condition_violated', + NULL)::en57.append_result; END IF; ELSE IF EXISTS ( @@ -83,26 +86,38 @@ BEGIN OR e.position > req_after) AND (criterion -> 'types' IS NULL OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated')::en57.append_result; + RETURN ROW ('append_condition_violated', + NULL)::en57.append_result; END IF; END IF; END LOOP; + WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) -SELECT - e.id, - e.type, - e.data, - e.meta -FROM - unnest(new_events) AS e; -INSERT INTO en57.tags (event_id, value) -SELECT - e.id, - t.value -FROM - unnest(new_events) AS e + SELECT + e.id, + e.type, + e.data, + e.meta + FROM + unnest(new_events) AS e + RETURNING + "position" +) + SELECT + max("position") + FROM + inserted_events + INTO + appended_position; + INSERT INTO en57.tags (event_id, value) + SELECT + e.id, + t.value + FROM + unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); - RETURN ROW ('success')::en57.append_result; + RETURN ROW ('success', + appended_position)::en57.append_result; END; $$; diff --git a/test/test_event_store.rb b/test/test_event_store.rb index 0f92989..a8e893c 100644 --- a/test/test_event_store.rb +++ b/test/test_event_store.rb @@ -10,7 +10,12 @@ def test_append_event event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, Success.new, [[event]], fail_if: Query.all) + repository.expect( + :append, + Success.new(position: 1), + [[event]], + fail_if: Query.all, + ) EventStore.new(repository).append([event]) end @@ -33,9 +38,17 @@ def test_returns_success_from_append event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, Success.new, [[event]], fail_if: Query.all) + repository.expect( + :append, + Success.new(position: 1), + [[event]], + fail_if: Query.all, + ) - assert_equal(Success.new, EventStore.new(repository).append([event])) + assert_equal( + Success.new(position: 1), + EventStore.new(repository).append([event]), + ) end end @@ -47,7 +60,7 @@ def test_append_accepts_scope_for_fail_if fail_if = event_store.read.with_tag("order_id:123") repository.expect( :append, - Success.new, + Success.new(position: 1), [[event]], fail_if: fail_if.to_query, ) diff --git a/test/test_factories.rb b/test/test_factories.rb index e352a0c..cb2f92c 100644 --- a/test/test_factories.rb +++ b/test/test_factories.rb @@ -44,7 +44,7 @@ def test_event_store_does_not_conflict_with_public_schema_tables def assert_round_trip(event_store) event = Event.new(type: "FactoryTested") - assert_equal Success.new, event_store.append([event]) + assert_equal Success.new(position: 1), event_store.append([event]) assert_equal [event], event_store.read.each.to_a end end diff --git a/test/test_integration.rb b/test/test_integration.rb index 90429de..1f3a788 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -24,7 +24,7 @@ class TestIntegration < IntegrationTest ), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal(events, event_store.read.each.to_a) end end @@ -36,7 +36,7 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.map.with_index(1) { |event, position| [event, position] }, event_store.read.each_with_position.to_a, @@ -48,7 +48,7 @@ class TestIntegration < IntegrationTest with_event_store(factory) do |event_store| event = Event.new(id: ids[0], type: "OrderPlaced") assert_equal( - Success.new, + Success.new(position: 1), event_store.append( [event], fail_if: event_store.read.of_type("PriceChanged"), @@ -62,7 +62,10 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_fail_if_and_matches_returns_failure" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - assert_equal(Success.new, event_store.append([existing_event])) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) assert_equal( Failure.new, @@ -79,9 +82,12 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_after_ignores_matches_at_or_before_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - assert_equal(Success.new, event_store.append([existing_event])) assert_equal( - Success.new, + Success.new(position: 1), + event_store.append([existing_event]), + ) + assert_equal( + Success.new(position: 2), event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced").after(1), @@ -98,7 +104,10 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_after_returns_failure_if_match_is_after_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - assert_equal(Success.new, event_store.append([existing_event])) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) assert_equal( Failure.new, @@ -115,7 +124,10 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_duplicate_id_raises_unique_violation" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - assert_equal(Success.new, event_store.append([existing_event])) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) assert_raises(PG::UniqueViolation) do event_store.append( @@ -132,7 +144,7 @@ class TestIntegration < IntegrationTest event = Event.new(id: ids[0], type: "OrderPlaced", tags: ["order_id:123"]) - assert_equal(Success.new, event_store.append([event])) + assert_equal(Success.new(position: 1), event_store.append([event])) assert_equal([event], event_store.read.each.to_a) end end @@ -144,7 +156,7 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal(events.drop(1), event_store.read.after(1).each.to_a) end end @@ -164,7 +176,7 @@ class TestIntegration < IntegrationTest ), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.take(1), event_store @@ -183,7 +195,7 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.take(1), event_store.read.of_type("OrderPlaced").each.to_a, @@ -199,7 +211,7 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "OrderCancelled"), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 3), event_store.append(events)) assert_equal( events.drop(1), event_store.read.of_type("OrderPlaced", "OrderCancelled").each.to_a, @@ -215,7 +227,7 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "PriceChanged", tags: ["order_id:123"]), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 3), event_store.append(events)) assert_equal( events.take(1), event_store @@ -241,7 +253,7 @@ class TestIntegration < IntegrationTest tags: ["order_id:123"], ), ] - assert_equal(Success.new, event_store.append(events)) + assert_equal(Success.new(position: 5), event_store.append(events)) orders = event_store.read.of_type("OrderPlaced").with_tag("order_id:123") diff --git a/test/test_migrator.rb b/test/test_migrator.rb index f06468d..db74e45 100644 --- a/test/test_migrator.rb +++ b/test/test_migrator.rb @@ -55,7 +55,7 @@ def test_migrate_installs_schema_used_by_event_store ), ) - assert_equal Success.new, event_store.append([event]) + assert_equal Success.new(position: 1), event_store.append([event]) assert_equal [event], event_store.read.each.to_a ensure connection&.close diff --git a/test/test_repository.rb b/test/test_repository.rb index 3aefccf..f2789b5 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -36,7 +36,7 @@ def test_append_without_fail_if_uses_plain_transaction :exec_params, success_result, [ - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -80,7 +80,7 @@ def test_append_persists_empty_event_data_as_null :exec_params, success_result, [ - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -101,9 +101,9 @@ def test_append_passes_fail_if_and_after_conditions connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) connection.expect( :exec_params, - success_result, + [{ "status" => "success", "position" => nil }], [ - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [ array_encoder.encode([]), '{"fail_if_events_match":[{"types":["OrderPlaced"],"after":42}]}', @@ -112,21 +112,24 @@ def test_append_passes_fail_if_and_after_conditions ) connection.expect(:exec, nil, ["COMMIT"]) - Repository.new( - PgAdapter.for_connection(connection), - JsonSerializer.new, - ).append( - [], - fail_if: - Query.new( - criteria: [ - Query::Criteria.new( - types: ["OrderPlaced"], - tags: [], - after: 42, - ), - ], - ), + assert_equal( + Success.new(position: nil), + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append( + [], + fail_if: + Query.new( + criteria: [ + Query::Criteria.new( + types: ["OrderPlaced"], + tags: [], + after: 42, + ), + ], + ), + ), ) end end @@ -136,7 +139,7 @@ def test_append_rolls_back_transaction_on_pg_failure connection.expect(:exec, nil, ["BEGIN"]) connection.expect(:exec_params, nil) do |sql, params| assert_equal( - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", sql, ) assert_equal([array_encoder.encode([]), "{}"], params) @@ -541,7 +544,7 @@ def test_append_retries_on_serialization_error_and_succeeds connection.expect(:exec, nil, ["COMMIT"]) assert_equal( - Success.new, + Success.new(position: 1), Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, @@ -619,13 +622,13 @@ def array_encoder = @array_encoder ||= PG::TextEncoder::Array.new def record_encoder = @record_encoder ||= PG::TextEncoder::Record.new - def success_result = [{ "status" => "success" }] + def success_result = [{ "status" => "success", "position" => "1" }] def failure_result = [{ "status" => "append_condition_violated" }] def append_args [ - "SELECT status FROM en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [ array_encoder.encode([]), '{"fail_if_events_match":[{"types":["OrderPlaced"]}]}', From bcd61dd12a8cec709eaa23f86b552b530431c191 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 22:58:40 +0200 Subject: [PATCH 05/10] Skip empty appends - Return a successful append result without opening a transaction when there are no events to write. - Avoid invoking append-condition checks for empty writes because no database state can change. --- lib/en57/repository.rb | 4 +++- test/test_repository.rb | 50 +++++++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 13 deletions(-) diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 2e5ab05..2297259 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -13,6 +13,8 @@ def initialize(adapter, serializer = En57.configuration.serializer) end def append(events, fail_if:) + return Success.new(position: nil) if events.empty? + event_records = events.map do |event| serialized, description = @serializer.dump(event.data) @@ -54,7 +56,7 @@ def append(events, fail_if:) case row.first.fetch("status") when "success" - Success.new(position: row.first.fetch("position")&.then { Integer(it) }) + Success.new(position: row.first.fetch("position").then { Integer(it) }) when "append_condition_violated" Failure.new end diff --git a/test/test_repository.rb b/test/test_repository.rb index f2789b5..123dd1e 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -97,15 +97,20 @@ def test_append_persists_empty_event_data_as_null end def test_append_passes_fail_if_and_after_conditions + expected_events = + array_encoder.encode( + [record_encoder.encode([ids[0], "OrderPaid", nil, nil, "{}"])], + ) + with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) connection.expect( :exec_params, - [{ "status" => "success", "position" => nil }], + success_result, [ "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [ - array_encoder.encode([]), + expected_events, '{"fail_if_events_match":[{"types":["OrderPlaced"],"after":42}]}', ], ], @@ -113,12 +118,12 @@ def test_append_passes_fail_if_and_after_conditions connection.expect(:exec, nil, ["COMMIT"]) assert_equal( - Success.new(position: nil), + Success.new(position: 1), Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, ).append( - [], + [Event.new(id: ids[0], type: "OrderPaid")], fail_if: Query.new( criteria: [ @@ -134,6 +139,18 @@ def test_append_passes_fail_if_and_after_conditions end end + def test_append_short_circuits_empty_event_set + with_connection do |connection| + assert_equal( + Success.new(position: nil), + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append([], fail_if: fail_if_with_criteria), + ) + end + end + def test_append_rolls_back_transaction_on_pg_failure with_connection do |connection| connection.expect(:exec, nil, ["BEGIN"]) @@ -142,7 +159,10 @@ def test_append_rolls_back_transaction_on_pg_failure "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", sql, ) - assert_equal([array_encoder.encode([]), "{}"], params) + assert_equal( + [array_encoder.encode(append_event_records), "{}"], + params, + ) raise PG::Error, "boom" end connection.expect(:exec, nil, ["ROLLBACK"]) @@ -151,7 +171,7 @@ def test_append_rolls_back_transaction_on_pg_failure Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: Query.all) + ).append(append_events, fail_if: Query.all) end end end @@ -166,7 +186,7 @@ def test_append_rolls_back_transaction_on_failure Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: Query.all) + ).append(append_events, fail_if: Query.all) end end end @@ -527,7 +547,7 @@ def test_append_returns_failure_when_sql_status_is_append_condition_violated Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria), + ).append(append_events, fail_if: fail_if_with_criteria), ) end end @@ -548,7 +568,7 @@ def test_append_retries_on_serialization_error_and_succeeds Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria), + ).append(append_events, fail_if: fail_if_with_criteria), ) end end @@ -570,7 +590,7 @@ def test_append_raises_serialization_error_after_default_retries Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria) + ).append(append_events, fail_if: fail_if_with_criteria) end assert_equal(10, attempts) end @@ -600,7 +620,7 @@ def test_append_raises_serialization_error_after_configured_retries Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria) + ).append(append_events, fail_if: fail_if_with_criteria) end assert_equal(2, attempts) end @@ -626,11 +646,17 @@ def success_result = [{ "status" => "success", "position" => "1" }] def failure_result = [{ "status" => "append_condition_violated" }] + def append_events = [Event.new(id: ids[0], type: "OrderPaid")] + + def append_event_records + [record_encoder.encode([ids[0], "OrderPaid", nil, nil, "{}"])] + end + def append_args [ "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [ - array_encoder.encode([]), + array_encoder.encode(append_event_records), '{"fail_if_events_match":[{"types":["OrderPlaced"]}]}', ], ] From 955fb9252a2bc4a5781883ffe7766aba6630e225 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 23:05:03 +0200 Subject: [PATCH 06/10] Return failure position - Include the last matching fail-if event position in failure results so callers can refresh their read boundary after a conditional append loses. - Derive the position in the append SQL function so the reported boundary is consistent with the append-condition check. --- README.md | 8 ++-- db/schema/0.1.0.sql | 61 +++++++++++++------------ lib/en57.rb | 2 +- lib/en57/repository.rb | 2 +- test/pg_regress/expected/001_schema.out | 61 +++++++++++++------------ test/test_integration.rb | 4 +- test/test_repository.rb | 5 +- 7 files changed, 73 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index d32c00d..d9018b4 100644 --- a/README.md +++ b/README.md @@ -123,8 +123,8 @@ result = event_store.append( case result in En57::Success(position:) # credits consumed at event position -in En57::Failure - # lost the race; another writer already consumed credits +in En57::Failure(position:) + # lost the race; another writer already consumed credits at position end ``` @@ -161,7 +161,7 @@ result = event_store.append( case result in En57::Success(position:) # user registered at event position -in En57::Failure - # email already used +in En57::Failure(position:) + # email already used at event position end ``` diff --git a/db/schema/0.1.0.sql b/db/schema/0.1.0.sql index 67c05f1..4976580 100644 --- a/db/schema/0.1.0.sql +++ b/db/schema/0.1.0.sql @@ -45,6 +45,7 @@ DECLARE req_tags text[]; req_after bigint; appended_position bigint; + matched_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -54,43 +55,43 @@ BEGIN req_tags := ARRAY ( SELECT DISTINCT jsonb_array_elements_text(COALESCE(criterion -> 'tags', '[]'::jsonb))); IF cardinality(req_tags) > 0 THEN - IF EXISTS ( + SELECT + max(e.position) + FROM ( SELECT - 1 - FROM ( - SELECT - t.event_id - FROM - en57.tags AS t - WHERE - t.value = ANY (req_tags) - GROUP BY - t.event_id - HAVING - count(*) = cardinality(req_tags)) AS matched - JOIN en57.events AS e ON e.id = matched.event_id - WHERE (req_after IS NULL - OR e.position > req_after) + t.event_id + FROM + en57.tags AS t + WHERE + t.value = ANY (req_tags) + GROUP BY + t.event_id + HAVING + count(*) = cardinality(req_tags)) AS matched + JOIN en57.events AS e ON e.id = matched.event_id + WHERE (req_after IS NULL + OR e.position > req_after) AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated', - NULL)::en57.append_result; - END IF; - ELSE - IF EXISTS ( + OR e.type = ANY (req_types)) + INTO + matched_position; + ELSE SELECT - 1 + max(e.position) FROM en57.events AS e WHERE (req_after IS NULL OR e.position > req_after) - AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated', - NULL)::en57.append_result; - END IF; -END IF; -END LOOP; + AND (criterion -> 'types' IS NULL + OR e.type = ANY (req_types)) + INTO + matched_position; + END IF; + IF matched_position IS NOT NULL THEN + RETURN ROW ('append_condition_violated', + matched_position)::en57.append_result; + END IF; + END LOOP; WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) SELECT diff --git a/lib/en57.rb b/lib/en57.rb index 171dcc0..6f8a7bb 100644 --- a/lib/en57.rb +++ b/lib/en57.rb @@ -15,7 +15,7 @@ module En57 Success = Data.define(:position) - Failure = Data.define + Failure = Data.define(:position) AppendRetriesExhausted = Class.new(StandardError) def self.configuration = Configuration.instance diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 2297259..323da9a 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -58,7 +58,7 @@ def append(events, fail_if:) when "success" Success.new(position: row.first.fetch("position").then { Integer(it) }) when "append_condition_violated" - Failure.new + Failure.new(position: row.first.fetch("position").then { Integer(it) }) end rescue @adapter.serialization_error if attempts_remaining.positive? diff --git a/test/pg_regress/expected/001_schema.out b/test/pg_regress/expected/001_schema.out index 67c05f1..4976580 100644 --- a/test/pg_regress/expected/001_schema.out +++ b/test/pg_regress/expected/001_schema.out @@ -45,6 +45,7 @@ DECLARE req_tags text[]; req_after bigint; appended_position bigint; + matched_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -54,43 +55,43 @@ BEGIN req_tags := ARRAY ( SELECT DISTINCT jsonb_array_elements_text(COALESCE(criterion -> 'tags', '[]'::jsonb))); IF cardinality(req_tags) > 0 THEN - IF EXISTS ( + SELECT + max(e.position) + FROM ( SELECT - 1 - FROM ( - SELECT - t.event_id - FROM - en57.tags AS t - WHERE - t.value = ANY (req_tags) - GROUP BY - t.event_id - HAVING - count(*) = cardinality(req_tags)) AS matched - JOIN en57.events AS e ON e.id = matched.event_id - WHERE (req_after IS NULL - OR e.position > req_after) + t.event_id + FROM + en57.tags AS t + WHERE + t.value = ANY (req_tags) + GROUP BY + t.event_id + HAVING + count(*) = cardinality(req_tags)) AS matched + JOIN en57.events AS e ON e.id = matched.event_id + WHERE (req_after IS NULL + OR e.position > req_after) AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated', - NULL)::en57.append_result; - END IF; - ELSE - IF EXISTS ( + OR e.type = ANY (req_types)) + INTO + matched_position; + ELSE SELECT - 1 + max(e.position) FROM en57.events AS e WHERE (req_after IS NULL OR e.position > req_after) - AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RETURN ROW ('append_condition_violated', - NULL)::en57.append_result; - END IF; -END IF; -END LOOP; + AND (criterion -> 'types' IS NULL + OR e.type = ANY (req_types)) + INTO + matched_position; + END IF; + IF matched_position IS NOT NULL THEN + RETURN ROW ('append_condition_violated', + matched_position)::en57.append_result; + END IF; + END LOOP; WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) SELECT diff --git a/test/test_integration.rb b/test/test_integration.rb index 1f3a788..a33cdec 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -68,7 +68,7 @@ class TestIntegration < IntegrationTest ) assert_equal( - Failure.new, + Failure.new(position: 1), event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced"), @@ -110,7 +110,7 @@ class TestIntegration < IntegrationTest ) assert_equal( - Failure.new, + Failure.new(position: 1), event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced").after(0), diff --git a/test/test_repository.rb b/test/test_repository.rb index 123dd1e..17cb8df 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -543,7 +543,7 @@ def test_append_returns_failure_when_sql_status_is_append_condition_violated connection.expect(:exec, nil, ["COMMIT"]) assert_equal( - Failure.new, + Failure.new(position: 3), Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, @@ -644,7 +644,8 @@ def record_encoder = @record_encoder ||= PG::TextEncoder::Record.new def success_result = [{ "status" => "success", "position" => "1" }] - def failure_result = [{ "status" => "append_condition_violated" }] + def failure_result = + [{ "status" => "append_condition_violated", "position" => "3" }] def append_events = [Event.new(id: ids[0], type: "OrderPaid")] From 19e21fa7963755db7f1e7c6aba05895b0ee9382c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Mon, 25 May 2026 23:07:55 +0200 Subject: [PATCH 07/10] Reuse failure positions in benchmarks - Use the failure result position when retrying conditional append benchmark writes so retries do not need a follow-up read to find the new boundary. - Keep benchmark contention behavior aligned with the public append result API. --- lib/benchmark/append_non_conflicting_tags.rb | 5 +++-- lib/benchmark/concurrent_append_conflicting_tags.rb | 11 +++-------- .../concurrent_append_non_conflicting_tags.rb | 8 +++----- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/lib/benchmark/append_non_conflicting_tags.rb b/lib/benchmark/append_non_conflicting_tags.rb index 038cff4..a8c80de 100644 --- a/lib/benchmark/append_non_conflicting_tags.rb +++ b/lib/benchmark/append_non_conflicting_tags.rb @@ -22,12 +22,13 @@ def call(measure, retries, run_id) .with_tag(tags = ["writer:#{run_id}"]) events = @batch_size.times.map { Event.new(type:, tags:) } + position = 0 measure.call do loop do - case @event_store.append(events, fail_if: scope.after(position = 0)) + case @event_store.append(events, fail_if: scope.after(position)) in Success break - in Failure + in Failure(position:) retries.call end end diff --git a/lib/benchmark/concurrent_append_conflicting_tags.rb b/lib/benchmark/concurrent_append_conflicting_tags.rb index 8e610fe..25f4077 100644 --- a/lib/benchmark/concurrent_append_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_conflicting_tags.rb @@ -23,19 +23,14 @@ def call(measure, retries, run_id) events = @batch_size.times.map { Event.new(type:, tags:) } barrier.wait + position = 0 measure.call do loop do - case @event_store.append( - events, - fail_if: scope.after(position = 0), - ) + case @event_store.append(events, fail_if: scope.after(position)) in Success break - in Failure + in Failure(position:) retries.call - scope.each_with_position do |_event, event_position| - position = event_position - end end end end diff --git a/lib/benchmark/concurrent_append_non_conflicting_tags.rb b/lib/benchmark/concurrent_append_non_conflicting_tags.rb index 54c94be..e6727d8 100644 --- a/lib/benchmark/concurrent_append_non_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_non_conflicting_tags.rb @@ -27,15 +27,13 @@ def call(measure, retries, _run_id) events = @batch_size.times.map { Event.new(type:, tags:) } barrier.wait + position = 0 measure.call do loop do - case @event_store.append( - events, - fail_if: scope.after(position = 0), - ) + case @event_store.append(events, fail_if: scope.after(position)) in Success break - in Failure + in Failure(position:) retries.call end end From b8d5ce3725b329e78de816e0b520a1aee3d81c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Tue, 26 May 2026 10:57:47 +0200 Subject: [PATCH 08/10] Simpler with internal retries --- lib/benchmark/concurrent_append_non_conflicting_tags.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/benchmark/concurrent_append_non_conflicting_tags.rb b/lib/benchmark/concurrent_append_non_conflicting_tags.rb index e6727d8..2c2d687 100644 --- a/lib/benchmark/concurrent_append_non_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_non_conflicting_tags.rb @@ -29,14 +29,7 @@ def call(measure, retries, _run_id) position = 0 measure.call do - loop do - case @event_store.append(events, fail_if: scope.after(position)) - in Success - break - in Failure(position:) - retries.call - end - end + @event_store.append(events, fail_if: scope.after(position)) end end end From 225ea352f7ee52b0fa54e727cc416977dc29448a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Tue, 26 May 2026 10:58:17 +0200 Subject: [PATCH 09/10] Formatting --- lib/benchmark/append_non_conflicting_tags.rb | 2 +- lib/benchmark/concurrent_append_conflicting_tags.rb | 2 +- lib/en57/repository.rb | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/benchmark/append_non_conflicting_tags.rb b/lib/benchmark/append_non_conflicting_tags.rb index a8c80de..074c9ca 100644 --- a/lib/benchmark/append_non_conflicting_tags.rb +++ b/lib/benchmark/append_non_conflicting_tags.rb @@ -28,7 +28,7 @@ def call(measure, retries, run_id) case @event_store.append(events, fail_if: scope.after(position)) in Success break - in Failure(position:) + in Failure[position:] retries.call end end diff --git a/lib/benchmark/concurrent_append_conflicting_tags.rb b/lib/benchmark/concurrent_append_conflicting_tags.rb index 25f4077..80d50bd 100644 --- a/lib/benchmark/concurrent_append_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_conflicting_tags.rb @@ -29,7 +29,7 @@ def call(measure, retries, run_id) case @event_store.append(events, fail_if: scope.after(position)) in Success break - in Failure(position:) + in Failure[position:] retries.call end end diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index 323da9a..bddd3f5 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -56,9 +56,13 @@ def append(events, fail_if:) case row.first.fetch("status") when "success" - Success.new(position: row.first.fetch("position").then { Integer(it) }) + Success.new( + position: row.first.fetch("position").then { Integer(it) }, + ) when "append_condition_violated" - Failure.new(position: row.first.fetch("position").then { Integer(it) }) + Failure.new( + position: row.first.fetch("position").then { Integer(it) }, + ) end rescue @adapter.serialization_error if attempts_remaining.positive? From f3d077734916214bf7c64bde93cae1a2798d87c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Pacana?= Date: Tue, 26 May 2026 10:58:37 +0200 Subject: [PATCH 10/10] Bigger ceiling for concurrent appends --- lib/en57/benchmark.rb | 5 +++++ test/test_benchmark.rb | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/lib/en57/benchmark.rb b/lib/en57/benchmark.rb index e5bd42a..fca99f5 100644 --- a/lib/en57/benchmark.rb +++ b/lib/en57/benchmark.rb @@ -256,6 +256,9 @@ def initialize(scenarios:, formatter:) end def run + original_append_retries = En57.configuration.append_retries + En57.configuration.append_retries = 100 + results = @scenarios.map do |instance_name, mk_scenario| PgEphemeral.with_server(instance_name:) do |server| @@ -283,6 +286,8 @@ def run end @formatter.format(results) + ensure + En57.configuration.append_retries = original_append_retries end end diff --git a/test/test_benchmark.rb b/test/test_benchmark.rb index 69cd80c..2521402 100644 --- a/test/test_benchmark.rb +++ b/test/test_benchmark.rb @@ -122,6 +122,47 @@ def run(measure, retries) assert_equal([3, 3], formatted_results.map(&:retry_count)) end + def test_runner_sets_append_retries_during_benchmark + formatter = Object.new + formatter.define_singleton_method(:format) { |_results| "formatted" } + server = Data.define(:url).new("postgres://example") + append_retries = nil + original_append_retries = En57.configuration.append_retries + En57.configuration.append_retries = 7 + scenario = + Class + .new do + def initialize(capture) + @capture = capture + end + + def name = "scenario" + def runs = 1 + def run(measure, _retries) + @capture.call(En57.configuration.append_retries) + measure.call { nil } + end + end + .new(->(value) { append_retries = value }) + + PgEphemeral.stub( + :with_server, + ->(instance_name:, &block) { block.call(server) }, + ) do + Runner.new( + formatter:, + scenarios: { + "instance" => ->(_database_url, _warmup_runs) { scenario }, + }, + ).run + end + + assert_equal(100, append_retries) + assert_equal(7, En57.configuration.append_retries) + ensure + En57.configuration.append_retries = original_append_retries + end + def test_runner_uses_concurrent_array_for_samples formatter = Object.new formatted_results = nil