Skip to content

Commit 93294d0

Browse files
rbenzingclaude
andcommitted
fix: replace lock() with SemaphoreSlim in ChatSession, SessionManager bundle lock, and GroupSession
- ChatSession: introduce _historyLock (SemaphoreSlim) to replace lock(_messageHistory) in GetMessageHistory, GetMessageCount, and ClearMessageHistoryInternal; remove incorrect lock(_sessionLock) in SetMetadata since Metadata is already ConcurrentDictionary; dispose _historyLock in Dispose() - SessionManager: change _localBundleLock from object to SemaphoreSlim(1,1); replace all four lock(_localBundleLock) blocks with Wait()/WaitAsync()+Release() patterns; use await WaitAsync in ReplenishOPKsAsync (async method); dispose _localBundleLock in Dispose() - GroupSession: change _seenMessageIds value type from HashSet<string> to ConcurrentDictionary<string, byte> (concurrent set); remove lock(senderMessageIds) blocks in ValidateGroupMessage and RecordMessageSeen; use TryAdd/ContainsKey for thread-safe check-and-add - Add ChatSession_ConcurrentHistoryAccess_DoesNotDeadlock test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 3badc95 commit 93294d0

5 files changed

Lines changed: 84 additions & 34 deletions

File tree

LibEmiddle.Tests.Unit/ChatSessionTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.VisualStudio.TestTools.UnitTesting;
22
using System;
3+
using System.Collections.Generic;
34
using System.Linq;
45
using System.Threading.Tasks;
56
using LibEmiddle.Core;
@@ -403,6 +404,17 @@ public async Task ChatSession_Dispose_ShouldTerminateSession()
403404
await Assert.ThrowsExceptionAsync<ObjectDisposedException>(() => _aliceChatSession.ActivateAsync());
404405
}
405406

