Skip to content

Commit f2f50d2

Browse files
RobertIndiegeniusjoeninjazhouLanayx
authored
[2.x] Add cluster failover support for 2.x (#352)
* feat: Add HttpLookupService to support http serviceUrl (#297) Co-authored-by: ninjazhou <ninjazhou@tencent.com> (cherry picked from commit 203754b) * Fix CI for HTTP lookup service * Retry get-last-message-id after transient consumer errors * Cluster failover (#350) * Initial implementation * Fixed unit test + config refactoring * Made service addresses and topic to be arrays * Cleanup * Autoclusterfrilover updates * Updates for ControlledClusterFailover * ControlledClusterFailover - header support * Refactoring * Fixed typo * Removed unused failover policy * Refactor timestamps to use DateTime option for better clarity in failover logic * Made UpdateServiceUrl method async * Operations reorder * Introduced ServiceInfo to group serviceUrl, authentication and tlsTrustCertificate * Pulsar client updates * Fixes for copilot review comments * More copilot comments fixes * Even more copilot fixes * Final fixes of copilot comments (cherry picked from commit 944a385) * Fix .NET 7 CI compatibility * Cleanup * Cleanup codes --------- Co-authored-by: zhou zhuohan <843520313@qq.com> Co-authored-by: ninjazhou <ninjazhou@tencent.com> Co-authored-by: Vladimir Shchur <odindafna2006@rambler.ru>
1 parent b945966 commit f2f50d2

34 files changed

Lines changed: 1197 additions & 168 deletions

src/Pulsar.Client/Api/AuthenticationDataProvider.fs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
open Pulsar.Client.Common
44
open System.Text
55
open System.Security.Cryptography.X509Certificates
6+
open System.Collections.Generic
67

78
type AuthenticationDataProvider() =
89

@@ -26,6 +27,16 @@ type AuthenticationDataProvider() =
2627
default this.GetCommandData() =
2728
""
2829

30+
//HTTP
31+
32+
abstract member HasDataForHttp: unit -> bool
33+
default this.HasDataForHttp() =
34+
false
35+
36+
abstract member GetHttpHeaders: unit -> KeyValuePair<string, string> seq
37+
default this.GetHttpHeaders() =
38+
Seq.empty
39+
2940
abstract member Authenticate: AuthData -> AuthData
3041
default this.Authenticate authData =
3142
let bytes =
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
namespace Pulsar.Client.Api
2+
3+
open System
4+
open System.Net.Sockets
5+
open System.Threading
6+
open System.Threading.Tasks
7+
open Microsoft.Extensions.Logging
8+
open Pulsar.Client.Common
9+
open Pulsar.Client.Internal
10+
11+
/// Internal pairing of a cluster's ServiceInfo with the EndPointResolver that
/// AutoClusterFailover builds from that cluster's service URL addresses.
type private AutoServiceInfo =
    {
        /// Connection settings reported to the client when this cluster is selected.
        ServiceInfo: ServiceInfo
        /// Resolver used to pick the endpoint that availability probes connect to.
        EndPointResolver: EndPointResolver
    }
15+
16+
/// Service info provider that switches automatically between a primary cluster
/// and an ordered list of secondary clusters.
///
/// Once Initialize is called, a background loop wakes up every `checkInterval`
/// and probes the primary cluster with a plain TCP connect:
///   * while the primary is current: the first failed probe records a timestamp;
///     a later failed probe at least `failoverDelay` after it switches to the
///     first secondary (in the given order) that answers a probe;
///   * while a secondary is current: the first successful probe of the primary
///     records a timestamp; a later successful probe at least `switchBackDelay`
///     after it switches back to the primary.
/// Every switch is reported through IServiceInfoProviderContext.UpdateServiceInfo.
type AutoClusterFailover
    (
        primary: ServiceInfo,
        secondary: ServiceInfo array,
        failoverDelay: TimeSpan,
        switchBackDelay: TimeSpan,
        checkInterval: TimeSpan
    ) =

    // Upper bound for a single TCP connect probe.
    let probeTimeout = TimeSpan.FromSeconds(30.0)

    let getAutoServiceInfo (serviceInfo: ServiceInfo) =
        { ServiceInfo = serviceInfo; EndPointResolver = EndPointResolver(serviceInfo.ServiceUrl.Addresses) }

    let primaryServiceInfo = getAutoServiceInfo primary
    let secondaryServiceInfos = secondary |> Array.map getAutoServiceInfo
    let mutable currentServiceInfo = primaryServiceInfo
    let cts = new CancellationTokenSource()
    // Captured once: CancellationTokenSource.Token throws ObjectDisposedException
    // after Dispose, while an already obtained token stays safe to query.
    let token = cts.Token

    // Time of the first successful primary probe while running on a secondary.
    let mutable recoveredTimestamp: DateTime option = None
    // Time of the first failed primary probe while running on the primary.
    let mutable failedTimestamp: DateTime option = None

    /// Returns true when a TCP connection to the resolved endpoint is established
    /// within `probeTimeout`; connect errors and timeouts are logged and yield false.
    let probeAvailable (resolve: EndPointResolver) =
        backgroundTask {
            let endpoint = resolve.Resolve()
            try
                use client = new TcpClient()
                let connectTask = client.ConnectAsync(endpoint.Host, endpoint.Port)
                let! completedTask = Task.WhenAny(connectTask, Task.Delay(probeTimeout))
                if completedTask = (connectTask :> Task) then
                    // Await to surface a connect failure as an exception.
                    do! connectTask
                    return true
                else
                    // The connect attempt is abandoned; observe its eventual fault so it
                    // does not surface later as an unobserved task exception.
                    connectTask.ContinueWith(
                        Action<Task>(fun t -> t.Exception |> ignore),
                        TaskContinuationOptions.OnlyOnFaulted) |> ignore
                    Log.Logger.LogWarning("Timed out probing available, url: {0}", endpoint)
                    return false
            with ex ->
                Log.Logger.LogWarning(ex, "Failed to probe available, url: {0}", endpoint)
                return false
        }

    /// Probes the secondaries in their configured order and returns the first one
    /// that is reachable, or None when none of them answers.
    let findAvailableSecondary () =
        backgroundTask {
            let mutable found = None
            for candidate in secondaryServiceInfos do
                if found.IsNone then
                    let! reachable = probeAvailable candidate.EndPointResolver
                    if reachable then found <- Some candidate
            return found
        }

    /// Starts the fire-and-forget monitoring loop; it ends once the provider is disposed.
    let run (ctx: IServiceInfoProviderContext) =
        Log.Logger.LogInformation("Initializing AutoClusterFailover")
        backgroundTask {
            while not token.IsCancellationRequested do
                try
                    do! Task.Delay(checkInterval, token)
                    if currentServiceInfo = primaryServiceInfo then
                        let! available = probeAvailable primaryServiceInfo.EndPointResolver
                        if not available then
                            match failedTimestamp with
                            | None ->
                                failedTimestamp <- Some DateTime.UtcNow
                            | Some ts when DateTime.UtcNow - ts >= failoverDelay ->
                                let! targetSecondary = findAvailableSecondary ()
                                match targetSecondary with
                                | Some sec ->
                                    Log.Logger.LogInformation("Switching to secondary cluster {0}", sec.ServiceInfo.ServiceUrl)
                                    currentServiceInfo <- sec
                                    // Reset before awaiting the update: if it throws, a stale
                                    // timestamp must not shorten the delay of a later switch.
                                    failedTimestamp <- None
                                    recoveredTimestamp <- None
                                    do! ctx.UpdateServiceInfo(sec.ServiceInfo)
                                | None ->
                                    Log.Logger.LogWarning("Could not find any available secondary cluster")
                            | _ -> ()
                        else
                            failedTimestamp <- None
                    else
                        let! available = probeAvailable primaryServiceInfo.EndPointResolver
                        if available then
                            match recoveredTimestamp with
                            | None ->
                                recoveredTimestamp <- Some DateTime.UtcNow
                            | Some ts when DateTime.UtcNow - ts >= switchBackDelay ->
                                Log.Logger.LogInformation("Switching back to primary cluster {0}", primary.ServiceUrl)
                                currentServiceInfo <- primaryServiceInfo
                                // Same reasoning as for the failover branch above.
                                recoveredTimestamp <- None
                                failedTimestamp <- None
                                do! ctx.UpdateServiceInfo(primary)
                            | _ -> ()
                        else
                            recoveredTimestamp <- None
                with
                // TaskCanceledException derives from OperationCanceledException,
                // so this single arm covers both.
                | :? OperationCanceledException when token.IsCancellationRequested -> ()
                | Flatten ex ->
                    Log.Logger.LogError(ex, "Error checking cluster")
        }
        |> ignore


    interface IServiceInfoProvider with
        /// Starts background monitoring; switches are reported to `context`.
        member this.Initialize(context: IServiceInfoProviderContext) =
            run context
        /// Service info of the cluster that is currently selected.
        member this.GetServiceInfo() = currentServiceInfo.ServiceInfo

        /// Stops the monitoring loop. Safe to call more than once.
        member this.Dispose() =
            // Cancel throws on a disposed source, so skip it on repeated calls.
            if not cts.IsCancellationRequested then
                cts.Cancel()
            cts.Dispose()
117+
118+
119+
/// Fluent builder for AutoClusterFailover.
///
/// Defaults: failover delay 30 s, switch-back delay 60 s, check interval 30 s.
/// A primary and at least one secondary ServiceInfo are required.
type AutoClusterFailoverBuilder() =
    let mutable primary = None
    let mutable secondary = [||]
    let mutable failoverDelay = TimeSpan.FromSeconds(30.0)
    let mutable switchBackDelay = TimeSpan.FromSeconds(60.0)
    let mutable checkInterval = TimeSpan.FromSeconds(30.0)

    /// Sets the primary cluster. Raises ArgumentException when `serviceInfo` is null.
    member this.Primary(serviceInfo: ServiceInfo) =
        primary <- serviceInfo |> invalidArgIfDefault "ServiceInfo can't be null" |> Some
        this

    /// Sets the secondary clusters in the order they are tried on failover.
    /// Raises ArgumentException when any entry is null.
    member this.Secondary(serviceInfos: ServiceInfo seq) =
        secondary <-
            serviceInfos
            // Reject null entries here instead of failing later inside the provider.
            |> Seq.map (invalidArgIfDefault "ServiceInfo can't be null")
            |> Seq.toArray
        this

    /// Sets how long the primary must stay unreachable before failing over.
    member this.FailoverDelay(delay: TimeSpan) =
        failoverDelay <- delay
        this

    /// Sets how long the primary must stay reachable before switching back to it.
    member this.SwitchBackDelay(delay: TimeSpan) =
        switchBackDelay <- delay
        this

    /// Sets the pause between two availability checks; must be positive at Build time.
    member this.CheckInterval(interval: TimeSpan) =
        checkInterval <- interval
        this

    /// Validates the collected settings and creates the provider.
    /// Raises ArgumentException when the primary is missing, the secondary list
    /// is empty, or the check interval is not positive.
    member this.Build() : IServiceInfoProvider =
        if primary.IsNone then
            invalidArg "primary" "Primary serviceInfo must be set"
        if Array.isEmpty secondary then
            invalidArg "secondary" "Secondary serviceInfo list should have at least one item"
        // A zero interval makes the monitoring loop spin and a negative one makes
        // Task.Delay throw on every iteration.
        if checkInterval <= TimeSpan.Zero then
            invalidArg "checkInterval" "checkInterval must be positive"

        new AutoClusterFailover(
            primary.Value,
            secondary,
            failoverDelay,
            switchBackDelay,
            checkInterval
        ) :> IServiceInfoProvider

src/Pulsar.Client/Api/Configuration.fs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
namespace Pulsar.Client.Api
1+
namespace Pulsar.Client.Api
22

33
open FSharp.UMX
44
open Pulsar.Client.Common
@@ -9,11 +9,12 @@ open System.Security.Cryptography.X509Certificates
99

1010
type PulsarClientConfiguration =
1111
{
12-
ServiceAddresses: Uri list
12+
ServiceAddresses: Uri array
1313
OperationTimeout: TimeSpan
1414
StatsInterval: TimeSpan
1515
MaxNumberOfRejectedRequestPerConnection: int
1616
UseTls: bool
17+
Scheme: string
1718
TlsHostnameVerificationEnable: bool
1819
TlsAllowInsecureConnection: bool
1920
TlsTrustCertificate: X509Certificate2
@@ -26,14 +27,16 @@ type PulsarClientConfiguration =
2627
InitialBackoffInterval: TimeSpan
2728
MaxBackoffInterval: TimeSpan
2829
KeepAliveInterval: TimeSpan
30+
ServiceInfoProvider: IServiceInfoProvider option
2931
}
3032
static member Default =
3133
{
32-
ServiceAddresses = List.empty<Uri>
34+
ServiceAddresses = [||]
3335
OperationTimeout = TimeSpan.FromMilliseconds(30000.0)
3436
StatsInterval = TimeSpan.Zero
3537
MaxNumberOfRejectedRequestPerConnection = 50
3638
UseTls = false
39+
Scheme = "pulsar"
3740
TlsHostnameVerificationEnable = false
3841
TlsAllowInsecureConnection = false
3942
TlsTrustCertificate = null
@@ -46,11 +49,12 @@ type PulsarClientConfiguration =
4649
InitialBackoffInterval = TimeSpan.FromMilliseconds(100.0)
4750
MaxBackoffInterval = TimeSpan.FromSeconds(60.0)
4851
KeepAliveInterval = TimeSpan.FromSeconds(30.0)
52+
ServiceInfoProvider = None
4953
}
5054

5155
type ConsumerConfiguration<'T> =
5256
{
53-
Topics: TopicName seq
57+
Topics: TopicName array
5458
TopicsPattern: string
5559
ConsumerName: string
5660
SubscriptionName: SubscriptionName
@@ -82,10 +86,10 @@ type ConsumerConfiguration<'T> =
8286
ExpireTimeOfIncompleteChunkedMessage: TimeSpan
8387
ReplicateSubscriptionState: bool
8488
}
85-
member this.SingleTopic with get() = this.Topics |> Seq.head
89+
member this.SingleTopic with get() = this.Topics |> Array.head
8690
static member Default =
8791
{
88-
Topics = []
92+
Topics = [||]
8993
TopicsPattern = ""
9094
ConsumerName = ""
9195
SubscriptionName = %""

src/Pulsar.Client/Api/ConsumerBuilder.fs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
8080
{ c with
8181
DeadLetterProcessor = deadLettersProcessor c newPolicy
8282
DeadLetterPolicy = Some newPolicy
83-
Topics = seq { yield! c.Topics; yield TopicName(newPolicy.RetryLetterTopic) } |> Seq.cache }
83+
Topics = [| yield! c.Topics; yield TopicName(newPolicy.RetryLetterTopic) |] }
8484
else
8585
c
8686
)
@@ -97,10 +97,8 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
9797
{ config with
9898
Topics = topic
9999
|> invalidArgIfBlankString "Topic must not be blank"
100-
|> fun t -> seq { TopicName(t.Trim()) }
101-
|> Seq.append config.Topics
102-
|> Seq.distinct
103-
|> Seq.cache }
100+
|> fun t -> [| yield TopicName(t.Trim()); yield! config.Topics |]
101+
|> Array.distinct }
104102
|> this.With
105103

