Skip to content

Commit 7c122aa

Browse files
committed
New leadership watcher
1 parent 2e62ddc commit 7c122aa

5 files changed

Lines changed: 157 additions & 6 deletions

File tree

releasenotes/4.1.7.props

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<Project>
2+
<PropertyGroup>
3+
<PackageReleaseNotes>
4+
* Fix spinning on leadership election
5+
</PackageReleaseNotes>
6+
</PropertyGroup>
7+
</Project>

src/CondenserDotNet.Client/Leadership/LeaderRegistry.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace CondenserDotNet.Client.Leadership
77
public class LeaderRegistry : ILeaderRegistry
88
{
99
private readonly IServiceManager _serviceManager;
10-
private readonly Dictionary<string, LeaderWatcher> _leaderWatchers = new Dictionary<string, LeaderWatcher>(StringComparer.OrdinalIgnoreCase);
10+
private readonly Dictionary<string, LeaderWatcherNew> _leaderWatchers = new Dictionary<string, LeaderWatcherNew>(StringComparer.OrdinalIgnoreCase);
1111

1212
public LeaderRegistry(IServiceManager serviceManager) => _serviceManager = serviceManager;
1313

@@ -23,7 +23,7 @@ public async Task<ILeaderWatcher> GetLeaderWatcherAsync(string keyForLeadership)
2323
{
2424
if (!_leaderWatchers.TryGetValue(keyForLeadership, out var returnValue))
2525
{
26-
returnValue = new LeaderWatcher(_serviceManager, keyForLeadership);
26+
returnValue = new LeaderWatcherNew(_serviceManager, keyForLeadership);
2727
_leaderWatchers[keyForLeadership] = returnValue;
2828
}
2929
return returnValue;

src/CondenserDotNet.Client/Leadership/LeaderWatcher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace CondenserDotNet.Client.Leadership
1111
{
12-
public class LeaderWatcher : ILeaderWatcher
12+
public class LeaderWatcherOld : ILeaderWatcher
1313
{
1414
private readonly AsyncManualResetEvent<InformationService> _currentLeaderEvent = new AsyncManualResetEvent<InformationService>();
1515
private readonly AsyncManualResetEvent<bool> _electedLeaderEvent = new AsyncManualResetEvent<bool>();
@@ -20,7 +20,7 @@ public class LeaderWatcher : ILeaderWatcher
2020
private const string KeyPath = "/v1/kv/";
2121
private string _consulIndex = "0";
2222

23-
internal LeaderWatcher(IServiceManager serviceManager, string keyToWatch)
23+
internal LeaderWatcherOld(IServiceManager serviceManager, string keyToWatch)
2424
{
2525
_serviceManager = serviceManager;
2626
_keyToWatch = keyToWatch;
@@ -53,7 +53,7 @@ private async Task TryForElection()
5353
_electedLeaderEvent.Reset();
5454
_currentLeaderEvent.Reset();
5555
CondenserEventSource.Log.LeadershipTryToLock(_keyToWatch);
56-
var leaderResult = await _serviceManager.Client.PutAsync($"{KeyPath}{_keyToWatch}?acquire={_sessionId}&index={_consulIndex}", GetServiceInformation());
56+
var leaderResult = await _serviceManager.Client.PutAsync($"{KeyPath}{_keyToWatch}?acquire={_sessionId}", GetServiceInformation());
5757
if (!leaderResult.IsSuccessStatusCode)
5858
{
5959
//error so we need to get a new session
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Net.Http;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using CondenserDotNet.Client.DataContracts;
8+
using CondenserDotNet.Core;
9+
using CondenserDotNet.Core.Consul;
10+
using CondenserDotNet.Core.DataContracts;
11+
using Newtonsoft.Json;
12+
13+
namespace CondenserDotNet.Client.Leadership
14+
{
15+
public class LeaderWatcherNew : ILeaderWatcher
16+
{
17+
private readonly AsyncManualResetEvent<bool> _electedLeaderEvent = new AsyncManualResetEvent<bool>();
18+
private Action<InformationService> _callback;
19+
private const string KeyPath = "/v1/kv/";
20+
private string _consulIndex = "0";
21+
private Task<Guid> _sessionIdTask;
22+
private IServiceManager _serviceManager;
23+
private string _keyToWatch;
24+
private AsyncManualResetEvent<InformationService> _currentInfoService;
25+
26+
public LeaderWatcherNew(IServiceManager serviceManager, string keyToWatch)
27+
{
28+
_keyToWatch = keyToWatch;
29+
_serviceManager = serviceManager;
30+
_sessionIdTask = GetSession();
31+
}
32+
33+
private async Task<Guid> GetSession()
34+
{
35+
while (true)
36+
{
37+
CondenserEventSource.Log.LeadershipSessionCreated();
38+
var result = await _serviceManager.Client.PutAsync(HttpUtils.SessionCreateUrl, GetCreateSession());
39+
if (!result.IsSuccessStatusCode)
40+
{
41+
await Task.Delay(1000);
42+
continue;
43+
}
44+
return JsonConvert.DeserializeObject<SessionCreateResponse>(await result.Content.ReadAsStringAsync()).Id;
45+
}
46+
}
47+
48+
private StringContent GetCreateSession()
49+
{
50+
var checks = new string[_serviceManager.RegisteredService.Checks.Count + 1];
51+
for (var i = 0; i < _serviceManager.RegisteredService.Checks.Count; i++)
52+
{
53+
checks[i] = _serviceManager.RegisteredService.Checks[i].Name;
54+
}
55+
checks[checks.Length - 1] = "serfHealth";
56+
var sessionCreate = new SessionCreate()
57+
{
58+
Behavior = "delete",
59+
Checks = checks,
60+
LockDelay = "1s",
61+
Name = $"{_serviceManager.ServiceId}:LeaderElection:{_keyToWatch.Replace('/', ':')}"
62+
};
63+
return HttpUtils.GetStringContent(sessionCreate);
64+
}
65+
66+
private StringContent GetServiceInformation() => HttpUtils.GetStringContent(new InformationService()
67+
{
68+
Address = _serviceManager.ServiceAddress,
69+
ID = _serviceManager.ServiceId,
70+
Port = _serviceManager.ServicePort,
71+
Service = _serviceManager.ServiceName,
72+
Tags = _serviceManager.RegisteredService.Tags.ToArray()
73+
});
74+
75+
private async Task KeepLeadershipLoop()
76+
{
77+
var sessionId = await _sessionIdTask;
78+
while (true)
79+
{
80+
var leaderResult = await _serviceManager.Client.PutAsync($"{KeyPath}{_keyToWatch}?acquire={sessionId}", GetServiceInformation());
81+
if (!leaderResult.IsSuccessStatusCode)
82+
{
83+
//error so we need to get a new session
84+
await Task.Delay(500);
85+
continue;
86+
}
87+
var areWeLeader = bool.Parse(await leaderResult.Content.ReadAsStringAsync());
88+
if(areWeLeader)
89+
{
90+
_electedLeaderEvent.Set(true);
91+
}
92+
else
93+
{
94+
_electedLeaderEvent.Set(false);
95+
}
96+
await WaitForLeadershipChange();
97+
}
98+
}
99+
100+
private async Task WaitForLeadershipChange()
101+
{
102+
while(true)
103+
{
104+
var leaderResult = await _serviceManager.Client.GetAsync($"{KeyPath}{_keyToWatch}?index={_consulIndex}");
105+
if(!leaderResult.IsSuccessStatusCode)
106+
{
107+
await Task.Delay(500);
108+
continue;
109+
}
110+
var kv = JsonConvert.DeserializeObject<KeyValue[]>(await leaderResult.Content.ReadAsStringAsync());
111+
if(string.IsNullOrWhiteSpace(kv[0].Session))
112+
{
113+
//no one has leadership
114+
_currentInfoService.Reset();
115+
_electedLeaderEvent.Reset();
116+
return;
117+
}
118+
var infoService = JsonConvert.DeserializeObject<InformationService>(kv[0].ValueFromBase64());
119+
_currentInfoService.Set(infoService);
120+
_consulIndex = leaderResult.GetConsulIndex();
121+
_callback?.Invoke(infoService);
122+
if(await _sessionIdTask != new Guid(kv[0].Session))
123+
{
124+
_electedLeaderEvent.Reset();
125+
return;
126+
}
127+
}
128+
}
129+
130+
public async Task<InformationService> GetCurrentLeaderAsync()
131+
{
132+
await _sessionIdTask;
133+
return await _currentInfoService.WaitAsync();
134+
}
135+
136+
public async Task GetLeadershipAsync()
137+
{
138+
await _sessionIdTask;
139+
await _electedLeaderEvent.WaitAsync();
140+
}
141+
142+
public void SetLeaderCallback(Action<InformationService> callback) => _callback = callback;
143+
}
144+
}

version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<PropertyGroup>
3-
<VersionPrefix>4.1.6</VersionPrefix>
3+
<VersionPrefix>4.1.7</VersionPrefix>
44
<VersionSuffix>beta</VersionSuffix>
55
</PropertyGroup>
66
</Project>

0 commit comments

Comments
 (0)