@@ -395,6 +395,98 @@ def test_default_bucket_name_failure(self):
395395 DEFAULT_GCP_PROJECT , "us-central1" , kms_key = "kmskey!" )),
396396 None )
397397
398+ @mock .patch ('google.cloud.resourcemanager_v3.ProjectsClient' )
399+ @mock .patch ('apache_beam.io.gcp.gcsio.GcsIO' )
400+ def test_get_or_create_default_gcs_bucket_ownership_match (
401+ self , mock_gcsio_class , mock_crm_class ):
402+ mock_gcsio = mock_gcsio_class .return_value
403+ mock_bucket = mock .Mock ()
404+ mock_bucket .project_number = 123456789
405+ mock_gcsio .get_bucket .return_value = mock_bucket
406+
407+ mock_crm_client = mock_crm_class .return_value
408+ mock_project_info = mock .Mock ()
409+ mock_project_info .name = 'projects/123456789'
410+ mock_crm_client .get_project .return_value = mock_project_info
411+
412+ options = SampleOptions (DEFAULT_GCP_PROJECT , 'us-central1' )
413+ bucket = gcsio .get_or_create_default_gcs_bucket (options )
414+
415+ self .assertEqual (bucket , mock_bucket )
416+ mock_gcsio .get_bucket .assert_called_once_with (
417+ 'dataflow-staging-us-central1-77b801c0838aee13391c0d1885860494' )
418+
419+ @mock .patch ('google.cloud.resourcemanager_v3.ProjectsClient' )
420+ @mock .patch ('apache_beam.io.gcp.gcsio.GcsIO' )
421+ def test_get_or_create_default_gcs_bucket_ownership_mismatch (
422+ self , mock_gcsio_class , mock_crm_class ):
423+ mock_gcsio = mock_gcsio_class .return_value
424+ mock_bucket = mock .Mock ()
425+ mock_bucket .project_number = 999999999
426+ mock_gcsio .get_bucket .return_value = mock_bucket
427+
428+ mock_crm_client = mock_crm_class .return_value
429+ mock_project_info = mock .Mock ()
430+ mock_project_info .name = 'projects/123456789'
431+ mock_crm_client .get_project .return_value = mock_project_info
432+
433+ options = SampleOptions (DEFAULT_GCP_PROJECT , 'us-central1' )
434+ with self .assertRaises (ValueError ) as ctx :
435+ gcsio .get_or_create_default_gcs_bucket (options )
436+
437+ self .assertIn (
438+ f'is not owned by project { DEFAULT_GCP_PROJECT } ' , str (ctx .exception ))
439+
440+ @mock .patch ('google.cloud.resourcemanager_v3.ProjectsClient' )
441+ @mock .patch ('apache_beam.io.gcp.gcsio.GcsIO' )
442+ def test_get_or_create_default_gcs_bucket_ownership_no_bucket_number (
443+ self , mock_gcsio_class , mock_crm_class ):
444+ mock_gcsio = mock_gcsio_class .return_value
445+ mock_bucket = mock .Mock ()
446+ mock_bucket .project_number = None
447+ mock_bucket .name = 'dataflow-staging-us-central1-77b801c0838aee13391c0d1885860494'
448+ mock_gcsio .get_bucket .return_value = mock_bucket
449+
450+ options = SampleOptions (DEFAULT_GCP_PROJECT , 'us-central1' )
451+ bucket = gcsio .get_or_create_default_gcs_bucket (options )
452+
453+ self .assertEqual (bucket , mock_bucket )
454+ mock_crm_class .assert_not_called ()
455+
456+ @mock .patch ('google.cloud.resourcemanager_v3.ProjectsClient' )
457+ @mock .patch ('apache_beam.io.gcp.gcsio.GcsIO' )
458+ def test_get_or_create_default_gcs_bucket_ownership_crm_error (
459+ self , mock_gcsio_class , mock_crm_class ):
460+ mock_gcsio = mock_gcsio_class .return_value
461+ mock_bucket = mock .Mock ()
462+ mock_bucket .project_number = 123456789
463+ mock_gcsio .get_bucket .return_value = mock_bucket
464+
465+ mock_crm_client = mock_crm_class .return_value
466+ mock_crm_client .get_project .side_effect = Exception ("API Disabled" )
467+
468+ options = SampleOptions (DEFAULT_GCP_PROJECT , 'us-central1' )
469+ bucket = gcsio .get_or_create_default_gcs_bucket (options )
470+
471+ # Should fall back to success (warn only)
472+ self .assertEqual (bucket , mock_bucket )
473+
474+ @mock .patch ('google.cloud.resourcemanager_v3.ProjectsClient' )
475+ @mock .patch ('apache_beam.io.gcp.gcsio.GcsIO' )
476+ def test_get_or_create_default_gcs_bucket_ownership_mock_project_number (
477+ self , mock_gcsio_class , mock_crm_class ):
478+ mock_gcsio = mock_gcsio_class .return_value
479+ # Creating a mock bucket without setting project_number (it returns another mock object)
480+ mock_bucket = mock .Mock ()
481+ mock_gcsio .get_bucket .return_value = mock_bucket
482+
483+ options = SampleOptions (DEFAULT_GCP_PROJECT , 'us-central1' )
484+ bucket = gcsio .get_or_create_default_gcs_bucket (options )
485+
486+ # Verification should be skipped, returning the mock bucket and never calling CRM
487+ self .assertEqual (bucket , mock_bucket )
488+ mock_crm_class .assert_not_called ()
489+
398490 def test_exists (self ):
399491 file_name = 'gs://gcsio-test/dummy_file'
400492 file_size = 1234
0 commit comments