Skip to content

Commit 1dcddb3

Browse files
committed
Add streaming support for SSH commands with SshCommandStream
Introduce `SshCommandStream` for streaming stdout and stderr, avoiding memory buffering. Add `ExecuteCommandStreaming` and async counterpart to `SshSession`. Extend tests to cover streaming scenarios, including incremental output and file redirection.
1 parent 7c8a3ed commit 1dcddb3

File tree

4 files changed

+612
-1
lines changed

4 files changed

+612
-1
lines changed

src/NullOpsDevs.LibSsh.Test/Program.cs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,12 @@ private static async Task RunCommandTests()
222222
await RunTest("Command with PTY (Xterm256Color)", TestCommandWithXterm256);
223223
await RunTest("Command with custom terminal modes", TestCommandWithCustomModes);
224224
await RunTest("Command with cancellation", TestCommandCancellation);
225+
await RunTest("Streaming command stdout", TestStreamingCommandStdout);
226+
await RunTest("Streaming command stderr", TestStreamingCommandStderr);
227+
await RunTest("Streaming command to file", TestStreamingCommandToFile);
228+
await RunTest("Streaming command exit code", TestStreamingCommandExitCode);
229+
await RunTest("Streaming async command", TestStreamingCommandAsync);
230+
await RunTest("Streaming incremental output", TestStreamingIncrementalOutput);
225231
}
226232

227233
private static Task<bool> TestSimpleCommand()
@@ -300,6 +306,200 @@ private static Task<bool> TestCommandCancellation()
300306
}
301307
}
302308

