@@ -44,12 +44,16 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444 Box < dyn Fn ( & VssError ) -> bool + ' static + Send + Sync > ,
4545> ;
4646
47+ #[ derive( Debug , PartialEq ) ]
4748enum VssSchemaVersion {
4849 // The initial schema version.
4950 //
5051 // This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
5152 // stored key.
5253 V0 ,
54+ // The second deployed schema version.
55+ // Here we started to obfuscate the primary and secondary namespaces and the obfuscated `store_key` (`obfuscate(primary_namespace#secondary_namespace)#obfuscate(key)`) is now used as `aad` for encryption, ensuring that the encrypted blobs commit to the key they're stored under.
56+ V1 ,
5357}
5458
5559// We set this to a small number of threads that would still allow to make some progress if one
@@ -322,9 +326,10 @@ impl Drop for VssStore {
322326}
323327
324328struct VssStoreInner {
329+ schema_version : VssSchemaVersion ,
325330 client : VssClient < CustomRetryPolicy > ,
326331 store_id : String ,
327- storable_builder : StorableBuilder < RandEntropySource > ,
332+ data_encryption_key : [ u8 ; 32 ] ,
328333 key_obfuscator : KeyObfuscator ,
329334 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
330335 // The lock also encapsulates the latest written version per key.
@@ -336,10 +341,10 @@ impl VssStoreInner {
336341 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
337342 header_provider : Arc < dyn VssHeaderProvider > ,
338343 ) -> Self {
344+ let schema_version = VssSchemaVersion :: V0 ;
339345 let ( data_encryption_key, obfuscation_master_key) =
340346 derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
341347 let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
342- let storable_builder = StorableBuilder :: new ( data_encryption_key, RandEntropySource ) ;
343348 let retry_policy = ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
344349 . with_max_attempts ( 10 )
345350 . with_max_total_delay ( Duration :: from_secs ( 15 ) )
@@ -355,7 +360,7 @@ impl VssStoreInner {
355360
356361 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
357362 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
358- Self { client, store_id, storable_builder , key_obfuscator, locks }
363+ Self { schema_version , client, store_id, data_encryption_key , key_obfuscator, locks }
359364 }
360365
361366 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -366,17 +371,45 @@ impl VssStoreInner {
366371 fn build_obfuscated_key (
367372 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
368373 ) -> String {
369- let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
370- if primary_namespace. is_empty ( ) {
371- obfuscated_key
374+ if self . schema_version == VssSchemaVersion :: V1 {
375+ let obfuscated_prefix =
376+ self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
377+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
378+ format ! ( "{}#{}" , obfuscated_prefix, obfuscated_key)
379+ } else {
380+ // Default to V0 schema
381+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
382+ if primary_namespace. is_empty ( ) {
383+ obfuscated_key
384+ } else {
385+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
386+ }
387+ }
388+ }
389+
390+ fn build_obfuscated_prefix (
391+ & self , primary_namespace : & str , secondary_namespace : & str ,
392+ ) -> String {
393+ if self . schema_version == VssSchemaVersion :: V1 {
394+ let prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
395+ self . key_obfuscator . obfuscate ( & prefix)
372396 } else {
373- format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
397+ // Default to V0 schema
398+ format ! ( "{}#{}" , primary_namespace, secondary_namespace)
374399 }
375400 }
376401
377402 fn extract_key ( & self , unified_key : & str ) -> io:: Result < String > {
378- let mut parts = unified_key. splitn ( 3 , '#' ) ;
379- let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
403+ let mut parts = if self . schema_version == VssSchemaVersion :: V1 {
404+ let mut parts = unified_key. splitn ( 2 , '#' ) ;
405+ let _obfuscated_namespace = parts. next ( ) ;
406+ parts
407+ } else {
408+ // Default to V0 schema
409+ let mut parts = unified_key. splitn ( 3 , '#' ) ;
410+ let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
411+ parts
412+ } ;
380413 match parts. next ( ) {
381414 Some ( obfuscated_key) => {
382415 let actual_key = self . key_obfuscator . deobfuscate ( obfuscated_key) ?;
@@ -391,7 +424,7 @@ impl VssStoreInner {
391424 ) -> io:: Result < Vec < String > > {
392425 let mut page_token = None ;
393426 let mut keys = vec ! [ ] ;
394- let key_prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
427+ let key_prefix = self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
395428 while page_token != Some ( "" . to_string ( ) ) {
396429 let request = ListKeyVersionsRequest {
397430 store_id : self . store_id . clone ( ) ,
@@ -421,9 +454,8 @@ impl VssStoreInner {
421454 ) -> io:: Result < Vec < u8 > > {
422455 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
423456
424- let obfuscated_key =
425- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
426- let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : obfuscated_key } ;
457+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
458+ let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : store_key. clone ( ) } ;
427459 let resp = self . client . get_object ( & request) . await . map_err ( |e| {
428460 let msg = format ! (
429461 "Failed to read from key {}/{}/{}: {}" ,
@@ -445,7 +477,11 @@ impl VssStoreInner {
445477 Error :: new ( ErrorKind :: Other , msg)
446478 } ) ?;
447479
448- Ok ( self . storable_builder . deconstruct ( storable) ?. 0 )
480+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
481+ let aad =
482+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
483+ let decrypted = storable_builder. deconstruct ( storable, & self . data_encryption_key , aad) ?. 0 ;
484+ Ok ( decrypted)
449485 }
450486
451487 async fn write_internal (
@@ -459,22 +495,25 @@ impl VssStoreInner {
459495 "write" ,
460496 ) ?;
461497
462- self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
463- let obfuscated_key =
464- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
465- let vss_version = -1 ;
466- let storable = self . storable_builder . build ( buf, vss_version) ;
467- let request = PutObjectRequest {
468- store_id : self . store_id . clone ( ) ,
469- global_version : None ,
470- transaction_items : vec ! [ KeyValue {
471- key: obfuscated_key,
472- version: vss_version,
473- value: storable. encode_to_vec( ) ,
474- } ] ,
475- delete_items : vec ! [ ] ,
476- } ;
498+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
499+ let vss_version = -1 ;
500+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
501+ let aad =
502+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
503+ let storable =
504+ storable_builder. build ( buf. to_vec ( ) , vss_version, & self . data_encryption_key , aad) ;
505+ let request = PutObjectRequest {
506+ store_id : self . store_id . clone ( ) ,
507+ global_version : None ,
508+ transaction_items : vec ! [ KeyValue {
509+ key: store_key,
510+ version: vss_version,
511+ value: storable. encode_to_vec( ) ,
512+ } ] ,
513+ delete_items : vec ! [ ] ,
514+ } ;
477515
516+ self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
478517 self . client . put_object ( & request) . await . map_err ( |e| {
479518 let msg = format ! (
480519 "Failed to write to key {}/{}/{}: {}" ,
0 commit comments