Skip to content

Commit 1ff5c57

Browse files
committed
AutoClusterFailover refactoring and unit tests, initial commit
1 parent 8866929 commit 1ff5c57

8 files changed

Lines changed: 716 additions & 88 deletions

File tree

src/Pulsar.Client/Api/AutoClusterFailover.fs

Lines changed: 208 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,112 +4,248 @@ open System
44
open System.Net.Sockets
55
open System.Threading
66
open System.Threading.Tasks
7+
open System.Timers
78
open Microsoft.Extensions.Logging
89
open Pulsar.Client.Common
910
open Pulsar.Client.Internal
1011

11-
type private AutoServiceInfo = {
12+
type internal AutoServiceInfo = {
1213
ServiceInfo: ServiceInfo
1314
EndPointResolver: EndPointResolver
1415
}
1516

17+
// Pure state machine
18+
19+
/// Which cluster the failover state machine currently treats as active.
[<RequireQualifiedAccess>]
type internal AutoClusterMode =
    /// The primary cluster is in use.
    | Primary
    /// A secondary cluster is in use; `index` is the position of that cluster
    /// in the list of secondaries.
    | Secondary of index: int
23+
24+
/// Immutable snapshot of the failover state machine between two ticks.
type internal AutoClusterState =
    {
        /// Cluster that is currently selected.
        Mode: AutoClusterMode
        /// Time at which the primary was first seen unavailable while in primary
        /// mode; `None` while no failure is being tracked.
        FailedTimestamp: DateTime option
        /// Time at which the primary was first seen available again while in
        /// secondary mode; `None` while no recovery is being tracked.
        RecoveredTimestamp: DateTime option
    }
29+
30+
/// Action the caller has to carry out after a state transition.
[<RequireQualifiedAccess>]
type internal AutoClusterDecision =
    /// Nothing to do; keep using the current cluster.
    | Noop
    /// Start using the secondary cluster at position `index`.
    | SwitchToSecondary of index: int
    /// Go back to the primary cluster.
    | SwitchToPrimary
35+
36+
/// Static settings consumed by the failover state machine.
type internal AutoClusterConfig =
    {
        /// How long the primary must have been tracked as failed before a
        /// switch to a secondary is considered.
        FailoverDelay: TimeSpan
        /// How long the primary must have been tracked as recovered before
        /// switching back to it.
        SwitchBackDelay: TimeSpan
        /// Number of configured secondary clusters.
        SecondaryCount: int
    }
41+
42+
/// Side-effect-free failover rules: every function here only inspects its
/// arguments and returns a value.
[<RequireQualifiedAccess>]
module internal AutoClusterFailoverLogic =

    /// Starting point of the state machine: primary mode, nothing tracked.
    let initialState =
        { Mode = AutoClusterMode.Primary
          FailedTimestamp = None
          RecoveredTimestamp = None }

    /// True when `since` holds a timestamp and at least `delay` lies between it and `now`.
    let private hasElapsed (now: DateTime) (delay: TimeSpan) (since: DateTime option) : bool =
        match since with
        | Some startedAt -> now - startedAt >= delay
        | None -> false

    /// Tells the caller whether the secondary endpoints have to be probed on this tick.
    /// Probing is only requested in primary mode, when the primary is unavailable and
    /// the tracked failure is at least `config.FailoverDelay` old; in every other
    /// situation the answer is `false`, which spares the caller the network calls.
    let shouldProbeSecondaries
        (now: DateTime)
        (config: AutoClusterConfig)
        (primaryAvailable: bool)
        (state: AutoClusterState)
        : bool =
        match state.Mode, primaryAvailable with
        | AutoClusterMode.Primary, false ->
            hasElapsed now config.FailoverDelay state.FailedTimestamp
        | AutoClusterMode.Primary, true
        | AutoClusterMode.Secondary _, _ ->
            false

    /// One state transition, valid for both primary and secondary mode.
    /// - `primaryAvailable`: outcome of probing the primary endpoint.
    /// - `availableSecondaryIndex`: index of the first available secondary;
    ///   `None` when no secondary was probed or none was available.
    /// Returns the next state together with the action the caller must perform.
    let step
        (now: DateTime)
        (config: AutoClusterConfig)
        (primaryAvailable: bool)
        (availableSecondaryIndex: int option)
        (state: AutoClusterState)
        : AutoClusterState * AutoClusterDecision =

        let unchanged = state, AutoClusterDecision.Noop

        match state.Mode, primaryAvailable with
        | AutoClusterMode.Primary, true ->
            // Healthy primary: forget any failure that was being tracked.
            { state with FailedTimestamp = None }, AutoClusterDecision.Noop

        | AutoClusterMode.Primary, false ->
            match state.FailedTimestamp, availableSecondaryIndex with
            | None, _ ->
                // First failed probe: start the failover clock.
                { state with FailedTimestamp = Some now }, AutoClusterDecision.Noop
            | Some failedAt, Some idx when now - failedAt >= config.FailoverDelay ->
                let switched =
                    { Mode = AutoClusterMode.Secondary idx
                      FailedTimestamp = None
                      RecoveredTimestamp = None }
                switched, AutoClusterDecision.SwitchToSecondary idx
            | Some _, _ ->
                // Delay not reached yet, or no secondary to move to.
                unchanged

        | AutoClusterMode.Secondary _, true ->
            match state.RecoveredTimestamp with
            | None ->
                // First successful probe: start the switch-back clock.
                { state with RecoveredTimestamp = Some now }, AutoClusterDecision.Noop
            | Some recoveredAt when now - recoveredAt >= config.SwitchBackDelay ->
                initialState, AutoClusterDecision.SwitchToPrimary
            | Some _ ->
                unchanged

        | AutoClusterMode.Secondary _, false ->
            // Primary is down (again): any recovery in progress is cancelled.
            { state with RecoveredTimestamp = None }, AutoClusterDecision.Noop
116+
// Orchestrator
117+
16118
/// Service-info provider that probes the primary cluster on every tick and, based on
/// the decisions returned by `AutoClusterFailoverLogic.step`, switches the client to
/// a secondary cluster and back to the primary.
///
/// The primary constructor is internal and receives the clock, the availability probe
/// and an optional tick scheduler as arguments; the public constructor passes the
/// real clock, a TCP connect probe and no scheduler (a timer is then used).
type AutoClusterFailover
    internal
    (
        primary: ServiceInfo,
        secondary: ServiceInfo array,
        failoverDelay: TimeSpan,
        switchBackDelay: TimeSpan,
        checkInterval: TimeSpan,
        getCurrentTime: unit -> DateTime,
        probeAvailable: EndPointResolver -> Task<bool>,
        getTickScheduler: ((unit -> Task<unit>) -> IDisposable) option
    ) =

    /// Pairs a service info with an endpoint resolver built from its addresses.
    let getAutoServiceInfo (serviceInfo: ServiceInfo) =
        { ServiceInfo = serviceInfo; EndPointResolver = EndPointResolver(serviceInfo.ServiceUrl.Addresses) }

    let config = {
        FailoverDelay = failoverDelay
        SwitchBackDelay = switchBackDelay
        SecondaryCount = secondary.Length
    }

    let primaryServiceInfo = getAutoServiceInfo primary
    let secondaryServiceInfos = secondary |> Array.map getAutoServiceInfo
    let mutable currentServiceInfo = primaryServiceInfo
    let mutable state = AutoClusterFailoverLogic.initialState

    // Set by Initialize; until then decisions only update local fields.
    let mutable context: IServiceInfoProviderContext option = None
    let mutable isDisposed = false

    /// Probes the secondaries in order and returns the index of the first one
    /// reported available, or None when none is. Stops at the first hit.
    let findFirstAvailableSecondary () =
        task {
            let mutable found = None
            let mutable i = 0
            while found.IsNone && i < secondaryServiceInfos.Length do
                let candidate = secondaryServiceInfos[i]
                let! avail = probeAvailable candidate.EndPointResolver
                if avail then found <- Some i
                i <- i + 1
            return found
        }

    /// Carries out a decision: updates the current service info and, when a
    /// context has been provided, pushes the new service info to it.
    let applyDecision (decision: AutoClusterDecision) =
        backgroundTask {
            match decision with
            | AutoClusterDecision.SwitchToSecondary idx ->
                let sec = secondaryServiceInfos[idx]
                Log.Logger.LogInformation("Switching to secondary cluster {0}", sec.ServiceInfo.ServiceUrl)
                currentServiceInfo <- sec
                match context with
                | Some ctx -> do! ctx.UpdateServiceInfo(sec.ServiceInfo)
                | None -> ()
            | AutoClusterDecision.SwitchToPrimary ->
                Log.Logger.LogInformation("Switching back to primary cluster {0}", primary)
                currentServiceInfo <- primaryServiceInfo
                match context with
                | Some ctx -> do! ctx.UpdateServiceInfo(primary)
                | None -> ()
            | AutoClusterDecision.Noop -> ()
        }

    /// One health-check round: probe the primary, probe the secondaries only when
    /// the logic asks for it, advance the state machine and apply its decision.
    /// Failures are logged here and are not propagated to the caller.
    let tick () =
        backgroundTask {
            try
                let! primaryAvailable = probeAvailable primaryServiceInfo.EndPointResolver
                let now = getCurrentTime()
                let probeSecondaries =
                    AutoClusterFailoverLogic.shouldProbeSecondaries now config primaryAvailable state
                let! availableSecondaryIndex =
                    if probeSecondaries then
                        findFirstAvailableSecondary()
                    else
                        Task.FromResult(None)
                // A failover was due but there is nowhere to go: make that visible
                // in the log instead of silently staying on the failed primary.
                if probeSecondaries && availableSecondaryIndex.IsNone then
                    Log.Logger.LogWarning("Could not find any available secondary cluster")
                let newState, decision =
                    AutoClusterFailoverLogic.step now config primaryAvailable availableSecondaryIndex state
                state <- newState
                do! applyDecision decision
            with
            | Flatten ex ->
                Log.Logger.LogError(ex, "Error checking cluster")
        }

    // `timer` is what Dispose releases; `startTimer` is what Initialize invokes.
    // Keeping both together avoids matching on getTickScheduler a second time and
    // downcasting the IDisposable back to Timer.
    let timer, startTimer =
        match getTickScheduler with
        | None ->
            let t = new Timer(checkInterval.TotalMilliseconds)
            // One-shot timer, re-armed after each tick completes, so that two
            // ticks never run at the same time.
            t.AutoReset <- false
            t.Elapsed.Add(fun _ ->
                backgroundTask {
                    do! tick()
                    if not isDisposed then
                        try
                            t.Start()
                        with
                        // Dispose raced with the re-arm; nothing left to schedule.
                        | :? ObjectDisposedException -> ()
                        | ex -> Log.Logger.LogError(ex, "Failed to restart AutoClusterFailover check timer")
                } |> ignore)
            (t :> IDisposable), (fun () -> t.Start())
        | Some getScheduler ->
            // An injected scheduler receives `tick` right away and is expected to
            // drive the ticks itself, so there is nothing to start later.
            getScheduler(tick), (fun () -> ())

    /// Production constructor — uses real clock, TCP probe, and timer-based scheduler.
    new(primary, secondary, failoverDelay, switchBackDelay, checkInterval) =
        // An endpoint counts as available when a TCP connection to the resolved
        // host and port succeeds before the 30 second cancellation fires.
        let defaultProbe (resolver: EndPointResolver) =
            backgroundTask {
                let endpoint = resolver.Resolve()
                try
                    use client = new TcpClient()
                    use cts = new CancellationTokenSource(30_000)
                    do! client.ConnectAsync(endpoint.Host, endpoint.Port, cts.Token)
                    return true
                with ex ->
                    Log.Logger.LogWarning(ex, "Failed to probe available, url: {0}", endpoint)
                    return false
            }
        new AutoClusterFailover(
            primary, secondary, failoverDelay, switchBackDelay, checkInterval,
            (fun () -> DateTime.UtcNow),
            defaultProbe,
            None
        )

    interface IServiceInfoProvider with
        /// Stores the context used for service-info updates and starts the timer
        /// (a no-op when a tick scheduler was injected).
        member _.Initialize(ctx: IServiceInfoProviderContext) =
            Log.Logger.LogInformation("Initializing AutoClusterFailover")
            context <- Some ctx
            startTimer ()

        /// Service info of the cluster that is currently selected.
        member _.GetServiceInfo() = currentServiceInfo.ServiceInfo

        /// Stops further re-arming of the timer and disposes the timer / scheduler handle.
        member _.Dispose() =
            isDisposed <- true
            timer.Dispose()
113249

114250

115251
type AutoClusterFailoverBuilder() =
@@ -152,4 +288,3 @@ type AutoClusterFailoverBuilder() =
152288
switchBackDelay,
153289
checkInterval
154290
) :> IServiceInfoProvider
155-

src/Pulsar.Client/Internal/NegativeAcksTracker.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type internal NegativeAcksTracker(prefix: string,
8181
let timer = new Timer(timerIntervalms)
8282
timer.AutoReset <- true
8383
timer.Elapsed.Add(fun _ -> post mb TickTime)
84-
timer.Start() |> ignore
84+
timer.Start()
8585
timer :> IDisposable
8686
| Some getScheduler ->
8787
getScheduler(fun _ -> post mb TickTime)

0 commit comments

Comments
 (0)