-
-
Notifications
You must be signed in to change notification settings - Fork 704
New major version of Log Exporter app #1580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 35 commits
59db1c4
5439bb1
ddad746
c1754e9
8bdab01
7a25db9
98c3b4a
2980cde
0135361
3e1d3bd
419d542
ef87c9b
dbab7f6
6469ef9
25d511e
b3c9b6a
f53f897
43a5991
ffe1644
96cbbac
49358e3
9318522
ebad5f5
f641750
77d5da4
587dbff
259e710
01ca2d1
1193ee0
b920e53
7fe820b
35c9b2c
20b4a29
2ac557b
cbf1979
261ef81
6431cd0
7f520f2
c5382b1
cd75a91
8b7bcab
d962238
9a96a91
ca25751
152bba8
051f2aa
24b3968
bc9fd89
37c1016
4a51fad
4599d90
e113420
34343ea
6c52654
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,195 +21,265 @@ You should have received a copy of the GNU General Public License | |
| using DnsServerCore.ApplicationCommon; | ||
| using LogExporter.Strategy; | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Net; | ||
| using System.Threading; | ||
| using System.Threading.Channels; | ||
| using System.Threading.Tasks; | ||
| using TechnitiumLibrary; | ||
| using TechnitiumLibrary.Net.Dns; | ||
|
|
||
| namespace LogExporter | ||
| { | ||
| public sealed class App : IDnsApplication, IDnsQueryLogger | ||
| public sealed class App : IDnsApplication, IDnsQueryLogger, IDisposable | ||
| { | ||
| #region variables | ||
|
|
||
| IDnsServer? _dnsServer; | ||
| AppConfig? _config; | ||
|
|
||
| readonly ExportManager _exportManager = new ExportManager(); | ||
|
|
||
| bool _enableLogging; | ||
|
|
||
| readonly ConcurrentQueue<LogEntry> _queuedLogs = new ConcurrentQueue<LogEntry>(); | ||
| readonly Timer _queueTimer; | ||
| const int QUEUE_TIMER_INTERVAL = 10000; | ||
| const int BULK_INSERT_COUNT = 1000; | ||
|
|
||
| readonly ExportManager _exportManager = new ExportManager(); | ||
| Task? _backgroundTask; | ||
| Channel<LogEntry> _channel = default!; | ||
| AppConfig? _config; | ||
| CancellationTokenSource? _cts; | ||
| bool _disposed; | ||
|
|
||
| #endregion | ||
| IDnsServer? _dnsServer; | ||
| volatile bool _enableLogging; // volatile to improve cross-thread visibility | ||
| long _droppedCount; | ||
| DateTime _lastDropLog = DateTime.UtcNow; | ||
| static readonly TimeSpan DropLogInterval = TimeSpan.FromSeconds(5); | ||
| #endregion variables | ||
|
|
||
| #region constructor | ||
|
|
||
| public App() | ||
| { | ||
| _queueTimer = new Timer(HandleExportLogCallback); | ||
| } | ||
| { } | ||
|
|
||
| #endregion | ||
| #endregion constructor | ||
|
|
||
| #region IDisposable | ||
|
|
||
| public void Dispose() | ||
| { | ||
| Dispose(disposing: true); | ||
| GC.SuppressFinalize(this); | ||
| } | ||
| if (_disposed) | ||
| return; | ||
|
|
||
| private void Dispose(bool disposing) | ||
| { | ||
| if (!_disposed) | ||
| _disposed = true; | ||
|
|
||
| // Stop accepting new entries immediately; cannot throw. | ||
| _enableLogging = false; | ||
|
|
||
| // ADR: Previously Dispose swallowed all exceptions, hiding exporter or | ||
| // shutdown failures and making diagnosis impossible. We now log every | ||
| // unexpected exception without rethrowing, preserving best-effort teardown | ||
| // while ensuring operational visibility. | ||
| try | ||
| { | ||
| if (disposing) | ||
| try | ||
| { | ||
| _queueTimer?.Dispose(); | ||
|
|
||
| ExportLogsAsync().Sync(); //flush any pending logs | ||
| _channel?.Writer.TryComplete(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| } | ||
|
|
||
| _exportManager.Dispose(); | ||
| try | ||
| { | ||
| _cts?.Cancel(); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| } | ||
|
|
||
| _disposed = true; | ||
| try | ||
| { | ||
| _backgroundTask?.GetAwaiter().GetResult(); | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Expected; no log needed. | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| } | ||
|
|
||
| _exportManager.Dispose(); | ||
| GC.SuppressFinalize(this); | ||
| } | ||
|
|
||
| #endregion | ||
| #endregion IDisposable | ||
|
|
||
| #region public | ||
|
|
||
| public Task InitializeAsync(IDnsServer dnsServer, string config) | ||
| { | ||
| _dnsServer = dnsServer; | ||
| _config = AppConfig.Deserialize(config); | ||
|
|
||
| if (_config is null) | ||
| throw new DnsClientException("Invalid application configuration."); | ||
|
|
||
| if (_config.FileTarget!.Enabled) | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(FileExportStrategy)); | ||
| _exportManager.AddStrategy(new FileExportStrategy(_config.FileTarget!.Path)); | ||
| } | ||
| else | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(FileExportStrategy)); | ||
| } | ||
| _config = AppConfig.Deserialize(config) | ||
| ?? throw new DnsClientException("Invalid application configuration."); | ||
|
|
||
| if (_config.HttpTarget!.Enabled) | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(HttpExportStrategy)); | ||
| _exportManager.AddStrategy(new HttpExportStrategy(_config.HttpTarget.Endpoint, _config.HttpTarget.Headers)); | ||
| } | ||
| else | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(HttpExportStrategy)); | ||
| } | ||
| ConfigureStrategies(); | ||
|
|
||
| if (_config.SyslogTarget!.Enabled) | ||
| // If no sinks exist, never enable logging. | ||
| if (!_exportManager.HasStrategy()) | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(SyslogExportStrategy)); | ||
| _exportManager.AddStrategy(new SyslogExportStrategy(_config.SyslogTarget.Address, _config.SyslogTarget.Port, _config.SyslogTarget.Protocol)); | ||
| _enableLogging = false; | ||
| return Task.CompletedTask; | ||
| } | ||
| else | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(SyslogExportStrategy)); | ||
| } | ||
|
|
||
| _enableLogging = _exportManager.HasStrategy(); | ||
|
|
||
| if (_enableLogging) | ||
| _queueTimer.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite); | ||
| else | ||
| _queueTimer.Change(Timeout.Infinite, Timeout.Infinite); | ||
| // Create bounded channel to avoid memory explosion | ||
| _channel = Channel.CreateBounded<LogEntry>( | ||
| new BoundedChannelOptions(_config!.MaxQueueSize) | ||
| { | ||
| SingleReader = true, | ||
| SingleWriter = false, | ||
| FullMode = BoundedChannelFullMode.DropWrite | ||
| }); | ||
|
|
||
| // Start background worker | ||
| _cts = new CancellationTokenSource(); | ||
| _backgroundTask = Task.Run(() => BackgroundWorkerAsync(_cts.Token)); | ||
|
|
||
| // ADR: _enableLogging is intentionally set last so that any caller observing | ||
| // _enableLogging == true can rely on the entire logging pipeline being fully | ||
| // constructed (channel, CTS, background worker). This prevents subtle race | ||
| // conditions where concurrent InsertLogAsync calls see "enabled" before internal | ||
| // structures are ready. | ||
| _enableLogging = true; | ||
|
|
||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint remoteEP, DnsTransportProtocol protocol, DnsDatagram response) | ||
| public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, | ||
| IPEndPoint remoteEP, DnsTransportProtocol protocol, | ||
| DnsDatagram response) | ||
| { | ||
| if (_enableLogging) | ||
| { | ||
| if (_queuedLogs.Count < _config!.MaxQueueSize) | ||
| _queuedLogs.Enqueue(new LogEntry(timestamp, remoteEP, protocol, request, response, _config.EnableEdnsLogging)); | ||
| var entry = new LogEntry(timestamp, remoteEP, protocol, request, response, _config!.EnableEdnsLogging); | ||
|
|
||
| if (!_channel.Writer.TryWrite(entry)) | ||
| { | ||
| Interlocked.Increment(ref _droppedCount); | ||
|
|
||
| var now = DateTime.UtcNow; | ||
| if (now - _lastDropLog >= DropLogInterval) | ||
| { | ||
| var dropped = Interlocked.Exchange(ref _droppedCount, 0); | ||
| _lastDropLog = now; | ||
| _dnsServer?.WriteLog($"Log export queue full; dropped {dropped} entries over last {DropLogInterval.TotalSeconds:F0}s."); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| #endregion | ||
| #endregion public | ||
|
|
||
| #region private | ||
|
|
||
| private async Task ExportLogsAsync() | ||
| private async Task BackgroundWorkerAsync(CancellationToken token) | ||
| { | ||
| // ADR: Reuse this list buffer to avoid GC churn during high-volume logging. | ||
| var batch = new List<LogEntry>(BULK_INSERT_COUNT); | ||
|
|
||
| try | ||
| { | ||
| List<LogEntry> logs = new List<LogEntry>(BULK_INSERT_COUNT); | ||
|
|
||
| while (true) | ||
| while (await _channel.Reader.WaitToReadAsync(token).ConfigureAwait(false)) | ||
| { | ||
| while (logs.Count < BULK_INSERT_COUNT && _queuedLogs.TryDequeue(out LogEntry? log)) | ||
| while (batch.Count < BULK_INSERT_COUNT && | ||
| _channel.Reader.TryRead(out var entry)) | ||
| { | ||
| logs.Add(log); | ||
| } | ||
|
|
||
| if (logs.Count < 1) | ||
| break; | ||
| if (token.IsCancellationRequested) | ||
| break; | ||
|
|
||
| await _exportManager.ImplementStrategyAsync(logs); | ||
| batch.Add(entry); | ||
| } | ||
|
|
||
| logs.Clear(); | ||
| if (batch.Count > 0) | ||
| { | ||
| await _exportManager.ImplementStrategyAsync(batch, token).ConfigureAwait(false); | ||
| batch.Clear(); // REUSE — do not reassign | ||
| } | ||
| } | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| await DrainRemainingLogs(batch, token).ConfigureAwait(false); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| await DrainRemainingLogs(batch, token).ConfigureAwait(false); | ||
| } | ||
| } | ||
|
|
||
| private async void HandleExportLogCallback(object? state) | ||
| private void ConfigureStrategies() | ||
| { | ||
| _exportManager.RemoveStrategy(typeof(ConsoleExportStrategy)); | ||
| if (_config!.ConsoleTarget != null && _config.ConsoleTarget.Enabled) | ||
| _exportManager.AddStrategy(new ConsoleExportStrategy()); | ||
|
|
||
| _exportManager.RemoveStrategy(typeof(FileExportStrategy)); | ||
| if (_config.FileTarget != null && _config.FileTarget.Enabled) | ||
| _exportManager.AddStrategy(new FileExportStrategy(_config.FileTarget.Path)); | ||
|
|
||
| _exportManager.RemoveStrategy(typeof(HttpExportStrategy)); | ||
| if (_config.HttpTarget != null && _config.HttpTarget.Enabled) | ||
| _exportManager.AddStrategy( | ||
| new HttpExportStrategy(_config.HttpTarget.Endpoint, _config.HttpTarget.Headers)); | ||
|
|
||
| _exportManager.RemoveStrategy(typeof(SyslogExportStrategy)); | ||
| if (_config.SyslogTarget != null && _config.SyslogTarget.Enabled) | ||
| _exportManager.AddStrategy( | ||
| new SyslogExportStrategy(_config.SyslogTarget.Address, | ||
| _config.SyslogTarget.Port!.Value, | ||
|
||
| _config.SyslogTarget.Protocol!)); | ||
|
zbalkan marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| private async Task DrainRemainingLogs(List<LogEntry> batch, CancellationToken token) | ||
| { | ||
| try | ||
| { | ||
| // Process logs within the timer interval, then let the timer reschedule | ||
| await ExportLogsAsync(); | ||
| while (_channel!.Reader.TryRead(out var item)) | ||
| { | ||
| if (token.IsCancellationRequested) | ||
| break; | ||
|
|
||
| batch.Add(item); | ||
|
|
||
| if (batch.Count >= BULK_INSERT_COUNT) | ||
| { | ||
| await _exportManager.ImplementStrategyAsync(batch, token).ConfigureAwait(false); | ||
| batch.Clear(); // reuse instead of creating new list | ||
| } | ||
| } | ||
|
|
||
| if (batch.Count > 0 && !token.IsCancellationRequested) | ||
| { | ||
| await _exportManager.ImplementStrategyAsync(batch, token).ConfigureAwait(false); | ||
| batch.Clear(); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _dnsServer?.WriteLog(ex); | ||
| } | ||
| finally | ||
| { | ||
| try | ||
| { | ||
| _queueTimer?.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite); | ||
| } | ||
| catch (ObjectDisposedException) | ||
| { } | ||
| } | ||
| } | ||
|
|
||
| #endregion | ||
| #endregion private | ||
|
|
||
| #region properties | ||
|
|
||
| public string Description | ||
| { | ||
| get { return "Allows exporting query logs to third party sinks. It supports exporting to File, HTTP endpoint, and Syslog (UDP, TCP, TLS, and Local protocols)."; } | ||
| } | ||
| public string Description => | ||
| "Allows exporting query logs to third party sinks. Supports exporting to File, HTTP endpoint, and Syslog."; | ||
|
|
||
| #endregion | ||
| #endregion properties | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
_lastDropLogfield is accessed and modified on lines 171 and 174 without synchronization. Multiple threads callingInsertLogAsyncconcurrently could race on reading and writing this field, potentially causing duplicate log messages or skipped logging windows. Consider usingInterlocked.CompareExchangeto atomically update_lastDropLogor protecting it with a lock to ensure thread-safe access.