@@ -16,7 +16,6 @@ class TestFunctionWithSelfManagedKafka(BaseTest):
1616 @pytest .mark .flaky (reruns = 5 )
1717 @parameterized .expand (
1818 [
19- "combination/function_with_self_managed_kafka" ,
2019 "combination/function_with_self_managed_kafka_intrinsics" ,
2120 ]
2221 )
@@ -30,3 +29,29 @@ def test_function_with_self_managed_kafka(self, file_name):
3029 event_source_mapping_result = lambda_client .get_event_source_mapping (UUID = event_source_mapping_id )
3130 event_source_mapping_function_arn = event_source_mapping_result ["FunctionArn" ]
3231 self .assertEqual (event_source_mapping_function_arn , lambda_function_arn )
32+
33+ @parameterized .expand (["combination/function_with_self_managed_kafka" ])
34+ def test_function_with_self_managed_kafka_with_provisioned_mode (self , file_name ):
35+ self .create_and_verify_stack (file_name )
36+ # Get the notification configuration and make sure Lambda Function connection is added
37+ lambda_client = self .client_provider .lambda_client
38+ function_name = self .get_physical_id_by_type ("AWS::Lambda::Function" )
39+ lambda_function_arn = lambda_client .get_function_configuration (FunctionName = function_name )["FunctionArn" ]
40+ event_source_mapping_id = self .get_physical_id_by_type ("AWS::Lambda::EventSourceMapping" )
41+ event_source_mapping_result = lambda_client .get_event_source_mapping (UUID = event_source_mapping_id )
42+ event_source_mapping_function_arn = event_source_mapping_result ["FunctionArn" ]
43+ self .assertEqual (event_source_mapping_function_arn , lambda_function_arn )
44+
45+ # Verify error handling properties are correctly set
46+ self .assertTrue (event_source_mapping_result .get ("BisectBatchOnFunctionError" ))
47+ self .assertEqual (event_source_mapping_result .get ("MaximumRecordAgeInSeconds" ), 3600 )
48+ self .assertEqual (event_source_mapping_result .get ("MaximumRetryAttempts" ), 3 )
49+ self .assertEqual (event_source_mapping_result .get ("FunctionResponseTypes" ), ["ReportBatchItemFailures" ])
50+ # Uncomment this once SDK is updated.
51+ # provisioned_poller_config = event_source_mapping_result["ProvisionedPollerConfig"]
52+ # actual_poller_group_name = provisioned_poller_config["PollerGroupName"]
53+ # self.assertEqual(
54+ # actual_poller_group_name,
55+ # "test1",
56+ # f"Expected PollerGroupName to be 'test1' but got '{actual_poller_group_name}'",
57+ # )
0 commit comments