Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 106 additions & 12 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx,
time_t file_first_log_time);

static void remove_from_queue(struct upload_queue *entry);
/*
 * Dispose of a chunk that exceeded the retry limit: it is either deleted or
 * moved to the quarantine stream, depending on ctx->retry_exhausted_action.
 */
static void s3_chunk_retry_exhausted_cleanup(struct flb_s3 *ctx,
struct s3_file *chunk_file);
/*
 * Translate the 'retry_exhausted_action' configuration string into one of
 * the S3_RETRY_EXHAUSTED_* constants; returns -1 for an unknown value.
 */
static int s3_get_retry_exhausted_action(const char *value);

static int blob_initialize_authorization_endpoint_upstream(struct flb_s3 *context);

Expand All @@ -97,21 +100,21 @@ static struct flb_aws_header *get_content_encoding_header(int compression_type)
.val = "gzip",
.val_len = 4,
};

static struct flb_aws_header zstd_header = {
.key = "Content-Encoding",
.key_len = 16,
.val = "zstd",
.val_len = 4,
};

static struct flb_aws_header snappy_header = {
.key = "Content-Encoding",
.key_len = 16,
.val = "snappy",
.val_len = 6,
};

switch (compression_type) {
case FLB_AWS_COMPRESS_GZIP:
return &gzip_header;
Expand Down Expand Up @@ -196,7 +199,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
headers_len++;
Expand Down Expand Up @@ -228,7 +231,7 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
if (ctx->compression == FLB_AWS_COMPRESS_GZIP ||
ctx->compression == FLB_AWS_COMPRESS_ZSTD ||
ctx->compression == FLB_AWS_COMPRESS_SNAPPY) {
encoding_header = get_content_encoding_header(ctx->compression);
Expand Down Expand Up @@ -628,6 +631,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
int ret;
int action;
flb_sds_t tmp_sds;
char *role_arn = NULL;
char *session_name;
Expand Down Expand Up @@ -681,6 +685,15 @@ static int cb_s3_init(struct flb_output_instance *ins,
return -1;
}

action = s3_get_retry_exhausted_action(ctx->retry_exhausted_action_str);
if (action == -1) {
flb_plg_error(ctx->ins,
"invalid retry_exhausted_action value '%s'",
ctx->retry_exhausted_action_str ? ctx->retry_exhausted_action_str : "(null)");
return -1;
}
ctx->retry_exhausted_action = action;

/* the check against -1 works here because size_t is unsigned
* and (int) -1 == unsigned max value
* Fluent Bit uses -1 (which becomes max value) to indicate undefined
Expand Down Expand Up @@ -834,7 +847,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
"total_file_size is less than 10 MB, will use PutObject API");
ctx->use_put_object = FLB_TRUE;
}

if (ctx->use_put_object == FLB_FALSE) {
/* upload_chunk_size */
if (ctx->upload_chunk_size <= 0) {
Expand Down Expand Up @@ -1446,6 +1459,9 @@ static int put_all_chunks(struct flb_s3 *ctx)
if (fs_stream == ctx->stream_upload) {
continue;
}
if (fs_stream == ctx->stream_quarantine) {
continue;
}
/* skip metadata stream */
if (fs_stream == ctx->stream_metadata) {
continue;
Expand All @@ -1464,8 +1480,7 @@ static int put_all_chunks(struct flb_s3 *ctx)
flb_plg_warn(ctx->ins,
"Chunk for tag %s failed to send %d/%d times, will not retry",
(char *) fsf->meta_buf, chunk->failures, ctx->ins->retry_limit);
flb_free(chunk);
flb_fstore_file_inactive(ctx->fs, fsf);
s3_chunk_retry_exhausted_cleanup(ctx, chunk);
continue;
}

Expand Down Expand Up @@ -1946,6 +1961,72 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file,
return 0;
}

/*
 * Dispose of a chunk whose upload retries are exhausted.
 *
 * The chunk is taken out of the active buffer for good. What happens to
 * its data depends on ctx->retry_exhausted_action:
 *
 *  - S3_RETRY_EXHAUSTED_QUARANTINE: the chunk is moved to the quarantine
 *    stream and its size is transferred from current_buffer_size to
 *    quarantine_buffer_size. If quarantine_dir_limit_size (0 = unlimited)
 *    would be exceeded, or the move itself fails, the chunk is deleted
 *    instead.
 *  - any other value: the chunk is deleted.
 *
 * A NULL chunk_file is a no-op. On the quarantine path the s3_file is freed
 * by s3_store_file_quarantine(); callers should treat chunk_file as consumed
 * once this function returns.
 */
static void s3_chunk_retry_exhausted_cleanup(struct flb_s3 *ctx,
                                             struct s3_file *chunk_file)
{
    size_t reclaim_size;
    size_t limit;
    int ret;

    if (chunk_file == NULL) {
        return;
    }

    if (ctx->retry_exhausted_action != S3_RETRY_EXHAUSTED_QUARANTINE) {
        s3_store_file_delete(ctx, chunk_file);
        return;
    }

    /* read the size up front: a successful quarantine frees chunk_file */
    reclaim_size = chunk_file->size;
    limit = ctx->quarantine_dir_limit_size;

    /*
     * Same condition as 'quarantine_buffer_size + reclaim_size > limit',
     * expressed with a subtraction so the size_t arithmetic cannot wrap
     * around and let an oversized chunk slip under the limit.
     */
    if (limit > 0 &&
        (ctx->quarantine_buffer_size > limit ||
         reclaim_size > limit - ctx->quarantine_buffer_size)) {
        flb_plg_warn(ctx->ins,
                     "quarantine limit reached, deleting retry-exhausted chunk");
        s3_store_file_delete(ctx, chunk_file);
        return;
    }

    ret = s3_store_file_quarantine(ctx, chunk_file);
    if (ret < 0) {
        /* on failure the chunk is still intact, so it can be deleted */
        flb_plg_error(ctx->ins,
                      "could not quarantine, deleting retry-exhausted chunk");
        s3_store_file_delete(ctx, chunk_file);
        return;
    }

    /* move the bytes from the active accounting to the quarantine one */
    if (ctx->current_buffer_size >= reclaim_size) {
        ctx->current_buffer_size -= reclaim_size;
    }
    else {
        ctx->current_buffer_size = 0;
    }
    ctx->quarantine_buffer_size += reclaim_size;

    flb_plg_warn(ctx->ins,
                 "retry-exhausted chunk moved to quarantine");
}

/*
 * Map the 'retry_exhausted_action' option to its numeric constant.
 *
 * The comparison is case-insensitive. An unset (NULL) value selects
 * quarantine. Returns -1 when the string is not a supported action.
 */
static int s3_get_retry_exhausted_action(const char *value)
{
    int action = -1;

    if (value == NULL || strcasecmp(value, "quarantine") == 0) {
        action = S3_RETRY_EXHAUSTED_QUARANTINE;
    }
    else if (strcasecmp(value, "delete") == 0) {
        action = S3_RETRY_EXHAUSTED_DELETE;
    }

    return action;
}

/* Uploads all chunk files in queue synchronously */
static void s3_upload_queue(struct flb_config *config, void *out_context)
{
Expand Down Expand Up @@ -1998,7 +2079,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
if (upload_contents->retry_counter > ctx->ins->retry_limit) {
flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not "
"retry", upload_contents->retry_counter);
s3_store_file_inactive(ctx, upload_contents->upload_file);
s3_chunk_retry_exhausted_cleanup(ctx, upload_contents->upload_file);
if (upload_contents->m_upload_file) {
mk_list_del(&upload_contents->m_upload_file->_head);
}
Expand Down Expand Up @@ -3403,8 +3484,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)
flb_plg_warn(ctx->ins,
"Chunk for tag %s failed to send %d/%d times, will not retry",
(char *) fsf->meta_buf, chunk->failures, ctx->ins->retry_limit);
flb_free(chunk);
flb_fstore_file_inactive(ctx->fs, fsf);
s3_chunk_retry_exhausted_cleanup(ctx, chunk);
continue;
}
}
Expand Down Expand Up @@ -3937,7 +4017,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) {
flb_plg_warn(ctx->ins, "File with tag %s failed to send %d/%d times, will not retry",
event_chunk->tag, upload_file->failures, ctx->ins->retry_limit);
s3_store_file_inactive(ctx, upload_file);
s3_chunk_retry_exhausted_cleanup(ctx, upload_file);
upload_file = NULL;
}

