@@ -55,7 +55,7 @@ use crate::metastore::wal::{WALIndexKey, WALRocksIndex};
5555
5656use crate :: table:: { Row , TableValue } ;
5757
58- use crate :: util:: lock:: acquire_lock;
58+ use crate :: util:: lock:: { acquire_lock, acquire_lock_duration } ;
5959use crate :: util:: WorkerLoop ;
6060use crate :: { meta_store_table_impl, CubeError } ;
6161use byteorder:: { BigEndian , ReadBytesExt , WriteBytesExt } ;
@@ -884,6 +884,10 @@ pub trait MetaStore: DIService + Send + Sync {
884884
885885 fn partition_table ( & self ) -> PartitionMetaStoreTable ;
886886 async fn create_partition ( & self , partition : Partition ) -> Result < IdRow < Partition > , CubeError > ;
887+ async fn create_partitions (
888+ & self ,
889+ partitions : Vec < Partition > ,
890+ ) -> Result < Vec < IdRow < Partition > > , CubeError > ;
887891 async fn get_partition ( & self , partition_id : u64 ) -> Result < IdRow < Partition > , CubeError > ;
888892 async fn get_partition_out_of_queue (
889893 & self ,
@@ -973,6 +977,13 @@ pub trait MetaStore: DIService + Send + Sync {
973977 & self ,
974978 index_id : u64 ,
975979 ) -> Result < Vec < IdRow < Partition > > , CubeError > ;
980+ /// Active partitions for each index id, positionally aligned with `index_ids`
981+ /// (result[i] corresponds to index_ids[i]). Returns a Vec rather than a map because the
982+ /// metastore RPC serializes with flexbuffers, which rejects non-string map keys.
983+ async fn get_active_partitions_for_indexes (
984+ & self ,
985+ index_ids : Vec < u64 > ,
986+ ) -> Result < Vec < Vec < IdRow < Partition > > > , CubeError > ;
976987 async fn get_index ( & self , index_id : u64 ) -> Result < IdRow < Index > , CubeError > ;
977988
978989 async fn get_index_with_active_partitions_out_of_queue (
@@ -2760,6 +2771,21 @@ impl MetaStore for RocksMetaStore {
27602771 . await
27612772 }
27622773
2774+ async fn create_partitions (
2775+ & self ,
2776+ partitions : Vec < Partition > ,
2777+ ) -> Result < Vec < IdRow < Partition > > , CubeError > {
2778+ self . write_operation ( "create_partitions" , move |db_ref, batch_pipe| {
2779+ let table = PartitionRocksTable :: new ( db_ref. clone ( ) ) ;
2780+ let mut result = Vec :: with_capacity ( partitions. len ( ) ) ;
2781+ for partition in partitions {
2782+ result. push ( table. insert ( partition, batch_pipe) ?) ;
2783+ }
2784+ Ok ( result)
2785+ } )
2786+ . await
2787+ }
2788+
27632789 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
27642790 async fn get_partition ( & self , partition_id : u64 ) -> Result < IdRow < Partition > , CubeError > {
27652791 self . read_operation ( "get_partition" , move |db_ref| {
@@ -2829,21 +2855,25 @@ impl MetaStore for RocksMetaStore {
28292855 // Single-flight: serialize the scan so a burst of concurrent callers
28302856 // (e.g. many partition writes during an import/repartition) share one
28312857 // computation instead of each materializing a full metastore scan.
2832- let _compute_guard =
2833- match acquire_lock ( "disk space compute" , self . disk_space_compute_lock . lock ( ) ) . await
2834- {
2835- Ok ( guard) => guard,
2836- Err ( e) => {
2837- log:: error!(
2858+ let _compute_guard = match acquire_lock_duration (
2859+ "disk space compute" ,
2860+ self . disk_space_compute_lock . lock ( ) ,
2861+ Duration :: from_millis ( self . store . config . disk_space_compute_lock_timeout_ms ( ) ) ,
2862+ )
2863+ . await
2864+ {
2865+ Ok ( guard) => guard,
2866+ Err ( e) => {
2867+ log:: error!(
28382868 "Timed out waiting for the disk space scan lock: {}. The single-flight \
28392869 scan is stuck; reporting 0 used disk space so the disk-space check \
28402870 passes. THE DISK-SPACE LIMIT IS NOT BEING ENFORCED until the scan \
28412871 recovers.",
28422872 e
28432873 ) ;
2844- return Ok ( 0 ) ;
2845- }
2846- } ;
2874+ return Ok ( 0 ) ;
2875+ }
2876+ } ;
28472877 if let Some ( sizes) = self . disk_space_cached ( ) . await ? {
28482878 sizes
28492879 } else {
@@ -3598,6 +3628,29 @@ impl MetaStore for RocksMetaStore {
35983628 . await
35993629 }
36003630
3631+ async fn get_active_partitions_for_indexes (
3632+ & self ,
3633+ index_ids : Vec < u64 > ,
3634+ ) -> Result < Vec < Vec < IdRow < Partition > > > , CubeError > {
3635+ self . read_operation_out_of_queue ( "get_active_partitions_for_indexes" , move |db_ref| {
3636+ let rocks_partition = PartitionRocksTable :: new ( db_ref) ;
3637+ let mut result = Vec :: with_capacity ( index_ids. len ( ) ) ;
3638+ for index_id in index_ids {
3639+ let partitions = rocks_partition
3640+ . get_rows_by_index (
3641+ & PartitionIndexKey :: ByIndexId ( index_id) ,
3642+ & PartitionRocksIndex :: IndexId ,
3643+ ) ?
3644+ . into_iter ( )
3645+ . filter ( |r| r. get_row ( ) . active )
3646+ . collect :: < Vec < _ > > ( ) ;
3647+ result. push ( partitions) ;
3648+ }
3649+ Ok ( result)
3650+ } )
3651+ . await
3652+ }
3653+
36013654 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
36023655 async fn get_index ( & self , index_id : u64 ) -> Result < IdRow < Index > , CubeError > {
36033656 self . read_operation ( "get_index" , move |db_ref| {
@@ -5801,6 +5854,151 @@ mod tests {
58015854 Ok ( ( ) )
58025855 }
58035856
5857+ #[ tokio:: test]
5858+ async fn get_active_partitions_for_indexes_test ( ) -> Result < ( ) , CubeError > {
5859+ init_test_logger ( ) . await ;
5860+
5861+ let ( _remote_fs, meta_store) =
5862+ RocksMetaStore :: prepare_test_metastore ( "get_active_partitions_for_indexes" ) ;
5863+
5864+ meta_store. create_schema ( "foo" . to_string ( ) , false ) . await ?;
5865+ let columns = vec ! [
5866+ Column :: new( "col1" . to_string( ) , ColumnType :: Int , 0 ) ,
5867+ Column :: new( "col2" . to_string( ) , ColumnType :: String , 1 ) ,
5868+ ] ;
5869+ // Two tables → two default indexes, each with its own initial active partition.
5870+ let table1 = meta_store
5871+ . create_table (
5872+ "foo" . to_string ( ) ,
5873+ "t1" . to_string ( ) ,
5874+ columns. clone ( ) ,
5875+ None ,
5876+ None ,
5877+ vec ! [ ] ,
5878+ true ,
5879+ None ,
5880+ None ,
5881+ None ,
5882+ None ,
5883+ None ,
5884+ None ,
5885+ None ,
5886+ None ,
5887+ None ,
5888+ false ,
5889+ None ,
5890+ )
5891+ . await ?;
5892+ let table2 = meta_store
5893+ . create_table (
5894+ "foo" . to_string ( ) ,
5895+ "t2" . to_string ( ) ,
5896+ columns. clone ( ) ,
5897+ None ,
5898+ None ,
5899+ vec ! [ ] ,
5900+ true ,
5901+ None ,
5902+ None ,
5903+ None ,
5904+ None ,
5905+ None ,
5906+ None ,
5907+ None ,
5908+ None ,
5909+ None ,
5910+ false ,
5911+ None ,
5912+ )
5913+ . await ?;
5914+
5915+ let index1 = meta_store. get_default_index ( table1. get_id ( ) ) . await ?;
5916+ let index2 = meta_store. get_default_index ( table2. get_id ( ) ) . await ?;
5917+
5918+ let single1 = meta_store
5919+ . get_active_partitions_by_index_id ( index1. get_id ( ) )
5920+ . await ?;
5921+ let single2 = meta_store
5922+ . get_active_partitions_by_index_id ( index2. get_id ( ) )
5923+ . await ?;
5924+
5925+ // Batch result is positionally aligned with the requested ids; it must match the
5926+ // per-index calls and return an empty vec (not an error) for the unknown index.
5927+ let unknown_index_id = index2. get_id ( ) + 1000 ;
5928+ let batch = meta_store
5929+ . get_active_partitions_for_indexes ( vec ! [
5930+ index1. get_id( ) ,
5931+ index2. get_id( ) ,
5932+ unknown_index_id,
5933+ ] )
5934+ . await ?;
5935+
5936+ let ids = |ps : & Vec < IdRow < Partition > > | ps. iter ( ) . map ( |p| p. get_id ( ) ) . collect :: < Vec < _ > > ( ) ;
5937+ assert_eq ! ( batch. len( ) , 3 ) ;
5938+ assert_eq ! ( ids( & batch[ 0 ] ) , ids( & single1) ) ;
5939+ assert_eq ! ( ids( & batch[ 1 ] ) , ids( & single2) ) ;
5940+ assert ! ( batch[ 2 ] . is_empty( ) ) ;
5941+
5942+ Ok ( ( ) )
5943+ }
5944+
5945+ #[ tokio:: test]
5946+ async fn create_partitions_test ( ) -> Result < ( ) , CubeError > {
5947+ init_test_logger ( ) . await ;
5948+
5949+ let ( _remote_fs, meta_store) = RocksMetaStore :: prepare_test_metastore ( "create_partitions" ) ;
5950+
5951+ meta_store. create_schema ( "foo" . to_string ( ) , false ) . await ?;
5952+ let columns = vec ! [ Column :: new( "col1" . to_string( ) , ColumnType :: Int , 0 ) ] ;
5953+ let table = meta_store
5954+ . create_table (
5955+ "foo" . to_string ( ) ,
5956+ "t1" . to_string ( ) ,
5957+ columns,
5958+ None ,
5959+ None ,
5960+ vec ! [ ] ,
5961+ true ,
5962+ None ,
5963+ None ,
5964+ None ,
5965+ None ,
5966+ None ,
5967+ None ,
5968+ None ,
5969+ None ,
5970+ None ,
5971+ false ,
5972+ None ,
5973+ )
5974+ . await ?;
5975+ let index = meta_store. get_default_index ( table. get_id ( ) ) . await ?;
5976+ let parent = meta_store
5977+ . get_active_partitions_by_index_id ( index. get_id ( ) )
5978+ . await ?[ 0 ]
5979+ . clone ( ) ;
5980+
5981+ let created = meta_store
5982+ . create_partitions ( vec ! [
5983+ Partition :: new_child( & parent, None ) ,
5984+ Partition :: new_child( & parent, None ) ,
5985+ ] )
5986+ . await ?;
5987+
5988+ assert_eq ! ( created. len( ) , 2 ) ;
5989+ assert_ne ! ( created[ 0 ] . get_id( ) , created[ 1 ] . get_id( ) ) ;
5990+ // Both rows must be persisted and point at the same parent partition.
5991+ for child in & created {
5992+ let fetched = meta_store. get_partition ( child. get_id ( ) ) . await ?;
5993+ assert_eq ! (
5994+ fetched. get_row( ) . parent_partition_id( ) ,
5995+ & Some ( parent. get_id( ) )
5996+ ) ;
5997+ }
5998+
5999+ Ok ( ( ) )
6000+ }
6001+
58046002 #[ tokio:: test]
58056003 async fn table_test ( ) -> Result < ( ) , CubeError > {
58066004 init_test_logger ( ) . await ;
0 commit comments