@@ -19,6 +19,7 @@ use bytes::Bytes;
1919use bytestring:: ByteString ;
2020use core:: {
2121 fmt:: Debug ,
22+ mem,
2223 ops:: { Deref , Range } ,
2324} ;
2425use enum_as_inner:: EnumAsInner ;
@@ -40,8 +41,19 @@ use std::{
4041pub const TEXT_PROTOCOL : & str = "v1.json.spacetimedb" ;
4142pub const BIN_PROTOCOL : & str = "v1.bsatn.spacetimedb" ;
4243
44+ /// A list of rows being built.
45+ pub trait RowListBuilder : Default {
46+ type FinishedList ;
47+
48+ /// Push a row to the list in a serialized format.
49+ fn push ( & mut self , row : impl ToBsatn + Serialize ) ;
50+
51+ /// Finish the in flight list, throwing away the capability to mutate.
52+ fn finish ( self ) -> Self :: FinishedList ;
53+ }
54+
4355pub trait RowListLen {
44- /// Returns the length of the list.
56+ /// Returns the length, in number of rows, not bytes, of the row list.
4557 fn len ( & self ) -> usize ;
4658 /// Returns whether the list is empty or not.
4759 fn is_empty ( & self ) -> bool {
@@ -86,8 +98,19 @@ pub trait WebsocketFormat: Sized {
8698 + Clone
8799 + Default ;
88100
101+ /// The builder for [`Self::List`].
102+ type ListBuilder : RowListBuilder < FinishedList = Self :: List > ;
103+
89104 /// Encodes the `elems` to a list in the format and also returns the length of the list.
90- fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) ;
105+ fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
106+ let mut num_rows = 0 ;
107+ let mut list = Self :: ListBuilder :: default ( ) ;
108+ for elem in elems {
109+ num_rows += 1 ;
110+ list. push ( elem) ;
111+ }
112+ ( list. finish ( ) , num_rows)
113+ }
91114
92115 /// The type used to encode query updates.
93116 /// This type exists so that some formats, e.g., BSATN, can compress an update.
@@ -758,15 +781,7 @@ impl WebsocketFormat for JsonFormat {
758781 type Single = ByteString ;
759782
760783 type List = Vec < ByteString > ;
761-
762- fn encode_list < R : ToBsatn + Serialize > ( elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
763- let mut count = 0 ;
764- let list = elems
765- . map ( |elem| serde_json:: to_string ( & SerializeWrapper :: new ( elem) ) . unwrap ( ) . into ( ) )
766- . inspect ( |_| count += 1 )
767- . collect ( ) ;
768- ( list, count)
769- }
784+ type ListBuilder = Self :: List ;
770785
771786 type QueryUpdate = QueryUpdate < Self > ;
772787
@@ -775,6 +790,17 @@ impl WebsocketFormat for JsonFormat {
775790 }
776791}
777792
793+ impl RowListBuilder for Vec < ByteString > {
794+ type FinishedList = Self ;
795+ fn push ( & mut self , row : impl ToBsatn + Serialize ) {
796+ let value = serde_json:: to_string ( & SerializeWrapper :: new ( row) ) . unwrap ( ) . into ( ) ;
797+ self . push ( value) ;
798+ }
799+ fn finish ( self ) -> Self :: FinishedList {
800+ self
801+ }
802+ }
803+
778804#[ derive( Clone , Copy , Default , Debug , SpacetimeType ) ]
779805#[ sats( crate = spacetimedb_lib) ]
780806pub struct BsatnFormat ;
@@ -783,33 +809,7 @@ impl WebsocketFormat for BsatnFormat {
783809 type Single = Box < [ u8 ] > ;
784810
785811 type List = BsatnRowList ;
786-
787- fn encode_list < R : ToBsatn + Serialize > ( mut elems : impl Iterator < Item = R > ) -> ( Self :: List , u64 ) {
788- // For an empty list, the size of a row is unknown, so use `RowOffsets`.
789- let Some ( first) = elems. next ( ) else {
790- return ( BsatnRowList :: row_offsets ( ) , 0 ) ;
791- } ;
792- // We have at least one row. Determine the static size from that, if available.
793- let ( mut list, mut scratch) = match first. static_bsatn_size ( ) {
794- Some ( size) => ( BsatnRowListBuilder :: fixed ( size) , Vec :: with_capacity ( size as usize ) ) ,
795- None => ( BsatnRowListBuilder :: row_offsets ( ) , Vec :: new ( ) ) ,
796- } ;
797- // Add the first element and then the rest.
798- // We assume that the schema of rows yielded by `elems` stays the same,
799- // so once the size is fixed, it will stay that way.
800- let mut count = 0 ;
801- let mut push = |elem : R | {
802- elem. to_bsatn_extend ( & mut scratch) . unwrap ( ) ;
803- list. push ( & scratch) ;
804- scratch. clear ( ) ;
805- count += 1 ;
806- } ;
807- push ( first) ;
808- for elem in elems {
809- push ( elem) ;
810- }
811- ( list. finish ( ) , count)
812- }
812+ type ListBuilder = BsatnRowListBuilder ;
813813
814814 type QueryUpdate = CompressableQueryUpdate < Self > ;
815815
@@ -896,38 +896,38 @@ type RowSize = u16;
896896type RowOffset = u64 ;
897897
898898/// A packed list of BSATN-encoded rows.
899- #[ derive( SpacetimeType , Debug , Clone ) ]
899+ #[ derive( SpacetimeType , Debug , Clone , Default ) ]
900900#[ sats( crate = spacetimedb_lib) ]
901- pub struct BsatnRowList < B = Bytes , I = Arc < [ RowOffset ] > > {
901+ pub struct BsatnRowList {
902902 /// A size hint about `rows_data`
903903 /// intended to facilitate parallel decode purposes on large initial updates.
904- size_hint : RowSizeHint < I > ,
904+ size_hint : RowSizeHint ,
905905 /// The flattened byte array for a list of rows.
906- rows_data : B ,
907- }
908-
909- impl Default for BsatnRowList {
910- fn default ( ) -> Self {
911- Self :: row_offsets ( )
912- }
906+ rows_data : Bytes ,
913907}
914908
915909/// NOTE(centril, 1.0): We might want to add a `None` variant to this
916910/// where the client has to decode in a loop until `rows_data` has been exhausted.
917911/// The use-case for this is clients who are bandwidth limited and where every byte counts.
918912#[ derive( SpacetimeType , Debug , Clone ) ]
919913#[ sats( crate = spacetimedb_lib) ]
920- pub enum RowSizeHint < I > {
914+ pub enum RowSizeHint {
921915 /// Each row in `rows_data` is of the same fixed size as specified here.
922916 FixedSize ( RowSize ) ,
923917 /// The offsets into `rows_data` defining the boundaries of each row.
924918 /// Only stores the offset to the start of each row.
925919 /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
926920 /// The behavior of this is identical to that of `PackedStr`.
927- RowOffsets ( I ) ,
921+ RowOffsets ( Arc < [ RowOffset ] > ) ,
922+ }
923+
924+ impl Default for RowSizeHint {
925+ fn default ( ) -> Self {
926+ Self :: RowOffsets ( [ ] . into ( ) )
927+ }
928928}
929929
930- impl < I : AsRef < [ RowOffset ] > > RowSizeHint < I > {
930+ impl RowSizeHint {
931931 fn index_to_range ( & self , index : usize , data_end : usize ) -> Option < Range < usize > > {
932932 match self {
933933 Self :: FixedSize ( size) => {
@@ -952,37 +952,17 @@ impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
952952 }
953953}
954954
955- impl < B : Default , I > BsatnRowList < B , I > {
956- pub fn fixed ( row_size : RowSize ) -> Self {
957- Self {
958- size_hint : RowSizeHint :: FixedSize ( row_size) ,
959- rows_data : <_ >:: default ( ) ,
960- }
961- }
962-
963- /// Returns a new empty list using indices
964- pub fn row_offsets ( ) -> Self
965- where
966- I : From < [ RowOffset ; 0 ] > ,
967- {
968- Self {
969- size_hint : RowSizeHint :: RowOffsets ( [ ] . into ( ) ) ,
970- rows_data : <_ >:: default ( ) ,
971- }
972- }
973- }
974-
975- impl < B : AsRef < [ u8 ] > , I : AsRef < [ RowOffset ] > > RowListLen for BsatnRowList < B , I > {
976- /// Returns the length of the row list.
955+ impl RowListLen for BsatnRowList {
977956 fn len ( & self ) -> usize {
978957 match & self . size_hint {
958+ // `size != 0` is always the case for `FixedSize`.
979959 RowSizeHint :: FixedSize ( size) => self . rows_data . as_ref ( ) . len ( ) / * size as usize ,
980960 RowSizeHint :: RowOffsets ( offsets) => offsets. as_ref ( ) . len ( ) ,
981961 }
982962 }
983963}
984964
985- impl < B : AsRef < [ u8 ] > , I > ByteListLen for BsatnRowList < B , I > {
965+ impl ByteListLen for BsatnRowList {
986966 /// Returns the uncompressed size of the list in bytes
987967 fn num_bytes ( & self ) -> usize {
988968 self . rows_data . as_ref ( ) . len ( )
@@ -1022,26 +1002,100 @@ impl Iterator for BsatnRowListIter<'_> {
10221002}
10231003
10241004/// A [`BsatnRowList`] that can be added to.
1025- pub type BsatnRowListBuilder = BsatnRowList < Vec < u8 > , Vec < RowOffset > > ;
1026-
1027- impl BsatnRowListBuilder {
1028- /// Adds `row`, BSATN-encoded to this list.
1029- #[ inline]
1030- pub fn push ( & mut self , row : & [ u8 ] ) {
1031- if let RowSizeHint :: RowOffsets ( offsets) = & mut self . size_hint {
1032- offsets. push ( self . rows_data . len ( ) as u64 ) ;
1033- }
1034- self . rows_data . extend_from_slice ( row) ;
1005+ #[ derive( Default ) ]
1006+ pub struct BsatnRowListBuilder {
1007+ /// A size hint about `rows_data`
1008+ /// intended to facilitate parallel decode purposes on large initial updates.
1009+ size_hint : RowSizeHintBuilder ,
1010+ /// The flattened byte array for a list of rows.
1011+ rows_data : Vec < u8 > ,
1012+ }
1013+
1014+ /// A [`RowSizeHint`] under construction.
1015+ pub enum RowSizeHintBuilder {
1016+ /// We haven't seen any rows yet.
1017+ Empty ,
1018+ /// Each row in `rows_data` is of the same fixed size as specified here
1019+ /// but we don't know whether the size fits in `RowSize`
1020+ /// and we don't know whether future rows will also have this size.
1021+ FixedSizeDyn ( usize ) ,
1022+ /// Each row in `rows_data` is of the same fixed size as specified here
1023+ /// and we know that this will be the case for future rows as well.
1024+ FixedSizeStatic ( RowSize ) ,
1025+ /// The offsets into `rows_data` defining the boundaries of each row.
1026+ /// Only stores the offset to the start of each row.
1027+ /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
1028+ /// The behavior of this is identical to that of `PackedStr`.
1029+ RowOffsets ( Vec < RowOffset > ) ,
1030+ }
1031+
1032+ impl Default for RowSizeHintBuilder {
1033+ fn default ( ) -> Self {
1034+ Self :: Empty
10351035 }
1036+ }
10361037
1037- /// Finish the in flight list, throwing away the capability to mutate.
1038- pub fn finish ( self ) -> BsatnRowList {
1038+ impl RowListBuilder for BsatnRowListBuilder {
1039+ type FinishedList = BsatnRowList ;
1040+
1041+ fn push ( & mut self , row : impl ToBsatn + Serialize ) {
1042+ use RowSizeHintBuilder :: * ;
1043+
1044+ // Record the length before. It will be the starting offset of `row`.
1045+ let len_before = self . rows_data . len ( ) ;
1046+ // BSATN-encode the row directly to the buffer.
1047+ row. to_bsatn_extend ( & mut self . rows_data ) . unwrap ( ) ;
1048+
1049+ let encoded_len = || self . rows_data . len ( ) - len_before;
1050+ let push_row_offset = |mut offsets : Vec < _ > | {
1051+ offsets. push ( len_before as u64 ) ;
1052+ RowOffsets ( offsets)
1053+ } ;
1054+
1055+ let hint = mem:: replace ( & mut self . size_hint , Empty ) ;
1056+ self . size_hint = match hint {
1057+ // Static size that is unchanging.
1058+ h @ FixedSizeStatic ( _) => h,
1059+ // Dynamic size that is unchanging.
1060+ h @ FixedSizeDyn ( size) if size == encoded_len ( ) => h,
1061+ // Size mismatch for the dynamic fixed size.
1062+ // Now we must construct `RowOffsets` for all rows thus far.
1063+ // We know that `size != 0` here, as this was excluded when we had `Empty`.
1064+ FixedSizeDyn ( size) => RowOffsets ( collect_offsets_from_num_rows ( 1 + len_before / size, size) ) ,
1065+ // Once there's a size for each row, we'll just add to it.
1066+ RowOffsets ( offsets) => push_row_offset ( offsets) ,
1067+ // First time a row is seen. Use `encoded_len()` as the hint.
1068+ // If we have a static layout, we'll always have a fixed size.
1069+ // Otherwise, let's start out with a potentially fixed size.
1070+ // In either case, if `encoded_len() == 0`, we have to store offsets,
1071+ // as we cannot recover the number of elements otherwise.
1072+ Empty => match row. static_bsatn_size ( ) {
1073+ Some ( 0 ) => push_row_offset ( Vec :: new ( ) ) ,
1074+ Some ( size) => FixedSizeStatic ( size) ,
1075+ None => match encoded_len ( ) {
1076+ 0 => push_row_offset ( Vec :: new ( ) ) ,
1077+ size => FixedSizeDyn ( size) ,
1078+ } ,
1079+ } ,
1080+ } ;
1081+ }
1082+
1083+ fn finish ( self ) -> Self :: FinishedList {
10391084 let Self { size_hint, rows_data } = self ;
1040- let rows_data = rows_data. into ( ) ;
10411085 let size_hint = match size_hint {
1042- RowSizeHint :: FixedSize ( fs) => RowSizeHint :: FixedSize ( fs) ,
1043- RowSizeHint :: RowOffsets ( ro) => RowSizeHint :: RowOffsets ( ro. into ( ) ) ,
1086+ RowSizeHintBuilder :: Empty => RowSizeHint :: RowOffsets ( [ ] . into ( ) ) ,
1087+ RowSizeHintBuilder :: FixedSizeStatic ( fs) => RowSizeHint :: FixedSize ( fs) ,
1088+ RowSizeHintBuilder :: FixedSizeDyn ( fs) => match u16:: try_from ( fs) {
1089+ Ok ( fs) => RowSizeHint :: FixedSize ( fs) ,
1090+ Err ( _) => RowSizeHint :: RowOffsets ( collect_offsets_from_num_rows ( rows_data. len ( ) / fs, fs) . into ( ) ) ,
1091+ } ,
1092+ RowSizeHintBuilder :: RowOffsets ( ro) => RowSizeHint :: RowOffsets ( ro. into ( ) ) ,
10441093 } ;
1094+ let rows_data = rows_data. into ( ) ;
10451095 BsatnRowList { size_hint, rows_data }
10461096 }
10471097}
1098+
1099+ fn collect_offsets_from_num_rows ( num_rows : usize , size : usize ) -> Vec < u64 > {
1100+ ( 0 ..num_rows) . map ( |i| i * size) . map ( |o| o as u64 ) . collect ( )
1101+ }
0 commit comments