Expand Down Expand Up @@ -4177,6 +4257,13 @@ static struct flb_config_map config_map[] = {
"the `store_dir` to limit disk usage. If the limit is reached, "
"data will be discarded. Default is 0 which means unlimited."
},
{
FLB_CONFIG_MAP_SIZE, "quarantine_dir_limit_size", "0",
0, FLB_TRUE, offsetof(struct flb_s3, quarantine_dir_limit_size),
"Limit size for retry-exhausted quarantined chunks. Applies when "
"retry_exhausted_action is set to 'quarantine'. If limit is reached, "
"retry-exhausted chunks are deleted. Default is 0 (unlimited)."
},

{
FLB_CONFIG_MAP_STR, "s3_key_format", "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S",
Expand Down Expand Up @@ -4229,6 +4316,13 @@ static struct flb_config_map config_map[] = {
"received chunk to be swapped with a later chunk, resulting in data shuffling. "
"This feature prevents this shuffling by using a queue logic for uploads."
},
{
FLB_CONFIG_MAP_STR, "retry_exhausted_action", "quarantine",
0, FLB_TRUE, offsetof(struct flb_s3, retry_exhausted_action_str),
"Action for chunks that exceeded retry_limit. Supported values are "
"'delete' (remove permanently) and 'quarantine' (move to quarantine and "
"remove from active buffer accounting)."
},

{
FLB_CONFIG_MAP_STR, "log_key", NULL,
Expand Down
7 changes: 7 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
#define DEFAULT_UPLOAD_TIMEOUT 3600

#define MAX_UPLOAD_ERRORS 5
/* Values of flb_s3.retry_exhausted_action: fate of a chunk past retry_limit */
#define S3_RETRY_EXHAUSTED_DELETE 0 /* remove the chunk permanently */
#define S3_RETRY_EXHAUSTED_QUARANTINE 1 /* move the chunk to the quarantine stream */

/*
* If we see repeated errors on an upload/chunk, we will discard it
Expand Down Expand Up @@ -113,6 +115,7 @@ struct flb_s3 {
char *sts_endpoint;
char *canned_acl;
char *content_type;
char *retry_exhausted_action_str;
char *storage_class;
char *log_key;
char *external_id;
Expand All @@ -122,10 +125,12 @@ struct flb_s3 {
int use_put_object;
int send_content_md5;
int static_file_path;
int retry_exhausted_action;
int compression;
int port;
int insecure;
size_t store_dir_limit_size;
size_t quarantine_dir_limit_size;

struct flb_blob_db blob_db;
flb_sds_t blob_database_file;
Expand All @@ -143,6 +148,7 @@ struct flb_s3 {

/* track the total amount of buffered data */
size_t current_buffer_size;
size_t quarantine_buffer_size;

struct flb_aws_provider *provider;
struct flb_aws_provider *base_provider;
Expand All @@ -164,6 +170,7 @@ struct flb_s3 {
struct flb_fstore *fs;
struct flb_fstore_stream *stream_active; /* default active stream */
struct flb_fstore_stream *stream_upload; /* multipart upload stream */
struct flb_fstore_stream *stream_quarantine; /* retry-exhausted stream */
struct flb_fstore_stream *stream_metadata; /* s3 metadata stream */

/*
Expand Down
60 changes: 59 additions & 1 deletion plugins/out_s3/s3_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ static int set_files_context(struct flb_s3 *ctx)
struct flb_fstore_stream *fs_stream;
struct flb_fstore_file *fsf;
struct s3_file *s3_file;
ssize_t file_size;

mk_list_foreach(head, &ctx->fs->streams) {
fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
Expand Down Expand Up @@ -246,6 +247,11 @@ static int set_files_context(struct flb_s3 *ctx)
s3_file->first_log_time = time(NULL);
s3_file->create_time = time(NULL);

file_size = cio_chunk_get_real_size(fsf->chunk);
if (file_size > 0) {
s3_file->size = (size_t) file_size;
}

/* Use fstore opaque 'data' reference to keep our context */
fsf->data = s3_file;
}
Expand Down Expand Up @@ -321,6 +327,16 @@ int s3_store_init(struct flb_s3 *ctx)
}
ctx->stream_upload = fs_stream;

/* Terminal quarantine stream */
fs_stream = flb_fstore_stream_create(ctx->fs, "quarantine");
if (!fs_stream) {
flb_plg_error(ctx->ins, "could not initialize quarantine stream");
flb_fstore_destroy(fs);
ctx->fs = NULL;
return -1;
}
ctx->stream_quarantine = fs_stream;
Comment thread
cosmo0920 marked this conversation as resolved.

set_files_context(ctx);
return 0;
}
Expand Down Expand Up @@ -376,7 +392,8 @@ int s3_store_has_data(struct flb_s3 *ctx)
mk_list_foreach(head, &ctx->fs->streams) {
/* skip multi upload stream */
fs_stream = mk_list_entry(head, struct flb_fstore_stream, _head);
if (fs_stream == ctx->stream_upload) {
if (fs_stream == ctx->stream_upload ||
fs_stream == ctx->stream_quarantine) {
continue;
}

Expand Down Expand Up @@ -413,6 +430,47 @@ int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file)
return ret;
}

