Skip to content

Commit 3563675

Browse files
vkuttypclaude
andcommitted
v2.5.9: add CosmoSQLClient.CosmoKvPipes driver (binary protocol over UDS)
Mirror image of cosmokvd v0.4.0's PipesListener. Implements ISqlDatabase via length-prefixed binary frames on a Unix Domain Socket — no HTTP, no JSON, no Kestrel framing. Connection string: Endpoint=pipes:///path/to/sock;AuthToken=<token> Scope: - AUTH handshake on Open (bearer token, FixedTimeEquals on server) - QUERY (returns IReadOnlyList<SqlRow>) - ExecuteAsync (returns int rows-affected) - QueryStreamAsync (buffer-and-yield — cosmokvd's binary protocol returns the full result set in one frame today) - QueryTableAsync, QueryJsonStreamAsync (built on top of QueryAsync) - BeginTransactionAsync / Commit / Rollback throw NotSupportedException; callers needing explicit transactions stay on CosmoSQLClient.CosmoKvHttp Framing matches the server's FrameCodec byte-for-byte: u8 opcode + u32-LE body_len + body. SqlValue payloads use the same InvariantCulture decimal-text encoding the JSON wire uses, so the only difference vs HTTP is the envelope. Concurrency: one in-process SemaphoreSlim(1,1) serialises Query/Execute on the connection's single socket. Open multiple connections for parallelism — same model as MsSql / Sqlite / Postgres drivers. Pipe-lifecycle constraint borrowed from the server-side codec: body arrays are owned (ToArray'd) BEFORE PipeReader.AdvanceTo, otherwise the pipe can reclaim the underlying memory and any later access throws ArgumentOutOfRangeException. 6 xUnit end-to-end tests against a real CosmoKvD instance: - Open + dispose - Bad token → SqlException.Auth - Full CREATE/INSERT/SELECT round-trip (text + int + null params) - Null parameter encode/decode - 50 sequential statements on one connection - BeginTransactionAsync throws NotSupported Honesty about perf: same UDS + concurrent-reads lesson applies — binary protocol wins only become measurable after the upstream CosmoKvConnection._lock is broken. Infrastructure here for that future + immediately useful for low-saturation latency-sensitive workloads. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 826cdc4 commit 3563675

9 files changed

Lines changed: 887 additions & 1 deletion

CosmoSQLClient-Dotnet.sln

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmoSQLClient.CosmoKvHttp"
4343
EndProject
4444
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmoSQLClient.CosmoKv.Cli", "src\CosmoSQLClient.CosmoKv.Cli\CosmoSQLClient.CosmoKv.Cli.csproj", "{520F292D-D5B8-415F-84DD-B3072EBBBE63}"
4545
EndProject
46+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmoSQLClient.CosmoKvPipes", "src\CosmoSQLClient.CosmoKvPipes\CosmoSQLClient.CosmoKvPipes.csproj", "{A0DA0036-A751-4808-89D3-C28857D09C98}"
47+
EndProject
48+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmoSQLClient.CosmoKvPipes.Tests", "tests\CosmoSQLClient.CosmoKvPipes.Tests\CosmoSQLClient.CosmoKvPipes.Tests.csproj", "{21733160-6FC3-4171-9328-60FDF36883DB}"
49+
EndProject
50+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CosmoKvD", "..\..\CosmoKvD\src\CosmoKvD\CosmoKvD.csproj", "{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}"
51+
EndProject
4652
Global
4753
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4854
Debug|Any CPU = Debug|Any CPU
@@ -269,6 +275,42 @@ Global
269275
{520F292D-D5B8-415F-84DD-B3072EBBBE63}.Release|x64.Build.0 = Release|Any CPU
270276
{520F292D-D5B8-415F-84DD-B3072EBBBE63}.Release|x86.ActiveCfg = Release|Any CPU
271277
{520F292D-D5B8-415F-84DD-B3072EBBBE63}.Release|x86.Build.0 = Release|Any CPU
278+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
279+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|Any CPU.Build.0 = Debug|Any CPU
280+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|x64.ActiveCfg = Debug|Any CPU
281+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|x64.Build.0 = Debug|Any CPU
282+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|x86.ActiveCfg = Debug|Any CPU
283+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Debug|x86.Build.0 = Debug|Any CPU
284+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|Any CPU.ActiveCfg = Release|Any CPU
285+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|Any CPU.Build.0 = Release|Any CPU
286+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|x64.ActiveCfg = Release|Any CPU
287+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|x64.Build.0 = Release|Any CPU
288+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|x86.ActiveCfg = Release|Any CPU
289+
{A0DA0036-A751-4808-89D3-C28857D09C98}.Release|x86.Build.0 = Release|Any CPU
290+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
291+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
292+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|x64.ActiveCfg = Debug|Any CPU
293+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|x64.Build.0 = Debug|Any CPU
294+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|x86.ActiveCfg = Debug|Any CPU
295+
{21733160-6FC3-4171-9328-60FDF36883DB}.Debug|x86.Build.0 = Debug|Any CPU
296+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
297+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|Any CPU.Build.0 = Release|Any CPU
298+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|x64.ActiveCfg = Release|Any CPU
299+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|x64.Build.0 = Release|Any CPU
300+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|x86.ActiveCfg = Release|Any CPU
301+
{21733160-6FC3-4171-9328-60FDF36883DB}.Release|x86.Build.0 = Release|Any CPU
302+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
303+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
304+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|x64.ActiveCfg = Debug|Any CPU
305+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|x64.Build.0 = Debug|Any CPU
306+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|x86.ActiveCfg = Debug|Any CPU
307+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Debug|x86.Build.0 = Debug|Any CPU
308+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
309+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|Any CPU.Build.0 = Release|Any CPU
310+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|x64.ActiveCfg = Release|Any CPU
311+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|x64.Build.0 = Release|Any CPU
312+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|x86.ActiveCfg = Release|Any CPU
313+
{A98757EB-42FB-4D2E-BB8F-9E07AF3009A5}.Release|x86.Build.0 = Release|Any CPU
272314
EndGlobalSection
273315
GlobalSection(SolutionProperties) = preSolution
274316
HideSolutionNode = FALSE
@@ -292,5 +334,7 @@ Global
292334
{65E8881F-CEEC-4085-84DC-7BBFDE746E43} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
293335
{89630EF9-0FE4-4C0C-A858-798CDCE700E3} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
294336
{520F292D-D5B8-415F-84DD-B3072EBBBE63} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
337+
{A0DA0036-A751-4808-89D3-C28857D09C98} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
338+
{21733160-6FC3-4171-9328-60FDF36883DB} = {0AB3BF05-4346-4AA6-1389-037BE0695223}
295339
EndGlobalSection
296340
EndGlobal

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
<Authors>vkuttyp</Authors>
88
<PackageLicenseExpression>MIT</PackageLicenseExpression>
99
<RepositoryUrl>https://github.com/vkuttyp/CosmoSQLClient-Dotnet</RepositoryUrl>
10-
<Version>2.5.8</Version>
10+
<Version>2.5.9</Version>
1111
</PropertyGroup>
1212
</Project>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace CosmoSQLClient.CosmoKvPipes;
2+
3+
/// <summary>
4+
/// Wire-format constants for the cosmokvd binary SQL protocol — client-side
5+
/// mirror of <c>CosmoKvD.Pipes.BinaryProtocol</c>. Duplicated rather than
6+
/// shared via a third assembly because the surface is small enough that
7+
/// a shared dependency would be more overhead than help. Any change must
8+
/// be applied to both copies in lockstep.
9+
/// </summary>
10+
internal static class BinaryProtocol
11+
{
12+
public const uint Magic = 0x4B564364; // 'KVCd'
13+
public const byte Version = 1;
14+
15+
public const byte OpAuth = 0x01;
16+
public const byte OpQuery = 0x02;
17+
public const byte OpExecute = 0x03;
18+
public const byte OpPing = 0x04;
19+
20+
public const byte StatusOk = 0x00;
21+
public const byte StatusError = 0x01;
22+
23+
public const string ErrAuthRequired = "auth_required";
24+
public const string ErrBadFrame = "bad_frame";
25+
public const string ErrUniqueViolation = "unique_violation";
26+
public const string ErrTxnConflict = "txn_conflict";
27+
public const string ErrInvalidOp = "invalid_operation";
28+
public const string ErrInternal = "internal_error";
29+
30+
public const int HeaderSize = 5;
31+
public const int MaxFrameBodyBytes = 64 * 1024 * 1024;
32+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
namespace CosmoSQLClient.CosmoKvPipes;
2+
3+
/// <summary>
4+
/// Connection settings for <see cref="CosmoKvPipesConnection"/>. Built from
5+
/// the connection string of the form
6+
/// <c>Endpoint=pipes:///path/to/sock;AuthToken=secret;[Timeout=30]</c>.
7+
/// The <c>pipes://</c> scheme is required and bare-host path is the
8+
/// absolute filesystem path to the Unix socket cosmokvd is listening on
9+
/// (i.e. <c>COSMOKVD_PIPES_PATH</c> on the server).
10+
/// </summary>
11+
public sealed record CosmoKvPipesConfiguration
12+
{
13+
/// <summary>Required. Must start with <c>pipes://</c>.</summary>
14+
public required string Endpoint { get; init; }
15+
16+
/// <summary>Required bearer token; must match the server's COSMOKVD_AUTH_TOKEN.</summary>
17+
public required string AuthToken { get; init; }
18+
19+
/// <summary>Per-request timeout. Default 30 s.</summary>
20+
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(30);
21+
22+
/// <summary>Absolute socket-file path parsed out of <see cref="Endpoint"/>.</summary>
23+
public string SocketPath
24+
{
25+
get
26+
{
27+
const string Prefix = "pipes://";
28+
if (!Endpoint.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
29+
throw new ArgumentException(
30+
$"Endpoint must start with '{Prefix}' (got '{Endpoint}').", nameof(Endpoint));
31+
return Endpoint.Substring(Prefix.Length);
32+
}
33+
}
34+
35+
public string ConnectionString =>
36+
$"Endpoint={Endpoint};AuthToken={AuthToken};Timeout={(int)Timeout.TotalSeconds}";
37+
38+
public static CosmoKvPipesConfiguration Parse(string connectionString)
39+
{
40+
if (string.IsNullOrWhiteSpace(connectionString))
41+
throw new ArgumentException("Connection string is empty.", nameof(connectionString));
42+
43+
var dict = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
44+
foreach (var part in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
45+
{
46+
int eq = part.IndexOf('=');
47+
if (eq < 0) continue;
48+
dict[part[..eq].Trim()] = part[(eq + 1)..].Trim();
49+
}
50+
51+
string? Get(params string[] keys)
52+
{
53+
foreach (var k in keys)
54+
if (dict.TryGetValue(k, out var v)) return v;
55+
return null;
56+
}
57+
58+
var endpoint = Get("Endpoint", "Url")
59+
?? throw new ArgumentException("Connection string missing 'Endpoint'.", nameof(connectionString));
60+
var token = Get("AuthToken", "Token")
61+
?? throw new ArgumentException("Connection string missing 'AuthToken'.", nameof(connectionString));
62+
63+
var timeout = TimeSpan.FromSeconds(30);
64+
if (Get("Timeout") is string t && int.TryParse(t, out var sec) && sec > 0)
65+
timeout = TimeSpan.FromSeconds(sec);
66+
67+
return new CosmoKvPipesConfiguration
68+
{
69+
Endpoint = endpoint,
70+
AuthToken = token,
71+
Timeout = timeout,
72+
};
73+
}
74+
}

0 commit comments

Comments
 (0)