Skip to content

Commit e7a93a2

Browse files
authored
Auto cluster failover refactoring (#354)
* AutoClusterFailover refactoring and unit tests, initial commit
* Bumped version
* Review fixes
* Review fixes
* More review fixes
1 parent 8866929 commit e7a93a2

9 files changed

Lines changed: 665 additions & 92 deletions

src/Pulsar.Client/Api/AutoClusterFailover.fs

Lines changed: 186 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,112 +4,225 @@ open System
44
open System.Net.Sockets
55
open System.Threading
66
open System.Threading.Tasks
7+
open System.Timers
78
open Microsoft.Extensions.Logging
89
open Pulsar.Client.Common
910
open Pulsar.Client.Internal
1011

11-
type private AutoServiceInfo = {
12+
type internal AutoServiceInfo = {
1213
ServiceInfo: ServiceInfo
1314
EndPointResolver: EndPointResolver
1415
}
1516

17+
// Pure state machine
18+
19+
[<RequireQualifiedAccess>]
20+
type internal AutoClusterMode =
21+
| Primary
22+
| Secondary of index: int
23+
24+
type internal AutoClusterState = {
25+
Mode: AutoClusterMode
26+
PrimaryFailedTimestamp: DateTime option
27+
PrimaryRecoveredTimestamp: DateTime option
28+
}
29+
30+
[<RequireQualifiedAccess>]
31+
type internal AutoClusterDecision =
32+
| NoAction
33+
| SwitchToSecondary of index: int
34+
| SwitchToPrimary
35+
36+
type internal AutoClusterConfig = {
37+
FailoverDelay: TimeSpan
38+
SwitchBackDelay: TimeSpan
39+
SecondaryCount: int
40+
}
41+
42+
[<RequireQualifiedAccess>]
43+
module internal AutoClusterFailoverLogic =
44+
45+
let initialState = {
46+
Mode = AutoClusterMode.Primary
47+
PrimaryFailedTimestamp = None
48+
PrimaryRecoveredTimestamp = None
49+
}
50+
51+
/// Single pure state transition covering both primary and secondary modes.
52+
/// - primaryAvailable: result of probing the primary endpoint.
53+
/// - findFirstAvailableSecondary: get index of the first available secondary
54+
/// (None means no secondary was probed or none was available).
55+
let step
56+
(now: DateTime)
57+
(config: AutoClusterConfig)
58+
(primaryAvailable: bool)
59+
(findFirstAvailableSecondary: unit -> Task<int option>)
60+
(state: AutoClusterState) =
61+
backgroundTask {
62+
match state.Mode, primaryAvailable with
63+
| AutoClusterMode.Primary, true ->
64+
return { state with PrimaryFailedTimestamp = None }, AutoClusterDecision.NoAction
65+
| AutoClusterMode.Primary, false ->
66+
match state.PrimaryFailedTimestamp with
67+
| None ->
68+
return { state with PrimaryFailedTimestamp = Some now }, AutoClusterDecision.NoAction
69+
| Some ts when now - ts >= config.FailoverDelay ->
70+
match! findFirstAvailableSecondary() with
71+
| Some idx ->
72+
return {
73+
Mode = AutoClusterMode.Secondary idx
74+
PrimaryFailedTimestamp = None
75+
PrimaryRecoveredTimestamp = None
76+
}, AutoClusterDecision.SwitchToSecondary idx
77+
| None ->
78+
return state, AutoClusterDecision.NoAction
79+
| _ ->
80+
return state, AutoClusterDecision.NoAction
81+
| AutoClusterMode.Secondary _, true ->
82+
match state.PrimaryRecoveredTimestamp with
83+
| None ->
84+
return { state with PrimaryRecoveredTimestamp = Some now }, AutoClusterDecision.NoAction
85+
| Some ts when now - ts >= config.SwitchBackDelay ->
86+
return {
87+
Mode = AutoClusterMode.Primary
88+
PrimaryFailedTimestamp = None
89+
PrimaryRecoveredTimestamp = None
90+
}, AutoClusterDecision.SwitchToPrimary
91+
| _ ->
92+
return state, AutoClusterDecision.NoAction
93+
| AutoClusterMode.Secondary _, false ->
94+
return { state with PrimaryRecoveredTimestamp = None }, AutoClusterDecision.NoAction
95+
}
96+
97+
// Orchestrator
98+
1699
type AutoClusterFailover
100+
internal
17101
(
18102
primary: ServiceInfo,
19103
secondary: ServiceInfo array,
20104
failoverDelay: TimeSpan,
21105
switchBackDelay: TimeSpan,
22-
checkInterval: TimeSpan
106+
checkInterval: TimeSpan,
107+
getCurrentTime: unit -> DateTime,
108+
probeAvailable: EndPointResolver -> Task<bool>,
109+
getTickScheduler: ((unit -> Task<unit>) -> IDisposable) option
23110
) =
24111

25112
let getAutoServiceInfo (serviceInfo: ServiceInfo) =
26113
{ ServiceInfo = serviceInfo; EndPointResolver = EndPointResolver(serviceInfo.ServiceUrl.Addresses) }
27114

115+
let config = {
116+
FailoverDelay = failoverDelay
117+
SwitchBackDelay = switchBackDelay
118+
SecondaryCount = secondary.Length
119+
}
120+
28121
let primaryServiceInfo = getAutoServiceInfo primary
29122
let secondaryServiceInfos = secondary |> Array.map getAutoServiceInfo
30123
let mutable currentServiceInfo = primaryServiceInfo
31-
let cts = new CancellationTokenSource()
124+
let mutable state = AutoClusterFailoverLogic.initialState
125+
126+
let mutable context: IServiceInfoProviderContext option = None
127+
let mutable isDisposed = false
32128

33-
let mutable recoveredTimestamp: DateTime option = None
34-
let mutable failedTimestamp: DateTime option = None
129+
let findFirstAvailableSecondary () =
130+
task {
131+
let mutable found = None
132+
let mutable i = 0
133+
while found.IsNone && i < secondaryServiceInfos.Length do
134+
let! avail = probeAvailable secondaryServiceInfos[i].EndPointResolver
135+
if avail then found <- Some i
136+
i <- i + 1
137+
if found.IsNone then
138+
Log.Logger.LogWarning("Available secondary cluster wasn't found")
139+
return found
140+
}
35141

36-
let probeAvailable (resolve: EndPointResolver) =
142+
let applyDecision (decision: AutoClusterDecision) =
37143
backgroundTask {
38-
let endpoint = resolve.Resolve()
39-
try
40-
use client = new TcpClient()
41-
use cts = new CancellationTokenSource(30_000)
42-
do! client.ConnectAsync(endpoint.Host, endpoint.Port, cts.Token)
43-
return true
44-
with ex ->
45-
Log.Logger.LogWarning(ex, "Failed to probe available, url: {0}", endpoint)
46-
return false
144+
match decision with
145+
| AutoClusterDecision.SwitchToSecondary idx ->
146+
let sec = secondaryServiceInfos[idx]
147+
Log.Logger.LogInformation("Switching to secondary cluster {0}", sec.ServiceInfo.ServiceUrl)
148+
currentServiceInfo <- sec
149+
match context with
150+
| Some ctx -> do! ctx.UpdateServiceInfo(sec.ServiceInfo)
151+
| None -> ()
152+
| AutoClusterDecision.SwitchToPrimary ->
153+
Log.Logger.LogInformation("Switching back to primary cluster {0}", primary.ServiceUrl)
154+
currentServiceInfo <- primaryServiceInfo
155+
match context with
156+
| Some ctx -> do! ctx.UpdateServiceInfo(primary)
157+
| None -> ()
158+
| AutoClusterDecision.NoAction -> ()
47159
}
48160

49-
let run (ctx: IServiceInfoProviderContext) =
50-
Log.Logger.LogInformation("Initializing AutoClusterFailover")
161+
let tick () =
51162
backgroundTask {
52-
while not cts.IsCancellationRequested do
53-
try
54-
do! Task.Delay(checkInterval, cts.Token)
55-
if currentServiceInfo = primaryServiceInfo then
56-
let! available = probeAvailable primaryServiceInfo.EndPointResolver
57-
if not available then
58-
match failedTimestamp with
59-
| None ->
60-
failedTimestamp <- Some DateTime.UtcNow
61-
| Some ts when DateTime.UtcNow - ts >= failoverDelay ->
62-
let! targetSecondary =
63-
task {
64-
let mutable found = None
65-
for sec in secondaryServiceInfos do
66-
if found.IsNone then
67-
let! avail = probeAvailable sec.EndPointResolver
68-
if avail then found <- Some sec
69-
return found
70-
}
71-
match targetSecondary with
72-
| Some sec ->
73-
Log.Logger.LogInformation("Switching to secondary cluster {0}", sec.ServiceInfo.ServiceUrl)
74-
currentServiceInfo <- sec
75-
do! ctx.UpdateServiceInfo(sec.ServiceInfo)
76-
failedTimestamp <- None
77-
| None ->
78-
Log.Logger.LogWarning("Could not find any available secondary cluster")
79-
| _ -> ()
80-
else
81-
failedTimestamp <- None
82-
else
83-
let! available = probeAvailable primaryServiceInfo.EndPointResolver
84-
if available then
85-
match recoveredTimestamp with
86-
| None ->
87-
recoveredTimestamp <- Some DateTime.UtcNow
88-
| Some ts when DateTime.UtcNow - ts >= switchBackDelay ->
89-
Log.Logger.LogInformation("Switching back to primary cluster {0}", primary)
90-
currentServiceInfo <- primaryServiceInfo
91-
do! ctx.UpdateServiceInfo(primary)
92-
recoveredTimestamp <- None
93-
| _ -> ()
94-
else
95-
recoveredTimestamp <- None
96-
with
97-
| :? OperationCanceledException when cts.IsCancellationRequested -> ()
98-
| :? TaskCanceledException when cts.IsCancellationRequested -> ()
99-
| Flatten ex ->
100-
Log.Logger.LogError(ex, "Error checking cluster")
163+
try
164+
let! primaryAvailable = probeAvailable primaryServiceInfo.EndPointResolver
165+
let now = getCurrentTime()
166+
let! newState, decision =
167+
AutoClusterFailoverLogic.step now config primaryAvailable findFirstAvailableSecondary state
168+
state <- newState
169+
do! applyDecision decision
170+
with Flatten ex ->
171+
Log.Logger.LogError(ex, "Error checking cluster")
101172
}
102-
|> ignore
103173

174+
let timer =
175+
match getTickScheduler with
176+
| None ->
177+
let t = new Timer(checkInterval.TotalMilliseconds)
178+
t.AutoReset <- false
179+
t.Elapsed.Add(fun _ ->
180+
backgroundTask {
181+
if not isDisposed then
182+
do! tick()
183+
try t.Start() with _ -> ()
184+
} |> ignore)
185+
t :> IDisposable
186+
| Some getScheduler ->
187+
getScheduler(tick)
188+
189+
/// Production constructor — uses real clock, TCP probe, and timer-based scheduler.
190+
new(primary, secondary, failoverDelay, switchBackDelay, checkInterval) =
191+
let defaultProbe (resolver: EndPointResolver) =
192+
backgroundTask {
193+
let endpoint = resolver.Resolve()
194+
try
195+
use client = new TcpClient()
196+
use cts = new CancellationTokenSource(30_000)
197+
do! client.ConnectAsync(endpoint.Host, endpoint.Port, cts.Token)
198+
return true
199+
with Flatten ex ->
200+
Log.Logger.LogWarning(ex, "Failed to probe available, url: {0}", endpoint)
201+
return false
202+
}
203+
new AutoClusterFailover(
204+
primary, secondary, failoverDelay, switchBackDelay, checkInterval,
205+
(fun () -> DateTime.UtcNow),
206+
defaultProbe,
207+
None
208+
)
104209

105210
interface IServiceInfoProvider with
106-
member this.Initialize(context: IServiceInfoProviderContext) =
107-
run context
108-
member this.GetServiceInfo() = currentServiceInfo.ServiceInfo
211+
member _.Initialize(ctx: IServiceInfoProviderContext) =
212+
Log.Logger.LogInformation("Initializing AutoClusterFailover")
213+
context <- Some ctx
214+
match getTickScheduler with
215+
| None ->
216+
// Start the production timer on Initialize
217+
(timer :?> Timer).Start()
218+
| Some _ ->
219+
// Test scheduler is already ready; ticks are driven externally
220+
()
221+
member _.GetServiceInfo() = currentServiceInfo.ServiceInfo
109222

110-
member this.Dispose() =
111-
cts.Cancel()
112-
cts.Dispose()
223+
member _.Dispose() =
224+
isDisposed <- true
225+
timer.Dispose()
113226

114227

115228
type AutoClusterFailoverBuilder() =
@@ -152,4 +265,3 @@ type AutoClusterFailoverBuilder() =
152265
switchBackDelay,
153266
checkInterval
154267
) :> IServiceInfoProvider
155-

src/Pulsar.Client/Internal/NegativeAcksTracker.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type internal NegativeAcksTracker(prefix: string,
8181
let timer = new Timer(timerIntervalms)
8282
timer.AutoReset <- true
8383
timer.Elapsed.Add(fun _ -> post mb TickTime)
84-
timer.Start() |> ignore
84+
timer.Start()
8585
timer :> IDisposable
8686
| Some getScheduler ->
8787
getScheduler(fun _ -> post mb TickTime)

src/Pulsar.Client/Pulsar.Client.fsproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@
77
<Title>Pulsar.Client</Title>
88
<RootNamespace>Pulsar.Client</RootNamespace>
99
<AssemblyName>Pulsar.Client</AssemblyName>
10-
<Version>3.15.0</Version>
10+
<Version>3.15.1</Version>
1111
<Company>F# community</Company>
1212
<Description>.NET client library for Apache Pulsar</Description>
1313
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
14-
<PackageReleaseNotes>Auto cluster failover support</PackageReleaseNotes>
14+
<PackageReleaseNotes>Auto cluster failover refactoring</PackageReleaseNotes>
1515
<PackageLicenseExpression>MIT</PackageLicenseExpression>
1616
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
1717
<RepositoryType>git</RepositoryType>
1818
<PackageTags>Apache;Pulsar;F#;FSharp</PackageTags>
1919
<Authors>Vladimir Shchur and contributors</Authors>
20-
<PackageVersion>3.15.0</PackageVersion>
20+
<PackageVersion>3.15.1</PackageVersion>
2121
<DebugType>portable</DebugType>
2222
<GenerateDocumentationFile>true</GenerateDocumentationFile>
2323
<PackageReadmeFile>README.md</PackageReadmeFile>

0 commit comments

Comments
 (0)