Skip to content

Commit 9ef2175

Browse files
committed
feat: avoiding redis deadlocks
1 parent 1cfb63d commit 9ef2175

7 files changed

Lines changed: 155 additions & 105 deletions

File tree

src/ByteSync.Client/Services/Synchronizations/SynchronizationActionRemoteUploader.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public async Task UploadForRemote(SharedActionsGroup sharedActionsGroup)
9797
try
9898
{
9999
await CloseAndUploadCurrentMultiZip();
100+
101+
// await Task.Delay(3000);
100102
}
101103
catch (Exception ex)
102104
{
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace ByteSync.ServerCommon.Business.Repositories;
2+
3+
public class TrackingActionUpdateHandlerResult
4+
{
5+
public TrackingActionUpdateHandlerResult(bool isSuccess)
6+
{
7+
IsSuccess = isSuccess;
8+
}
9+
10+
public bool IsSuccess { get; set; }
11+
12+
public int FinishedActionsCount { get; set; }
13+
14+
public int ErrorsCount { get; set; }
15+
16+
public long ProcessedVolume { get; set; }
17+
18+
public long ExchangedVolume { get; set; }
19+
20+
public bool IsAChange
21+
{
22+
get
23+
{
24+
return IsSuccess && (FinishedActionsCount > 0 || ErrorsCount > 0 || ProcessedVolume > 0 || ExchangedVolume > 0);
25+
}
26+
}
27+
}

src/ByteSync.ServerCommon/Interfaces/Repositories/ITrackingActionRepository.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ public interface ITrackingActionRepository : IRepository<TrackingActionEntity>
77
{
88
Task<TrackingActionEntity> GetOrBuild(string sessionId, string key);
99

10-
Task<TrackingActionResult> AddOrUpdate(string sessionId, List<string> actionsGroupIds, bool updateSynchronization,
11-
Func<TrackingActionEntity, SynchronizationEntity, bool> updateHandler);
10+
Task<TrackingActionResult> AddOrUpdate(string sessionId, List<string> actionsGroupIds,
11+
Func<TrackingActionEntity, SynchronizationEntity, TrackingActionUpdateHandlerResult> updateHandler);
1212
}

src/ByteSync.ServerCommon/Repositories/TrackingActionRepository.cs

Lines changed: 87 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ public class TrackingActionRepository : BaseRepository<TrackingActionEntity>, IT
1616
private readonly ITrackingActionEntityFactory _trackingActionEntityFactory;
1717
private readonly ICacheRepository<SynchronizationEntity> _synchronizationCacheRepository;
1818
private readonly ILogger<TrackingActionRepository> _logger;
19-
20-
public TrackingActionRepository(IRedisInfrastructureService redisInfrastructureService, ISynchronizationRepository synchronizationRepository,
19+
20+
public TrackingActionRepository(IRedisInfrastructureService redisInfrastructureService, ISynchronizationRepository synchronizationRepository,
2121
ITrackingActionEntityFactory trackingActionEntityFactory, ICacheRepository<TrackingActionEntity> cacheRepository,
22-
ICacheRepository<SynchronizationEntity> synchronizationCacheRepository, ILogger<TrackingActionRepository> logger)
22+
ICacheRepository<SynchronizationEntity> synchronizationCacheRepository, ILogger<TrackingActionRepository> logger)
2323
: base(redisInfrastructureService, cacheRepository)
2424
{
2525
_redisInfrastructureService = redisInfrastructureService;
@@ -30,11 +30,11 @@ public TrackingActionRepository(IRedisInfrastructureService redisInfrastructureS
3030
}
3131

3232
public override EntityType EntityType => EntityType.TrackingAction;
33-
33+
3434
public async Task<TrackingActionEntity> GetOrBuild(string sessionId, string actionsGroupId)
3535
{
3636
var cacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType, $"{sessionId}_{actionsGroupId}");
37-
37+
3838
await using var actionsGroupIdLock = await _redisInfrastructureService.AcquireLockAsync(cacheKey);
3939

4040
return await DoGetOrBuild(sessionId, actionsGroupId, cacheKey, actionsGroupIdLock);
@@ -43,96 +43,111 @@ public async Task<TrackingActionEntity> GetOrBuild(string sessionId, string acti
4343
private async Task<TrackingActionEntity> DoGetOrBuild(string sessionId, string actionsGroupId, CacheKey cacheKey, IRedLock actionsGroupIdLock)
4444
{
4545
var trackingActionEntity = await Get($"{sessionId}_{actionsGroupId}");
46-
46+
4747
if (trackingActionEntity == null)
4848
{
4949
trackingActionEntity = await _trackingActionEntityFactory.Create(sessionId, actionsGroupId);
50-
50+
5151
await Save(cacheKey, trackingActionEntity, null, actionsGroupIdLock);
5252
}
53-
54-
return trackingActionEntity;
55-
}
56-
57-
public async Task<TrackingActionResult> AddOrUpdate(string sessionId, List<string> actionsGroupIds, bool updateSynchronization,
58-
Func<TrackingActionEntity, SynchronizationEntity, bool> updateHandler)
59-
{
60-
CacheKey? synchronizationCacheKey = null;
61-
IRedLock? synchronizationLock = null;
62-
63-
var locks = new ConcurrentBag<IAsyncDisposable>(); // Thread-safe
64-
var trackingActionEntities = new ConcurrentBag<TrackingActionEntity>();
65-
bool areAllUpdated = true;
6653

67-
if (updateSynchronization)
68-
{
69-
synchronizationCacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType.Synchronization, sessionId);
70-
synchronizationLock = await _redisInfrastructureService.AcquireLockAsync(synchronizationCacheKey);
71-
locks.Add(synchronizationLock);
54+
return trackingActionEntity;
7255
}
7356

74-
var synchronizationEntity = await _synchronizationRepository.Get(sessionId);
75-
if (synchronizationEntity == null)
57+
public async Task<TrackingActionResult> AddOrUpdate(string sessionId, List<string> actionsGroupIds,
58+
Func<TrackingActionEntity, SynchronizationEntity, TrackingActionUpdateHandlerResult> updateHandler)
7659
{
77-
throw new InvalidOperationException($"SynchronizationEntity for session {sessionId} not found");
78-
}
60+
// CacheKey? synchronizationCacheKey = null;
61+
// IRedLock? synchronizationLock = null;
62+
63+
var locks = new ConcurrentBag<IAsyncDisposable>(); // Thread-safe
64+
var trackingActionEntities = new ConcurrentBag<TrackingActionEntity>();
65+
var updateHandlerResults = new List<TrackingActionUpdateHandlerResult>();
66+
// bool areAllUpdated = true;
67+
68+
// if (updateSynchronization)
69+
// {
70+
// synchronizationCacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType.Synchronization, sessionId);
71+
// synchronizationLock = await _redisInfrastructureService.AcquireLockAsync(synchronizationCacheKey);
72+
// locks.Add(synchronizationLock);
73+
// }
74+
75+
var synchronizationEntity = await _synchronizationRepository.Get(sessionId);
76+
if (synchronizationEntity == null)
77+
{
78+
throw new InvalidOperationException($"SynchronizationEntity for session {sessionId} not found");
79+
}
7980

80-
var transaction = _redisInfrastructureService.OpenTransaction();
81+
var transaction = _redisInfrastructureService.OpenTransaction();
8182

82-
var semaphore = new SemaphoreSlim(20);
83-
var updateFailures = new ConcurrentBag<bool>();
83+
var semaphore = new SemaphoreSlim(20);
84+
var updateFailures = new ConcurrentBag<bool>();
8485

85-
var tasks = actionsGroupIds.Select(async actionsGroupId =>
86-
{
87-
await semaphore.WaitAsync();
88-
try
86+
var tasks = actionsGroupIds.Select(async actionsGroupId =>
8987
{
90-
var cacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType, $"{sessionId}_{actionsGroupId}");
91-
var actionsGroupIdLock = await _redisInfrastructureService.AcquireLockAsync(cacheKey);
92-
locks.Add(actionsGroupIdLock);
93-
94-
var trackingActionEntity = await DoGetOrBuild(sessionId, actionsGroupId, cacheKey, actionsGroupIdLock);
95-
bool isUpdated = updateHandler.Invoke(trackingActionEntity, synchronizationEntity);
96-
97-
if (isUpdated)
88+
await semaphore.WaitAsync();
89+
try
9890
{
99-
await Save(cacheKey, trackingActionEntity, transaction, actionsGroupIdLock);
100-
trackingActionEntities.Add(trackingActionEntity);
91+
var cacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType, $"{sessionId}_{actionsGroupId}");
92+
var actionsGroupIdLock = await _redisInfrastructureService.AcquireLockAsync(cacheKey);
93+
locks.Add(actionsGroupIdLock);
94+
95+
var trackingActionEntity = await DoGetOrBuild(sessionId, actionsGroupId, cacheKey, actionsGroupIdLock);
96+
var updateHandlerResult = updateHandler.Invoke(trackingActionEntity, synchronizationEntity);
97+
98+
if (updateHandlerResult.IsSuccess)
99+
{
100+
await Save(cacheKey, trackingActionEntity, transaction, actionsGroupIdLock);
101+
trackingActionEntities.Add(trackingActionEntity);
102+
updateHandlerResults.Add(updateHandlerResult);
103+
}
104+
else
105+
{
106+
_logger.LogWarning("AddOrUpdate: can not update element {TrackingActionEntity} for session {SessionId}. No element will be updated",
107+
trackingActionEntity.ActionsGroupId, sessionId);
108+
109+
updateFailures.Add(true); // flag pour arrêt
110+
}
101111
}
102-
else
112+
finally
103113
{
104-
_logger.LogWarning("AddOrUpdate: can not update element {TrackingActionEntity} for session {SessionId}. No element will be updated",
105-
trackingActionEntity.ActionsGroupId, sessionId);
106-
107-
updateFailures.Add(true); // flag pour arrêt
114+
semaphore.Release();
108115
}
109-
}
110-
finally
111-
{
112-
semaphore.Release();
113-
}
114-
});
116+
});
115117

