-
Notifications
You must be signed in to change notification settings - Fork 10
Add postgres message queue (pgmq) publisher and consumer #1654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
12 changes: 12 additions & 0 deletions
12
examples/ConsumeAndPublishWithPgMq/ConsumeAndPublishWithPgMq.csproj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
| <PropertyGroup> | ||
| <OutputType>Exe</OutputType> | ||
| <TargetFrameworks>net8.0;net9.0</TargetFrameworks> | ||
| <ImplicitUsings>enable</ImplicitUsings> | ||
| <Nullable>enable</Nullable> | ||
| </PropertyGroup> | ||
| <ItemGroup> | ||
| <ProjectReference Include="..\..\src\Motor.Extensions.Hosting.PgMq\Motor.Extensions.Hosting.PgMq.csproj" /> | ||
| <ProjectReference Include="..\..\src\Motor.Extensions.Utilities\Motor.Extensions.Utilities.csproj" /> | ||
| </ItemGroup> | ||
| </Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| namespace ConsumeAndPublishWithPgMq.Model; | ||
|
|
||
| public record InputMessage | ||
| { | ||
| public string FancyText { get; set; } = "FooBar"; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| namespace ConsumeAndPublishWithPgMq.Model; | ||
|
|
||
| public record OutputMessage | ||
| { | ||
| public string? NotSoFancyText { get; set; } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| using ConsumeAndPublishWithPgMq; | ||
| using ConsumeAndPublishWithPgMq.Model; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Hosting; | ||
| using Motor.Extensions.Hosting.Abstractions; | ||
| using Motor.Extensions.Hosting.Consumer; | ||
| using Motor.Extensions.Hosting.PgMq; | ||
| using Motor.Extensions.Hosting.Publisher; | ||
| using Motor.Extensions.Utilities; | ||
|
|
||
| await MotorHost | ||
| .CreateDefaultBuilder() | ||
| .ConfigureSingleOutputService<InputMessage, OutputMessage>() | ||
| .ConfigureServices( | ||
| (_, services) => | ||
| { | ||
| services.AddTransient<ISingleOutputService<InputMessage, OutputMessage>, SingleOutputService>(); | ||
| } | ||
| ) | ||
| .ConfigureConsumer<InputMessage>( | ||
| (_, builder) => | ||
| { | ||
| builder.AddPgMq(); | ||
| } | ||
| ) | ||
| .ConfigurePublisher<OutputMessage>( | ||
| (_, builder) => | ||
| { | ||
| builder.AddPgMq(); | ||
| } | ||
| ) | ||
| .RunConsoleAsync(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| using ConsumeAndPublishWithPgMq.Model; | ||
| using Motor.Extensions.Hosting.Abstractions; | ||
| using Motor.Extensions.Hosting.CloudEvents; | ||
|
|
||
| namespace ConsumeAndPublishWithPgMq; | ||
|
|
||
| public class SingleOutputService : ISingleOutputService<InputMessage, OutputMessage> | ||
| { | ||
| public Task<MotorCloudEvent<OutputMessage>?> ConvertMessageAsync( | ||
| MotorCloudEvent<InputMessage> dataCloudEvent, | ||
| CancellationToken token = default | ||
| ) | ||
| { | ||
| var input = dataCloudEvent.TypedData; | ||
| var output = DoSomething(input); | ||
| Console.WriteLine(output.NotSoFancyText); | ||
| var outputEvent = dataCloudEvent.CreateNew(output); | ||
| return Task.FromResult(outputEvent)!; | ||
| } | ||
|
|
||
| private static OutputMessage DoSomething(InputMessage incomingMessage) | ||
| { | ||
| return new OutputMessage { NotSoFancyText = new string(incomingMessage.FancyText.Reverse().ToArray()) }; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| { | ||
| "Serilog": { | ||
| "MinimumLevel": { | ||
| "Default": "Debug", | ||
| "Override": { | ||
| "Microsoft": "Warning", | ||
| "Microsoft.Hosting.Lifetime": "Information", | ||
| "System": "Warning" | ||
| } | ||
| } | ||
| }, | ||
| "PgMqConsumer": { | ||
| "Host": "localhost", | ||
| "Port": 5432, | ||
| "Database": "mydb", | ||
| "Username": "myuser", | ||
| "Password": "mypassword", | ||
| "QueueName": "my_input_queue", | ||
| "VisibilityTimeoutInSeconds": 300, | ||
| "PollTimeoutSeconds": 5, | ||
| "PollIntervalMilliseconds": 5000 | ||
| }, | ||
| "PgMqPublisher": { | ||
| "Host": "localhost", | ||
| "Port": 5432, | ||
| "Database": "mydb", | ||
| "Username": "myuser", | ||
| "Password": "mypassword", | ||
| "QueueName": "my_output_queue" | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace Motor.Extensions.Hosting.PgMq; | ||
|
|
||
| public static class LogEvents | ||
| { | ||
| // Large base offset to avoid collisions with other Motor.NET components and user-defined event IDs. | ||
| // In practice, log consumers typically filter by name rather than ID, but unique IDs avoid | ||
| // ambiguity when multiple components are used together. | ||
| private const int BaseOffset = 324_290_000; | ||
|
doxthree marked this conversation as resolved.
|
||
|
|
||
| public static readonly EventId CriticalFailureOnConsume = new(BaseOffset + 0, nameof(CriticalFailureOnConsume)); | ||
| public static readonly EventId MessageHandlingUnexpectedException = new( | ||
| BaseOffset + 1, | ||
| nameof(MessageHandlingUnexpectedException) | ||
| ); | ||
| public static readonly EventId ConsumerNotStarted = new(BaseOffset + 2, nameof(ConsumerNotStarted)); | ||
| public static readonly EventId NoMessageReceived = new(BaseOffset + 3, nameof(NoMessageReceived)); | ||
| public static readonly EventId TemporaryFailureOnConsume = new(BaseOffset + 4, nameof(TemporaryFailureOnConsume)); | ||
| public static readonly EventId InvalidInputOnConsume = new(BaseOffset + 5, nameof(InvalidInputOnConsume)); | ||
| public static readonly EventId FailureOnConsume = new(BaseOffset + 6, nameof(FailureOnConsume)); | ||
| public static readonly EventId MessageSuccessfullyProcessed = new( | ||
| BaseOffset + 7, | ||
| nameof(MessageSuccessfullyProcessed) | ||
| ); | ||
| } | ||
14 changes: 14 additions & 0 deletions
14
src/Motor.Extensions.Hosting.PgMq/Motor.Extensions.Hosting.PgMq.csproj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
| <ItemGroup> | ||
| <ProjectReference Include="..\Motor.Extensions.Hosting.Abstractions\Motor.Extensions.Hosting.Abstractions.csproj" /> | ||
| <ProjectReference Include="..\Motor.Extensions.Hosting.CloudEvents\Motor.Extensions.Hosting.CloudEvents.csproj" /> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <PackageReference Include="CloudNative.CloudEvents.SystemTextJson" /> | ||
| <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" /> | ||
| <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" /> | ||
| <PackageReference Include="Microsoft.Extensions.Options" /> | ||
| <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" /> | ||
| <PackageReference Include="Npgmq" /> | ||
| </ItemGroup> | ||
| </Project> |
24 changes: 24 additions & 0 deletions
24
src/Motor.Extensions.Hosting.PgMq/Options/PgMqConsumerOptions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| namespace Motor.Extensions.Hosting.PgMq.Options; | ||
|
|
||
| // ReSharper disable once UnusedTypeParameter | ||
| public record PgMqConsumerOptions : PgOptions | ||
| { | ||
| /// <summary> | ||
| /// The visibility timeout is the amount of time a message is invisible to other consumers | ||
| /// after it has been read by a consumer. If the message is NOT deleted or archived within | ||
| /// the visibility timeout, it will become visible again and can be read by another consumer. | ||
| /// </summary> | ||
| public int VisibilityTimeoutInSeconds { get; init; } = 300; | ||
|
|
||
| /// <summary> | ||
| /// Maximum duration of the long-polling operation. | ||
| /// 5 is the default, same as in the upstream library. | ||
| /// </summary> | ||
| public int PollTimeoutSeconds { get; init; } = 5; | ||
|
|
||
| /// <summary> | ||
| /// Delay between internal postgres checks of the queue. | ||
| /// 250 is the default, same as in the upstream library. | ||
| /// </summary> | ||
| public int PollIntervalMilliseconds { get; init; } = 250; | ||
| } |
3 changes: 3 additions & 0 deletions
3
src/Motor.Extensions.Hosting.PgMq/Options/PgMqPublisherOptions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| namespace Motor.Extensions.Hosting.PgMq.Options; | ||
|
|
||
| public record PgMqPublisherOptions : PgOptions; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| using System.ComponentModel.DataAnnotations; | ||
|
|
||
| namespace Motor.Extensions.Hosting.PgMq.Options; | ||
|
|
||
| /// <summary> | ||
| /// PostgreSQL connection options shared by PgMq producer and consumer. | ||
| /// </summary> | ||
| public abstract record PgOptions | ||
| { | ||
| /// <summary>PostgreSQL server host name or IP address.</summary> | ||
| public required string Host { get; init; } | ||
|
|
||
| /// <summary>PostgreSQL server port. Defaults to 5432.</summary> | ||
| public int Port { get; init; } = 5432; | ||
|
prskr marked this conversation as resolved.
|
||
|
|
||
| /// <summary>PostgreSQL database name.</summary> | ||
| public required string Database { get; init; } | ||
|
|
||
| /// <summary>PostgreSQL user name.</summary> | ||
| public required string Username { get; init; } | ||
|
|
||
| /// <summary>PostgreSQL password.</summary> | ||
| public required string Password { get; init; } | ||
|
|
||
| /// <summary>The pgmq queue name.</summary> | ||
| public required string QueueName { get; init; } | ||
|
|
||
| /// <summary> | ||
| /// Builds a connection string from the structured properties. | ||
| /// </summary> | ||
| public string ToConnectionString() => | ||
| $"Host={Host};Port={Port};Database={Database};Username={Username};Password={Password}"; | ||
| } | ||
65 changes: 65 additions & 0 deletions
65
src/Motor.Extensions.Hosting.PgMq/PgMqHostBuilderExtensions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| using System; | ||
| using CloudNative.CloudEvents; | ||
| using CloudNative.CloudEvents.SystemTextJson; | ||
| using Microsoft.Extensions.Configuration; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Hosting; | ||
| using Microsoft.Extensions.Logging; | ||
| using Microsoft.Extensions.Options; | ||
| using Motor.Extensions.Hosting.Abstractions; | ||
| using Motor.Extensions.Hosting.CloudEvents; | ||
| using Motor.Extensions.Hosting.PgMq.Options; | ||
| using Npgmq; | ||
| using MSOptions = Microsoft.Extensions.Options.Options; | ||
|
|
||
| namespace Motor.Extensions.Hosting.PgMq; | ||
|
|
||
| public static class PgMqHostBuilderExtensions | ||
| { | ||
| public static void AddPgMqWithConfig<T>(this IConsumerBuilder<T> builder, IConfiguration config) | ||
| where T : notnull | ||
| { | ||
| var options = | ||
| config.Get<PgMqConsumerOptions>() | ||
| ?? throw new InvalidOperationException("PgMqConsumerOptions configuration section is missing or invalid"); | ||
| config.Bind(options); | ||
|
|
||
| builder.AddConsumer(sp => new PgMqMessageConsumer<T>( | ||
| options, | ||
| sp.GetRequiredService<ILogger<PgMqMessageConsumer<T>>>(), | ||
| sp.GetRequiredService<IHostApplicationLifetime>(), | ||
| sp.GetRequiredService<IApplicationNameService>(), | ||
| new NpgmqClient(options.ToConnectionString()) | ||
| )); | ||
| } | ||
|
|
||
| public static void AddPgMq<T>(this IConsumerBuilder<T> builder, string configSection = "PgMqConsumer") | ||
| where T : notnull | ||
| { | ||
| builder.AddPgMqWithConfig(builder.Context.Configuration.GetSection(configSection)); | ||
| } | ||
|
|
||
| public static void AddPgMqWithConfig<T>(this IPublisherBuilder<T> builder, IConfiguration config) | ||
| where T : notnull | ||
| { | ||
| builder.AddTransient<CloudEventFormatter, JsonEventFormatter>(); | ||
|
|
||
| var options = | ||
| config.Get<PgMqPublisherOptions>() | ||
| ?? throw new InvalidOperationException("PgMqPublisherOptions configuration section is missing or invalid"); | ||
| config.Bind(options); | ||
|
|
||
| builder.Configure<PgMqPublisherOptions>(config); | ||
| builder.AddPublisher(sp => new PgMqMessageProducer<T>( | ||
| MSOptions.Create(options), | ||
| sp.GetRequiredService<IOptions<PublisherOptions>>(), | ||
| new NpgmqClient(options.ToConnectionString()) | ||
| )); | ||
| } | ||
|
|
||
| public static void AddPgMq<T>(this IPublisherBuilder<T> builder, string configSection = "PgMqPublisher") | ||
| where T : notnull | ||
| { | ||
| builder.AddPgMqWithConfig(builder.Context.Configuration.GetSection(configSection)); | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.