44
55from integration .config .service_names import MSK
66from integration .helpers .base_test import BaseTest , nonblocking
7- from integration .helpers .resource import current_region_does_not_support , generate_suffix
7+ from integration .helpers .resource import current_region_does_not_support
88
99
10- # Mark this test suite as nonblocking tests since MSK Cluster creation can take
11- # up to 30 minutes according to https://docs.aws.amazon.com/msk/latest/developerguide/troubleshooting.html#troubleshooting-cluster-stuck
12- # This would cause the test to fail due to MSK Cluster did not stablize.
13- # We should investigate any other cause of failures.
1410@skipIf (current_region_does_not_support ([MSK ]), "MSK is not supported in this testing region" )
1511@nonblocking
1612class TestFunctionWithMsk (BaseTest ):
@@ -19,38 +15,25 @@ def companion_stack_outputs(self, get_companion_stack_outputs):
1915 self .companion_stack_outputs = get_companion_stack_outputs
2016
2117 def test_function_with_msk_trigger (self ):
22- companion_stack_outputs = self .companion_stack_outputs
23- parameters = self .get_parameters (companion_stack_outputs )
24- cluster_name = "MskCluster-" + generate_suffix ()
25- parameters .append (self .generate_parameter ("MskClusterName" , cluster_name ))
18+ parameters = self .get_parameters ()
2619 self ._common_validations_for_MSK ("combination/function_with_msk" , parameters )
2720
2821 def test_function_with_msk_trigger_using_manage_policy (self ):
29- companion_stack_outputs = self .companion_stack_outputs
30- parameters = self .get_parameters (companion_stack_outputs )
31- cluster_name = "MskCluster2-" + generate_suffix ()
32- parameters .append (self .generate_parameter ("MskClusterName2" , cluster_name ))
22+ parameters = self .get_parameters ()
3323 self ._common_validations_for_MSK ("combination/function_with_msk_using_managed_policy" , parameters )
3424
3525 def test_function_with_msk_trigger_and_s3_onfailure_events_destinations (self ):
36- companion_stack_outputs = self .companion_stack_outputs
37- parameters = self .get_parameters (companion_stack_outputs )
38- cluster_name = "MskCluster3-" + generate_suffix ()
39- parameters .append (self .generate_parameter ("MskClusterName3" , cluster_name ))
26+ parameters = self .get_parameters ()
4027 self ._common_validations_for_MSK (
4128 "combination/function_with_msk_trigger_and_s3_onfailure_events_destinations" , parameters
4229 )
4330
4431 def test_function_with_msk_trigger_and_premium_features (self ):
45- companion_stack_outputs = self .companion_stack_outputs
46- parameters = self .get_parameters (companion_stack_outputs )
47- cluster_name = "MskCluster4-" + generate_suffix ()
48- parameters .append (self .generate_parameter ("MskClusterName4" , cluster_name ))
32+ parameters = self .get_parameters ()
4933 self ._common_validations_for_MSK ("combination/function_with_msk_trigger_and_premium_features" , parameters )
5034 event_source_mapping_result = self ._common_validations_for_MSK (
5135 "combination/function_with_msk_trigger_and_confluent_schema_registry" , parameters
5236 )
53- # Verify error handling properties are correctly set
5437 self .assertTrue (event_source_mapping_result .get ("BisectBatchOnFunctionError" ))
5538 self .assertEqual (event_source_mapping_result .get ("MaximumRecordAgeInSeconds" ), 3600 )
5639 self .assertEqual (event_source_mapping_result .get ("MaximumRetryAttempts" ), 3 )
@@ -59,15 +42,8 @@ def test_function_with_msk_trigger_and_premium_features(self):
5942 def _common_validations_for_MSK (self , file_name , parameters ):
6043 self .create_and_verify_stack (file_name , parameters )
6144
62- kafka_client = self .client_provider . kafka_client
45+ msk_cluster_arn = self .companion_stack_outputs [ "PreCreatedMskClusterArn" ]
6346
64- msk_cluster_id = self .get_physical_id_by_type ("AWS::MSK::Cluster" )
65- cluster_info_list = kafka_client .list_clusters ()["ClusterInfoList" ]
66- cluster_info = [x for x in cluster_info_list if x ["ClusterArn" ] == msk_cluster_id ]
67-
68- self .assertEqual (len (cluster_info ), 1 , "One MSK cluster should be present" )
69-
70- msk_cluster_arn = cluster_info [0 ]["ClusterArn" ]
7147 lambda_client = self .client_provider .lambda_client
7248 function_name = self .get_physical_id_by_type ("AWS::Lambda::Function" )
7349 lambda_function_arn = lambda_client .get_function_configuration (FunctionName = function_name )["FunctionArn" ]
@@ -82,8 +58,7 @@ def _common_validations_for_MSK(self, file_name, parameters):
8258 self .assertEqual (event_source_mapping_kafka_cluster_arn , msk_cluster_arn )
8359 return event_source_mapping_result
8460
85- def get_parameters (self , dictionary ):
86- parameters = []
87- parameters .append (self .generate_parameter ("PreCreatedSubnetOne" , dictionary ["PreCreatedSubnetOne" ]))
88- parameters .append (self .generate_parameter ("PreCreatedSubnetTwo" , dictionary ["PreCreatedSubnetTwo" ]))
89- return parameters
61+ def get_parameters (self ):
62+ return [
63+ self .generate_parameter ("PreCreatedMskClusterArn" , self .companion_stack_outputs ["PreCreatedMskClusterArn" ]),
64+ ]
0 commit comments