From d8bfc8d0dab249e1e33f42bff30a5b5688724b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 22:15:45 -0400 Subject: [PATCH 01/18] remove task seq because of resource leak issues --- src/Propulsion.Feed/FeedReader.fs | 11 +++-- src/Propulsion.Feed/FeedSource.fs | 24 +++++----- src/Propulsion.Feed/Infrastructure.fs | 46 +++++++++++++++++++ src/Propulsion.Feed/PeriodicSource.fs | 8 ++-- src/Propulsion.Feed/Propulsion.Feed.fsproj | 6 ++- .../ReaderCheckpoint.fs | 1 + src/Propulsion/Internal.fs | 5 ++ 7 files changed, 80 insertions(+), 21 deletions(-) create mode 100755 src/Propulsion.Feed/Infrastructure.fs diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index 577fa8c5..e1793b31 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -3,6 +3,7 @@ namespace Propulsion.Feed.Core open FSharp.Control open Propulsion.Feed open Propulsion.Internal +open Propulsion.Infrastructure open Serilog open System open System.Collections.Generic @@ -161,10 +162,12 @@ type FeedReader stats.UpdateCommittedPosition(initialPosition) let mutable currentPos, lastWasTail = initialPosition, false while not (ct.IsCancellationRequested || (lastWasTail && Option.isSome awaitIngesterShutdown)) do - for readLatency, batch in crawl (lastWasTail, currentPos) ct do - do! submitPage (readLatency, batch) - currentPos <- batch.checkpoint - lastWasTail <- batch.isTail + do! AsyncSeq.ofAsyncEnum (crawl (lastWasTail, currentPos) ct) + |> AsyncSeq.iterAsync (fun struct(readLatency, batch) -> async { + do! submitPage (readLatency, batch) |> Async.AwaitTaskCorrect + currentPos <- batch.checkpoint + lastWasTail <- batch.isTail }) + |> Async.startImmediateAsTask ct match awaitIngesterShutdown with | Some a when not ct.IsCancellationRequested -> let completionTimer = Stopwatch.start () diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index afb0ca4b..891ea356 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -267,24 +267,25 @@ type TailingFeedSource inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, renderPos, ?logCommitFailure = logCommitFailure, ?readersStopAtTail = readersStopAtTail) - let crawl trancheId (wasLast, startPos) ct = taskSeq { - if wasLast then do! Task.delay tailSleepInterval ct - try let batches = crawl.Invoke(trancheId, startPos, ct) + let crawl trancheId (wasLast, startPos) ct = AsyncSeq.toAsyncEnum(asyncSeq { + if wasLast then do! Async.Sleep tailSleepInterval + try let batches = crawl.Invoke(trancheId, startPos, ct) |> AsyncSeq.ofAsyncEnum for batch in batches do yield batch with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext().Warning(e, "Read failure") | Some l -> l e - match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct } + match readFailureSleepInterval with None -> () | Some interval -> do! Async.Sleep interval + }) member _.Pump(readTranches, ct) = base.Pump(readTranches, crawl, ct) module TailingFeedSource = - let readOne readBatch cat pos ct = taskSeq { + let readOne readBatch cat pos ct = AsyncSeq.toAsyncEnum (asyncSeq { let sw = Stopwatch.start () - let! b = readBatch struct (cat, pos, ct) - yield struct (sw.Elapsed, b) } + let! b = readBatch struct (cat, pos, ct) |> Async.AwaitTaskCorrect + yield struct (sw.Elapsed, b) }) /// Drives reading and checkpointing from a source that aggregates data from multiple streams as a singular source /// without shards/physical partitions (tranches), such as the SqlStreamStore and EventStoreDB $all feeds @@ -341,6 +342,7 @@ open Propulsion.Internal open System open System.Threading open System.Threading.Tasks +open Propulsion.Infrastructure [] type Page<'F> = { items : FsCodec.ITimelineEvent<'F>[]; checkpoint : Position; isTail : bool } @@ -357,13 +359,13 @@ type FeedSource let crawl (readPage: Func>>) trancheId = let streamName = FsCodec.StreamName.compose "Messages" [SourceId.toString sourceId; TrancheId.toString trancheId] - fun (wasLast, pos) ct -> taskSeq { + fun (wasLast, pos) ct -> AsyncSeq.toAsyncEnum(asyncSeq { if wasLast then - do! Task.delay tailSleepInterval ct + do! Async.Sleep tailSleepInterval let readTs = Stopwatch.timestamp () - let! page = readPage.Invoke(trancheId, pos, ct) + let! page = (readPage.Invoke(trancheId, pos, ct)) |> Async.AwaitTaskCorrect let items' = page.items |> Array.map (fun x -> struct (streamName, x)) - yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) } + yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) }) member internal _.Pump(readTranches: Func>, readPage: Func>>, ct): Task = diff --git a/src/Propulsion.Feed/Infrastructure.fs b/src/Propulsion.Feed/Infrastructure.fs new file mode 100755 index 00000000..dccf72df --- /dev/null +++ b/src/Propulsion.Feed/Infrastructure.fs @@ -0,0 +1,46 @@ +[] +module internal Propulsion.Infrastructure + +open System.Threading.Tasks + +// http://www.fssnip.net/7Rc/title/AsyncAwaitTaskCorrect +// pending that being officially packaged somewhere or integrated into FSharp.Core https://github.com/fsharp/fslang-suggestions/issues/840 +type Async with + + /// + /// Gets the result of given task so that in the event of exception + /// the actual user exception is raised as opposed to being wrapped + /// in a System.AggregateException. + /// + /// Task to be awaited. + [] + static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> = + Async.FromContinuations(fun (sc, ec, _cc) -> + task.ContinueWith(fun (t : Task<'T>) -> + if t.IsFaulted then + let e = t.Exception + if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0] + else ec e + elif t.IsCanceled then ec (TaskCanceledException()) + else sc t.Result) + |> ignore) + + /// + /// Gets the result of given task so that in the event of exception + /// the actual user exception is raised as opposed to being wrapped + /// in a System.AggregateException. + /// + /// Task to be awaited. + [] + static member AwaitTaskCorrect(task : Task) : Async = + Async.FromContinuations(fun (sc, ec, _cc) -> + task.ContinueWith(fun (task : Task) -> + if task.IsFaulted then + let e = task.Exception + if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0] + else ec e + elif task.IsCanceled then + ec (TaskCanceledException()) + else + sc ()) + |> ignore) diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion.Feed/PeriodicSource.fs index 124ec2ee..e4e05a45 100644 --- a/src/Propulsion.Feed/PeriodicSource.fs +++ b/src/Propulsion.Feed/PeriodicSource.fs @@ -56,11 +56,11 @@ type PeriodicSource inherit Core.FeedSourceBase(log, statsInterval, sourceId, checkpoints, None, sink, defaultArg renderPos DateTimeOffsetPosition.render) // We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl. - let crawlInternal (read : Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct : IAsyncEnumerable)> = taskSeq { + let crawlInternal (read : Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct : IAsyncEnumerable)> = AsyncSeq.toAsyncEnum(asyncSeq { let startDate = DateTimeOffsetPosition.getDateTimeOffset position let dueDate = startDate + refreshInterval match dueDate - DateTimeOffset.UtcNow with - | waitTime when waitTime.Ticks > 0L -> do! Task.delay waitTime ct + | waitTime when waitTime.Ticks > 0L -> do! Async.Sleep waitTime | _ -> () let basePosition = DateTimeOffset.UtcNow |> DateTimeOffsetPosition.ofDateTimeOffset @@ -70,7 +70,7 @@ type PeriodicSource let buffer = ResizeArray() let mutable index = 0L let mutable elapsed = TimeSpan.Zero - for ts, xs in read.Invoke trancheId do + for ts, xs in AsyncSeq.ofAsyncEnum (read.Invoke trancheId) do elapsed <- elapsed + ts let streamEvents : Propulsion.Sinks.StreamEvent seq = seq { for si in xs -> @@ -91,7 +91,7 @@ type PeriodicSource match buffer.ToArray() with | [||] as noItems -> noItems, basePosition | finalItem -> finalItem, let struct (_s, e) = Array.last finalItem in e |> Core.TimelineEvent.toCheckpointPosition - yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) } + yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) }) member internal _.Pump(readTranches: Func>, // The TaskSeq is expected to manage its own resilience strategy (retries etc).
diff --git a/src/Propulsion.Feed/Propulsion.Feed.fsproj b/src/Propulsion.Feed/Propulsion.Feed.fsproj index 97d8fdee..0ff17e3a 100644 --- a/src/Propulsion.Feed/Propulsion.Feed.fsproj +++ b/src/Propulsion.Feed/Propulsion.Feed.fsproj @@ -5,6 +5,9 @@ + + Infrastructure.fs + @@ -12,9 +15,8 @@ + - - diff --git a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs index 9ca8283a..cabdd058 100644 --- a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs +++ b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs @@ -4,6 +4,7 @@ open Dapper open FSharp.Control open Microsoft.Data.SqlClient open Propulsion.Feed +open Propulsion.Internal open System open System.Data diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 73e3e26d..fbb9b29c 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -116,6 +116,11 @@ module Task = let parallelUnlimited ct xs : Task<'t []> = parallel_ 0 ct xs + let inline ignore (a: Task<'T>): Task = task { + let! _ = a + return () + } + type Sem(max) = let inner = new SemaphoreSlim(max) member _.HasCapacity = inner.CurrentCount <> 0 From 03bbbdce6155601e086104f8d828119c8befba5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 22:50:49 -0400 Subject: [PATCH 02/18] fix msgdb test --- tests/Propulsion.MessageDb.Integration/Tests.fs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 4da2ebb2..faf6a1a7 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -56,8 +56,7 @@ let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, T let makeCheckpoints consumerGroup = task { let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) do! checkpoints.CreateSchemaIfNotExists() - return checkpoints -} + return checkpoints } [] let ``It processes events for a category`` () = task { @@ -136,7 +135,6 @@ let ``It doesn't read the tail event again`` () = task { checkpoints, sink, [| category |]) use capture = new ActivityCapture() - use _src = source.Start() do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task From 603f59f61c7fd2ba5d46779e5c50405a77fce5e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 22:51:41 -0400 Subject: [PATCH 03/18] remove unused file --- src/Propulsion.Feed/Infrastructure.fs | 46 --------------------------- 1 file changed, 46 deletions(-) delete mode 100755 src/Propulsion.Feed/Infrastructure.fs diff --git a/src/Propulsion.Feed/Infrastructure.fs b/src/Propulsion.Feed/Infrastructure.fs deleted file mode 100755 index dccf72df..00000000 --- a/src/Propulsion.Feed/Infrastructure.fs +++ /dev/null @@ -1,46 +0,0 @@ -[] -module internal Propulsion.Infrastructure - -open System.Threading.Tasks - -// http://www.fssnip.net/7Rc/title/AsyncAwaitTaskCorrect -// pending that being officially packaged somewhere or integrated into FSharp.Core https://github.com/fsharp/fslang-suggestions/issues/840 -type Async with - - /// - /// Gets the result of given task so that in the event of exception - /// the actual user exception is raised as opposed to being wrapped - /// in a System.AggregateException. - /// - /// Task to be awaited. - [] - static member AwaitTaskCorrect(task : Task<'T>) : Async<'T> = - Async.FromContinuations(fun (sc, ec, _cc) -> - task.ContinueWith(fun (t : Task<'T>) -> - if t.IsFaulted then - let e = t.Exception - if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0] - else ec e - elif t.IsCanceled then ec (TaskCanceledException()) - else sc t.Result) - |> ignore) - - /// - /// Gets the result of given task so that in the event of exception - /// the actual user exception is raised as opposed to being wrapped - /// in a System.AggregateException. - /// - /// Task to be awaited. - [] - static member AwaitTaskCorrect(task : Task) : Async = - Async.FromContinuations(fun (sc, ec, _cc) -> - task.ContinueWith(fun (task : Task) -> - if task.IsFaulted then - let e = task.Exception - if e.InnerExceptions.Count = 1 then ec e.InnerExceptions[0] - else ec e - elif task.IsCanceled then - ec (TaskCanceledException()) - else - sc ()) - |> ignore) From c567e04e1de188c41679425921f33afd657f7c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:25:34 -0400 Subject: [PATCH 04/18] run the tests in a ci pipeline --- .github/workflows/ci.yaml | 34 +++++++++++++++++++ Propulsion.sln | 1 + .../Propulsion.MessageDb.Integration/Tests.fs | 4 +-- 3 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/ci.yaml diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..4fd435f3 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,34 @@ +name: Run tests + +on: + push: + branches: + - '**' + tags-ignore: + - '*' + +jobs: + build-test-deploy: + name: Build and test + runs-on: ubuntu-latest + services: + postgres: + image: ethangarofolo/messagedb + ports: + # will assign a random free host port + - 5432:5432 + # needed because the postgres container does not provide a healthcheck + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + steps: + - name: Checkout + uses: actions/checkout@v2 + + - uses: actions/setup-dotnet@v1 + with: + dotnet-version: '7.0.x' + + - name: Install dependencies + run: dotnet restore + + - name: Run Tests + run: dotnet test Propulsion.sln --no-restore --verbosity minimal diff --git a/Propulsion.sln b/Propulsion.sln index 4c71986f..4f54b74c 100644 --- a/Propulsion.sln +++ b/Propulsion.sln @@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".project", ".project", "{6E LICENSE = LICENSE README.md = README.md SECURITY.md = SECURITY.md + .github\workflows\ci.yaml = .github\workflows\ci.yaml EndProjectSection EndProject Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion", "src\Propulsion\Propulsion.fsproj", "{0F72360F-1C14-46E3-9A60-B6BF87BD726D}" diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index faf6a1a7..d4210b10 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -28,7 +28,7 @@ let createStreamMessage streamName = cmd [] -let ConnectionString = "Host=localhost; Port=5433; Username=message_store; Password=;" +let ConnectionString = "Host=localhost; Port=5432; Username=message_store; Password=;" let connect () = task { let conn = new NpgsqlConnection(ConnectionString) @@ -54,7 +54,7 @@ let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, T member _.HandleExn(log, x) = () } let makeCheckpoints consumerGroup = task { - let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) + let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) do! checkpoints.CreateSchemaIfNotExists() return checkpoints } From 5783a5e3a70ba94d9e6d64b1e2c9d78721bddd73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:27:47 -0400 Subject: [PATCH 05/18] use correct image --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4fd435f3..6628c24f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest services: postgres: - image: ethangarofolo/messagedb + image: ethangarofolo/message-db ports: # will assign a random free host port - 5432:5432 From eae13eb8650298d62087549d52c220cbb98d9705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:38:01 -0400 Subject: [PATCH 06/18] manual msgdb install --- .github/workflows/ci.yaml | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6628c24f..5586fbab 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,7 +13,9 @@ jobs: runs-on: ubuntu-latest services: postgres: - image: ethangarofolo/message-db + image: postgres:12 + env: + POSTGRES_HOST_AUTH_METHOD: trust ports: # will assign a random free host port - 5432:5432 @@ -29,6 +31,19 @@ jobs: - name: Install dependencies run: dotnet restore + + - name: Install message-db + env: + MESSAGE_DB_VERSION: 1.3.0 + PGHOST: localhost + PGUSER: postgres + PGPASSWORD: postgres + PGPORT: '5432' + run: | + mkdir -p /tmp/eventide + curl -L https://github.com/message-db/message-db/archive/refs/tags/v$MESSAGE_DB_VERSION.tar.gz -o /tmp/eventide/message-db.tgz + tar -xf /tmp/eventide/message-db.tgz --directory /tmp/eventide + (cd /tmp/eventide/message-db-${MESSAGE_DB_VERSION}/database && ./install.sh) - name: Run Tests run: dotnet test Propulsion.sln --no-restore --verbosity minimal From 592e4c4d71d977a268a494b9908f64722d4a2f51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:39:42 -0400 Subject: [PATCH 07/18] specify restore sln --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5586fbab..a020caf5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -30,7 +30,7 @@ jobs: dotnet-version: '7.0.x' - name: Install dependencies - run: dotnet restore + run: dotnet restore Propulsion.sln - name: Install message-db env: From 88ca745b013bd1bcca8c94022540e11abf53385b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:42:47 -0400 Subject: [PATCH 08/18] use dotnet v6 --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a020caf5..12ccb80e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: - uses: actions/setup-dotnet@v1 with: - dotnet-version: '7.0.x' + dotnet-version: '6.0.x' - name: Install dependencies run: dotnet restore Propulsion.sln From 4757733d5826fb2af67ce394cce8dead59b67d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sat, 13 May 2023 23:48:57 -0400 Subject: [PATCH 09/18] try with kafka too --- .github/workflows/ci.yaml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 12ccb80e..11202dd4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,6 +21,19 @@ jobs: - 5432:5432 # needed because the postgres container does not provide a healthcheck options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + zookeeper: + image: zookeeper:3.4 + ports: + - 2181:2181 + kafka: + image: ches/kafka + env: + KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 + ZOOKEEPER_IP: 127.0.0.1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: true + KAFKA_NUM_PARTITIONS: 4 + ports: + - 9092:9092 steps: - name: Checkout uses: actions/checkout@v2 @@ -46,4 +59,6 @@ jobs: (cd /tmp/eventide/message-db-${MESSAGE_DB_VERSION}/database && ./install.sh) - name: Run Tests + env: + TEST_KAFKA_BROKER: localhost:9092 run: dotnet test Propulsion.sln --no-restore --verbosity minimal From ca7c6dcb1c19d8df5706f32713e008f75a6807fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 00:03:35 -0400 Subject: [PATCH 10/18] configuration --- .github/workflows/ci.yaml | 4 +++- build.proj | 1 + tests/Propulsion.MessageDb.Integration/Tests.fs | 10 +++++++--- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 11202dd4..bd05a506 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,4 +61,6 @@ jobs: - name: Run Tests env: TEST_KAFKA_BROKER: localhost:9092 - run: dotnet test Propulsion.sln --no-restore --verbosity minimal + MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" + CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" + run: dotnet build build.proj -v d --no-restore diff --git a/build.proj b/build.proj index c00ac9e5..2d273597 100644 --- a/build.proj +++ b/build.proj @@ -34,6 +34,7 @@ + diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index d4210b10..9c93c433 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -27,8 +27,12 @@ let createStreamMessage streamName = cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore cmd -[] -let ConnectionString = "Host=localhost; Port=5432; Username=message_store; Password=;" +let ConnectionString = Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" +let CheckpointConnectionString = Environment.GetEnvironmentVariable "CHECKPOINT_CONNECTION_STRING" +type FactIfConnString() = + inherit FactAttribute() + override x.Skip = if null <> ConnectionString then null else "Skipping as no MSG_DB_CONNECTION_STRING supplied" + override x.Timeout = 60 * 15 * 1000 let connect () = task { let conn = new NpgsqlConnection(ConnectionString) @@ -54,7 +58,7 @@ let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, T member _.HandleExn(log, x) = () } let makeCheckpoints consumerGroup = task { - let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) + let checkpoints = ReaderCheckpoint.CheckpointStore(CheckpointConnectionString, "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) do! checkpoints.CreateSchemaIfNotExists() return checkpoints } From d63920d207dde663392138fa349dbe0f58f982e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 00:05:36 -0400 Subject: [PATCH 11/18] no need to restore --- .github/workflows/ci.yaml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index bd05a506..a1d61c6c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,9 +42,6 @@ jobs: with: dotnet-version: '6.0.x' - - name: Install dependencies - run: dotnet restore Propulsion.sln - - name: Install message-db env: MESSAGE_DB_VERSION: 1.3.0 @@ -63,4 +60,4 @@ jobs: TEST_KAFKA_BROKER: localhost:9092 MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" - run: dotnet build build.proj -v d --no-restore + run: dotnet build build.proj -v d From c6c9c037520179d58ba7fd27052dff8aa8571dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 09:42:15 -0400 Subject: [PATCH 12/18] stop trying to wrangle msbuild and just test the project --- .github/workflows/ci.yaml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a1d61c6c..04686e07 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -54,10 +54,16 @@ jobs: curl -L https://github.com/message-db/message-db/archive/refs/tags/v$MESSAGE_DB_VERSION.tar.gz -o /tmp/eventide/message-db.tgz tar -xf /tmp/eventide/message-db.tgz --directory /tmp/eventide (cd /tmp/eventide/message-db-${MESSAGE_DB_VERSION}/database && ./install.sh) - + + - name: Restore + run: dotnet restore Propulsion.sln + + - name: Build + run: dotnet build Propulsion.sln --configuration Release --no-restore + - name: Run Tests env: TEST_KAFKA_BROKER: localhost:9092 MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" - run: dotnet build build.proj -v d + run: dotnet test Propulsion.sln --no-restore --verbosity normal From cbe3ecec6b0d84a77c687e0cb8b130c27c8c2041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 10:11:53 -0400 Subject: [PATCH 13/18] make it build on net6 --- src/Propulsion.Feed/FeedReader.fs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index e1793b31..cdeef49f 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -126,7 +126,7 @@ type FeedReader renderPos, ?logCommitFailure, // If supplied, an isTail Batch stops the reader loop and waits for supplied cleanup function. Default is a perpetual read loop. - ?awaitIngesterShutdown) = + ?awaitIngesterShutdown: CancellationToken -> Task) = let stats = Stats(partition, source, tranche, renderPos) @@ -168,6 +168,7 @@ type FeedReader currentPos <- batch.checkpoint lastWasTail <- batch.isTail }) |> Async.startImmediateAsTask ct + |> Task.ignore match awaitIngesterShutdown with | Some a when not ct.IsCancellationRequested -> let completionTimer = Stopwatch.start () From b9b69e7e0d45356d8bdd6d30938bb059b39c5812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 10:47:46 -0400 Subject: [PATCH 14/18] try using ip rather than hostname --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 04686e07..f551312b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -63,7 +63,7 @@ jobs: - name: Run Tests env: - TEST_KAFKA_BROKER: localhost:9092 + TEST_KAFKA_BROKER: 127.0.0.1:9092 MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" run: dotnet test Propulsion.sln --no-restore --verbosity normal From c845ced9a660ee2dd6ee2fa49c3df74a976c4918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 10:54:03 -0400 Subject: [PATCH 15/18] don't try the kafka tests --- .github/workflows/ci.yaml | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f551312b..5772f4b1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,19 +21,6 @@ jobs: - 5432:5432 # needed because the postgres container does not provide a healthcheck options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 - zookeeper: - image: zookeeper:3.4 - ports: - - 2181:2181 - kafka: - image: ches/kafka - env: - KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 - ZOOKEEPER_IP: 127.0.0.1 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: true - KAFKA_NUM_PARTITIONS: 4 - ports: - - 9092:9092 steps: - name: Checkout uses: actions/checkout@v2 @@ -63,7 +50,6 @@ jobs: - name: Run Tests env: - TEST_KAFKA_BROKER: 127.0.0.1:9092 MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" - run: dotnet test Propulsion.sln --no-restore --verbosity normal + run: dotnet test Propulsion.sln --no-restore --verbosity minimal From 73d031c0a55c8bd1d7f66ba8871c6ca340eb0d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 10:56:54 -0400 Subject: [PATCH 16/18] sorf --- src/Propulsion.Feed/FeedReader.fs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index cdeef49f..0c49a533 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -2,8 +2,8 @@ namespace Propulsion.Feed.Core open FSharp.Control open Propulsion.Feed -open Propulsion.Internal open Propulsion.Infrastructure +open Propulsion.Internal open Serilog open System open System.Collections.Generic From 068609906c95af2b09b33f185d8f4c0c56f1182c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 15:37:47 -0400 Subject: [PATCH 17/18] missed a spot --- src/Propulsion.CosmosStore/ChangeFeedProcessor.fs | 11 +++++------ src/Propulsion.DynamoStore/DynamoStoreSource.fs | 11 ++++++----- .../Propulsion.DynamoStore.fsproj | 1 + 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs index 5ca6acbb..5429794c 100644 --- a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs +++ b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs @@ -152,16 +152,15 @@ type ChangeFeedProcessor = let estimator = monitored.GetChangeFeedEstimator(processorName_, leases) let emitLagMetrics (ct : CancellationToken) = task { while not ct.IsCancellationRequested do - let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : IAsyncEnumerable<'u> = taskSeq { + let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : Task<'u seq> = task { // earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it use query = estimator.GetCurrentStateIterator() // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable + let result = ResizeArray() while query.HasMoreResults do let! res = query.ReadNextAsync(ct) - for x in res do - yield map x } - let! leasesState = - feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag) - |> TaskSeq.toArrayAsync + for x in res do result.Add(map x) + return result :> 'u seq } + let! leasesState = feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag) do! lagMonitorCallback (Seq.sortBy fst leasesState |> List.ofSeq) } emitLagMetrics) let wrap (f : unit -> Task) () = task { return! f () } diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 4dd5fd5c..690091c7 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -2,6 +2,7 @@ open Equinox.DynamoStore open FSharp.Control +open Propulsion.Infrastructure open Propulsion.Internal open System open System.Threading @@ -44,10 +45,10 @@ module private Impl = // Includes optional hydrating of events with event bodies and/or metadata (controlled via hydrating/maybeLoad args) let materializeIndexEpochAsBatchesOfStreamEvents (log : Serilog.ILogger, sourceId, storeLog) (hydrating, maybeLoad : _ -> _ -> (CancellationToken -> Task<_>) voption, loadDop) batchCutoff (context : DynamoStoreContext) - (AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = taskSeq { + (AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = AsyncSeq.toAsyncEnum(asyncSeq { let epochs = AppendsEpoch.Reader.Config.create storeLog context let sw = Stopwatch.start () - let! _maybeSize, version, state = epochs.Read(pid, epochId, offset) |> Async.startImmediateAsTask ct + let! _maybeSize, version, state = epochs.Read(pid, epochId, offset) let totalChanges = state.changes.Length sw.Stop() let totalStreams, chosenEvents, totalEvents, streamEvents = @@ -103,15 +104,15 @@ module private Impl = for i, spans in state.changes do let pending = spans |> Array.filter (fun (span : AppendsEpoch.Events.StreamSpan) -> streamEvents.ContainsKey(span.p)) if buffer.Count <> 0 && buffer.Count + pending.Length > batchCutoff then - let! hydrated = materializeSpans ct + let! hydrated = materializeSpans ct |> Async.AwaitTaskCorrect report (Some i) hydrated.Length yield struct (sw.Elapsed, sliceBatch epochId i hydrated) // not i + 1 as the batch does not include these changes sw.Reset() buffer.Clear() buffer.AddRange(pending) - let! hydrated = materializeSpans ct + let! hydrated = materializeSpans ct |> Async.AwaitTaskCorrect report None hydrated.Length - yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) } + yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) }) /// Defines the strategy to use for hydrating the events prior to routing them to the Handler [] diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 963bb6a3..08099139 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -6,6 +6,7 @@ + From da71efb61f78914fd16813bb47a3987e33852c3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 14 May 2023 19:28:11 -0400 Subject: [PATCH 18/18] tests --- tests/Propulsion.MessageDb.Integration/Tests.fs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 9c93c433..5de46a98 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -27,12 +27,15 @@ let createStreamMessage streamName = cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore cmd -let ConnectionString = Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" -let CheckpointConnectionString = Environment.GetEnvironmentVariable "CHECKPOINT_CONNECTION_STRING" -type FactIfConnString() = - inherit FactAttribute() - override x.Skip = if null <> ConnectionString then null else "Skipping as no MSG_DB_CONNECTION_STRING supplied" - override x.Timeout = 60 * 15 * 1000 +let ConnectionString = + match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with + | null -> "Host=localhost; Database=message_store; Port=5432; Username=message_store" + | s -> s +let CheckpointConnectionString = + match Environment.GetEnvironmentVariable "CHECKPOINT_CONNECTION_STRING" with + | null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" + | s -> s + let connect () = task { let conn = new NpgsqlConnection(ConnectionString)