Skip to content

Commit 8349c04

Browse files
committed
add option to use the thread-pool for scheduling
1 parent 6b20e13 commit 8349c04

3 files changed

Lines changed: 111 additions & 24 deletions

File tree

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.IO;
5+
using System.IO.Pipelines;
56
using System.Runtime.CompilerServices;
67
using System.Text;
78
using System.Threading;
@@ -768,7 +769,7 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
768769
[MethodImpl(MethodImplOptions.AggressiveInlining)]
769770
private void StartBacklogProcessor()
770771
{
771-
var sched = Multiplexer.SocketManager?.SchedulerPool ?? DedicatedThreadPoolPipeScheduler.Default;
772+
var sched = Multiplexer.SocketManager?.Scheduler ?? PipeScheduler.ThreadPool;
772773
#if DEBUG
773774
_backlogProcessorRequestedTime = Environment.TickCount;
774775
#endif

src/StackExchange.Redis/SocketManager.cs

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,61 @@ public sealed partial class SocketManager : IDisposable
2424
/// </summary>
2525
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
2626
public SocketManager(string name)
27-
: this(name, DEFAULT_WORKERS, false) { }
27+
: this(name, DEFAULT_WORKERS, SocketManagerOptions.None) { }
2828

2929
/// <summary>
3030
/// Creates a new <see cref="SocketManager"/> instance
3131
/// </summary>
3232
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
3333
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
3434
public SocketManager(string name, bool useHighPrioritySocketThreads)
35-
: this(name, DEFAULT_WORKERS, useHighPrioritySocketThreads) { }
35+
: this(name, DEFAULT_WORKERS, UseHighPrioritySocketThreads(useHighPrioritySocketThreads)) { }
3636

3737
/// <summary>
3838
/// Creates a new (optionally named) <see cref="SocketManager"/> instance
3939
/// </summary>
4040
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
4141
/// <param name="workerCount">the number of dedicated workers for this <see cref="SocketManager"/>.</param>
4242
/// <param name="useHighPrioritySocketThreads">Whether this <see cref="SocketManager"/> should use high priority sockets.</param>
43-
public SocketManager(string name = null, int workerCount = 0, bool useHighPrioritySocketThreads = false)
43+
public SocketManager(string name, int workerCount, bool useHighPrioritySocketThreads)
44+
: this(name, workerCount, UseHighPrioritySocketThreads(useHighPrioritySocketThreads)) {}
45+
46+
private static SocketManagerOptions UseHighPrioritySocketThreads(bool value)
47+
=> value ? SocketManagerOptions.UseHighPrioritySocketThreads : SocketManagerOptions.None;
48+
49+
/// <summary>
50+
/// Additional options for configuring the socket manager
51+
/// </summary>
52+
[Flags]
53+
public enum SocketManagerOptions
54+
{
55+
/// <summary>
56+
/// No additional options
57+
/// </summary>
58+
None = 0,
59+
/// <summary>
60+
/// Whether the <see cref="SocketManager"/> should use high priority sockets.
61+
/// </summary>
62+
UseHighPrioritySocketThreads = 1 << 0,
63+
/// <summary>
64+
/// Use the regular thread-pool for all scheduling
65+
/// </summary>
66+
UseThreadPool = 1 << 1,
67+
}
68+
69+
/// <summary>
70+
/// Creates a new (optionally named) <see cref="SocketManager"/> instance
71+
/// </summary>
72+
/// <param name="name">The name for this <see cref="SocketManager"/>.</param>
73+
/// <param name="workerCount">the number of dedicated workers for this <see cref="SocketManager"/>.</param>
74+
/// <param name="options"></param>
75+
public SocketManager(string name = null, int workerCount = 0, SocketManagerOptions options = SocketManagerOptions.None)
4476
{
4577
if (string.IsNullOrWhiteSpace(name)) name = GetType().Name;
4678
if (workerCount <= 0) workerCount = DEFAULT_WORKERS;
4779
Name = name;
80+
bool useHighPrioritySocketThreads = (options & SocketManagerOptions.UseHighPrioritySocketThreads) != 0,
81+
useThreadPool = (options & SocketManagerOptions.UseThreadPool) != 0;
4882

4983
const long Receive_PauseWriterThreshold = 4L * 1024 * 1024 * 1024; // receive: let's give it up to 4GiB of buffer for now
5084
const long Receive_ResumeWriterThreshold = 3L * 1024 * 1024 * 1024; // (large replies get crazy big)
@@ -58,45 +92,70 @@ public SocketManager(string name = null, int workerCount = 0, bool useHighPriori
5892
Send_PauseWriterThreshold / 2,
5993
defaultPipeOptions.ResumeWriterThreshold);
6094

61-
_schedulerPool = new DedicatedThreadPoolPipeScheduler(name + ":IO",
62-
workerCount: workerCount,
63-
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
95+
Scheduler = PipeScheduler.ThreadPool;
96+
if (!useThreadPool)
97+
{
98+
Scheduler = new DedicatedThreadPoolPipeScheduler(name + ":IO",
99+
workerCount: workerCount,
100+
priority: useHighPrioritySocketThreads ? ThreadPriority.AboveNormal : ThreadPriority.Normal);
101+
}
64102
SendPipeOptions = new PipeOptions(
65103
pool: defaultPipeOptions.Pool,
66-
readerScheduler: _schedulerPool,
67-
writerScheduler: _schedulerPool,
104+
readerScheduler: Scheduler,
105+
writerScheduler: Scheduler,
68106
pauseWriterThreshold: Send_PauseWriterThreshold,
69107
resumeWriterThreshold: Send_ResumeWriterThreshold,
70108
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
71109
useSynchronizationContext: false);
72110
ReceivePipeOptions = new PipeOptions(
73111
pool: defaultPipeOptions.Pool,
74-
readerScheduler: _schedulerPool,
75-
writerScheduler: _schedulerPool,
112+
readerScheduler: Scheduler,
113+
writerScheduler: Scheduler,
76114
pauseWriterThreshold: Receive_PauseWriterThreshold,
77115
resumeWriterThreshold: Receive_ResumeWriterThreshold,
78116
minimumSegmentSize: Math.Max(defaultPipeOptions.MinimumSegmentSize, MINIMUM_SEGMENT_SIZE),
79117
useSynchronizationContext: false);
80118
}
81119

82120
/// <summary>
83-
/// Default / shared socket manager
121+
/// Default / shared socket manager using a dedicated thread-pool
84122
/// </summary>
85123
public static SocketManager Shared
86124
{
87125
get
88126
{
89-
var shared = _shared;
90-
if (shared != null) return _shared;
127+
var shared = s_shared;
128+
if (shared != null) return shared;
91129
try
92130
{
93131
// note: we'll allow a higher max thread count on the shared one
94132
shared = new SocketManager("DefaultSocketManager", DEFAULT_WORKERS * 2, false);
95-
if (Interlocked.CompareExchange(ref _shared, shared, null) == null)
133+
if (Interlocked.CompareExchange(ref s_shared, shared, null) == null)
134+
shared = null;
135+
}
136+
finally { shared?.Dispose(); }
137+
return Volatile.Read(ref s_shared);
138+
}
139+
}
140+
141+
/// <summary>
142+
/// Shared socket manager using the main thread-pool
143+
/// </summary>
144+
public static SocketManager ThreadPool
145+
{
146+
get
147+
{
148+
var shared = s_threadPool;
149+
if (shared != null) return shared;
150+
try
151+
{
152+
// note: we'll allow a higher max thread count on the shared one
153+
shared = new SocketManager("ThreadPoolSocketManager", options: SocketManagerOptions.UseThreadPool);
154+
if (Interlocked.CompareExchange(ref s_threadPool, shared, null) == null)
96155
shared = null;
97156
}
98157
finally { shared?.Dispose(); }
99-
return Volatile.Read(ref _shared);
158+
return Volatile.Read(ref s_threadPool);
100159
}
101160
}
102161

@@ -105,18 +164,19 @@ public static SocketManager Shared
105164
public override string ToString()
106165
{
107166
var scheduler = SchedulerPool;
108-
109-
return $"scheduler - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}";
167+
if (scheduler == null) return Name;
168+
return $"{Name} - queue: {scheduler?.TotalServicedByQueue}, pool: {scheduler?.TotalServicedByPool}";
110169
}
111170

112-
private static SocketManager _shared;
171+
private static SocketManager s_shared, s_threadPool;
113172

114173
private const int DEFAULT_WORKERS = 5, MINIMUM_SEGMENT_SIZE = 8 * 1024;
115174

116-
private DedicatedThreadPoolPipeScheduler _schedulerPool;
117175
internal readonly PipeOptions SendPipeOptions, ReceivePipeOptions;
118176

119-
internal DedicatedThreadPoolPipeScheduler SchedulerPool => _schedulerPool;
177+
internal PipeScheduler Scheduler { get; private set; }
178+
179+
internal DedicatedThreadPoolPipeScheduler SchedulerPool => Scheduler as DedicatedThreadPoolPipeScheduler;
120180

121181
private enum CallbackOperation
122182
{
@@ -134,8 +194,9 @@ private void Dispose(bool disposing)
134194
// note: the scheduler *can't* be collected by itself - there will
135195
// be threads, and those threads will be rooting the DedicatedThreadPool;
136196
// but: we can lend a hand! We need to do this even in the finalizer
137-
try { _schedulerPool?.Dispose(); } catch { }
138-
_schedulerPool = null;
197+
var tmp = SchedulerPool;
198+
Scheduler = PipeScheduler.ThreadPool;
199+
try { tmp?.Dispose(); } catch { }
139200
if (disposing)
140201
{
141202
GC.SuppressFinalize(this);
@@ -167,7 +228,7 @@ internal static Socket CreateSocket(EndPoint endpoint)
167228

168229
internal string GetState()
169230
{
170-
var s = _schedulerPool;
231+
var s = SchedulerPool;
171232
return s == null ? null : $"{s.AvailableCount} of {s.WorkerCount} available";
172233
}
173234
}

tests/StackExchange.Redis.Tests/Config.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using System;
22
using System.IO;
3+
using System.IO.Pipelines;
34
using System.Linq;
45
using System.Net;
56
using System.Net.Sockets;
67
using System.Security.Authentication;
78
using System.Threading.Tasks;
9+
using Pipelines.Sockets.Unofficial;
810
using Xunit;
911
using Xunit.Abstractions;
1012

@@ -403,5 +405,28 @@ public void EndpointIteratorIsReliableOverChanges()
403405
Assert.Equal(8001, ((IPEndPoint)iter.Current).Port);
404406
Assert.False(iter.MoveNext());
405407
}
408+
409+
[Fact]
410+
public void ThreadPoolManagerIsDetected()
411+
{
412+
var config = new ConfigurationOptions
413+
{
414+
EndPoints = { { IPAddress.Loopback, 6379 } },
415+
SocketManager = SocketManager.ThreadPool
416+
};
417+
using var muxer = ConnectionMultiplexer.Connect(config);
418+
Assert.Same(PipeScheduler.ThreadPool, muxer.SocketManager.Scheduler);
419+
}
420+
421+
[Fact]
422+
public void DefaultThreadPoolManagerIsDetected()
423+
{
424+
var config = new ConfigurationOptions
425+
{
426+
EndPoints = { { IPAddress.Loopback, 6379 } },
427+
};
428+
using var muxer = ConnectionMultiplexer.Connect(config);
429+
Assert.Same(SocketManager.Shared.Scheduler, muxer.SocketManager.Scheduler);
430+
}
406431
}
407432
}

0 commit comments

Comments
 (0)