77// the Business Source License, use of this software will be governed
88// by the Apache License, Version 2.0.
99
10- use std:: path:: PathBuf ;
10+ use std:: path:: { Path , PathBuf } ;
1111use std:: sync:: Arc ;
1212
1313use async_trait:: async_trait;
@@ -114,6 +114,19 @@ impl DatasetRepositoryLocalFs {
114114 self . storage_strategy . get_dataset_path ( & dataset_handle) ,
115115 ) )
116116 }
117+
118+ fn get_canonical_path_param ( dataset_path : & Path ) -> Result < ( PathBuf , String ) , InternalError > {
119+ let canonical_dataset_path = std:: fs:: canonicalize ( dataset_path) . int_err ( ) ?;
120+ let dataset_name_str = canonical_dataset_path
121+ . to_str ( )
122+ . unwrap ( )
123+ . split ( '/' )
124+ . last ( )
125+ . unwrap ( )
126+ . to_owned ( ) ;
127+
128+ Ok ( ( canonical_dataset_path, dataset_name_str) )
129+ }
117130}
118131
119132/////////////////////////////////////////////////////////////////////////////////////////
@@ -241,7 +254,21 @@ impl DatasetRepository for DatasetRepositoryLocalFs {
241254
242255 // It's okay to create a new dataset by this point
243256 let dataset_id = seed_block. event . dataset_id . clone ( ) ;
244- let dataset_handle = DatasetHandle :: new ( dataset_id, dataset_alias. clone ( ) ) ;
257+ let dataset_handle = if let Some ( account_name) = & dataset_alias. account_name {
258+ let ( _, canonical_account_name) = self
259+ . storage_strategy
260+ . resolve_account_dir ( account_name)
261+ . int_err ( ) ?;
262+ let canonical_dataset_alias = DatasetAlias :: new (
263+ Some ( canonical_account_name) ,
264+ dataset_alias. dataset_name . clone ( ) ,
265+ ) ;
266+
267+ DatasetHandle :: new ( dataset_id, canonical_dataset_alias)
268+ } else {
269+ DatasetHandle :: new ( dataset_id, dataset_alias. clone ( ) )
270+ } ;
271+
245272 let dataset_path = self . storage_strategy . get_dataset_path ( & dataset_handle) ;
246273 let layout = DatasetLayout :: create ( & dataset_path) . int_err ( ) ?;
247274 let dataset = Self :: build_dataset ( layout, self . event_bus . clone ( ) ) ;
@@ -430,6 +457,11 @@ trait DatasetStorageStrategy: Sync + Send {
430457 dataset_handle : & DatasetHandle ,
431458 new_name : & DatasetName ,
432459 ) -> Result < ( ) , InternalError > ;
460+
461+ fn resolve_account_dir (
462+ & self ,
463+ account_name : & AccountName ,
464+ ) -> Result < ( PathBuf , AccountName ) , ResolveDatasetError > ;
433465}
434466
435467#[ derive( thiserror:: Error , Debug ) ]
@@ -476,10 +508,11 @@ impl DatasetSingleTenantStorageStrategy {
476508 & self ,
477509 dataset_path : & PathBuf ,
478510 dataset_alias : & DatasetAlias ,
479- ) -> Result < DatasetSummary , ResolveDatasetError > {
511+ ) -> Result < ( DatasetSummary , DatasetAlias ) , ResolveDatasetError > {
480512 let layout = DatasetLayout :: new ( dataset_path) ;
481513 let dataset = DatasetRepositoryLocalFs :: build_dataset ( layout, self . event_bus . clone ( ) ) ;
482- dataset
514+
515+ let dataset_summary = dataset
483516 . get_summary ( GetSummaryOpts :: default ( ) )
484517 . await
485518 . map_err ( |e| {
@@ -490,40 +523,31 @@ impl DatasetSingleTenantStorageStrategy {
490523 } else {
491524 ResolveDatasetError :: Internal ( e. int_err ( ) )
492525 }
493- } )
494- }
526+ } ) ?;
495527
496- async fn attempt_resolve_dataset_alias (
497- & self ,
498- dataset_alias : & DatasetAlias ,
499- ) -> Result < DatasetHandle , ResolveDatasetError > {
500- assert ! (
501- !dataset_alias. is_multi_tenant( )
502- || dataset_alias. account_name. as_ref( ) . unwrap( ) == DEFAULT_ACCOUNT_NAME ,
503- "Multi-tenant refs shouldn't have reached down to here with earlier validations"
504- ) ;
505-
506- let dataset_path = self . dataset_path_impl ( dataset_alias) ;
507- if !dataset_path. exists ( ) {
508- return Err ( ResolveDatasetError :: NotFound ( DatasetNotFoundError {
509- dataset_ref : dataset_alias. as_local_ref ( ) ,
510- } ) ) ;
511- }
528+ let ( _, canonical_dataset_name) =
529+ DatasetRepositoryLocalFs :: get_canonical_path_param ( dataset_path) ?;
530+ let canonical_dataset_alias = DatasetAlias {
531+ dataset_name : DatasetName :: new_unchecked ( canonical_dataset_name. as_str ( ) ) ,
532+ account_name : None ,
533+ } ;
512534
513- self . resolve_dataset_handle ( & dataset_path, dataset_alias)
514- . await
535+ Ok ( ( dataset_summary, canonical_dataset_alias) )
515536 }
516537
517538 async fn resolve_dataset_handle (
518539 & self ,
519540 dataset_path : & PathBuf ,
520541 dataset_alias : & DatasetAlias ,
521542 ) -> Result < DatasetHandle , ResolveDatasetError > {
522- let summary = self
543+ let ( summary, canonical_dataset_alias ) = self
523544 . attempt_resolving_summary_via_path ( dataset_path, dataset_alias)
524545 . await ?;
525546
526- Ok ( DatasetHandle :: new ( summary. id , dataset_alias. clone ( ) ) )
547+ Ok ( DatasetHandle :: new (
548+ summary. id ,
549+ canonical_dataset_alias. clone ( ) ,
550+ ) )
527551 }
528552}
529553
@@ -550,7 +574,7 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
550574 }
551575 let dataset_name = DatasetName :: try_from( & dataset_dir_entry. file_name( ) ) . int_err( ) ?;
552576 let dataset_alias = DatasetAlias :: new( None , dataset_name) ;
553- match self . attempt_resolve_dataset_alias ( & dataset_alias) . await {
577+ match self . resolve_dataset_handle ( & dataset_dir_entry . path ( ) , & dataset_alias) . await {
554578 Ok ( hdl) => { yield hdl; Ok ( ( ) ) }
555579 Err ( ResolveDatasetError :: NotFound ( _) ) => Ok ( ( ) ) ,
556580 Err ( e) => Err ( e. int_err( ) )
@@ -623,12 +647,12 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
623647
624648 let dataset_path = self . dataset_path_impl ( & alias) ;
625649
626- let summary = self
650+ let ( summary, canonical_dataset_alias ) = self
627651 . attempt_resolving_summary_via_path ( & dataset_path, & alias)
628652 . await ?;
629653
630654 if summary. id == * dataset_id {
631- return Ok ( DatasetHandle :: new ( summary. id , alias ) ) ;
655+ return Ok ( DatasetHandle :: new ( summary. id , canonical_dataset_alias ) ) ;
632656 }
633657 }
634658
@@ -656,6 +680,16 @@ impl DatasetStorageStrategy for DatasetSingleTenantStorageStrategy {
656680 std:: fs:: rename ( old_dataset_path, new_dataset_path) . int_err ( ) ?;
657681 Ok ( ( ) )
658682 }
683+
684+ fn resolve_account_dir (
685+ & self ,
686+ _account_name : & AccountName ,
687+ ) -> Result < ( PathBuf , AccountName ) , ResolveDatasetError > {
688+ Ok ( (
689+ self . root . join ( DEFAULT_ACCOUNT_NAME ) ,
690+ AccountName :: new_unchecked ( DEFAULT_ACCOUNT_NAME ) ,
691+ ) )
692+ }
659693}
660694
661695/////////////////////////////////////////////////////////////////////////////////////////
@@ -772,32 +806,6 @@ impl DatasetMultiTenantStorageStrategy {
772806 }
773807 } )
774808 }
775-
776- fn resolve_account_dir (
777- & self ,
778- account_name : & AccountName ,
779- ) -> Result < PathBuf , ResolveDatasetError > {
780- let account_dataset_dir_path = self . root . join ( account_name) ;
781-
782- if !account_dataset_dir_path. is_dir ( ) {
783- let read_accout_dirs = std:: fs:: read_dir ( self . root . as_path ( ) ) . int_err ( ) ?;
784-
785- for read_accout_dir in read_accout_dirs {
786- let account_dir_name = AccountName :: new_unchecked (
787- read_accout_dir
788- . int_err ( ) ?
789- . file_name ( )
790- . to_str ( )
791- . unwrap_or ( "" ) ,
792- ) ;
793- if account_name == & account_dir_name {
794- return Ok ( self . root . join ( account_dir_name) ) ;
795- }
796- }
797- }
798-
799- Ok ( account_dataset_dir_path)
800- }
801809}
802810
803811#[ async_trait]
@@ -862,10 +870,14 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
862870 dataset_alias : & DatasetAlias ,
863871 ) -> Result < DatasetHandle , ResolveDatasetError > {
864872 let effective_account_name = self . effective_account_name ( dataset_alias) ;
865- let account_dataset_dir_path = self . resolve_account_dir ( effective_account_name) ?;
873+ let ( account_dataset_dir_path, _ ) = self . resolve_account_dir ( effective_account_name) ?;
866874
867875 if account_dataset_dir_path. is_dir ( ) {
868- let read_dataset_dir = std:: fs:: read_dir ( account_dataset_dir_path) . int_err ( ) ?;
876+ let read_dataset_dir = std:: fs:: read_dir ( account_dataset_dir_path) . map_err ( |_| {
877+ ResolveDatasetError :: NotFound ( DatasetNotFoundError {
878+ dataset_ref : dataset_alias. as_local_ref ( ) ,
879+ } )
880+ } ) ?;
869881
870882 for r_dataset_dir in read_dataset_dir {
871883 let dataset_dir_entry = r_dataset_dir. int_err ( ) ?;
@@ -962,6 +974,39 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy {
962974
963975 Ok ( ( ) )
964976 }
977+
978+ fn resolve_account_dir (
979+ & self ,
980+ account_name : & AccountName ,
981+ ) -> Result < ( PathBuf , AccountName ) , ResolveDatasetError > {
982+ let account_dataset_dir_path = self . root . join ( account_name) ;
983+
984+ if !account_dataset_dir_path. is_dir ( ) {
985+ let read_accout_dirs = std:: fs:: read_dir ( self . root . as_path ( ) ) . int_err ( ) ?;
986+
987+ for read_accout_dir in read_accout_dirs {
988+ let account_dir_name = AccountName :: new_unchecked (
989+ read_accout_dir
990+ . int_err ( ) ?
991+ . file_name ( )
992+ . to_str ( )
993+ . unwrap_or ( "" ) ,
994+ ) ;
995+ if account_name == & account_dir_name {
996+ return Ok ( ( self . root . join ( & account_dir_name) , account_dir_name) ) ;
997+ }
998+ }
999+ return Ok ( ( account_dataset_dir_path, account_name. clone ( ) ) ) ;
1000+ }
1001+
1002+ let ( canonical_account_dataset_dir_path, canonical_account_name) =
1003+ DatasetRepositoryLocalFs :: get_canonical_path_param ( & account_dataset_dir_path) ?;
1004+
1005+ Ok ( (
1006+ canonical_account_dataset_dir_path,
1007+ AccountName :: new_unchecked ( canonical_account_name. as_str ( ) ) ,
1008+ ) )
1009+ }
9651010}
9661011
9671012/////////////////////////////////////////////////////////////////////////////////////////
0 commit comments