Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/csharp/SpacetimeDB.ClientSDK.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

<ItemGroup>
<PackageReference Include="SpacetimeDB.BSATN.Runtime" Version="1.2.*" />
<PackageReference Include="System.Threading.Channels" Version="9.0.7" />

<InternalsVisibleTo Include="SpacetimeDB.Tests" />
</ItemGroup>
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

8 changes: 0 additions & 8 deletions sdks/csharp/packages/spacetimedb.bsatn.runtime/1.2.1/lib.meta

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
39 changes: 23 additions & 16 deletions sdks/csharp/src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using SpacetimeDB.BSATN;
using SpacetimeDB.Internal;
using SpacetimeDB.ClientApi;
using Thread = System.Threading.Thread;
using System.Diagnostics;


Expand Down Expand Up @@ -167,7 +167,7 @@ public abstract class DbConnectionBase<DbConnection, Tables, Reducer> : IDbConne
private readonly Dictionary<Guid, TaskCompletionSource<OneOffQueryResponse>> waitingOneOffQueries = new();

private bool isClosing;
private readonly Thread networkMessageParseThread;
private readonly ConfiguredTaskAwaitable networkMessageParseTask;
public readonly Stats stats = new();

protected DbConnectionBase()
Expand Down Expand Up @@ -198,8 +198,7 @@ protected DbConnectionBase()

#if !(UNITY_WEBGL && !UNITY_EDITOR)
// For targets other than webgl we start a thread to parse messages
networkMessageParseThread = new Thread(ParseMessages);
networkMessageParseThread.Start();
networkMessageParseTask = ParseMessages().ConfigureAwait(false);
#endif
}

Expand Down Expand Up @@ -257,14 +256,22 @@ internal struct ParsedMessage
public ReducerEvent<Reducer>? reducerEvent;
}

private readonly BlockingCollection<UnparsedMessage> _parseQueue =
new(new ConcurrentQueue<UnparsedMessage>());
private readonly Channel<UnparsedMessage> _parseQueue =
Channel.CreateUnbounded<UnparsedMessage>(new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = false
});

private readonly BlockingCollection<ParsedMessage> _applyQueue =
new(new ConcurrentQueue<ParsedMessage>());
private readonly Channel<ParsedMessage> _applyQueue =
Channel.CreateUnbounded<ParsedMessage>(new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = false
});

internal static bool IsTesting;
internal bool HasMessageToApply => _applyQueue.Count > 0;
internal bool HasMessageToApply => _applyQueue.Reader.Count > 0;

private readonly CancellationTokenSource _parseCancellationTokenSource = new();
private CancellationToken _parseCancellationToken => _parseCancellationTokenSource.Token;
Expand Down Expand Up @@ -413,7 +420,7 @@ private static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList lis
#if UNITY_WEBGL && !UNITY_EDITOR
internal IEnumerator ParseMessages()
#else
internal void ParseMessages()
internal async Task ParseMessages()
#endif
{
while (!isClosing)
Expand All @@ -425,9 +432,9 @@ internal void ParseMessages()
#endif
try
{
var message = _parseQueue.Take(_parseCancellationToken);
var message = await _parseQueue.Reader.ReadAsync(_parseCancellationToken);
var parsedMessage = ParseMessage(message);
_applyQueue.Add(parsedMessage, _parseCancellationToken);
await _applyQueue.Writer.WriteAsync(parsedMessage, _parseCancellationToken);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -751,7 +758,7 @@ async Task Function()
}
_ = Function();
#else
});
}).ConfigureAwait(false);
#endif
}
}
Expand Down Expand Up @@ -939,7 +946,7 @@ private void ApplyMessage(ParsedMessage parsed)
// Note: this method is called from unit tests.
internal void OnMessageReceived(byte[] bytes, DateTime timestamp)
{
_parseQueue.Add(new UnparsedMessage { bytes = bytes, timestamp = timestamp, parseQueueTrackerId = stats.ParseMessageQueueTracker.StartTrackingRequest() });
_parseQueue.Writer.TryWrite(new UnparsedMessage { bytes = bytes, timestamp = timestamp, parseQueueTrackerId = stats.ParseMessageQueueTracker.StartTrackingRequest() });
}

// TODO: this should become [Obsolete] but for now is used by autogenerated code.
Expand Down Expand Up @@ -1071,7 +1078,7 @@ T[] LogAndThrow(string error)
public void FrameTick()
{
webSocket.Update();
while (_applyQueue.TryTake(out var parsedMessage))
while (_applyQueue.Reader.TryRead(out var parsedMessage))
{
ApplyMessage(parsedMessage);
}
Expand Down
17 changes: 11 additions & 6 deletions sdks/csharp/src/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,24 @@ public async Task Connect(string? auth, string host, string nameOrAddress, Conne
{
uri += "&light=true";
}
var url = new Uri(uri);
Ws.Options.AddSubProtocol(_options.Protocol);

var source = new CancellationTokenSource(10000);
if (!string.IsNullOrEmpty(auth))
{
Ws.Options.SetRequestHeader("Authorization", $"Bearer {auth}");
}
else
{
Ws.Options.UseDefaultCredentials = true;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Create("Browser")))
{
// For WebAssembly, we need to pass the auth token as a query parameter
uri += $"&token={Uri.EscapeDataString(auth)}";
Copy link
Copy Markdown
Contributor

@jdetter jdetter Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's generally not considered best practice to send an auth token via a URL param. To get around this in our C# (WebGL) and Typescript SDKs we generate a short-lived token which you can generate via an API call. This short lived token is then used instead of your real token. You can see an implementation of this here: (C# SDK) https://github.com/clockworklabs/SpacetimeDB/pull/2988/files and then also here for the typescript SDK: https://github.com/clockworklabs/SpacetimeDB/blob/36f7ca2583a7491285460454839e41ff43ec27a7/sdks/typescript/packages/sdk/src/websocket_decompress_adapter.ts#L93C1-L110C6

I think let's wait for #2988 to land first and see if we can just use the same auth flow that we're using for WebGL?

}
else
{
Ws.Options.SetRequestHeader("Authorization", $"Bearer {auth}");
}
}

var url = new Uri(uri);

try
{
await Ws.ConnectAsync(url, source.Token);
Expand Down