
Commit eadd8a7

aws: msk_iam: pass TLS context to credential provider chain
The MSK IAM OAUTHBEARER callback created its AWS credential provider chain with a NULL TLS context: flb_standard_chain_provider_create(config->flb_config, NULL, ...) The provider chain reaches AWS endpoints over HTTPS - in particular the STS AssumeRoleWithWebIdentity call used by IRSA (IAM Roles for Service Accounts) on EKS. With no TLS context that call cannot connect, and the chain silently falls back to the EC2 instance metadata service, picking up the node role instead of the pod's IRSA role. The MSK token is then signed with the wrong principal and the broker rejects it with "SASL authentication error: Access denied", making IRSA unusable with native aws_msk_iam. Create a dedicated TLS instance (verifying peers against the system trust store, as the other AWS plugins do) for each credential provider and tear it down together with the provider. A separate instance is used at each provider-creation site since TLS instances cannot be shared between providers. Fixes #11255 Signed-off-by: James Carpenter <james@james-carpenter.net>
1 parent 2da48de commit eadd8a7

1 file changed

Lines changed: 46 additions & 16 deletions


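--- a/src/aws/flb_aws_msk_iam.c
+++ b/src/aws/flb_aws_msk_iam.c
@@ -30,6 +30,7 @@
 #include <fluent-bit/aws/flb_aws_msk_iam.h>
 
 #include <fluent-bit/flb_signv4.h>
+#include <fluent-bit/tls/flb_tls.h>
 #include <rdkafka.h>
 
 #include <stdio.h>
@@ -167,6 +168,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
                                        const char *host)
 {
     struct flb_aws_provider *temp_provider = NULL;
+    struct flb_tls *cred_tls = NULL;
     struct flb_aws_credentials *creds = NULL;
     flb_sds_t payload = NULL;
     int encode_result;
@@ -217,35 +219,47 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
     flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
              host, config->region);
 
+    /*
+     * The credential provider chain reaches AWS endpoints over HTTPS (STS for
+     * IRSA/AssumeRoleWithWebIdentity, the EC2/ECS metadata services, ...), so it
+     * requires its own TLS context. Passing NULL here makes the STS call fail on
+     * EKS and silently fall back to the node instance role, signing the MSK token
+     * with the wrong principal (see fluent/fluent-bit#11255).
+     */
+    cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+                              FLB_TRUE, /* verify peer */
+                              FLB_FALSE, /* debug */
+                              NULL, NULL, NULL, NULL, NULL, NULL);
+    if (!cred_tls) {
+        flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create TLS context for credentials");
+        return NULL;
+    }
+
     /* Create AWS provider on-demand */
-    temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
+    temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls,
                                                        config->region, NULL, NULL,
                                                        flb_aws_client_generator(),
                                                        NULL);
     if (!temp_provider) {
         flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider");
-        return NULL;
+        goto error;
     }
 
     if (temp_provider->provider_vtable->init(temp_provider) != 0) {
         flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider");
-        flb_aws_provider_destroy(temp_provider);
-        return NULL;
+        goto error;
     }
 
     /* Get credentials */
     creds = temp_provider->provider_vtable->get_credentials(temp_provider);
     if (!creds) {
         flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials");
-        flb_aws_provider_destroy(temp_provider);
-        return NULL;
+        goto error;
     }
 
     if (!creds->access_key_id || !creds->secret_access_key) {
         flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials");
-        flb_aws_credentials_destroy(creds);
-        flb_aws_provider_destroy(temp_provider);
-        return NULL;
+        goto error;
     }
 
     gmtime_r(&now, &gm);
@@ -553,6 +567,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
     if (temp_provider) {
         flb_aws_provider_destroy(temp_provider);
     }
+    if (cred_tls) {
+        flb_tls_destroy(cred_tls);
+    }
 
     return payload;
 
@@ -600,6 +617,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
     if (temp_provider) {
         flb_aws_provider_destroy(temp_provider);
     }
+    if (cred_tls) {
+        flb_tls_destroy(cred_tls);
+    }
 
     return NULL;
 }
@@ -623,6 +643,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
     struct flb_aws_credentials *creds = NULL;
     struct flb_kafka_opaque *kafka_opaque;
     struct flb_aws_provider *temp_provider = NULL;
+    struct flb_tls *cred_tls = NULL;
     (void) oauthbearer_config;
 
     kafka_opaque = (struct flb_kafka_opaque *) opaque;
@@ -673,13 +694,19 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
     }
 
     /* Get credentials for principal (create temporary provider just for this) */
-    temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
-                                                       config->region, NULL, NULL,
-                                                       flb_aws_client_generator(),
-                                                       NULL);
-    if (temp_provider) {
-        if (temp_provider->provider_vtable->init(temp_provider) == 0) {
-            creds = temp_provider->provider_vtable->get_credentials(temp_provider);
+    cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
+                              FLB_TRUE, /* verify peer */
+                              FLB_FALSE, /* debug */
+                              NULL, NULL, NULL, NULL, NULL, NULL);
+    if (cred_tls) {
+        temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls,
+                                                           config->region, NULL, NULL,
+                                                           flb_aws_client_generator(),
+                                                           NULL);
+        if (temp_provider) {
+            if (temp_provider->provider_vtable->init(temp_provider) == 0) {
+                creds = temp_provider->provider_vtable->get_credentials(temp_provider);
+            }
         }
     }
 
@@ -710,6 +737,9 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
     if (temp_provider) {
         flb_aws_provider_destroy(temp_provider);
     }
+    if (cred_tls) {
+        flb_tls_destroy(cred_tls);
+    }
 
     if (payload) {
         flb_sds_destroy(payload);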
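Review bot: In oauthbearer_token_refresh_cb the new `if (cred_tls)` branch has no else, so when flb_tls_create() fails the callback silently continues with creds left NULL and emits no log line, unlike the equivalent failure in build_msk_iam_payload which reports it; add `else { flb_error("[aws_msk_iam] oauthbearer_token_refresh_cb: failed to create TLS context for credentials"); }` so an operator can distinguish a TLS-setup failure from a credential lookup failure. In build_msk_iam_payload the incomplete-credentials branch used to call flb_aws_credentials_destroy(creds) before returning and now jumps to the error label; the visible part of that label releases only temp_provider and cred_tls, so unless creds is freed above the shown context, that path now leaks the credentials object. The teardown order (provider first, then the TLS context it references) is correct at all three cleanup sites. Both functions start and end outside the hunks.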
