@@ -19,39 +19,29 @@ use bytes::Bytes;
1919use bytestring:: ByteString ;
2020use core:: {
2121 fmt:: Debug ,
22- mem,
2322 ops:: { Deref , Range } ,
2423} ;
2524use enum_as_inner:: EnumAsInner ;
2625use smallvec:: SmallVec ;
2726use spacetimedb_lib:: { ConnectionId , Identity , TimeDuration , Timestamp } ;
2827use spacetimedb_primitives:: TableId ;
2928use spacetimedb_sats:: {
30- bsatn:: { self , ToBsatn } ,
29+ bsatn,
3130 de:: { Deserialize , Error } ,
3231 impl_deserialize, impl_serialize, impl_st,
33- ser:: { serde :: SerializeWrapper , Serialize } ,
32+ ser:: Serialize ,
3433 AlgebraicType , SpacetimeType ,
3534} ;
3635use std:: {
37- io:: { self , Read as _, Write as _ } ,
36+ io:: { self , Read as _} ,
3837 sync:: Arc ,
3938} ;
4039
40+ pub use crate :: websocket_building:: * ;
41+
4142pub const TEXT_PROTOCOL : & str = "v1.json.spacetimedb" ;
4243pub const BIN_PROTOCOL : & str = "v1.bsatn.spacetimedb" ;
4344
44- /// A list of rows being built.
45- pub trait RowListBuilder : Default {
46- type FinishedList ;
47-
48- /// Push a row to the list in a serialized format.
49- fn push ( & mut self , row : impl ToBsatn + Serialize ) ;
50-
51- /// Finish the in flight list, throwing away the capability to mutate.
52- fn finish ( self ) -> Self :: FinishedList ;
53- }
54-
5545pub trait RowListLen {
5646 /// Returns the length, in number of rows, not bytes, of the row list.
5747 fn len ( & self ) -> usize ;
@@ -103,26 +93,6 @@ pub trait WebsocketFormat: Sized {
10393 type QueryUpdate : SpacetimeType + for < ' de > Deserialize < ' de > + Serialize + Debug + Clone + Send ;
10494}
10595
106- pub trait BuildableWebsocketFormat : WebsocketFormat {
107- /// The builder for [`Self::List`].
108- type ListBuilder : RowListBuilder < FinishedList = Self :: List > ;
109-
110- /// Encodes the `elems` to a list in the format and also returns the length of the list.
111- fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
112- let mut num_rows = 0 ;
113- let mut list = Self :: ListBuilder :: default ( ) ;
114- for elem in elems {
115- num_rows += 1 ;
116- list. push ( elem) ;
117- }
118- ( list. finish ( ) , num_rows)
119- }
120-
121- /// Convert a `QueryUpdate` into `Self::QueryUpdate`.
122- /// This allows some formats to e.g., compress the update.
123- fn into_query_update ( qu : QueryUpdate < Self > , compression : Compression ) -> Self :: QueryUpdate ;
124- }
125-
12696/// Messages sent from the client to the server.
12797///
12898/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
@@ -785,25 +755,6 @@ impl WebsocketFormat for JsonFormat {
785755 type QueryUpdate = QueryUpdate < Self > ;
786756}
787757
788- impl BuildableWebsocketFormat for JsonFormat {
789- type ListBuilder = Self :: List ;
790-
791- fn into_query_update ( qu : QueryUpdate < Self > , _: Compression ) -> Self :: QueryUpdate {
792- qu
793- }
794- }
795-
796- impl RowListBuilder for Vec < ByteString > {
797- type FinishedList = Self ;
798- fn push ( & mut self , row : impl ToBsatn + Serialize ) {
799- let value = serde_json:: to_string ( & SerializeWrapper :: new ( row) ) . unwrap ( ) . into ( ) ;
800- self . push ( value) ;
801- }
802- fn finish ( self ) -> Self :: FinishedList {
803- self
804- }
805- }
806-
807758#[ derive( Clone , Copy , Default , Debug , SpacetimeType ) ]
808759#[ sats( crate = spacetimedb_lib) ]
809760pub struct BsatnFormat ;
@@ -814,30 +765,6 @@ impl WebsocketFormat for BsatnFormat {
814765 type QueryUpdate = CompressableQueryUpdate < Self > ;
815766}
816767
817- impl BuildableWebsocketFormat for BsatnFormat {
818- type ListBuilder = BsatnRowListBuilder ;
819-
820- fn into_query_update ( qu : QueryUpdate < Self > , compression : Compression ) -> Self :: QueryUpdate {
821- let qu_len_would_have_been = bsatn:: to_len ( & qu) . unwrap ( ) ;
822-
823- match decide_compression ( qu_len_would_have_been, compression) {
824- Compression :: None => CompressableQueryUpdate :: Uncompressed ( qu) ,
825- Compression :: Brotli => {
826- let bytes = bsatn:: to_vec ( & qu) . unwrap ( ) ;
827- let mut out = Vec :: new ( ) ;
828- brotli_compress ( & bytes, & mut out) ;
829- CompressableQueryUpdate :: Brotli ( out. into ( ) )
830- }
831- Compression :: Gzip => {
832- let bytes = bsatn:: to_vec ( & qu) . unwrap ( ) ;
833- let mut out = Vec :: new ( ) ;
834- gzip_compress ( & bytes, & mut out) ;
835- CompressableQueryUpdate :: Gzip ( out. into ( ) )
836- }
837- }
838- }
839- }
840-
841768/// A specification of either a desired or decided compression algorithm.
842769#[ derive( serde:: Deserialize , Default , PartialEq , Eq , Clone , Copy , Hash , Debug ) ]
843770pub enum Compression {
@@ -850,54 +777,20 @@ pub enum Compression {
850777 Gzip ,
851778}
852779
853- pub fn decide_compression ( len : usize , compression : Compression ) -> Compression {
854- /// The threshold beyond which we start to compress messages.
855- /// 1KiB was chosen without measurement.
856- /// TODO(perf): measure!
857- const COMPRESS_THRESHOLD : usize = 1024 ;
858-
859- if len > COMPRESS_THRESHOLD {
860- compression
861- } else {
862- Compression :: None
863- }
864- }
865-
866- pub fn brotli_compress ( bytes : & [ u8 ] , out : & mut impl io:: Write ) {
867- // We are optimizing for compression speed,
868- // so we choose the lowest (fastest) level of compression.
869- // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
870- // for large `SubscriptionUpdate` messages at this level.
871- const COMPRESSION_LEVEL : i32 = 1 ;
872-
873- let params = brotli:: enc:: BrotliEncoderParams {
874- quality : COMPRESSION_LEVEL ,
875- ..<_ >:: default ( )
876- } ;
877- let reader = & mut & bytes[ ..] ;
878- brotli:: BrotliCompress ( reader, out, & params) . expect ( "should be able to BrotliCompress" ) ;
879- }
880-
881780pub fn brotli_decompress ( bytes : & [ u8 ] ) -> Result < Vec < u8 > , io:: Error > {
882781 let mut decompressed = Vec :: new ( ) ;
883782 brotli:: BrotliDecompress ( & mut & bytes[ ..] , & mut decompressed) ?;
884783 Ok ( decompressed)
885784}
886785
887- pub fn gzip_compress ( bytes : & [ u8 ] , out : & mut impl io:: Write ) {
888- let mut encoder = flate2:: write:: GzEncoder :: new ( out, flate2:: Compression :: fast ( ) ) ;
889- encoder. write_all ( bytes) . unwrap ( ) ;
890- encoder. finish ( ) . expect ( "should be able to gzip compress `bytes`" ) ;
891- }
892-
893786pub fn gzip_decompress ( bytes : & [ u8 ] ) -> Result < Vec < u8 > , io:: Error > {
894787 let mut decompressed = Vec :: new ( ) ;
895788 let _ = flate2:: read:: GzDecoder :: new ( bytes) . read_to_end ( & mut decompressed) ?;
896789 Ok ( decompressed)
897790}
898791
899- type RowSize = u16 ;
900- type RowOffset = u64 ;
792+ pub type RowSize = u16 ;
793+ pub type RowOffset = u64 ;
901794
902795/// A packed list of BSATN-encoded rows.
903796#[ derive( SpacetimeType , Debug , Clone , Default ) ]
@@ -910,6 +803,17 @@ pub struct BsatnRowList {
910803 rows_data : Bytes ,
911804}
912805
806+ impl BsatnRowList {
807+ /// Returns a new row list where `rows_data` is the flattened byte array
808+ /// containing the BSATN of each row, without any markers for where a row begins and end.
809+ ///
810+ /// The `size_hint` encodes the boundaries of each row in `rows_data`.
811+ /// See [`RowSizeHint`] for more details on the encoding.
812+ pub fn new ( size_hint : RowSizeHint , rows_data : Bytes ) -> Self {
813+ Self { size_hint, rows_data }
814+ }
815+ }
816+
913817/// NOTE(centril, 1.0): We might want to add a `None` variant to this
914818/// where the client has to decode in a loop until `rows_data` has been exhausted.
915819/// The use-case for this is clients who are bandwidth limited and where every byte counts.
@@ -1004,102 +908,3 @@ impl Iterator for BsatnRowListIter<'_> {
1004908 self . list . get ( index)
1005909 }
1006910}
1007-
1008- /// A [`BsatnRowList`] that can be added to.
1009- #[ derive( Default ) ]
1010- pub struct BsatnRowListBuilder {
1011- /// A size hint about `rows_data`
1012- /// intended to facilitate parallel decode purposes on large initial updates.
1013- size_hint : RowSizeHintBuilder ,
1014- /// The flattened byte array for a list of rows.
1015- rows_data : Vec < u8 > ,
1016- }
1017-
1018- /// A [`RowSizeHint`] under construction.
1019- pub enum RowSizeHintBuilder {
1020- /// We haven't seen any rows yet.
1021- Empty ,
1022- /// Each row in `rows_data` is of the same fixed size as specified here
1023- /// but we don't know whether the size fits in `RowSize`
1024- /// and we don't know whether future rows will also have this size.
1025- FixedSizeDyn ( usize ) ,
1026- /// Each row in `rows_data` is of the same fixed size as specified here
1027- /// and we know that this will be the case for future rows as well.
1028- FixedSizeStatic ( RowSize ) ,
1029- /// The offsets into `rows_data` defining the boundaries of each row.
1030- /// Only stores the offset to the start of each row.
1031- /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
1032- /// The behavior of this is identical to that of `PackedStr`.
1033- RowOffsets ( Vec < RowOffset > ) ,
1034- }
1035-
1036- impl Default for RowSizeHintBuilder {
1037- fn default ( ) -> Self {
1038- Self :: Empty
1039- }
1040- }
1041-
1042- impl RowListBuilder for BsatnRowListBuilder {
1043- type FinishedList = BsatnRowList ;
1044-
1045- fn push ( & mut self , row : impl ToBsatn + Serialize ) {
1046- use RowSizeHintBuilder :: * ;
1047-
1048- // Record the length before. It will be the starting offset of `row`.
1049- let len_before = self . rows_data . len ( ) ;
1050- // BSATN-encode the row directly to the buffer.
1051- row. to_bsatn_extend ( & mut self . rows_data ) . unwrap ( ) ;
1052-
1053- let encoded_len = || self . rows_data . len ( ) - len_before;
1054- let push_row_offset = |mut offsets : Vec < _ > | {
1055- offsets. push ( len_before as u64 ) ;
1056- RowOffsets ( offsets)
1057- } ;
1058-
1059- let hint = mem:: replace ( & mut self . size_hint , Empty ) ;
1060- self . size_hint = match hint {
1061- // Static size that is unchanging.
1062- h @ FixedSizeStatic ( _) => h,
1063- // Dynamic size that is unchanging.
1064- h @ FixedSizeDyn ( size) if size == encoded_len ( ) => h,
1065- // Size mismatch for the dynamic fixed size.
1066- // Now we must construct `RowOffsets` for all rows thus far.
1067- // We know that `size != 0` here, as this was excluded when we had `Empty`.
1068- FixedSizeDyn ( size) => RowOffsets ( collect_offsets_from_num_rows ( 1 + len_before / size, size) ) ,
1069- // Once there's a size for each row, we'll just add to it.
1070- RowOffsets ( offsets) => push_row_offset ( offsets) ,
1071- // First time a row is seen. Use `encoded_len()` as the hint.
1072- // If we have a static layout, we'll always have a fixed size.
1073- // Otherwise, let's start out with a potentially fixed size.
1074- // In either case, if `encoded_len() == 0`, we have to store offsets,
1075- // as we cannot recover the number of elements otherwise.
1076- Empty => match row. static_bsatn_size ( ) {
1077- Some ( 0 ) => push_row_offset ( Vec :: new ( ) ) ,
1078- Some ( size) => FixedSizeStatic ( size) ,
1079- None => match encoded_len ( ) {
1080- 0 => push_row_offset ( Vec :: new ( ) ) ,
1081- size => FixedSizeDyn ( size) ,
1082- } ,
1083- } ,
1084- } ;
1085- }
1086-
1087- fn finish ( self ) -> Self :: FinishedList {
1088- let Self { size_hint, rows_data } = self ;
1089- let size_hint = match size_hint {
1090- RowSizeHintBuilder :: Empty => RowSizeHint :: RowOffsets ( [ ] . into ( ) ) ,
1091- RowSizeHintBuilder :: FixedSizeStatic ( fs) => RowSizeHint :: FixedSize ( fs) ,
1092- RowSizeHintBuilder :: FixedSizeDyn ( fs) => match u16:: try_from ( fs) {
1093- Ok ( fs) => RowSizeHint :: FixedSize ( fs) ,
1094- Err ( _) => RowSizeHint :: RowOffsets ( collect_offsets_from_num_rows ( rows_data. len ( ) / fs, fs) . into ( ) ) ,
1095- } ,
1096- RowSizeHintBuilder :: RowOffsets ( ro) => RowSizeHint :: RowOffsets ( ro. into ( ) ) ,
1097- } ;
1098- let rows_data = rows_data. into ( ) ;
1099- BsatnRowList { size_hint, rows_data }
1100- }
1101- }
1102-
1103- fn collect_offsets_from_num_rows ( num_rows : usize , size : usize ) -> Vec < u64 > {
1104- ( 0 ..num_rows) . map ( |i| i * size) . map ( |o| o as u64 ) . collect ( )
1105- }
0 commit comments