diff --git a/.cfnlintrc.yaml b/.cfnlintrc.yaml index 23172d0767..55be3cc2ea 100644 --- a/.cfnlintrc.yaml +++ b/.cfnlintrc.yaml @@ -139,6 +139,8 @@ ignore_templates: - tests/translator/output/**/managed_policies_everything.json # intentionally contains wrong arns - tests/translator/output/**/function_with_provisioned_poller_config.json - tests/translator/output/**/function_with_metrics_config.json + - tests/translator/output/**/function_with_self_managed_kafka_and_schema_registry.json # cfnlint is not updated to recognize the SchemaRegistryConfig property + - tests/translator/output/**/function_with_msk_with_schema_registry_config.json # cfnlint is not updated to recognize the SchemaRegistryConfig property ignore_checks: - E2531 # Deprecated runtime; not relevant for transform tests diff --git a/integration/combination/test_function_with_msk.py b/integration/combination/test_function_with_msk.py index 5a9f7b0d5c..472a128f4e 100644 --- a/integration/combination/test_function_with_msk.py +++ b/integration/combination/test_function_with_msk.py @@ -41,6 +41,15 @@ def test_function_with_msk_trigger_and_s3_onfailure_events_destinations(self): "combination/function_with_msk_trigger_and_s3_onfailure_events_destinations", parameters ) + def test_function_with_msk_trigger_and_confluent_schema_registry(self): + companion_stack_outputs = self.companion_stack_outputs + parameters = self.get_parameters(companion_stack_outputs) + cluster_name = "MskCluster4-" + generate_suffix() + parameters.append(self.generate_parameter("MskClusterName4", cluster_name)) + self._common_validations_for_MSK( + "combination/function_with_msk_trigger_and_confluent_schema_registry", parameters + ) + def _common_validations_for_MSK(self, file_name, parameters): self.create_and_verify_stack(file_name, parameters) diff --git a/integration/resources/expected/combination/function_with_msk_trigger_and_confluent_schema_registry.json b/integration/resources/expected/combination/function_with_msk_trigger_and_confluent_schema_registry.json new file mode 100644 index 0000000000..e45f42ea51 --- /dev/null +++ b/integration/resources/expected/combination/function_with_msk_trigger_and_confluent_schema_registry.json @@ -0,0 +1,22 @@ +[ + { + "LogicalResourceId": "MyMskStreamProcessor", + "ResourceType": "AWS::Lambda::Function" + }, + { + "LogicalResourceId": "MyLambdaExecutionRole", + "ResourceType": "AWS::IAM::Role" + }, + { + "LogicalResourceId": "MyMskCluster", + "ResourceType": "AWS::MSK::Cluster" + }, + { + "LogicalResourceId": "MyMskStreamProcessorMyMskEvent", + "ResourceType": "AWS::Lambda::EventSourceMapping" + }, + { + "LogicalResourceId": "PreCreatedS3Bucket", + "ResourceType": "AWS::S3::Bucket" + } +] diff --git a/integration/resources/templates/combination/function_with_msk.yaml b/integration/resources/templates/combination/function_with_msk.yaml index 5f6eecca8c..6b72c36e16 100644 --- a/integration/resources/templates/combination/function_with_msk.yaml +++ b/integration/resources/templates/combination/function_with_msk.yaml @@ -45,7 +45,7 @@ Resources: VolumeSize: 1 ClusterName: Ref: MskClusterName - KafkaVersion: 2.4.1.1 + KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: diff --git a/integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml b/integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml new file mode 100644 index 0000000000..dedc27203e --- /dev/null +++ b/integration/resources/templates/combination/function_with_msk_trigger_and_confluent_schema_registry.yaml @@ -0,0 +1,86 @@ +Parameters: + PreCreatedSubnetOne: + Type: String + PreCreatedSubnetTwo: + Type: String + MskClusterName4: + Type: String + +Resources: + MyLambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Action: [sts:AssumeRole] + Effect: Allow + Principal: + Service: [lambda.amazonaws.com] + Policies: + - PolicyName: IntegrationTestExecution + PolicyDocument: + Statement: + - Action: [kafka:DescribeCluster, kafka:GetBootstrapBrokers, ec2:CreateNetworkInterface, + ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, + ec2:DescribeSubnets, ec2:DescribeSecurityGroups, logs:CreateLogGroup, + logs:CreateLogStream, logs:PutLogEvents, s3:ListBucket, kafka:DescribeClusterV2] + Effect: Allow + Resource: '*' + ManagedPolicyArns: + - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Tags: + - {Value: SAM, Key: lambda:createdBy} + + MyMskCluster: + Type: AWS::MSK::Cluster + Properties: + BrokerNodeGroupInfo: + ClientSubnets: + - Ref: PreCreatedSubnetOne + - Ref: PreCreatedSubnetTwo + InstanceType: kafka.t3.small + StorageInfo: + EBSStorageInfo: + VolumeSize: 1 + ClusterName: + Ref: MskClusterName4 + KafkaVersion: 3.8.x + NumberOfBrokerNodes: 2 + + MyMskStreamProcessor: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs18.x + Handler: index.handler + CodeUri: ${codeuri} + Role: + Fn::GetAtt: [MyLambdaExecutionRole, Arn] + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: + Ref: MyMskCluster + Topics: + - SchemaRegistryTestTopic + ProvisionedPollerConfig: + MinimumPollers: 1 + SchemaRegistryConfig: + AccessConfigs: + - Type: BASIC_AUTH + URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + SchemaValidationConfigs: + - Attribute: KEY + EventRecordFormat: JSON + SchemaRegistryURI: https://confluent.us-east-2.aws.confluent.cloud:9092 + + + + PreCreatedS3Bucket: + Type: AWS::S3::Bucket + DeletionPolicy: Delete + +Metadata: + SamTransformTest: true diff --git a/integration/resources/templates/combination/function_with_msk_trigger_and_s3_onfailure_events_destinations.yaml b/integration/resources/templates/combination/function_with_msk_trigger_and_s3_onfailure_events_destinations.yaml index 7ff0e13f2d..c449edbc4f 100644 --- a/integration/resources/templates/combination/function_with_msk_trigger_and_s3_onfailure_events_destinations.yaml +++ b/integration/resources/templates/combination/function_with_msk_trigger_and_s3_onfailure_events_destinations.yaml @@ -45,7 +45,7 @@ Resources: VolumeSize: 1 ClusterName: Ref: MskClusterName3 - KafkaVersion: 2.4.1.1 + KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: diff --git a/integration/resources/templates/combination/function_with_msk_using_managed_policy.yaml b/integration/resources/templates/combination/function_with_msk_using_managed_policy.yaml index cb1186bfb4..05b0b3e54d 100644 --- a/integration/resources/templates/combination/function_with_msk_using_managed_policy.yaml +++ b/integration/resources/templates/combination/function_with_msk_using_managed_policy.yaml @@ -20,7 +20,7 @@ Resources: VolumeSize: 1 ClusterName: Ref: MskClusterName2 - KafkaVersion: 2.4.1.1 + KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: diff --git a/requirements/base.txt b/requirements/base.txt index cbf14667d0..8a2c639cc6 100755 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,4 +1,4 @@ -boto3>=1.19.5,==1.* +boto3>=1.34.0,<2.0.0 jsonschema<5,>=3.2 # TODO: evaluate risk of removing jsonschema 3.x support typing_extensions>=4.4 # 3.8 doesn't have Required, TypeGuard and ParamSpec diff --git a/requirements/dev.txt b/requirements/dev.txt index 5489cc5746..911e125a72 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -12,8 +12,8 @@ parameterized~=0.7 # Integration tests dateparser~=1.1 -boto3>=1.23,<2 -tenacity~=8.0 +boto3>=1.34.0,<2.0.0 +tenacity~=9.0 # Requirements for examples requests~=2.28 @@ -26,6 +26,9 @@ ruamel.yaml==0.17.21 # It can parse yaml while perserving comments mypy~=1.3.0 # types -boto3-stubs[appconfig,serverlessrepo]>=1.19.5,==1.* +boto3-stubs[appconfig,serverlessrepo]>=1.34.0,<2.0.0 types-PyYAML~=6.0 types-jsonschema~=3.2 + +# CloudFormation CLI tools +cloudformation-cli>=0.2.39,<0.3.0 diff --git a/samtranslator/__init__.py b/samtranslator/__init__.py index ff4f92b088..8e4d66c70f 100644 --- a/samtranslator/__init__.py +++ b/samtranslator/__init__.py @@ -1 +1 @@ -__version__ = "1.98.0" +__version__ = "1.99.0" diff --git a/samtranslator/internal/schema_source/aws_serverless_function.py b/samtranslator/internal/schema_source/aws_serverless_function.py index 4743e82f77..260d2eae7b 100644 --- a/samtranslator/internal/schema_source/aws_serverless_function.py +++ b/samtranslator/internal/schema_source/aws_serverless_function.py @@ -421,6 +421,7 @@ class MSKEventProperties(BaseModel): SourceAccessConfigurations: Optional[PassThroughProp] = mskeventproperties("SourceAccessConfigurations") DestinationConfig: Optional[PassThroughProp] # TODO: add documentation ProvisionedPollerConfig: Optional[PassThroughProp] + SchemaRegistryConfig: Optional[PassThroughProp] class MSKEvent(BaseModel): @@ -460,6 +461,7 @@ class SelfManagedKafkaEventProperties(BaseModel): StartingPositionTimestamp: Optional[PassThroughProp] # TODO: add documentation Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics") ProvisionedPollerConfig: Optional[PassThroughProp] + SchemaRegistryConfig: Optional[PassThroughProp] class SelfManagedKafkaEvent(BaseModel): diff --git a/samtranslator/model/eventsources/pull.py b/samtranslator/model/eventsources/pull.py index 37349506ac..5845865b72 100644 --- a/samtranslator/model/eventsources/pull.py +++ b/samtranslator/model/eventsources/pull.py @@ -2,6 +2,7 @@ from typing import Any, Dict, List, Optional, Tuple from samtranslator.internal.deprecation_control import deprecated +from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.metrics.method_decorator import cw_timer from samtranslator.model import PassThroughProperty, Property, PropertyType, ResourceMacro from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX @@ -25,6 +26,8 @@ class PullEventSource(ResourceMacro, metaclass=ABCMeta): :cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source """ + ARN_SEGMENTS_COUNT = 6 + REGISTRY_SEGMENT_POS_IN_ARN = 5 # Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DocumentDB", "DynamoDB", "Kinesis", "MQ", "MSK", "SelfManagedKafka", "SQS"] @@ -56,6 +59,7 @@ class PullEventSource(ResourceMacro, metaclass=ABCMeta): "ConsumerGroupId": PropertyType(False, IS_STR), "ScalingConfig": PropertyType(False, IS_DICT), "ProvisionedPollerConfig": PropertyType(False, IS_DICT), + "SchemaRegistryConfig": PropertyType(False, IS_DICT), "MetricsConfig": PropertyType(False, IS_DICT), } @@ -81,6 +85,7 @@ class PullEventSource(ResourceMacro, metaclass=ABCMeta): ConsumerGroupId: Optional[Intrinsicable[str]] ScalingConfig: Optional[Dict[str, Any]] ProvisionedPollerConfig: Optional[Dict[str, Any]] + SchemaRegistryConfig: Optional[Dict[str, Any]] MetricsConfig: Optional[Dict[str, Any]] @abstractmethod @@ -88,7 +93,9 @@ def get_policy_arn(self) -> Optional[str]: """Policy to be added to the role (if a role applies).""" @abstractmethod - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: """Inline policy statements to be added to the role (if a role applies).""" @abstractmethod @@ -113,6 +120,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P :rtype: list """ function = kwargs.get("function") + intrinsic_resolver = kwargs.get("intrinsics_resolver") if not function: raise TypeError("Missing required keyword argument: function") @@ -168,7 +176,19 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P self.logical_id, f"Property ConsumerGroupId not defined for resource of type {self.resource_type}.", ) - + if self.SchemaRegistryConfig: + if self.resource_type == "MSK": + if not lambda_eventsourcemapping.AmazonManagedKafkaEventSourceConfig: # type: ignore[attr-defined] + lambda_eventsourcemapping.AmazonManagedKafkaEventSourceConfig = {} + lambda_eventsourcemapping.AmazonManagedKafkaEventSourceConfig["SchemaRegistryConfig"] = ( # type: ignore[attr-defined] + self.SchemaRegistryConfig + ) + if self.resource_type == "SelfManagedKafka": + if not lambda_eventsourcemapping.SelfManagedKafkaEventSourceConfig: # type: ignore[attr-defined] + lambda_eventsourcemapping.SelfManagedKafkaEventSourceConfig = {} + lambda_eventsourcemapping.SelfManagedKafkaEventSourceConfig["SchemaRegistryConfig"] = ( # type: ignore[attr-defined] + self.SchemaRegistryConfig + ) destination_config_policy: Optional[Dict[str, Any]] = None if self.DestinationConfig: on_failure: Dict[str, Any] = sam_expect( @@ -211,18 +231,18 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P self.add_extra_eventsourcemapping_fields(lambda_eventsourcemapping) if "role" in kwargs: - self._link_policy(kwargs["role"], destination_config_policy) # type: ignore[no-untyped-call] + self._link_policy(kwargs["role"], intrinsic_resolver, destination_config_policy) # type: ignore[no-untyped-call] return resources - def _link_policy(self, role, destination_config_policy=None): # type: ignore[no-untyped-def] + def _link_policy(self, role, intrinsic_resolver=None, destination_config_policy=None): # type: ignore[no-untyped-def] """If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the appropriate managed policy to this Role. :param model.iam.IAMRole role: the execution role generated for the function """ policy_arn = self.get_policy_arn() - policy_statements = self.get_policy_statements() + policy_statements = self.get_policy_statements(intrinsic_resolver) if role is not None: if policy_arn is not None and policy_arn not in role.ManagedPolicyArns: role.ManagedPolicyArns.append(policy_arn) @@ -324,6 +344,99 @@ def _get_kms_decrypt_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]: }, } + def validate_schema_registry_config(self) -> None: + if not self.SchemaRegistryConfig: + return + + sam_expect(self.SchemaRegistryConfig, self.relative_id, "SchemaRegistryConfig", is_sam_event=True).to_be_a_map() + required_fields = ["SchemaRegistryURI", "EventRecordFormat", "SchemaValidationConfigs"] + for field in required_fields: + if field not in self.SchemaRegistryConfig: + raise InvalidEventException(self.relative_id, f"Missing required field {field} in SchemaRegistryConfig") + + event_record_format = self.SchemaRegistryConfig.get("EventRecordFormat") + if event_record_format not in ["JSON", "SOURCE"]: + raise InvalidEventException( + self.relative_id, "EventRecordFormat in SchemaRegistryConfig must be either 'JSON' or 'SOURCE'" + ) + + validation_configs = self.SchemaRegistryConfig.get("SchemaValidationConfigs") + if not isinstance(validation_configs, list): + raise InvalidEventException(self.relative_id, "SchemaValidationConfigs must be a list") + + access_configs = self.SchemaRegistryConfig.get("AccessConfigs", []) + if access_configs: + if not isinstance(access_configs, list): + raise InvalidEventException(self.relative_id, "AccessConfigs in SchemaRegistryConfig must be a list") + for config in access_configs: + if not isinstance(config, dict) or "Type" not in config or "URI" not in config: + raise InvalidEventException( + self.relative_id, "Each AccessConfig must be a dict with 'Type' and 'URI' fields" + ) + + def get_schema_registry_permissions( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: + + if not self.SchemaRegistryConfig: + return None + + self.validate_schema_registry_config() + + statements = [] + + # Add permissions for AccessConfigs secrets if provided + access_configs = self.SchemaRegistryConfig.get("AccessConfigs", []) + for config in access_configs: + if isinstance(config, dict) and "URI" in config: + statements.append( + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": config["URI"], + } + ) + + registry_uri = self.SchemaRegistryConfig.get("SchemaRegistryURI") + registry_arn = None + ## resolved_value is still going to be a dict if intrinsic functions are used. Otherwise, it'd be a string. + if isinstance(registry_uri, str): + registry_arn = registry_uri + elif isinstance(registry_uri, dict) and intrinsic_resolver is not None: + ## We can't handle other intrinsic functions in SAM. Don't pass original dictionary + + resolved_value = intrinsic_resolver.resolve_parameter_refs(registry_uri.copy()) + registry_arn = resolved_value.get("Fn::Sub") + + registry_name_optional = self.get_registry_name(str(registry_arn)) + if registry_name_optional is not None: + statements.append({"Action": ["glue:GetRegistry"], "Effect": "Allow", "Resource": registry_uri}) + statements.append( + { + "Action": ["glue:GetSchemaVersion"], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/" + + registry_name_optional + + "/*" + }, + ], + } + ) + return statements + + def get_registry_name(self, registry_uri: str) -> Optional[str]: + if isinstance(registry_uri, str) and registry_uri.startswith("arn"): + parts = registry_uri.split(":") + if len(parts) >= PullEventSource.ARN_SEGMENTS_COUNT and parts[ + PullEventSource.REGISTRY_SEGMENT_POS_IN_ARN + ].startswith("registry/"): + return parts[PullEventSource.REGISTRY_SEGMENT_POS_IN_ARN][len("registry/") :] + return None + class Kinesis(PullEventSource): """Kinesis event source.""" @@ -343,7 +456,9 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole") - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: return None @@ -365,7 +480,9 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole") - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: return None @@ -386,7 +503,9 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole") - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: return None @@ -408,29 +527,30 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole") - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: + statements: List[Dict[str, Any]] = [] + if self.SchemaRegistryConfig: + schema_registry_statements = self.get_schema_registry_permissions(intrinsic_resolver) + if schema_registry_statements is not None: + statements.extend(schema_registry_statements) if self.SourceAccessConfigurations: for conf in self.SourceAccessConfigurations: # Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations if isinstance(conf, dict) and conf.get("Type") == "CLIENT_CERTIFICATE_TLS_AUTH" and conf.get("URI"): - return [ + statements.append( { - "PolicyName": "MSKExecutionRolePolicy", - "PolicyDocument": { - "Statement": [ - { - "Action": [ - "secretsmanager:GetSecretValue", - ], - "Effect": "Allow", - "Resource": conf.get("URI"), - } - ] - }, + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": conf.get("URI"), } - ] - - return None + ) + if not statements: + return None + return [{"PolicyName": "MSKExecutionRolePolicy", "PolicyDocument": {"Statement": statements}}] class MQ(PullEventSource): @@ -487,7 +607,9 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return None - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH", "VIRTUAL_HOST"], "BASIC_AUTH") document = { @@ -544,7 +666,9 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return None - def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: + def get_policy_statements( + self, intrinsic_resolver: Optional[IntrinsicsResolver] = None + ) -> Optional[List[Dict[str, Any]]]: if not self.KafkaBootstrapServers: raise InvalidEventException( self.relative_id, @@ -568,10 +692,12 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]: self.relative_id, "No SourceAccessConfigurations for self managed kafka event provided.", ) - document = self.generate_policy_document(self.SourceAccessConfigurations) + document = self.generate_policy_document(self.SourceAccessConfigurations, intrinsic_resolver) return [document] - def generate_policy_document(self, source_access_configurations: List[Any]): # type: ignore[no-untyped-def] + def generate_policy_document( # type: ignore[no-untyped-def] + self, source_access_configurations: List[Any], intrinsic_resolver: Optional[IntrinsicsResolver] = None + ): statements = [] authentication_uri, authentication_uri_2, has_vpc_config = self.get_secret_key(source_access_configurations) if authentication_uri: @@ -591,6 +717,12 @@ def generate_policy_document(self, source_access_configurations: List[Any]): # kms_policy = self._get_kms_decrypt_policy(self.SecretsManagerKmsKeyId) statements.append(kms_policy) + # Add schema registry permissions if configured + if self.SchemaRegistryConfig: + schema_registry_statements = self.get_schema_registry_permissions(intrinsic_resolver) + if schema_registry_statements is not None: + statements.extend(schema_registry_statements) + return { "PolicyDocument": { "Statement": statements, @@ -725,7 +857,7 @@ def get_event_source_arn(self) -> Optional[PassThrough]: def get_policy_arn(self) -> Optional[str]: return None - def get_policy_statements(self) -> List[Dict[str, Any]]: + def get_policy_statements(self, intrinsic_resolver: Optional[IntrinsicsResolver] = None) -> List[Dict[str, Any]]: basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH"], "BASIC_AUTH") statements = [ diff --git a/samtranslator/schema/schema.json b/samtranslator/schema/schema.json index 0ef249bfcc..91b2e7f0f2 100644 --- a/samtranslator/schema/schema.json +++ b/samtranslator/schema/schema.json @@ -142611,6 +142611,11 @@ "markdownDescription": "The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id) .", "title": "ConsumerGroupId", "type": "string" + }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryConfig", + "markdownDescription": "Specific configuration settings for a Kafka schema registry.", + "title": "SchemaRegistryConfig" } }, "type": "object" @@ -142708,6 +142713,65 @@ }, "type": "object" }, + "AWS::Lambda::EventSourceMapping.SchemaRegistryAccessConfig": { + "additionalProperties": false, + "properties": { + "Type": { + "markdownDescription": "The type of authentication Lambda uses to access your schema registry.", + "title": "Type", + "type": "string" + }, + "URI": { + "markdownDescription": "The URI of the secret (Secrets Manager secret ARN) to authenticate with your schema registry.", + "title": "URI", + "type": "string" + } + }, + "type": "object" + }, + "AWS::Lambda::EventSourceMapping.SchemaRegistryConfig": { + "additionalProperties": false, + "properties": { + "AccessConfigs": { + "items": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryAccessConfig" + }, + "markdownDescription": "An array of access configuration objects that tell Lambda how to authenticate with your schema registry.", + "title": "AccessConfigs", + "type": "array" + }, + "EventRecordFormat": { + "markdownDescription": "The record format that Lambda delivers to your function after schema validation.", + "title": "EventRecordFormat", + "type": "string" + }, + "SchemaRegistryURI": { + "markdownDescription": "The URI for your schema registry. The correct URI format depends on the type of schema registry you're using.", + "title": "SchemaRegistryURI", + "type": "string" + }, + "SchemaValidationConfigs": { + "items": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaValidationConfig" + }, + "markdownDescription": "An array of schema validation configuration objects, which tell Lambda the message attributes you want to validate and filter using your schema registry.", + "title": "SchemaValidationConfigs", + "type": "array" + } + }, + "type": "object" + }, + "AWS::Lambda::EventSourceMapping.SchemaValidationConfig": { + "additionalProperties": false, + "properties": { + "Attribute": { + "markdownDescription": "The attribute you want your schema registry to validate and filter for.", + "title": "Attribute", + "type": "string" + } + }, + "type": "object" + }, "AWS::Lambda::EventSourceMapping.SelfManagedEventSource": { "additionalProperties": false, "properties": { @@ -142726,6 +142790,11 @@ "markdownDescription": "The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka-process.html#services-smaa-topic-add) .", "title": "ConsumerGroupId", "type": "string" + }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryConfig", + "markdownDescription": "Specific configuration settings for a Kafka schema registry.", + "title": "SchemaRegistryConfig" } }, "type": "object" @@ -275836,6 +275905,9 @@ "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/PassThroughProp" + }, "SourceAccessConfigurations": { "allOf": [ { @@ -276681,6 +276753,9 @@ "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/PassThroughProp" + }, "SourceAccessConfigurations": { "allOf": [ { diff --git a/schema_source/cloudformation.schema.json b/schema_source/cloudformation.schema.json index d480011b24..45312f7bad 100644 --- a/schema_source/cloudformation.schema.json +++ b/schema_source/cloudformation.schema.json @@ -142569,10 +142569,74 @@ "markdownDescription": "The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id) .", "title": "ConsumerGroupId", "type": "string" + }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryConfig", + "markdownDescription": "Specific configuration settings for a Kafka schema registry.", + "title": "SchemaRegistryConfig" } }, "type": "object" }, + "AWS::Lambda::EventSourceMapping.SchemaRegistryConfig": { + "additionalProperties": false, + "type": "object", + "properties": { + "SchemaRegistryURI" : { + "markdownDescription": "The URI for your schema registry. The correct URI format depends on the type of schema registry you're using.", + "title": "SchemaRegistryURI", + "type": "string" + }, + "EventRecordFormat": { + "markdownDescription": "The record format that Lambda delivers to your function after schema validation.", + "title": "EventRecordFormat", + "type": "string" + }, + "AccessConfigs": { + "items": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryAccessConfig" + }, + "markdownDescription": "An array of access configuration objects that tell Lambda how to authenticate with your schema registry.", + "title": "AccessConfigs", + "type": "array" + }, + "SchemaValidationConfigs": { + "items": { + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaValidationConfig" + }, + "markdownDescription": "An array of schema validation configuration objects, which tell Lambda the message attributes you want to validate and filter using your schema registry.", + "title": "SchemaValidationConfigs", + "type": "array" + } + } + }, + "AWS::Lambda::EventSourceMapping.SchemaRegistryAccessConfig": { + "additionalProperties": false, + "type": "object", + "properties": { + "Type": { + "markdownDescription": "The type of authentication Lambda uses to access your schema registry.", + "title": "Type", + "type": "string" + }, + "URI": { + "markdownDescription": "The URI of the secret (Secrets Manager secret ARN) to authenticate with your schema registry.", + "title": "URI", + "type": "string" + } + } + }, + "AWS::Lambda::EventSourceMapping.SchemaValidationConfig": { + "additionalProperties": false, + "type": "object", + "properties": { + "Attribute": { + "markdownDescription": "The attribute you want your schema registry to validate and filter for.", + "type": "string", + "title": "Attribute" + } + } + }, "AWS::Lambda::EventSourceMapping.DestinationConfig": { "additionalProperties": false, "properties": { @@ -142684,6 +142748,11 @@ "markdownDescription": "The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources. After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value. For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka-process.html#services-smaa-topic-add) .", "title": "ConsumerGroupId", "type": "string" + }, + "SchemaRegistryConfig": { + "markdownDescription": "Specific configuration settings for a Kafka schema registry.", + "title": "SchemaRegistryConfig", + "$ref": "#/definitions/AWS::Lambda::EventSourceMapping.SchemaRegistryConfig" } }, "type": "object" diff --git a/schema_source/sam.schema.json b/schema_source/sam.schema.json index ef135734ba..169a78c7f1 100644 --- a/schema_source/sam.schema.json +++ b/schema_source/sam.schema.json @@ -2421,6 +2421,9 @@ "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/PassThroughProp" + }, "SourceAccessConfigurations": { "allOf": [ { @@ -3197,6 +3200,9 @@ "ProvisionedPollerConfig": { "$ref": "#/definitions/PassThroughProp" }, + "SchemaRegistryConfig": { + "$ref": "#/definitions/PassThroughProp" + }, "SourceAccessConfigurations": { "allOf": [ { diff --git a/tests/model/eventsources/test_msk_event_source.py b/tests/model/eventsources/test_msk_event_source.py index 03d8e8a30d..87467eaccd 100644 --- a/tests/model/eventsources/test_msk_event_source.py +++ b/tests/model/eventsources/test_msk_event_source.py @@ -1,5 +1,6 @@ from unittest import TestCase +from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.model.eventsources.pull import MSK @@ -38,6 +39,98 @@ def test_get_policy_statements(self): self.assertEqual(policy_statements, expected_policy_document) + def test_get_policy_statements_with_glue_schema_registry(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY", + "ValidSchemas": ["schema1"], + } + ], + "AccessConfigs": [ + {"Type": "BASIC_AUTH", "URI": "SCHEMA_SECRET_URI"}, + {"Type": "SERVER_ROOT_CA_CERTIFICATE", "URI": "CA_SECRET_URI"}, + ], + } + + policy_statements = self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + expected_policy_document = [ + { + "PolicyName": "MSKExecutionRolePolicy", + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": "SCHEMA_SECRET_URI", + }, + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": "CA_SECRET_URI", + }, + { + "Action": ["glue:GetRegistry"], + "Effect": "Allow", + "Resource": "arn:aws:glue:us-west-2:123456789012:registry/example", + }, + { + "Action": ["glue:GetSchemaVersion"], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/example/*" + } + ], + }, + ] + }, + } + ] + + self.assertEqual(policy_statements, expected_policy_document) + + def test_get_policy_statements_with_non_glue_schema_registry(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "https://example.com/testRegistry", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY", + "ValidSchemas": ["schema1"], + } + ], + "AccessConfigs": [ + {"Type": "SERVER_ROOT_CA_CERTIFICATE", "URI": "CA_SECRET_URI"}, + ], + } + + policy_statements = self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + expected_policy_document = [ + { + "PolicyName": "MSKExecutionRolePolicy", + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": "CA_SECRET_URI", + } + ] + }, + } + ] + + self.assertEqual(policy_statements, expected_policy_document) + def test_get_policy_statements_with_no_auth_mechanism(self): self.kafka_event_source.SourceAccessConfigurations = [] @@ -45,3 +138,49 @@ def test_get_policy_statements_with_no_auth_mechanism(self): expected_policy_document = None self.assertEqual(policy_statements, expected_policy_document) + + def test_validate_schema_registry_config_missing_required_fields(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example" + } + with self.assertRaises(Exception) as context: + self.kafka_event_source.validate_schema_registry_config() + self.assertTrue("Missing required field EventRecordFormat in SchemaRegistryConfig" in str(context.exception)) + + def test_validate_schema_registry_config_invalid_format(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "INVALID", + "SchemaValidationConfigs": [], + } + with self.assertRaises(Exception) as context: + self.kafka_event_source.validate_schema_registry_config() + self.assertTrue( + "EventRecordFormat in SchemaRegistryConfig must be either 'JSON' or 'SOURCE'" in str(context.exception) + ) + + def test_validate_schema_registry_config_invalid_validation_configs(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": "not-a-list", + } + with self.assertRaises(Exception) as context: + self.kafka_event_source.validate_schema_registry_config() + self.assertTrue("SchemaValidationConfigs must be a list" in str(context.exception)) + + def test_validate_schema_registry_config_when_access_config_is_empty(self): + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY", + "ValidSchemas": ["schema1"], + } + ], + "AccessConfigs": "not- a list", + } + with self.assertRaises(Exception) as context: + self.kafka_event_source.validate_schema_registry_config() + self.assertTrue("AccessConfigs in SchemaRegistryConfig must be a list" in str(context.exception)) diff --git a/tests/model/eventsources/test_self_managed_kafka_event_source.py b/tests/model/eventsources/test_self_managed_kafka_event_source.py index 2681fae80f..4a2dc6f15e 100644 --- a/tests/model/eventsources/test_self_managed_kafka_event_source.py +++ b/tests/model/eventsources/test_self_managed_kafka_event_source.py @@ -1,6 +1,7 @@ from unittest import TestCase from parameterized import parameterized +from samtranslator.intrinsics.resolver import IntrinsicsResolver from samtranslator.model.eventsources.pull import SelfManagedKafka from samtranslator.model.exceptions import InvalidEventException @@ -321,3 +322,154 @@ def test_must_validate_secrets_manager_kms_key_id(self, kms_key_id_value): with self.assertRaises(InvalidEventException) as error: self.kafka_event_source.get_policy_statements() self.assertEqual(error_message, str(error.exception)) + + def test_validate_schema_registry_config_missing_required_fields(self): + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "SASL_SCRAM_256_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + self.kafka_event_source.Topics = ["Topics"] + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example" + } + with self.assertRaises(InvalidEventException) as context: + self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + self.assertTrue("Missing required field EventRecordFormat in SchemaRegistryConfig" in str(context.exception)) + + def test_validate_schema_registry_config_invalid_format(self): + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "SASL_SCRAM_256_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + self.kafka_event_source.Topics = ["Topics"] + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "INVALID", + "SchemaValidationConfigs": [], + } + with self.assertRaises(InvalidEventException) as context: + self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + self.assertTrue( + "EventRecordFormat in SchemaRegistryConfig must be either 'JSON' or 'SOURCE'" in str(context.exception) + ) + + def test_validate_schema_registry_config_invalid_validation_configs(self): + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "SASL_SCRAM_256_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + self.kafka_event_source.Topics = ["Topics"] + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": "not-a-list", + } + with self.assertRaises(InvalidEventException) as context: + self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + self.assertTrue("SchemaValidationConfigs must be a list" in str(context.exception)) + + def test_get_policy_statements_with_schema_registry(self): + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "BASIC_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + self.kafka_event_source.Topics = ["Topics"] + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "arn:aws:glue:us-west-2:123456789012:registry/example", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": [{"Attribute": "KEY"}], + "AccessConfigs": [{"Type": "BASIC_AUTH", "URI": "BASIC_AUTH_URI"}], + } + + policy_statements = self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + expected_statements = [ + {"Action": ["secretsmanager:GetSecretValue"], "Effect": "Allow", "Resource": "SECRET_URI"}, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups", + ], + "Effect": "Allow", + "Resource": "*", + }, + {"Action": ["secretsmanager:GetSecretValue"], "Effect": "Allow", "Resource": "BASIC_AUTH_URI"}, + { + "Action": ["glue:GetRegistry"], + "Effect": "Allow", + "Resource": "arn:aws:glue:us-west-2:123456789012:registry/example", + }, + { + "Action": ["glue:GetSchemaVersion"], + "Effect": "Allow", + "Resource": [ + {"Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/example/*"} + ], + }, + ] + self.assertEqual(len(policy_statements), 1) + self.assertEqual(len(policy_statements[0]["PolicyDocument"]["Statement"]), len(expected_statements)) + for statement in expected_statements: + self.assertIn(statement, policy_statements[0]["PolicyDocument"]["Statement"]) + + def test_get_policy_statements_with_non_glue_schema_registry(self): + self.kafka_event_source.SourceAccessConfigurations = [ + {"Type": "BASIC_AUTH", "URI": "SECRET_URI"}, + {"Type": "VPC_SUBNET", "URI": "SECRET_URI"}, + {"Type": "VPC_SECURITY_GROUP", "URI": "SECRET_URI"}, + ] + self.kafka_event_source.Topics = ["Topics"] + self.kafka_event_source.KafkaBootstrapServers = ["endpoint1", "endpoint2"] + self.kafka_event_source.SchemaRegistryConfig = { + "SchemaRegistryURI": "https://example.com/testRegistry", + "EventRecordFormat": "JSON", + "SchemaValidationConfigs": [{"Attribute": "KEY"}], + "AccessConfigs": [ + {"Type": "SERVER_ROOT_CA_CERTIFICATE", "URI": "CA_SECRET_URI"}, + ], + } + + policy_statements = self.kafka_event_source.get_policy_statements(IntrinsicsResolver({})) + expected_statements = [ + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": "SECRET_URI", + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups", + ], + "Effect": "Allow", + "Resource": "*", + }, + { + "Action": [ + "secretsmanager:GetSecretValue", + ], + "Effect": "Allow", + "Resource": "CA_SECRET_URI", + }, + ] + + self.assertEqual(len(policy_statements[0]["PolicyDocument"]["Statement"]), len(expected_statements)) + for statement in expected_statements: + self.assertIn(statement, policy_statements[0]["PolicyDocument"]["Statement"]) diff --git a/tests/translator/input/error_invalid_schema_registry_config_for_kafka.yaml b/tests/translator/input/error_invalid_schema_registry_config_for_kafka.yaml new file mode 100644 index 0000000000..dcef734328 --- /dev/null +++ b/tests/translator/input/error_invalid_schema_registry_config_for_kafka.yaml @@ -0,0 +1,67 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: {} +Resources: + MSKFunctionWithoutSchemaValidationConfigs: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: MSK + Properties: + Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + StartingPosition: LATEST + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + Topics: + - abcd + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + EventRecordFormat: JSON + MSKFunctionWithoutEventRecordFormat: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: MSK + Properties: + Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + StartingPosition: LATEST + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + Topics: + - abcd + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + MSKFunctionWithoutSchemaRegistryURI: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: MSK + Properties: + Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + StartingPosition: LATEST + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + Topics: + - abcd + SchemaRegistryConfig: + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + EventRecordFormat: JSON diff --git a/tests/translator/input/error_invalid_schema_registry_config_for_self_managed_kafka.yaml b/tests/translator/input/error_invalid_schema_registry_config_for_self_managed_kafka.yaml new file mode 100644 index 0000000000..a0c0d68a16 --- /dev/null +++ b/tests/translator/input/error_invalid_schema_registry_config_for_self_managed_kafka.yaml @@ -0,0 +1,85 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: {} +Resources: + KafkaFunctionWithoutSchemaValidationConfigs: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + StartingPosition: LATEST + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + Topics: + - abcd + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + EventRecordFormat: JSON + KafkaFunctionWithoutEventRecordFormat: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + StartingPosition: LATEST + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + Topics: + - abcd + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + KafkaFunctionWithoutSchemaRegistryURI: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + StartingPosition: LATEST + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + Topics: + - abcd + SchemaRegistryConfig: + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + EventRecordFormat: JSON diff --git a/tests/translator/input/function_with_msk_with_schema_registry_config.yaml b/tests/translator/input/function_with_msk_with_schema_registry_config.yaml new file mode 100644 index 0000000000..3fb4f1b812 --- /dev/null +++ b/tests/translator/input/function_with_msk_with_schema_registry_config.yaml @@ -0,0 +1,94 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: {} + +Resources: + MyMskStreamProcessorWithGlueSchemaAndIamAuth: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - MyDummyTestTopic + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + SchemaValidationConfigs: + - Attribute: KEY + + MyMskStreamProcessorWithGlueSchemaAndBasicAuth: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - MyDummyTestTopic + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + AccessConfigs: + - Type: BASIC_AUTH + URI: !Sub arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + + MyMskStreamProcessorWithConfluentSchemaAndBasicAuth: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - MyDummyTestTopic + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + EventRecordFormat: JSON + AccessConfigs: + - Type: BASIC_AUTH + URI: !Sub arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + + MyMskStreamProcessorWithOnlySchemaRegistry: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyMskEvent: + Type: MSK + Properties: + StartingPosition: LATEST + Stream: !Sub arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2 + Topics: + - MyDummyTestTopic + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + SchemaValidationConfigs: + - Attribute: KEY diff --git a/tests/translator/input/function_with_self_managed_kafka_and_schema_registry.yaml b/tests/translator/input/function_with_self_managed_kafka_and_schema_registry.yaml new file mode 100644 index 0000000000..4842e52815 --- /dev/null +++ b/tests/translator/input/function_with_self_managed_kafka_and_schema_registry.yaml @@ -0,0 +1,127 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: {} +Resources: + MyKafkaStreamProcessorWithGlueSchemaAndIamAuth: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + Topics: + - Topic1 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + SchemaValidationConfigs: + - Attribute: KEY + + MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth: + Type: AWS::Serverless::Function + Properties: + CodeUri: s3://sam-demo-bucket/kafka.zip + Handler: index.kafka_handler + Runtime: python3.9 + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + Topics: + - Topic1 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + AccessConfigs: + - Type: BASIC_AUTH + URI: !Sub arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path + SchemaValidationConfigs: + - Attribute: KEY + + + MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + StartingPosition: LATEST + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + Topics: + - MyDummyTestTopic + ConsumerGroupId: consumergroup1 + SchemaRegistryConfig: + SchemaRegistryURI: https://example.com/registry + EventRecordFormat: JSON + AccessConfigs: + - Type: BASIC_AUTH + URI: !Sub arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path + SchemaValidationConfigs: + - Attribute: KEY + - Attribute: VALUE + + MyMskStreamProcessorWithOnlySchemaRegistry: + Type: AWS::Serverless::Function + Properties: + Runtime: nodejs12.x + Handler: index.handler + CodeUri: s3://sam-demo-bucket/kafka.zip + Events: + MyKafkaCluster: + Type: SelfManagedKafka + Properties: + StartingPosition: LATEST + KafkaBootstrapServers: + - abc.xyz.com:9092 + - 123.45.67.89:9096 + SourceAccessConfigurations: + - Type: SASL_SCRAM_512_AUTH + URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c + - Type: VPC_SUBNET + URI: subnet:subnet-12345 + - Type: VPC_SECURITY_GROUP + URI: security_group:sg-67890 + Topics: + - MyDummyTestTopic + SchemaRegistryConfig: + SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1 + EventRecordFormat: JSON + SchemaValidationConfigs: + - Attribute: KEY diff --git a/tests/translator/output/aws-cn/function_with_msk_with_schema_registry_config.json b/tests/translator/output/aws-cn/function_with_msk_with_schema_registry_config.json new file mode 100644 index 0000000000..49e4a12cd6 --- /dev/null +++ b/tests/translator/output/aws-cn/function_with_msk_with_schema_registry_config.json @@ -0,0 +1,471 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndIamAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/aws-cn/function_with_self_managed_kafka_and_schema_registry.json b/tests/translator/output/aws-cn/function_with_self_managed_kafka_and_schema_registry.json new file mode 100644 index 0000000000..25e5c57cab --- /dev/null +++ b/tests/translator/output/aws-cn/function_with_self_managed_kafka_and_schema_registry.json @@ -0,0 +1,618 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/aws-us-gov/function_with_msk_with_schema_registry_config.json b/tests/translator/output/aws-us-gov/function_with_msk_with_schema_registry_config.json new file mode 100644 index 0000000000..a2f631553e --- /dev/null +++ b/tests/translator/output/aws-us-gov/function_with_msk_with_schema_registry_config.json @@ -0,0 +1,471 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndIamAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/aws-us-gov/function_with_self_managed_kafka_and_schema_registry.json b/tests/translator/output/aws-us-gov/function_with_self_managed_kafka_and_schema_registry.json new file mode 100644 index 0000000000..3d0fa8cffa --- /dev/null +++ b/tests/translator/output/aws-us-gov/function_with_self_managed_kafka_and_schema_registry.json @@ -0,0 +1,618 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/error_invalid_schema_registry_config_for_kafka.json b/tests/translator/output/error_invalid_schema_registry_config_for_kafka.json new file mode 100644 index 0000000000..da02e0f6a6 --- /dev/null +++ b/tests/translator/output/error_invalid_schema_registry_config_for_kafka.json @@ -0,0 +1,25 @@ +{ + "_autoGeneratedBreakdownErrorMessage": [ + "Invalid Serverless Application Specification document. ", + "Number of errors found: 3. ", + "Resource with id [MSKFunctionWithoutEventRecordFormat] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field EventRecordFormat in SchemaRegistryConfig Resource with id [MSKFunctionWithoutSchemaRegistryURI] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field SchemaRegistryURI in SchemaRegistryConfig Resource with id [MSKFunctionWithoutSchemaValidationConfigs] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field SchemaValidationConfigs in SchemaRegistryConfig" + ], + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 3. Resource with id [MSKFunctionWithoutEventRecordFormat] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field EventRecordFormat in SchemaRegistryConfig Resource with id [MSKFunctionWithoutSchemaRegistryURI] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaRegistryURI in SchemaRegistryConfig Resource with id [MSKFunctionWithoutSchemaValidationConfigs] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaValidationConfigs in SchemaRegistryConfig", + "errors": [ + { + "errorMessage": "Resource with id [MSKFunctionWithoutEventRecordFormat] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field EventRecordFormat in SchemaRegistryConfig" + }, + { + "errorMessage": "Resource with id [MSKFunctionWithoutSchemaRegistryURI] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaRegistryURI in SchemaRegistryConfig" + }, + { + "errorMessage": "Resource with id [MSKFunctionWithoutSchemaValidationConfigs] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaValidationConfigs in SchemaRegistryConfig" + } + ] +} diff --git a/tests/translator/output/error_invalid_schema_registry_config_for_self_managed_kafka.json b/tests/translator/output/error_invalid_schema_registry_config_for_self_managed_kafka.json new file mode 100644 index 0000000000..8e46c0bd30 --- /dev/null +++ b/tests/translator/output/error_invalid_schema_registry_config_for_self_managed_kafka.json @@ -0,0 +1,25 @@ +{ + "_autoGeneratedBreakdownErrorMessage": [ + "Invalid Serverless Application Specification document. ", + "Number of errors found: 3. ", + "Resource with id [KafkaFunctionWithoutEventRecordFormat] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field EventRecordFormat in SchemaRegistryConfig Resource with id [KafkaFunctionWithoutSchemaRegistryURI] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field SchemaRegistryURI in SchemaRegistryConfig Resource with id [KafkaFunctionWithoutSchemaValidationConfigs] is invalid. ", + "Event with id [MyKafkaCluster] is invalid. ", + "Missing required field SchemaValidationConfigs in SchemaRegistryConfig" + ], + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 3. Resource with id [KafkaFunctionWithoutEventRecordFormat] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field EventRecordFormat in SchemaRegistryConfig Resource with id [KafkaFunctionWithoutSchemaRegistryURI] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaRegistryURI in SchemaRegistryConfig Resource with id [KafkaFunctionWithoutSchemaValidationConfigs] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaValidationConfigs in SchemaRegistryConfig", + "errors": [ + { + "errorMessage": "Resource with id [KafkaFunctionWithoutEventRecordFormat] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field EventRecordFormat in SchemaRegistryConfig" + }, + { + "errorMessage": "Resource with id [KafkaFunctionWithoutSchemaRegistryURI] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaRegistryURI in SchemaRegistryConfig" + }, + { + "errorMessage": "Resource with id [KafkaFunctionWithoutSchemaValidationConfigs] is invalid. Event with id [MyKafkaCluster] is invalid. Missing required field SchemaValidationConfigs in SchemaRegistryConfig" + } + ] +} diff --git a/tests/translator/output/function_with_msk_with_schema_registry_config.json b/tests/translator/output/function_with_msk_with_schema_registry_config.json new file mode 100644 index 0000000000..d3759b50d2 --- /dev/null +++ b/tests/translator/output/function_with_msk_with_schema_registry_config.json @@ -0,0 +1,471 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithGlueSchemaAndIamAuth" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyMskEvent": { + "Properties": { + "AmazonManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "EventSourceArn": { + "Fn::Sub": "arn:${AWS::Partition}:kafka:${AWS::Region}:012345678901:cluster/mycluster/6cc0432b-8618-4f44-bccc-e1fbd8fb7c4d-2" + }, + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ] + }, + "PolicyName": "MSKExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/function_with_self_managed_kafka_and_schema_registry.json b/tests/translator/output/function_with_self_managed_kafka_and_schema_registry.json new file mode 100644 index 0000000000..0563f1b507 --- /dev/null +++ b/tests/translator/output/function_with_self_managed_kafka_and_schema_registry.json @@ -0,0 +1,618 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Parameters": {}, + "Resources": { + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": "https://example.com/registry", + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + }, + { + "Attribute": "VALUE" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithConfluentSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "AccessConfigs": [ + { + "Type": "BASIC_AUTH", + "URI": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + } + ], + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndBasicAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:secretsmanager:${AWS::Region}:123456789012:secret:my-path" + } + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.kafka_handler", + "Role": { + "Fn::GetAtt": [ + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole", + "Arn" + ] + }, + "Runtime": "python3.9", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyKafkaStreamProcessorWithGlueSchemaAndIamAuth" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "consumergroup1", + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "Topics": [ + "Topic1" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyKafkaStreamProcessorWithGlueSchemaAndIamAuthRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + }, + "MyMskStreamProcessorWithOnlySchemaRegistry": { + "Properties": { + "Code": { + "S3Bucket": "sam-demo-bucket", + "S3Key": "kafka.zip" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "MyMskStreamProcessorWithOnlySchemaRegistryRole", + "Arn" + ] + }, + "Runtime": "nodejs12.x", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryMyKafkaCluster": { + "Properties": { + "FunctionName": { + "Ref": "MyMskStreamProcessorWithOnlySchemaRegistry" + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "abc.xyz.com:9092", + "123.45.67.89:9096" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "SchemaRegistryConfig": { + "EventRecordFormat": "JSON", + "SchemaRegistryURI": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + }, + "SchemaValidationConfigs": [ + { + "Attribute": "KEY" + } + ] + } + }, + "SourceAccessConfigurations": [ + { + "Type": "SASL_SCRAM_512_AUTH", + "URI": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Type": "VPC_SUBNET", + "URI": "subnet:subnet-12345" + }, + { + "Type": "VPC_SECURITY_GROUP", + "URI": "security_group:sg-67890" + } + ], + "StartingPosition": "LATEST", + "Topics": [ + "MyDummyTestTopic" + ] + }, + "Type": "AWS::Lambda::EventSourceMapping" + }, + "MyMskStreamProcessorWithOnlySchemaRegistryRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ], + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": "arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c" + }, + { + "Action": [ + "ec2:CreateNetworkInterface", + "ec2:DescribeNetworkInterfaces", + "ec2:DeleteNetworkInterface", + "ec2:DescribeVpcs", + "ec2:DescribeSubnets", + "ec2:DescribeSecurityGroups" + ], + "Effect": "Allow", + "Resource": "*" + }, + { + "Action": [ + "glue:GetRegistry" + ], + "Effect": "Allow", + "Resource": { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:012345678901:registry/registry1" + } + }, + { + "Action": [ + "glue:GetSchemaVersion" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Sub": "arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:schema/registry1/*" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "SelfManagedKafkaExecutionRolePolicy" + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +}