Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions src/aws/flb_aws_msk_iam.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fluent-bit/aws/flb_aws_msk_iam.h>

#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/tls/flb_tls.h>
#include <rdkafka.h>

#include <stdio.h>
Expand Down Expand Up @@ -167,6 +168,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
const char *host)
{
struct flb_aws_provider *temp_provider = NULL;
struct flb_tls *cred_tls = NULL;
struct flb_aws_credentials *creds = NULL;
flb_sds_t payload = NULL;
int encode_result;
Expand Down Expand Up @@ -217,35 +219,47 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s",
host, config->region);

/*
* The credential provider chain reaches AWS endpoints over HTTPS (STS for
* IRSA/AssumeRoleWithWebIdentity, the EC2/ECS metadata services, ...), so it
* requires its own TLS context. Passing NULL here makes the STS call fail on
* EKS and silently fall back to the node instance role, signing the MSK token
* with the wrong principal (see fluent/fluent-bit#11255).
*/
cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
FLB_TRUE, /* verify peer */
FLB_FALSE, /* debug */
NULL, NULL, NULL, NULL, NULL, NULL);
if (!cred_tls) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create TLS context for credentials");
return NULL;
}

/* Create AWS provider on-demand */
temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls,
config->region, NULL, NULL,
flb_aws_client_generator(),
NULL);
if (!temp_provider) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider");
return NULL;
goto error;
}

if (temp_provider->provider_vtable->init(temp_provider) != 0) {
Comment thread
jamesdangercarpenter marked this conversation as resolved.
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider");
flb_aws_provider_destroy(temp_provider);
return NULL;
goto error;
}

/* Get credentials */
creds = temp_provider->provider_vtable->get_credentials(temp_provider);
if (!creds) {
flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials");
flb_aws_provider_destroy(temp_provider);
return NULL;
goto error;
}

if (!creds->access_key_id || !creds->secret_access_key) {
flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials");
flb_aws_credentials_destroy(creds);
flb_aws_provider_destroy(temp_provider);
return NULL;
goto error;
}

gmtime_r(&now, &gm);
Expand Down Expand Up @@ -553,6 +567,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
}
if (cred_tls) {
flb_tls_destroy(cred_tls);
}

return payload;

Expand Down Expand Up @@ -600,6 +617,9 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config,
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
}
if (cred_tls) {
flb_tls_destroy(cred_tls);
}

return NULL;
}
Expand All @@ -623,6 +643,7 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
struct flb_aws_credentials *creds = NULL;
struct flb_kafka_opaque *kafka_opaque;
struct flb_aws_provider *temp_provider = NULL;
struct flb_tls *cred_tls = NULL;
(void) oauthbearer_config;

kafka_opaque = (struct flb_kafka_opaque *) opaque;
Expand Down Expand Up @@ -673,13 +694,19 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
}

/* Get credentials for principal (create temporary provider just for this) */
temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL,
config->region, NULL, NULL,
flb_aws_client_generator(),
NULL);
if (temp_provider) {
if (temp_provider->provider_vtable->init(temp_provider) == 0) {
creds = temp_provider->provider_vtable->get_credentials(temp_provider);
cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
FLB_TRUE, /* verify peer */
FLB_FALSE, /* debug */
NULL, NULL, NULL, NULL, NULL, NULL);
if (cred_tls) {
temp_provider = flb_standard_chain_provider_create(config->flb_config, cred_tls,
config->region, NULL, NULL,
flb_aws_client_generator(),
NULL);
if (temp_provider) {
if (temp_provider->provider_vtable->init(temp_provider) == 0) {
creds = temp_provider->provider_vtable->get_credentials(temp_provider);
}
}
}

Expand Down Expand Up @@ -710,6 +737,9 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk,
if (temp_provider) {
flb_aws_provider_destroy(temp_provider);
}
if (cred_tls) {
flb_tls_destroy(cred_tls);
}

if (payload) {
flb_sds_destroy(payload);
Expand Down