Skip to content

Commit 44f22fa

Browse files
committed
feat: improvements, synchronizationStart is now sent to the initiator to avoid timeouts
1 parent 85bf05b commit 44f22fa

12 files changed

Lines changed: 107 additions & 36 deletions

File tree

src/ByteSync.Client/Interfaces/Controls/Communications/Http/ISynchronizationApiClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace ByteSync.Interfaces.Controls.Communications.Http;
77

88
public interface ISynchronizationApiClient
99
{
10-
public Task<Synchronization> StartSynchronization(SynchronizationStartRequest synchronizationStartRequest);
10+
public Task StartSynchronization(SynchronizationStartRequest synchronizationStartRequest);
1111

1212
public Task AssertLocalCopyIsDone(string sessionId, List<string> actionsGroupIds);
1313

src/ByteSync.Client/Services/Communications/Api/SynchronizationApiClient.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@ public SynchronizationApiClient(IApiInvoker apiInvoker, ILogger<SynchronizationA
1717
_logger = logger;
1818
}
1919

20-
public async Task<Synchronization> StartSynchronization(SynchronizationStartRequest synchronizationStartRequest)
20+
public async Task StartSynchronization(SynchronizationStartRequest synchronizationStartRequest)
2121
{
2222
try
2323
{
24-
var result = await _apiInvoker.PostAsync<Synchronization>($"session/{synchronizationStartRequest.SessionId}/synchronization/start",
24+
await _apiInvoker.PostAsync($"session/{synchronizationStartRequest.SessionId}/synchronization/start",
2525
synchronizationStartRequest);
26-
27-
return result;
2826
}
2927
catch (Exception ex)
3028
{

src/ByteSync.Client/Services/Synchronizations/SynchronizationStarter.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,8 @@ private async Task StartCloudSessionSynchronization(CloudSession cloudSession)
9292
SessionId = cloudSession.SessionId,
9393
ActionsGroupDefinitions = actionsGroupDefinitions,
9494
};
95-
var synchronization = await _synchronizationApiClient.StartSynchronization(synchronizationStartRequest);
96-
97-
try
98-
{
99-
await _synchronizationService.OnSynchronizationStarted(synchronization);
100-
}
101-
catch (Exception ex)
102-
{
103-
_logger.LogError(ex, "Error during synchronization loop");
104-
}
95+
96+
await _synchronizationApiClient.StartSynchronization(synchronizationStartRequest);
10597
}
10698

10799
private async Task UploadSynchronizationStartData(CloudSession cloudSession, SharedSynchronizationStartData sharedSynchronizationStartData)

src/ByteSync.Functions/Http/SynchronizationFunction.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ public async Task<HttpResponseData> StartSynchronization(
2727
var client = FunctionHelper.GetClientFromContext(executionContext);
2828
var synchronizationStartRequest = await FunctionHelper.DeserializeRequestBody<SynchronizationStartRequest>(req);
2929

30-
var result = await _synchronizationService.StartSynchronization(sessionId, client, synchronizationStartRequest.ActionsGroupDefinitions);
30+
await _synchronizationService.StartSynchronization(sessionId, client, synchronizationStartRequest.ActionsGroupDefinitions);
3131

3232
var response = req.CreateResponse();
33-
await response.WriteAsJsonAsync(result, HttpStatusCode.OK);
33+
response.StatusCode = HttpStatusCode.OK;
3434

3535
return response;
3636
}

src/ByteSync.ServerCommon/Interfaces/Services/ISynchronizationProgressService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public interface ISynchronizationProgressService
1212

1313
Task UpdateSynchronizationProgress(SynchronizationEntity synchronizationEntity, bool needSendSynchronizationUpdated);
1414

15-
Task<Synchronization> InformSynchronizationStarted(SynchronizationEntity synchronizationEntity, Client client);
15+
Task InformSynchronizationStarted(SynchronizationEntity synchronizationEntity, Client client);
1616

1717
Task<Synchronization> MapToSynchronization(SynchronizationEntity synchronizationEntity);
1818

src/ByteSync.ServerCommon/Interfaces/Services/ISynchronizationService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace ByteSync.ServerCommon.Interfaces.Services;
77

88
public interface ISynchronizationService
99
{
10-
Task<Synchronization> StartSynchronization(string sessionId, Client client, List<ActionsGroupDefinition> actionsGroupDefinitions);
10+
Task StartSynchronization(string sessionId, Client client, List<ActionsGroupDefinition> actionsGroupDefinitions);
1111

1212
Task OnUploadIsFinishedAsync(SharedFileDefinition sharedFileDefinition, int totalParts, Client client);
1313

src/ByteSync.ServerCommon/Repositories/SynchronizationRepository.cs

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,72 @@ namespace ByteSync.ServerCommon.Repositories;
88
public class SynchronizationRepository : BaseRepository<SynchronizationEntity>, ISynchronizationRepository
99
{
1010
private readonly IActionsGroupDefinitionsRepository _actionsGroupDefinitionsRepository;
11-
11+
private readonly ICacheRepository<TrackingActionEntity> _cacheTrackingAction;
12+
1213
public SynchronizationRepository(IRedisInfrastructureService redisInfrastructureService, ICacheRepository<SynchronizationEntity> cacheRepository,
13-
IActionsGroupDefinitionsRepository actionsGroupDefinitionsRepository) : base(redisInfrastructureService, cacheRepository)
14+
IActionsGroupDefinitionsRepository actionsGroupDefinitionsRepository, ICacheRepository<TrackingActionEntity> cacheTrackingAction) : base(redisInfrastructureService, cacheRepository)
1415
{
1516
_actionsGroupDefinitionsRepository = actionsGroupDefinitionsRepository;
17+
_cacheTrackingAction = cacheTrackingAction;
1618
}
1719

1820
public override EntityType EntityType => EntityType.Synchronization;
1921

2022
public async Task AddSynchronization(SynchronizationEntity synchronizationEntity, List<ActionsGroupDefinition> actionsGroupDefinitions)
2123
{
22-
await Save(synchronizationEntity.SessionId, synchronizationEntity);
24+
var synchronizationCacheKey = _cacheService.ComputeCacheKey(EntityType.Synchronization, synchronizationEntity.SessionId);
25+
var synchronizationLock = await _cacheService.AcquireLockAsync(synchronizationCacheKey);
26+
27+
await Save(synchronizationEntity.SessionId, synchronizationEntity, null, synchronizationLock);
28+
29+
// foreach (var groupDefinition in actionsGroupDefinitions)
30+
// {
31+
// var trackingActionEntity = new TrackingActionEntity
32+
// {
33+
// ActionsGroupId = groupDefinition.ActionsGroupId,
34+
// SourceClientInstanceId = groupDefinition.Source,
35+
// TargetClientInstanceIds = [..groupDefinition.Targets],
36+
// Size = groupDefinition.Size,
37+
// };
38+
//
39+
// var cacheKey = _redisInfrastructureService2.ComputeCacheKey(EntityType.TrackingAction, $"{synchronizationEntity.SessionId}_{groupDefinition.ActionsGroupId}");
40+
//
41+
// await _cacheTrackingAction.Save(cacheKey, trackingActionEntity, null, synchronizationLock);
42+
// }
43+
44+
var semaphore = new SemaphoreSlim(20); // Limite à 20 tâches en parallèle
45+
var tasks = actionsGroupDefinitions.Select(async groupDefinition =>
46+
{
47+
await semaphore.WaitAsync();
48+
try
49+
{
50+
var trackingActionEntity = new TrackingActionEntity
51+
{
52+
ActionsGroupId = groupDefinition.ActionsGroupId,
53+
SourceClientInstanceId = groupDefinition.Source,
54+
TargetClientInstanceIds = [..groupDefinition.Targets],
55+
Size = groupDefinition.Size,
56+
};
57+
58+
var cacheKey = _cacheService.ComputeCacheKey(EntityType.TrackingAction, $"{synchronizationEntity.SessionId}_{groupDefinition.ActionsGroupId}");
59+
60+
await _cacheTrackingAction.Save(cacheKey, trackingActionEntity, null, synchronizationLock);
61+
}
62+
finally
63+
{
64+
semaphore.Release();
65+
}
66+
});
67+
68+
await Task.WhenAll(tasks);
2369

24-
await _actionsGroupDefinitionsRepository.AddOrUpdateActionsGroupDefinitions(synchronizationEntity.SessionId, actionsGroupDefinitions);
70+
// await _actionsGroupDefinitionsRepository.AddOrUpdateActionsGroupDefinitions(synchronizationEntity.SessionId, actionsGroupDefinitions);
2571
}
2672

2773
public async Task ResetSession(string sessionId)
2874
{
2975
await Delete(sessionId);
3076

31-
await _actionsGroupDefinitionsRepository.DeleteActionsGroupDefinitions(sessionId);
77+
// await _actionsGroupDefinitionsRepository.DeleteActionsGroupDefinitions(sessionId);
3278
}
3379
}

src/ByteSync.ServerCommon/Repositories/TrackingActionRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private async Task<TrackingActionEntity> DoGetOrBuild(string sessionId, string a
4848
{
4949
trackingActionEntity = await _trackingActionEntityFactory.Create(sessionId, actionsGroupId);
5050

51-
await Save(cacheKey, trackingActionEntity, null, actionsGroupIdLock);
51+
// await Save(cacheKey, trackingActionEntity, null, actionsGroupIdLock);
5252
}
5353

5454
return trackingActionEntity;

src/ByteSync.ServerCommon/Services/RedisInfrastructureService.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,20 @@ public async Task<IRedLock> AcquireLockAsync(CacheKey cacheKey)
9090
{
9191
throw new AcquireRedisLockException(cacheKey.Value, redisLock);
9292
}
93+
94+
// var myLock = new MyLock
95+
// {
96+
// Resource = cacheKey.Value,
97+
// LockId = Guid.NewGuid().ToString(),
98+
// IsAcquired = true,
99+
// Status = RedLockStatus.Acquired,
100+
// InstanceSummary = new RedLockInstanceSummary(),
101+
// ExtendCount = 0
102+
// };
103+
//
104+
// return myLock;
105+
106+
93107
}
94108

95109
public CacheKey ComputeCacheKey(EntityType entityType, string entityId)
@@ -98,4 +112,30 @@ public CacheKey ComputeCacheKey(EntityType entityType, string entityId)
98112

99113
return cacheKey;
100114
}
115+
116+
// public class MyLock : IRedLock
117+
// {
118+
// public MyLock()
119+
// {
120+
//
121+
// }
122+
//
123+
//
124+
// public void Dispose()
125+
// {
126+
//
127+
// }
128+
//
129+
// public ValueTask DisposeAsync()
130+
// {
131+
// return ValueTask.CompletedTask;
132+
// }
133+
//
134+
// public string Resource { get; set; }
135+
// public string LockId { get; set; }
136+
// public bool IsAcquired { get; set; }
137+
// public RedLockStatus Status { get; set; }
138+
// public RedLockInstanceSummary InstanceSummary { get; set; }
139+
// public int ExtendCount { get; set; }
140+
// }
101141
}

src/ByteSync.ServerCommon/Services/SynchronizationProgressService.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@ public async Task UpdateSynchronizationProgress(SynchronizationEntity synchroniz
4545
}
4646
}
4747

48-
public async Task<Synchronization> InformSynchronizationStarted(SynchronizationEntity synchronizationEntity, Client client)
48+
public async Task InformSynchronizationStarted(SynchronizationEntity synchronizationEntity, Client client)
4949
{
5050
var synchronization = await MapToSynchronization(synchronizationEntity);
51-
await _invokeClientsService.SessionGroupExcept(synchronization.SessionId, client).SynchronizationStarted(synchronization);
52-
53-
return synchronization;
51+
await _invokeClientsService.SessionGroup(synchronization.SessionId).SynchronizationStarted(synchronization);
5452
}
5553

5654
public async Task UploadIsFinished(SharedFileDefinition sharedFileDefinition, int totalParts, HashSet<string> targetInstanceIds)

0 commit comments

Comments
 (0)