/*
 * Move a buffered file into the quarantine stream.
 *
 * The content and, when present, the metadata of the source file are copied
 * into a new file of the same name created in ctx->stream_quarantine. Once
 * the copy is complete the source file is deleted from the store and
 * s3_file is freed.
 *
 * Returns 0 on success. Returns -1 if any step of the copy fails; in that
 * case the partially written quarantine file (if one was created) is
 * deleted and neither the source file nor s3_file is touched.
 */
int s3_store_file_quarantine(struct flb_s3 *ctx, struct s3_file *s3_file)
{
    int ret;
    void *content;
    size_t content_size;
    struct flb_fstore_file *src;
    struct flb_fstore_file *dst;

    src = s3_file->fsf;

    if (flb_fstore_file_content_copy(ctx->fs, src, &content, &content_size) < 0) {
        return -1;
    }

    dst = flb_fstore_file_create(ctx->fs, ctx->stream_quarantine,
                                 src->name, content_size);
    if (dst == NULL) {
        flb_free(content);
        return -1;
    }

    /* metadata first, then the payload; stop at the first failing step */
    ret = 0;
    if (src->meta_buf != NULL && src->meta_size > 0) {
        ret = flb_fstore_file_meta_set(ctx->fs, dst, src->meta_buf, src->meta_size);
    }
    if (ret >= 0) {
        ret = flb_fstore_file_append(dst, content, content_size);
    }

    /* the temporary copy is no longer needed whatever the outcome */
    flb_free(content);

    if (ret < 0) {
        flb_fstore_file_delete(ctx->fs, dst);
        return -1;
    }

    /* the quarantine copy is complete: drop the original and its context */
    flb_fstore_file_delete(ctx->fs, src);
    flb_free(s3_file);
    return 0;
}

int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file)
{
struct flb_fstore_file *fsf;
Expand Down
1 change: 1 addition & 0 deletions plugins/out_s3/s3_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ int s3_store_has_data(struct flb_s3 *ctx);
int s3_store_has_uploads(struct flb_s3 *ctx);

int s3_store_file_inactive(struct flb_s3 *ctx, struct s3_file *s3_file);
/*
 * Copy the file's content and metadata into the quarantine stream, then
 * delete the original and free s3_file. Returns 0 on success, -1 on failure
 * (s3_file is left intact in that case).
 */
int s3_store_file_quarantine(struct flb_s3 *ctx, struct s3_file *s3_file);
struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
int tag_len);
int s3_store_file_delete(struct flb_s3 *ctx, struct s3_file *s3_file);
Expand Down
Loading
Loading