@@ -26,22 +26,18 @@ use smallvec::SmallVec;
2626use spacetimedb_lib:: { ConnectionId , Identity , TimeDuration , Timestamp } ;
2727use spacetimedb_primitives:: TableId ;
2828use spacetimedb_sats:: {
29- bsatn:: { self , ToBsatn } ,
3029 de:: { Deserialize , Error } ,
3130 impl_deserialize, impl_serialize, impl_st,
32- ser:: { serde :: SerializeWrapper , Serialize } ,
31+ ser:: Serialize ,
3332 AlgebraicType , SpacetimeType ,
3433} ;
35- use std:: {
36- io:: { self , Read as _, Write as _} ,
37- sync:: Arc ,
38- } ;
34+ use std:: sync:: Arc ;
3935
4036pub const TEXT_PROTOCOL : & str = "v1.json.spacetimedb" ;
4137pub const BIN_PROTOCOL : & str = "v1.bsatn.spacetimedb" ;
4238
4339pub trait RowListLen {
44- /// Returns the length of the list.
40+ /// Returns the length, in number of rows, not bytes, of the row list.
4541 fn len ( & self ) -> usize ;
4642 /// Returns whether the list is empty or not.
4743 fn is_empty ( & self ) -> bool {
@@ -86,16 +82,9 @@ pub trait WebsocketFormat: Sized {
8682 + Clone
8783 + Default ;
8884
89- /// Encodes the `elems` to a list in the format and also returns the length of the list.
90- fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) ;
91-
9285 /// The type used to encode query updates.
9386 /// This type exists so that some formats, e.g., BSATN, can compress an update.
9487 type QueryUpdate : SpacetimeType + for < ' de > Deserialize < ' de > + Serialize + Debug + Clone + Send ;
95-
96- /// Convert a `QueryUpdate` into `Self::QueryUpdate`.
97- /// This allows some formats to e.g., compress the update.
98- fn into_query_update ( qu : QueryUpdate < Self > , compression : Compression ) -> Self :: QueryUpdate ;
9988}
10089
10190/// Messages sent from the client to the server.
@@ -666,22 +655,6 @@ pub enum CompressableQueryUpdate<F: WebsocketFormat> {
666655 Gzip ( Bytes ) ,
667656}
668657
669- impl CompressableQueryUpdate < BsatnFormat > {
670- pub fn maybe_decompress ( self ) -> QueryUpdate < BsatnFormat > {
671- match self {
672- Self :: Uncompressed ( qu) => qu,
673- Self :: Brotli ( bytes) => {
674- let bytes = brotli_decompress ( & bytes) . unwrap ( ) ;
675- bsatn:: from_slice ( & bytes) . unwrap ( )
676- }
677- Self :: Gzip ( bytes) => {
678- let bytes = gzip_decompress ( & bytes) . unwrap ( ) ;
679- bsatn:: from_slice ( & bytes) . unwrap ( )
680- }
681- }
682- }
683- }
684-
685658#[ derive( SpacetimeType , Debug , Clone ) ]
686659#[ sats( crate = spacetimedb_lib) ]
687660pub struct QueryUpdate < F : WebsocketFormat > {
@@ -756,23 +729,8 @@ pub struct JsonFormat;
756729
757730impl WebsocketFormat for JsonFormat {
758731 type Single = ByteString ;
759-
760732 type List = Vec < ByteString > ;
761-
762- fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
763- let mut count = 0 ;
764- let list = elems
765- . map ( |elem| serde_json:: to_string ( & SerializeWrapper :: new ( elem) ) . unwrap ( ) . into ( ) )
766- . inspect ( |_| count += 1 )
767- . collect ( ) ;
768- ( list, count)
769- }
770-
771733 type QueryUpdate = QueryUpdate < Self > ;
772-
773- fn into_query_update ( qu : QueryUpdate < Self > , _: Compression ) -> Self :: QueryUpdate {
774- qu
775- }
776734}
777735
778736#[ derive( Clone , Copy , Default , Debug , SpacetimeType ) ]
@@ -781,57 +739,8 @@ pub struct BsatnFormat;
781739
782740impl WebsocketFormat for BsatnFormat {
783741 type Single = Box < [ u8 ] > ;
784-
785742 type List = BsatnRowList ;
786-
787- fn encode_list < R : ToBsatn + Serialize > ( mut elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
788- // For an empty list, the size of a row is unknown, so use `RowOffsets`.
789- let Some ( first) = elems. next ( ) else {
790- return ( BsatnRowList :: row_offsets ( ) , 0 ) ;
791- } ;
792- // We have at least one row. Determine the static size from that, if available.
793- let ( mut list, mut scratch) = match first. static_bsatn_size ( ) {
794- Some ( size) => ( BsatnRowListBuilder :: fixed ( size) , Vec :: with_capacity ( size as usize ) ) ,
795- None => ( BsatnRowListBuilder :: row_offsets ( ) , Vec :: new ( ) ) ,
796- } ;
797- // Add the first element and then the rest.
798- // We assume that the schema of rows yielded by `elems` stays the same,
799- // so once the size is fixed, it will stay that way.
800- let mut count = 0 ;
801- let mut push = |elem : R | {
802- elem. to_bsatn_extend ( & mut scratch) . unwrap ( ) ;
803- list. push ( & scratch) ;
804- scratch. clear ( ) ;
805- count += 1 ;
806- } ;
807- push ( first) ;
808- for elem in elems {
809- push ( elem) ;
810- }
811- ( list. finish ( ) , count)
812- }
813-
814743 type QueryUpdate = CompressableQueryUpdate < Self > ;
815-
816- fn into_query_update ( qu : QueryUpdate < Self > , compression : Compression ) -> Self :: QueryUpdate {
817- let qu_len_would_have_been = bsatn:: to_len ( & qu) . unwrap ( ) ;
818-
819- match decide_compression ( qu_len_would_have_been, compression) {
820- Compression :: None => CompressableQueryUpdate :: Uncompressed ( qu) ,
821- Compression :: Brotli => {
822- let bytes = bsatn:: to_vec ( & qu) . unwrap ( ) ;
823- let mut out = Vec :: new ( ) ;
824- brotli_compress ( & bytes, & mut out) ;
825- CompressableQueryUpdate :: Brotli ( out. into ( ) )
826- }
827- Compression :: Gzip => {
828- let bytes = bsatn:: to_vec ( & qu) . unwrap ( ) ;
829- let mut out = Vec :: new ( ) ;
830- gzip_compress ( & bytes, & mut out) ;
831- CompressableQueryUpdate :: Gzip ( out. into ( ) )
832- }
833- }
834- }
835744}
836745
837746/// A specification of either a desired or decided compression algorithm.
@@ -846,69 +755,28 @@ pub enum Compression {
846755 Gzip ,
847756}
848757
849- pub fn decide_compression ( len : usize , compression : Compression ) -> Compression {
850- /// The threshold beyond which we start to compress messages.
851- /// 1KiB was chosen without measurement.
852- /// TODO(perf): measure!
853- const COMPRESS_THRESHOLD : usize = 1024 ;
854-
855- if len > COMPRESS_THRESHOLD {
856- compression
857- } else {
858- Compression :: None
859- }
860- }
861-
862- pub fn brotli_compress ( bytes : & [ u8 ] , out : & mut impl io:: Write ) {
863- // We are optimizing for compression speed,
864- // so we choose the lowest (fastest) level of compression.
865- // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
866- // for large `SubscriptionUpdate` messages at this level.
867- const COMPRESSION_LEVEL : i32 = 1 ;
868-
869- let params = brotli:: enc:: BrotliEncoderParams {
870- quality : COMPRESSION_LEVEL ,
871- ..<_ >:: default ( )
872- } ;
873- let reader = & mut & bytes[ ..] ;
874- brotli:: BrotliCompress ( reader, out, & params) . expect ( "should be able to BrotliCompress" ) ;
875- }
876-
877- pub fn brotli_decompress ( bytes : & [ u8 ] ) -> Result < Vec < u8 > , io:: Error > {
878- let mut decompressed = Vec :: new ( ) ;
879- brotli:: BrotliDecompress ( & mut & bytes[ ..] , & mut decompressed) ?;
880- Ok ( decompressed)
881- }
882-
883- pub fn gzip_compress ( bytes : & [ u8 ] , out : & mut impl io:: Write ) {
884- let mut encoder = flate2:: write:: GzEncoder :: new ( out, flate2:: Compression :: fast ( ) ) ;
885- encoder. write_all ( bytes) . unwrap ( ) ;
886- encoder. finish ( ) . expect ( "should be able to gzip compress `bytes`" ) ;
887- }
888-
889- pub fn gzip_decompress ( bytes : & [ u8 ] ) -> Result < Vec < u8 > , io:: Error > {
890- let mut decompressed = Vec :: new ( ) ;
891- let _ = flate2:: read:: GzDecoder :: new ( bytes) . read_to_end ( & mut decompressed) ?;
892- Ok ( decompressed)
893- }
894-
895- type RowSize = u16 ;
896- type RowOffset = u64 ;
758+ pub type RowSize = u16 ;
759+ pub type RowOffset = u64 ;
897760
898761/// A packed list of BSATN-encoded rows.
899- #[ derive( SpacetimeType , Debug , Clone ) ]
762+ #[ derive( SpacetimeType , Debug , Clone , Default ) ]
900763#[ sats( crate = spacetimedb_lib) ]
901- pub struct BsatnRowList < B = Bytes , I = Arc < [ RowOffset ] > > {
764+ pub struct BsatnRowList {
902765 /// A size hint about `rows_data`
903766 /// intended to facilitate parallel decode purposes on large initial updates.
904- size_hint : RowSizeHint < I > ,
767+ size_hint : RowSizeHint ,
905768 /// The flattened byte array for a list of rows.
906- rows_data : B ,
769+ rows_data : Bytes ,
907770}
908771
909- impl Default for BsatnRowList {
910- fn default ( ) -> Self {
911- Self :: row_offsets ( )
772+ impl BsatnRowList {
773+ /// Returns a new row list where `rows_data` is the flattened byte array
774+ /// containing the BSATN of each row, without any markers for where a row begins and end.
775+ ///
776+ /// The `size_hint` encodes the boundaries of each row in `rows_data`.
777+ /// See [`RowSizeHint`] for more details on the encoding.
778+ pub fn new ( size_hint : RowSizeHint , rows_data : Bytes ) -> Self {
779+ Self { size_hint, rows_data }
912780 }
913781}
914782
@@ -917,17 +785,23 @@ impl Default for BsatnRowList {
917785/// The use-case for this is clients who are bandwidth limited and where every byte counts.
918786#[ derive( SpacetimeType , Debug , Clone ) ]
919787#[ sats( crate = spacetimedb_lib) ]
920- pub enum RowSizeHint < I > {
788+ pub enum RowSizeHint {
921789 /// Each row in `rows_data` is of the same fixed size as specified here.
922790 FixedSize ( RowSize ) ,
923791 /// The offsets into `rows_data` defining the boundaries of each row.
924792 /// Only stores the offset to the start of each row.
925793 /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
926794 /// The behavior of this is identical to that of `PackedStr`.
927- RowOffsets ( I ) ,
795+ RowOffsets ( Arc < [ RowOffset ] > ) ,
796+ }
797+
798+ impl Default for RowSizeHint {
799+ fn default ( ) -> Self {
800+ Self :: RowOffsets ( [ ] . into ( ) )
801+ }
928802}
929803
930- impl < I : AsRef < [ RowOffset ] > > RowSizeHint < I > {
804+ impl RowSizeHint {
931805 fn index_to_range ( & self , index : usize , data_end : usize ) -> Option < Range < usize > > {
932806 match self {
933807 Self :: FixedSize ( size) => {
@@ -952,37 +826,17 @@ impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
952826 }
953827}
954828
955- impl < B : Default , I > BsatnRowList < B , I > {
956- pub fn fixed ( row_size : RowSize ) -> Self {
957- Self {
958- size_hint : RowSizeHint :: FixedSize ( row_size) ,
959- rows_data : <_ >:: default ( ) ,
960- }
961- }
962-
963- /// Returns a new empty list using indices
964- pub fn row_offsets ( ) -> Self
965- where
966- I : From < [ RowOffset ; 0 ] > ,
967- {
968- Self {
969- size_hint : RowSizeHint :: RowOffsets ( [ ] . into ( ) ) ,
970- rows_data : <_ >:: default ( ) ,
971- }
972- }
973- }
974-
975- impl < B : AsRef < [ u8 ] > , I : AsRef < [ RowOffset ] > > RowListLen for BsatnRowList < B , I > {
976- /// Returns the length of the row list.
829+ impl RowListLen for BsatnRowList {
977830 fn len ( & self ) -> usize {
978831 match & self . size_hint {
832+ // `size != 0` is always the case for `FixedSize`.
979833 RowSizeHint :: FixedSize ( size) => self . rows_data . as_ref ( ) . len ( ) / * size as usize ,
980834 RowSizeHint :: RowOffsets ( offsets) => offsets. as_ref ( ) . len ( ) ,
981835 }
982836 }
983837}
984838
985- impl < B : AsRef < [ u8 ] > , I > ByteListLen for BsatnRowList < B , I > {
839+ impl ByteListLen for BsatnRowList {
986840 /// Returns the uncompressed size of the list in bytes
987841 fn num_bytes ( & self ) -> usize {
988842 self . rows_data . as_ref ( ) . len ( )
@@ -1020,28 +874,3 @@ impl Iterator for BsatnRowListIter<'_> {
1020874 self . list . get ( index)
1021875 }
1022876}
1023-
1024- /// A [`BsatnRowList`] that can be added to.
1025- pub type BsatnRowListBuilder = BsatnRowList < Vec < u8 > , Vec < RowOffset > > ;
1026-
1027- impl BsatnRowListBuilder {
1028- /// Adds `row`, BSATN-encoded to this list.
1029- #[ inline]
1030- pub fn push ( & mut self , row : & [ u8 ] ) {
1031- if let RowSizeHint :: RowOffsets ( offsets) = & mut self . size_hint {
1032- offsets. push ( self . rows_data . len ( ) as u64 ) ;
1033- }
1034- self . rows_data . extend_from_slice ( row) ;
1035- }
1036-
1037- /// Finish the in flight list, throwing away the capability to mutate.
1038- pub fn finish ( self ) -> BsatnRowList {
1039- let Self { size_hint, rows_data } = self ;
1040- let rows_data = rows_data. into ( ) ;
1041- let size_hint = match size_hint {
1042- RowSizeHint :: FixedSize ( fs) => RowSizeHint :: FixedSize ( fs) ,
1043- RowSizeHint :: RowOffsets ( ro) => RowSizeHint :: RowOffsets ( ro. into ( ) ) ,
1044- } ;
1045- BsatnRowList { size_hint, rows_data }
1046- }
1047- }
0 commit comments