Skip to content

Commit 03da956

Browse files
authored
Merge branch 'master' into kim/snapshot/fsync
2 parents a647a70 + f22f800 commit 03da956

13 files changed

Lines changed: 374 additions & 200 deletions

File tree

crates/bindings-typescript/src/lib/binary_writer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export default class BinaryWriter {
6363
return fromByteArray(this.getBuffer());
6464
}
6565

66-
getBuffer(): Uint8Array {
66+
getBuffer(): Uint8Array<ArrayBuffer> {
6767
return new Uint8Array(this.buffer.buffer, 0, this.offset);
6868
}
6969

crates/bindings-typescript/src/sdk/db_connection_builder.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
} from '../';
99
import { ensureMinimumVersionOrThrow } from './version';
1010
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter';
11+
import type { WebSocketFactory } from './ws';
1112

1213
/**
1314
* The database client connection to a SpacetimeDB server.
@@ -23,10 +24,10 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
2324
#identity?: Identity;
2425
#token?: string;
2526
#emitter: EventEmitter<ConnectionEvent> = new EventEmitter();
26-
#compression: 'gzip' | 'none' = 'gzip';
27+
#compression: 'gzip' | 'brotli' | 'none' = 'gzip';
2728
#lightMode: boolean = false;
2829
#confirmedReads?: boolean;
29-
#createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
30+
#createWSFn: WebSocketFactory;
3031

3132
/**
3233
* Creates a new `DbConnectionBuilder` database client and set the initial parameters.
@@ -42,7 +43,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
4243
config: DbConnectionConfig<RemoteModuleOf<DbConnection>>
4344
) => DbConnection
4445
) {
45-
this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn;
46+
this.#createWSFn = WebsocketDecompressAdapter.openWebSocket;
4647
}
4748

4849
/**
@@ -82,9 +83,7 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
8283
return this;
8384
}
8485

85-
withWSFn(
86-
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn
87-
): this {
86+
withWSFn(createWSFn: WebSocketFactory): this {
8887
this.#createWSFn = createWSFn;
8988
return this;
9089
}
@@ -94,7 +93,17 @@ export class DbConnectionBuilder<DbConnection extends DbConnectionImpl<any>> {
9493
*
9594
* @param compression The compression algorithm to use for the connection.
9695
*/
97-
withCompression(compression: 'gzip' | 'none'): this {
96+
withCompression(compression: 'gzip' | 'brotli' | 'none'): this {
97+
if (compression === 'brotli') {
98+
try {
99+
new DecompressionStream('brotli' as CompressionFormat);
100+
} catch (e) {
101+
throw new TypeError(
102+
`Brotli compression is not supported by the runtime. Please choose a different compression method.`,
103+
{ cause: e }
104+
);
105+
}
106+
}
98107
this.#compression = compression;
99108
return this;
100109
}

crates/bindings-typescript/src/sdk/db_connection_impl.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,6 @@ import {
3737
type PendingCallback,
3838
type TableUpdate as CacheTableUpdate,
3939
} from './table_cache.ts';
40-
import {
41-
WebsocketDecompressAdapter,
42-
type WebsocketAdapter,
43-
} from './websocket_decompress_adapter.ts';
4440
import {
4541
SubscriptionBuilderImpl,
4642
SubscriptionHandleImpl,
@@ -60,6 +56,7 @@ import type { ProceduresView } from './procedures.ts';
6056
import type { Values } from '../lib/type_util.ts';
6157
import type { TransactionUpdate } from './client_api/types.ts';
6258
import { InternalError, SenderError } from '../lib/errors.ts';
59+
import type { WebSocketAdapter, WebSocketFactory } from './ws.ts';
6360
import {
6461
normalizeWsProtocol,
6562
PREFERRED_WS_PROTOCOLS,
@@ -101,8 +98,8 @@ export type DbConnectionConfig<RemoteModule extends UntypedRemoteModule> = {
10198
identity?: Identity;
10299
token?: string;
103100
emitter: EventEmitter<ConnectionEvent>;
104-
createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn;
105-
compression: 'gzip' | 'none';
101+
createWSFn: WebSocketFactory;
102+
compression: 'gzip' | 'brotli' | 'none';
106103
lightMode: boolean;
107104
confirmedReads?: boolean;
108105
remoteModule: RemoteModule;
@@ -186,7 +183,7 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
186183
#inboundQueue: Uint8Array[] = [];
187184
#inboundQueueOffset = 0;
188185
#isDrainingInboundQueue = false;
189-
#outboundQueue: Uint8Array[] = [];
186+
#outboundQueue: Uint8Array<ArrayBuffer>[] = [];
190187
#isOutboundFlushScheduled = false;
191188
#negotiatedWsProtocol: NegotiatedWsProtocol = V2_WS_PROTOCOL;
192189
#subscriptionManager = new SubscriptionManager<RemoteModule>();
@@ -224,8 +221,8 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
224221
// private fields.
225222
// We use them in testing.
226223
private clientCache: ClientCache<RemoteModule>;
227-
private ws?: WebsocketAdapter;
228-
private wsPromise: Promise<WebsocketAdapter | undefined>;
224+
private ws?: WebSocketAdapter;
225+
private wsPromise: Promise<WebSocketAdapter | undefined>;
229226

230227
constructor({
231228
uri,
@@ -612,22 +609,22 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
612609
return this.#mergeTableUpdates(updates);
613610
}
614611

615-
#flushOutboundQueue(wsResolved: WebsocketAdapter): void {
612+
#flushOutboundQueue(wsResolved: WebSocketAdapter): void {
616613
if (this.#negotiatedWsProtocol === V3_WS_PROTOCOL) {
617614
this.#flushOutboundQueueV3(wsResolved);
618615
return;
619616
}
620617
this.#flushOutboundQueueV2(wsResolved);
621618
}
622619

623-
#flushOutboundQueueV2(wsResolved: WebsocketAdapter): void {
620+
#flushOutboundQueueV2(wsResolved: WebSocketAdapter): void {
624621
const pending = this.#outboundQueue.splice(0);
625622
for (const message of pending) {
626623
wsResolved.send(message);
627624
}
628625
}
629626

630-
#flushOutboundQueueV3(wsResolved: WebsocketAdapter): void {
627+
#flushOutboundQueueV3(wsResolved: WebSocketAdapter): void {
631628
if (this.#outboundQueue.length === 0) {
632629
return;
633630
}
@@ -692,7 +689,10 @@ export class DbConnectionImpl<RemoteModule extends UntypedRemoteModule>
692689

693690
#reducerArgsEncoder = new BinaryWriter(1024);
694691
#clientMessageEncoder = new BinaryWriter(1024);
695-
#sendEncodedMessage(encoded: Uint8Array, describe: () => string): void {
692+
#sendEncodedMessage(
693+
encoded: Uint8Array<ArrayBuffer>,
694+
describe: () => string
695+
): void {
696696
stdbLogger('trace', describe);
697697
if (this.ws && this.isActive) {
698698
if (this.#negotiatedWsProtocol === V2_WS_PROTOCOL) {
Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
export async function decompress(
2-
buffer: Uint8Array,
3-
// Leaving it here to expand to brotli when it lands in the browsers and NodeJS
4-
type: 'gzip',
2+
buffer: Uint8Array<ArrayBuffer>,
3+
type: CompressionFormat,
54
chunkSize: number = 128 * 1024 // 128KB
65
): Promise<Uint8Array> {
76
// Create a single ReadableStream to handle chunks
87
let offset = 0;
9-
const readableStream = new ReadableStream({
8+
const readableStream = new ReadableStream<BufferSource>({
109
pull(controller) {
1110
if (offset < buffer.length) {
1211
// Slice a chunk of the buffer and enqueue it
@@ -29,24 +28,9 @@ export async function decompress(
2928
const decompressedStream = readableStream.pipeThrough(decompressionStream);
3029

3130
// Collect the decompressed chunks efficiently
32-
const reader = decompressedStream.getReader();
33-
const chunks: Uint8Array[] = [];
34-
let totalLength = 0;
35-
let result: any;
36-
37-
while (!(result = await reader.read()).done) {
38-
chunks.push(result.value);
39-
totalLength += result.value.length;
31+
const chunks = [];
32+
for await (const chunk of decompressedStream) {
33+
chunks.push(chunk);
4034
}
41-
42-
// Allocate a single Uint8Array for the decompressed data
43-
const decompressedArray = new Uint8Array(totalLength);
44-
let chunkOffset = 0;
45-
46-
for (const chunk of chunks) {
47-
decompressedArray.set(chunk, chunkOffset);
48-
chunkOffset += chunk.length;
49-
}
50-
51-
return decompressedArray;
35+
return new Blob(chunks).bytes();
5236
}
Lines changed: 12 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,10 @@
11
import { decompress } from './decompress';
2-
import { resolveWS } from './ws';
2+
import { openWebSocket, type WebSocketAdapter, type WebSocketArgs } from './ws';
33

4-
export interface WebsocketAdapter {
5-
readonly protocol: string;
6-
send(msg: Uint8Array): void;
7-
close(): void;
8-
9-
set onclose(handler: (ev: CloseEvent) => void);
10-
set onopen(handler: () => void);
11-
set onmessage(handler: (msg: { data: Uint8Array }) => void);
12-
set onerror(handler: (msg: ErrorEvent) => void);
13-
}
14-
15-
export class WebsocketDecompressAdapter implements WebsocketAdapter {
4+
export class WebsocketDecompressAdapter implements WebSocketAdapter {
165
get protocol(): string {
176
return this.#ws.protocol;
187
}
19-
208
set onclose(handler: (ev: CloseEvent) => void) {
219
this.#ws.onclose = handler;
2210
}
@@ -35,16 +23,17 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
3523

3624
#ws: WebSocket;
3725

38-
async #decompress(buffer: Uint8Array): Promise<Uint8Array> {
26+
async #decompress(buffer: Uint8Array<ArrayBuffer>): Promise<Uint8Array> {
3927
const tag = buffer[0];
4028
const data = buffer.subarray(1);
4129
switch (tag) {
4230
case 0:
4331
return data;
4432
case 1:
45-
throw new Error(
46-
'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.'
47-
);
33+
// Some runtimes support brotli, but it's not yet defined in `lib.dom.d.ts`.
34+
// We assert runtime support in `DbConnectionBuilder.withCompression`, so
35+
// this cast is safe.
36+
return await decompress(data, 'brotli' as CompressionFormat);
4837
case 2:
4938
return await decompress(data, 'gzip');
5039
default:
@@ -54,7 +43,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
5443
}
5544
}
5645

57-
send(msg: Uint8Array): void {
46+
send(msg: Uint8Array<ArrayBuffer>): void {
5847
this.#ws.send(msg);
5948
}
6049

@@ -63,68 +52,12 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter {
6352
}
6453

6554
constructor(ws: WebSocket) {
66-
ws.binaryType = 'arraybuffer';
67-
6855
this.#ws = ws;
6956
}
7057

71-
static async createWebSocketFn({
72-
url,
73-
nameOrAddress,
74-
wsProtocol,
75-
authToken,
76-
compression,
77-
lightMode,
78-
confirmedReads,
79-
}: {
80-
url: URL;
81-
wsProtocol: string | string[];
82-
nameOrAddress: string;
83-
authToken?: string;
84-
compression: 'gzip' | 'none';
85-
lightMode: boolean;
86-
confirmedReads?: boolean;
87-
}): Promise<WebsocketDecompressAdapter> {
88-
const headers = new Headers();
89-
90-
const WS = await resolveWS();
91-
92-
// We swap our original token to a shorter-lived token
93-
// to avoid sending the original via query params.
94-
let temporaryAuthToken: string | undefined = undefined;
95-
if (authToken) {
96-
headers.set('Authorization', `Bearer ${authToken}`);
97-
const tokenUrl = new URL('v1/identity/websocket-token', url);
98-
tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:';
99-
100-
const response = await fetch(tokenUrl, { method: 'POST', headers });
101-
if (response.ok) {
102-
const { token } = await response.json();
103-
temporaryAuthToken = token;
104-
} else {
105-
return Promise.reject(
106-
new Error(`Failed to verify token: ${response.statusText}`)
107-
);
108-
}
109-
}
110-
111-
const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url);
112-
if (temporaryAuthToken) {
113-
databaseUrl.searchParams.set('token', temporaryAuthToken);
114-
}
115-
databaseUrl.searchParams.set(
116-
'compression',
117-
compression === 'gzip' ? 'Gzip' : 'None'
118-
);
119-
if (lightMode) {
120-
databaseUrl.searchParams.set('light', 'true');
121-
}
122-
if (confirmedReads !== undefined) {
123-
databaseUrl.searchParams.set('confirmed', confirmedReads.toString());
124-
}
125-
126-
const ws = new WS(databaseUrl.toString(), wsProtocol);
127-
128-
return new WebsocketDecompressAdapter(ws);
58+
static async openWebSocket(
59+
args: WebSocketArgs
60+
): Promise<WebsocketDecompressAdapter> {
61+
return new this(await openWebSocket(args));
12962
}
13063
}

crates/bindings-typescript/src/sdk/websocket_test_adapter.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import BinaryReader from '../lib/binary_reader.ts';
22
import BinaryWriter from '../lib/binary_writer.ts';
33
import { ClientMessage, ServerMessage } from './client_api/types';
4-
import type { WebsocketAdapter } from './websocket_decompress_adapter';
4+
import type { WebSocketAdapter, WebSocketFactory } from './ws';
55
import { PREFERRED_WS_PROTOCOLS, V3_WS_PROTOCOL } from './websocket_protocols';
66
import {
77
decodeClientMessagesV3,
88
encodeServerMessagesV3,
99
} from './websocket_v3_frames.ts';
1010

11-
class WebsocketTestAdapter implements WebsocketAdapter {
11+
class WebsocketTestAdapter implements WebSocketAdapter {
1212
protocol: string = '';
1313

14-
messageQueue: Uint8Array[];
14+
messageQueue: Uint8Array<ArrayBuffer>[];
1515
outgoingMessages: ClientMessage[];
1616
closed: boolean;
1717
supportedProtocols: string[];
@@ -41,7 +41,7 @@ class WebsocketTestAdapter implements WebsocketAdapter {
4141

4242
set onerror(_handler: (msg: ErrorEvent) => void) {}
4343

44-
send(message: Uint8Array): void {
44+
send(message: Uint8Array<ArrayBuffer>): void {
4545
const rawMessage = message.slice();
4646
const outgoingMessages =
4747
this.protocol === V3_WS_PROTOCOL
@@ -85,28 +85,16 @@ class WebsocketTestAdapter implements WebsocketAdapter {
8585
this.#onmessage({ data: outboundData });
8686
}
8787

88-
async createWebSocketFn(_args: {
89-
url: URL;
90-
wsProtocol: string | string[];
91-
nameOrAddress: string;
92-
authToken?: string;
93-
compression: 'gzip' | 'none';
94-
lightMode: boolean;
95-
confirmedReads?: boolean;
96-
}): Promise<WebsocketTestAdapter> {
97-
const requestedProtocols = Array.isArray(_args.wsProtocol)
98-
? _args.wsProtocol
99-
: [_args.wsProtocol];
100-
const negotiatedProtocol = requestedProtocols.find(protocol =>
88+
openWebSocket: WebSocketFactory = async ({ wsProtocol }) => {
89+
const negotiatedProtocol = wsProtocol.find(protocol =>
10190
this.supportedProtocols.includes(protocol)
10291
);
10392
if (!negotiatedProtocol) {
104-
return Promise.reject(new Error('No compatible websocket protocol'));
93+
throw new Error('No compatible websocket protocol');
10594
}
10695
this.protocol = negotiatedProtocol;
10796
return this;
108-
}
97+
};
10998
}
11099

111-
export type { WebsocketTestAdapter };
112100
export default WebsocketTestAdapter;

0 commit comments

Comments
 (0)