309+
private static Task<bool> TestStreamingCommandStdout()
310+
{
311+
using var session = TestHelper.CreateConnectAndAuthenticate();
312+
using var stream = session.ExecuteCommandStreaming("echo 'Streaming test output'");
313+
314+
using var reader = new StreamReader(stream.Stdout);
315+
var output = reader.ReadToEnd();
316+
317+
var result = stream.WaitForExit();
318+
return Task.FromResult(result.Successful && output.Contains("Streaming test output") && result.ExitCode == 0);
319+
}
320+
321+
private static Task<bool> TestStreamingCommandStderr()
322+
{
323+
using var session = TestHelper.CreateConnectAndAuthenticate();
324+
using var stream = session.ExecuteCommandStreaming("ls /nonexistent >&2");
325+
326+
using var stdoutReader = new StreamReader(stream.Stdout);
327+
using var stderrReader = new StreamReader(stream.Stderr);
328+
329+
var stdout = stdoutReader.ReadToEnd();
330+
var stderr = stderrReader.ReadToEnd();
331+
332+
var result = stream.WaitForExit();
333+
return Task.FromResult(stderr.Length > 0 && result.ExitCode != 0);
334+
}
335+
336+
private static Task<bool> TestStreamingCommandToFile()
337+
{
338+
using var session = TestHelper.CreateConnectAndAuthenticate();
339+
using var stream = session.ExecuteCommandStreaming("cat /test-files/large.dat | head -c 100000");
340+
341+
var tempFile = Path.GetTempFileName();
342+
try
343+
{
344+
using (var fileStream = File.Create(tempFile))
345+
{
346+
stream.Stdout.CopyTo(fileStream);
347+
}
348+
349+
var result = stream.WaitForExit();
350+
var fileInfo = new FileInfo(tempFile);
351+
352+
return Task.FromResult(result.Successful && fileInfo.Length > 1000);
353+
}
354+
finally
355+
{
356+
File.Delete(tempFile);
357+
}
358+
}
359+
360+
private static Task<bool> TestStreamingCommandExitCode()
361+
{
362+
using var session = TestHelper.CreateConnectAndAuthenticate();
363+
using var stream = session.ExecuteCommandStreaming("exit 42");
364+
365+
// Drain the streams (even though they're empty)
366+
using var stdoutReader = new StreamReader(stream.Stdout);
367+
stdoutReader.ReadToEnd();
368+
369+
var result = stream.WaitForExit();
370+
return Task.FromResult(result.ExitCode == 42);
371+
}
372+
373+
private static async Task<bool> TestStreamingCommandAsync()
374+
{
375+
using var session = TestHelper.CreateConnectAndAuthenticate();
376+
using var stream = await session.ExecuteCommandStreamingAsync("echo 'Async streaming test'");
377+
378+
using var reader = new StreamReader(stream.Stdout);
379+
var output = await reader.ReadToEndAsync();
380+
381+
var result = stream.WaitForExit();
382+
return result.Successful && output.Contains("Async streaming test");
383+
}
384+
385+
private static async Task<bool> TestStreamingIncrementalOutput()
386+
{
387+
AnsiConsole.MarkupLine("[dim] Starting incremental streaming test...[/]");
388+
389+
using var session = TestHelper.CreateConnectAndAuthenticate();
390+
AnsiConsole.MarkupLine("[dim] Session authenticated, executing streaming command...[/]");
391+
392+
using var stream = await session.ExecuteCommandStreamingAsync("echo 1; sleep 1; echo 2; sleep 1; echo 3; sleep 1;");
393+
AnsiConsole.MarkupLine("[dim] Command started, reading stdout stream...[/]");
394+
395+
var lines = new List<(string Line, TimeSpan Elapsed)>();
396+
var stopwatch = Stopwatch.StartNew();
397+
var buffer = new byte[1024];
398+
var lineBuffer = new StringBuilder();
399+
400+
// Use a timeout to prevent hanging
401+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
402+
403+
try
404+
{
405+
var readCount = 0;
406+
while (true)
407+
{
408+
var bytesRead = await stream.Stdout.ReadAsync(buffer, cts.Token);
409+
readCount++;
410+
411+
if (bytesRead == 0)
412+
{
413+
AnsiConsole.MarkupLine($"[dim] Read #{readCount}: EOF reached at {stopwatch.ElapsedMilliseconds}ms[/]");
414+
break;
415+
}
416+
417+
var text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
418+
var escapedText = Markup.Escape(text.Replace("\n", "\\n").Replace("\r", "\\r"));
419+
AnsiConsole.MarkupLine($"[dim] Read #{readCount}: {bytesRead} bytes at {stopwatch.ElapsedMilliseconds}ms: \"{escapedText}\"[/]");
420+
421+
lineBuffer.Append(text);
422+
423+
// Check for complete lines
424+
var content = lineBuffer.ToString();
425+
var lastNewline = content.LastIndexOf('\n');
426+
if (lastNewline >= 0)
427+
{
428+
var completeLines = content[..(lastNewline + 1)];
429+
lineBuffer.Clear();
430+
lineBuffer.Append(content[(lastNewline + 1)..]);
431+
432+
foreach (var line in completeLines.Split('\n', StringSplitOptions.RemoveEmptyEntries))
433+
{
434+
var trimmedLine = line.Trim();
435+
lines.Add((trimmedLine, stopwatch.Elapsed));
436+
AnsiConsole.MarkupLine($"[blue] Line received: \"{Markup.Escape(trimmedLine)}\" at {stopwatch.ElapsedMilliseconds}ms[/]");
437+
}
438+
}
439+
}
440+
}
441+
catch (OperationCanceledException)
442+
{
443+
AnsiConsole.MarkupLine("[red] TIMEOUT: Test timed out after 15 seconds[/]");
444+
return false;
445+
}
446+
447+
stopwatch.Stop();
448+
AnsiConsole.MarkupLine($"[dim] Stream reading complete. Total time: {stopwatch.ElapsedMilliseconds}ms[/]");
449+
450+
var result = stream.WaitForExit();
451+
AnsiConsole.MarkupLine($"[dim] Command exited with code: {result.ExitCode}[/]");
452+
453+
// Verify we got all 3 lines
454+
if (lines.Count != 3)
455+
{
456+
AnsiConsole.MarkupLine($"[red] FAILED: Expected 3 lines, got {lines.Count}[/]");
457+
return false;
458+
}
459+
460+
// Verify the content
461+
if (lines[0].Line != "1" || lines[1].Line != "2" || lines[2].Line != "3")
462+
{
463+
AnsiConsole.MarkupLine($"[red] FAILED: Unexpected content. Got: {string.Join(", ", lines.Select(l => $"\"{l.Line}\""))}[/]");
464+
return false;
465+
}
466+
467+
// Verify timing: each line should appear ~1 second apart (with some tolerance)
468+
var timeBetween1And2 = lines[1].Elapsed - lines[0].Elapsed;
469+
var timeBetween2And3 = lines[2].Elapsed - lines[1].Elapsed;
470+
471+
AnsiConsole.MarkupLine($"[dim] Gap between line 1 and 2: {timeBetween1And2.TotalMilliseconds:F0}ms[/]");
472+
AnsiConsole.MarkupLine($"[dim] Gap between line 2 and 3: {timeBetween2And3.TotalMilliseconds:F0}ms[/]");
473+
474+
// Allow 500ms-2000ms between lines (accounting for network latency and timing variations)
475+
var minGap = TimeSpan.FromMilliseconds(500);
476+
var maxGap = TimeSpan.FromMilliseconds(2000);
477+
478+
if (timeBetween1And2 < minGap || timeBetween1And2 > maxGap)
479+
{
480+
AnsiConsole.MarkupLine($"[red] FAILED: Gap between line 1-2 ({timeBetween1And2.TotalMilliseconds:F0}ms) outside expected range {minGap.TotalMilliseconds}-{maxGap.TotalMilliseconds}ms[/]");
481+
return false;
482+
}
483+
484+
if (timeBetween2And3 < minGap || timeBetween2And3 > maxGap)
485+
{
486+
AnsiConsole.MarkupLine($"[red] FAILED: Gap between line 2-3 ({timeBetween2And3.TotalMilliseconds:F0}ms) outside expected range {minGap.TotalMilliseconds}-{maxGap.TotalMilliseconds}ms[/]");
487+
return false;
488+
}
489+
490+
// Total time should be around 3+ seconds
491+
if (stopwatch.Elapsed < TimeSpan.FromSeconds(2.5))
492+
{
493+
AnsiConsole.MarkupLine($"[red] FAILED: Total time ({stopwatch.ElapsedMilliseconds}ms) too short, expected at least 2500ms[/]");
494+
return false;
495+
}
496+
497+
AnsiConsole.MarkupLine($"[green] -> [/] Line timings: {lines[0].Elapsed.TotalMilliseconds:F0}ms, {lines[1].Elapsed.TotalMilliseconds:F0}ms, {lines[2].Elapsed.TotalMilliseconds:F0}ms");
498+
AnsiConsole.MarkupLine($"[green] -> [/] Gaps: {timeBetween1And2.TotalMilliseconds:F0}ms, {timeBetween2And3.TotalMilliseconds:F0}ms | Total: {stopwatch.ElapsedMilliseconds}ms");
499+
500+
return result.ExitCode == 0;
501+
}
502+
303503
#endregion
304504

