Skip to content

Commit 4853267

Browse files
committed
Add JobFilter
1 parent b439343 commit 4853267

16 files changed

Lines changed: 394 additions & 230 deletions

src/plugins/Hangfire/HangfireTest.Server/HangfireTest.Server.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
<PropertyGroup>
44
<OutputType>Exe</OutputType>
5-
<TargetFramework>net6.0</TargetFramework>
5+
<TargetFramework>net9.0</TargetFramework>
66
<Nullable>enable</Nullable>
77
<ImplicitUsings>enable</ImplicitUsings>
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Hangfire.AspNetCore" Version="1.7.28" />
12-
<PackageReference Include="Hangfire.Redis.StackExchange" Version="1.8.5" />
11+
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.20" />
12+
<PackageReference Include="Hangfire.Redis.StackExchange" Version="1.12.0" />
1313
</ItemGroup>
1414

1515
<ItemGroup>
Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,39 @@
11
using Hangfire;
22
using Hangfire.Dashboard;
3-
using Hangfire.Redis;
3+
using Hangfire.Redis.StackExchange;
44

5-
var configuration = GlobalConfiguration.Configuration;
6-
configuration.UseRedisStorage("10.99.59.47:7000,DefaultDatabase=7,allowAdmin=true");
7-
configuration.UseColouredConsoleLogProvider();
8-
//todo: add a mini web dashboard?
9-
foreach (var metric in DashboardMetrics.GetMetrics())
5+
namespace HangfireTest.Server;
6+
7+
internal static class Program
108
{
11-
configuration.UseDashboardMetric(metric);
12-
}
13-
configuration.UseDashboardMetric(
14-
RedisStorage.GetDashboardMetricFromRedisInfo("使用内存", RedisInfoKeys.used_memory_human));
15-
configuration.UseDashboardMetric(
16-
RedisStorage.GetDashboardMetricFromRedisInfo("高峰内存", RedisInfoKeys.used_memory_peak_human));
179

18-
BackgroundJob.Enqueue(() => Console.WriteLine("Hello, world!"));
10+
#region Constants & Statics
11+
12+
private static void Main(string[] args)
13+
{
14+
var configuration = GlobalConfiguration.Configuration;
15+
_ = configuration.UseRedisStorage("127.0.0.1:6379,DefaultDatabase=7,allowAdmin=true");
16+
_ = configuration.UseColouredConsoleLogProvider();
17+
//todo: add a mini web dashboard?
18+
foreach (var metric in DashboardMetrics.GetMetrics())
19+
{
20+
_ = configuration.UseDashboardMetric(metric);
21+
}
22+
_ = configuration.UseDashboardMetric(
23+
RedisStorage.GetDashboardMetricFromRedisInfo("使用内存", RedisInfoKeys.used_memory_human));
24+
_ = configuration.UseDashboardMetric(
25+
RedisStorage.GetDashboardMetricFromRedisInfo("高峰内存", RedisInfoKeys.used_memory_peak_human));
26+
27+
_ = BackgroundJob.Enqueue(() => Console.WriteLine("Hello, world!"));
28+
29+
var jobServerOptions =
30+
new BackgroundJobServerOptions { ServerName = "HangfireTest.Web", Queues = new[] { "default", "console" } };
31+
using var server = new BackgroundJobServer(jobServerOptions);
32+
33+
Console.WriteLine("Hangfire Server started. Press any key to exit...");
34+
_ = Console.ReadKey();
35+
}
36+
37+
#endregion
1938

20-
var jobServerOptions =
21-
new BackgroundJobServerOptions { ServerName = "HangfireTest.Web", Queues = new[] { "default", "console" } };
22-
using (var server = new BackgroundJobServer(jobServerOptions))
23-
{
24-
Console.WriteLine("Hangfire Server started. Press any key to exit...");
25-
Console.ReadKey();
2639
}
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Threading.Tasks;
21
using Hangfire;
32
using Microsoft.Extensions.Logging;
43

