Skip to content

Commit 329808a

Browse files
RTLSclaude
andcommitted
Add AWS MSK IAM support to Kafka sink YAML config
Allow users to define Kafka sinks with AWS MSK IAM authentication via sequin.yaml by parsing/exporting the aws_access_key_id, aws_secret_access_key, and aws_region fields. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 93f3c8a commit 329808a

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

lib/sequin/transforms/transforms.ex

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ defmodule Sequin.Transforms do
223223
tls: sink.tls,
224224
username: sink.username,
225225
password: SensitiveValue.new(sink.password, show_sensitive),
226-
sasl_mechanism: sink.sasl_mechanism
226+
sasl_mechanism: sink.sasl_mechanism,
227+
aws_access_key_id: sink.aws_access_key_id,
228+
aws_secret_access_key: SensitiveValue.new(sink.aws_secret_access_key, show_sensitive),
229+
aws_region: sink.aws_region
227230
})
228231
end
229232

@@ -1136,7 +1139,10 @@ defmodule Sequin.Transforms do
11361139
tls: attrs["tls"] || false,
11371140
username: attrs["username"],
11381141
password: attrs["password"],
1139-
sasl_mechanism: sasl_mechanism
1142+
sasl_mechanism: sasl_mechanism,
1143+
aws_access_key_id: attrs["aws_access_key_id"],
1144+
aws_secret_access_key: attrs["aws_secret_access_key"],
1145+
aws_region: attrs["aws_region"]
11401146
}}
11411147
end
11421148
end
@@ -1428,13 +1434,14 @@ defmodule Sequin.Transforms do
14281434
defp parse_sasl_mechanism("plain"), do: {:ok, :plain}
14291435
defp parse_sasl_mechanism("scram_sha_256"), do: {:ok, :scram_sha_256}
14301436
defp parse_sasl_mechanism("scram_sha_512"), do: {:ok, :scram_sha_512}
1437+
defp parse_sasl_mechanism("aws_msk_iam"), do: {:ok, :aws_msk_iam}
14311438

14321439
defp parse_sasl_mechanism(invalid),
14331440
do:
14341441
{:error,
14351442
Error.validation(
14361443
summary:
1437-
"[sasl_mechanism] Invalid SASL mechanism '#{invalid}'. Must be one of: plain, scram_sha_256, scram_sha_512"
1444+
"[sasl_mechanism] Invalid SASL mechanism '#{invalid}'. Must be one of: plain, scram_sha_256, scram_sha_512, aws_msk_iam"
14381445
)}
14391446

14401447
defp env do

test/sequin/yaml_loader_test.exs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,41 @@ defmodule Sequin.YamlLoaderTest do
12331233
} = consumer.sink
12341234
end
12351235

1236+
test "creates kafka sink consumer with AWS MSK IAM" do
1237+
assert :ok =
1238+
YamlLoader.apply_from_yml!("""
1239+
#{account_and_db_yml()}
1240+
1241+
sinks:
1242+
- name: "kafka-msk-consumer"
1243+
database: "test-db"
1244+
destination:
1245+
type: "kafka"
1246+
hosts: "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098"
1247+
topic: "test-topic"
1248+
tls: true
1249+
sasl_mechanism: "aws_msk_iam"
1250+
aws_access_key_id: "AKIAXXXXXXXXXXXXXXXX"
1251+
aws_secret_access_key: "secret123"
1252+
aws_region: "us-east-1"
1253+
""")
1254+
1255+
assert [consumer] = Repo.all(SinkConsumer)
1256+
1257+
assert consumer.name == "kafka-msk-consumer"
1258+
1259+
assert %KafkaSink{
1260+
type: :kafka,
1261+
hosts: "b-1.msk-cluster.abc123.kafka.us-east-1.amazonaws.com:9098",
1262+
topic: "test-topic",
1263+
tls: true,
1264+
sasl_mechanism: :aws_msk_iam,
1265+
aws_access_key_id: "AKIAXXXXXXXXXXXXXXXX",
1266+
aws_secret_access_key: "secret123",
1267+
aws_region: "us-east-1"
1268+
} = consumer.sink
1269+
end
1270+
12361271
test "creates sqs sink consumer" do
12371272
assert :ok =
12381273
YamlLoader.apply_from_yml!("""

0 commit comments

Comments
 (0)