Skip to content

Commit 914e0f0

Browse files
author
Timothy Dodd
committed
Refactor agent config and heartbeat handling
Enhanced agent configuration management by introducing global settings and using the heartbeat mechanism to deliver configuration dynamically.

- Added `HeartbeatResponse` to streamline config updates via heartbeat.
- Increased the heartbeat interval to 60 seconds.
- Introduced global configuration endpoints and UI for shared settings.
- Added `AgentId` to logs for better traceability.
- Updated the database schema to include an `AgentId` column and index.
- Improved the log details modal with additional metadata.
- Refined the UI for managing global and per-agent configurations.
- Removed periodic config polling in favor of heartbeat-based updates.

These changes improve performance, usability, and traceability.
1 parent 152bf79 commit 914e0f0

17 files changed

Lines changed: 342 additions & 72 deletions

src/LogMkAgent/Common/ApiJsonSerializerContext.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace LogMkAgent;
1111
[JsonSerializable(typeof(ValidationSettings))]
1212
[JsonSerializable(typeof(AgentHeartbeat))]
1313
[JsonSerializable(typeof(AgentConfig))]
14+
[JsonSerializable(typeof(HeartbeatResponse))]
1415
public partial class ApiJsonSerializerContext : JsonSerializerContext
1516
{
1617
}

src/LogMkAgent/Services/AgentRegistrationService.cs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System.Net.Http.Json;
12
using System.Net.NetworkInformation;
23
using System.Net.Sockets;
34
using System.Reflection;
5+
using System.Text.Json;
46
using LogMkCommon;
57

68
namespace LogMkAgent.Services;
@@ -17,8 +19,13 @@ public class AgentRegistrationService : BackgroundService
1719
private readonly LogWatcher _logWatcher;
1820
private readonly ILogger<AgentRegistrationService> _logger;
1921

20-
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(30);
21-
private readonly TimeSpan _configPollInterval = TimeSpan.FromSeconds(60);
22+
private readonly TimeSpan _heartbeatInterval = TimeSpan.FromSeconds(60);
23+
24+
private static readonly JsonSerializerOptions _jsonOptions = new()
25+
{
26+
TypeInfoResolver = new ApiJsonSerializerContext(),
27+
PropertyNameCaseInsensitive = true
28+
};
2229

2330
private readonly DateTime _startedAt = DateTime.UtcNow;
2431
private readonly string _agentId;
@@ -42,48 +49,45 @@ public AgentRegistrationService(
4249
_version = Assembly.GetExecutingAssembly().GetName().Version?.ToString();
4350
_ipAddress = TryGetLocalIp();
4451

52+
// Stamp every outgoing log line with this agent's id so the API/UI
53+
// can show which node a log came from.
54+
_batchingService.AgentId = _agentId;
55+
4556
_logger.LogInformation("AgentRegistrationService starting: AgentId={AgentId}, Hostname={Hostname}, Version={Version}",
4657
_agentId, _hostname, _version);
4758
}
4859