@@ -7,19 +6,21 @@ namespace HangfireTest.Service;
76
[Queue("console")]
87
public class ConsoleJobs
98
{
10-
private readonly ILogger<DefaultJobs> _logger;
9+
private readonly ILogger<ConsoleJobs> _logger;
1110

12-
private readonly BackgroundJobServerOptions _options;
13-
14-
public ConsoleJobs(ILogger<DefaultJobs> logger, BackgroundJobServerOptions options)
11+
public ConsoleJobs(ILogger<ConsoleJobs> logger)
1512
{
1613
_logger = logger;
17-
_options = options;
1814
}
1915

16+
#region Methods
17+
2018
public async Task TestJob()
2119
{
2220
await Task.Delay(500);
2321
_logger.LogInformation($"console job:{nameof(TestJob)} completed.");
2422
}
23+
24+
#endregion
25+
2526
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using Hangfire.Common;
2+
using Hangfire.Logging;
3+
using Hangfire.Server;
4+
5+
namespace HangfireTest.Service;
6+
7+
public class CustomServerFilterAttribute : JobFilterAttribute, IServerFilter
8+
{
9+
10+
#region Constants & Statics
11+
12+
//尽量少用 LogProvider
13+
private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();
14+
15+
#endregion
16+
17+
#region IServerFilter implementations
18+
19+
public void OnPerformed(PerformedContext context)
20+
{
21+
Logger.InfoFormat("Job `{0}` has been performed", context.BackgroundJob.Id);
22+
Logger.InfoFormat(
23+
$"{context.BackgroundJob.Job.Method.DeclaringType}.{context.BackgroundJob.Job.Method.Name} @{context.ServerId} completed.");
24+
}
25+
26+
public void OnPerforming(PerformingContext context)
27+
{
28+
Logger.InfoFormat("Starting to perform job `{0}`", context.BackgroundJob.Id);
29+
}
30+
31+
#endregion
32+
33+
}
Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,72 @@
1-
using System.Threading;
2-
using System.Threading.Tasks;
31
using Hangfire;
42
using Microsoft.Extensions.Logging;
53

64
namespace HangfireTest.Service;
75

86
[Queue("default")]
7+
[CustomServerFilter]
98
public class DefaultJobs
109
{
1110
private readonly ILogger<DefaultJobs> _logger;
1211

13-
private readonly BackgroundJobServerOptions _options;
14-
15-
public DefaultJobs(ILogger<DefaultJobs> logger, BackgroundJobServerOptions options)
12+
public DefaultJobs(ILogger<DefaultJobs> logger)
1613
{
1714
_logger = logger;
18-
_options = options;
1915
}
2016

17+
#region Methods
18+
2119
[JobDisplayName("Job1, id is:{0}")]
2220
[AutomaticRetry(Attempts = 3)]
2321
public async Task Job1(int id)
2422
{
2523
await Task.Delay(500);
2624
//todo: how to get Processing Server and Worker
27-
//_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job1)} @{_options.ServerName} completed.");
2825
_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job1)} completed.");
2926
}
3027

3128
public async Task Job2(int seconds)
3229
{
3330
await Task.Delay(500);
34-
_logger.LogInformation(
35-
$"{nameof(DefaultJobs)}.{nameof(Job2)} delay {seconds} seconds completed.");
31+
_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job2)} delay {seconds} seconds completed.");
3632
}
3733

3834
[DisableConcurrentExecution(3)]
3935
public async Task Job3()
4036
{
4137
//并发时执行失败,自动重试
4238

43-
int count = 0;
39+
var count = 0;
4440
while (count++ < 12)
4541
{
4642
await Task.Delay(6000);
47-
_logger.LogInformation(
48-
$"{nameof(DefaultJobs)}.{nameof(Job3)} completed.");
43+
_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job3)} completed.");
4944
}
5045
}
5146

5247
[SkipConcurrentExecution(3)]
5348
public async Task Job4()
5449
{
5550
//并发时标记为成功,continuation 被执行
56-
int count = 0;
51+
var count = 0;
5752
while (count++ < 12)
5853
{
5954
await Task.Delay(6000);
60-
_logger.LogInformation(
61-
$"{nameof(DefaultJobs)}.{nameof(Job4)} completed.");
55+
_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job4)} completed.");
6256
}
6357
}
6458

