diff --git a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index 0000ca576e6..28ce8867edd 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -167,6 +168,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, const char *host) { struct flb_aws_provider *temp_provider = NULL; + struct flb_tls *cred_tls = NULL; struct flb_aws_credentials *creds = NULL; flb_sds_t payload = NULL; int encode_result; @@ -217,35 +219,47 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", host, config->region); + /* + * The credential provider chain reaches AWS endpoints over HTTPS (STS for + * IRSA/AssumeRoleWithWebIdentity, the EC2/ECS metadata services, ...), so it + * requires its own TLS context. Passing NULL here makes the STS call fail on + * EKS and silently fall back to the node instance role, signing the MSK token + * with the wrong principal (see fluent/fluent-bit#11255). + */ + cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, /* verify peer */ + FLB_FALSE, /* debug */ + NULL, NULL, NULL, NULL, NULL, NULL); + if (!cred_tls) { + flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create TLS context for credentials"); + return NULL; + } + /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, + temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls, config->region, NULL, NULL, flb_aws_client_generator(), NULL); if (!temp_provider) { flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); - return NULL; + goto error; } if (temp_provider->provider_vtable->init(temp_provider) != 0) { flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); - return NULL; + goto error; } /* Get credentials */ creds = temp_provider->provider_vtable->get_credentials(temp_provider); if (!creds) { flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); - return NULL; + goto error; } if (!creds->access_key_id || !creds->secret_access_key) { flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); - return NULL; + goto error; } gmtime_r(&now, &gm); @@ -553,6 +567,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (temp_provider) { flb_aws_provider_destroy(temp_provider); } + if (cred_tls) { + flb_tls_destroy(cred_tls); + } return payload; @@ -600,6 +617,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (temp_provider) { flb_aws_provider_destroy(temp_provider); } + if (cred_tls) { + flb_tls_destroy(cred_tls); + } return NULL; } @@ -623,6 +643,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, struct flb_aws_credentials *creds = NULL; struct flb_kafka_opaque *kafka_opaque; struct flb_aws_provider *temp_provider = NULL; + struct flb_tls *cred_tls = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; @@ -673,13 +694,19 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, } /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); + cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, /* verify peer */ + FLB_FALSE, /* debug */ + NULL, NULL, NULL, NULL, NULL, NULL); + if (cred_tls) { + temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls, + config->region, NULL, NULL, + flb_aws_client_generator(), + NULL); + if (temp_provider) { + if (temp_provider->provider_vtable->init(temp_provider) == 0) { + creds = temp_provider->provider_vtable->get_credentials(temp_provider); + } } } @@ -710,6 +737,9 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, if (temp_provider) { flb_aws_provider_destroy(temp_provider); } + if (cred_tls) { + flb_tls_destroy(cred_tls); + } if (payload) { flb_sds_destroy(payload);