305505
#region File Transfer Tests
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using NullOpsDevs.LibSsh.Generated;
2+
3+
namespace NullOpsDevs.LibSsh.Core;
4+
5+
/// <summary>
6+
/// A read-only stream that reads from a libssh2 channel.
7+
/// This stream does not own the channel and will not close it when disposed.
8+
/// </summary>
9+
internal sealed unsafe class SshChannelStream : Stream
10+
{
11+
private readonly _LIBSSH2_CHANNEL* _channel;
12+
private readonly int _streamId;
13+
private bool _isDisposed;
14+
private bool _isEof;
15+
16+
/// <summary>
17+
/// Creates a new channel stream for reading from the specified stream ID.
18+
/// </summary>
19+
/// <param name="channel">The libssh2 channel pointer.</param>
20+
/// <param name="streamId">The stream ID to read from (0 for stdout, 1 for stderr).</param>
21+
public SshChannelStream(_LIBSSH2_CHANNEL* channel, int streamId)
22+
{
23+
_channel = channel;
24+
_streamId = streamId;
25+
}
26+
27+
/// <inheritdoc />
28+
public override bool CanRead => !_isDisposed;
29+
30+
/// <inheritdoc />
31+
public override bool CanSeek => false;
32+
33+
/// <inheritdoc />
34+
public override bool CanWrite => false;
35+
36+
/// <inheritdoc />
37+
public override long Length => throw new NotSupportedException("Length is not supported on SshChannelStream.");
38+
39+
/// <inheritdoc />
40+
public override long Position
41+
{
42+
get => throw new NotSupportedException("Position is not supported on SshChannelStream.");
43+
set => throw new NotSupportedException("Position is not supported on SshChannelStream.");
44+
}
45+
46+
/// <inheritdoc />
47+
public override int Read(byte[] buffer, int offset, int count)
48+
{
49+
if (buffer == null)
50+
throw new ArgumentNullException(nameof(buffer));
51+
if (offset < 0)
52+
throw new ArgumentOutOfRangeException(nameof(offset), "Offset cannot be negative.");
53+
if (count < 0)
54+
throw new ArgumentOutOfRangeException(nameof(count), "Count cannot be negative.");
55+
if (offset + count > buffer.Length)
56+
throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
57+
58+
return ReadCore(buffer.AsSpan(offset, count));
59+
}
60+
61+
#if NET6_0_OR_GREATER
62+
/// <inheritdoc />
63+
public override int Read(Span<byte> buffer)
64+
{
65+
return ReadCore(buffer);
66+
}
67+
#endif
68+
69+
private int ReadCore(Span<byte> buffer)
70+
{
71+
if (_isDisposed)
72+
throw new ObjectDisposedException(nameof(SshChannelStream));
73+
74+
if (_isEof || buffer.Length == 0)
75+
return 0;
76+
77+
// Pin the managed buffer and read directly into it
78+
fixed (byte* bufferPtr = buffer)
79+
{
80+
var bytesRead = LibSshNative.libssh2_channel_read_ex(
81+
_channel,
82+
_streamId,
83+
(sbyte*)bufferPtr,
84+
(nuint)buffer.Length);
85+
86+
if (bytesRead < 0)
87+
{
88+
// Error occurred - treat as EOF for stream purposes
89+
// The actual error can be retrieved via the session
90+
_isEof = true;
91+
return 0;
92+
}
93+
94+
if (bytesRead == 0)
95+
{
96+
_isEof = true;
97+
return 0;
98+
}
99+
100+
return (int)bytesRead;
101+
}
102+
}
103+
104+
/// <inheritdoc />
105+
public override void Flush()
106+
{
107+
// No-op for read-only stream
108+
}
109+
110+
/// <inheritdoc />
111+
public override long Seek(long offset, SeekOrigin origin)
112+
{
113+
throw new NotSupportedException("Seek is not supported on SshChannelStream.");
114+
}
115+
116+
/// <inheritdoc />
117+
public override void SetLength(long value)
118+
{
119+
throw new NotSupportedException("SetLength is not supported on SshChannelStream.");
120+
}
121+
122+
/// <inheritdoc />
123+
public override void Write(byte[] buffer, int offset, int count)
124+
{
125+
throw new NotSupportedException("Write is not supported on SshChannelStream.");
126+
}
127+
128+
/// <inheritdoc />
129+
protected override void Dispose(bool disposing)
130+
{
131+
if (_isDisposed)
132+
return;
133+
134+
_isDisposed = true;
135+
base.Dispose(disposing);
136+
}
137+
}

0 commit comments

Comments
 (0)