diff --git a/sdks/csharp/src/CompressionHelpers.cs b/sdks/csharp/src/CompressionHelpers.cs new file mode 100644 index 00000000000..56e35124dce --- /dev/null +++ b/sdks/csharp/src/CompressionHelpers.cs @@ -0,0 +1,148 @@ +using System; +using System.IO; +using System.IO.Compression; +using SpacetimeDB.ClientApi; + +namespace SpacetimeDB +{ + internal class CompressionHelpers + { + /// + /// Compression algorithms supported for data processing. + /// Used to specify the compression method for serializing and deserializing messages + /// between the client and SpacetimeDB server. The selected algorithm determines + /// how data such as query updates and server messages are compressed or decompressed. + /// + internal enum CompressionAlgos : byte + { + None = 0, + Brotli = 1, + Gzip = 2, + } + + /// + /// Creates a for decompressing the provided stream. + /// + /// The input stream containing Brotli-compressed data. + /// A set to decompression mode. + internal static BrotliStream BrotliReader(Stream stream) + { + return new BrotliStream(stream, CompressionMode.Decompress); + } + + /// + /// Creates a for decompressing the provided stream. + /// + /// The input stream containing GZip-compressed data. + /// A set to decompression mode. + internal static GZipStream GzipReader(Stream stream) + { + return new GZipStream(stream, CompressionMode.Decompress); + } + + /// + /// Decompresses and decodes a serialized from a byte array, + /// automatically handling the specified compression algorithm (None, Brotli, or Gzip). + /// Ensures efficient decompression by reading the entire stream at once to avoid + /// performance issues with certain stream implementations. + /// Throws if an unknown compression type is encountered. + /// + /// The compressed and encoded server message as a byte array. + /// The deserialized object. + internal static ServerMessage DecompressDecodeMessage(byte[] bytes) + { + using var stream = new MemoryStream(bytes); + + // The stream will never be empty. It will at least contain the compression algo. + var compression = (CompressionAlgos)stream.ReadByte(); + // Conditionally decompress and decode. + Stream decompressedStream = compression switch + { + CompressionAlgos.None => stream, + CompressionAlgos.Brotli => BrotliReader(stream), + CompressionAlgos.Gzip => GzipReader(stream), + _ => throw new InvalidOperationException("Unknown compression type"), + }; + + // TODO: consider pooling these. + // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array + // PER BYTE READ. You have to do it all at once to avoid that problem. + MemoryStream memoryStream = new MemoryStream(); + decompressedStream.CopyTo(memoryStream); + memoryStream.Seek(0, SeekOrigin.Begin); + return new ServerMessage.BSATN().Read(new BinaryReader(memoryStream)); + } + + + /// + /// Decompresses and decodes a into a object, + /// automatically handling uncompressed, Brotli, or Gzip-encoded data. Ensures efficient decompression by + /// reading the entire stream at once to avoid performance issues with certain stream implementations. + /// Throws if the compression type is unrecognized. + /// + /// The compressed or uncompressed query update. + /// The deserialized object. + internal static QueryUpdate DecompressDecodeQueryUpdate(CompressableQueryUpdate update) + { + Stream decompressedStream; + + switch (update) + { + case CompressableQueryUpdate.Uncompressed(var qu): + return qu; + + case CompressableQueryUpdate.Brotli(var bytes): + decompressedStream = CompressionHelpers.BrotliReader(new MemoryStream(bytes.ToArray())); + break; + + case CompressableQueryUpdate.Gzip(var bytes): + decompressedStream = CompressionHelpers.GzipReader(new MemoryStream(bytes.ToArray())); + break; + + default: + throw new InvalidOperationException(); + } + + // TODO: consider pooling these. + // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array + // PER BYTE READ. You have to do it all at once to avoid that problem. + MemoryStream memoryStream = new MemoryStream(); + decompressedStream.CopyTo(memoryStream); + memoryStream.Seek(0, SeekOrigin.Begin); + return new QueryUpdate.BSATN().Read(new BinaryReader(memoryStream)); + } + + /// + /// Prepare to read a BsatnRowList. + /// + /// This could return an IEnumerable, but we return the reader and row count directly to avoid an allocation. + /// It is legitimate to repeatedly call IStructuralReadWrite.Read rowCount times on the resulting + /// BinaryReader: + /// Our decoding infrastructure guarantees that reading a value consumes the correct number of bytes + /// from the BinaryReader. (This is easy because BSATN doesn't have padding.) + /// + /// Previously here we were using LINQ to do what we're now doing with a custsom reader. + /// + /// Why are we no longer using LINQ? + /// + /// The calls in question, namely `Skip().Take()`, were fast under the Mono runtime, + /// but *much* slower when compiled AOT with IL2CPP. + /// Apparently Mono's JIT is smart enough to optimize away these LINQ ops, + /// resulting in a linear scan of the `BsatnRowList`. + /// Unfortunately IL2CPP could not, resulting in a quadratic scan. + /// See: https://github.com/clockworklabs/com.clockworklabs.spacetimedbsdk/pull/306 + /// + /// + /// A reader for the rows of the list and a count of rows. + internal static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList list) => + ( + new BinaryReader(new ListStream(list.RowsData)), + list.SizeHint switch + { + RowSizeHint.FixedSize(var size) => list.RowsData.Count / size, + RowSizeHint.RowOffsets(var offsets) => offsets.Count, + _ => throw new NotImplementedException() + } + ); + } +} \ No newline at end of file diff --git a/sdks/csharp/src/CompressionHelpers.cs.meta b/sdks/csharp/src/CompressionHelpers.cs.meta new file mode 100644 index 00000000000..7d6340607aa --- /dev/null +++ b/sdks/csharp/src/CompressionHelpers.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 17bd26235850b7b4c9ab624df17d4dd2 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/sdks/csharp/src/SpacetimeDBClient.cs b/sdks/csharp/src/SpacetimeDBClient.cs index 1f2785b4124..1dac02ccf60 100644 --- a/sdks/csharp/src/SpacetimeDBClient.cs +++ b/sdks/csharp/src/SpacetimeDBClient.cs @@ -2,17 +2,12 @@ using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; -using System.IO; -using System.IO.Compression; using System.Linq; using System.Threading; using System.Threading.Tasks; using SpacetimeDB.BSATN; -using SpacetimeDB.Internal; using SpacetimeDB.ClientApi; using Thread = System.Threading.Thread; -using System.Diagnostics; - namespace SpacetimeDB { @@ -221,33 +216,6 @@ internal struct UnparsedMessage public uint parseQueueTrackerId; } - internal struct ParsedDatabaseUpdate - { - // Map: table handles -> (primary key -> IStructuralReadWrite). - // If a particular table has no primary key, the "primary key" is just the row itself. - // This is valid because any [SpacetimeDB.Type] automatically has a correct Equals and HashSet implementation. - public Dictionary> Updates; - - // Can't override the default constructor. Make sure you use this one! - public static ParsedDatabaseUpdate New() - { - ParsedDatabaseUpdate result; - result.Updates = new(); - return result; - } - - public MultiDictionaryDelta DeltaForTable(IRemoteTableHandle table) - { - if (!Updates.TryGetValue(table, out var delta)) - { - delta = new MultiDictionaryDelta(EqualityComparer.Default, EqualityComparer.Default); - Updates[table] = delta; - } - - return delta; - } - } - internal struct ParsedMessage { public ServerMessage message; @@ -269,136 +237,9 @@ internal struct ParsedMessage private readonly CancellationTokenSource _parseCancellationTokenSource = new(); private CancellationToken _parseCancellationToken => _parseCancellationTokenSource.Token; - /// - /// Decode a row for a table, producing a primary key. - /// If the table has a specific column marked `#[primary_key]`, use that. - /// If not, the BSATN for the entire row is used instead. - /// - /// - /// - /// - /// - internal static IStructuralReadWrite Decode(IRemoteTableHandle table, BinaryReader reader, out object primaryKey) - { - var obj = table.DecodeValue(reader); - - // TODO(1.1): we should exhaustively check that GenericEqualityComparer works - // for all types that are allowed to be primary keys. - var primaryKey_ = table.GetPrimaryKey(obj); - primaryKey_ ??= obj; - primaryKey = primaryKey_; - - return obj; - } - private static readonly Status Committed = new Status.Committed(default); private static readonly Status OutOfEnergy = new Status.OutOfEnergy(default); - internal enum CompressionAlgos : byte - { - None = 0, - Brotli = 1, - Gzip = 2, - } - - private static BrotliStream BrotliReader(Stream stream) - { - return new BrotliStream(stream, CompressionMode.Decompress); - } - - private static GZipStream GzipReader(Stream stream) - { - return new GZipStream(stream, CompressionMode.Decompress); - } - - private static ServerMessage DecompressDecodeMessage(byte[] bytes) - { - using var stream = new MemoryStream(bytes); - - // The stream will never be empty. It will at least contain the compression algo. - var compression = (CompressionAlgos)stream.ReadByte(); - // Conditionally decompress and decode. - Stream decompressedStream = compression switch - { - CompressionAlgos.None => stream, - CompressionAlgos.Brotli => BrotliReader(stream), - CompressionAlgos.Gzip => GzipReader(stream), - _ => throw new InvalidOperationException("Unknown compression type"), - }; - - // TODO: consider pooling these. - // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array - // PER BYTE READ. You have to do it all at once to avoid that problem. - MemoryStream memoryStream = new MemoryStream(); - decompressedStream.CopyTo(memoryStream); - memoryStream.Seek(0, SeekOrigin.Begin); - return new ServerMessage.BSATN().Read(new BinaryReader(memoryStream)); - } - - private static QueryUpdate DecompressDecodeQueryUpdate(CompressableQueryUpdate update) - { - Stream decompressedStream; - - switch (update) - { - case CompressableQueryUpdate.Uncompressed(var qu): - return qu; - - case CompressableQueryUpdate.Brotli(var bytes): - decompressedStream = BrotliReader(new MemoryStream(bytes.ToArray())); - break; - - case CompressableQueryUpdate.Gzip(var bytes): - decompressedStream = GzipReader(new MemoryStream(bytes.ToArray())); - break; - - default: - throw new InvalidOperationException(); - } - - // TODO: consider pooling these. - // DO NOT TRY TO TAKE THIS OUT. The BrotliStream ReadByte() implementation allocates an array - // PER BYTE READ. You have to do it all at once to avoid that problem. - MemoryStream memoryStream = new MemoryStream(); - decompressedStream.CopyTo(memoryStream); - memoryStream.Seek(0, SeekOrigin.Begin); - return new QueryUpdate.BSATN().Read(new BinaryReader(memoryStream)); - } - - - /// - /// Prepare to read a BsatnRowList. - /// - /// This could return an IEnumerable, but we return the reader and row count directly to avoid an allocation. - /// It is legitimate to repeatedly call IStructuralReadWrite.Read rowCount times on the resulting - /// BinaryReader: - /// Our decoding infrastructure guarantees that reading a value consumes the correct number of bytes - /// from the BinaryReader. (This is easy because BSATN doesn't have padding.) - /// - /// Previously here we were using LINQ to do what we're now doing with a custsom reader. - /// - /// Why are we no longer using LINQ? - /// - /// The calls in question, namely `Skip().Take()`, were fast under the Mono runtime, - /// but *much* slower when compiled AOT with IL2CPP. - /// Apparently Mono's JIT is smart enough to optimize away these LINQ ops, - /// resulting in a linear scan of the `BsatnRowList`. - /// Unfortunately IL2CPP could not, resulting in a quadratic scan. - /// See: https://github.com/clockworklabs/com.clockworklabs.spacetimedbsdk/pull/306 - /// - /// - /// A reader for the rows of the list and a count of rows. - private static (BinaryReader reader, int rowCount) ParseRowList(BsatnRowList list) => - ( - new BinaryReader(new ListStream(list.RowsData)), - list.SizeHint switch - { - RowSizeHint.FixedSize(var size) => list.RowsData.Count / size, - RowSizeHint.RowOffsets(var offsets) => offsets.Count, - _ => throw new NotImplementedException() - } - ); - /// /// Get a description of a message suitable for storing in the tracker metadata. /// @@ -463,7 +304,7 @@ ParsedDatabaseUpdate ParseLegacySubscription(InitialSubscription initSub) // First apply all of the state foreach (var (table, update) in GetTables(initSub.DatabaseUpdate)) { - ParseInsertOnlyTable(table, update, dbOps); + table.ParseInsertOnly(update, dbOps); } return dbOps; } @@ -477,85 +318,18 @@ ParsedDatabaseUpdate ParseSubscribeMultiApplied(SubscribeMultiApplied subscribeM var dbOps = ParsedDatabaseUpdate.New(); foreach (var (table, update) in GetTables(subscribeMultiApplied.Update)) { - ParseInsertOnlyTable(table, update, dbOps); + table.ParseInsertOnly(update, dbOps); } return dbOps; } - void ParseInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ParsedDatabaseUpdate dbOps) - { - var delta = dbOps.DeltaForTable(table); - - foreach (var cqu in update.Updates) - { - var qu = DecompressDecodeQueryUpdate(cqu); - if (qu.Deletes.RowsData.Count > 0) - { - Log.Warn("Non-insert during an insert-only server message!"); - } - var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); - for (var i = 0; i < insertRowCount; i++) - { - var obj = Decode(table, insertReader, out var pk); - delta.Add(pk, obj); - } - } - } - - void ParseDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ParsedDatabaseUpdate dbOps) - { - var delta = dbOps.DeltaForTable(table); - foreach (var cqu in update.Updates) - { - var qu = DecompressDecodeQueryUpdate(cqu); - if (qu.Inserts.RowsData.Count > 0) - { - Log.Warn("Non-delete during a delete-only operation!"); - } - - var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); - for (var i = 0; i < deleteRowCount; i++) - { - var obj = Decode(table, deleteReader, out var pk); - delta.Remove(pk, obj); - } - } - } - - void ParseTable(IRemoteTableHandle table, TableUpdate update, ParsedDatabaseUpdate dbOps) - { - var delta = dbOps.DeltaForTable(table); - foreach (var cqu in update.Updates) - { - var qu = DecompressDecodeQueryUpdate(cqu); - - // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once - // to the table, it doesn't matter that we call Add before Remove here. - - var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); - for (var i = 0; i < insertRowCount; i++) - { - var obj = Decode(table, insertReader, out var pk); - delta.Add(pk, obj); - } - - var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); - for (var i = 0; i < deleteRowCount; i++) - { - var obj = Decode(table, deleteReader, out var pk); - delta.Remove(pk, obj); - } - } - - } - ParsedDatabaseUpdate ParseUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied) { var dbOps = ParsedDatabaseUpdate.New(); foreach (var (table, update) in GetTables(unsubMultiApplied.Update)) { - ParseDeleteOnlyTable(table, update, dbOps); + table.ParseDeleteOnly(update, dbOps); } return dbOps; @@ -567,7 +341,7 @@ ParsedDatabaseUpdate ParseDatabaseUpdate(DatabaseUpdate updates) foreach (var (table, update) in GetTables(updates)) { - ParseTable(table, update, dbOps); + table.Parse(update, dbOps); } return dbOps; } @@ -589,7 +363,7 @@ void ParseOneOffQuery(OneOffQueryResponse resp) ParsedMessage ParseMessage(UnparsedMessage unparsed) { var dbOps = ParsedDatabaseUpdate.New(); - var message = DecompressDecodeMessage(unparsed.bytes); + var message = CompressionHelpers.DecompressDecodeMessage(unparsed.bytes); var trackerMetadata = TrackerMetadataForMessage(message); stats.ParseMessageQueueTracker.FinishTrackingRequest(unparsed.parseQueueTrackerId, trackerMetadata); @@ -1057,7 +831,7 @@ T[] LogAndThrow(string error) return LogAndThrow($"Mismatched result type, expected {typeof(T)} but got {resultTable.TableName}"); } - var (resultReader, resultCount) = ParseRowList(resultTable.Rows); + var (resultReader, resultCount) = CompressionHelpers.ParseRowList(resultTable.Rows); var output = new T[resultCount]; for (int i = 0; i < resultCount; i++) { @@ -1101,6 +875,46 @@ void IDbConnection.Unsubscribe(QueryId queryId) void IDbConnection.AddOnDisconnect(WebSocket.CloseEventHandler cb) => webSocket.OnClose += cb; } + /// + /// Represents the result of parsing a database update message from SpacetimeDB. + /// Contains updates for all tables affected by the update, with each entry mapping a table handle + /// to its respective set of row changes (by primary key or row instance). + /// + /// Note: Due to C#'s struct constructor limitations, you must use + /// to create new instances. + /// Do not use the default constructor, as it will not initialize the Updates dictionary. + /// + internal struct ParsedDatabaseUpdate + { + // Map: table handles -> (primary key -> IStructuralReadWrite). + // If a particular table has no primary key, the "primary key" is just the row itself. + // This is valid because any [SpacetimeDB.Type] automatically has a correct Equals and HashSet implementation. + public Dictionary Updates; + + // Can't override the default constructor. Make sure you use this one! + public static ParsedDatabaseUpdate New() + { + ParsedDatabaseUpdate result; + result.Updates = new(); + return result; + } + + /// + /// Returns the for the specified table. + /// If no update exists for the table, a new one is allocated and added to the Updates dictionary. + /// + public IParsedTableUpdate UpdateForTable(IRemoteTableHandle table) + { + if (!Updates.TryGetValue(table, out var delta)) + { + delta = table.MakeParsedTableUpdate(); + Updates[table] = delta; + } + + return delta; + } + } + internal struct UintAllocator { private uint lastAllocated; diff --git a/sdks/csharp/src/Table.cs b/sdks/csharp/src/Table.cs index ed5203d0422..36219012e97 100644 --- a/sdks/csharp/src/Table.cs +++ b/sdks/csharp/src/Table.cs @@ -1,9 +1,8 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; -using System.Runtime.CompilerServices; using System.Threading.Tasks; using SpacetimeDB.BSATN; using SpacetimeDB.ClientApi; @@ -28,22 +27,52 @@ public interface IRemoteTableHandle internal string RemoteTableName { get; } internal Type ClientTableType { get; } - internal IStructuralReadWrite DecodeValue(BinaryReader reader); + + /// + /// Creates and returns a parsed table update for the current table. + /// Note: The returned is type-erased because is also type-erased. + /// To use the parsed update, you must downcast it to its concrete type. + /// + /// An representing the parsed update. + internal IParsedTableUpdate MakeParsedTableUpdate(); + + /// + /// Parses an insert-only table update and applies the results to the specified parsed database update. + /// + /// The table update containing insert operations. + /// The parsed database update to apply changes to. + internal void ParseInsertOnly(TableUpdate update, ParsedDatabaseUpdate dbOps); + + /// + /// Parses a delete-only table update and applies the results to the specified parsed database update. + /// + /// The table update containing delete operations. + /// The parsed database update to apply changes to. + internal void ParseDeleteOnly(TableUpdate update, ParsedDatabaseUpdate dbOps); + + /// + /// Parses a general table update (insert, delete) and applies the results to the specified parsed database update. + /// + /// The table update containing operations. + /// The parsed database update to apply changes to. + internal void Parse(TableUpdate update, ParsedDatabaseUpdate dbOps); /// /// Start applying a delta to the table. /// This is called for all tables before any updates are actually applied, allowing OnBeforeDelete to be invoked correctly. /// - /// - internal void PreApply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta); + /// + /// + internal void PreApply(IEventContext context, IParsedTableUpdate parsedTableUpdate); /// /// Apply a delta to the table. /// Should not invoke any user callbacks, since not all tables have been updated yet. /// Should fix up indices, to be ready for PostApply. /// - /// - internal void Apply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta); + /// + /// + internal void Apply(IEventContext context, IParsedTableUpdate parsedTableUpdate); /// /// Finish applying a delta to a table. @@ -52,6 +81,9 @@ public interface IRemoteTableHandle internal void PostApply(IEventContext context); } + interface IParsedTableUpdate + { + } /// /// Base class for views of remote tables. @@ -122,6 +154,18 @@ public IEnumerable Filter(Column value) => cache.TryGetValue(value, out var rows) ? rows : Enumerable.Empty(); } + /// + /// Represents a parsed update to a table, storing the changes as a multi-dictionary delta + /// mapping primary keys to their corresponding row updates. + /// + internal class ParsedTableUpdate : IParsedTableUpdate + { + /// + /// Stores the set of changes for the table, mapping primary keys to updated rows. + /// + internal MultiDictionaryDelta Delta = new(EqualityComparer.Default, EqualityComparer.Default); + } + protected abstract string RemoteTableName { get; } string IRemoteTableHandle.RemoteTableName => RemoteTableName; @@ -159,7 +203,7 @@ private event Action OnInternalDelete // - Primary keys, if we have them. // - The entire row itself, if we don't. // But really, the keys are whatever SpacetimeDBClient chooses to give us. - private readonly MultiDictionary Entries = new(EqualityComparer.Default, EqualityComparer.Default); + private readonly MultiDictionary Entries = new(EqualityComparer.Default, EqualityComparer.Default); private static IReadWrite? _serializer; @@ -185,7 +229,120 @@ private static IReadWrite Serializer } // The function to use for decoding a type value. - IStructuralReadWrite IRemoteTableHandle.DecodeValue(BinaryReader reader) => Serializer.Read(reader); + Row DecodeValue(BinaryReader reader) => Serializer.Read(reader); + + /// + /// Decode a row for a table, producing a primary key. + /// If the table has a specific column marked `#[primary_key]`, use that. + /// If not, the BSATN for the entire row is used instead. + /// + /// + /// + /// + /// + public Row Decode(BinaryReader reader, out object primaryKey) + { + var obj = DecodeValue(reader); + + // TODO(1.1): we should exhaustively check that GenericEqualityComparer works + // for all types that are allowed to be primary keys. + var primaryKey_ = GetPrimaryKey(obj); + primaryKey_ ??= obj; + primaryKey = primaryKey_; + + return obj; + } + + /// + /// Creates and returns a parsed table update for the current table. + /// + /// An representing the parsed update. + IParsedTableUpdate IRemoteTableHandle.MakeParsedTableUpdate() + { + return new ParsedTableUpdate(); + } + + /// + /// Parses an insert-only table update and applies the results to the specified parsed database update. + /// + /// The table update containing insert operations. + /// The parsed database update to apply changes to. + void IRemoteTableHandle.ParseInsertOnly(TableUpdate update, ParsedDatabaseUpdate dbOps) + { + var delta = (ParsedTableUpdate)dbOps.UpdateForTable(this); + + foreach (var cqu in update.Updates) + { + var qu = CompressionHelpers.DecompressDecodeQueryUpdate(cqu); + if (qu.Deletes.RowsData.Count > 0) + { + Log.Warn("Non-insert during an insert-only server message!"); + } + var (insertReader, insertRowCount) = CompressionHelpers.ParseRowList(qu.Inserts); + for (var i = 0; i < insertRowCount; i++) + { + var obj = Decode(insertReader, out var pk); + delta.Delta.Add(pk, obj); + } + } + } + + /// + /// Parses a delete-only table update and applies the results to the specified parsed database update. + /// + /// The table update containing delete operations. + /// The parsed database update to apply changes to. + void IRemoteTableHandle.ParseDeleteOnly(TableUpdate update, ParsedDatabaseUpdate dbOps) + { + var delta = (ParsedTableUpdate)dbOps.UpdateForTable(this); + foreach (var cqu in update.Updates) + { + var qu = CompressionHelpers.DecompressDecodeQueryUpdate(cqu); + if (qu.Inserts.RowsData.Count > 0) + { + Log.Warn("Non-delete during a delete-only operation!"); + } + + var (deleteReader, deleteRowCount) = CompressionHelpers.ParseRowList(qu.Deletes); + for (var i = 0; i < deleteRowCount; i++) + { + var obj = Decode(deleteReader, out var pk); + delta.Delta.Remove(pk, obj); + } + } + } + + /// + /// Parses a general table update (insert, update, delete) and applies the results to the specified parsed database update. + /// + /// The table update containing operations. + /// The parsed database update to apply changes to. + void IRemoteTableHandle.Parse(TableUpdate update, ParsedDatabaseUpdate dbOps) + { + var delta = (ParsedTableUpdate)dbOps.UpdateForTable(this); + foreach (var cqu in update.Updates) + { + var qu = CompressionHelpers.DecompressDecodeQueryUpdate(cqu); + + // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once + // to the table, it doesn't matter that we call Add before Remove here. + + var (insertReader, insertRowCount) = CompressionHelpers.ParseRowList(qu.Inserts); + for (var i = 0; i < insertRowCount; i++) + { + var obj = Decode(insertReader, out var pk); + delta.Delta.Add(pk, obj); + } + + var (deleteReader, deleteRowCount) = CompressionHelpers.ParseRowList(qu.Deletes); + for (var i = 0; i < deleteRowCount; i++) + { + var obj = Decode(deleteReader, out var pk); + delta.Delta.Remove(pk, obj); + } + } + + } public delegate void RowEventHandler(EventContext context, Row row); private CustomRowEventHandler OnInsertHandler { get; } = new(); @@ -270,29 +427,41 @@ void InvokeUpdate(IEventContext context, IStructuralReadWrite oldRow, IStructura } } - List> wasInserted = new(); - List<(object key, IStructuralReadWrite oldValue, IStructuralReadWrite newValue)> wasUpdated = new(); - List> wasRemoved = new(); + List> wasInserted = new(); + List<(object key, Row oldValue, Row newValue)> wasUpdated = new(); + List> wasRemoved = new(); - void IRemoteTableHandle.PreApply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta) + /// + /// Invoked before applying the parsed table update (delta) to this table. + /// This is called for all tables before any updates are applied, allowing OnBeforeDelete callbacks to be triggered for rows that will be removed. + /// Calling the OnBeforeDelete callbacks allows the user to read the old values of the rows that will be removed, before they are actually removed. + /// Should be called before Apply and PostApply. + /// + void IRemoteTableHandle.PreApply(IEventContext context, IParsedTableUpdate parsedTableUpdate) { Debug.Assert(wasInserted.Count == 0 && wasUpdated.Count == 0 && wasRemoved.Count == 0, "Call Apply and PostApply before calling PreApply again"); - - foreach (var (_, value) in Entries.WillRemove(multiDictionaryDelta)) + var delta = (ParsedTableUpdate)parsedTableUpdate; + foreach (var (_, value) in Entries.WillRemove(delta.Delta)) { InvokeBeforeDelete(context, value); } } - void IRemoteTableHandle.Apply(IEventContext context, MultiDictionaryDelta multiDictionaryDelta) + /// + /// Applies the parsed table update (delta) to this table. + /// This updates the internal data structures and indices, but does not invoke user callbacks. + /// Should be called before PostApply, after PreApply. + /// + void IRemoteTableHandle.Apply(IEventContext context, IParsedTableUpdate parsedTableUpdate) { try { - Entries.Apply(multiDictionaryDelta, wasInserted, wasUpdated, wasRemoved); + var delta = (ParsedTableUpdate)parsedTableUpdate; + Entries.Apply(delta.Delta, wasInserted, wasUpdated, wasRemoved); } catch (Exception e) { - var deltaString = multiDictionaryDelta.ToString(); + var deltaString = parsedTableUpdate.ToString(); deltaString = deltaString[..Math.Min(deltaString.Length, 10_000)]; var entriesString = Entries.ToString(); entriesString = entriesString[..Math.Min(entriesString.Length, 10_000)]; @@ -346,6 +515,13 @@ void IRemoteTableHandle.Apply(IEventContext context, MultiDictionaryDelta + /// Invoked after applying the parsed table update (delta) to this table. + /// This is when user callbacks (such as OnInsert, OnUpdate, and OnDelete) are actually triggered for the affected rows. + /// All operations should be complete before calling PostApply, + /// so that data structures across all tables are fully updated before invoking user callbacks. + /// Should be called after PreApply and Apply. + /// void IRemoteTableHandle.PostApply(IEventContext context) { foreach (var (_, value) in wasInserted)