Skip to content

Commit e2c8c78

Browse files
committed
DependencyQueue -> 1.0.0-rc.5
1 parent 989cf1b commit e2c8c78

File tree

3 files changed

+57
-27
lines changed

3 files changed

+57
-27
lines changed

Directory.Packages.props

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
</PropertyGroup>
1818

1919
<ItemGroup>
20-
<PackageVersion Include="DependencyQueue" Version="1.0.0-rc.4" />
20+
<PackageVersion Include="DependencyQueue" Version="1.0.0-rc.5" />
2121
<PackageVersion Include="DotStuff" Version="0.0.0-pre.2" />
22+
<PackageVersion Include="IsExternalInit" Version="1.0.3" />
2223
<PackageVersion Include="Microsoft.Data.SqlClient" Version="[5.2.2, 6.0.0)" />
2324
<PackageVersion Include="Microsoft.PowerShell.SDK" Version="[7.2.24, 7.3.0)" />
2425
<PackageVersion Include="Prequel" Version="1.1.0" />
26+
<PackageVersion Include="Required" Version="1.0.0" />
2527
<PackageVersion Include="Subatomix.Build.Packaging.PowerShellModule" Version="1.1.1" />
2628
<PackageVersion Include="Subatomix.Testing" Version="3.2.0" />
2729
<PackageVersion Include="System.Management.Automation" Version="[7.2.0, 7.3.0)" />

PSql.Deploy.Engine/PSql.Deploy.Engine.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
<ItemGroup>
1818
<PackageReference Include="DependencyQueue" />
19+
<PackageReference Include="IsExternalInit" PrivateAssets="all" />
1920
<PackageReference Include="Prequel" />
21+
<PackageReference Include="Required" PrivateAssets="all" />
2022
<PackageReference Include="Microsoft.Data.SqlClient" />
2123
</ItemGroup>
2224

PSql.Deploy.Engine/Seeds/SeedApplicator.cs

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
namespace PSql.Deploy.Seeds;
88

99
using Queue = DependencyQueue <SeedModule>;
10-
using QueueContext = DependencyQueueContext <SeedModule, ISeedSessionInternal>;
10+
using QueueItemBuilder = DependencyQueueItemBuilder <SeedModule>;
1111
using QueueError = DependencyQueueError ;
1212
using QueueErrorType = DependencyQueueErrorType ;
1313
using CycleError = DependencyQueueCycleError <SeedModule>;
@@ -123,11 +123,8 @@ public async Task ApplyAsync()
123123

124124
ReportApplying();
125125

126-
await queue.RunAsync(
127-
SeedWorkerMainAsync,
128-
Session,
129-
Parallelism.MaxActions,
130-
Session.CancellationToken
126+
await Task.WhenAll(
127+
CreateWorkerContexts(queue).Select(WorkerMainAsync)
131128
);
132129
}
133130
catch (OperationCanceledException)
@@ -149,9 +146,9 @@ await queue.RunAsync(
149146
}
150147
}
151148

152-
private void Populate(DependencyQueue<SeedModule> queue)
149+
private void Populate(Queue queue)
153150
{
154-
var builder = queue.CreateEntryBuilder();
151+
var builder = queue.CreateItemBuilder();
155152

156153
foreach (var module in Seed.Modules)
157154
{
@@ -163,10 +160,10 @@ private void Populate(DependencyQueue<SeedModule> queue)
163160
}
164161
}
165162

166-
private static void Populate(DependencyQueueEntryBuilder<SeedModule> builder, SeedModule module)
163+
private static void Populate(QueueItemBuilder builder, SeedModule module)
167164
{
168165
builder
169-
.NewEntry(module.Name, module)
166+
.NewItem(module.Name, module)
170167
.AddProvides(module.Provides)
171168
.AddRequires(module.Requires)
172169
.Enqueue();
@@ -177,7 +174,7 @@ private static SeedModule Clone(SeedModule module, int workerId)
177174
return new(module.Name, workerId, module.Batches, module.Provides, module.Requires);
178175
}
179176

180-
private void Validate(DependencyQueue<SeedModule> queue)
177+
private void Validate(Queue queue)
181178
{
182179
var errors = queue.Validate();
183180

@@ -187,10 +184,36 @@ private void Validate(DependencyQueue<SeedModule> queue)
187184
throw OnInvalid();
188185
}
189186

