Skip to content

Commit 17c7cf6

Browse files
committed
Refactor Docker container management to use MultiplexedStream for log handling
1 parent 837df02 commit 17c7cf6

2 files changed

Lines changed: 47 additions & 37 deletions

File tree

src/Core/ContainerInstance.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.IO;
4+
using Docker.DotNet;
45

56
namespace Squadron;
67

@@ -66,7 +67,7 @@ public class ContainerInstance : IDisposable
6667
/// </value>
6768
public IList<string> Logs { get; set; } = new List<string>();
6869

69-
internal Stream? LogStream { get; set; }
70+
internal MultiplexedStream? LogStream { get; set; }
7071

7172
public void Dispose()
7273
{

src/Core/DockerContainerManager.cs

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -71,30 +71,30 @@ private AuthConfig GetAuthConfig()
7171

7272
return GetAuthConfig(registryConfig);
7373
}
74-
74+
7575
return TrySetDefaultAuthConfig(_settings.Image);
7676
}
77-
77+
7878
private AuthConfig GetAuthConfig(DockerRegistryConfiguration registryConfig)
7979
{
8080
return new AuthConfig
8181
{
82-
Username = string.IsNullOrEmpty(registryConfig.Username)?null:registryConfig.Username,
83-
Password = string.IsNullOrEmpty(registryConfig.Password)?null:registryConfig.Password,
82+
Username = string.IsNullOrEmpty(registryConfig.Username) ? null : registryConfig.Username,
83+
Password = string.IsNullOrEmpty(registryConfig.Password) ? null : registryConfig.Password,
8484
ServerAddress = registryConfig.Address
8585
};
8686
}
8787

8888
private AuthConfig TrySetDefaultAuthConfig(string imageName)
8989
{
9090
var registryName = "index.docker.io";
91-
91+
9292
try
9393
{
9494
registryName = new Uri(imageName).Host;
9595
}
96-
catch{}
97-
96+
catch { }
97+
9898
DockerRegistryConfiguration? registryConfig = _dockerConfiguration
9999
.Registries
100100
.FirstOrDefault(x => x.Name.Equals(
@@ -205,9 +205,9 @@ await Retry(async () =>
205205

206206
await Client.Containers.ExtractArchiveToContainerAsync(
207207
Instance.Id,
208-
new ContainerPathStatParameters
208+
new CopyToContainerParameters
209209
{
210-
AllowOverwriteDirWithFile = true, Path = context.DestinationFolder.Replace("\\", "/")
210+
Path = context.DestinationFolder.Replace("\\", "/")
211211
}, archiver.Stream);
212212
});
213213
}
@@ -216,15 +216,15 @@ await Client.Containers.ExtractArchiveToContainerAsync(
216216
public async Task<string?> InvokeCommandAsync(ContainerExecCreateParameters parameters)
217217
{
218218
ContainerExecCreateResponse response = await Client.Exec
219-
.ExecCreateContainerAsync(Instance.Id, parameters);
219+
.CreateContainerExecAsync(Instance.Id, parameters);
220220

221221
if (string.IsNullOrEmpty(response.ID))
222222
{
223223
return null;
224224
}
225225

226226
using MultiplexedStream stream =
227-
await Client.Exec.StartAndAttachContainerExecAsync(response.ID, false);
227+
await Client.Exec.StartContainerExecAsync(response.ID, new ContainerExecStartParameters());
228228

229229
(var stdout, var stderr) = await stream
230230
.ReadOutputToEndAsync(CancellationToken.None);
@@ -248,12 +248,16 @@ private async Task CreateLogStreamAsync()
248248

249249
var containerStatsParameters = new ContainerLogsParameters
250250
{
251-
Follow = true, ShowStderr = true, ShowStdout = true
251+
Follow = true,
252+
ShowStderr = true,
253+
ShowStdout = true
252254
};
253255

254-
Instance.LogStream = await Client
256+
MultiplexedStream multiplexedStream = await Client
255257
.Containers
256258
.GetContainerLogsAsync(Instance.Id, containerStatsParameters);
259+
260+
Instance.LogStream = multiplexedStream;
257261
}
258262

259263
/// <inheritdoc/>
@@ -597,41 +601,46 @@ private async Task<string> ReadAsync(TimeSpan timeout)
597601

598602
while (true)
599603
{
600-
Task<int> readTask = Instance.LogStream.ReadAsync(buffer, 0, size);
604+
// MultiplexedStream uses a different API - read from the stream
605+
Task<MultiplexedStream.ReadResult> readTask = Instance.LogStream
606+
.ReadOutputAsync(buffer, 0, size, CancellationToken.None);
601607

602608
if (await Task.WhenAny(readTask, timeoutTask) == timeoutTask)
603609
{
604610
break;
605611
}
606612

607-
var read = await readTask;
608-
if (read <= 0)
613+
var readResult = await readTask;
614+
if (readResult.EOF)
609615
{
610616
break;
611617
}
612618

613-
char[] chunkChars = new char[read * 2];
614-
615-
int consumed = 0;
616-
for (int i = 0; i < read; i++)
619+
if (readResult.Count > 0)
617620
{
618-
if (buffer[i] > 31 && buffer[i] < 128)
619-
{
620-
chunkChars[consumed++] = (char)buffer[i];
621-
}
622-
else if (buffer[i] == (byte)'\n')
623-
{
624-
chunkChars[consumed++] = '\r';
625-
chunkChars[consumed++] = '\n';
626-
}
627-
else if (buffer[i] == (byte)'\t')
621+
char[] chunkChars = new char[readResult.Count * 2];
622+
int consumed = 0;
623+
624+
for (int i = 0; i < readResult.Count; i++)
628625
{
629-
chunkChars[consumed++] = '\t';
626+
if (buffer[i] > 31 && buffer[i] < 128)
627+
{
628+
chunkChars[consumed++] = (char)buffer[i];
629+
}
630+
else if (buffer[i] == (byte)'\n')
631+
{
632+
chunkChars[consumed++] = '\r';
633+
chunkChars[consumed++] = '\n';
634+
}
635+
else if (buffer[i] == (byte)'\t')
636+
{
637+
chunkChars[consumed++] = '\t';
638+
}
630639
}
631-
}
632640

633-
string chunk = new string(chunkChars, 0, consumed);
634-
result.Append(chunk);
641+
string chunk = new string(chunkChars, 0, consumed);
642+
result.Append(chunk);
643+
}
635644
}
636645

637646
return result.ToString();
@@ -729,9 +738,9 @@ private Task Retry(Func<Task> execute)
729738
private async Task RetryAction(Exception exception, TimeSpan t, int retryCount, Context c)
730739
{
731740
_settings.Logger.Warning($"Docker command failed {retryCount}. {exception.Message}");
732-
741+
733742
SystemInfoResponse? systemInfo = await Client.System.GetSystemInfoAsync();
734-
743+
735744
if (systemInfo is { DriverStatus: { Count: > 0 } })
736745
{
737746
_settings.Logger.Warning($"Driver status: {string.Join(", ", systemInfo.DriverStatus)}");

0 commit comments

Comments
 (0)