|
| 1 | +using Microsoft.Extensions.DependencyInjection; |
| 2 | + |
| 3 | +[NotInParallel] |
| 4 | +public class AddFromIncomingDefaultNameTests |
| 5 | +{ |
| 6 | + static ManualResetEvent resetEvent = new(false); |
| 7 | + static byte[]? receivedBytes; |
| 8 | + |
| 9 | + [Test] |
| 10 | + public async Task RoundTripsWithDefaultName() |
| 11 | + { |
| 12 | + receivedBytes = null; |
| 13 | + resetEvent.Reset(); |
| 14 | + |
| 15 | + await using var database = await Connection.SqlInstance.Build("AddFromIncomingDefaultName"); |
| 16 | + var connectionString = database.ConnectionString; |
| 17 | + var databaseName = new SqlConnectionStringBuilder(connectionString).InitialCatalog; |
| 18 | + |
| 19 | + var configuration = new EndpointConfiguration("SqlAddFromIncomingDefaultName"); |
| 20 | + SqlConnection NewConnection() => new(connectionString); |
| 21 | + var attachments = configuration.EnableAttachments(NewConnection, TimeToKeep.Default, database: databaseName, table: "Attachments"); |
| 22 | + configuration.UseSerialization<SystemJsonSerializer>(); |
| 23 | + configuration.UsePersistence<LearningPersistence>(); |
| 24 | + configuration.DisableRetries(); |
| 25 | + configuration.EnableInstallers(); |
| 26 | + configuration.PurgeOnStartup(true); |
| 27 | + attachments.DisableCleanupTask(); |
| 28 | + configuration.RegisterComponents(_ => _.AddSingleton(resetEvent)); |
| 29 | + var transport = configuration.UseTransport<LearningTransport>(); |
| 30 | + transport.StorageDirectory(Path.Combine(Path.GetTempPath(), "AddFromIncomingDefaultName")); |
| 31 | + transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive); |
| 32 | + |
| 33 | + var endpoint = await Endpoint.Start(configuration); |
| 34 | + |
| 35 | + // Sender writes the incoming attachment under the default name. |
| 36 | + var sendOptions = new SendOptions(); |
| 37 | + sendOptions.RouteToThisEndpoint(); |
| 38 | + var outgoing = sendOptions.Attachments(); |
| 39 | + outgoing.Add(BuildStream("hello")); |
| 40 | + await endpoint.Send(new InMessage(), sendOptions); |
| 41 | + |
| 42 | + if (!resetEvent.WaitOne(TimeSpan.FromSeconds(20))) |
| 43 | + { |
| 44 | + await endpoint.Stop(); |
| 45 | + throw new("TimedOut"); |
| 46 | + } |
| 47 | + |
| 48 | + await endpoint.Stop(); |
| 49 | + |
| 50 | + await Assert.That(Encoding.UTF8.GetString(receivedBytes!)).IsEqualTo("HELLO"); |
| 51 | + } |
| 52 | + |
| 53 | + static MemoryStream BuildStream(string content) => |
| 54 | + new(Encoding.UTF8.GetBytes(content)); |
| 55 | + |
| 56 | + class InMessage : |
| 57 | + IMessage; |
| 58 | + |
| 59 | + class OutMessage : |
| 60 | + IMessage; |
| 61 | + |
| 62 | + class InHandler : |
| 63 | + IHandleMessages<InMessage> |
| 64 | + { |
| 65 | + public Task Handle(InMessage message, HandlerContext context) |
| 66 | + { |
| 67 | + var replyOptions = new ReplyOptions(); |
| 68 | + var outgoing = replyOptions.Attachments(); |
| 69 | + // No-name overload reads the default-named incoming and registers the result under the default name. |
| 70 | + outgoing.AddFromIncoming(transform: async (source, sink, cancel) => |
| 71 | + { |
| 72 | + using var reader = new StreamReader(source, leaveOpen: true); |
| 73 | + var content = await reader.ReadToEndAsync(cancel); |
| 74 | + await using var writer = new StreamWriter(sink, leaveOpen: true); |
| 75 | + await writer.WriteAsync(content.ToUpperInvariant()); |
| 76 | + }); |
| 77 | + return context.Reply(new OutMessage(), replyOptions); |
| 78 | + } |
| 79 | + } |
| 80 | + |
| 81 | + class OutHandler : |
| 82 | + IHandleMessages<OutMessage> |
| 83 | + { |
| 84 | + public async Task Handle(OutMessage message, HandlerContext context) |
| 85 | + { |
| 86 | + await using var memoryStream = new MemoryStream(); |
| 87 | + var incoming = context.Attachments(); |
| 88 | + // No-name CopyTo reads the default-named attachment. |
| 89 | + await incoming.CopyTo(memoryStream, context.CancellationToken); |
| 90 | + receivedBytes = memoryStream.ToArray(); |
| 91 | + resetEvent.Set(); |
| 92 | + } |
| 93 | + } |
| 94 | +} |
0 commit comments