diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 25244c1df04..b5c02a5b7b6 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -169,6 +169,44 @@ int flb_tail_file_offset_marker_matches(struct flb_tail_file *file) return FLB_TRUE; } +static void update_resumable_offset_state(struct flb_tail_file *file) +{ +#ifdef FLB_HAVE_SQLDB + if (file->config->db) { + flb_tail_db_file_offset(file, file->config); + return; + } +#endif + + flb_tail_file_update_offset_marker(file); +} + +int flb_tail_file_reset_on_truncate(struct flb_tail_file *file, + int64_t size_delta, + const char *caller) +{ + int64_t offset; + struct flb_tail_config *ctx = file->config; + + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return -1; + } + + flb_plg_debug(ctx->ins, + "%s: inode=%"PRIu64" file truncated %s (diff: %"PRId64" bytes)", + caller, file->inode, file->name, size_delta); + + file->offset = offset; + file->stream_offset = offset; + file->last_processed_bytes = 0; + file->buf_len = 0; + + update_resumable_offset_state(file); + return 0; +} + static uint64_t stat_get_st_dev(struct stat *st) { #ifdef FLB_SYSTEM_WINDOWS @@ -1678,9 +1716,10 @@ int flb_tail_file_remove_all(struct flb_tail_config *ctx) static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *file) { int ret; - int64_t offset; struct stat st; + (void) ctx; + ret = fstat(file->fd, &st); if (ret == -1) { flb_errno(); @@ -1694,23 +1733,10 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi /* Check if the file was truncated by comparing current size with previous size */ if (size_delta < 0) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); + if (flb_tail_file_reset_on_truncate(file, size_delta, + "adjust_counters") == -1) { return FLB_TAIL_ERROR; } - - flb_plg_debug(ctx->ins, "adjust_counters: inode=%"PRIu64" file truncated %s (diff: %"PRId64" bytes)", - file->inode, file->name, size_delta); - file->offset = offset; - file->buf_len = 0; - - /* Update offset in the database file */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif } else { // Avoid negative pending_bytes when fstat() has stale data and size < offset @@ -1766,11 +1792,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) file->buf_len -= processed_bytes; file->buf_data[file->buf_len] = '\0'; -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - flb_tail_db_file_offset(file, file->config); - } -#endif + update_resumable_offset_state(file); return adjust_counters(ctx, file); } } @@ -1954,11 +1976,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) file->buf_len -= processed_bytes; file->buf_data[file->buf_len] = '\0'; -#ifdef FLB_HAVE_SQLDB - if (file->config->db) { - flb_tail_db_file_offset(file, file->config); - } -#endif + update_resumable_offset_state(file); /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ ret = adjust_counters(ctx, file); diff --git a/plugins/in_tail/tail_file.h b/plugins/in_tail/tail_file.h index ee8003f1d19..fc8b21eb119 100644 --- a/plugins/in_tail/tail_file.h +++ b/plugins/in_tail/tail_file.h @@ -155,5 +155,8 @@ static inline off_t flb_tail_file_db_offset(struct flb_tail_file *file) int flb_tail_file_update_offset_marker(struct flb_tail_file *file); int flb_tail_file_offset_marker_matches(struct flb_tail_file *file); +int flb_tail_file_reset_on_truncate(struct flb_tail_file *file, + int64_t size_delta, + const char *caller); #endif diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c index ff45ad8dc25..3363b7e2ae6 100644 --- a/plugins/in_tail/tail_fs_inotify.c +++ b/plugins/in_tail/tail_fs_inotify.c @@ -31,6 +31,7 @@ #include "tail_config.h" #include "tail_file.h" #include "tail_db.h" +#include "tail_scan.h" #include "tail_signal.h" #include @@ -163,17 +164,138 @@ static int flb_tail_fs_add_rotated(struct flb_tail_file *file) return tail_fs_add(file, FLB_FALSE); } +static int reconcile_file_state(struct flb_tail_config *ctx, + struct flb_tail_file *file, + const char *caller, + int *pending_data_detected) +{ + int ret; + int64_t size_delta; + struct stat st; + + if (pending_data_detected != NULL) { + *pending_data_detected = FLB_FALSE; + } + + ret = fstat(file->fd, &st); + if (ret == -1) { + flb_plg_debug(ctx->ins, "inode=%"PRIu64" error stat(2) %s, removing", + file->inode, file->name); + flb_tail_file_remove(file); + return -1; + } + + size_delta = st.st_size - file->size; + if (size_delta != 0) { + file->size = st.st_size; + } + + if (size_delta < 0 || st.st_size < file->offset || + flb_tail_file_offset_marker_matches(file) != FLB_TRUE) { + ret = flb_tail_file_reset_on_truncate(file, size_delta, caller); + if (ret == -1) { + return -1; + } + } + + if (file->offset < st.st_size) { + file->pending_bytes = (st.st_size - file->offset); + if (pending_data_detected != NULL) { + *pending_data_detected = FLB_TRUE; + } + } + else { + file->pending_bytes = 0; + } + + if (st.st_nlink == 0) { + if (file->pending_bytes > 0) { + return 0; + } + + if (file->buf_len > 0) { + return 0; + } + + flb_plg_debug(ctx->ins, "inode=%"PRIu64" file has been deleted: %s", + file->inode, file->name); + +#ifdef FLB_HAVE_SQLDB + if (ctx->db) { + flb_tail_db_file_delete(file, ctx); + } +#endif + flb_tail_file_remove(file); + return -1; + } + + ret = flb_tail_file_is_rotated(ctx, file); + if (ret == FLB_TRUE) { + ret = flb_tail_file_rotated(file); + if (ret == -1) { + return -1; + } + + ret = flb_tail_fs_remove(ctx, file); + if (ret == -1) { + return -1; + } + } + else if (ret == -1) { + return -1; + } + + if (file->rotated != 0 && file->watch_fd == -1) { + ret = flb_tail_fs_add_rotated(file); + if (ret == -1) { + return -1; + } + } + + return 0; +} + +static int reconcile_all_files(struct flb_tail_config *ctx) +{ + int ret; + int pending; + int pending_data_detected; + struct mk_list *head; + struct mk_list *tmp; + struct flb_tail_file *file; + + pending_data_detected = FLB_FALSE; + + mk_list_foreach_safe(head, tmp, &ctx->files_event) { + file = mk_list_entry(head, struct flb_tail_file, _head); + ret = reconcile_file_state(ctx, file, "inotify_reconcile", &pending); + if (ret == -1) { + continue; + } + + if (pending == FLB_TRUE) { + pending_data_detected = FLB_TRUE; + } + } + + if (pending_data_detected == FLB_TRUE) { + tail_signal_pending(ctx); + } + + return pending_data_detected; +} + static int tail_fs_event(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { int ret; - int64_t offset; struct mk_list *head; struct mk_list *tmp; struct flb_tail_config *ctx = in_context; struct flb_tail_file *file = NULL; struct inotify_event ev; - struct stat st; + + (void) ins; /* Read the event */ ret = read(ctx->fd_notify, &ev, sizeof(struct inotify_event)); @@ -181,6 +303,15 @@ static int tail_fs_event(struct flb_input_instance *ins, return -1; } + if (ev.mask & IN_Q_OVERFLOW) { + debug_event_mask(ctx, NULL, ev.mask); + flb_plg_warn(ctx->ins, "inotify event queue overflow, reconciling files"); + + reconcile_all_files(ctx); + flb_tail_scan(ctx->path_list, ctx); + return 0; + } + /* Lookup watched file */ mk_list_foreach_safe(head, tmp, &ctx->files_event) { file = mk_list_entry(head, struct flb_tail_file, _head); @@ -204,79 +335,16 @@ static int tail_fs_event(struct flb_input_instance *ins, return -1; } - /* Check file rotation (only if it has not been rotated before) */ - if (ev.mask & IN_MOVE_SELF && file->rotated == 0) { - flb_plg_debug(ins, "inode=%"PRIu64" rotated IN_MOVE SELF '%s'", - file->inode, file->name); - - /* A rotated file must be re-registered */ - flb_tail_file_rotated(file); - flb_tail_fs_remove(ctx, file); - flb_tail_fs_add_rotated(file); - } - - ret = fstat(file->fd, &st); + /* + * IN_MOVE_SELF rotation is handled inside reconcile_file_state via + * flb_tail_file_is_rotated, which detects the inode/name divergence and + * re-registers the watch through the same retry path. + */ + ret = reconcile_file_state(ctx, file, "tail_fs_event", NULL); if (ret == -1) { - flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing", - file->inode, file->name); - flb_tail_file_remove(file); return 0; } - /* Check if the file was truncated */ - int64_t size_delta = st.st_size - file->size; - if (size_delta != 0) { - file->size = st.st_size; - } - - file->pending_bytes = (st.st_size > file->offset) ? (st.st_size - file->offset) : 0; - - /* File was removed ? */ - if (ev.mask & IN_ATTRIB) { - /* Check if the file have been deleted */ - if (st.st_nlink == 0) { - flb_plg_debug(ins, "inode=%"PRIu64" file has been deleted: %s", - file->inode, file->name); - -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - /* Remove file entry from the database */ - flb_tail_db_file_delete(file, ctx); - } -#endif - /* Remove file from the monitored list */ - flb_tail_file_remove(file); - return 0; - } - } - - if (ev.mask & IN_MODIFY) { - /* - * The file was modified, check how many new bytes do - * we have. - */ - - if (size_delta < 0) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return -1; - } - - flb_plg_debug(ctx->ins, "tail_fs_event: inode=%"PRIu64" file truncated %s (diff: %"PRId64" bytes)", - file->inode, file->name, size_delta); - file->offset = offset; - file->buf_len = 0; - - /* Update offset in the database file */ -#ifdef FLB_HAVE_SQLDB - if (ctx->db) { - flb_tail_db_file_offset(file, ctx); - } -#endif - } - } - /* Collect the data */ ret = in_tail_collect_event(file, config); if (ret != FLB_TAIL_ERROR) { @@ -307,44 +375,32 @@ static int in_tail_progress_check_callback(struct flb_input_instance *ins, struct flb_config *config, void *context) { int ret = 0; + int pending; struct mk_list *tmp; struct mk_list *head; struct flb_tail_config *ctx = context; struct flb_tail_file *file; int pending_data_detected; - struct stat st; (void) config; + (void) ins; pending_data_detected = FLB_FALSE; mk_list_foreach_safe(head, tmp, &ctx->files_event) { file = mk_list_entry(head, struct flb_tail_file, _head); - - if (file->offset < file->size) { - pending_data_detected = FLB_TRUE; - - continue; - } - - ret = fstat(file->fd, &st); + ret = reconcile_file_state(ctx, file, "in_tail_progress_check", &pending); if (ret == -1) { - flb_errno(); - flb_plg_error(ins, "fstat error"); - continue; } - if (file->offset < st.st_size) { - file->size = st.st_size; - file->pending_bytes = (file->size - file->offset); - + if (pending == FLB_TRUE) { pending_data_detected = FLB_TRUE; } } if (pending_data_detected) { - tail_signal_pending(ctx); + tail_signal_pending(ctx); } return 0;