116-
await Task.WhenAll(tasks);
118+
await Task.WhenAll(tasks);
117119

118-
areAllUpdated = updateFailures.IsEmpty;
120+
var areAllUpdated = updateFailures.IsEmpty;
119121

120-
if (areAllUpdated)
121-
{
122-
if (updateSynchronization)
122+
if (areAllUpdated)
123123
{
124-
await _synchronizationCacheRepository.Save(synchronizationCacheKey!, synchronizationEntity, transaction, synchronizationLock);
124+
if (updateHandlerResults.Any(uhr => uhr.IsAChange))
125+
{
126+
var synchronizationCacheKey = _redisInfrastructureService.ComputeCacheKey(EntityType.Synchronization, sessionId);
127+
await _synchronizationCacheRepository.AddOrUpdate(synchronizationCacheKey, synEnt =>
128+
{
129+
foreach (var updateHandlerResult in updateHandlerResults)
130+
{
131+
synEnt!.Progress.FinishedActionsCount += updateHandlerResult.FinishedActionsCount;
132+
synEnt.Progress.ErrorsCount += updateHandlerResult.ErrorsCount;
133+
synEnt.Progress.ProcessedVolume += updateHandlerResult.ProcessedVolume;
134+
synEnt.Progress.ExchangedVolume += updateHandlerResult.ExchangedVolume;
135+
}
136+
137+
return synEnt;
138+
}, transaction);
139+
140+
// await _synchronizationCacheRepository.Save(synchronizationCacheKey!, synchronizationEntity, transaction, synchronizationLock);
141+
}
142+
143+
await transaction.ExecuteAsync();
125144
}
126145

127-
await transaction.ExecuteAsync();
128-
}
146+
foreach (var redisLock in locks)
147+
{
148+
await redisLock.DisposeAsync();
149+
}
129150

130-
foreach (var redisLock in locks)
131-
{
132-
await redisLock.DisposeAsync();
151+
return new TrackingActionResult(areAllUpdated, trackingActionEntities.ToList(), synchronizationEntity);
133152
}
134-
135-
return new TrackingActionResult(areAllUpdated, trackingActionEntities.ToList(), synchronizationEntity);
136-
}
137-
138153
}

0 commit comments

Comments
 (0)