106104
member this.Topics (topics: string seq) =
@@ -110,7 +108,7 @@ type ConsumerBuilder<'T> private (createConsumerAsync, createProducerAsync, conf
110108
|> Seq.map (fun t -> TopicName(t.Trim()))
111109
|> Seq.append config.Topics
112110
|> Seq.distinct
113-
|> Seq.cache }
111+
|> Array.ofSeq }
114112
|> this.With
115113

116114
member this.TopicsPattern pattern =
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
namespace Pulsar.Client.Api
2+
3+
open System
4+
open System.Collections.Generic
5+
open System.Net.Http
6+
open System.Net.Http.Headers
7+
open System.Threading
8+
open System.Threading.Tasks
9+
open System.Text.Json
10+
open Pulsar.Client.Common
11+
open Microsoft.Extensions.Logging
12+
13+
/// JSON document returned by the controlled failover URL provider.
/// ControlledClusterFailover deserializes it with JsonSerializerDefaults.Web,
/// i.e. camelCase, case-insensitive property names.
[<CLIMutable>]
type ControlledFailoverResponse =
    {
        /// Service URL of the cluster the client should use.
        ServiceUrl: string
        /// Path of the TLS trust certificates file.
        TlsTrustCertsFilePath: string
        /// Class name of the authentication plugin.
        AuthPluginClassName: string
        /// Parameter string for the authentication plugin.
        AuthParamsString: string
    }
