diff --git a/modules/event_flatstore/doc/event_flatstore_admin.xml b/modules/event_flatstore/doc/event_flatstore_admin.xml
index 17f545568c9..90d237ecd07 100644
--- a/modules/event_flatstore/doc/event_flatstore_admin.xml
+++ b/modules/event_flatstore/doc/event_flatstore_admin.xml
@@ -264,6 +264,25 @@ modparam("event_flatstore", "suffix", "$time(%Y)")
+
Exported Functions
diff --git a/modules/event_flatstore/event_flatstore.c b/modules/event_flatstore/event_flatstore.c
index e58fcc1b799..2394267eecd 100644
--- a/modules/event_flatstore/event_flatstore.c
+++ b/modules/event_flatstore/event_flatstore.c
@@ -31,6 +31,7 @@
#include
#include
#include
+#include
#include "event_flatstore.h"
#include "../../mem/mem.h"
@@ -58,9 +59,11 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx);
static void event_flatstore_timer(unsigned int ticks, void *param);
+static int write_header(struct flat_file *file, int fd);
static int *opened_fds;
-static int *rotate_version;
+static int *fd_version;
+static time_t *last_rotate_ts;
static int buff_convert_len;
static int cap_params;
@@ -83,10 +86,20 @@ static unsigned long file_rotate_size;
static str file_suffix;
static pv_elem_p file_suffix_format;
static str escape_delimiter = {0, 0};
+static str file_header;
-static void raise_rotation_event(struct flat_file *file, const char *reason);
+static inline int file_exists(const char *path)
+{
+ struct stat st;
+ return (path && stat(path, &st) == 0 && S_ISREG(st.st_mode));
+}
+
+static void raise_ticket_event(struct rotation_ticket *t, struct flat_file *f);
+static void try_complete_ticket(struct flat_file *f, unsigned int old_ver);
+static inline void enqueue_ticket(struct flat_file *f, struct rotation_ticket *t);
static void update_counters_and_rotate(struct flat_file *file,
ssize_t bytes_inc);
+static void queue_ticket(struct flat_file *f, struct rotation_ticket *t);
static char *ensure_file_path(struct flat_file *file, int reset);
static void event_flatstore_rotate(int sender, void *param);
@@ -116,6 +129,7 @@ static const param_export_t mod_params[] = {
{"rotate_size", INT_PARAM|STR_PARAM|USE_FUNC_PARAM, (void*)rotate_size_param},
{"suffix", STR_PARAM, &file_suffix.s},
{"escape_delimiter", STR_PARAM, &escape_delimiter.s},
+ {"header", STR_PARAM, &file_header.s},
{0,0,0}
};
@@ -215,13 +229,20 @@ static int mod_init(void) {
int i;
opened_fds = NULL;
- rotate_version = NULL;
+ fd_version = NULL;
buff = NULL;
buff_convert_len = 0;
io_param = NULL;
cap_params = 20;
dirc = NULL;
+ last_rotate_ts = shm_malloc(sizeof(time_t));
+ if (!last_rotate_ts) {
+ LM_ERR("oom!\n");
+ return -1;
+ }
+ *last_rotate_ts = time(NULL);
+
LM_NOTICE("initializing module ...\n");
if (file_rotate_period && file_rotate_period < 0) {
@@ -283,6 +304,9 @@ static int mod_init(void) {
LM_DBG("The delimiter for separating columns in files was set at %.*s\n", delimiter.len, delimiter.s);
}
+ if (file_header.s)
+ file_header.len = strlen(file_header.s);
+
if (escape_delimiter.s) {
escape_delimiter.len = strlen(escape_delimiter.s);
if (escape_delimiter.len != delimiter.len) {
@@ -330,13 +354,13 @@ static int mod_init(void) {
LM_ERR("oom\n");
return -1;
}
- rotate_version = pkg_malloc(initial_capacity * sizeof(int));
- if (!rotate_version) {
+ fd_version = pkg_malloc(initial_capacity * sizeof(int));
+ if (!fd_version) {
LM_ERR("oom\n");
return -1;
}
- memset(rotate_version, 0, initial_capacity * sizeof(int));
+ memset(fd_version, 0, initial_capacity * sizeof(int));
for(i = 0; i < initial_capacity; i++)
opened_fds[i] = -1;
@@ -409,8 +433,10 @@ static void destroy(void){
if (opened_fds)
pkg_free(opened_fds);
- if (rotate_version)
- pkg_free(rotate_version);
+ if (fd_version)
+ pkg_free(fd_version);
+ if (last_rotate_ts)
+ shm_free(last_rotate_ts);
}
/* it does not do nothing */
@@ -439,10 +465,18 @@ static struct flat_file *search_for_fd(str value){
return NULL;
}
+static void free_ticket(struct rotation_ticket *t)
+{
+ if (!t) return;
+ if (t->old_name) shm_free(t->old_name);
+ shm_free(t);
+}
+
mi_response_t *mi_rotate(const mi_params_t *params,
struct mi_handler *async_hdl)
{
str path;
+ struct rotation_ticket *t = NULL;
if (get_mi_string_param(params, "path_to_file", &path.s, &path.len) < 0)
return init_mi_param_error();
@@ -462,16 +496,34 @@ mi_response_t *mi_rotate(const mi_params_t *params,
}
LM_DBG("Found file descriptor and updating rotating version for %s, to %d\n",
- found_fd->path.s, found_fd->rotate_version + 1);
+ found_fd->path.s, found_fd->current_version + 1);
+
+ if (file_exists(found_fd->pathname)) {
+ t = shm_malloc(sizeof *t);
+ if (t) {
+ t->ver = found_fd->current_version;
+ t->open_left = found_fd->counter_open;
+ t->reason = ROTATE_REASON_MI;
+ t->old_name = shm_strdup(found_fd->pathname);
+ t->next = NULL;
+ }
+ } else {
+ LM_DBG("skip ticket: file %s does not exist\n", found_fd->pathname);
+ }
- found_fd->rotate_version++;
+ found_fd->current_version++;
found_fd->record_count = 0;
found_fd->bytes_written = 0;
+ found_fd->header_written = 0;
ensure_file_path(found_fd, 1);
+ /* postpone next periodic rotation */
+ *last_rotate_ts = time(NULL);
+
+ if (t) queue_ticket(found_fd, t);
+
lock_release(global_lock);
- raise_rotation_event(found_fd, ROTATE_REASON_MI);
ipc_send_rpc_all(event_flatstore_rotate, 0);
return init_mi_result_ok();
@@ -503,6 +555,7 @@ static int insert_in_list(struct flat_file *entry) {
*list_files = entry;
entry->prev = NULL;
entry->next = NULL;
+ entry->header_written = 0;
return 0;
}
@@ -512,6 +565,7 @@ static int insert_in_list(struct flat_file *entry) {
entry->file_index_process = head->file_index_process + 1;
entry->prev = NULL;
entry->next = head;
+ entry->header_written = 0;
head->prev = entry;
*list_files = entry;
return 0;
@@ -524,6 +578,7 @@ static int insert_in_list(struct flat_file *entry) {
entry->file_index_process = expected;
entry->prev = aux->prev;
entry->next = aux;
+ entry->header_written = 0;
aux->prev =entry;
entry->prev->next = entry;
return 0;
@@ -537,6 +592,7 @@ static int insert_in_list(struct flat_file *entry) {
entry->file_index_process = expected;
entry->prev = parent;
entry->next = NULL;
+ entry->header_written = 0;
parent->next = entry;
return 0;
}
@@ -744,6 +800,109 @@ static char *ensure_file_path(struct flat_file *file, int reset)
return file->pathname;
}
+static int write_header(struct flat_file *file, int fd)
+{
+ struct stat st;
+ ssize_t wr;
+
+ if (!file || fd < 0)
+ return -1;
+
+ if (!file_header.s || file->header_written)
+ return 0;
+
+ if (fstat(fd, &st) != 0) {
+ LM_ERR("header: fstat failed for %s (%d)\n", file->pathname, errno);
+ return -1;
+ }
+
+ if (st.st_size != 0) { // File already has content; assume header is present
+ file->header_written = 1;
+ return 0;
+ }
+
+ /* write header */
+ size_t off = 0;
+ while (off < (size_t)file_header.len) {
+ do { wr = write(fd, file_header.s + off, file_header.len - off); } while (wr < 0 && errno == EINTR);
+ if (wr <= 0) {
+ LM_ERR("failed to write header to %s\n", file->pathname);
+ return -1;
+ }
+ off += (size_t)wr;
+ }
+ do { wr = write(fd, "\n", 1); } while (wr < 0 && errno == EINTR);
+ if (wr != 1) {
+ LM_ERR("failed to write header newline to %s\n", file->pathname);
+ return -1;
+ }
+
+ file->header_written = 1;
+ return 0;
+}
+
+static inline void enqueue_ticket(struct flat_file *f, struct rotation_ticket *t)
+{
+ if (!t) return;
+ t->next = NULL;
+ if (f->pending == NULL) {
+ f->pending = t;
+ } else {
+ struct rotation_ticket *it = f->pending;
+ while (it->next) it = it->next;
+ it->next = t;
+ }
+}
+
+static void queue_ticket(struct flat_file *f, struct rotation_ticket *t)
+{
+ if (!f || !t) return;
+
+ /* Nobody has this file open, emit the event */
+ if (f->counter_open == 0) {
+ lock_release(global_lock);
+ raise_ticket_event(t, f);
+ free_ticket(t);
+ lock_get(global_lock);
+ return;
+ }
+
+ /* Keep at most one pending ticket per file */
+ if (f->pending) {
+ struct rotation_ticket *stale = f->pending;
+
+ LM_WARN("forcing completion of stale flatstore rotation ticket "
+ "for %s (ver=%u, open_left=%u, counter_open=%u)\n",
+ f->path.s, stale->ver, stale->open_left, f->counter_open);
+
+ f->pending = stale->next;
+
+ lock_release(global_lock);
+ raise_ticket_event(stale, f);
+ free_ticket(stale);
+ lock_get(global_lock);
+ }
+
+ /* Remember the new ticket for this version */
+ enqueue_ticket(f, t);
+}
+
+static void try_complete_ticket(struct flat_file *f, unsigned int old_ver)
+{
+ if (!f->pending || f->pending->ver != old_ver)
+ return;
+
+ if (f->pending->open_left)
+ f->pending->open_left--;
+
+ if (f->pending->open_left == 0) {
+ raise_ticket_event(f->pending, f);
+ struct rotation_ticket *done = f->pending;
+ f->pending = f->pending->next;
+ free_ticket(done);
+ }
+}
+
/* check if the local 'version' of the file descriptor asociated with entry fs
is different from the global version, if it is different reopen the file
*/
@@ -773,34 +932,46 @@ static void rotating(struct flat_file *file){
lock_release(global_lock);
return;
}
- rotate_version[index] = file->rotate_version;
+ fd_version[index] = file->current_version;
file->counter_open++;
- LM_DBG("File %s is opened %d time\n", file->pathname, file->counter_open);
+
+ if (write_header(file, opened_fds[index]) < 0) {
+ LM_ERR("rotating/open: could not write header in %s\n", file->pathname);
+ }
lock_release(global_lock);
return;
+ } else if (fd_version[index] != file->current_version) {
+ /* fd for older version is opened here */
- } else if (rotate_version[index] != file->rotate_version) {
- /* fd is opened in this process */
-
- /* update version */
- rotate_version[index] = file->rotate_version;
- lock_release(global_lock);
+ unsigned int old_ver = fd_version[index];
/* rotate */
rc = close(opened_fds[index]);
+ opened_fds[index] = -1;
if(rc < 0){
LM_ERR("Closing socket error\n");
+ lock_release(global_lock);
return;
}
opened_fds[index] = open(file->pathname, O_RDWR | O_APPEND | O_CREAT, file_permissions_oct);
if (opened_fds[index] < 0) {
LM_ERR("Opening socket error\n");
+ lock_release(global_lock);
return;
}
- LM_DBG("Rotating file %s\n",file->path.s);
+ fd_version[index] = file->current_version;
+
+ if (write_header(file, opened_fds[index]) < 0) {
+ LM_ERR("rotating/switch: could not write header in %s\n", file->pathname);
+ }
+
+ try_complete_ticket(file, old_ver);
+ lock_release(global_lock);
+
+ LM_DBG("Rotating file %s\n",file->path.s);
} else
lock_release(global_lock);
}
@@ -813,7 +984,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
struct flat_socket *entry = (struct flat_socket*)(sock ? sock->params: NULL);
char endline = '\n';
char *ptr_buff;
- int nr_params = 0;
int f_idx;
int i;
@@ -846,7 +1016,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
for (param = params->first; param; param = param->next) {
if (param->flags & EVI_INT_VAL)
required_length += INT2STR_MAX_LEN;
- nr_params++;
}
if (buff == NULL || required_length > buff_convert_len) {
@@ -885,14 +1054,8 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
offset_buff += len;
idx++;
} else if ((param->flags & EVI_STR_VAL) && param->val.s.len && param->val.s.s) {
- for(i=0; ival.s.len; i++) {
- if (param->val.s.s[i] == delimiter.s[0]) {
- param->val.s.s[i] = ' ';
- }
- }
-
/* if escape_delimiter is configured, replace any in-value
- * occurrences of the delimiter with the escape sequence */
+ * occurrences of the delimiter with the escape sequence */
if (escape_delimiter.s) {
if (delimiter.len == 1) { /* fast single-char */
for (i = 0; i < param->val.s.len; i++)
@@ -931,9 +1094,6 @@ static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
nwritten = writev(opened_fds[f_idx], io_param, idx);
} while (nwritten < 0 && errno == EINTR);
- if (ev_name && ev_name->s)
- LM_DBG("raised event: %.*s has %d parameters\n", ev_name->len, ev_name->s, nr_params);
-
if (nwritten < 0){
LM_ERR("cannot write to socket\n");
return -1;
@@ -969,8 +1129,9 @@ static void flat_free(evi_reply_sock *sock) {
} else {
for (it = *list_sockets; it->next && fs != it->next; it = it->next) ;
if (it->next) {
- it->next = it->next->next;
- shm_free(it->next);
+ struct flat_socket *del = it->next;
+ it->next = del->next;
+ shm_free(del);
}
}
@@ -992,7 +1153,7 @@ static void flat_free(evi_reply_sock *sock) {
lock_release(global_lock);
/* check if we can close the file and actually delete it */
- verify_delete();
+ verify_delete();
}
static str flat_print(evi_reply_sock *sock){
@@ -1009,11 +1170,19 @@ void event_flatstore_rotate(int sender, void *param)
lock_get(global_lock);
for (file = *list_files; file; file = file->next) {
index = file->file_index_process;
- if (opened_fds[index] != -1 && rotate_version[index] != file->rotate_version) {
+ if (opened_fds[index] != -1 && fd_version[index] != file->current_version) {
+ LM_DBG("ipc_rotate: closing fd for %s (fd_ver=%d, cur_ver=%u, "
+ "counter_open=%u->%u, pending=%s)\n",
+ file->path.s, fd_version[index], file->current_version,
+ file->counter_open,
+ file->counter_open > 0 ? file->counter_open - 1 : 0,
+ file->pending ? "yes" : "no");
close(opened_fds[index]);
opened_fds[index] = -1; /* open it next time we have an event */
if (file->counter_open > 0)
file->counter_open--;
+
+ try_complete_ticket(file, fd_version[index]);
}
}
lock_release(global_lock);
@@ -1023,20 +1192,64 @@ static void event_flatstore_timer(unsigned int ticks, void *param)
{
struct flat_file* file;
- /* we only run when ticks is multiple of file_rotate_period */
- if (time(NULL) % file_rotate_period != 0)
+ time_t now = time(NULL);
+ /* run only if enough time passed since last rotation */
+ if ((now - *last_rotate_ts) < file_rotate_period)
return;
lock_get(global_lock);
for (file = *list_files; file; file = file->next) {
- file->rotate_version++;
+ struct rotation_ticket *t = NULL;
+ if (file_exists(file->pathname)) {
+ t = shm_malloc(sizeof *t);
+ if (t) {
+ t->ver = file->current_version;
+ t->open_left = file->counter_open;
+ t->reason = ROTATE_REASON_PERIOD;
+ t->old_name = shm_strdup(file->pathname);
+ t->next = NULL;
+ }
+ } else {
+ LM_DBG("skip ticket: file %s does not exist\n", file->pathname);
+ }
+
+ /* bump version, build NEW path */
+ file->current_version++;
file->record_count = 0;
file->bytes_written = 0;
+ file->header_written = 0;
ensure_file_path(file, 1);
- raise_rotation_event(file, ROTATE_REASON_PERIOD);
- LM_DBG("File %s is being rotated at %u - new file is %s\n",
- file->path.s, ticks, file->pathname);
+
+ LM_DBG("File %s is being rotated at %u - new file path is %s\n",
+ file->path.s, ticks, file->pathname);
+
+ if (t) {
+ queue_ticket(file, t);
+ } else if (file->pending) {
+ /* No new file to create a ticket for, but a previous
+ * ticket is stuck (workers never closed their FDs
+ * because calls stopped). Force-complete it now. */
+ struct rotation_ticket *stale = file->pending;
+
+ LM_DBG("forcing completion of stuck pending ticket "
+ "for %s: old_name=%s ver=%u open_left=%u "
+ "counter_open=%u\n",
+ file->path.s,
+ stale->old_name ? stale->old_name : "(null)",
+ stale->ver, stale->open_left,
+ file->counter_open);
+
+ file->pending = stale->next;
+
+ lock_release(global_lock);
+ raise_ticket_event(stale, file);
+ free_ticket(stale);
+ lock_get(global_lock);
+ }
}
+
+ *last_rotate_ts = now;
+
/* inform everyone they need to rotate */
lock_release(global_lock);
ipc_send_rpc_all(event_flatstore_rotate, 0);
@@ -1062,6 +1275,10 @@ static void verify_delete(void) {
close(opened_fds[del_it->file->file_index_process]);
if (del_it->file->counter_open > 0)
del_it->file->counter_open--;
+
+ try_complete_ticket(del_it->file,
+ fd_version[del_it->file->file_index_process]);
+
opened_fds[del_it->file->file_index_process] = -1;
}
@@ -1125,8 +1342,10 @@ static str fn_name = str_init("filename");
static str oldfn_name = str_init("old_filename");
-static void raise_rotation_event(struct flat_file *file, const char *reason)
+static void raise_ticket_event(struct rotation_ticket *t, struct flat_file *file)
{
+ LM_DBG("RAISE %s -> %s (reason=%s)\n", t->old_name ? t->old_name : "-", file->pathname, t->reason);
+
if (flatstore_evi_id == EVI_ERROR || !evi_probe_event(flatstore_evi_id))
return;
@@ -1140,15 +1359,9 @@ static void raise_rotation_event(struct flat_file *file, const char *reason)
char ts_buf[32];
str ts_val = { ts_buf, 0 };
- str fn_val = { file->pathname, strlen(file->pathname) };
-
- if (!file->old_pathname)
- file->old_pathname = (char *)flat_empty_str;
-
- str oldfn_val = { file->old_pathname, strlen(file->old_pathname) };
-
-
- str reason_val = { (char*)reason, strlen(reason) };
+ str fn_val = { (char*)file->pathname, strlen(file->pathname) };
+ str oldfn_val = { t->old_name, strlen(t->old_name) };
+ str reason_val = { (char*)t->reason, strlen(t->reason) };
ts_val.len = snprintf(ts_buf, sizeof(ts_buf), "%ld", (long)time(NULL));
@@ -1164,6 +1377,9 @@ static void raise_rotation_event(struct flat_file *file, const char *reason)
if (evi_raise_event(flatstore_evi_id, list))
LM_ERR("unable to send flatstore rotation event\n");
+ /* postpone next periodic rotation */
+ *last_rotate_ts = time(NULL);
+
/* clean old_pathname */
if (file->old_pathname && file->old_pathname != file->path.s && file->old_pathname != flat_empty_str) {
shm_free(file->old_pathname);
@@ -1188,16 +1404,34 @@ static void update_counters_and_rotate(struct flat_file *file, ssize_t bytes_inc
hit_sz = 1;
if (hit_cnt || hit_sz) {
- file->rotate_version++;
+ struct rotation_ticket *t = NULL;
+ if (file_exists(file->pathname)) {
+ t = shm_malloc(sizeof *t);
+ if (t) {
+ t->ver = file->current_version;
+ t->open_left = file->counter_open;
+ t->reason = hit_cnt ? ROTATE_REASON_COUNT : ROTATE_REASON_SIZE;
+ t->old_name = shm_strdup(file->pathname);
+ t->next = NULL;
+ }
+ } else {
+ LM_DBG("skip ticket: file %s does not exist\n", file->pathname);
+ }
+
+ file->current_version++;
file->record_count = 0;
file->bytes_written = 0;
+ file->header_written = 0;
ensure_file_path(file, 1);
+
+ /* postpone next periodic rotation */
+ *last_rotate_ts = time(NULL);
+
+ if (t) queue_ticket(file, t);
}
lock_release(global_lock);
if (hit_cnt || hit_sz) {
- raise_rotation_event(file,
- hit_cnt ? ROTATE_REASON_COUNT : ROTATE_REASON_SIZE);
ipc_send_rpc_all(event_flatstore_rotate, 0);
}
}
diff --git a/modules/event_flatstore/event_flatstore.h b/modules/event_flatstore/event_flatstore.h
index 303c807ec91..e41192b1edb 100644
--- a/modules/event_flatstore/event_flatstore.h
+++ b/modules/event_flatstore/event_flatstore.h
@@ -39,6 +39,14 @@
#define ROTATE_REASON_PERIOD "period"
#define ROTATE_REASON_MI "mi"
+struct rotation_ticket {
+ unsigned int ver;
+ unsigned int open_left;
+ const char *reason;
+ char *old_name;
+ struct rotation_ticket *next;
+};
+
struct flat_file {
str path; // original path from module config (may include $var(...) placeholders)
char *pathname; // resolved absolute path after variable expansion
@@ -47,8 +55,10 @@ struct flat_file {
unsigned long bytes_written;
unsigned int file_index_process;
unsigned int counter_open;
- unsigned int rotate_version;
+ unsigned int current_version;
+ struct rotation_ticket *pending;
unsigned int flat_socket_ref;
+ int header_written;
struct flat_file *next;
struct flat_file *prev;
};