@@ -161,6 +161,134 @@ async fn test_stream_builder_with_explicit_rowid_keys() {
161161 assert_eq ! ( empty. iter( ) . map( |b| b. num_rows( ) ) . sum:: <usize >( ) , 0 ) ;
162162}
163163
164+ #[ test]
165+ fn test_stream_builder_abandon_rolls_back ( ) {
166+ // Dropping the builder before finish() must roll the transaction back, so
167+ // the table is never persisted. We prove it by re-running begin() on the
168+ // same path: its CREATE TABLE only succeeds if the abandoned build left no
169+ // table behind.
170+ let dir = tempdir ( ) . unwrap ( ) ;
171+ let db_path = dir. path ( ) . join ( "abandon.db" ) ;
172+ let db = db_path. to_str ( ) . unwrap ( ) ;
173+ let schema = Arc :: new ( Schema :: new ( vec ! [
174+ Field :: new( "rowid" , DataType :: Int64 , false ) ,
175+ Field :: new( "name" , DataType :: Utf8 , true ) ,
176+ ] ) ) ;
177+ let batch = RecordBatch :: try_new (
178+ schema. clone ( ) ,
179+ vec ! [
180+ Arc :: new( Int64Array :: from( vec![ 1_i64 , 2 ] ) ) ,
181+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) , Some ( "b" ) ] ) ) ,
182+ ] ,
183+ )
184+ . unwrap ( ) ;
185+
186+ {
187+ let mut builder =
188+ SqliteSidecarBuilder :: begin ( db, "models" , 1 , schema. clone ( ) , 0 , vec ! [ 1 ] ) . unwrap ( ) ;
189+ builder. push_batch ( & batch) . unwrap ( ) ;
190+ // builder dropped here without finish() → rollback
191+ }
192+
193+ // A fresh build on the same path must succeed (table does not already exist).
194+ assert ! (
195+ SqliteSidecarBuilder :: begin( db, "models" , 1 , schema, 0 , vec![ 1 ] ) . is_ok( ) ,
196+ "abandoned build must not persist its table"
197+ ) ;
198+ }
199+
200+ #[ tokio:: test]
201+ async fn test_stream_builder_uint64_keys ( ) {
202+ // The key column may be UInt64 (as well as Int64); it is stored as SQLite
203+ // INTEGER and looked up via the u64 fetch API.
204+ let dir = tempdir ( ) . unwrap ( ) ;
205+ let schema = Arc :: new ( Schema :: new ( vec ! [
206+ Field :: new( "rowid" , DataType :: UInt64 , false ) ,
207+ Field :: new( "name" , DataType :: Utf8 , true ) ,
208+ ] ) ) ;
209+ let batch = RecordBatch :: try_new (
210+ schema. clone ( ) ,
211+ vec ! [
212+ Arc :: new( UInt64Array :: from( vec![ 7_u64 , 11 ] ) ) ,
213+ Arc :: new( StringArray :: from( vec![ Some ( "x" ) , Some ( "y" ) ] ) ) ,
214+ ] ,
215+ )
216+ . unwrap ( ) ;
217+ let db_path = dir. path ( ) . join ( "u64.db" ) ;
218+ let mut builder =
219+ SqliteSidecarBuilder :: begin ( db_path. to_str ( ) . unwrap ( ) , "t" , 2 , schema, 0 , vec ! [ 1 ] ) . unwrap ( ) ;
220+ builder. push_batch ( & batch) . unwrap ( ) ;
221+ let provider = builder. finish ( ) . unwrap ( ) ;
222+
223+ let batches = provider. fetch_by_keys ( & [ 11 ] , "rowid" , None ) . await . unwrap ( ) ;
224+ assert_eq ! ( batches. iter( ) . map( |b| b. num_rows( ) ) . sum:: <usize >( ) , 1 ) ;
225+ }
226+
227+ #[ test]
228+ fn test_stream_builder_validation_errors ( ) {
229+ let dir = tempdir ( ) . unwrap ( ) ;
230+ let db = |n : & str | dir. path ( ) . join ( n) . to_str ( ) . unwrap ( ) . to_string ( ) ;
231+ let schema = Arc :: new ( Schema :: new ( vec ! [
232+ Field :: new( "rowid" , DataType :: Int64 , false ) ,
233+ Field :: new( "name" , DataType :: Utf8 , true ) ,
234+ ] ) ) ;
235+
236+ // pool_size must be >= 1.
237+ assert ! ( SqliteSidecarBuilder :: begin( & db( "a.db" ) , "t" , 0 , schema. clone( ) , 0 , vec![ 1 ] ) . is_err( ) ) ;
238+
239+ // schema has 2 fields → exactly 1 value column index expected, not 2.
240+ assert ! (
241+ SqliteSidecarBuilder :: begin( & db( "b.db" ) , "t" , 1 , schema. clone( ) , 0 , vec![ 1 , 2 ] ) . is_err( )
242+ ) ;
243+
244+ // key_col_index out of range for the pushed batch (2 columns, index 9).
245+ let mut b_oob =
246+ SqliteSidecarBuilder :: begin ( & db ( "c.db" ) , "t" , 1 , schema. clone ( ) , 9 , vec ! [ 1 ] ) . unwrap ( ) ;
247+ let two_col = RecordBatch :: try_new (
248+ schema. clone ( ) ,
249+ vec ! [
250+ Arc :: new( Int64Array :: from( vec![ 1_i64 ] ) ) ,
251+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) ] ) ) ,
252+ ] ,
253+ )
254+ . unwrap ( ) ;
255+ assert ! ( b_oob. push_batch( & two_col) . is_err( ) ) ;
256+
257+ // A null key value is rejected.
258+ let nullable = Arc :: new ( Schema :: new ( vec ! [
259+ Field :: new( "rowid" , DataType :: Int64 , true ) ,
260+ Field :: new( "name" , DataType :: Utf8 , true ) ,
261+ ] ) ) ;
262+ let mut b_null =
263+ SqliteSidecarBuilder :: begin ( & db ( "d.db" ) , "t" , 1 , nullable. clone ( ) , 0 , vec ! [ 1 ] ) . unwrap ( ) ;
264+ let null_key = RecordBatch :: try_new (
265+ nullable,
266+ vec ! [
267+ Arc :: new( Int64Array :: from( vec![ None , Some ( 2 ) ] ) ) ,
268+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) , Some ( "b" ) ] ) ) ,
269+ ] ,
270+ )
271+ . unwrap ( ) ;
272+ assert ! ( b_null. push_batch( & null_key) . is_err( ) ) ;
273+
274+ // A non-integer key column type is rejected.
275+ let text_key = Arc :: new ( Schema :: new ( vec ! [
276+ Field :: new( "rowid" , DataType :: Utf8 , false ) ,
277+ Field :: new( "name" , DataType :: Utf8 , true ) ,
278+ ] ) ) ;
279+ let mut b_text =
280+ SqliteSidecarBuilder :: begin ( & db ( "e.db" ) , "t" , 1 , text_key. clone ( ) , 0 , vec ! [ 1 ] ) . unwrap ( ) ;
281+ let text_batch = RecordBatch :: try_new (
282+ text_key,
283+ vec ! [
284+ Arc :: new( StringArray :: from( vec![ Some ( "k" ) ] ) ) ,
285+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) ] ) ) ,
286+ ] ,
287+ )
288+ . unwrap ( ) ;
289+ assert ! ( b_text. push_batch( & text_batch) . is_err( ) ) ;
290+ }
291+
164292#[ tokio:: test]
165293async fn test_projection ( ) {
166294 let dir = tempdir ( ) . unwrap ( ) ;
0 commit comments