diff --git a/README.md b/README.md index 645589a..d9018b4 100644 --- a/README.md +++ b/README.md @@ -109,19 +109,22 @@ Example: consume credits only once per account. ```ruby account_scope = event_store.read.with_tag("account:x") -begin - event_store.append( - [ - En57::Event.new( - type: "CreditsUsed", - data: { amount: 100 }, - tags: ["account:x"], - ), - ], - fail_if: account_scope.of_type("CreditsUsed"), - ) -rescue En57::AppendConditionViolated - # lost the race; another writer already consumed credits +result = event_store.append( + [ + En57::Event.new( + type: "CreditsUsed", + data: { amount: 100 }, + tags: ["account:x"], + ), + ], + fail_if: account_scope.of_type("CreditsUsed"), +) + +case result +in En57::Success(position:) + # credits consumed at event position +in En57::Failure(position:) + # lost the race; another writer already consumed credits at position end ``` @@ -144,18 +147,21 @@ Example: ensure no event exists with this email tag before writing. ```ruby email_tag = "email:alice@example.com" -begin - event_store.append( - [ - En57::Event.new( - type: "UserRegistered", - data: { name: "Alice" }, - tags: [email_tag], - ), - ], - fail_if: event_store.read.with_tag(email_tag), - ) -rescue En57::AppendConditionViolated - # email already used +result = event_store.append( + [ + En57::Event.new( + type: "UserRegistered", + data: { name: "Alice" }, + tags: [email_tag], + ), + ], + fail_if: event_store.read.with_tag(email_tag), +) + +case result +in En57::Success(position:) + # user registered at event position +in En57::Failure(position:) + # email already used at event position end ``` diff --git a/db/schema/0.1.0.sql b/db/schema/0.1.0.sql index 3ec3595..4976580 100644 --- a/db/schema/0.1.0.sql +++ b/db/schema/0.1.0.sql @@ -26,8 +26,13 @@ CREATE TYPE en57.event AS ( tags text[] ); +CREATE TYPE en57.append_result AS ( + status text, + "position" bigint +); + CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) - RETURNS void + RETURNS en57.append_result LANGUAGE plpgsql SET enable_seqscan = OFF AS $$ @@ -39,6 +44,8 @@ DECLARE req_types text[]; req_tags text[]; req_after bigint; + appended_position bigint; + matched_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -48,56 +55,70 @@ BEGIN req_tags := ARRAY ( SELECT DISTINCT jsonb_array_elements_text(COALESCE(criterion -> 'tags', '[]'::jsonb))); IF cardinality(req_tags) > 0 THEN - IF EXISTS ( + SELECT + max(e.position) + FROM ( SELECT - 1 - FROM ( - SELECT - t.event_id - FROM - en57.tags AS t - WHERE - t.value = ANY (req_tags) - GROUP BY - t.event_id - HAVING - count(*) = cardinality(req_tags)) AS matched - JOIN en57.events AS e ON e.id = matched.event_id - WHERE (req_after IS NULL - OR e.position > req_after) + t.event_id + FROM + en57.tags AS t + WHERE + t.value = ANY (req_tags) + GROUP BY + t.event_id + HAVING + count(*) = cardinality(req_tags)) AS matched + JOIN en57.events AS e ON e.id = matched.event_id + WHERE (req_after IS NULL + OR e.position > req_after) AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; - END IF; - ELSE - IF EXISTS ( + OR e.type = ANY (req_types)) + INTO + matched_position; + ELSE SELECT - 1 + max(e.position) FROM en57.events AS e WHERE (req_after IS NULL OR e.position > req_after) - AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; - END IF; -END IF; -END LOOP; + AND (criterion -> 'types' IS NULL + OR e.type = ANY (req_types)) + INTO + matched_position; + END IF; + IF matched_position IS NOT NULL THEN + RETURN ROW ('append_condition_violated', + matched_position)::en57.append_result; + END IF; + END LOOP; + WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) -SELECT - e.id, - e.type, - e.data, - e.meta -FROM - unnest(new_events) AS e; -INSERT INTO en57.tags (event_id, value) -SELECT - e.id, - t.value -FROM - unnest(new_events) AS e + SELECT + e.id, + e.type, + e.data, + e.meta + FROM + unnest(new_events) AS e + RETURNING + "position" +) + SELECT + max("position") + FROM + inserted_events + INTO + appended_position; + INSERT INTO en57.tags (event_id, value) + SELECT + e.id, + t.value + FROM + unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); + RETURN ROW ('success', + appended_position)::en57.append_result; END; $$; diff --git a/lib/benchmark/append_non_conflicting_tags.rb b/lib/benchmark/append_non_conflicting_tags.rb index 0622a0e..074c9ca 100644 --- a/lib/benchmark/append_non_conflicting_tags.rb +++ b/lib/benchmark/append_non_conflicting_tags.rb @@ -22,12 +22,15 @@ def call(measure, retries, run_id) .with_tag(tags = ["writer:#{run_id}"]) events = @batch_size.times.map { Event.new(type:, tags:) } + position = 0 measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - retry + loop do + case @event_store.append(events, fail_if: scope.after(position)) + in Success + break + in Failure[position:] + retries.call + end end end end diff --git a/lib/benchmark/concurrent_append_conflicting_tags.rb b/lib/benchmark/concurrent_append_conflicting_tags.rb index 4da0f6a..80d50bd 100644 --- a/lib/benchmark/concurrent_append_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_conflicting_tags.rb @@ -23,15 +23,15 @@ def call(measure, retries, run_id) events = @batch_size.times.map { Event.new(type:, tags:) } barrier.wait + position = 0 measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - scope.each_with_position do |_event, event_position| - position = event_position + loop do + case @event_store.append(events, fail_if: scope.after(position)) + in Success + break + in Failure[position:] + retries.call end - retry end end end diff --git a/lib/benchmark/concurrent_append_non_conflicting_tags.rb b/lib/benchmark/concurrent_append_non_conflicting_tags.rb index abe2d62..2c2d687 100644 --- a/lib/benchmark/concurrent_append_non_conflicting_tags.rb +++ b/lib/benchmark/concurrent_append_non_conflicting_tags.rb @@ -27,13 +27,9 @@ def call(measure, retries, _run_id) events = @batch_size.times.map { Event.new(type:, tags:) } barrier.wait + position = 0 measure.call do - begin - @event_store.append(events, fail_if: scope.after(position = 0)) - rescue AppendConditionViolated - retries.call - retry - end + @event_store.append(events, fail_if: scope.after(position)) end end end diff --git a/lib/en57.rb b/lib/en57.rb index 42ff441..6f8a7bb 100644 --- a/lib/en57.rb +++ b/lib/en57.rb @@ -14,7 +14,9 @@ require_relative "en57/configuration" module En57 - AppendConditionViolated = Class.new(StandardError) + Success = Data.define(:position) + Failure = Data.define(:position) + AppendRetriesExhausted = Class.new(StandardError) def self.configuration = Configuration.instance diff --git a/lib/en57/active_record_adapter.rb b/lib/en57/active_record_adapter.rb index 078ca91..3925e9a 100644 --- a/lib/en57/active_record_adapter.rb +++ b/lib/en57/active_record_adapter.rb @@ -19,6 +19,8 @@ def with_transaction(&block) = run_transaction({}, &block) def with_serializable_transaction(&block) = run_transaction({ isolation: :serializable }, &block) + def serialization_error = ActiveRecord::SerializationFailure + private def run_transaction(options) diff --git a/lib/en57/benchmark.rb b/lib/en57/benchmark.rb index e5bd42a..fca99f5 100644 --- a/lib/en57/benchmark.rb +++ b/lib/en57/benchmark.rb @@ -256,6 +256,9 @@ def initialize(scenarios:, formatter:) end def run + original_append_retries = En57.configuration.append_retries + En57.configuration.append_retries = 100 + results = @scenarios.map do |instance_name, mk_scenario| PgEphemeral.with_server(instance_name:) do |server| @@ -283,6 +286,8 @@ def run end @formatter.format(results) + ensure + En57.configuration.append_retries = original_append_retries end end diff --git a/lib/en57/configuration.rb b/lib/en57/configuration.rb index 1670cbf..00aa196 100644 --- a/lib/en57/configuration.rb +++ b/lib/en57/configuration.rb @@ -6,9 +6,10 @@ module En57 class Configuration include Singleton - attr_accessor :serializer + attr_accessor :append_retries, :serializer def initialize + @append_retries = 9 @serializer = JsonSerializer.new end end diff --git a/lib/en57/event_store.rb b/lib/en57/event_store.rb index b29238e..d475f39 100644 --- a/lib/en57/event_store.rb +++ b/lib/en57/event_store.rb @@ -8,7 +8,6 @@ def initialize(repository) def append(events, fail_if: EmptyScope.new) @repository.append(events, fail_if: fail_if.to_query) - self end def read diff --git a/lib/en57/pg_adapter.rb b/lib/en57/pg_adapter.rb index 2140ede..4b5e83e 100644 --- a/lib/en57/pg_adapter.rb +++ b/lib/en57/pg_adapter.rb @@ -19,18 +19,22 @@ def with_transaction(&block) = run_transaction("BEGIN", &block) def with_serializable_transaction(&block) = run_transaction("BEGIN ISOLATION LEVEL SERIALIZABLE", &block) + def serialization_error = PG::TRSerializationFailure + private def run_transaction(begin_statement) with_connection do |connection| connection.exec(begin_statement) - begin - yield connection - rescue StandardError - connection.exec("ROLLBACK") - raise - end + result = + begin + yield connection + rescue StandardError + connection.exec("ROLLBACK") + raise + end connection.exec("COMMIT") + result end end diff --git a/lib/en57/repository.rb b/lib/en57/repository.rb index c5e4fb4..bddd3f5 100644 --- a/lib/en57/repository.rb +++ b/lib/en57/repository.rb @@ -13,6 +13,8 @@ def initialize(adapter, serializer = En57.configuration.serializer) end def append(events, fail_if:) + return Success.new(position: nil) if events.empty? + event_records = events.map do |event| serialized, description = @serializer.dump(event.data) @@ -32,23 +34,44 @@ def append(events, fail_if:) :fail_if_events_match ] = fail_if_events_match unless fail_if_events_match.empty? - statement = "SELECT en57.append_events($1::en57.event[], $2::jsonb)" + statement = + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)" params = [ @array_encoder.encode(event_records), JSON.generate(append_condition), ] - if fail_if_events_match.empty? - @adapter.with_transaction do |connection| - connection.exec_params(statement, params) + attempts_remaining = En57.configuration.append_retries + begin + row = + if fail_if_events_match.empty? + @adapter.with_transaction do |connection| + connection.exec_params(statement, params) + end + else + @adapter.with_serializable_transaction do |connection| + connection.exec_params(statement, params) + end + end + + case row.first.fetch("status") + when "success" + Success.new( + position: row.first.fetch("position").then { Integer(it) }, + ) + when "append_condition_violated" + Failure.new( + position: row.first.fetch("position").then { Integer(it) }, + ) end - else - @adapter.with_serializable_transaction do |connection| - connection.exec_params(statement, params) + rescue @adapter.serialization_error + if attempts_remaining.positive? + attempts_remaining -= 1 + retry end + + raise AppendRetriesExhausted end - rescue PG::RaiseException, PG::TRSerializationFailure - raise AppendConditionViolated end def read(query) diff --git a/lib/en57/sequel_adapter.rb b/lib/en57/sequel_adapter.rb index daa48be..89d6419 100644 --- a/lib/en57/sequel_adapter.rb +++ b/lib/en57/sequel_adapter.rb @@ -18,6 +18,8 @@ def with_transaction(&block) = run_transaction({}, &block) def with_serializable_transaction(&block) = run_transaction({ isolation: :serializable }, &block) + def serialization_error = PG::TRSerializationFailure + private def run_transaction(options) diff --git a/test/pg_regress/expected/001_schema.out b/test/pg_regress/expected/001_schema.out index 3ec3595..4976580 100644 --- a/test/pg_regress/expected/001_schema.out +++ b/test/pg_regress/expected/001_schema.out @@ -26,8 +26,13 @@ CREATE TYPE en57.event AS ( tags text[] ); +CREATE TYPE en57.append_result AS ( + status text, + "position" bigint +); + CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb) - RETURNS void + RETURNS en57.append_result LANGUAGE plpgsql SET enable_seqscan = OFF AS $$ @@ -39,6 +44,8 @@ DECLARE req_types text[]; req_tags text[]; req_after bigint; + appended_position bigint; + matched_position bigint; BEGIN FOREACH criterion IN ARRAY criteria LOOP req_after := (criterion ->> 'after')::bigint; @@ -48,56 +55,70 @@ BEGIN req_tags := ARRAY ( SELECT DISTINCT jsonb_array_elements_text(COALESCE(criterion -> 'tags', '[]'::jsonb))); IF cardinality(req_tags) > 0 THEN - IF EXISTS ( + SELECT + max(e.position) + FROM ( SELECT - 1 - FROM ( - SELECT - t.event_id - FROM - en57.tags AS t - WHERE - t.value = ANY (req_tags) - GROUP BY - t.event_id - HAVING - count(*) = cardinality(req_tags)) AS matched - JOIN en57.events AS e ON e.id = matched.event_id - WHERE (req_after IS NULL - OR e.position > req_after) + t.event_id + FROM + en57.tags AS t + WHERE + t.value = ANY (req_tags) + GROUP BY + t.event_id + HAVING + count(*) = cardinality(req_tags)) AS matched + JOIN en57.events AS e ON e.id = matched.event_id + WHERE (req_after IS NULL + OR e.position > req_after) AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; - END IF; - ELSE - IF EXISTS ( + OR e.type = ANY (req_types)) + INTO + matched_position; + ELSE SELECT - 1 + max(e.position) FROM en57.events AS e WHERE (req_after IS NULL OR e.position > req_after) - AND (criterion -> 'types' IS NULL - OR e.type = ANY (req_types))) THEN - RAISE EXCEPTION 'append_condition_violated'; - END IF; -END IF; -END LOOP; + AND (criterion -> 'types' IS NULL + OR e.type = ANY (req_types)) + INTO + matched_position; + END IF; + IF matched_position IS NOT NULL THEN + RETURN ROW ('append_condition_violated', + matched_position)::en57.append_result; + END IF; + END LOOP; + WITH inserted_events AS ( INSERT INTO en57.events (id, type, data, meta) -SELECT - e.id, - e.type, - e.data, - e.meta -FROM - unnest(new_events) AS e; -INSERT INTO en57.tags (event_id, value) -SELECT - e.id, - t.value -FROM - unnest(new_events) AS e + SELECT + e.id, + e.type, + e.data, + e.meta + FROM + unnest(new_events) AS e + RETURNING + "position" +) + SELECT + max("position") + FROM + inserted_events + INTO + appended_position; + INSERT INTO en57.tags (event_id, value) + SELECT + e.id, + t.value + FROM + unnest(new_events) AS e CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value); + RETURN ROW ('success', + appended_position)::en57.append_result; END; $$; diff --git a/test/test_active_record_adapter.rb b/test/test_active_record_adapter.rb index 70ccc99..3a45a0b 100644 --- a/test/test_active_record_adapter.rb +++ b/test/test_active_record_adapter.rb @@ -48,6 +48,13 @@ def test_with_serializable_transaction_wraps_block_in_transaction end end + def test_serialization_error_returns_active_record_serialization_failure + with_mock_adapter do |_pool, _connection, _raw_connection, adapter| + assert_same ActiveRecord::SerializationFailure, + adapter.serialization_error + end + end + def test_with_serializable_transaction_reraises_block_errors error = RuntimeError.new("boom") diff --git a/test/test_benchmark.rb b/test/test_benchmark.rb index 69cd80c..2521402 100644 --- a/test/test_benchmark.rb +++ b/test/test_benchmark.rb @@ -122,6 +122,47 @@ def run(measure, retries) assert_equal([3, 3], formatted_results.map(&:retry_count)) end + def test_runner_sets_append_retries_during_benchmark + formatter = Object.new + formatter.define_singleton_method(:format) { |_results| "formatted" } + server = Data.define(:url).new("postgres://example") + append_retries = nil + original_append_retries = En57.configuration.append_retries + En57.configuration.append_retries = 7 + scenario = + Class + .new do + def initialize(capture) + @capture = capture + end + + def name = "scenario" + def runs = 1 + def run(measure, _retries) + @capture.call(En57.configuration.append_retries) + measure.call { nil } + end + end + .new(->(value) { append_retries = value }) + + PgEphemeral.stub( + :with_server, + ->(instance_name:, &block) { block.call(server) }, + ) do + Runner.new( + formatter:, + scenarios: { + "instance" => ->(_database_url, _warmup_runs) { scenario }, + }, + ).run + end + + assert_equal(100, append_retries) + assert_equal(7, En57.configuration.append_retries) + ensure + En57.configuration.append_retries = original_append_retries + end + def test_runner_uses_concurrent_array_for_samples formatter = Object.new formatted_results = nil diff --git a/test/test_en57.rb b/test/test_en57.rb index 3a0f030..eadbeb8 100644 --- a/test/test_en57.rb +++ b/test/test_en57.rb @@ -24,6 +24,10 @@ def test_requiring_en57_without_active_record_skips_active_record_adapter RUBY end + def test_configuration_default_append_retries + assert_equal 9, En57.configuration.append_retries + end + def test_configuration_default_serializer assert_kind_of JsonSerializer, En57.configuration.serializer end @@ -31,8 +35,12 @@ def test_configuration_default_serializer def test_configure with_empty_configuration do serializer = Object.new - En57.configure { |c| c.serializer = serializer } + En57.configure do |c| + c.append_retries = 2 + c.serializer = serializer + end + assert_equal 2, En57.configuration.append_retries assert_equal serializer, En57.configuration.serializer end end diff --git a/test/test_event_store.rb b/test/test_event_store.rb index 0270029..a8e893c 100644 --- a/test/test_event_store.rb +++ b/test/test_event_store.rb @@ -10,7 +10,12 @@ def test_append_event event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, nil, [[event]], fail_if: Query.all) + repository.expect( + :append, + Success.new(position: 1), + [[event]], + fail_if: Query.all, + ) EventStore.new(repository).append([event]) end @@ -29,15 +34,21 @@ def test_read_returns_scope_for_query_all end end - def test_return_self_from_append + def test_returns_success_from_append event = Event.new(type: "CreditsToppedUp") with_repository do |repository| - repository.expect(:append, nil, [[event]], fail_if: Query.all) - - event_store = EventStore.new(repository) - - assert_equal(event_store, event_store.append([event])) + repository.expect( + :append, + Success.new(position: 1), + [[event]], + fail_if: Query.all, + ) + + assert_equal( + Success.new(position: 1), + EventStore.new(repository).append([event]), + ) end end @@ -47,7 +58,12 @@ def test_append_accepts_scope_for_fail_if with_repository do |repository| event_store = EventStore.new(repository) fail_if = event_store.read.with_tag("order_id:123") - repository.expect(:append, nil, [[event]], fail_if: fail_if.to_query) + repository.expect( + :append, + Success.new(position: 1), + [[event]], + fail_if: fail_if.to_query, + ) event_store.append([event], fail_if:) end diff --git a/test/test_factories.rb b/test/test_factories.rb index d7f0e6d..cb2f92c 100644 --- a/test/test_factories.rb +++ b/test/test_factories.rb @@ -44,7 +44,8 @@ def test_event_store_does_not_conflict_with_public_schema_tables def assert_round_trip(event_store) event = Event.new(type: "FactoryTested") - assert_equal [event], event_store.append([event]).read.each.to_a + assert_equal Success.new(position: 1), event_store.append([event]) + assert_equal [event], event_store.read.each.to_a end end end diff --git a/test/test_integration.rb b/test/test_integration.rb index 2fea4df..a33cdec 100644 --- a/test/test_integration.rb +++ b/test/test_integration.rb @@ -24,7 +24,8 @@ class TestIntegration < IntegrationTest ), ] - assert_equal(events, event_store.append(events).read.each.to_a) + assert_equal(Success.new(position: 2), event_store.append(events)) + assert_equal(events, event_store.read.each.to_a) end end @@ -35,9 +36,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.map.with_index(1) { |event, position| [event, position] }, - event_store.append(events).read.each_with_position.to_a, + event_store.read.each_with_position.to_a, ) end end @@ -45,26 +47,33 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_fail_if_and_no_matches_appends_events" do with_event_store(factory) do |event_store| event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append( - [event], - fail_if: event_store.read.of_type("PriceChanged"), + assert_equal( + Success.new(position: 1), + event_store.append( + [event], + fail_if: event_store.read.of_type("PriceChanged"), + ), ) assert_equal([event], event_store.read.each.to_a) end end - define_method "test_#{name}_append_with_fail_if_and_matches_raises_append_condition_violated" do + define_method "test_#{name}_append_with_fail_if_and_matches_returns_failure" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new(position: 1), event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced"), - ) - end + ), + ) assert_equal([existing_event], event_store.read.each.to_a) end @@ -73,10 +82,16 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_after_ignores_matches_at_or_before_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) - event_store.append( - [Event.new(id: ids[1], type: "ShipmentScheduled")], - fail_if: event_store.read.of_type("OrderPlaced").after(1), + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) + assert_equal( + Success.new(position: 2), + event_store.append( + [Event.new(id: ids[1], type: "ShipmentScheduled")], + fail_if: event_store.read.of_type("OrderPlaced").after(1), + ), ) assert_equal( @@ -86,17 +101,21 @@ class TestIntegration < IntegrationTest end end - define_method "test_#{name}_append_with_after_raises_if_match_is_after_cutoff" do + define_method "test_#{name}_append_with_after_returns_failure_if_match_is_after_cutoff" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new(position: 1), event_store.append( [Event.new(id: ids[1], type: "ShipmentScheduled")], fail_if: event_store.read.of_type("OrderPlaced").after(0), - ) - end + ), + ) assert_equal([existing_event], event_store.read.each.to_a) end @@ -105,7 +124,10 @@ class TestIntegration < IntegrationTest define_method "test_#{name}_append_with_duplicate_id_raises_unique_violation" do with_event_store(factory) do |event_store| existing_event = Event.new(id: ids[0], type: "OrderPlaced") - event_store.append([existing_event]) + assert_equal( + Success.new(position: 1), + event_store.append([existing_event]), + ) assert_raises(PG::UniqueViolation) do event_store.append( @@ -122,7 +144,8 @@ class TestIntegration < IntegrationTest event = Event.new(id: ids[0], type: "OrderPlaced", tags: ["order_id:123"]) - assert_equal([event], event_store.append([event]).read.each.to_a) + assert_equal(Success.new(position: 1), event_store.append([event])) + assert_equal([event], event_store.read.each.to_a) end end @@ -133,10 +156,8 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] - assert_equal( - events.drop(1), - event_store.append(events).read.after(1).each.to_a, - ) + assert_equal(Success.new(position: 2), event_store.append(events)) + assert_equal(events.drop(1), event_store.read.after(1).each.to_a) end end @@ -155,10 +176,10 @@ class TestIntegration < IntegrationTest ), ] + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.take(1), event_store - .append(events) .read .with_tag("order_id:123", "tenant_id:acme") .each @@ -174,9 +195,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[1], type: "PriceChanged"), ] + assert_equal(Success.new(position: 2), event_store.append(events)) assert_equal( events.take(1), - event_store.append(events).read.of_type("OrderPlaced").each.to_a, + event_store.read.of_type("OrderPlaced").each.to_a, ) end end @@ -189,14 +211,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "OrderCancelled"), ] + assert_equal(Success.new(position: 3), event_store.append(events)) assert_equal( events.drop(1), - event_store - .append(events) - .read - .of_type("OrderPlaced", "OrderCancelled") - .each - .to_a, + event_store.read.of_type("OrderPlaced", "OrderCancelled").each.to_a, ) end end @@ -209,10 +227,10 @@ class TestIntegration < IntegrationTest Event.new(id: ids[2], type: "PriceChanged", tags: ["order_id:123"]), ] + assert_equal(Success.new(position: 3), event_store.append(events)) assert_equal( events.take(1), event_store - .append(events) .read .of_type("OrderPlaced") .with_tag("order_id:123") @@ -235,7 +253,7 @@ class TestIntegration < IntegrationTest tags: ["order_id:123"], ), ] - event_store.append(events) + assert_equal(Success.new(position: 5), event_store.append(events)) orders = event_store.read.of_type("OrderPlaced").with_tag("order_id:123") diff --git a/test/test_migrator.rb b/test/test_migrator.rb index 952057d..db74e45 100644 --- a/test/test_migrator.rb +++ b/test/test_migrator.rb @@ -55,7 +55,8 @@ def test_migrate_installs_schema_used_by_event_store ), ) - assert_equal [event], event_store.append([event]).read.each.to_a + assert_equal Success.new(position: 1), event_store.append([event]) + assert_equal [event], event_store.read.each.to_a ensure connection&.close end diff --git a/test/test_pg_adapter.rb b/test/test_pg_adapter.rb index e2968dd..8ab560b 100644 --- a/test/test_pg_adapter.rb +++ b/test/test_pg_adapter.rb @@ -24,6 +24,10 @@ def test_for_pool_uses_connection_pool assert_same connection, adapter.with_connection { |conn| conn } end + def test_for_connection_returns_adapter + assert_instance_of(PgAdapter, PgAdapter.for_connection(Object.new)) + end + def test_for_connection_synchronizes_access connection = Object.new adapter = PgAdapter.for_connection(connection) @@ -60,10 +64,12 @@ def test_with_serializable_transaction_commits_on_success ) connection.expect(:exec, nil, ["COMMIT"]) - adapter.with_serializable_transaction do |conn| - assert_equal :written, - conn.exec_params("SELECT en57.append_events()", []) - end + assert_equal( + :written, + adapter.with_serializable_transaction do |conn| + conn.exec_params("SELECT en57.append_events()", []) + end, + ) end end @@ -94,6 +100,12 @@ def test_with_serializable_transaction_rolls_back_on_failure assert_same error, raised end + def test_serialization_error_returns_pg_serialization_failure + with_mock_adapter do |_connection, adapter| + assert_same PG::TRSerializationFailure, adapter.serialization_error + end + end + def test_with_transaction_commits_on_success with_mock_adapter do |connection, adapter| connection.expect(:exec, nil, ["BEGIN"]) @@ -104,10 +116,12 @@ def test_with_transaction_commits_on_success ) connection.expect(:exec, nil, ["COMMIT"]) - adapter.with_transaction do |conn| - assert_equal :written, - conn.exec_params("SELECT en57.append_events()", []) - end + assert_equal( + :written, + adapter.with_transaction do |conn| + conn.exec_params("SELECT en57.append_events()", []) + end, + ) end end diff --git a/test/test_repository.rb b/test/test_repository.rb index 8225692..17cb8df 100644 --- a/test/test_repository.rb +++ b/test/test_repository.rb @@ -34,9 +34,9 @@ def test_append_without_fail_if_uses_plain_transaction connection.expect(:exec, nil, ["BEGIN"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -78,9 +78,9 @@ def test_append_persists_empty_event_data_as_null connection.expect(:exec, nil, ["BEGIN"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [expected_events, "{}"], ], ) @@ -97,36 +97,56 @@ def test_append_persists_empty_event_data_as_null end def test_append_passes_fail_if_and_after_conditions + expected_events = + array_encoder.encode( + [record_encoder.encode([ids[0], "OrderPaid", nil, nil, "{}"])], + ) + with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) connection.expect( :exec_params, - nil, + success_result, [ - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", [ - array_encoder.encode([]), + expected_events, '{"fail_if_events_match":[{"types":["OrderPlaced"],"after":42}]}', ], ], ) connection.expect(:exec, nil, ["COMMIT"]) - Repository.new( - PgAdapter.for_connection(connection), - JsonSerializer.new, - ).append( - [], - fail_if: - Query.new( - criteria: [ - Query::Criteria.new( - types: ["OrderPlaced"], - tags: [], - after: 42, - ), - ], - ), + assert_equal( + Success.new(position: 1), + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append( + [Event.new(id: ids[0], type: "OrderPaid")], + fail_if: + Query.new( + criteria: [ + Query::Criteria.new( + types: ["OrderPlaced"], + tags: [], + after: 42, + ), + ], + ), + ), + ) + end + end + + def test_append_short_circuits_empty_event_set + with_connection do |connection| + assert_equal( + Success.new(position: nil), + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append([], fail_if: fail_if_with_criteria), ) end end @@ -134,21 +154,24 @@ def test_append_passes_fail_if_and_after_conditions def test_append_rolls_back_transaction_on_pg_failure with_connection do |connection| connection.expect(:exec, nil, ["BEGIN"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) do |sql, params| assert_equal( - "SELECT en57.append_events($1::en57.event[], $2::jsonb)", + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", sql, ) - assert_equal([array_encoder.encode([]), "{}"], params) + assert_equal( + [array_encoder.encode(append_event_records), "{}"], + params, + ) raise PG::Error, "boom" end + connection.expect(:exec, nil, ["ROLLBACK"]) assert_raises(PG::Error) do Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: Query.all) + ).append(append_events, fail_if: Query.all) end end end @@ -156,14 +179,14 @@ def test_append_rolls_back_transaction_on_pg_failure def test_append_rolls_back_transaction_on_failure with_connection do |connection| connection.expect(:exec, nil, ["BEGIN"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) { raise RuntimeError, "boom" } + connection.expect(:exec, nil, ["ROLLBACK"]) assert_raises(RuntimeError) do Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: Query.all) + ).append(append_events, fail_if: Query.all) end end end @@ -513,38 +536,97 @@ def test_read_events_filtered_by_type end end - def test_append_raises_append_condition_violated_from_pg_error_sqlstate + def test_append_returns_failure_when_sql_status_is_append_condition_violated with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) - connection.expect(:exec, nil, ["ROLLBACK"]) - connection.expect(:exec_params, nil) { raise(PG::RaiseException.new) } + connection.expect(:exec_params, failure_result, append_args) + connection.expect(:exec, nil, ["COMMIT"]) - assert_raises(AppendConditionViolated) do + assert_equal( + Failure.new(position: 3), Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria) - end + ).append(append_events, fail_if: fail_if_with_criteria), + ) end end - def test_append_raises_append_condition_violated_from_serialization_failure_result_sqlstate + def test_append_retries_on_serialization_error_and_succeeds with_connection do |connection| connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) - connection.expect(:exec, nil, ["ROLLBACK"]) connection.expect(:exec_params, nil) do raise PG::TRSerializationFailure.new end + connection.expect(:exec, nil, ["ROLLBACK"]) + connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) + connection.expect(:exec_params, success_result, append_args) + connection.expect(:exec, nil, ["COMMIT"]) - assert_raises(AppendConditionViolated) do + assert_equal( + Success.new(position: 1), Repository.new( PgAdapter.for_connection(connection), JsonSerializer.new, - ).append([], fail_if: fail_if_with_criteria) + ).append(append_events, fail_if: fail_if_with_criteria), + ) + end + end + + def test_append_raises_serialization_error_after_default_retries + attempts = 0 + + with_connection do |connection| + 10.times do + connection.expect(:exec, nil, ["BEGIN ISOLATION LEVEL SERIALIZABLE"]) + connection.expect(:exec_params, nil) do + attempts += 1 + raise PG::TRSerializationFailure.new + end + connection.expect(:exec, nil, ["ROLLBACK"]) end + + assert_raises(AppendRetriesExhausted) do + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append(append_events, fail_if: fail_if_with_criteria) + end + assert_equal(10, attempts) end end + def test_append_raises_serialization_error_after_configured_retries + attempts = 0 + + En57 + .configuration + .stub(:append_retries, 1) do + with_connection do |connection| + 2.times do + connection.expect( + :exec, + nil, + ["BEGIN ISOLATION LEVEL SERIALIZABLE"], + ) + connection.expect(:exec_params, nil) do + attempts += 1 + raise PG::TRSerializationFailure.new + end + connection.expect(:exec, nil, ["ROLLBACK"]) + end + + assert_raises(AppendRetriesExhausted) do + Repository.new( + PgAdapter.for_connection(connection), + JsonSerializer.new, + ).append(append_events, fail_if: fail_if_with_criteria) + end + assert_equal(2, attempts) + end + end + end + private def ids = @ids ||= Hash.new { |h, k| h[k] = SecureRandom.uuid_v7 } @@ -560,6 +642,27 @@ def array_encoder = @array_encoder ||= PG::TextEncoder::Array.new def record_encoder = @record_encoder ||= PG::TextEncoder::Record.new + def success_result = [{ "status" => "success", "position" => "1" }] + + def failure_result = + [{ "status" => "append_condition_violated", "position" => "3" }] + + def append_events = [Event.new(id: ids[0], type: "OrderPaid")] + + def append_event_records + [record_encoder.encode([ids[0], "OrderPaid", nil, nil, "{}"])] + end + + def append_args + [ + "SELECT status, position FROM en57.append_events($1::en57.event[], $2::jsonb)", + [ + array_encoder.encode(append_event_records), + '{"fail_if_events_match":[{"types":["OrderPlaced"]}]}', + ], + ] + end + def fail_if_with_criteria Query.new( criteria: [Query::Criteria.new(types: ["OrderPlaced"], tags: [])], diff --git a/test/test_sequel_adapter.rb b/test/test_sequel_adapter.rb index fca176b..e07c7d1 100644 --- a/test/test_sequel_adapter.rb +++ b/test/test_sequel_adapter.rb @@ -73,6 +73,12 @@ def test_with_serializable_transaction_unwraps_pg_error_subclasses end end + def test_serialization_error_returns_pg_serialization_failure + with_mock_adapter do |_database, _connection, adapter| + assert_same PG::TRSerializationFailure, adapter.serialization_error + end + end + def test_with_transaction_synchronizes_inside_transaction with_mock_adapter do |database, connection, adapter| database.expect(:transaction, :committed) { |&block| block.call } diff --git a/test/test_stress.rb b/test/test_stress.rb index ff35966..9d9eb5e 100644 --- a/test/test_stress.rb +++ b/test/test_stress.rb @@ -42,14 +42,18 @@ class TestStress < IntegrationTest ], fail_if: account_scope.of_type("CreditsUsed"), ) - rescue AppendConditionViolated => e + rescue AppendRetriesExhausted => e e end end + results = threads.map(&:value) + assert_equal(1, results.select { Success === it }.size) assert_equal( (concurrency - 1), - threads.map(&:value).select { AppendConditionViolated === it }.size, + results + .select { Failure === it || AppendRetriesExhausted === it } + .size, ) assert_equal( 1,