Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>

<ItemGroup>
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="4.0.3.40" />
<PackageVersion Include="AWSSDK.SQS" Version="4.0.2.31" />
Expand Down Expand Up @@ -43,6 +42,7 @@
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="NATS.Client" Version="1.1.8" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.4" />
<PackageVersion Include="Npgmq" Version="1.13.0" />
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Exporter.Jaeger" Version="1.5.1" />
Expand All @@ -65,6 +65,7 @@
<PackageVersion Include="System.Text.Json" Version="9.0.14" />
<PackageVersion Include="TestContainers" Version="4.11.0" />
<PackageVersion Include="TestContainers.Kafka" Version="4.11.0" />
<PackageVersion Include="Testcontainers.PostgreSql" Version="4.11.0" />
<PackageVersion Include="TestContainers.RabbitMq" Version="4.11.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.5" />
Expand Down
3 changes: 3 additions & 0 deletions Motor.NET.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<Project Path="examples/ConsumeAndPublishNATS/ConsumeAndPublishNATS.csproj" />
<Project Path="examples/ConsumeAndPublishWithKafka/ConsumeAndPublishWithKafka.csproj" />
<Project Path="examples/ConsumeAndPublishWithKafka_IntegrationTest/ConsumeAndPublishWithKafka_IntegrationTest.csproj" />
<Project Path="examples/ConsumeAndPublishWithPgMq/ConsumeAndPublishWithPgMq.csproj" />
<Project Path="examples/ConsumeAndPublishWithRabbitMQ/ConsumeAndPublishWithRabbitMQ.csproj" />
<Project Path="examples/ConsumeSQS/ConsumeSQS.csproj" />
<Project Path="examples/ConsumeWithRabbitMQAndDeadLetterExchange/ConsumeWithRabbitMQAndDeadLetterExchange.csproj" />
Expand Down Expand Up @@ -48,6 +49,7 @@
<Project Path="src/Motor.Extensions.Hosting.Consumer/Motor.Extensions.Hosting.Consumer.csproj" />
<Project Path="src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj" />
<Project Path="src/Motor.Extensions.Hosting.NATS/Motor.Extensions.Hosting.NATS.csproj" />
<Project Path="src/Motor.Extensions.Hosting.PgMq/Motor.Extensions.Hosting.PgMq.csproj" />
<Project Path="src/Motor.Extensions.Hosting.Publisher/Motor.Extensions.Hosting.Publisher.csproj" />
<Project Path="src/Motor.Extensions.Hosting.RabbitMQ/Motor.Extensions.Hosting.RabbitMQ.csproj" />
<Project Path="src/Motor.Extensions.Hosting.SQS/Motor.Extensions.Hosting.SQS.csproj" />
Expand All @@ -74,6 +76,7 @@
<Project Path="test/Motor.Extensions.Hosting.Kafka_IntegrationTest/Motor.Extensions.Hosting.Kafka_IntegrationTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.Kafka_UnitTest/Motor.Extensions.Hosting.Kafka_UnitTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.NATS_IntegrationTest/Motor.Extensions.Hosting.NATS_IntegrationTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.PgMq_IntegrationTest/Motor.Extensions.Hosting.PgMq_IntegrationTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.Publisher_UnitTest/Motor.Extensions.Hosting.Publisher_UnitTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest/Motor.Extensions.Hosting.RabbitMQ_IntegrationTest.csproj" />
<Project Path="test/Motor.Extensions.Hosting.RabbitMQ_UnitTest/Motor.Extensions.Hosting.RabbitMQ_UnitTest.csproj" />
Expand Down
1 change: 1 addition & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ You find working examples for different use-cases under the [examples](./example
| Timer | ( :heavy_check_mark: ) | - | :x: | :x: | :x: | :x: | |
| SQS | ( :heavy_check_mark: ) | - | :x: | :x: | :x: | :x: | |
| NATS | ( :heavy_check_mark: ) | :heavy_check_mark: | :x: | :heavy_check_mark: | :x: | :x: | |
| PGMQ | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :x: | :x: | :x: | |

**CloudEvents (Protocol)**: If supported, the protocol format uses headers from Kafka or RabbitMQ to store CloudEvent metadata.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Motor.Extensions.Hosting.PgMq\Motor.Extensions.Hosting.PgMq.csproj" />
<ProjectReference Include="..\..\src\Motor.Extensions.Utilities\Motor.Extensions.Utilities.csproj" />
</ItemGroup>
</Project>
6 changes: 6 additions & 0 deletions examples/ConsumeAndPublishWithPgMq/Model/InputMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ConsumeAndPublishWithPgMq.Model;

public record InputMessage
{
public string FancyText { get; set; } = "FooBar";
}
6 changes: 6 additions & 0 deletions examples/ConsumeAndPublishWithPgMq/Model/OutputMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace ConsumeAndPublishWithPgMq.Model;

public record OutputMessage
{
public string? NotSoFancyText { get; set; }
}
32 changes: 32 additions & 0 deletions examples/ConsumeAndPublishWithPgMq/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using ConsumeAndPublishWithPgMq;
using ConsumeAndPublishWithPgMq.Model;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.Consumer;
using Motor.Extensions.Hosting.PgMq;
using Motor.Extensions.Hosting.Publisher;
using Motor.Extensions.Utilities;

await MotorHost
.CreateDefaultBuilder()
.ConfigureSingleOutputService<InputMessage, OutputMessage>()
.ConfigureServices(
(_, services) =>
{
services.AddTransient<ISingleOutputService<InputMessage, OutputMessage>, SingleOutputService>();
}
)
.ConfigureConsumer<InputMessage>(
(_, builder) =>
{
builder.AddPgMq();
}
)
.ConfigurePublisher<OutputMessage>(
(_, builder) =>
{
builder.AddPgMq();
}
)
.RunConsoleAsync();
25 changes: 25 additions & 0 deletions examples/ConsumeAndPublishWithPgMq/SingleOutputService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using ConsumeAndPublishWithPgMq.Model;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;

namespace ConsumeAndPublishWithPgMq;

public class SingleOutputService : ISingleOutputService<InputMessage, OutputMessage>
{
public Task<MotorCloudEvent<OutputMessage>?> ConvertMessageAsync(
MotorCloudEvent<InputMessage> dataCloudEvent,
CancellationToken token = default
)
{
var input = dataCloudEvent.TypedData;
var output = DoSomething(input);
Console.WriteLine(output.NotSoFancyText);
var outputEvent = dataCloudEvent.CreateNew(output);
return Task.FromResult(outputEvent)!;
}

private static OutputMessage DoSomething(InputMessage incomingMessage)
{
return new OutputMessage { NotSoFancyText = new string(incomingMessage.FancyText.Reverse().ToArray()) };
}
}
31 changes: 31 additions & 0 deletions examples/ConsumeAndPublishWithPgMq/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"Serilog": {
"MinimumLevel": {
"Default": "Debug",
"Override": {
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information",
"System": "Warning"
}
}
},
"PgMqConsumer": {
"Host": "localhost",
"Port": 5432,
"Database": "mydb",
"Username": "myuser",
"Password": "mypassword",
"QueueName": "my_input_queue",
"VisibilityTimeoutInSeconds": 300,
"PollTimeoutSeconds": 5,
"PollIntervalMilliseconds": 5000
},
"PgMqPublisher": {
"Host": "localhost",
"Port": 5432,
"Database": "mydb",
"Username": "myuser",
"Password": "mypassword",
"QueueName": "my_output_queue"
}
}
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>0.16.1</Version>
<Version>0.17.0</Version>
<TargetFrameworks>net8.0;net9.0;</TargetFrameworks>
<LangVersion>11</LangVersion>
<Nullable>enable</Nullable>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.Mime;
using System.Text;
using CloudNative.CloudEvents;

