-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathKafkaQueueTransactedMessageAdapter.cs
More file actions
56 lines (46 loc) · 1.69 KB
/
KafkaQueueTransactedMessageAdapter.cs
File metadata and controls
56 lines (46 loc) · 1.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
using Microsoft.Extensions.Logging;
using ViennaNET.Messaging.Messages;
namespace ViennaNET.Messaging.KafkaQueue
{
internal class KafkaQueueTransactedMessageAdapter : KafkaQueueMessageAdapter, IMessageAdapterWithTransactions
{
private readonly ILogger<KafkaQueueTransactedMessageAdapter> _logger;
private readonly KafkaQueueConfiguration _queueConfiguration;
public KafkaQueueTransactedMessageAdapter(
KafkaQueueConfiguration queueConfiguration,
IKafkaConnectionFactory connectionFactory,
ILogger<KafkaQueueTransactedMessageAdapter> logger) : base(queueConfiguration, connectionFactory, logger)
{
_queueConfiguration = queueConfiguration;
_logger = logger;
}
public override void Connect()
{
base.Connect();
if (_producer != null)
{
_producer.InitTransactions(
TimeSpan.FromMilliseconds(_queueConfiguration.InitTransactionsTimeout));
_producer.BeginTransaction();
_logger.LogDebug("Transaction for queue {queueId} started", _queueConfiguration.Id);
}
}
public override void Disconnect()
{
if (_producer != null)
{
_producer.CommitTransaction();
_logger.LogDebug("Transaction for queue {queueId} committed", _queueConfiguration.Id);
}
base.Disconnect();
}
public void CommitIfTransacted(BaseMessage message)
{
_consumer.Commit();
}
public void RollbackIfTransacted()
{
// Just don't commit
}
}
}