diff --git a/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs b/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs index 97ef728f5..025685f96 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs @@ -4,4 +4,5 @@ public interface ICrontabService { Task> GetCrontable(); Task ScheduledTimeArrived(CrontabItem item); + Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs b/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs index 227ca99ef..71bc20985 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs @@ -32,6 +32,9 @@ public class CrontabItem : ScheduleTaskArgs [JsonPropertyName("trigger_type")] public CronTabItemTriggerType TriggerType { get; set; } = CronTabItemTriggerType.BackgroundWatcher; + [JsonPropertyName("reentry_protection")] + public bool ReentryProtection { get; set; } + public override string ToString() { return $"{Title}: {Description} [AgentId: {AgentId}, UserId: {UserId}]"; diff --git a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs index 3a2b05dbb..22d5f9c9d 100644 --- a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs +++ b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs @@ -15,6 +15,7 @@ limitations under the License. ******************************************************************************/ using BotSharp.Abstraction.Agents.Models; +using BotSharp.Abstraction.Infrastructures; using BotSharp.Abstraction.Repositories; using BotSharp.Abstraction.Repositories.Filters; using BotSharp.Abstraction.Tasks; @@ -33,11 +34,13 @@ namespace BotSharp.Core.Crontab.Services; public class CrontabService : ICrontabService, ITaskFeeder { private readonly IServiceProvider _services; + private readonly IServiceScopeFactory _scopeFactory; private ILogger _logger; - public CrontabService(IServiceProvider services, ILogger logger) + public CrontabService(IServiceProvider services, IServiceScopeFactory scopeFactory, ILogger logger) { _services = services; + _scopeFactory = scopeFactory; _logger = logger; } @@ -116,7 +119,12 @@ public async Task ScheduledTimeArrived(CrontabItem item) { _logger.LogDebug($"ScheduledTimeArrived {item}"); - if (!await HasEnabledTriggerRule(item)) return; + var triggerEnabled = await HasEnabledTriggerRule(item); + if (!triggerEnabled) + { + _logger.LogWarning("Crontab: {0}, Trigger is not enabled, skipping this occurrence.", item.Title); + return; + } await HookEmitter.Emit(_services, async hook => { @@ -150,4 +158,62 @@ private async Task HasEnabledTriggerRule(CrontabItem item) // Opt-out only: block when a matching trigger rule exists and Disabled is true. return !agent.Rules.Any(r => r.TriggerName == item.Title && r.Disabled); } + + public async Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item) + { + if (!item.ReentryProtection) + { + await ExecuteTimeArrivedItem(item); + return; + } + + var lockKey = $"crontab:execution:{item.Title}"; + using var scope = _scopeFactory.CreateScope(); + var locker = scope.ServiceProvider.GetRequiredService(); + var acquired = false; + var lockAcquired = false; + + try + { + acquired = await locker.LockAsync(lockKey, async () => + { + lockAcquired = true; + _logger.LogInformation("Crontab: {0}, Distributed lock acquired, beginning execution...", item.Title); + await ExecuteTimeArrivedItem(item); + }, timeout: 600); + + if (!acquired) + { + _logger.LogWarning("Crontab: {0}, Failed to acquire distributed lock, task is still executing, skipping this occurrence to prevent re-entry.", item.Title); + } + } + catch (Exception ex) + { + if (!lockAcquired) + { + _logger.LogWarning("Crontab: {0}, Redis exception occurred before acquiring lock: {1}, executing without lock protection (re-entry protection disabled).", item.Title, ex.Message); + await ExecuteTimeArrivedItem(item); + } + else + { + _logger.LogWarning("Crontab: {0}, Redis exception occurred after lock acquired: {1}, task execution completed but lock release failed.", item.Title, ex.Message); + } + } + } + + private async Task ExecuteTimeArrivedItem(CrontabItem item) + { + try + { + _logger.LogInformation($"Start running crontab {item.Title}"); + await ScheduledTimeArrived(item); + _logger.LogInformation($"Complete running crontab {item.Title}"); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when running crontab {item.Title}"); + return false; + } + } } diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs index 09e4cf9e3..41e30df5c 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs @@ -10,13 +10,16 @@ namespace BotSharp.OpenAPI.Controllers; public class CrontabController : ControllerBase { private readonly IServiceProvider _services; + private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger _logger; public CrontabController( IServiceProvider services, + IServiceScopeFactory scopeFactory, ILogger logger) { _services = services; + _scopeFactory = scopeFactory; _logger = logger; } @@ -60,8 +63,8 @@ public async Task SchedulingCrontab() { if (item.CheckNextOccurrenceEveryOneMinute()) { - _logger.LogInformation("Crontab: {0}, One occurrence was matched, Beginning execution...", item.Title); - Task.Run(() => ExecuteTimeArrivedItem(item, _services)); + _logger.LogInformation($"Crontab: {item.Title}, One occurrence was matched, attempting to execute..."); + Task.Run(() => ExecuteTimeArrivedItem(item)); result.OccurrenceMatchedItems.Add(item.Title); } } @@ -84,21 +87,10 @@ private async Task> GetCrontabItems(string? title = null) return allowedCrons.Where(cron => cron.Title.IsEqualTo(title)).ToList(); } - private async Task ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services) + private async Task ExecuteTimeArrivedItem(CrontabItem item) { - try - { - using var scope = services.CreateScope(); - var crontabService = scope.ServiceProvider.GetRequiredService(); - _logger.LogInformation($"Start running crontab {item.Title}"); - await crontabService.ScheduledTimeArrived(item); - _logger.LogInformation($"Complete running crontab {item.Title}"); - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, $"Error when running crontab {item.Title}"); - return false; - } + using var scope = _scopeFactory.CreateScope(); + var crontabService = scope.ServiceProvider.GetRequiredService(); + await crontabService.ExecuteTimeArrivedItemWithReentryProtection(item); } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs index 17a49c2bc..9bf332d61 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs @@ -18,6 +18,8 @@ public class CrontabItemDocument : MongoBase public bool LessThan60Seconds { get; set; } = false; public IEnumerable Tasks { get; set; } = []; public DateTime CreatedTime { get; set; } = DateTime.UtcNow; + public int TriggerType { get; set; } + public bool ReentryProtection { get; set; } public static CrontabItem ToDomainModel(CrontabItemDocument item) { @@ -36,7 +38,9 @@ public static CrontabItem ToDomainModel(CrontabItemDocument item) LastExecutionTime = item.LastExecutionTime, LessThan60Seconds = item.LessThan60Seconds, Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToDomainElement(x))?.ToArray() ?? [], - CreatedTime = item.CreatedTime + CreatedTime = item.CreatedTime, + TriggerType = (CronTabItemTriggerType)item.TriggerType, + ReentryProtection = item.ReentryProtection }; } @@ -57,7 +61,9 @@ public static CrontabItemDocument ToMongoModel(CrontabItem item) LastExecutionTime = item.LastExecutionTime, LessThan60Seconds = item.LessThan60Seconds, Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToMongoElement(x))?.ToList() ?? [], - CreatedTime = item.CreatedTime + CreatedTime = item.CreatedTime, + TriggerType = (int)item.TriggerType, + ReentryProtection = item.ReentryProtection }; } }