// Example response:
// {
//   "serviceUrl": "pulsar+ssl://target:6651",
//   "tlsTrustCertsFilePath": "/security/ca.cert.pem",
//   "authPluginClassName": "org.apache.pulsar.client.impl.auth.AuthenticationTls",
//   "authParamsString": "{\"tlsCertFile\": \"/security/client.cert.pem\", \"tlsKeyFile\": \"/security/client-pk8.pem\"}"
// }
28+
29+
/// Service info provider driven by an external HTTP endpoint.
///
/// Once Initialize is called, a background loop waits `checkInterval`, issues a
/// GET to `providerUrl` (with the `urlProviderHeader` entries as request headers)
/// and deserializes the body as ControlledFailoverResponse. When the returned
/// service URL is non-empty and differs from the current one, the provider
/// switches to it and reports the change through
/// IServiceInfoProviderContext.UpdateServiceInfo.
/// Until the first switch, `defaultServiceInfo` is the current service info.
type ControlledClusterFailover
    (
        providerUrl: string,
        checkInterval: TimeSpan,
        defaultServiceInfo: ServiceInfo,
        urlProviderHeader: IReadOnlyDictionary<string, string>
    ) =

    let jsonOptions = JsonSerializerOptions(JsonSerializerDefaults.Web)
    let mutable currentServiceInfo = defaultServiceInfo
    let cts = new CancellationTokenSource()
    // Captured once: CancellationTokenSource.Token throws ObjectDisposedException
    // after Dispose, while an already obtained token stays safe to query.
    let token = cts.Token

    /// Creates the HttpClient used for polling, with the JSON Accept header and the
    /// user-supplied headers applied. The client is disposed if a header is rejected.
    let createHttpClient () =
        // https://learn.microsoft.com/en-us/dotnet/fundamentals/networking/http/httpclient-guidelines
        let httpClient = new HttpClient(new HttpClientHandler())
        try
            httpClient.DefaultRequestHeaders.Accept.Add(MediaTypeWithQualityHeaderValue("application/json"))
            for header in urlProviderHeader do
                httpClient.DefaultRequestHeaders.Add(header.Key, header.Value)
            httpClient
        with _ ->
            httpClient.Dispose()
            reraise ()

    /// Starts the fire-and-forget polling loop; it ends once the provider is disposed.
    let run (ctx: IServiceInfoProviderContext) =
        backgroundTask {
            try
                use httpClient = createHttpClient ()
                while not token.IsCancellationRequested do
                    try
                        do! Task.Delay(checkInterval, token)
                        use! response = httpClient.GetAsync(providerUrl, token)
                        if response.IsSuccessStatusCode then
                            use! stream = response.Content.ReadAsStreamAsync()
                            let! failoverResponse =
                                JsonSerializer.DeserializeAsync<ControlledFailoverResponse>(stream, jsonOptions, token)
                            // A JSON `null` body deserializes to a null record.
                            if isNull (box failoverResponse) then
                                Log.Logger.LogWarning("ControlledClusterFailover received an empty response from {0}", providerUrl)
                            else
                                let newServiceUrl = failoverResponse.ServiceUrl
                                // This is a minimal implementation of ControlledClusterFailover:
                                // only the service URL of the response is applied.
                                if not (String.IsNullOrEmpty(newServiceUrl))
                                   && newServiceUrl <> currentServiceInfo.ServiceUrl.OriginalString then
                                    let newServiceInfo = ServiceInfo(newServiceUrl)
                                    Log.Logger.LogInformation("ControlledClusterFailover switching to {0}", newServiceUrl)
                                    currentServiceInfo <- newServiceInfo
                                    do! ctx.UpdateServiceInfo(newServiceInfo)
                        else
                            Log.Logger.LogWarning("ControlledClusterFailover failed to fetch config from {0}, status {1}", providerUrl, response.StatusCode)
                    with
                    // TaskCanceledException derives from OperationCanceledException,
                    // so this single arm covers both.
                    | :? OperationCanceledException when token.IsCancellationRequested -> ()
                    | Flatten ex ->
                        Log.Logger.LogError(ex, "Error checking controlled cluster failover url")
            with Flatten ex ->
                // Only setup failures (e.g. a header rejected by HttpClient) reach this
                // point; without the log the ignored task would fail silently.
                Log.Logger.LogError(ex, "Failed to start controlled cluster failover")
        }
        |> ignore

    interface IServiceInfoProvider with
        /// Starts background polling; switches are reported to `context`.
        member this.Initialize(context: IServiceInfoProviderContext) =
            run context
        /// Service info that is currently selected.
        member this.GetServiceInfo() = currentServiceInfo
        /// Stops the polling loop. Safe to call more than once.
        member this.Dispose() =
            // Cancel throws on a disposed source, so skip it on repeated calls.
            if not cts.IsCancellationRequested then
                cts.Cancel()
            cts.Dispose()
