-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathConcurrentMemoryStream.cs
More file actions
152 lines (105 loc) · 4.83 KB
/
ConcurrentMemoryStream.cs
File metadata and controls
152 lines (105 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
using Gsemac.Collections;
using Gsemac.IO.Properties;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
namespace Gsemac.IO {
/// <summary>
/// Provides a view of a sequence of bytes that supports concurrent read/write operations.
/// </summary>
/// <remarks>
/// Every access to the underlying buffer happens while holding a lock on that buffer, and the same
/// lock is used to signal waiting readers. Seeking is not supported. Closing the stream rejects
/// further writes, but reads are still allowed afterwards.
/// </remarks>
public class ConcurrentMemoryStream :
    Stream {

    // Public members

    /// <summary>Always <see langword="true" />.</summary>
    public override bool CanRead => true;
    /// <summary>Always <see langword="false" />; seeking is not supported.</summary>
    public override bool CanSeek => false;
    /// <summary>Always <see langword="true" />.</summary>
    public override bool CanWrite => true;
    /// <summary>Always <see langword="true" />.</summary>
    public override bool CanTimeout => true;

    /// <summary>
    /// Gets the current length reported by the underlying buffer.
    /// </summary>
    public override long Length {
        get {

            lock (streamBuffer)
                return streamBuffer.Length;

        }
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown; the stream does not support seeking.</exception>
    public override long Position {
        get => throw new NotSupportedException(ExceptionMessages.StreamDoesNotSupportSeeking);
        set => throw new NotSupportedException(ExceptionMessages.StreamDoesNotSupportSeeking);
    }

    /// <summary>
    /// Gets or sets the maximum time, in milliseconds, that a blocking read waits for data.
    /// Defaults to <see cref="Timeout.Infinite"/>.
    /// </summary>
    public override int ReadTimeout { get; set; } = Timeout.Infinite;

    /// <summary>
    /// Gets or sets the write timeout. The value is stored but is not consulted by this class.
    /// Defaults to <see cref="Timeout.Infinite"/>.
    /// </summary>
    public override int WriteTimeout { get; set; } = Timeout.Infinite;

    /// <summary>
    /// If <see langword="true" />, reads will block until data is available.
    /// </summary>
    /// <remarks>
    /// Setting this to <see langword="false" /> wakes any readers that are currently waiting so that
    /// they return instead of remaining blocked.
    /// </remarks>
    public bool Blocking {
        get => isBlocking;
        set {

            lock (streamBuffer) {

                isBlocking = value;

                // A waiting reader only re-checks this flag after being pulsed. Without this, a reader
                // waiting with an infinite timeout would stay blocked until the next write or close.

                if (!value)
                    Monitor.PulseAll(streamBuffer);

            }

        }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ConcurrentMemoryStream"/> class.
    /// </summary>
    public ConcurrentMemoryStream() {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ConcurrentMemoryStream"/> class.
    /// </summary>
    /// <param name="bufferSize">Initial size of the underlying buffer.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
    public ConcurrentMemoryStream(int bufferSize) {

        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize), ExceptionMessages.CapacityMustBePositive);

        streamBuffer.Capacity = bufferSize;

    }

    /// <summary>
    /// Does nothing.
    /// </summary>
    public override void Flush() {
    }

    /// <summary>
    /// Reads from the underlying buffer into <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to begin storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The value returned by the underlying buffer's read operation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null" />.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="TimeoutException">
    /// <see cref="Blocking"/> is <see langword="true" /> and no data became available within <see cref="ReadTimeout"/> milliseconds.
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count) {

        if (buffer is null)
            throw new ArgumentNullException(nameof(buffer));

        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset), Core.Properties.ExceptionMessages.NonNegativeNumberRequired);

        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count), Core.Properties.ExceptionMessages.NonNegativeNumberRequired);

        // Closed streams can be read from, but cannot be written to.
        // A closed stream indicates that there is no more data to be written to it.

        lock (streamBuffer) {

            int timeout = ReadTimeout;
            Stopwatch waited = null;

            while (streamBuffer.Length <= 0 && isBlocking && !isClosed) {

                int remaining = timeout;

                if (timeout != Timeout.Infinite) {

                    // The first wait uses the full timeout. Later waits (after a wake-up that left the
                    // buffer empty, e.g. because another reader took the data) only get the time that
                    // is left, so the total wait never exceeds ReadTimeout.

                    if (waited is null)
                        waited = Stopwatch.StartNew();
                    else
                        remaining = (int)Math.Max(0, timeout - waited.ElapsedMilliseconds);

                }

                if (!Monitor.Wait(streamBuffer, remaining))
                    throw new TimeoutException();

            }

            return streamBuffer.Read(buffer, offset, count);

        }

    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown; the stream does not support seeking.</exception>
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(ExceptionMessages.StreamDoesNotSupportSeeking);

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value) => throw new NotSupportedException(ExceptionMessages.StreamDoesNotSupportThisOperation);

    /// <summary>
    /// Writes data from <paramref name="buffer"/> to the underlying buffer and wakes waiting readers.
    /// </summary>
    /// <param name="buffer">Source array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to begin copying data.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null" />.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
    /// <exception cref="ObjectDisposedException">The stream has been closed.</exception>
    public override void Write(byte[] buffer, int offset, int count) {

        if (buffer is null)
            throw new ArgumentNullException(nameof(buffer));

        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset), Core.Properties.ExceptionMessages.NonNegativeNumberRequired);

        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count), Core.Properties.ExceptionMessages.NonNegativeNumberRequired);

        lock (streamBuffer) {

            if (isClosed)
                throw new ObjectDisposedException(null, ExceptionMessages.CannotAccessAClosedStream);

            streamBuffer.Write(buffer, offset, count);

            // PulseAll is used instead of Pulse because a single reading thread may not read all of the available data.

            if (count > 0)
                Monitor.PulseAll(streamBuffer);

        }

    }

    /// <summary>
    /// Marks the stream as closed and wakes any waiting readers. Further writes are rejected.
    /// Calling this more than once has no additional effect.
    /// </summary>
    public override void Close() {

        lock (streamBuffer) {

            if (!isClosed) {

                isClosed = true;

                // Waiting readers must re-check the closed flag so they stop waiting for data.

                Monitor.PulseAll(streamBuffer);

            }

        }

    }

    // Protected members

    /// <summary>
    /// Closes the stream when <paramref name="disposing"/> is <see langword="true" />, then runs the base implementation.
    /// </summary>
    /// <param name="disposing"><see langword="true" /> when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing) {

        if (disposing)
            Close();

        base.Dispose(disposing);

    }

    // Private members

    // Holds the stream's data; also serves as the lock and wait/pulse object for all operations.
    private readonly CircularBuffer<byte> streamBuffer = new CircularBuffer<byte>();
    private volatile bool isBlocking = false;
    private volatile bool isClosed = false;

}
}