-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathCompressionHelpers.cs
More file actions
148 lines (134 loc) · 7.04 KB
/
CompressionHelpers.cs
File metadata and controls
148 lines (134 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
using System;
using System.IO;
using System.IO.Compression;
using SpacetimeDB.ClientApi;
namespace SpacetimeDB
{
/// <summary>
/// Helpers for decompressing and decoding data exchanged between the client and the SpacetimeDB server.
/// </summary>
internal class CompressionHelpers
{
    /// <summary>
    /// Compression algorithms supported for data processing.
    /// Used to specify the compression method for serializing and deserializing messages
    /// between the client and SpacetimeDB server. The selected algorithm determines
    /// how data such as query updates and server messages are compressed or decompressed.
    /// The numeric value is the tag byte that <see cref="DecompressDecodeMessage"/> reads
    /// from the front of every message, so the values must not be renumbered.
    /// </summary>
    internal enum CompressionAlgos : byte
    {
        /// <summary>The payload follows the tag byte uncompressed.</summary>
        None = 0,

        /// <summary>The payload following the tag byte is Brotli-compressed.</summary>
        Brotli = 1,

        /// <summary>The payload following the tag byte is GZip-compressed.</summary>
        Gzip = 2,
    }

    /// <summary>
    /// Creates a <see cref="BrotliStream"/> for decompressing the provided stream.
    /// </summary>
    /// <param name="stream">The input stream containing Brotli-compressed data.</param>
    /// <returns>
    /// A <see cref="BrotliStream"/> set to decompression mode. The caller owns it and should dispose it;
    /// disposing it also closes <paramref name="stream"/>.
    /// </returns>
    internal static BrotliStream BrotliReader(Stream stream)
    {
        return new BrotliStream(stream, CompressionMode.Decompress);
    }

    /// <summary>
    /// Creates a <see cref="GZipStream"/> for decompressing the provided stream.
    /// </summary>
    /// <param name="stream">The input stream containing GZip-compressed data.</param>
    /// <returns>
    /// A <see cref="GZipStream"/> set to decompression mode. The caller owns it and should dispose it;
    /// disposing it also closes <paramref name="stream"/>.
    /// </returns>
    internal static GZipStream GzipReader(Stream stream)
    {
        return new GZipStream(stream, CompressionMode.Decompress);
    }

    /// <summary>
    /// Drains <paramref name="source"/> into an in-memory buffer and returns a reader positioned
    /// at the start of that buffer.
    /// </summary>
    /// <param name="source">The stream to drain, read from its current position to its end.</param>
    /// <returns>A <see cref="BinaryReader"/> over a private copy of the remaining bytes of <paramref name="source"/>.</returns>
    private static BinaryReader BufferToReader(Stream source)
    {
        // TODO: consider pooling these.
        // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array
        // PER BYTE READ. You have to do it all at once to avoid that problem.
        var buffer = new MemoryStream();
        source.CopyTo(buffer);
        buffer.Seek(0, SeekOrigin.Begin);

        // The buffer and reader only hold managed memory, so (as before) they are left to the GC
        // instead of being disposed here.
        return new BinaryReader(buffer);
    }

    /// <summary>
    /// Decompresses and decodes a serialized <see cref="ServerMessage"/> from a byte array,
    /// automatically handling the specified compression algorithm (None, Brotli, or Gzip).
    /// The first byte of <paramref name="bytes"/> is the <see cref="CompressionAlgos"/> tag; the rest is the payload.
    /// The payload is decompressed in one pass into memory before decoding to avoid
    /// performance issues with byte-at-a-time reads on the decompression streams.
    /// </summary>
    /// <param name="bytes">The compressed and encoded server message as a byte array.</param>
    /// <returns>The deserialized <see cref="ServerMessage"/> object.</returns>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="bytes"/> is empty, or its tag byte is not a known <see cref="CompressionAlgos"/> value.
    /// </exception>
    internal static ServerMessage DecompressDecodeMessage(byte[] bytes)
    {
        using var stream = new MemoryStream(bytes);

        // The stream should never be empty: it must at least contain the compression algo.
        // ReadByte() returns -1 at end of stream, so reject that explicitly instead of casting it.
        int tag = stream.ReadByte();
        if (tag < 0)
        {
            throw new InvalidOperationException("Cannot decode an empty message: the compression type byte is missing");
        }
        var compression = (CompressionAlgos)tag;

        // Conditionally decompress and decode.
        // The decompressor is disposed when this method returns so that its native decoder state is
        // released deterministically. In the None case this is the same object as `stream`;
        // disposing a MemoryStream twice is harmless.
        using Stream decompressedStream = compression switch
        {
            CompressionAlgos.None => stream,
            CompressionAlgos.Brotli => BrotliReader(stream),
            CompressionAlgos.Gzip => GzipReader(stream),
            _ => throw new InvalidOperationException("Unknown compression type"),
        };

        return new ServerMessage.BSATN().Read(BufferToReader(decompressedStream));
    }

    /// <summary>
    /// Decompresses and decodes a <see cref="CompressableQueryUpdate"/> into a <see cref="QueryUpdate"/> object,
    /// automatically handling uncompressed, Brotli, or Gzip-encoded data. An uncompressed update is returned
    /// as-is; a compressed one is decompressed in one pass into memory before decoding to avoid
    /// performance issues with byte-at-a-time reads on the decompression streams.
    /// </summary>
    /// <param name="update">The compressed or uncompressed query update.</param>
    /// <returns>The deserialized <see cref="QueryUpdate"/> object.</returns>
    /// <exception cref="InvalidOperationException">The compression type is unrecognized.</exception>
    internal static QueryUpdate DecompressDecodeQueryUpdate(CompressableQueryUpdate update)
    {
        Stream decompressedStream;
        switch (update)
        {
            case CompressableQueryUpdate.Uncompressed(var qu):
                // Nothing to decode: the update already carries the decoded value.
                return qu;

            case CompressableQueryUpdate.Brotli(var bytes):
                decompressedStream = BrotliReader(new MemoryStream(bytes.ToArray()));
                break;

            case CompressableQueryUpdate.Gzip(var bytes):
                decompressedStream = GzipReader(new MemoryStream(bytes.ToArray()));
                break;

            default:
                throw new InvalidOperationException("Unknown compression type");
        }

        // Dispose the decompressor (and the MemoryStream it wraps) once decoding is done so that
        // its native decoder state is released deterministically.
        using (decompressedStream)
        {
            return new QueryUpdate.BSATN().Read(BufferToReader(decompressedStream));
        }
    }

    /// <summary>
    /// Prepare to read a BsatnRowList.
    ///
    /// This could return an IEnumerable, but we return the reader and row count directly to avoid an allocation.
    /// It is legitimate to repeatedly call <c>IStructuralReadWrite.Read&lt;T&gt;</c> <c>rowCount</c> times on the resulting
    /// BinaryReader:
    /// Our decoding infrastructure guarantees that reading a value consumes the correct number of bytes
    /// from the BinaryReader. (This is easy because BSATN doesn't have padding.)
    ///
    /// Previously here we were using LINQ to do what we're now doing with a custom reader.
    ///
    /// Why are we no longer using LINQ?
    ///
    /// The calls in question, namely `Skip().Take()`, were fast under the Mono runtime,
    /// but *much* slower when compiled AOT with IL2CPP.
    /// Apparently Mono's JIT is smart enough to optimize away these LINQ ops,
    /// resulting in a linear scan of the `BsatnRowList`.
    /// Unfortunately IL2CPP could not, resulting in a quadratic scan.
    /// See: https://github.com/clockworklabs/com.clockworklabs.spacetimedbsdk/pull/306
    /// </summary>
    /// <param name="list">The row list whose <c>RowsData</c> will be read and whose <c>SizeHint</c> determines the row count.</param>
    /// <returns>A reader for the rows of the list and a count of rows.</returns>
    /// <exception cref="NotImplementedException">The list's size hint is neither a fixed size nor a list of row offsets.</exception>
    internal static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList list) =>
    (
        new BinaryReader(new ListStream(list.RowsData)),
        list.SizeHint switch
        {
            // Every row has the same encoded size, so the count is total bytes / bytes per row.
            // NOTE(review): a FixedSize hint of 0 would make this division throw DivideByZeroException;
            // presumably the server never sends one - confirm.
            RowSizeHint.FixedSize(var size) => list.RowsData.Count / size,
            // One offset is recorded per row.
            RowSizeHint.RowOffsets(var offsets) => offsets.Count,
            _ => throw new NotImplementedException()
        }
    );
}
}