11using System ;
2+ using System . Buffers ;
23using System . Diagnostics ;
34using System . Net ;
5+ using System . Threading ;
46using System . Threading . Tasks ;
57
68namespace StackExchange . Redis ;
79
810public sealed partial class HealthCheck
911{
12+ /// <summary>
13+ /// Evaluate the health of the specified multiplexer, by evaluating all endpoints.
14+ /// </summary>
15+ public Task < HealthCheckResult > CheckHealthAsync ( IConnectionMultiplexer multiplexer )
16+ => multiplexer . IsConnected ? CheckHealthCoreAsync ( multiplexer ) : HealthCheckProbe . UnhealthyTask ;
17+
18+ private async Task < HealthCheckResult > CheckHealthCoreAsync ( IConnectionMultiplexer multiplexer )
19+ {
20+ try
21+ {
22+ Task < HealthCheckResult > [ ] pending ;
23+ if ( multiplexer is IInternalConnectionMultiplexer internalMultiplexer )
24+ {
25+ var snapshot = internalMultiplexer . GetServerSnapshot ( ) ;
26+ pending = GetReusablePending ( ref _reusablePending , snapshot . Length ) ;
27+ for ( int i = 0 ; i < pending . Length ; i ++ )
28+ {
29+ pending [ i ] = CheckHealthAsync ( snapshot [ i ] . GetRedisServer ( null ) ) ;
30+ }
31+ }
32+ else
33+ {
34+ var servers = multiplexer . GetServers ( ) ;
35+ pending = GetReusablePending ( ref _reusablePending , servers . Length ) ;
36+ for ( int i = 0 ; i < pending . Length ; i ++ )
37+ {
38+ pending [ i ] = CheckHealthAsync ( servers [ i ] ) ;
39+ }
40+ }
41+ var result = await CollateAsync ( pending , TotalTimeoutMillis ( ) ) . ForAwait ( ) ;
42+
43+ // on successful completion (regardless of outcome), we can reuse the pending array
44+ PutReusablePending ( ref _reusablePending , ref pending ) ;
45+ return result ;
46+ }
47+ catch
48+ {
49+ // definitely unhappy
50+ return HealthCheckResult . Unhealthy ;
51+ }
52+ }
53+
54+ internal int TotalTimeoutMillis ( )
55+ {
56+ int count = ProbeCount ;
57+ if ( count <= 0 )
58+ {
59+ Debug . Fail ( "We shouldn't get as far as calculating timeouts with a non-positive probe count." ) ;
60+ return 0 ;
61+ }
62+
63+ TimeSpan probeTimeout = ProbeTimeout , probeInterval = ProbeInterval ;
64+
65+ // the first probe doesn't have an interval before it, the rest do
66+ var totalTicks = probeTimeout . Ticks
67+ + ( ( probeTimeout . Ticks + probeInterval . Ticks ) * ( count - 1 ) ) ;
68+ var millis = ( int ) TimeSpan . FromTicks ( totalTicks ) . TotalMilliseconds ;
69+ Debug . Assert ( millis > 0 , "Total timeout should be positive" ) ;
70+ return millis ;
71+ }
72+
73+ // apply timeout and collation logic to a group of probes
74+ internal static async Task < HealthCheckResult > CollateAsync ( Task < HealthCheckResult > [ ] probes , int timeoutMilliseconds )
75+ {
76+ var pendingAll = Task . WhenAll ( probes ) . ObserveErrors ( ) ;
77+ int success = 0 , failure = 0 ;
78+
79+ if ( await pendingAll . TimeoutAfter ( timeoutMilliseconds ) . ForAwait ( ) )
80+ {
81+ // all completed inside timeout; all results should now be available
82+ for ( int i = 0 ; i < probes . Length ; i ++ )
83+ {
84+ var individualResult = await probes [ i ] . ForAwait ( ) ;
85+ switch ( individualResult )
86+ {
87+ case HealthCheckResult . Healthy : success ++ ; break ;
88+ case HealthCheckResult . Unhealthy : failure ++ ; break ;
89+ }
90+ }
91+ }
92+ else
93+ {
94+ // timeout
95+ for ( int i = 0 ; i < probes . Length ; i ++ )
96+ {
97+ _ = probes [ i ] . ObserveErrors ( ) ;
98+ }
99+ throw new TimeoutException ( ) ;
100+ }
101+
102+ if ( failure > 0 ) return HealthCheckResult . Unhealthy ;
103+ if ( success > 0 ) return HealthCheckResult . Healthy ;
104+ return HealthCheckResult . Inconclusive ;
105+ }
106+
107+ private Task < HealthCheckResult > [ ] ? _reusablePending ;
108+
109+ // The number of pending tasks is determined by the number of endpoints, which doesn't change frequently
110+ // (if at all); consequently, we can often re-use this buffer between health-checks, as long as we're careful.
111+ internal static Task < HealthCheckResult > [ ] GetReusablePending ( ref Task < HealthCheckResult > [ ] ? field , int count )
112+ {
113+ var result = Interlocked . Exchange ( ref field , null ) ;
114+ if ( result is null || result . Length != count )
115+ {
116+ result = count == 0 ? [ ] : new Task < HealthCheckResult > [ count ] ;
117+ }
118+ return result ;
119+ }
120+
121+ internal static void PutReusablePending ( ref Task < HealthCheckResult > [ ] ? field , ref Task < HealthCheckResult > [ ] value )
122+ {
123+ if ( value is { Length : > 0 } )
124+ {
125+ Array . Clear ( value , 0 , value . Length ) ;
126+ Interlocked . Exchange ( ref field , value ) ;
127+ value = [ ] ;
128+ }
129+ }
130+
10131 /// <summary>
11132 /// Evaluate the health of an endpoint.
12133 /// </summary>
13- public async Task < HealthCheckResult > CheckHealthAsync ( IConnectionMultiplexer multiplexer , EndPoint endpoint )
134+ public Task < HealthCheckResult > CheckHealthAsync ( IServer server )
135+ => server . IsConnected ? CheckHealthCoreAsync ( server ) : HealthCheckProbe . UnhealthyTask ;
136+
137+ private async Task < HealthCheckResult > CheckHealthCoreAsync ( IServer server )
14138 {
15139 try
16140 {
@@ -20,7 +144,7 @@ public async Task<HealthCheckResult> CheckHealthAsync(IConnectionMultiplexer mul
20144 HealthCheckResult probeResult ;
21145 try
22146 {
23- var pendingProbe = Probe . CheckHealthAsync ( this , multiplexer , endpoint ) ;
147+ var pendingProbe = Probe . CheckHealthAsync ( this , server ) ;
24148 probeResult = await pendingProbe . TimeoutAfter ( timeout ) . ForAwait ( )
25149 ? await pendingProbe . ForAwait ( ) // completed
26150 : HealthCheckResult . Unhealthy ; // timeout
0 commit comments