Skip to content
This repository was archived by the owner on May 12, 2026. It is now read-only.

Commit 7f933af

Browse files
committed
Allowing Multiple Subscriptions
1 parent 0246a61 commit 7f933af

2 files changed

Lines changed: 53 additions & 38 deletions

File tree

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
11
using System;
2-
using System.Collections.Generic;
3-
using System.Text;
42
using Hangfire.States;
53

64
namespace HangFire.TopicExtensions.Attributes
75
{
8-
[AttributeUsage(AttributeTargets.Class)]
6+
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
97
public class SubscriberJobAttribute : Attribute
108
{
9+
public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue)
10+
{
11+
}
1112

12-
public string TopicJobId { get; set; }
13-
14-
public string Queue { get; set; }
1513

16-
17-
public bool Enabled { get; set; } = true;
18-
19-
public SubscriberJobAttribute(string topic) : this(topic, EnqueuedState.DefaultQueue) { }
14+
public SubscriberJobAttribute(string topic, string queue)
15+
{
16+
if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic));
2017

21-
22-
23-
public SubscriberJobAttribute(string topic, string queue)
24-
{
25-
if (string.IsNullOrEmpty(topic)) throw new ArgumentNullException(nameof(topic));
26-
27-
if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));
18+
if (string.IsNullOrEmpty(queue)) throw new ArgumentNullException(nameof(queue));
2819

29-
30-
Topic = topic;
31-
Queue = queue;
32-
}
20+
21+
Topic = topic;
22+
Queue = queue;
23+
}
24+
25+
public string TopicJobId { get; set; }
26+
27+
public string Queue { get; set; }
28+
29+
30+
public bool Enabled { get; set; } = true;
3331

3432
public string Topic { get; set; }
3533
}
36-
}
34+
}

HangFire.TopicExtensions/TopicPublisher.cs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,45 +7,58 @@
77
using HangFire.TopicExtensions.Interfaces;
88
using Microsoft.Extensions.DependencyInjection;
99

10-
1110
namespace HangFire.TopicExtensions
1211
{
1312
public class TopicPublisher : ITopicPublisher
1413
{
15-
1614
private readonly IServiceProvider _serviceProvider;
1715

1816
public TopicPublisher(IServiceProvider serviceProvider)
1917
{
2018
_serviceProvider = serviceProvider;
19+
BuildSubscribers();
2120
}
2221

22+
public List<Subscription> Subscriptions { get; set; }
23+
2324
public void EnqueueTopic(string topic, object context = null)
2425
{
25-
2626
// Find Subscribers
2727
BackgroundJob.Enqueue(() => DispatchTopic(topic, context));
28-
2928
}
3029

31-
public void DispatchTopic(string topic, object context)
30+
private void BuildSubscribers()
3231
{
32+
var allTypes = GetImplementationsOf<ISubscriber>();
33+
Subscriptions = new List<Subscription>();
3334

34-
var allSubscribers = GetImplementationsOf<ISubscriber>();
35-
36-
foreach (var type in allSubscribers)
35+
foreach (var type in allTypes)
3736
{
38-
3937
var attributes = type.GetCustomAttributes<SubscriberJobAttribute>();
4038

41-
var subscribed = attributes.Any(a=> a.Topic.ToLower() == topic);
42-
if (!subscribed) continue;
43-
44-
var impl = (ISubscriber)ActivatorUtilities.CreateInstance(_serviceProvider,type);
45-
BackgroundJob.Enqueue(() => impl.Execute(context));
39+
var subscriberJobAttributes = attributes.ToList();
40+
if (!subscriberJobAttributes.Any()) continue;
41+
42+
43+
var impl = (ISubscriber) ActivatorUtilities.CreateInstance(_serviceProvider, type);
4644

45+
foreach (var subscriberJobAttribute in subscriberJobAttributes)
46+
{
47+
var subscription = new Subscription
48+
{
49+
Topic = subscriberJobAttribute.Topic,
50+
Subscriber = impl
51+
};
52+
53+
Subscriptions.Add(subscription);
54+
}
4755
}
56+
}
4857

58+
public void DispatchTopic(string topic, object context)
59+
{
60+
foreach (var subscription in Subscriptions.Where(s => s.Topic == topic))
61+
BackgroundJob.Enqueue(() => subscription.Subscriber.Execute(context));
4962
}
5063

5164
private static IEnumerable<Type> GetImplementationsOf<TInterface>()
@@ -56,7 +69,11 @@ private static IEnumerable<Type> GetImplementationsOf<TInterface>()
5669
assembly.GetTypes().Where(type => !type.IsInterface && interfaceType.IsAssignableFrom(type)))
5770
.SelectMany(implementation => implementation);
5871
}
59-
}
60-
6172

73+
public class Subscription
74+
{
75+
public string Topic { get; set; }
76+
public ISubscriber Subscriber { get; set; }
77+
}
78+
}
6279
}

0 commit comments

Comments
 (0)