190-
private async Task SeedWorkerMainAsync(QueueContext context)
187+
private class WorkerContext
188+
{
189+
public required Queue Queue { get; init; }
190+
public required Guid RunId { get; init; }
191+
public required int WorkerId { get; init; }
192+
}
193+
194+
private IEnumerable<WorkerContext> CreateWorkerContexts(Queue queue)
195+
{
196+
var runId = Guid.NewGuid();
197+
var count = Parallelism.MaxActions;
198+
199+
for (var workerId = 1; workerId <= count; workerId++)
200+
{
201+
yield return new()
202+
{
203+
Queue = queue,
204+
RunId = runId,
205+
WorkerId = workerId
206+
};
207+
}
208+
}
209+
210+
private async Task WorkerMainAsync(WorkerContext context)
191211
{
192212
try
193213
{
214+
// Hop off caller's thread so caller can start other workers
215+
await Task.Yield();
216+
194217
await using var connection = Connect(context);
195218

196219
await PrepareAsync(connection, context);
@@ -199,8 +222,11 @@ bool CanTake(SeedModule module)
199222
=> module.WorkerId == 0
200223
|| module.WorkerId == context.WorkerId;
201224

202-
while (await context.GetNextEntryAsync(CanTake) is { Value: var module })
225+
while (await context.Queue.DequeueAsync(CanTake) is { Value: var module } item)
226+
{
203227
await ExecuteAsync(module, connection, context);
228+
context.Queue.Complete(item);
229+
}
204230
}
205231
catch (OperationCanceledException)
206232
{
@@ -214,47 +240,47 @@ bool CanTake(SeedModule module)
214240
}
215241
}
216242

217-
private ISeedTargetConnection Connect(QueueContext context)
243+
private ISeedTargetConnection Connect(WorkerContext context)
218244
{
219245
Assume.NotNull(_logWriter);
220246

221-
var prefix = $"{context.WorkerId}>";
247+
var prefix = $"W{context.WorkerId}>";
222248
var logger = new PrefixTextWriterSqlMessageLogger(_logWriter, prefix);
223249

224250
return Session.Connect(Target, logger);
225251
}
226252

227-
private async Task PrepareAsync(ISeedTargetConnection connection, QueueContext context)
253+
private async Task PrepareAsync(ISeedTargetConnection connection, WorkerContext context)
228254
{
229-
using var _ = await Parallelism.BeginActionScopeAsync(context.CancellationToken);
255+
using var _ = await Parallelism.BeginActionScopeAsync(Session.CancellationToken);
230256

231-
await connection.OpenAsync(context.CancellationToken);
257+
await connection.OpenAsync(Session.CancellationToken);
232258

233259
await connection.PrepareAsync(
234260
context.RunId,
235261
context.WorkerId,
236-
context.CancellationToken
262+
Session.CancellationToken
237263
);
238264
}
239265

240-
private async Task ExecuteAsync(SeedModule module, ISeedTargetConnection connection, QueueContext context)
266+
private async Task ExecuteAsync(SeedModule module, ISeedTargetConnection connection, WorkerContext context)
241267
{
242-
using var _ = await Parallelism.BeginActionScopeAsync(context.CancellationToken);
268+
using var _ = await Parallelism.BeginActionScopeAsync(Session.CancellationToken);
243269

244270
ReportApplying(module, context.WorkerId);
245271

246272
foreach (var batch in module.Batches)
247-
await connection.ExecuteSeedBatchAsync(batch, context.CancellationToken);
273+
await connection.ExecuteSeedBatchAsync(batch, Session.CancellationToken);
248274

249275
Interlocked.Increment(ref _appliedCount);
250276
}
251277

252-
private void HandleError(Exception e, QueueContext context)
278+
private void HandleError(Exception e, WorkerContext context)
253279
{
254280
if (e.Data is { IsReadOnly: false } data)
255281
data[nameof(context.WorkerId)] = context.WorkerId;
256282

257-
context.SetEnding();
283+
context.Queue.Clear();
258284
}
259285

260286
private void ReportStarting()
@@ -415,8 +441,8 @@ private static string Format(CycleError error)
415441
"The module '{0}' cannot require the topic '{1}' because " +
416442
"a module providing '{1}' already requires '{0}'. " +
417443
"The dependency graph does not permit cycles.",
418-
error.RequiringEntry.Name,
419-
error.RequiredTopic .Name
444+
error.RequiringItem.Name,
445+
error.RequiredTopic.Name
420446
);
421447
}
422448

0 commit comments

Comments
 (0)