4960
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
5061
{
51-
// Pull config once on startup before the regular polling loop
52-
await PullConfigAsync(stoppingToken).ConfigureAwait(false);
53-
54-
var lastConfigPull = DateTime.UtcNow;
62+
// Send an initial heartbeat right away so the agent's config is applied
63+
// before the first batch is processed.
64+
try
65+
{
66+
await SendHeartbeatAsync(stoppingToken).ConfigureAwait(false);
67+
}
68+
catch (Exception ex)
69+
{
70+
_logger.LogWarning(ex, "Initial heartbeat failed");
71+
}
5572

5673
while (!stoppingToken.IsCancellationRequested)
5774
{
5875
try
5976
{
60-
await SendHeartbeatAsync(stoppingToken).ConfigureAwait(false);
61-
}
62-
catch (Exception ex)
63-
{
64-
_logger.LogWarning(ex, "Heartbeat failed");
77+
await Task.Delay(_heartbeatInterval, stoppingToken).ConfigureAwait(false);
6578
}
66-
67-
if (DateTime.UtcNow - lastConfigPull >= _configPollInterval)
79+
catch (OperationCanceledException)
6880
{
69-
try
70-
{
71-
await PullConfigAsync(stoppingToken).ConfigureAwait(false);
72-
lastConfigPull = DateTime.UtcNow;
73-
}
74-
catch (Exception ex)
75-
{
76-
_logger.LogWarning(ex, "Config pull failed");
77-
}
81+
break;
7882
}
7983

8084
try
8185
{
82-
await Task.Delay(_heartbeatInterval, stoppingToken).ConfigureAwait(false);
86+
await SendHeartbeatAsync(stoppingToken).ConfigureAwait(false);
8387
}
84-
catch (OperationCanceledException)
88+
catch (Exception ex)
8589
{
86-
break;
90+
_logger.LogWarning(ex, "Heartbeat failed");
8791
}
8892
}
8993
}
@@ -113,28 +117,24 @@ private async Task SendHeartbeatAsync(CancellationToken cancellationToken)
113117
if (!response.IsSuccessStatusCode)
114118
{
115119
_logger.LogDebug("Heartbeat returned non-success status {StatusCode}", response.StatusCode);
120+
return;
116121
}
117-
}
118122

119-
private async Task PullConfigAsync(CancellationToken cancellationToken)
120-
{
121123
try
122124
{
123-
var config = await _apiClient.GetDataAsync<AgentConfig>($"api/agents/{Uri.EscapeDataString(_agentId)}/config")
124-
.ConfigureAwait(false);
125+
var payload = await response.Content
126+
.ReadFromJsonAsync<HeartbeatResponse>(_jsonOptions, cancellationToken)
127+
.ConfigureAwait(false);
125128

126-
if (config == null)
129+
if (payload?.Config != null)
127130
{
128-
_logger.LogDebug("No agent config returned for {AgentId}", _agentId);
129-
return;
131+
_logWatcher.ApplyConfig(payload.Config);
132+
_batchingService.ApplyConfig(payload.Config);
130133
}
131-
132-
_logWatcher.ApplyConfig(config);
133-
_batchingService.ApplyConfig(config);
134134
}
135-
catch (HttpRequestException ex)
135+
catch (Exception ex)
136136
{
137-
_logger.LogDebug(ex, "Could not reach API to pull config");
137+
_logger.LogDebug(ex, "Failed to parse heartbeat response");
138138
}
139139
}
140140

src/LogMkAgent/Services/BatchService.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public void ApplyConfig(AgentConfig config)
115115
_options.MaxBatchSize, _maxBatchesPerMinute, _options.MaxQueueSize);
116116
}
117117

118+
public string? AgentId { get; set; }
119+
118120
public void AddData(LogLine data)
119121
{
120122
if (_disposed)
@@ -129,6 +131,11 @@ public void AddData(LogLine data)
129131
return;
130132
}
131133

134+
if (string.IsNullOrEmpty(data.AgentId))
135+
{
136+
data.AgentId = AgentId;
137+
}
138+
132139
// Enforce queue size limit - drop 10% of queue to make room and avoid per-item churn
133140
if (_options.MaxQueueSize > 0 && _batchData.Count >= _options.MaxQueueSize)
134141
{

src/LogMkApi/Controllers/AgentController.cs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ public AgentController(AgentRepo agentRepo, ILogger<AgentController> logger)
2020
_logger = logger;
2121
}
2222

