Skip to content

Commit 4b241af

Browse files
committed
PR feedback: make isCompleted a parameter
1 parent 3f6f282 commit 4b241af

1 file changed

Lines changed: 21 additions & 5 deletions

File tree

src/ModelContextProtocol/Server/DistributedCacheEventStreamStore.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,17 @@ public DistributedCacheEventStreamWriter(
160160

161161
public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken cancellationToken = default)
162162
{
163+
ThrowIfDisposed();
164+
163165
LogStreamModeChanged(_sessionId, _streamId, mode);
164166
_mode = mode;
165-
await UpdateMetadataAsync(cancellationToken).ConfigureAwait(false);
167+
await UpdateMetadataAsync(isCompleted: false, cancellationToken).ConfigureAwait(false);
166168
}
167169

168170
public async ValueTask<SseItem<JsonRpcMessage?>> WriteEventAsync(SseItem<JsonRpcMessage?> sseItem, CancellationToken cancellationToken = default)
169171
{
172+
ThrowIfDisposed();
173+
170174
// Skip if already has an event ID
171175
if (sseItem.EventId is not null)
172176
{
@@ -200,18 +204,18 @@ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken c
200204
}, cancellationToken).ConfigureAwait(false);
201205

202206
// Update metadata with the latest sequence
203-
await UpdateMetadataAsync(cancellationToken).ConfigureAwait(false);
207+
await UpdateMetadataAsync(isCompleted: false, cancellationToken).ConfigureAwait(false);
204208

205209
LogEventWritten(_sessionId, _streamId, eventId, sequence);
206210
return newItem;
207211
}
208212

209-
private async ValueTask UpdateMetadataAsync(CancellationToken cancellationToken)
213+
private async ValueTask UpdateMetadataAsync(bool isCompleted, CancellationToken cancellationToken)
210214
{
211215
var metadata = new StreamMetadata
212216
{
213217
Mode = _mode,
214-
IsCompleted = _disposed,
218+
IsCompleted = isCompleted,
215219
LastSequence = Interlocked.Read(ref _sequence),
216220
};
217221

@@ -225,6 +229,18 @@ private async ValueTask UpdateMetadataAsync(CancellationToken cancellationToken)
225229
}, cancellationToken).ConfigureAwait(false);
226230
}
227231

232+
private void ThrowIfDisposed()
233+
{
234+
#if NET
235+
ObjectDisposedException.ThrowIf(_disposed, this);
236+
#else
237+
if (_disposed)
238+
{
239+
throw new ObjectDisposedException(nameof(DistributedCacheEventStreamWriter));
240+
}
241+
#endif
242+
}
243+
228244
public async ValueTask DisposeAsync()
229245
{
230246
if (_disposed)
@@ -235,7 +251,7 @@ public async ValueTask DisposeAsync()
235251
_disposed = true;
236252

237253
// Mark the stream as completed in the metadata
238-
await UpdateMetadataAsync(CancellationToken.None).ConfigureAwait(false);
254+
await UpdateMetadataAsync(isCompleted: true, CancellationToken.None).ConfigureAwait(false);
239255
LogStreamWriterDisposed(_sessionId, _streamId, Interlocked.Read(ref _sequence));
240256
}
241257

0 commit comments

Comments
 (0)