diff --git a/modules/event_flatstore/doc/event_flatstore_admin.xml b/modules/event_flatstore/doc/event_flatstore_admin.xml index 17f545568c9..90d237ecd07 100644 --- a/modules/event_flatstore/doc/event_flatstore_admin.xml +++ b/modules/event_flatstore/doc/event_flatstore_admin.xml @@ -264,6 +264,25 @@ modparam("event_flatstore", "suffix", "$time(%Y)") +
+ <varname>header</varname> (string) + + If set, the string is written as the very first line of + every new log file created by the module. + Useful for column names. + + + Default value is "" (disabled). + + + Set <varname>header</varname> parameter + +... +modparam("event_flatstore", "header", "time,event,param1,param2") +... + + +
Exported Functions diff --git a/modules/event_flatstore/event_flatstore.c b/modules/event_flatstore/event_flatstore.c index e58fcc1b799..2394267eecd 100644 --- a/modules/event_flatstore/event_flatstore.c +++ b/modules/event_flatstore/event_flatstore.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "event_flatstore.h" #include "../../mem/mem.h" @@ -58,9 +59,11 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx); static void event_flatstore_timer(unsigned int ticks, void *param); +static int write_header(struct flat_file *file, int fd); static int *opened_fds; -static int *rotate_version; +static int *fd_version; +static time_t *last_rotate_ts; static int buff_convert_len; static int cap_params; @@ -83,10 +86,20 @@ static unsigned long file_rotate_size; static str file_suffix; static pv_elem_p file_suffix_format; static str escape_delimiter = {0, 0}; +static str file_header; -static void raise_rotation_event(struct flat_file *file, const char *reason); +static inline int file_exists(const char *path) +{ + struct stat st; + return (path && stat(path, &st) == 0 && S_ISREG(st.st_mode)); +} + +static void raise_ticket_event(struct rotation_ticket *t, struct flat_file *f); +static void try_complete_ticket(struct flat_file *f, unsigned int old_ver); +static inline void enqueue_ticket(struct flat_file *f, struct rotation_ticket *t); static void update_counters_and_rotate(struct flat_file *file, ssize_t bytes_inc); +static void queue_ticket(struct flat_file *f, struct rotation_ticket *t); static char *ensure_file_path(struct flat_file *file, int reset); static void event_flatstore_rotate(int sender, void *param); @@ -116,6 +129,7 @@ static const param_export_t mod_params[] = { {"rotate_size", INT_PARAM|STR_PARAM|USE_FUNC_PARAM, (void*)rotate_size_param}, {"suffix", STR_PARAM, &file_suffix.s}, {"escape_delimiter", STR_PARAM, &escape_delimiter.s}, + {"header", STR_PARAM, &file_header.s}, {0,0,0} }; @@ -215,13 +229,20 @@ static int mod_init(void) { int i; opened_fds = NULL; - rotate_version = NULL; + fd_version = NULL; buff = NULL; buff_convert_len = 0; io_param = NULL; cap_params = 20; dirc = NULL; + last_rotate_ts = shm_malloc(sizeof(time_t)); + if (!last_rotate_ts) { + LM_ERR("oom!\n"); + return -1; + } + *last_rotate_ts = time(NULL); + LM_NOTICE("initializing module ...\n"); if (file_rotate_period && file_rotate_period < 0) { @@ -283,6 +304,9 @@ static int mod_init(void) { LM_DBG("The delimiter for separating columns in files was set at %.*s\n", delimiter.len, delimiter.s); } + if (file_header.s) + file_header.len = strlen(file_header.s); + if (escape_delimiter.s) { escape_delimiter.len = strlen(escape_delimiter.s); if (escape_delimiter.len != delimiter.len) { @@ -330,13 +354,13 @@ static int mod_init(void) { LM_ERR("oom\n"); return -1; } - rotate_version = pkg_malloc(initial_capacity * sizeof(int)); - if (!rotate_version) { + fd_version = pkg_malloc(initial_capacity * sizeof(int)); + if (!fd_version) { LM_ERR("oom\n"); return -1; } - memset(rotate_version, 0, initial_capacity * sizeof(int)); + memset(fd_version, 0, initial_capacity * sizeof(int)); for(i = 0; i < initial_capacity; i++) opened_fds[i] = -1; @@ -409,8 +433,10 @@ static void destroy(void){ if (opened_fds) pkg_free(opened_fds); - if (rotate_version) - pkg_free(rotate_version); + if (fd_version) + pkg_free(fd_version); + if (last_rotate_ts) + shm_free(last_rotate_ts); } /* it does not do nothing */ @@ -439,10 +465,18 @@ static struct flat_file *search_for_fd(str value){ return NULL; } +static void free_ticket(struct rotation_ticket *t) +{ + if (!t) return; + if (t->old_name) shm_free(t->old_name); + shm_free(t); +} + mi_response_t *mi_rotate(const mi_params_t *params, struct mi_handler *async_hdl) { str path; + struct rotation_ticket *t = NULL; if (get_mi_string_param(params, "path_to_file", &path.s, &path.len) < 0) return init_mi_param_error(); @@ -462,16 +496,34 @@ mi_response_t *mi_rotate(const mi_params_t *params, } LM_DBG("Found file descriptor and updating rotating version for %s, to %d\n", - found_fd->path.s, found_fd->rotate_version + 1); + found_fd->path.s, found_fd->current_version + 1); + + if (file_exists(found_fd->pathname)) { + t = shm_malloc(sizeof *t); + if (t) { + t->ver = found_fd->current_version; + t->open_left = found_fd->counter_open; + t->reason = ROTATE_REASON_MI; + t->old_name = shm_strdup(found_fd->pathname); + t->next = NULL; + } + } else { + LM_DBG("skip ticket: file %s does not exist\n", found_fd->pathname); + } - found_fd->rotate_version++; + found_fd->current_version++; found_fd->record_count = 0; found_fd->bytes_written = 0; + found_fd->header_written = 0; ensure_file_path(found_fd, 1); + /* postpone next periodic rotation */ + *last_rotate_ts = time(NULL); + + if (t) queue_ticket(found_fd, t); + lock_release(global_lock); - raise_rotation_event(found_fd, ROTATE_REASON_MI); ipc_send_rpc_all(event_flatstore_rotate, 0); return init_mi_result_ok(); @@ -503,6 +555,7 @@ static int insert_in_list(struct flat_file *entry) { *list_files = entry; entry->prev = NULL; entry->next = NULL; + entry->header_written = 0; return 0; } @@ -512,6 +565,7 @@ static int insert_in_list(struct flat_file *entry) { entry->file_index_process = head->file_index_process + 1; entry->prev = NULL; entry->next = head; + entry->header_written = 0; head->prev = entry; *list_files = entry; return 0; @@ -524,6 +578,7 @@ static int insert_in_list(struct flat_file *entry) { entry->file_index_process = expected; entry->prev = aux->prev; entry->next = aux; + entry->header_written = 0; aux->prev =entry; entry->prev->next = entry; return 0; @@ -537,6 +592,7 @@ static int insert_in_list(struct flat_file *entry) { entry->file_index_process = expected; entry->prev = parent; entry->next = NULL; + entry->header_written = 0; parent->next = entry; return 0; } @@ -744,6 +800,109 @@ static char *ensure_file_path(struct flat_file *file, int reset) return file->pathname; } +static int write_header(struct flat_file *file, int fd) +{ + struct stat st; + ssize_t wr; + + if (!file || fd < 0) + return -1; + + if (!file_header.s || file->header_written) + return 0; + + if (fstat(fd, &st) != 0) { + LM_ERR("header: fstat failed for %s (%d)\n", file->pathname, errno); + return -1; + } + + if (st.st_size != 0) { // File already has content; assume header is present + file->header_written = 1; + return 0; + } + + /* write header */ + size_t off = 0; + while (off < (size_t)file_header.len) { + do { wr = write(fd, file_header.s + off, file_header.len - off); } while (wr < 0 && errno == EINTR); + if (wr <= 0) { + LM_ERR("failed to write header to %s\n", file->pathname); + return -1; + } + off += (size_t)wr; + } + do { wr = write(fd, "\n", 1); } while (wr < 0 && errno == EINTR); + if (wr != 1) { + LM_ERR("failed to write header newline to %s\n", file->pathname); + return -1; + } + + file->header_written = 1; + return 0; +} + +static inline void enqueue_ticket(struct flat_file *f, struct rotation_ticket *t) +{ + if (!t) return; + t->next = NULL; + if (f->pending == NULL) { + f->pending = t; + } else { + struct rotation_ticket *it = f->pending; + while (it->next) it = it->next; + it->next = t; + } +} + +static void queue_ticket(struct flat_file *f, struct rotation_ticket *t) +{ + if (!f || !t) return; + + /* Nobody has this file open, emit the event */ + if (f->counter_open == 0) { + lock_release(global_lock); + raise_ticket_event(t, f); + free_ticket(t); + lock_get(global_lock); + return; + } + + /* Keep at most one pending ticket per file */ + if (f->pending) { + struct rotation_ticket *stale = f->pending; + + LM_WARN("forcing completion of stale flatstore rotation ticket " + "for %s (ver=%u, open_left=%u, counter_open=%u)\n", + f->path.s, stale->ver, stale->open_left, f->counter_open); + + f->pending = stale->next; + + lock_release(global_lock); + raise_ticket_event(stale, f); + free_ticket(stale); + lock_get(global_lock); + } + + /* Remember the new ticket for this version */ + enqueue_ticket(f, t); +} + +static void try_complete_ticket(struct flat_file *f, unsigned int old_ver) +{ + if (!f->pending || f->pending->ver != old_ver) + return; + + if (f->pending->open_left) + f->pending->open_left--; + + if (f->pending->open_left == 0) { + raise_ticket_event(f->pending, f); + struct rotation_ticket *done = f->pending; + f->pending = f->pending->next; + free_ticket(done); + } +} + /* check if the local 'version' of the file descriptor asociated with entry fs is different from the global version, if it is different reopen the file */ @@ -773,34 +932,46 @@ static void rotating(struct flat_file *file){ lock_release(global_lock); return; } - rotate_version[index] = file->rotate_version; + fd_version[index] = file->current_version; file->counter_open++; - LM_DBG("File %s is opened %d time\n", file->pathname, file->counter_open); + + if (write_header(file, opened_fds[index]) < 0) { + LM_ERR("rotating/open: could not write header in %s\n", file->pathname); + } lock_release(global_lock); return; + } else if (fd_version[index] != file->current_version) { + /* fd for older version is opened here */ - } else if (rotate_version[index] != file->rotate_version) { - /* fd is opened in this process */ - - /* update version */ - rotate_version[index] = file->rotate_version; - lock_release(global_lock); + unsigned int old_ver = fd_version[index]; /* rotate */ rc = close(opened_fds[index]); + opened_fds[index] = -1; if(rc < 0){ LM_ERR("Closing socket error\n"); + lock_release(global_lock); return; } opened_fds[index] = open(file->pathname, O_RDWR | O_APPEND | O_CREAT, file_permissions_oct); if (opened_fds[index] < 0) { LM_ERR("Opening socket error\n"); + lock_release(global_lock); return; } - LM_DBG("Rotating file %s\n",file->path.s); + fd_version[index] = file->current_version; + + if (write_header(file, opened_fds[index]) < 0) { + LM_ERR("rotating/switch: could not write header in %s\n", file->pathname); + } + + try_complete_ticket(file, old_ver); + lock_release(global_lock); + + LM_DBG("Rotating file %s\n",file->path.s); } else lock_release(global_lock); } @@ -813,7 +984,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, struct flat_socket *entry = (struct flat_socket*)(sock ? sock->params: NULL); char endline = '\n'; char *ptr_buff; - int nr_params = 0; int f_idx; int i; @@ -846,7 +1016,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, for (param = params->first; param; param = param->next) { if (param->flags & EVI_INT_VAL) required_length += INT2STR_MAX_LEN; - nr_params++; } if (buff == NULL || required_length > buff_convert_len) { @@ -885,14 +1054,8 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, offset_buff += len; idx++; } else if ((param->flags & EVI_STR_VAL) && param->val.s.len && param->val.s.s) { - for(i=0; ival.s.len; i++) { - if (param->val.s.s[i] == delimiter.s[0]) { - param->val.s.s[i] = ' '; - } - } - /* if escape_delimiter is configured, replace any in-value - * occurrences of the delimiter with the escape sequence */ + * occurrences of the delimiter with the escape sequence */ if (escape_delimiter.s) { if (delimiter.len == 1) { /* fast single-char */ for (i = 0; i < param->val.s.len; i++) @@ -931,9 +1094,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, nwritten = writev(opened_fds[f_idx], io_param, idx); } while (nwritten < 0 && errno == EINTR); - if (ev_name && ev_name->s) - LM_DBG("raised event: %.*s has %d parameters\n", ev_name->len, ev_name->s, nr_params); - if (nwritten < 0){ LM_ERR("cannot write to socket\n"); return -1; @@ -969,8 +1129,9 @@ static void flat_free(evi_reply_sock *sock) { } else { for (it = *list_sockets; it->next && fs != it->next; it = it->next) ; if (it->next) { - it->next = it->next->next; - shm_free(it->next); + struct flat_socket *del = it->next; + it->next = del->next; + shm_free(del); } } @@ -992,7 +1153,7 @@ static void flat_free(evi_reply_sock *sock) { lock_release(global_lock); /* check if we can close the file and actually delete it */ - verify_delete(); + verify_delete(); } static str flat_print(evi_reply_sock *sock){ @@ -1009,11 +1170,19 @@ void event_flatstore_rotate(int sender, void *param) lock_get(global_lock); for (file = *list_files; file; file = file->next) { index = file->file_index_process; - if (opened_fds[index] != -1 && rotate_version[index] != file->rotate_version) { + if (opened_fds[index] != -1 && fd_version[index] != file->current_version) { + LM_DBG("ipc_rotate: closing fd for %s (fd_ver=%d, cur_ver=%u, " + "counter_open=%u->%u, pending=%s)\n", + file->path.s, fd_version[index], file->current_version, + file->counter_open, + file->counter_open > 0 ? file->counter_open - 1 : 0, + file->pending ? "yes" : "no"); close(opened_fds[index]); opened_fds[index] = -1; /* open it next time we have an event */ if (file->counter_open > 0) file->counter_open--; + + try_complete_ticket(file, fd_version[index]); } } lock_release(global_lock); @@ -1023,20 +1192,64 @@ static void event_flatstore_timer(unsigned int ticks, void *param) { struct flat_file* file; - /* we only run when ticks is multiple of file_rotate_period */ - if (time(NULL) % file_rotate_period != 0) + time_t now = time(NULL); + /* run only if enough time passed since last rotation */ + if ((now - *last_rotate_ts) < file_rotate_period) return; lock_get(global_lock); for (file = *list_files; file; file = file->next) { - file->rotate_version++; + struct rotation_ticket *t = NULL; + if (file_exists(file->pathname)) { + t = shm_malloc(sizeof *t); + if (t) { + t->ver = file->current_version; + t->open_left = file->counter_open; + t->reason = ROTATE_REASON_PERIOD; + t->old_name = shm_strdup(file->pathname); + t->next = NULL; + } + } else { + LM_DBG("skip ticket: file %s does not exist\n", file->pathname); + } + + /* bump version, build NEW path */ + file->current_version++; file->record_count = 0; file->bytes_written = 0; + file->header_written = 0; ensure_file_path(file, 1); - raise_rotation_event(file, ROTATE_REASON_PERIOD); - LM_DBG("File %s is being rotated at %u - new file is %s\n", - file->path.s, ticks, file->pathname); + + LM_DBG("File %s is being rotated at %u - new file path is %s\n", + file->path.s, ticks, file->pathname); + + if (t) { + queue_ticket(file, t); + } else if (file->pending) { + /* No new file to create a ticket for, but a previous + * ticket is stuck (workers never closed their FDs + * because calls stopped). Force-complete it now. */ + struct rotation_ticket *stale = file->pending; + + LM_DBG("forcing completion of stuck pending ticket " + "for %s: old_name=%s ver=%u open_left=%u " + "counter_open=%u\n", + file->path.s, + stale->old_name ? stale->old_name : "(null)", + stale->ver, stale->open_left, + file->counter_open); + + file->pending = stale->next; + + lock_release(global_lock); + raise_ticket_event(stale, file); + free_ticket(stale); + lock_get(global_lock); + } } + + *last_rotate_ts = now; + /* inform everyone they need to rotate */ lock_release(global_lock); ipc_send_rpc_all(event_flatstore_rotate, 0); @@ -1062,6 +1275,10 @@ static void verify_delete(void) { close(opened_fds[del_it->file->file_index_process]); if (del_it->file->counter_open > 0) del_it->file->counter_open--; + + try_complete_ticket(del_it->file, + fd_version[del_it->file->file_index_process]); + opened_fds[del_it->file->file_index_process] = -1; } @@ -1125,8 +1342,10 @@ static str fn_name = str_init("filename"); static str oldfn_name = str_init("old_filename"); -static void raise_rotation_event(struct flat_file *file, const char *reason) +static void raise_ticket_event(struct rotation_ticket *t, struct flat_file *file) { + LM_DBG("RAISE %s -> %s (reason=%s)\n", t->old_name ? t->old_name : "-", file->pathname, t->reason); + if (flatstore_evi_id == EVI_ERROR || !evi_probe_event(flatstore_evi_id)) return; @@ -1140,15 +1359,9 @@ static void raise_rotation_event(struct flat_file *file, const char *reason) char ts_buf[32]; str ts_val = { ts_buf, 0 }; - str fn_val = { file->pathname, strlen(file->pathname) }; - - if (!file->old_pathname) - file->old_pathname = (char *)flat_empty_str; - - str oldfn_val = { file->old_pathname, strlen(file->old_pathname) }; - - - str reason_val = { (char*)reason, strlen(reason) }; + str fn_val = { (char*)file->pathname, strlen(file->pathname) }; + str oldfn_val = { t->old_name, strlen(t->old_name) }; + str reason_val = { (char*)t->reason, strlen(t->reason) }; ts_val.len = snprintf(ts_buf, sizeof(ts_buf), "%ld", (long)time(NULL)); @@ -1164,6 +1377,9 @@ static void raise_rotation_event(struct flat_file *file, const char *reason) if (evi_raise_event(flatstore_evi_id, list)) LM_ERR("unable to send flatstore rotation event\n"); + /* postpone next periodic rotation */ + *last_rotate_ts = time(NULL); + /* clean old_pathname */ if (file->old_pathname && file->old_pathname != file->path.s && file->old_pathname != flat_empty_str) { shm_free(file->old_pathname); @@ -1188,16 +1404,34 @@ static void update_counters_and_rotate(struct flat_file *file, ssize_t bytes_inc hit_sz = 1; if (hit_cnt || hit_sz) { - file->rotate_version++; + struct rotation_ticket *t = NULL; + if (file_exists(file->pathname)) { + t = shm_malloc(sizeof *t); + if (t) { + t->ver = file->current_version; + t->open_left = file->counter_open; + t->reason = hit_cnt ? ROTATE_REASON_COUNT : ROTATE_REASON_SIZE; + t->old_name = shm_strdup(file->pathname); + t->next = NULL; + } + } else { + LM_DBG("skip ticket: file %s does not exist\n", file->pathname); + } + + file->current_version++; file->record_count = 0; file->bytes_written = 0; + file->header_written = 0; ensure_file_path(file, 1); + + /* postpone next periodic rotation */ + *last_rotate_ts = time(NULL); + + if (t) queue_ticket(file, t); } lock_release(global_lock); if (hit_cnt || hit_sz) { - raise_rotation_event(file, - hit_cnt ? ROTATE_REASON_COUNT : ROTATE_REASON_SIZE); ipc_send_rpc_all(event_flatstore_rotate, 0); } } diff --git a/modules/event_flatstore/event_flatstore.h b/modules/event_flatstore/event_flatstore.h index 303c807ec91..e41192b1edb 100644 --- a/modules/event_flatstore/event_flatstore.h +++ b/modules/event_flatstore/event_flatstore.h @@ -39,6 +39,14 @@ #define ROTATE_REASON_PERIOD "period" #define ROTATE_REASON_MI "mi" +struct rotation_ticket { + unsigned int ver; + unsigned int open_left; + const char *reason; + char *old_name; + struct rotation_ticket *next; +}; + struct flat_file { str path; // original path from module config (may include $var(...) placeholders) char *pathname; // resolved absolute path after variable expansion @@ -47,8 +55,10 @@ struct flat_file { unsigned long bytes_written; unsigned int file_index_process; unsigned int counter_open; - unsigned int rotate_version; + unsigned int current_version; + struct rotation_ticket *pending; unsigned int flat_socket_ref; + int header_written; struct flat_file *next; struct flat_file *prev; };