
Commit cee0142

singholt and Thean Lim authored and committed
out_s3: fix retry_limit semantics and multipart memory leaks
Fix retry_limit off-by-one: change all six >= comparisons to > so retry_limit=N
correctly allows N retries (N+1 total attempts), matching engine semantics.

Default retry_limit to MAX_UPLOAD_ERRORS (5) when the user has not explicitly
set it, using the new retry_limit_is_set flag. The engine default of 1 is too
low for S3's internal retry system, where partially uploaded multipart data is
wasted when retries are exhausted too early. Log a warning when capping an
explicit unlimited setting to MAX_UPLOAD_ERRORS.

Fix multipart upload memory leaks:

- Add multipart_upload_destroy after mk_list_del when completion errors exceed
  retry_limit (80KB per abandoned upload)
- Free s3_file before flb_fstore_file_inactive when chunk failures exceed
  retry_limit (48 bytes per abandoned chunk)
- Free the json chunk buffer in all cb_s3_flush return paths

Fix use-after-free in the s3_upload_queue retry-limit path: unlink the
multipart_upload from ctx->uploads before destroying it. Without this, the
freed node remains in the linked list and get_upload() dereferences it on the
next flush.

Fix double-free in s3_context_destroy: the upload_queue cleanup loop was
calling multipart_upload_destroy on uploads already freed by the ctx->uploads
cleanup loop above it. Remove the redundant destroy.

Fix the mock_s3_call if/else chain so CreateMultipartUpload returns a valid
XML response with an UploadId. Skip size validation in test mode so multipart
paths can be exercised with small data. Remove the unit_test_flush bypass so
tests exercise the real flush path. Add a mock call counter and skip the timer
floor in test mode for faster execution.

Co-authored-by: Thean Lim <theanlim@amazon.com>
Signed-off-by: Anuj Singh <singholt@amazon.com>
1 parent d212965 commit cee0142
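
For context on the off-by-one, the following is a minimal standalone sketch (not part of the patch; the loop and values are illustrative) of the attempt accounting the message describes. With the corrected comparison, retry_limit = N means a chunk is retried while failures <= N, so it gets N retries and N + 1 total attempts; the old >= check gave up one attempt early.

/* Minimal sketch (not from the patch): attempt accounting for retry_limit = N. */
#include <stdio.h>

int main(void)
{
    int retry_limit = 2;   /* example value; the plugin default becomes 5 */
    int failures = 0;
    int attempts = 0;

    for (;;) {
        attempts++;
        failures++;                    /* pretend every attempt fails */
        if (failures > retry_limit) {  /* new check: give up after N retries */
            break;
        }
        /* old check was (failures >= retry_limit): gave up after N-1 retries */
    }
    printf("retry_limit=%d -> %d total attempts\n", retry_limit, attempts);
    return 0;
}

With retry_limit = 2 this prints 3 total attempts; the old comparison stopped at 2.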

1 file changed

plugins/out_s3/s3.c: 66 additions & 53 deletions
@@ -259,6 +259,25 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
     return 0;
 };
 
+/*
+ * Track mock API call counts via env vars (e.g. TEST_PutObject_CALL_COUNT)
+ * so tests can assert the exact number of S3 API attempts.
+ */
+static void mock_s3_call_increment_counter(char *api)
+{
+    char env_var[64];
+    char *val;
+    int count;
+    char buf[16];
+
+    snprintf(env_var, sizeof(env_var), "TEST_%s_CALL_COUNT", api);
+    val = getenv(env_var);
+    count = val ? atoi(val) : 0;
+    count++;
+    snprintf(buf, sizeof(buf), "%d", count);
+    setenv(env_var, buf, 1);
+}
+
 struct flb_http_client *mock_s3_call(char *error_env_var, char *api)
 {
     /* create an http client so that we can set the response */
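
Since the counter lives in an environment variable named after the API call, a runtime test can read it back after driving a flush through the mocked client. A hypothetical test-side helper (illustrative only; the variable name follows the TEST_%s_CALL_COUNT pattern above, but this helper is not part of the diff):

#include <stdlib.h>

/* Hypothetical helper for a runtime test: return how many times the mocked
 * PutObject call was attempted, as recorded by mock_s3_call_increment_counter().
 * Returns 0 if the plugin never reached that API. */
static int mock_put_object_attempts(void)
{
    char *val = getenv("TEST_PutObject_CALL_COUNT");

    return val ? atoi(val) : 0;
}

A test that expects retry_limit + 1 attempts would compare this value against that expectation once the flush has completed.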
@@ -267,6 +286,8 @@ struct flb_http_client *mock_s3_call(char *error_env_var, char *api)
     char *resp;
     int len;
 
+    mock_s3_call_increment_counter(api);
+
     c = flb_calloc(1, sizeof(struct flb_http_client));
     if (!c) {
         flb_errno();
@@ -296,7 +317,7 @@ struct flb_http_client *mock_s3_call(char *error_env_var, char *api)
                           "</InitiateMultipartUploadResult>";
         c->resp.payload_size = strlen(c->resp.payload);
     }
-    if (strcmp(api, "AbortMultipartUpload") == 0) {
+    else if (strcmp(api, "AbortMultipartUpload") == 0) {
         /* mocked success response */
         c->resp.status = 204;
         resp = "Date: Mon, 1 Nov 2010 20:34:56 GMT\n"
@@ -592,7 +613,6 @@ static void s3_context_destroy(struct flb_s3 *ctx)
     mk_list_foreach_safe(head, tmp, &ctx->upload_queue) {
         upload_contents = mk_list_entry(head, struct upload_queue, _head);
         s3_store_file_delete(ctx, upload_contents->upload_file);
-        multipart_upload_destroy(upload_contents->m_upload_file);
         remove_from_queue(upload_contents);
     }
 
@@ -630,7 +650,19 @@ static int cb_s3_init(struct flb_output_instance *ins,
     ctx->retry_time = 0;
     ctx->upload_queue_success = FLB_FALSE;
 
-    if(ctx->ins->retry_limit < 0) {
+    /*
+     * The engine default retry_limit (1) is too low for S3's internal
+     * retry system — partially uploaded multipart data is wasted when
+     * retries are exhausted too early. Default to MAX_UPLOAD_ERRORS (5)
+     * unless the user explicitly configured retry_limit.
+     */
+    if (ctx->ins->retry_limit_is_set == FLB_FALSE) {
+        ctx->ins->retry_limit = MAX_UPLOAD_ERRORS;
+    }
+    else if (ctx->ins->retry_limit < 0) {
+        flb_plg_warn(ctx->ins,
+                     "retry_limit set to unlimited, capping to %d",
+                     MAX_UPLOAD_ERRORS);
         ctx->ins->retry_limit = MAX_UPLOAD_ERRORS;
     }
 
@@ -734,12 +766,6 @@ static int cb_s3_init(struct flb_output_instance *ins,
     }
     flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
 
-    if (ctx->use_put_object == FLB_FALSE && ctx->file_size < 2 * MIN_CHUNKED_UPLOAD_SIZE) {
-        flb_plg_info(ctx->ins,
-                     "total_file_size is less than 10 MB, will use PutObject API");
-        ctx->use_put_object = FLB_TRUE;
-    }
-
     tmp = flb_output_get_property("compression", ins);
     if (tmp) {
         ret = flb_aws_compression_get_type(tmp);
@@ -761,6 +787,17 @@ static int cb_s3_init(struct flb_output_instance *ins,
     if (tmp) {
         ctx->content_type = (char *) tmp;
     }
+
+    if (s3_plugin_under_test() == FLB_TRUE) {
+        goto skip_size_validation;
+    }
+
+    if (ctx->use_put_object == FLB_FALSE && ctx->file_size < 2 * MIN_CHUNKED_UPLOAD_SIZE) {
+        flb_plg_info(ctx->ins,
+                     "total_file_size is less than 10 MB, will use PutObject API");
+        ctx->use_put_object = FLB_TRUE;
+    }
+
     if (ctx->use_put_object == FLB_FALSE) {
         /* upload_chunk_size */
         if (ctx->upload_chunk_size <= 0) {
@@ -807,6 +844,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
         }
     }
 
+skip_size_validation:
     tmp = flb_output_get_property("endpoint", ins);
     if (tmp) {
         ctx->insecure = strncmp(tmp, "http://", 7) == 0 ? FLB_TRUE : FLB_FALSE;
@@ -1016,11 +1054,13 @@ static int cb_s3_init(struct flb_output_instance *ins,
 
     ctx->timer_created = FLB_FALSE;
     ctx->timer_ms = (int) (ctx->upload_timeout / 6) * 1000;
-    if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) {
-        ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT;
-    }
-    else if (ctx->timer_ms < UPLOAD_TIMER_MIN_WAIT) {
-        ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
+    if (s3_plugin_under_test() == FLB_FALSE) {
+        if (ctx->timer_ms > UPLOAD_TIMER_MAX_WAIT) {
+            ctx->timer_ms = UPLOAD_TIMER_MAX_WAIT;
+        }
+        else if (ctx->timer_ms < UPLOAD_TIMER_MIN_WAIT) {
+            ctx->timer_ms = UPLOAD_TIMER_MIN_WAIT;
+        }
     }
 
     /*
@@ -1384,10 +1424,11 @@ static int put_all_chunks(struct flb_s3 *ctx)
             continue;
         }
 
-        if (chunk->failures >= ctx->ins->retry_limit) {
+        if (chunk->failures > ctx->ins->retry_limit) {
            flb_plg_warn(ctx->ins,
                         "Chunk for tag %s failed to send %d/%d times, will not retry",
                         (char *) fsf->meta_buf, chunk->failures, ctx->ins->retry_limit);
+            flb_free(chunk);
             flb_fstore_file_inactive(ctx->fs, fsf);
             continue;
         }
@@ -1672,7 +1713,7 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
         if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
             continue;
         }
-        if (tmp_upload->upload_errors >= ctx->ins->retry_limit) {
+        if (tmp_upload->upload_errors > ctx->ins->retry_limit) {
             tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
             flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors",
                           tmp_upload->s3_key);
@@ -1918,10 +1959,13 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
 
         /* If retry limit was reached, discard file and remove file from queue */
         upload_contents->retry_counter++;
-        if (upload_contents->retry_counter >= ctx->ins->retry_limit) {
+        if (upload_contents->retry_counter > ctx->ins->retry_limit) {
             flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not "
                          "retry", upload_contents->retry_counter);
             s3_store_file_inactive(ctx, upload_contents->upload_file);
+            if (upload_contents->m_upload_file) {
+                mk_list_del(&upload_contents->m_upload_file->_head);
+            }
             multipart_upload_destroy(upload_contents->m_upload_file);
             remove_from_queue(upload_contents);
             continue;
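
The unlink-before-destroy order in this hunk is what prevents the use-after-free described in the commit message: ctx->uploads is walked again by get_upload() on the next flush, so a node freed while still linked gets dereferenced. A toy sketch of the pattern (simplified hand-rolled list standing in for the mk_list API; not the plugin's actual types):

#include <stdlib.h>

/* Toy doubly linked list node standing in for struct mk_list (illustrative). */
struct node {
    struct node *prev;
    struct node *next;
};

/* Unlink n from whatever list it is on (the role mk_list_del plays above). */
static void list_del(struct node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->prev = n;
    n->next = n;
}

/* Correct order: remove the node from the shared list first, free it second.
 * Freeing first leaves a dangling pointer in the list for the next walk. */
static void drop_node(struct node *n)
{
    list_del(n);
    free(n);
}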
@@ -3319,10 +3363,11 @@ static void cb_s3_upload(struct flb_config *config, void *data)
         if (ret != FLB_OK) {
             flb_plg_error(ctx->ins, "Could not send chunk with tag %s",
                           (char *) fsf->meta_buf);
-            if(chunk->failures >= ctx->ins->retry_limit){
+            if(chunk->failures > ctx->ins->retry_limit){
                 flb_plg_warn(ctx->ins,
                              "Chunk for tag %s failed to send %d/%d times, will not retry",
                              (char *) fsf->meta_buf, chunk->failures, ctx->ins->retry_limit);
+                flb_free(chunk);
                 flb_fstore_file_inactive(ctx->fs, fsf);
                 continue;
             }
@@ -3334,11 +3379,12 @@ static void cb_s3_upload(struct flb_config *config, void *data)
         m_upload = mk_list_entry(head, struct multipart_upload, _head);
         complete = FLB_FALSE;
 
-        if (m_upload->complete_errors >= ctx->ins->retry_limit) {
+        if (m_upload->complete_errors > ctx->ins->retry_limit) {
             flb_plg_error(ctx->ins,
                           "Upload for %s has reached max completion errors, "
                           "plugin will give up", m_upload->s3_key);
             mk_list_del(&m_upload->_head);
+            multipart_upload_destroy(m_upload);
             continue;
         }
 
@@ -3528,31 +3574,6 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char
     return out_buf;
 }
 
-static void unit_test_flush(void *out_context, struct s3_file *upload_file,
-                            const char *tag, int tag_len, flb_sds_t chunk,
-                            int chunk_size, struct multipart_upload *m_upload_file,
-                            time_t file_first_log_time)
-{
-    int ret;
-    char *buffer;
-    size_t buffer_size;
-    struct flb_s3 *ctx = out_context;
-
-    s3_store_buffer_put(ctx, upload_file, tag, tag_len,
-                        chunk, (size_t) chunk_size, file_first_log_time);
-    ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size);
-    if (ret < 0) {
-        flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
-                      upload_file->file_path);
-        FLB_OUTPUT_RETURN(FLB_RETRY);
-    }
-
-    ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len);
-    flb_free(buffer);
-
-    FLB_OUTPUT_RETURN(ret);
-}
-
 static void flush_init(void *out_context)
 {
     int ret;
@@ -3838,16 +3859,8 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
         file_first_log_time = time(NULL);
     }
 
-    /* Specific to unit tests, will not get called normally */
-    if (s3_plugin_under_test() == FLB_TRUE) {
-        unit_test_flush(ctx, upload_file,
-                        event_chunk->tag, flb_sds_len(event_chunk->tag),
-                        chunk, chunk_size,
-                        m_upload_file, file_first_log_time);
-    }
-
     /* Discard upload_file if it has failed to upload retry_limit times */
-    if (upload_file != NULL && upload_file->failures >= ctx->ins->retry_limit) {
+    if (upload_file != NULL && upload_file->failures > ctx->ins->retry_limit) {
         flb_plg_warn(ctx->ins, "File with tag %s failed to send %d/%d times, will not retry",
                      event_chunk->tag, upload_file->failures, ctx->ins->retry_limit);
         s3_store_file_inactive(ctx, upload_file);