23-
// Called by agents — anonymous like other agent endpoints
23+
// Called by agents — anonymous like other agent endpoints. Returns the
24+
// latest config so changes saved in the web UI are applied on the next
25+
// heartbeat (no separate poll needed).
2426
[AllowAnonymous]
2527
[HttpPost("heartbeat")]
26-
public async Task<IActionResult> Heartbeat([FromBody] AgentHeartbeat heartbeat)
28+
public async Task<ActionResult<HeartbeatResponse>> Heartbeat([FromBody] AgentHeartbeat heartbeat)
2729
{
2830
if (heartbeat == null || string.IsNullOrWhiteSpace(heartbeat.AgentId))
2931
{
@@ -33,7 +35,12 @@ public async Task<IActionResult> Heartbeat([FromBody] AgentHeartbeat heartbeat)
3335
try
3436
{
3537
await _agentRepo.UpsertHeartbeatAsync(heartbeat);
36-
return Ok(new { Status = "ok", ReceivedAt = DateTime.UtcNow });
38+
var config = await _agentRepo.GetMergedConfigAsync(heartbeat.AgentId);
39+
return Ok(new HeartbeatResponse
40+
{
41+
ReceivedAt = DateTime.UtcNow,
42+
Config = config
43+
});
3744
}
3845
catch (Exception ex)
3946
{
@@ -42,8 +49,32 @@ public async Task<IActionResult> Heartbeat([FromBody] AgentHeartbeat heartbeat)
4249
}
4350
}
4451

45-
// Called by agents — anonymous so the agent can fetch its own config
46-
[AllowAnonymous]
52+
/// <summary>
/// Returns the global agent configuration as loaded by the repository.
/// Requires an authorized caller.
/// </summary>
[Authorize]
[HttpGet("global/config")]
public async Task<ActionResult<AgentConfig>> GetGlobalConfig()
    => Ok(await _agentRepo.GetGlobalConfigAsync());
59+
60+
/// <summary>
/// Saves the global agent configuration and returns the stored result.
/// </summary>
/// <param name="config">Configuration from the request body; a missing body yields 400.</param>
/// <returns>200 with the config re-read from the repository after the save.</returns>
[Authorize]
[HttpPut("global/config")]
public async Task<ActionResult<AgentConfig>> UpdateGlobalConfig([FromBody] AgentConfig config)
{
    if (config is null)
    {
        return BadRequest(new { Error = "Config is required" });
    }

    var editor = User.Identity?.Name;
    await _agentRepo.SaveGlobalConfigAsync(config, editor);

    // Re-read after saving so the response reflects what the repository stored.
    var persisted = await _agentRepo.GetGlobalConfigAsync();

    _logger.LogInformation("Global agent config updated by {User}", editor);
    return Ok(persisted);
}
74+
75+
// Called by the web UI to load raw per-agent config for editing.
76+
// Agents themselves get config via the heartbeat response (merged).
77+
[Authorize]
4778
[HttpGet("{agentId}/config")]
4879
public async Task<ActionResult<AgentConfig>> GetConfig(string agentId)
4980
{
@@ -52,6 +83,11 @@ public async Task<ActionResult<AgentConfig>> GetConfig(string agentId)
5283
return BadRequest(new { Error = "AgentId is required" });
5384
}
5485

86+
if (string.Equals(agentId, AgentRepo.GlobalConfigId, StringComparison.Ordinal))
87+
{
88+
return BadRequest(new { Error = "Use /api/agents/global/config for global settings" });
89+
}
90+
5591
var config = await _agentRepo.GetConfigAsync(agentId);
5692
return Ok(config);
5793
}
@@ -82,6 +118,11 @@ public async Task<ActionResult<AgentConfig>> UpdateConfig(string agentId, [FromB
82118
return BadRequest(new { Error = "Config is required" });
83119
}
84120

121+
if (string.Equals(agentId, AgentRepo.GlobalConfigId, StringComparison.Ordinal))
122+
{
123+
return BadRequest(new { Error = "Use /api/agents/global/config for global settings" });
124+
}
125+
85126
if (config.MaxBatchSize <= 0 || config.MaxBatchSize > 5000)
86127
{
87128
return BadRequest(new { Error = "MaxBatchSize must be between 1 and 5000" });

src/LogMkApi/Controllers/LogController.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public async Task<ActionResult<LogResponse>> Create(
150150
SequenceNumber = logLine.SequenceNumber,
151151
Fingerprint = logLine.Fingerprint,
152152
BatchId = batchId,
153-
ReceivedAt = receivedAt.UtcDateTime
153+
ReceivedAt = receivedAt.UtcDateTime,
154+
AgentId = logLine.AgentId
154155
};
155156

156157
validLogs.Add(logEntity);

src/LogMkApi/Data/AgentRepo.cs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ namespace LogMkApi.Data;
77

88
public class AgentRepo
99
{
10+
public const string GlobalConfigId = "__global__";
11+
1012
private readonly DbConnectionFactory _dbFactory;
1113
private readonly ILogger<AgentRepo> _logger;
1214

@@ -69,11 +71,63 @@ public async Task UpsertHeartbeatAsync(AgentHeartbeat heartbeat)
6971
public async Task<List<AgentInfo>> GetAllAsync()
7072
{
7173
using var db = _dbFactory.CreateConnection();
72-
var query = db.From<AgentRecord>().OrderByDescending(x => x.LastSeenAt);
74+
var query = db.From<AgentRecord>()
75+
.Where(x => x.AgentId != GlobalConfigId)
76+
.OrderByDescending(x => x.LastSeenAt);
7377
var records = await db.SelectAsync(query);
7478
return records.Select(MapToInfo).ToList();
7579
}
7680

81+
/// <summary>
/// Loads the config stored under the reserved <see cref="GlobalConfigId"/> id.
/// </summary>
public Task<AgentConfig> GetGlobalConfigAsync()
{
    return GetConfigAsync(GlobalConfigId);
}
82+
83+
/// <summary>
/// Saves <paramref name="config"/> under the reserved <see cref="GlobalConfigId"/> id.
/// </summary>
/// <param name="config">The configuration to store.</param>
/// <param name="updatedBy">Forwarded unchanged to <c>SaveConfigAsync</c>; may be null.</param>
public Task SaveGlobalConfigAsync(AgentConfig config, string? updatedBy)
{
    return SaveConfigAsync(GlobalConfigId, config, updatedBy);
}
85+
86+
/// <summary>
/// Returns the per-agent config merged with the global config.
/// Scalar fields (MaxBatchSize, MaxDaysOld, DefaultLogLevel, ...) come from
/// the per-agent config. Collection fields (PodLogLevels, IgnoredPods) are
/// unioned across global + per-agent using case-insensitive pod names, with
/// the per-agent value winning for any key conflict.
/// </summary>
/// <param name="agentId">
/// The agent whose config is requested. Passing <see cref="GlobalConfigId"/>
/// returns the global config unmerged.
/// </param>
/// <returns>A new <see cref="AgentConfig"/> holding the merged values.</returns>
public async Task<AgentConfig> GetMergedConfigAsync(string agentId)
{
    if (string.Equals(agentId, GlobalConfigId, StringComparison.Ordinal))
    {
        return await GetGlobalConfigAsync();
    }

    var globalConfig = await GetGlobalConfigAsync();
    var agentConfig = await GetConfigAsync(agentId);

    // Build the map by indexer assignment instead of the copy constructor.
    // The copy constructor throws ArgumentException when the source holds
    // keys that differ only by case (e.g. "Nginx" and "nginx"), which would
    // make every heartbeat fail. With assignment the later entry wins.
    var mergedPodLevels = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    OverlayPodLevels(mergedPodLevels, globalConfig.PodLogLevels);
    OverlayPodLevels(mergedPodLevels, agentConfig.PodLogLevels); // per-agent wins on conflict

    // Global entries first, then per-agent. Null collections are skipped so a
    // config stored without these fields does not break the merge.
    var mergedIgnored = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    if (globalConfig.IgnoredPods is not null)
    {
        mergedIgnored.UnionWith(globalConfig.IgnoredPods);
    }

    if (agentConfig.IgnoredPods is not null)
    {
        mergedIgnored.UnionWith(agentConfig.IgnoredPods);
    }

    return new AgentConfig
    {
        AgentId = agentId,
        DefaultLogLevel = agentConfig.DefaultLogLevel,
        PodLogLevels = mergedPodLevels,
        IgnoredPods = mergedIgnored.ToList(),
        MaxDaysOld = agentConfig.MaxDaysOld,
        MaxLineLength = agentConfig.MaxLineLength,
        MaxBatchSize = agentConfig.MaxBatchSize,
        MaxBatchesPerMinute = agentConfig.MaxBatchesPerMinute,
        MaxQueueSize = agentConfig.MaxQueueSize,
        // Report the most recent change across both sources.
        UpdatedAt = agentConfig.UpdatedAt > globalConfig.UpdatedAt ? agentConfig.UpdatedAt : globalConfig.UpdatedAt,
        UpdatedBy = agentConfig.UpdatedBy ?? globalConfig.UpdatedBy
    };

    // Copies every entry of source into target, overwriting existing keys.
    static void OverlayPodLevels(
        Dictionary<string, string> target,
        IEnumerable<KeyValuePair<string, string>>? source)
    {
        if (source is null)
        {
            return;
        }

        foreach (var entry in source)
        {
            target[entry.Key] = entry.Value;
        }
    }
}
130+
77131
public async Task<AgentInfo?> GetByIdAsync(string agentId)
78132
{
79133
using var db = _dbFactory.CreateConnection();

src/LogMkApi/Data/DatabaseInitializer.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,35 @@ public void CreateTable()
4343
}
4444
db.CreateTableIfNotExists<RefreshToken>(true);
4545
// Ensure critical indexes exist (migration for existing databases)
46+
EnsureLogColumns(db);
4647
EnsureLogIndexes(db);
4748
}
4849
}
4950

51+
/// <summary>
/// Migration step for existing databases: adds the nullable <c>AgentId</c>
/// column to the <c>Log</c> table when information_schema reports no such
/// column. Failures are logged as a warning that includes the manual ALTER
/// statement, and are not rethrown.
/// </summary>
/// <param name="db">Open connection used for the check and the ALTER.</param>
private void EnsureLogColumns(System.Data.IDbConnection db)
{
    try
    {
        var matchingColumns = db.ExecuteScalar<int>(@"
SELECT COUNT(*)
FROM information_schema.columns
WHERE table_schema = DATABASE()
AND table_name = 'Log'
AND column_name = 'AgentId'");

        if (matchingColumns > 0)
        {
            // Column already present; nothing to migrate.
            return;
        }

        _logger.LogInformation("Adding AgentId column to Log table");
        db.Execute("ALTER TABLE `Log` ADD COLUMN `AgentId` VARCHAR(100) NULL");
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Failed to ensure Log.AgentId column. Add it manually: ALTER TABLE `Log` ADD COLUMN `AgentId` VARCHAR(100) NULL");
    }
}
74+
5075
private void EnsureLogIndexes(System.Data.IDbConnection db)
5176
{
5277
var missingIndexes = new List<(string Name, string Sql)>();
@@ -56,7 +81,8 @@ private void EnsureLogIndexes(System.Data.IDbConnection db)
5681
var indexesToCheck = new[]
5782
{
5883
("Deployment_Pod_TimeStamp_idx", "CREATE INDEX Deployment_Pod_TimeStamp_idx ON `Log` (Deployment, Pod, TimeStamp)"),
59-
("LogDate_idx", "CREATE INDEX LogDate_idx ON `Log` (LogDate)")
84+
("LogDate_idx", "CREATE INDEX LogDate_idx ON `Log` (LogDate)"),
85+
("AgentId_idx", "CREATE INDEX AgentId_idx ON `Log` (AgentId)")
6086
};
6187

6288
foreach (var (name, createSql) in indexesToCheck)

src/LogMkApi/Data/Models/Log.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ public class Log
4141
[MaxLength(50)]
4242
public string BatchId { get; set; } = string.Empty;
4343
public DateTime ReceivedAt { get; set; }
44+
[Index]
45+
[MaxLength(100)]
46+
public string? AgentId { get; set; }
4447
// Computed property for pod key (useful for queries)
4548
[Ignore] // Don't store in database
4649
public string PodKey => $"{Deployment}:{Pod}";

0 commit comments

Comments
 (0)