Skip to content

Commit be7fecd

Browse files
committed
Publishing JSON format is broken; Mark as unsupported for now
1 parent 7a401df commit be7fecd

2 files changed

Lines changed: 18 additions & 41 deletions

File tree

src/Motor.Extensions.Hosting.PgMq/PgMqMessageProducer.cs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,11 @@ public async Task StartAsync(CancellationToken token = default)
7272
/// <param name="token">A cancellation token.</param>
7373
/// <returns>A <see cref="Task"/> that completes when the message has been added to the database.</returns>
7474
/// <remarks>
75-
/// <list type="bullet">
76-
/// <item>
77-
/// In Protocol format (CloudEvents Binary Content Mode), CloudEvent attributes are written as Npgmq message
78-
/// headers and the raw payload bytes are sent as the message body.
79-
/// </item>
80-
/// <item>
81-
/// In JSON format (CloudEvents Structured Content Mode), the full CloudEvent – including all attributes and
82-
/// the payload – is encoded as a single JSON envelope. No separate headers are used.
83-
/// </item>
84-
/// </list>
75+
/// Only the protocol format (CloudEvents Binary Content Mode) is currently supported. CloudEvent attributes
76+
/// are written as Npgmq message headers, and the raw payload bytes are sent as the message body.
77+
/// The JSON mode is not yet supported by this producer.
8578
/// </remarks>
79+
/// <exception cref="UnhandledCloudEventFormatException">If an unsupported cloud event format is specified.</exception>
8680
public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, CancellationToken token = default)
8781
{
8882
switch (_publisherOptions.CloudEventFormat)
@@ -91,16 +85,6 @@ public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, C
9185
var headers = BuildHeaders(motorCloudEvent);
9286
await _npgmqClient.SendAsync(_options.QueueName, motorCloudEvent.TypedData, headers, token);
9387
break;
94-
case CloudEventFormat.Json:
95-
var jsonBytes = _cloudEventFormatter.EncodeStructuredModeMessage(
96-
motorCloudEvent.ConvertToCloudEvent(),
97-
out _
98-
);
99-
// Convert ReadOnlyMemory<byte> to string so that NpgmqClient stores the JSON as-is (JSONB).
100-
// Passing the raw bytes directly would cause Npgmq to Base64-encode them via JsonSerializer.
101-
var json = Encoding.UTF8.GetString(jsonBytes.ToArray());
102-
await _npgmqClient.SendAsync(_options.QueueName, json, token);
103-
break;
10488
default:
10589
throw new UnhandledCloudEventFormatException(_publisherOptions.CloudEventFormat);
10690
}

test/Motor.Extensions.Hosting.PgMq_IntegrationTest/PgMqIntegrationTests.cs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ public async Task Consume_ProduceAndConsumeProtocolFormat_ConsumedDataEqualsPubl
4949
await producer.PublishMessageAsync(
5050
MotorCloudEvent.CreateTestCloudEvent(Encoding.UTF8.GetBytes(expectedMessage))
5151
);
52-
var received = await Task.WhenAny(tcs.Task, executeTask, Task.Delay(TimeSpan.FromSeconds(30))) == tcs.Task ? await tcs.Task : null;
52+
var received =
53+
await Task.WhenAny(tcs.Task, executeTask, Task.Delay(TimeSpan.FromSeconds(30))) == tcs.Task
54+
? await tcs.Task
55+
: null;
5356
await cts.CancelAsync();
5457
await executeTask;
5558
await consumer.StopAsync();
@@ -58,31 +61,18 @@ await producer.PublishMessageAsync(
5861
}
5962

6063
[Fact(Timeout = 50000)]
61-
public async Task Consume_ProduceAndConsumeJsonFormat_ConsumedDataEqualsPublished()
64+
public async Task Consume_ProduceAndConsumeJsonFormat_ThrowsUnhandledCloudEventFormatException()
6265
{
6366
const string expectedMessage = "hello-json";
6467
var queueName = _randomizerString.Generate()!;
6568
await using var producer = GetProducer<string>(queueName, CloudEventFormat.Json);
66-
await using var consumer = GetConsumer<string>(queueName);
6769
await producer.StartAsync();
68-
await consumer.StartAsync();
69-
var tcs = new TaskCompletionSource<byte[]>();
70-
consumer.ConsumeCallbackAsync = (evt, _) =>
70+
await Assert.ThrowsAsync<UnhandledCloudEventFormatException>(async () =>
7171
{
72-
tcs.TrySetResult(evt.TypedData);
73-
return Task.FromResult(ProcessedMessageStatus.Success);
74-
};
75-
using var cts = new CancellationTokenSource();
76-
var executeTask = consumer.ExecuteAsync(cts.Token);
77-
await producer.PublishMessageAsync(
78-
MotorCloudEvent.CreateTestCloudEvent(Encoding.UTF8.GetBytes(expectedMessage))
79-
);
80-
var received = await Task.WhenAny(tcs.Task, executeTask, Task.Delay(TimeSpan.FromSeconds(30))) == tcs.Task ? await tcs.Task : null;
81-
await cts.CancelAsync();
82-
await executeTask;
83-
await consumer.StopAsync();
84-
Assert.NotNull(received);
85-
Assert.Equal(expectedMessage, Encoding.UTF8.GetString(received!));
72+
await producer.PublishMessageAsync(
73+
MotorCloudEvent.CreateTestCloudEvent(Encoding.UTF8.GetBytes(expectedMessage))
74+
);
75+
});
8676
}
8777

8878
[Fact(Timeout = 50000)]
@@ -269,7 +259,10 @@ public async Task Consume_SuccessfulMessage_ProtocolFormat_CloudEventAttributesR
269259
null
270260
);
271261
await producer.PublishMessageAsync(cloudEvent);
272-
var receivedEvent = await Task.WhenAny(tcs.Task, executeTask, Task.Delay(TimeSpan.FromSeconds(30))) == tcs.Task ? await tcs.Task : null;
262+
var receivedEvent =
263+
await Task.WhenAny(tcs.Task, executeTask, Task.Delay(TimeSpan.FromSeconds(30))) == tcs.Task
264+
? await tcs.Task
265+
: null;
273266
await cts.CancelAsync();
274267
await executeTask;
275268
await consumer.StopAsync();

0 commit comments

Comments
 (0)