407+
[TestMethod]
408+
public async Task ChatSession_ConcurrentHistoryAccess_DoesNotDeadlock()
409+
{
410+
var tasks = Enumerable.Range(0, 10).Select(_ => Task.Run(() =>
411+
{
412+
var history = _aliceChatSession.GetMessageHistory(10, 0);
413+
Assert.IsNotNull(history);
414+
}));
415+
await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(5));
416+
}
417+
406418
[TestMethod]
407419
public async Task ChatSession_MessagePagination_ShouldReturnCorrectSubset()
408420
{

LibEmiddle/Messaging/Chat/ChatSession.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ChatSession : IChatSession, ISession, IDisposable
4545

4646
// Collections
4747
private readonly List<MessageRecord> _messageHistory = new();
48+
private readonly SemaphoreSlim _historyLock = new(1, 1);
4849
private const int MaxTrackedMessages = 100;
4950
public ConcurrentDictionary<string, string> Metadata { get; } = new();
5051

@@ -499,20 +500,30 @@ public bool IsValid()
499500
public IReadOnlyCollection<MessageRecord> GetMessageHistory(int limit = 100, int startIndex = 0)
500501
{
501502
ThrowIfDisposed();
502-
lock (_messageHistory)
503+
_historyLock.Wait();
504+
try
503505
{
504506
return _messageHistory.Skip(Math.Max(0, startIndex)).Take(Math.Max(0, limit)).ToList().AsReadOnly();
505507
}
508+
finally
509+
{
510+
_historyLock.Release();
511+
}
506512
}
507513

508514
/// <summary> Gets the count of messages in the history. </summary>
509515
public int GetMessageCount()
510516
{
511517
ThrowIfDisposed();
512-
lock (_messageHistory)
518+
_historyLock.Wait();
519+
try
513520
{
514521
return _messageHistory.Count;
515522
}
523+
finally
524+
{
525+
_historyLock.Release();
526+
}
516527
}
517528

518529
/// <summary> Clears the message history, zeroing plaintext content before removal. </summary>
@@ -524,7 +535,8 @@ public int ClearMessageHistory()
524535

525536
private int ClearMessageHistoryInternal()
526537
{
527-
lock (_messageHistory)
538+
_historyLock.Wait();
539+
try
528540
{
529541
int count = _messageHistory.Count;
530542
foreach (var record in _messageHistory)
@@ -534,6 +546,10 @@ private int ClearMessageHistoryInternal()
534546
_messageHistory.Clear();
535547
return count;
536548
}
549+
finally
550+
{
551+
_historyLock.Release();
552+
}
537553
}
538554

539555
/// <summary>
@@ -601,11 +617,8 @@ public void SetMetadata(string key, string value)
601617
{
602618
ThrowIfDisposed();
603619
if (string.IsNullOrEmpty(key)) throw new ArgumentException("Metadata key cannot be null or empty.", nameof(key));
604-
// Dictionary is not thread-safe by default, lock if concurrent access is possible
605-
lock (_sessionLock) // Or use ConcurrentDictionary for Metadata
606-
{
607-
Metadata[key] = value;
608-
}
620+
// Metadata is a ConcurrentDictionary — writes are already thread-safe without locking.
621+
Metadata[key] = value;
609622
}
610623

611624
/// <summary> Raises the StateChanged event. </summary>
@@ -890,7 +903,17 @@ protected virtual void Dispose(bool disposing)
890903
}
891904
finally
892905
{
893-
// Dispose the semaphore outside of any lock usage
906+
// Dispose the history semaphore
907+
try
908+
{
909+
_historyLock.Dispose();
910+
}
911+
catch (ObjectDisposedException)
912+
{
913+
// Already disposed
914+
}
915+
916+
// Dispose the session semaphore outside of any lock usage
894917
try
895918
{
896919
lockToDispose?.Dispose();

LibEmiddle/Messaging/Group/GroupSession.Validation.cs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,14 @@ private bool ValidateGroupMessage(EncryptedGroupMessage message)
3838
if (!string.IsNullOrEmpty(message.MessageId))
3939
{
4040
string senderId = GetMemberId(message.SenderIdentityKey!);
41-
var senderMessageIds = _seenMessageIds.GetOrAdd(senderId, _ => new HashSet<string>());
41+
var senderMessageIds = _seenMessageIds.GetOrAdd(senderId, _ => new ConcurrentDictionary<string, byte>());
4242

43-
lock (senderMessageIds)
43+
if (senderMessageIds.ContainsKey(message.MessageId))
4444
{
45-
if (senderMessageIds.Contains(message.MessageId))
46-
{
47-
LoggingManager.LogSecurityEvent(nameof(GroupSession), "Message ID replay detected", isAlert: true);
48-
return false;
49-
}
50-
// NOTE: Do NOT add to senderMessageIds here — that happens after successful decryption
45+
LoggingManager.LogSecurityEvent(nameof(GroupSession), "Message ID replay detected", isAlert: true);
46+
return false;
5147
}
48+
// NOTE: Do NOT add to senderMessageIds here — that happens after successful decryption
5249
}
5350

5451
// When the message carries a MessageId the _seenMessageIds check above provides complete replay
@@ -118,19 +115,16 @@ private void RecordMessageSeen(EncryptedGroupMessage message)
118115
// Register the message ID so subsequent presentations of the same message are rejected
119116
if (!string.IsNullOrEmpty(message.MessageId))
120117
{
121-
var senderMessageIds = _seenMessageIds.GetOrAdd(senderId, _ => new HashSet<string>());
122-
lock (senderMessageIds)
123-
{
124-
senderMessageIds.Add(message.MessageId);
118+
var senderMessageIds = _seenMessageIds.GetOrAdd(senderId, _ => new ConcurrentDictionary<string, byte>());
119+
senderMessageIds.TryAdd(message.MessageId, 0);
125120

126-
// Cap the set at 1000 entries per sender to prevent unbounded memory growth
127-
if (senderMessageIds.Count > 1000)
121+
// Cap the set at 1000 entries per sender to prevent unbounded memory growth
122+
if (senderMessageIds.Count > 1000)
123+
{
124+
var oldestIds = senderMessageIds.Keys.Take(senderMessageIds.Count - 1000).ToList();
125+
foreach (var oldId in oldestIds)
128126
{
129-
var oldestIds = senderMessageIds.Take(senderMessageIds.Count - 1000).ToList();
130-
foreach (var oldId in oldestIds)
131-
{
132-
senderMessageIds.Remove(oldId);
133-
}
127+
senderMessageIds.TryRemove(oldId, out _);
134128
}
135129
}
136130
}

LibEmiddle/Messaging/Group/GroupSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public sealed partial class GroupSession : IGroupSession, ISession, IDisposable
3636
// Message tracking for replay protection
3737
private readonly ConcurrentDictionary<string, long> _lastSeenSequence = new();
3838
private readonly ConcurrentDictionary<string, long> _joinTimestamps = new();
39-
private readonly ConcurrentDictionary<string, HashSet<string>> _seenMessageIds = new();
39+
private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> _seenMessageIds = new();
4040

4141
// Enhanced Group Management
4242
private readonly ConcurrentDictionary<string, GroupInvitation> _activeInvitations = new();

LibEmiddle/Sessions/SessionManager.cs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class SessionManager(
5555
/// the existing OPK count. Updated via <see cref="RegisterLocalKeyBundleAsync"/>.
5656
/// </summary>
5757
private X3DHKeyBundle? _localKeyBundle;
58-
private readonly object _localBundleLock = new object();
58+
private readonly SemaphoreSlim _localBundleLock = new(1, 1);
5959

6060
private readonly ConcurrentDictionary<string, ISession> _activeSessions = new();
6161

@@ -643,10 +643,15 @@ public Task RegisterLocalKeyBundleAsync(X3DHKeyBundle bundle)
643643
{
644644
ArgumentNullException.ThrowIfNull(bundle);
645645

646-
lock (_localBundleLock)
646+
_localBundleLock.Wait();
647+
try
647648
{
648649
_localKeyBundle = bundle;
649650
}
651+
finally
652+
{
653+
_localBundleLock.Release();
654+
}
650655

651656
// Configure the replenishment callback now that we have a bundle reference.
652657
_opkManager.SetReplenishmentCallback(async count =>
@@ -664,10 +669,15 @@ public Task RegisterLocalKeyBundleAsync(X3DHKeyBundle bundle)
664669
public IReadOnlyList<uint> GetAvailableOPKIds()
665670
{
666671
X3DHKeyBundle? bundle;
667-
lock (_localBundleLock)
672+
_localBundleLock.Wait();
673+
try
668674
{
669675
bundle = _localKeyBundle;
670676
}
677+
finally
678+
{
679+
_localBundleLock.Release();
680+
}
671681

672682
if (bundle == null)
673683
return Array.Empty<uint>();
@@ -682,10 +692,15 @@ public IReadOnlyList<uint> GetAvailableOPKIds()
682692
private async Task ReplenishOPKsAsync(int count)
683693
{
684694
X3DHKeyBundle? bundle;
685-
lock (_localBundleLock)
695+
await _localBundleLock.WaitAsync().ConfigureAwait(false);
696+
try
686697
{
687698
bundle = _localKeyBundle;
688699
}
700+
finally
701+
{
702+
_localBundleLock.Release();
703+
}
689704

690705
if (bundle == null)
691706
{
@@ -699,7 +714,8 @@ private async Task ReplenishOPKsAsync(int count)
699714
var tempBundle = await _x3dhProtocol.CreateKeyBundleAsync(_identityKeyPair, count);
700715

701716
// Copy the new OPKs (public + private) into the existing bundle.
702-
lock (_localBundleLock)
717+
await _localBundleLock.WaitAsync().ConfigureAwait(false);
718+
try
703719
{
704720
for (int i = 0; i < tempBundle.OneTimePreKeyIds.Count; i++)
705721
{
@@ -718,6 +734,10 @@ private async Task ReplenishOPKsAsync(int count)
718734
}
719735
}
720736
}
737+
finally
738+
{
739+
_localBundleLock.Release();
740+
}
721741

722742
int available = _opkManager.GetAvailableCount(bundle.OneTimePreKeyIds);
723743
LoggingManager.LogInformation("SessionManager",
@@ -866,6 +886,7 @@ protected virtual void Dispose(bool disposing)
866886
if (disposing)
867887
{
868888
_operationLock.Dispose();
889+
_localBundleLock.Dispose();
869890

870891
// Dispose all active sessions
871892
foreach (var session in _activeSessions.Values)

0 commit comments

Comments
 (0)