namespace Motor.Extensions.Hosting.CloudEvents;
Expand Down
26 changes: 26 additions & 0 deletions src/Motor.Extensions.Hosting.PgMq/LogEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Microsoft.Extensions.Logging;

namespace Motor.Extensions.Hosting.PgMq;

public static class LogEvents
{
// Large base offset to avoid collisions with other Motor.NET components and user-defined event IDs.
// In practice, log consumers typically filter by name rather than ID, but unique IDs avoid
// ambiguity when multiple components are used together.
private const int BaseOffset = 324_290_000;
Comment thread
rngcntr marked this conversation as resolved.
Comment thread
doxthree marked this conversation as resolved.

public static readonly EventId CriticalFailureOnConsume = new(BaseOffset + 0, nameof(CriticalFailureOnConsume));
public static readonly EventId MessageHandlingUnexpectedException = new(
BaseOffset + 1,
nameof(MessageHandlingUnexpectedException)
);
public static readonly EventId ConsumerNotStarted = new(BaseOffset + 2, nameof(ConsumerNotStarted));
public static readonly EventId NoMessageReceived = new(BaseOffset + 3, nameof(NoMessageReceived));
public static readonly EventId TemporaryFailureOnConsume = new(BaseOffset + 4, nameof(TemporaryFailureOnConsume));
public static readonly EventId InvalidInputOnConsume = new(BaseOffset + 5, nameof(InvalidInputOnConsume));
public static readonly EventId FailureOnConsume = new(BaseOffset + 6, nameof(FailureOnConsume));
public static readonly EventId MessageSuccessfullyProcessed = new(
BaseOffset + 7,
nameof(MessageSuccessfullyProcessed)
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\Motor.Extensions.Hosting.Abstractions\Motor.Extensions.Hosting.Abstractions.csproj" />
<ProjectReference Include="..\Motor.Extensions.Hosting.CloudEvents\Motor.Extensions.Hosting.CloudEvents.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="CloudNative.CloudEvents.SystemTextJson" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="Npgmq" />
</ItemGroup>
</Project>
24 changes: 24 additions & 0 deletions src/Motor.Extensions.Hosting.PgMq/Options/PgMqConsumerOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Motor.Extensions.Hosting.PgMq.Options;

// ReSharper disable once UnusedTypeParameter
public record PgMqConsumerOptions : PgOptions
{
/// <summary>
/// The visibility timeout is the amount of time a message is invisible to other consumers
/// after it has been read by a consumer. If the message is NOT deleted or archived within
/// the visibility timeout, it will become visible again and can be read by another consumer.
/// </summary>
public int VisibilityTimeoutInSeconds { get; init; } = 300;

/// <summary>
/// Maximum duration of the long-polling operation.
/// 5 is the default, same as in the upstream library.
/// </summary>
public int PollTimeoutSeconds { get; init; } = 5;

/// <summary>
/// Delay between internal postgres checks of the queue.
/// 250 is the default, same as in the upstream library.
/// </summary>
public int PollIntervalMilliseconds { get; init; } = 250;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Motor.Extensions.Hosting.PgMq.Options;

public record PgMqPublisherOptions : PgOptions;
33 changes: 33 additions & 0 deletions src/Motor.Extensions.Hosting.PgMq/Options/PgOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.ComponentModel.DataAnnotations;

namespace Motor.Extensions.Hosting.PgMq.Options;

/// <summary>
/// PostgreSQL connection options shared by PgMq producer and consumer.
/// </summary>
public abstract record PgOptions
{
/// <summary>PostgreSQL server host name or IP address.</summary>
public required string Host { get; init; }

/// <summary>PostgreSQL server port. Defaults to 5432.</summary>
public int Port { get; init; } = 5432;
Comment thread
prskr marked this conversation as resolved.

/// <summary>PostgreSQL database name.</summary>
public required string Database { get; init; }

/// <summary>PostgreSQL user name.</summary>
public required string Username { get; init; }

/// <summary>PostgreSQL password.</summary>
public required string Password { get; init; }

/// <summary>The pgmq queue name.</summary>
public required string QueueName { get; init; }

/// <summary>
/// Builds a connection string from the structured properties.
/// </summary>
public string ToConnectionString() =>
$"Host={Host};Port={Port};Database={Database};Username={Username};Password={Password}";
}
65 changes: 65 additions & 0 deletions src/Motor.Extensions.Hosting.PgMq/PgMqHostBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System;
using CloudNative.CloudEvents;
using CloudNative.CloudEvents.SystemTextJson;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.PgMq.Options;
using Npgmq;
using MSOptions = Microsoft.Extensions.Options.Options;

namespace Motor.Extensions.Hosting.PgMq;

public static class PgMqHostBuilderExtensions
{
public static void AddPgMqWithConfig<T>(this IConsumerBuilder<T> builder, IConfiguration config)
where T : notnull
{
var options =
config.Get<PgMqConsumerOptions>()
?? throw new InvalidOperationException("PgMqConsumerOptions configuration section is missing or invalid");
config.Bind(options);

builder.AddConsumer(sp => new PgMqMessageConsumer<T>(
options,
sp.GetRequiredService<ILogger<PgMqMessageConsumer<T>>>(),
sp.GetRequiredService<IHostApplicationLifetime>(),
sp.GetRequiredService<IApplicationNameService>(),
new NpgmqClient(options.ToConnectionString())
));
}

public static void AddPgMq<T>(this IConsumerBuilder<T> builder, string configSection = "PgMqConsumer")
where T : notnull
{
builder.AddPgMqWithConfig(builder.Context.Configuration.GetSection(configSection));
}

public static void AddPgMqWithConfig<T>(this IPublisherBuilder<T> builder, IConfiguration config)
where T : notnull
{
builder.AddTransient<CloudEventFormatter, JsonEventFormatter>();

var options =
config.Get<PgMqPublisherOptions>()
?? throw new InvalidOperationException("PgMqPublisherOptions configuration section is missing or invalid");
config.Bind(options);

builder.Configure<PgMqPublisherOptions>(config);
builder.AddPublisher(sp => new PgMqMessageProducer<T>(
MSOptions.Create(options),
sp.GetRequiredService<IOptions<PublisherOptions>>(),
new NpgmqClient(options.ToConnectionString())
));
}

public static void AddPgMq<T>(this IPublisherBuilder<T> builder, string configSection = "PgMqPublisher")
where T : notnull
{
builder.AddPgMqWithConfig(builder.Context.Configuration.GetSection(configSection));
}
}
Loading
Loading