6559
public async Task Job5(CancellationToken cancellation)
6660
{
6761
//Cancell exception will be re-queued
68-
int count = 0;
62+
var count = 0;
6963
while (count++ < 12)
7064
{
7165
await Task.Delay(6000, cancellation);
72-
_logger.LogInformation(
73-
$"{nameof(DefaultJobs)}.{nameof(Job5)} completed.");
66+
_logger.LogInformation($"{nameof(DefaultJobs)}.{nameof(Job5)} completed.");
7467
}
7568
}
69+
70+
#endregion
71+
7672
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using Hangfire.Client;
2+
using Hangfire.Common;
3+
using Hangfire.Server;
4+
using Microsoft.Extensions.Logging;
5+
6+
namespace HangfireTest.Service;
7+
8+
public class ExternalDataFilterAttribute : JobFilterAttribute, IClientFilter, IServerFilter
9+
{
10+
private readonly ILogger<ExternalDataFilterAttribute> _logger;
11+
12+
public ExternalDataFilterAttribute(ILogger<ExternalDataFilterAttribute> logger)
13+
{
14+
_logger = logger;
15+
}
16+
17+
#region IClientFilter implementations
18+
19+
public void OnCreated(CreatedContext context)
20+
{
21+
_logger.LogInformation(
22+
"Job that is based on method `{0}` has been created with id `{1}`",
23+
context.Job.Method.Name,
24+
context.BackgroundJob?.Id);
25+
}
26+
27+
public void OnCreating(CreatingContext context)
28+
{
29+
_logger.LogInformation("Creating a job based on method `{0}`...", context.Job.Method.Name);
30+
31+
context.SetJobParameter("externalData", DateTime.Now.ToString("O"));
32+
}
33+
34+
#endregion
35+
36+
#region IServerFilter implementations
37+
38+
public void OnPerformed(PerformedContext context)
39+
{
40+
}
41+
42+
public void OnPerforming(PerformingContext context)
43+
{
44+
var time = context.GetJobParameter<DateTime>("externalData");
45+
_logger.LogInformation("get externalData from context, {0}", time.ToString("O"));
46+
}
47+
48+
#endregion
49+
50+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFramework>net6.0</TargetFramework>
4+
<TargetFramework>net9.0</TargetFramework>
55
</PropertyGroup>
66

77
<ItemGroup>
8-
<PackageReference Include="Hangfire.AspNetCore" Version="1.7.28" />
8+
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.20" />
9+
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
910
</ItemGroup>
1011

1112
</Project>
Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System;
21
using Hangfire.Common;
32
using Hangfire.Server;
43
using Hangfire.Storage;
@@ -10,25 +9,50 @@ namespace HangfireTest.Service;
109
/// </summary>
1110
public class SkipConcurrentExecutionAttribute : JobFilterAttribute, IServerFilter
1211
{
12+
13+
#region Constants & Statics
14+
15+
private static string GetResource(Job job)
16+
{
17+
return job.ToString();
18+
}
19+
20+
#endregion
21+
1322
private readonly int _timeoutInSeconds;
1423

1524
/// <summary>
1625
/// Initializes a new instance of the <see cref="SkipConcurrentExecutionAttribute"/> class.
1726
/// </summary>
1827
/// <param name="timeoutInSeconds">The timeout in seconds.</param>
19-
/// <exception cref="System.ArgumentException">Timeout argument value should be greater that zero.</exception>
20-
public SkipConcurrentExecutionAttribute(int timeoutInSeconds) => _timeoutInSeconds = timeoutInSeconds >= 0
21-
? timeoutInSeconds
22-
: throw new ArgumentException("Timeout argument value should be greater that zero.");
28+
/// <exception cref="ArgumentException">Timeout argument value should be greater that zero.</exception>
29+
public SkipConcurrentExecutionAttribute(int timeoutInSeconds)
30+
{
31+
_timeoutInSeconds = timeoutInSeconds >= 0
32+
? timeoutInSeconds
33+
: throw new ArgumentException("Timeout argument value should be greater that zero.");
34+
}
35+
36+
#region IServerFilter implementations
37+
38+
/// <inheritdoc/>
39+
public void OnPerformed(PerformedContext filterContext)
40+
{
41+
if (!filterContext.Items.TryGetValue("DistributedLock", out var value))
42+
{
43+
throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
44+
}
45+
((IDisposable)value).Dispose();
46+
}
2347

24-
/// <inheritdoc />
48+
/// <inheritdoc/>
2549
public void OnPerforming(PerformingContext filterContext)
2650
{
27-
string resource = GetResource(filterContext.BackgroundJob.Job);
28-
TimeSpan timeout = TimeSpan.FromSeconds(_timeoutInSeconds);
51+
var resource = GetResource(filterContext.BackgroundJob.Job);
52+
var timeout = TimeSpan.FromSeconds(_timeoutInSeconds);
2953
try
3054
{
31-
IDisposable disposable = filterContext.Connection.AcquireDistributedLock(resource, timeout);
55+
var disposable = filterContext.Connection.AcquireDistributedLock(resource, timeout);
3256
filterContext.Items["DistributedLock"] = disposable;
3357
}
3458
catch (DistributedLockTimeoutException)
@@ -37,15 +61,6 @@ public void OnPerforming(PerformingContext filterContext)
3761
}
3862
}
3963

40-
/// <inheritdoc />
41-
public void OnPerformed(PerformedContext filterContext)
42-
{
43-
if (!filterContext.Items.ContainsKey("DistributedLock"))
44-
{
45-
throw new InvalidOperationException("Can not release a distributed lock: it was not acquired.");
46-
}
47-
((IDisposable)filterContext.Items["DistributedLock"]).Dispose();
48-
}
64+
#endregion
4965

50-
private static string GetResource(Job job) => job.ToString();
5166
}

0 commit comments

Comments
 (0)