Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public interface ICrontabService
{
Task<List<CrontabItem>> GetCrontable();
Task ScheduledTimeArrived(CrontabItem item);
Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class CrontabItem : ScheduleTaskArgs
[JsonPropertyName("trigger_type")]
public CronTabItemTriggerType TriggerType { get; set; } = CronTabItemTriggerType.BackgroundWatcher;

[JsonPropertyName("reentry_protection")]
public bool ReentryProtection { get; set; }

public override string ToString()
{
return $"{Title}: {Description} [AgentId: {AgentId}, UserId: {UserId}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
******************************************************************************/

using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Infrastructures;
using BotSharp.Abstraction.Repositories;
using BotSharp.Abstraction.Repositories.Filters;
using BotSharp.Abstraction.Tasks;
Expand All @@ -33,11 +34,13 @@ namespace BotSharp.Core.Crontab.Services;
public class CrontabService : ICrontabService, ITaskFeeder
{
private readonly IServiceProvider _services;
private readonly IServiceScopeFactory _scopeFactory;
private ILogger _logger;

public CrontabService(IServiceProvider services, ILogger<CrontabService> logger)
public CrontabService(IServiceProvider services, IServiceScopeFactory scopeFactory, ILogger<CrontabService> logger)
{
_services = services;
_scopeFactory = scopeFactory;
_logger = logger;
}

Expand Down Expand Up @@ -116,7 +119,12 @@ public async Task ScheduledTimeArrived(CrontabItem item)
{
_logger.LogDebug($"ScheduledTimeArrived {item}");

if (!await HasEnabledTriggerRule(item)) return;
var triggerEnabled = await HasEnabledTriggerRule(item);
if (!triggerEnabled)
{
_logger.LogWarning("Crontab: {0}, Trigger is not enabled, skipping this occurrence.", item.Title);
return;
}

await HookEmitter.Emit<ICrontabHook>(_services, async hook =>
{
Expand Down Expand Up @@ -150,4 +158,62 @@ private async Task<bool> HasEnabledTriggerRule(CrontabItem item)
// Opt-out only: block when a matching trigger rule exists and Disabled is true.
return !agent.Rules.Any(r => r.TriggerName == item.Title && r.Disabled);
}

public async Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item)
{
if (!item.ReentryProtection)
{
await ExecuteTimeArrivedItem(item);
return;
}

var lockKey = $"crontab:execution:{item.Title}";
using var scope = _scopeFactory.CreateScope();
var locker = scope.ServiceProvider.GetRequiredService<IDistributedLocker>();
var acquired = false;
var lockAcquired = false;

try
{
acquired = await locker.LockAsync(lockKey, async () =>
{
lockAcquired = true;
_logger.LogInformation("Crontab: {0}, Distributed lock acquired, beginning execution...", item.Title);
await ExecuteTimeArrivedItem(item);
}, timeout: 600);

if (!acquired)
{
_logger.LogWarning("Crontab: {0}, Failed to acquire distributed lock, task is still executing, skipping this occurrence to prevent re-entry.", item.Title);
}
}
catch (Exception ex)
{
if (!lockAcquired)
{
_logger.LogWarning("Crontab: {0}, Redis exception occurred before acquiring lock: {1}, executing without lock protection (re-entry protection disabled).", item.Title, ex.Message);
await ExecuteTimeArrivedItem(item);
}
else
{
_logger.LogWarning("Crontab: {0}, Redis exception occurred after lock acquired: {1}, task execution completed but lock release failed.", item.Title, ex.Message);
}
}
}

private async Task<bool> ExecuteTimeArrivedItem(CrontabItem item)
{
try
{
_logger.LogInformation($"Start running crontab {item.Title}");
await ScheduledTimeArrived(item);
_logger.LogInformation($"Complete running crontab {item.Title}");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error when running crontab {item.Title}");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ namespace BotSharp.OpenAPI.Controllers;
public class CrontabController : ControllerBase
{
private readonly IServiceProvider _services;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<CrontabController> _logger;

public CrontabController(
IServiceProvider services,
IServiceScopeFactory scopeFactory,
ILogger<CrontabController> logger)
{
_services = services;
_scopeFactory = scopeFactory;
_logger = logger;
}

Expand Down Expand Up @@ -60,8 +63,8 @@ public async Task<CrontabSchedulingResult> SchedulingCrontab()
{
if (item.CheckNextOccurrenceEveryOneMinute())
{
_logger.LogInformation("Crontab: {0}, One occurrence was matched, Beginning execution...", item.Title);
Task.Run(() => ExecuteTimeArrivedItem(item, _services));
_logger.LogInformation($"Crontab: {item.Title}, One occurrence was matched, attempting to execute...");
Task.Run(() => ExecuteTimeArrivedItem(item));
result.OccurrenceMatchedItems.Add(item.Title);
}
}
Expand All @@ -84,21 +87,10 @@ private async Task<List<CrontabItem>> GetCrontabItems(string? title = null)
return allowedCrons.Where(cron => cron.Title.IsEqualTo(title)).ToList();
}

private async Task<bool> ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services)
private async Task ExecuteTimeArrivedItem(CrontabItem item)
{
try
{
using var scope = services.CreateScope();
var crontabService = scope.ServiceProvider.GetRequiredService<ICrontabService>();
_logger.LogInformation($"Start running crontab {item.Title}");
await crontabService.ScheduledTimeArrived(item);
_logger.LogInformation($"Complete running crontab {item.Title}");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error when running crontab {item.Title}");
return false;
}
using var scope = _scopeFactory.CreateScope();
var crontabService = scope.ServiceProvider.GetRequiredService<ICrontabService>();
await crontabService.ExecuteTimeArrivedItemWithReentryProtection(item);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class CrontabItemDocument : MongoBase
public bool LessThan60Seconds { get; set; } = false;
public IEnumerable<CronTaskMongoElement> Tasks { get; set; } = [];
public DateTime CreatedTime { get; set; } = DateTime.UtcNow;
public int TriggerType { get; set; }
public bool ReentryProtection { get; set; }

public static CrontabItem ToDomainModel(CrontabItemDocument item)
{
Expand All @@ -36,7 +38,9 @@ public static CrontabItem ToDomainModel(CrontabItemDocument item)
LastExecutionTime = item.LastExecutionTime,
LessThan60Seconds = item.LessThan60Seconds,
Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToDomainElement(x))?.ToArray() ?? [],
CreatedTime = item.CreatedTime
CreatedTime = item.CreatedTime,
TriggerType = (CronTabItemTriggerType)item.TriggerType,
ReentryProtection = item.ReentryProtection
};
}

Expand All @@ -57,7 +61,9 @@ public static CrontabItemDocument ToMongoModel(CrontabItem item)
LastExecutionTime = item.LastExecutionTime,
LessThan60Seconds = item.LessThan60Seconds,
Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToMongoElement(x))?.ToList() ?? [],
CreatedTime = item.CreatedTime
CreatedTime = item.CreatedTime,
TriggerType = (int)item.TriggerType,
ReentryProtection = item.ReentryProtection
};
}
}
Loading