81+
82+
/// Fluent builder for ControlledClusterFailover.
///
/// Defaults: check interval 1 minute, no extra request headers.
/// A provider URL and a default ServiceInfo are required.
type ControlledClusterFailoverBuilder() =
    let mutable providerUrl = ""
    let mutable checkInterval = TimeSpan.FromMinutes(1.0)
    let mutable defaultServiceInfo = None
    let mutable urlProviderHeader = readOnlyDict []

    /// Sets the HTTP(S) URL that is polled for the service URL to use.
    member this.ProviderUrl(url: string) =
        providerUrl <- url
        this

    /// Sets the pause between two polls; must be positive at Build time.
    member this.CheckInterval(interval: TimeSpan) =
        checkInterval <- interval
        this

    /// Sets the service info used until the provider URL returns a different
    /// service URL. Raises ArgumentException when `serviceInfo` is null.
    member this.DefaultServiceInfo(serviceInfo: ServiceInfo) =
        defaultServiceInfo <- serviceInfo |> invalidArgIfDefault "ServiceInfo can't be null" |> Some
        this

    /// Sets the headers sent with every request to the provider URL.
    member this.UrlProviderHeader(header: IReadOnlyDictionary<string, string>) =
        urlProviderHeader <- header
        this

    /// Validates the collected settings and creates the provider.
    /// Raises ArgumentException when the provider URL is missing or is not an
    /// absolute http/https URL, the default service info is missing, the header
    /// dictionary is null, or the check interval is not positive.
    member this.Build() : IServiceInfoProvider =
        if String.IsNullOrEmpty(providerUrl) then
            invalidArg "providerUrl" "providerUrl shouldn't be null or empty"
        if defaultServiceInfo.IsNone then
            invalidArg "defaultServiceInfo" "defaultServiceInfo must be set"
        if isNull urlProviderHeader then
            invalidArg "urlProviderHeader" "UrlProviderHeader shouldn't be null"
        // HttpClient can only poll absolute http/https URLs; anything else would
        // make every poll throw.
        match Uri.TryCreate(providerUrl, UriKind.Absolute) with
        | true, uri when uri.Scheme = Uri.UriSchemeHttp || uri.Scheme = Uri.UriSchemeHttps -> ()
        | _ -> invalidArg "providerUrl" "providerUrl must be an absolute http or https URL"
        // A zero interval makes the polling loop spin and a negative one makes
        // Task.Delay throw on every iteration.
        if checkInterval <= TimeSpan.Zero then
            invalidArg "checkInterval" "checkInterval must be positive"

        new ControlledClusterFailover(
            providerUrl,
            checkInterval,
            defaultServiceInfo.Value,
            urlProviderHeader
        ) :> IServiceInfoProvider

0 commit comments

Comments
 (0)