From 5d4ed46a0844a766a508a64bd66785e57f2c8e23 Mon Sep 17 00:00:00 2001 From: "Jonathan Gonzalez V." Date: Wed, 15 Apr 2026 10:25:03 +0200 Subject: [PATCH 1/2] out_pgsql: refactor plugin for the modern design The plugin was too old and required an update as follow: * Use of the config map for the config options * Decoder for the log events * Use of the internal instances to avoid having a pool of connections Signed-off-by: Jonathan Gonzalez V. --- plugins/out_pgsql/pgsql.c | 579 +++++++++++++++++--------- plugins/out_pgsql/pgsql.h | 65 +-- plugins/out_pgsql/pgsql_connections.c | 168 ++------ 3 files changed, 456 insertions(+), 356 deletions(-) diff --git a/plugins/out_pgsql/pgsql.c b/plugins/out_pgsql/pgsql.c index fd4d65030d1..0677cd9518e 100644 --- a/plugins/out_pgsql/pgsql.c +++ b/plugins/out_pgsql/pgsql.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "pgsql.h" #include "pgsql_connections.h" @@ -31,29 +33,285 @@ void pgsql_conf_destroy(struct flb_pgsql_config *ctx) flb_free(ctx->db_hostname); - if (ctx->db_table != NULL) { - flb_sds_destroy(ctx->db_table); + if (ctx->db_table_escaped != NULL) { + flb_sds_destroy(ctx->db_table_escaped); } - if (ctx->timestamp_key != NULL) { - flb_sds_destroy(ctx->timestamp_key); + if (ctx->insert_query != NULL) { + flb_sds_destroy(ctx->insert_query); } flb_free(ctx); ctx = NULL; } +flb_sds_t pgsql_build_insert_query(const char *table_name, int cockroachdb) +{ + flb_sds_t query; + size_t query_size; + + query_size = strlen(table_name) + 192; + query = flb_sds_create_size(query_size); + if (query == NULL) { + flb_errno(); + return NULL; + } + + if (cockroachdb) { + flb_sds_printf(&query, + "INSERT INTO %s (tag, time, data) VALUES ($1, " + "DATE '1970-01-01' + ($2::float8 * INTERVAL '1 second'), " + "$3::jsonb);", + table_name); + } + else { + flb_sds_printf(&query, + "INSERT INTO %s (tag, time, data) VALUES ($1, " + "to_timestamp($2::double precision), $3::jsonb);", + table_name); + } + + return query; +} + +int pgsql_format_timestamp(char *buffer, size_t size, struct flb_time *timestamp) +{ + double timestamp_value; + + timestamp_value = flb_time_to_double(timestamp); + + return snprintf(buffer, size, "%0.9f", timestamp_value); +} + +char *pgsql_format_body_json(msgpack_object *body, int escape_unicode) +{ + return flb_msgpack_to_json_str(1024, body, escape_unicode); +} + +void pgsql_free_body_json(char *json) +{ + if (json != NULL) { + flb_free(json); + } +} + +int pgsql_translate_decoder_result(int decoder_result) +{ + if (decoder_result == FLB_EVENT_DECODER_SUCCESS) { + return FLB_OK; + } + + if (decoder_result == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) { + return FLB_RETRY; + } + + return FLB_ERROR; +} + +const char *pgsql_conn_status_string(ConnStatusType status) +{ + switch (status) { + PGSQL_CONN_STATUS_MAP(PGSQL_CONN_STATUS_CASE) + default: + return "CONNECTION_UNKNOWN"; + } +} + +void pgsql_log_conn_error(struct flb_pgsql_config *ctx, const char *action, PGconn *conn) +{ + const char *message; + ConnStatusType status; + + if (conn == NULL) { + flb_plg_error(ctx->ins, "%s failed: no PostgreSQL connection handle", action); + return; + } + + message = PQerrorMessage(conn); + status = PQstatus(conn); + + if (message != NULL && message[0] != '\0') { + flb_plg_error(ctx->ins, "%s failed: %s (status=%s)", + action, message, pgsql_conn_status_string(status)); + } + else { + flb_plg_error(ctx->ins, "%s failed with empty libpq error (status=%s)", + action, pgsql_conn_status_string(status)); + } +} + +void pgsql_log_result_error(struct flb_pgsql_config *ctx, + const char *action, + PGconn *conn, + PGresult *res) +{ + const char *message; + ExecStatusType status; + + if (res == NULL) { + if (conn != NULL) { + pgsql_log_conn_error(ctx, action, conn); + } + else { + flb_plg_error(ctx->ins, "%s failed: no PGresult and no connection handle", + action); + } + + return; + } + + status = PQresultStatus(res); + message = PQresultErrorMessage(res); + + if (message != NULL && message[0] != '\0') { + flb_plg_error(ctx->ins, "%s failed: %s (result=%s)", + action, message, PQresStatus(status)); + } + else if (conn != NULL) { + message = PQerrorMessage(conn); + + if (message != NULL && message[0] != '\0') { + flb_plg_error(ctx->ins, "%s failed: %s (result=%s, conn_status=%s)", + action, message, PQresStatus(status), + pgsql_conn_status_string(PQstatus(conn))); + } + else { + flb_plg_error(ctx->ins, + "%s failed with empty libpq error (result=%s, conn_status=%s)", + action, PQresStatus(status), + pgsql_conn_status_string(PQstatus(conn))); + } + } + else { + flb_plg_error(ctx->ins, "%s failed with empty libpq error (result=%s)", + action, PQresStatus(status)); + } +} + +static int pgsql_execute_command(struct flb_pgsql_config *ctx, + const char *command, + const char *action) +{ + PGresult *res; + + res = PQexec(ctx->conn_current, command); + + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) { + pgsql_log_result_error(ctx, action, ctx->conn_current, res); + if (res != NULL) { + PQclear(res); + } + return -1; + } + + PQclear(res); + + return 0; +} + +static int pgsql_insert_record(struct flb_pgsql_config *ctx, + const char *tag, + const char *timestamp, + const char *json) +{ + const char *param_values[3]; + PGresult *res; + + param_values[0] = tag; + param_values[1] = timestamp; + param_values[2] = json; + + res = PQexecPrepared(ctx->conn_current, + FLB_PGSQL_INSERT_STMT_NAME, + 3, + param_values, + NULL, + NULL, + 0); + + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) { + pgsql_log_result_error(ctx, "record insert", ctx->conn_current, res); + if (res != NULL) { + PQclear(res); + } + return -1; + } + + PQclear(res); + + return 0; +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "database", FLB_PGSQL_DBNAME, + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, db_name), + "PostgreSQL database name." + }, + { + FLB_CONFIG_MAP_STR, "table", FLB_PGSQL_TABLE, + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, db_table), + "Destination table name." + }, + { + FLB_CONFIG_MAP_STR, "connection_options", NULL, + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, conn_options), + "Additional PostgreSQL connection options passed to libpq." + }, + { + FLB_CONFIG_MAP_STR, "user", NULL, + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, db_user), + "Database user name." + }, + { + FLB_CONFIG_MAP_STR, "password", NULL, + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, db_passwd), + "Database user password." + }, + { + FLB_CONFIG_MAP_BOOL, "cockroachdb", "false", + 0, FLB_TRUE, offsetof(struct flb_pgsql_config, cockroachdb), + "Enable CockroachDB-compatible timestamp SQL syntax." + }, + + {0} +}; + +static int pgsql_prepare_insert_statement(struct flb_pgsql_config *ctx) +{ + PGresult *res; + + res = PQprepare(ctx->conn_current, + FLB_PGSQL_INSERT_STMT_NAME, + ctx->insert_query, + 3, + NULL); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) { + pgsql_log_result_error(ctx, "prepare insert statement", ctx->conn_current, res); + if (res != NULL) { + PQclear(res); + } + ctx->insert_statement_prepared = FLB_FALSE; + return -1; + } + + PQclear(res); + ctx->insert_statement_prepared = FLB_TRUE; + + return 0; +} + static int cb_pgsql_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { - struct flb_pgsql_config *ctx; size_t str_len; PGresult *res; char *query = NULL; char *temp = NULL; - const char *tmp = NULL; int ret; + (void) config; + (void) data; /* set default network configuration */ flb_output_net_default(FLB_PGSQL_HOST, FLB_PGSQL_PORT, ins); @@ -66,6 +324,12 @@ static int cb_pgsql_init(struct flb_output_instance *ins, ctx->ins = ins; + ret = flb_output_config_map_set(ins, ctx); + if (ret == -1) { + pgsql_conf_destroy(ctx); + return -1; + } + /* Database host */ ctx->db_hostname = flb_strdup(ins->host.name); if (!ctx->db_hostname) { @@ -77,24 +341,6 @@ static int cb_pgsql_init(struct flb_output_instance *ins, /* Database port */ snprintf(ctx->db_port, sizeof(ctx->db_port), "%d", ins->host.port); - /* Database name */ - ctx->db_name = flb_output_get_property("database", ins); - if (!ctx->db_name) { - ctx->db_name = FLB_PGSQL_DBNAME; - } - - /* db table */ - tmp = flb_output_get_property("table", ins); - if (tmp) { - ctx->db_table = flb_sds_create(tmp); - } - else { - ctx->db_table = flb_sds_create(FLB_PGSQL_TABLE); - } - - /* connection options */ - ctx->conn_options = flb_output_get_property("connection_options", ins); - if (!ctx->db_table) { flb_errno(); pgsql_conf_destroy(ctx); @@ -102,108 +348,49 @@ static int cb_pgsql_init(struct flb_output_instance *ins, } /* db user */ - ctx->db_user = flb_output_get_property("user", ins); if (!ctx->db_user) { flb_plg_warn(ctx->ins, "You didn't supply a valid user to connect," "your current unix user will be used"); } - /* db user password */ - ctx->db_passwd = flb_output_get_property("password", ins); - - /* timestamp key */ - tmp = flb_output_get_property("timestamp_key", ins); - if (tmp) { - ctx->timestamp_key = flb_sds_create(tmp); - } - else { - ctx->timestamp_key = flb_sds_create(FLB_PGSQL_TIMESTAMP_KEY); - } - - if (!ctx->timestamp_key) { - flb_errno(); - pgsql_conf_destroy(ctx); - return -1; - } - - /* Pool size */ - tmp = flb_output_get_property("max_pool_size", ins); - if (tmp) { - ctx->max_pool_size = strtol(tmp, NULL, 0); - if (ctx->max_pool_size < 1) - ctx->max_pool_size = 1; - } - else { - ctx->max_pool_size = FLB_PGSQL_POOL_SIZE; - } - - tmp = flb_output_get_property("min_pool_size", ins); - if (tmp) { - ctx->min_pool_size = strtol(tmp, NULL, 0); - if (ctx->min_pool_size < 1 || ctx->min_pool_size > ctx->max_pool_size) - ctx->min_pool_size = ctx->max_pool_size; - } - else { - ctx->min_pool_size = FLB_PGSQL_MIN_POOL_SIZE; - } - - /* Sync Mode */ - tmp = flb_output_get_property("async", ins); - if (tmp && flb_utils_bool(tmp)) { - ctx->async = FLB_TRUE; - } - else { - ctx->async = FLB_FALSE; - } - - if (!ctx->async) { - ctx->min_pool_size = 1; - ctx->max_pool_size = 1; - } - - /* CockroachDB Support */ - tmp = flb_output_get_property("cockroachdb", ins); - if (tmp && flb_utils_bool(tmp)) { - ctx->cockroachdb = FLB_TRUE; - } - else { - ctx->cockroachdb = FLB_FALSE; - } - ret = pgsql_start_connections(ctx); if (ret) { + pgsql_conf_destroy(ctx); return -1; } flb_plg_info(ctx->ins, "host=%s port=%s dbname=%s OK", ctx->db_hostname, ctx->db_port, ctx->db_name); - flb_output_set_context(ins, ctx); - - temp = PQescapeIdentifier(ctx->conn_current->conn, ctx->db_table, + temp = PQescapeIdentifier(ctx->conn_current, ctx->db_table, flb_sds_len(ctx->db_table)); if (temp == NULL) { - flb_plg_error(ctx->ins, "failed to parse table name: %s", - PQerrorMessage(ctx->conn_current->conn)); + pgsql_log_conn_error(ctx, "table name escaping", ctx->conn_current); pgsql_conf_destroy(ctx); return -1; } - flb_sds_destroy(ctx->db_table); - ctx->db_table = flb_sds_create(temp); + ctx->db_table_escaped = flb_sds_create(temp); PQfreemem(temp); - if (!ctx->db_table) { + if (!ctx->db_table_escaped) { flb_errno(); pgsql_conf_destroy(ctx); return -1; } + ctx->insert_query = pgsql_build_insert_query(ctx->db_table_escaped, + ctx->cockroachdb); + if (ctx->insert_query == NULL) { + pgsql_conf_destroy(ctx); + return -1; + } + flb_plg_info(ctx->ins, "we check that the table %s " - "exists, if not we create it", ctx->db_table); + "exists, if not we create it", ctx->db_table_escaped); - str_len = 72 + flb_sds_len(ctx->db_table); + str_len = 72 + flb_sds_len(ctx->db_table_escaped); query = flb_malloc(str_len); if (query == NULL) { @@ -217,21 +404,31 @@ static int cb_pgsql_init(struct flb_output_instance *ins, snprintf(query, str_len, "CREATE TABLE IF NOT EXISTS %s " "(tag varchar, time timestamp, data jsonb);", - ctx->db_table); + ctx->db_table_escaped); flb_plg_trace(ctx->ins, "%s", query); - res = PQexec(ctx->conn_current->conn, query); + res = PQexec(ctx->conn_current, query); flb_free(query); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); + if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK) { + pgsql_log_result_error(ctx, "table creation", ctx->conn_current, res); + if (res != NULL) { + PQclear(res); + } pgsql_conf_destroy(ctx); return -1; } PQclear(res); + ret = pgsql_prepare_insert_statement(ctx); + if (ret != 0) { + pgsql_conf_destroy(ctx); + return -1; + } + + flb_output_set_context(ins, ctx); + return 0; } @@ -241,17 +438,23 @@ static void cb_pgsql_flush(struct flb_event_chunk *event_chunk, void *out_context, struct flb_config *config) { + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; struct flb_pgsql_config *ctx = out_context; - flb_sds_t json; - char *tmp = NULL; - char *query = NULL; - PGresult *res = NULL; - int send_res; - flb_sds_t tag_escaped = NULL; - size_t str_len; + char *json; + int decoder_result; + int flush_result; + int transaction_started; + char timestamp_value[64]; + + (void) out_flush; + (void) i_ins; + flush_result = FLB_OK; + transaction_started = FLB_FALSE; + json = NULL; - if (pgsql_next_connection(ctx) == 1) { + if (pgsql_next_connection(ctx) != 0) { FLB_OUTPUT_RETURN(FLB_RETRY); } @@ -262,109 +465,94 @@ static void cb_pgsql_flush(struct flb_event_chunk *event_chunk, * parameters previously used. This might be useful for error recovery * if a working connection is lost. */ - if (PQstatus(ctx->conn_current->conn) != CONNECTION_OK) { - PQreset(ctx->conn_current->conn); - FLB_OUTPUT_RETURN(FLB_RETRY); + if (PQstatus(ctx->conn_current) != CONNECTION_OK) { + ctx->insert_statement_prepared = FLB_FALSE; + PQreset(ctx->conn_current); + if (PQstatus(ctx->conn_current) != CONNECTION_OK) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + if (pgsql_prepare_insert_statement(ctx) != 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } + } + else if (ctx->insert_statement_prepared == FLB_FALSE) { + if (pgsql_prepare_insert_statement(ctx) != 0) { + FLB_OUTPUT_RETURN(FLB_RETRY); + } } - json = flb_pack_msgpack_to_json_format(event_chunk->data, - event_chunk->size, - FLB_PACK_JSON_FORMAT_JSON, - FLB_PACK_JSON_DATE_DOUBLE, - ctx->timestamp_key, - config->json_escape_unicode); - if (json == NULL) { - flb_errno(); + decoder_result = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + if (decoder_result != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, - "Can't parse the msgpack into json"); - FLB_OUTPUT_RETURN(FLB_RETRY); + "Log event decoder initialization error : %d", + decoder_result); + FLB_OUTPUT_RETURN(pgsql_translate_decoder_result(decoder_result)); } - tmp = PQescapeLiteral(ctx->conn_current->conn, json, flb_sds_len(json)); - flb_sds_destroy(json); - if (!tmp) { - flb_errno(); - PQfreemem(tmp); - flb_plg_error(ctx->ins, "Can't escape json string"); + if (pgsql_execute_command(ctx, "BEGIN", "transaction begin") != 0) { + flb_log_event_decoder_destroy(&log_decoder); FLB_OUTPUT_RETURN(FLB_RETRY); } + transaction_started = FLB_TRUE; - json = flb_sds_create(tmp); - PQfreemem(tmp); - if (!json) { - flb_errno(); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + while (flb_log_event_decoder_next(&log_decoder, + &log_event) == FLB_EVENT_DECODER_SUCCESS) { + json = pgsql_format_body_json(log_event.body, config->json_escape_unicode); + if (json == NULL) { + flb_errno(); + flb_plg_error(ctx->ins, "Can't parse the msgpack record into json"); + flush_result = FLB_RETRY; + goto cleanup; + } - tmp = PQescapeLiteral(ctx->conn_current->conn, - event_chunk->tag, - flb_sds_len(event_chunk->tag)); - if (!tmp) { - flb_errno(); - flb_sds_destroy(json); - PQfreemem(tmp); - flb_plg_error(ctx->ins, "Can't escape tag string: %s", - event_chunk->tag); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + pgsql_format_timestamp(timestamp_value, sizeof(timestamp_value), + &log_event.timestamp); - tag_escaped = flb_sds_create(tmp); - PQfreemem(tmp); - if (!tag_escaped) { - flb_errno(); - flb_sds_destroy(json); - FLB_OUTPUT_RETURN(FLB_RETRY); + if (pgsql_insert_record(ctx, + event_chunk->tag, + timestamp_value, + json) != 0) { + flush_result = FLB_RETRY; + goto cleanup; + } + + flb_plg_trace(ctx->ins, "inserted record with timestamp=%s", + timestamp_value); + pgsql_free_body_json(json); + json = NULL; } - str_len = 100 + flb_sds_len(json) - + flb_sds_len(tag_escaped) - + flb_sds_len(ctx->db_table) - + flb_sds_len(ctx->timestamp_key); - query = flb_malloc(str_len); + decoder_result = flb_log_event_decoder_get_last_result(&log_decoder); - if (query == NULL) { - flb_errno(); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - FLB_OUTPUT_RETURN(FLB_RETRY); + if (decoder_result != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, "Log event decoder error : %d", decoder_result); + flush_result = pgsql_translate_decoder_result(decoder_result); + goto cleanup; } + if (pgsql_execute_command(ctx, "COMMIT", "transaction commit") != 0) { + flush_result = FLB_RETRY; + goto cleanup; + } - snprintf(query, str_len, - ctx->cockroachdb ? FLB_PGSQL_INSERT_COCKROACH : FLB_PGSQL_INSERT, - ctx->db_table, tag_escaped, ctx->timestamp_key, json); - flb_plg_trace(ctx->ins, "query: %s", query); - - if (ctx->async) { - send_res = PQsendQuery(ctx->conn_current->conn, query); - flb_free(query); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - - if (send_res == 0) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + transaction_started = FLB_FALSE; + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(FLB_OK); - PQflush(ctx->conn_current->conn); +cleanup: + if (json != NULL) { + pgsql_free_body_json(json); } - else { - res = PQexec(ctx->conn_current->conn, query); - flb_free(query); - flb_sds_destroy(json); - flb_sds_destroy(tag_escaped); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - PQclear(res); - FLB_OUTPUT_RETURN(FLB_RETRY); - } - PQclear(res); + + if (transaction_started) { + pgsql_execute_command(ctx, "ROLLBACK", "transaction rollback"); } - FLB_OUTPUT_RETURN(FLB_OK); + flb_log_event_decoder_destroy(&log_decoder); + FLB_OUTPUT_RETURN(flush_result); } static int cb_pgsql_exit(void *data, struct flb_config *config) @@ -386,5 +574,8 @@ struct flb_output_plugin out_pgsql_plugin = { .cb_init = cb_pgsql_init, .cb_flush = cb_pgsql_flush, .cb_exit = cb_pgsql_exit, - .flags = 0, + .event_type = FLB_OUTPUT_LOGS, + .config_map = config_map, + .flags = FLB_OUTPUT_NET, + .workers = 1, }; diff --git a/plugins/out_pgsql/pgsql.h b/plugins/out_pgsql/pgsql.h index e4505abfa1e..25b5df5ddd5 100644 --- a/plugins/out_pgsql/pgsql.h +++ b/plugins/out_pgsql/pgsql.h @@ -21,33 +21,35 @@ #define FLB_OUT_PGSQL_H #include +#include +#include #include #include #include +#include #define FLB_PGSQL_HOST "127.0.0.1" #define FLB_PGSQL_PORT 5432 #define FLB_PGSQL_DBNAME "fluentbit" #define FLB_PGSQL_TABLE "fluentbit" -#define FLB_PGSQL_TIMESTAMP_KEY "date" -#define FLB_PGSQL_POOL_SIZE 4 -#define FLB_PGSQL_MIN_POOL_SIZE 1 -#define FLB_PGSQL_SYNC FLB_FALSE #define FLB_PGSQL_COCKROACH FLB_FALSE - -#define FLB_PGSQL_INSERT "INSERT INTO %s SELECT %s, " \ - "to_timestamp(CAST(value->>'%s' as FLOAT))," \ - " * FROM json_array_elements(%s);" -#define FLB_PGSQL_INSERT_COCKROACH "INSERT INTO %s SELECT %s," \ - "CAST(value->>'%s' AS INTERVAL) + DATE'1970-01-01'," \ - " * FROM json_array_elements(%s);" - -struct flb_pgsql_conn { - struct mk_list _head; - PGconn *conn; - int number; -}; +#define FLB_PGSQL_INSERT_STMT_NAME "flb_pgsql_insert" + +#define PGSQL_CONN_STATUS_CASE(name) \ + case name: \ + return #name; + +#define PGSQL_CONN_STATUS_MAP(ENTRY) \ + ENTRY(CONNECTION_OK) \ + ENTRY(CONNECTION_BAD) \ + ENTRY(CONNECTION_STARTED) \ + ENTRY(CONNECTION_MADE) \ + ENTRY(CONNECTION_AWAITING_RESPONSE) \ + ENTRY(CONNECTION_AUTH_OK) \ + ENTRY(CONNECTION_SETENV) \ + ENTRY(CONNECTION_SSL_STARTUP) \ + ENTRY(CONNECTION_NEEDED) struct flb_pgsql_config { @@ -56,36 +58,37 @@ struct flb_pgsql_config { char db_port[8]; const char *db_name; flb_sds_t db_table; + flb_sds_t db_table_escaped; /* auth */ const char *db_user; const char *db_passwd; - /* time key */ - flb_sds_t timestamp_key; - /* instance reference */ struct flb_output_instance *ins; /* connections options */ const char *conn_options; - /* connections pool */ - struct mk_list conn_queue; - struct mk_list _head; - - struct flb_pgsql_conn *conn_current; - int max_pool_size; - int min_pool_size; - int active_conn; - - /* async mode or sync mode */ - int async; + PGconn *conn_current; + flb_sds_t insert_query; + int insert_statement_prepared; /* cockroachdb */ int cockroachdb; }; +flb_sds_t pgsql_build_insert_query(const char *table_name, int cockroachdb); +int pgsql_format_timestamp(char *buffer, size_t size, struct flb_time *timestamp); +char *pgsql_format_body_json(msgpack_object *body, int escape_unicode); +void pgsql_free_body_json(char *json); +int pgsql_translate_decoder_result(int decoder_result); +const char *pgsql_conn_status_string(ConnStatusType status); +void pgsql_log_conn_error(struct flb_pgsql_config *ctx, const char *action, PGconn *conn); +void pgsql_log_result_error(struct flb_pgsql_config *ctx, + const char *action, + PGconn *conn, + PGresult *res); void pgsql_conf_destroy(struct flb_pgsql_config *ctx); #endif diff --git a/plugins/out_pgsql/pgsql_connections.c b/plugins/out_pgsql/pgsql_connections.c index 961bba0109a..4d5f955c22e 100644 --- a/plugins/out_pgsql/pgsql_connections.c +++ b/plugins/out_pgsql/pgsql_connections.c @@ -23,62 +23,49 @@ void pgsql_destroy_connections(struct flb_pgsql_config *ctx) { - struct mk_list *tmp; - struct mk_list *head; - struct flb_pgsql_conn *conn; PGresult *res = NULL; - mk_list_foreach_safe(head, tmp, &ctx->conn_queue) { - conn = mk_list_entry(head, struct flb_pgsql_conn, _head); - if (PQstatus(conn->conn) == CONNECTION_OK) { - while(PQconsumeInput(conn->conn) == 0) { - res = PQgetResult(conn->conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - flb_plg_warn(ctx->ins, "%s", - PQerrorMessage(conn->conn)); - } - PQclear(res); + if (ctx->conn_current == NULL) { + return; + } + + if (ctx->conn_current != NULL) { + while ((res = PQgetResult(ctx->conn_current)) != NULL) { + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + pgsql_log_result_error(ctx, + "pending result drain", + ctx->conn_current, + res); } + PQclear(res); } - PQfinish(conn->conn); - flb_free(conn); + + PQfinish(ctx->conn_current); } + + ctx->conn_current = NULL; } void *pgsql_create_connection(struct flb_pgsql_config *ctx) { - struct flb_pgsql_conn *conn; + PGconn *conn; - conn = flb_calloc(1, sizeof(struct flb_pgsql_conn)); - if (!conn) { - flb_errno(); - return NULL; - } + conn = PQsetdbLogin(ctx->db_hostname, + ctx->db_port, + ctx->conn_options, + NULL, + ctx->db_name, + ctx->db_user, + ctx->db_passwd); - conn->conn = PQsetdbLogin(ctx->db_hostname, - ctx->db_port, - ctx->conn_options, - NULL, - ctx->db_name, - ctx->db_user, - ctx->db_passwd); - - if (PQstatus(conn->conn) != CONNECTION_OK) { - flb_plg_error(ctx->ins, - "failed connecting to host=%s with error: %s", - ctx->db_hostname, PQerrorMessage(conn->conn)); - PQfinish(conn->conn); - flb_free(conn); + if (conn == NULL) { + pgsql_log_conn_error(ctx, "PostgreSQL connection", conn); return NULL; } - flb_plg_info(ctx->ins, "switching postgresql connection " - "to non-blocking mode"); - - if (PQsetnonblocking(conn->conn, 1) != 0) { - flb_plg_error(ctx->ins, "non-blocking mode not set"); - PQfinish(conn->conn); - flb_free(conn); + if (PQstatus(conn) != CONNECTION_OK) { + pgsql_log_conn_error(ctx, "PostgreSQL connection", conn); + PQfinish(conn); return NULL; } @@ -87,107 +74,26 @@ void *pgsql_create_connection(struct flb_pgsql_config *ctx) int pgsql_start_connections(struct flb_pgsql_config *ctx) { - int i; - struct flb_pgsql_conn *conn = NULL; - - mk_list_init(&ctx->conn_queue); - ctx->active_conn = 0; - - for(i = 0; i < ctx->min_pool_size; i++) { - flb_plg_info(ctx->ins, "Opening connection: #%d", i); - - conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx); - if (conn == NULL) { - pgsql_conf_destroy(ctx); - return -1; - } - - conn->number = i; - ctx->active_conn++; - mk_list_add(&conn->_head, &ctx->conn_queue); - } - - ctx->conn_current = mk_list_entry_last(&ctx->conn_queue, - struct flb_pgsql_conn, - _head); - - return 0; -} - -int pgsql_new_connection(struct flb_pgsql_config *ctx) -{ - struct flb_pgsql_conn *conn = NULL; + PGconn *conn = NULL; - if (ctx->active_conn >= ctx->max_pool_size) { - return -1; - } + flb_plg_info(ctx->ins, "opening PostgreSQL connection"); - conn = (struct flb_pgsql_conn *)pgsql_create_connection(ctx); + conn = pgsql_create_connection(ctx); if (conn == NULL) { - pgsql_conf_destroy(ctx); return -1; } - conn->number = ctx->active_conn + 1; - ctx->active_conn++; - - mk_list_add(&conn->_head, &ctx->conn_queue); + ctx->conn_current = conn; return 0; } int pgsql_next_connection(struct flb_pgsql_config *ctx) { - struct flb_pgsql_conn *tmp = NULL; - PGresult *res = NULL; - struct mk_list *head; - int ret_conn = 1; - - if (PQconsumeInput(ctx->conn_current->conn) == 1) { - if (PQisBusy(ctx->conn_current->conn) == 0) { - res = PQgetResult(ctx->conn_current->conn); - PQclear(res); - } - } - else { - flb_plg_error(ctx->ins, "%s", - PQerrorMessage(ctx->conn_current->conn)); - } - - mk_list_foreach(head, &ctx->conn_queue) { - tmp = mk_list_entry(head, struct flb_pgsql_conn, _head); - if (ctx->conn_current == NULL) { - ctx->conn_current = tmp; - break; - } - - res = PQgetResult(tmp->conn); - - if (res == NULL) { - flb_plg_debug(ctx->ins, "Connection number %d", - tmp->number); - ctx->conn_current = tmp; - PQclear(res); - return 0; - } - - if (PQresultStatus(res) == PGRES_FATAL_ERROR) { - flb_plg_info(ctx->ins, "%s", - PQerrorMessage(tmp->conn)); - } - - PQclear(res); - } - - if (pgsql_new_connection(ctx) == -1) { - flb_plg_warn(ctx->ins, - "No more free connections." - " Increase max connections"); - } - else { - flb_plg_warn(ctx->ins, "Added new connection"); - ret_conn = pgsql_next_connection(ctx); + if (ctx->conn_current == NULL) { + flb_plg_error(ctx->ins, "no PostgreSQL connection available"); + return -1; } - return ret_conn; + return 0; } From 90dc68d04d37a851db58822fdcc5eb90c791b2a5 Mon Sep 17 00:00:00 2001 From: "Jonathan Gonzalez V." Date: Wed, 15 Apr 2026 10:27:14 +0200 Subject: [PATCH 2/2] tests: out_pgsql: Add internal unit tests for the refactored plugin Signed-off-by: Jonathan Gonzalez V. --- tests/internal/CMakeLists.txt | 12 + tests/internal/pgsql.c | 926 ++++++++++++++++++++++++++++++++++ 2 files changed, 938 insertions(+) create mode 100644 tests/internal/pgsql.c diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 9f01fc93b26..9ff66175a18 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -66,6 +66,13 @@ if(FLB_OUT_AZURE_BLOB) ) endif() +if(FLB_OUT_PGSQL) + set(UNIT_TESTS_FILES + ${UNIT_TESTS_FILES} + pgsql.c + ) +endif() + # TLS helpers if(FLB_TLS) set(UNIT_TESTS_FILES @@ -247,6 +254,11 @@ function(prepare_unit_tests TEST_PREFIX SOURCEFILES) target_link_libraries(${source_file_we} flb-plugin-out_azure_blob) endif() + if(FLB_OUT_PGSQL AND "${source_file_we}" STREQUAL "flb-it-pgsql") + target_link_libraries(${source_file_we} flb-plugin-out_pgsql) + target_include_directories(${source_file_we} PRIVATE ${PostgreSQL_INCLUDE_DIRS}) + endif() + if(FLB_STREAM_PROCESSOR) target_link_libraries(${source_file_we} flb-sp) endif() diff --git a/tests/internal/pgsql.c b/tests/internal/pgsql.c new file mode 100644 index 00000000000..6a71e9a9041 --- /dev/null +++ b/tests/internal/pgsql.c @@ -0,0 +1,926 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "flb_tests_internal.h" + +struct pg_conn { + ConnStatusType status; + ConnStatusType reset_status; + const char *error_message; +}; + +struct pg_result { + ExecStatusType status; + const char *error_message; +}; + +struct pgsql_test_libpq_state { + size_t active_results; + size_t begin_calls; + size_t commit_calls; + size_t rollback_calls; + size_t pqexec_calls; + size_t pqprepare_calls; + size_t pqexec_prepared_calls; + size_t pqreset_calls; + size_t pqfinish_calls; + size_t pqsetdb_calls; + size_t pqget_result_calls; + size_t pqescape_identifier_calls; + size_t insert_failure_call; + int pqsetdb_return_null; + int decoder_init_result; + ExecStatusType prepare_status; + ExecStatusType begin_status; + ExecStatusType commit_status; + ExecStatusType rollback_status; + ExecStatusType exec_prepared_failure_status; + ConnStatusType exec_prepared_failure_conn_status; + const char *result_error_message; +}; + +static struct pgsql_test_libpq_state pgsql_test_libpq_state; +static int pgsql_test_flush_result; + +static void pgsql_test_libpq_reset(void) +{ + memset(&pgsql_test_libpq_state, 0, sizeof(pgsql_test_libpq_state)); + + pgsql_test_libpq_state.prepare_status = PGRES_COMMAND_OK; + pgsql_test_libpq_state.begin_status = PGRES_COMMAND_OK; + pgsql_test_libpq_state.commit_status = PGRES_COMMAND_OK; + pgsql_test_libpq_state.rollback_status = PGRES_COMMAND_OK; + pgsql_test_libpq_state.exec_prepared_failure_status = PGRES_FATAL_ERROR; + pgsql_test_libpq_state.exec_prepared_failure_conn_status = CONNECTION_BAD; + pgsql_test_libpq_state.decoder_init_result = FLB_EVENT_DECODER_SUCCESS; + pgsql_test_libpq_state.result_error_message = "mock libpq failure"; +} + +static PGresult *pgsql_test_result_create(ExecStatusType status, + const char *error_message) +{ + struct pg_result *result; + + result = calloc(1, sizeof(struct pg_result)); + if (result == NULL) { + return NULL; + } + + result->status = status; + result->error_message = error_message != NULL ? error_message : ""; + pgsql_test_libpq_state.active_results++; + + return (PGresult *) result; +} + +PGconn *PQsetdbLogin(const char *pghost, const char *pgport, + const char *pgoptions, const char *pgtty, + const char *dbName, const char *login, + const char *pwd) +{ + struct pg_conn *conn; + + (void) pghost; + (void) pgport; + (void) pgoptions; + (void) pgtty; + (void) dbName; + (void) login; + (void) pwd; + + pgsql_test_libpq_state.pqsetdb_calls++; + + if (pgsql_test_libpq_state.pqsetdb_return_null) { + return NULL; + } + + conn = calloc(1, sizeof(struct pg_conn)); + if (conn == NULL) { + return NULL; + } + + conn->status = CONNECTION_OK; + conn->reset_status = CONNECTION_OK; + conn->error_message = "mock connection error"; + + return (PGconn *) conn; +} + +void PQfinish(PGconn *conn) +{ + pgsql_test_libpq_state.pqfinish_calls++; + free(conn); +} + +ConnStatusType PQstatus(const PGconn *conn) +{ + const struct pg_conn *connection; + + connection = (const struct pg_conn *) conn; + + return connection->status; +} + +char *PQerrorMessage(const PGconn *conn) +{ + const struct pg_conn *connection; + + connection = (const struct pg_conn *) conn; + + return (char *) connection->error_message; +} + +PGresult *PQexec(PGconn *conn, const char *query) +{ + struct pg_conn *connection; + + connection = (struct pg_conn *) conn; + pgsql_test_libpq_state.pqexec_calls++; + + if (strcmp(query, "BEGIN") == 0) { + pgsql_test_libpq_state.begin_calls++; + return pgsql_test_result_create(pgsql_test_libpq_state.begin_status, + pgsql_test_libpq_state.result_error_message); + } + + if (strcmp(query, "COMMIT") == 0) { + pgsql_test_libpq_state.commit_calls++; + return pgsql_test_result_create(pgsql_test_libpq_state.commit_status, + pgsql_test_libpq_state.result_error_message); + } + + if (strcmp(query, "ROLLBACK") == 0) { + pgsql_test_libpq_state.rollback_calls++; + return pgsql_test_result_create(pgsql_test_libpq_state.rollback_status, + pgsql_test_libpq_state.result_error_message); + } + + if (connection->error_message == NULL) { + connection->error_message = "mock connection error"; + } + + return pgsql_test_result_create(PGRES_COMMAND_OK, ""); +} + +PGresult *PQprepare(PGconn *conn, const char *stmtName, const char *query, + int nParams, const Oid *paramTypes) +{ + (void) conn; + (void) stmtName; + (void) query; + (void) nParams; + (void) paramTypes; + + pgsql_test_libpq_state.pqprepare_calls++; + + return pgsql_test_result_create(pgsql_test_libpq_state.prepare_status, + pgsql_test_libpq_state.result_error_message); +} + +PGresult *PQexecPrepared(PGconn *conn, const char *stmtName, int nParams, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) +{ + struct pg_conn *connection; + + (void) stmtName; + (void) nParams; + (void) paramValues; + (void) paramLengths; + (void) paramFormats; + (void) resultFormat; + + connection = (struct pg_conn *) conn; + pgsql_test_libpq_state.pqexec_prepared_calls++; + + if (pgsql_test_libpq_state.insert_failure_call != 0 && + pgsql_test_libpq_state.pqexec_prepared_calls == + pgsql_test_libpq_state.insert_failure_call) { + connection->status = + pgsql_test_libpq_state.exec_prepared_failure_conn_status; + connection->error_message = pgsql_test_libpq_state.result_error_message; + + return pgsql_test_result_create( + pgsql_test_libpq_state.exec_prepared_failure_status, + pgsql_test_libpq_state.result_error_message); + } + + return pgsql_test_result_create(PGRES_COMMAND_OK, ""); +} + +ExecStatusType PQresultStatus(const PGresult *res) +{ + const struct pg_result *result; + + result = (const struct pg_result *) res; + + return result->status; +} + +char *PQresultErrorMessage(const PGresult *res) +{ + const struct pg_result *result; + + result = (const struct pg_result *) res; + + return (char *) result->error_message; +} + +char *PQresStatus(ExecStatusType status) +{ + switch (status) { + case PGRES_COMMAND_OK: + return "PGRES_COMMAND_OK"; + case PGRES_FATAL_ERROR: + return "PGRES_FATAL_ERROR"; + default: + return "PGRES_UNKNOWN"; + } +} + +void PQclear(PGresult *res) +{ + pgsql_test_libpq_state.active_results--; + free(res); +} + +PGresult *PQgetResult(PGconn *conn) +{ + (void) conn; + + pgsql_test_libpq_state.pqget_result_calls++; + + return NULL; +} + +void PQreset(PGconn *conn) +{ + struct pg_conn *connection; + + connection = (struct pg_conn *) conn; + connection->status = connection->reset_status; + pgsql_test_libpq_state.pqreset_calls++; +} + +char *PQescapeIdentifier(PGconn *conn, const char *str, size_t len) +{ + char *buffer; + + (void) conn; + + buffer = malloc(len + 3); + if (buffer == NULL) { + return NULL; + } + + buffer[0] = '"'; + memcpy(&buffer[1], str, len); + buffer[len + 1] = '"'; + buffer[len + 2] = '\0'; + pgsql_test_libpq_state.pqescape_identifier_calls++; + + return buffer; +} + +void PQfreemem(void *ptr) +{ + free(ptr); +} + +/* Capture cb_pgsql_flush() return codes when compiling the plugin inline. */ +#undef FLB_OUTPUT_RETURN +#define FLB_OUTPUT_RETURN(x) \ + do { \ + pgsql_test_flush_result = x; \ + return; \ + } while (0) + +static int pgsql_test_flb_log_event_decoder_init( + struct flb_log_event_decoder *context, + char *input_buffer, + size_t input_length); + +#define flb_log_event_decoder_init pgsql_test_flb_log_event_decoder_init +#include "../../plugins/out_pgsql/pgsql.c" +#undef flb_log_event_decoder_init +#include "../../plugins/out_pgsql/pgsql_connections.c" + +static int pgsql_test_flb_log_event_decoder_init( + struct flb_log_event_decoder *context, + char *input_buffer, + size_t input_length) +{ + if (pgsql_test_libpq_state.decoder_init_result != + FLB_EVENT_DECODER_SUCCESS) { + return pgsql_test_libpq_state.decoder_init_result; + } + + return flb_log_event_decoder_init(context, input_buffer, input_length); +} + +static void pack_test_body(msgpack_sbuffer *sbuf, + msgpack_unpacked *unpacked, + msgpack_object **body) +{ + size_t off = 0; + + msgpack_sbuffer_init(sbuf); + + { + msgpack_packer pck; + + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&pck, 2); + + msgpack_pack_str(&pck, 7); + msgpack_pack_str_body(&pck, "message", 7); + msgpack_pack_str(&pck, 5); + msgpack_pack_str_body(&pck, "hello", 5); + + msgpack_pack_str(&pck, 5); + msgpack_pack_str_body(&pck, "value", 5); + msgpack_pack_int(&pck, 1); + } + + msgpack_unpacked_init(unpacked); + TEST_CHECK(msgpack_unpack_next(unpacked, sbuf->data, sbuf->size, &off)); + + *body = unpacked->data.via.map.ptr != NULL ? &unpacked->data : NULL; +} + +static int pgsql_test_context_init(struct flb_pgsql_config *ctx, + struct flb_output_instance *ins, + struct flb_output_plugin *plugin, + struct pg_conn *conn) +{ + memset(ctx, 0, sizeof(struct flb_pgsql_config)); + memset(ins, 0, sizeof(struct flb_output_instance)); + memset(plugin, 0, sizeof(struct flb_output_plugin)); + memset(conn, 0, sizeof(struct pg_conn)); + + plugin->name = "pgsql"; + ins->p = plugin; + ins->log_level = FLB_LOG_OFF; + + conn->status = CONNECTION_OK; + conn->reset_status = CONNECTION_OK; + conn->error_message = "mock connection error"; + + ctx->ins = ins; + ctx->conn_current = (PGconn *) conn; + ctx->insert_statement_prepared = FLB_TRUE; + ctx->insert_query = pgsql_build_insert_query("\"fluentbit\"", FLB_FALSE); + + TEST_CHECK(ctx->insert_query != NULL); + + return ctx->insert_query != NULL ? 0 : -1; +} + +static void pgsql_test_context_destroy(struct flb_pgsql_config *ctx) +{ + if (ctx->insert_query != NULL) { + flb_sds_destroy(ctx->insert_query); + ctx->insert_query = NULL; + } +} + +static int pgsql_test_create_log_chunk(struct flb_log_event_encoder *encoder, + struct flb_event_chunk *chunk, + int record_count) +{ + struct flb_time timestamp; + flb_sds_t tag; + int index; + int result; + + result = flb_log_event_encoder_init(encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + TEST_CHECK(result == FLB_EVENT_ENCODER_SUCCESS); + if (result != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + for (index = 0; index < record_count; index++) { + result = flb_log_event_encoder_begin_record(encoder); + TEST_CHECK(result == FLB_EVENT_ENCODER_SUCCESS); + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + flb_time_set(×tamp, 1700000000 + index, 500000000); + + result = flb_log_event_encoder_set_timestamp(encoder, ×tamp); + TEST_CHECK(result == FLB_EVENT_ENCODER_SUCCESS); + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + result = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_CSTRING_VALUE("message"), + FLB_LOG_EVENT_CSTRING_VALUE("hello"), + FLB_LOG_EVENT_CSTRING_VALUE("value"), + FLB_LOG_EVENT_INT32_VALUE(index + 1)); + TEST_CHECK(result == FLB_EVENT_ENCODER_SUCCESS); + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + result = flb_log_event_encoder_commit_record(encoder); + TEST_CHECK(result == FLB_EVENT_ENCODER_SUCCESS); + if (result != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + } + + tag = flb_sds_create("pgsql.test"); + TEST_CHECK(tag != NULL); + if (tag == NULL) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + memset(chunk, 0, sizeof(struct flb_event_chunk)); + chunk->type = FLB_EVENT_TYPE_LOGS; + chunk->tag = tag; + chunk->data = encoder->output_buffer; + chunk->size = encoder->output_length; + chunk->total_events = record_count; + + return 0; +} + +static void pgsql_test_destroy_log_chunk(struct flb_log_event_encoder *encoder, + struct flb_event_chunk *chunk) +{ + if (chunk->tag != NULL) { + flb_sds_destroy(chunk->tag); + chunk->tag = NULL; + } + + flb_log_event_encoder_destroy(encoder); + chunk->data = NULL; + chunk->size = 0; +} + +static int pgsql_test_create_invalid_root_chunk(msgpack_sbuffer *sbuf, + struct flb_event_chunk *chunk) +{ + msgpack_packer pck; + flb_sds_t tag; + + msgpack_sbuffer_init(sbuf); + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pck, 1); + msgpack_pack_str(&pck, 4); + msgpack_pack_str_body(&pck, "root", 4); + msgpack_pack_str(&pck, 3); + msgpack_pack_str_body(&pck, "bad", 3); + + tag = flb_sds_create("pgsql.invalid"); + TEST_CHECK(tag != NULL); + if (tag == NULL) { + msgpack_sbuffer_destroy(sbuf); + return -1; + } + + memset(chunk, 0, sizeof(struct flb_event_chunk)); + chunk->type = FLB_EVENT_TYPE_LOGS; + chunk->tag = tag; + chunk->data = sbuf->data; + chunk->size = sbuf->size; + chunk->total_events = 1; + + return 0; +} + +static void pgsql_test_destroy_invalid_root_chunk(msgpack_sbuffer *sbuf, + struct flb_event_chunk *chunk) +{ + if (chunk->tag != NULL) { + flb_sds_destroy(chunk->tag); + chunk->tag = NULL; + } + + msgpack_sbuffer_destroy(sbuf); + chunk->data = NULL; + chunk->size = 0; +} + +static int pgsql_test_invoke_flush(struct flb_event_chunk *event_chunk, + struct flb_pgsql_config *ctx, + struct flb_config *config) +{ + struct flb_output_flush out_flush; + + memset(&out_flush, 0, sizeof(out_flush)); + pgsql_test_flush_result = 123456; + + cb_pgsql_flush(event_chunk, &out_flush, NULL, ctx, config); + + return pgsql_test_flush_result; +} + +void test_pgsql_build_insert_query_postgres(void) +{ + flb_sds_t query; + + query = pgsql_build_insert_query("\"fluentbit\"", FLB_FALSE); + TEST_CHECK(query != NULL); + + if (query != NULL) { + TEST_CHECK(strcmp(query, + "INSERT INTO \"fluentbit\" (tag, time, data) VALUES ($1, " + "to_timestamp($2::double precision), $3::jsonb);") == 0); + flb_sds_destroy(query); + } +} + +void test_pgsql_build_insert_query_cockroach(void) +{ + flb_sds_t query; + + query = pgsql_build_insert_query("\"fluentbit\"", FLB_TRUE); + TEST_CHECK(query != NULL); + + if (query != NULL) { + TEST_CHECK(strcmp(query, + "INSERT INTO \"fluentbit\" (tag, time, data) VALUES ($1, " + "DATE '1970-01-01' + ($2::float8 * INTERVAL '1 second'), " + "$3::jsonb);") == 0); + flb_sds_destroy(query); + } +} + +void test_pgsql_format_timestamp(void) +{ + struct flb_time timestamp; + char buffer[64]; + int result; + + flb_time_set(×tamp, 1700000000, 500000000); + + result = pgsql_format_timestamp(buffer, sizeof(buffer), ×tamp); + + TEST_CHECK(result > 0); + TEST_CHECK(strcmp(buffer, "1700000000.500000000") == 0); +} + +void test_pgsql_format_body_json(void) +{ + msgpack_sbuffer sbuf; + msgpack_unpacked unpacked; + msgpack_object *body; + char *json; + + body = NULL; + pack_test_body(&sbuf, &unpacked, &body); + TEST_CHECK(body != NULL); + + if (body == NULL) { + msgpack_unpacked_destroy(&unpacked); + msgpack_sbuffer_destroy(&sbuf); + return; + } + + json = pgsql_format_body_json(body, FLB_TRUE); + TEST_CHECK(json != NULL); + + if (json != NULL) { + TEST_CHECK(strcmp(json, "{\"message\":\"hello\",\"value\":1}") == 0); + pgsql_free_body_json(json); + } + + msgpack_unpacked_destroy(&unpacked); + msgpack_sbuffer_destroy(&sbuf); +} + +void test_pgsql_format_body_json_cleanup_contract(void) +{ + msgpack_sbuffer sbuf; + msgpack_unpacked unpacked; + msgpack_object *body; + char *json; + int index; + + body = NULL; + pack_test_body(&sbuf, &unpacked, &body); + TEST_CHECK(body != NULL); + + if (body == NULL) { + msgpack_unpacked_destroy(&unpacked); + msgpack_sbuffer_destroy(&sbuf); + return; + } + + for (index = 0; index < 32; index++) { + json = pgsql_format_body_json(body, FLB_TRUE); + TEST_CHECK(json != NULL); + + if (json != NULL) { + TEST_CHECK(strcmp(json, "{\"message\":\"hello\",\"value\":1}") == 0); + pgsql_free_body_json(json); + } + } + + pgsql_free_body_json(NULL); + + msgpack_unpacked_destroy(&unpacked); + msgpack_sbuffer_destroy(&sbuf); +} + +void test_pgsql_translate_decoder_result(void) +{ + TEST_CHECK(pgsql_translate_decoder_result(FLB_EVENT_DECODER_SUCCESS) == FLB_OK); + TEST_CHECK(pgsql_translate_decoder_result( + FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) == FLB_RETRY); + TEST_CHECK(pgsql_translate_decoder_result( + FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE) == FLB_ERROR); +} + +void test_pgsql_conn_status_string(void) +{ + TEST_CHECK(strcmp(pgsql_conn_status_string(CONNECTION_OK), + "CONNECTION_OK") == 0); + TEST_CHECK(strcmp(pgsql_conn_status_string(CONNECTION_BAD), + "CONNECTION_BAD") == 0); + TEST_CHECK(strcmp(pgsql_conn_status_string((ConnStatusType) -1), + "CONNECTION_UNKNOWN") == 0); +} + +void test_pgsql_next_connection_without_connection_fails(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + + memset(&ctx, 0, sizeof(ctx)); + memset(&ins, 0, sizeof(ins)); + memset(&plugin, 0, sizeof(plugin)); + + plugin.name = "pgsql"; + ins.p = &plugin; + ins.log_level = FLB_LOG_OFF; + ctx.ins = &ins; + + TEST_CHECK(pgsql_next_connection(&ctx) == -1); +} + +void test_pgsql_destroy_connections_null_safe(void) +{ + struct flb_pgsql_config ctx; + + memset(&ctx, 0, sizeof(ctx)); + + pgsql_destroy_connections(&ctx); + TEST_CHECK(ctx.conn_current == NULL); +} + +void test_pgsql_create_connection_null_handle_fails(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + PGconn *conn; + + memset(&ctx, 0, sizeof(ctx)); + memset(&ins, 0, sizeof(ins)); + memset(&plugin, 0, sizeof(plugin)); + + plugin.name = "pgsql"; + ins.p = &plugin; + ins.log_level = FLB_LOG_OFF; + ctx.ins = &ins; + + pgsql_test_libpq_reset(); + pgsql_test_libpq_state.pqsetdb_return_null = FLB_TRUE; + + conn = pgsql_create_connection(&ctx); + + TEST_CHECK(conn == NULL); + TEST_CHECK(pgsql_test_libpq_state.pqsetdb_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.pqfinish_calls == 0); +} + +void test_cb_pgsql_flush_without_connection_retries(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + struct flb_config config; + int result; + + memset(&ctx, 0, sizeof(ctx)); + memset(&ins, 0, sizeof(ins)); + memset(&plugin, 0, sizeof(plugin)); + memset(&config, 0, sizeof(config)); + + plugin.name = "pgsql"; + ins.p = &plugin; + ins.log_level = FLB_LOG_OFF; + ctx.ins = &ins; + + result = pgsql_test_create_log_chunk(&encoder, &chunk, 1); + TEST_CHECK(result == 0); + if (result != 0) { + return; + } + + pgsql_test_libpq_reset(); + + result = pgsql_test_invoke_flush(&chunk, &ctx, &config); + TEST_CHECK(result == FLB_RETRY); + TEST_CHECK(pgsql_test_libpq_state.begin_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.active_results == 0); + + pgsql_test_destroy_log_chunk(&encoder, &chunk); +} + +void test_cb_pgsql_flush_mid_batch_insert_failure_retries_and_recovers(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + struct pg_conn conn; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + struct flb_config config; + int result; + + memset(&config, 0, sizeof(config)); + + pgsql_test_libpq_reset(); + + result = pgsql_test_context_init(&ctx, &ins, &plugin, &conn); + TEST_CHECK(result == 0); + if (result != 0) { + return; + } + + result = pgsql_test_create_log_chunk(&encoder, &chunk, 2); + TEST_CHECK(result == 0); + if (result != 0) { + pgsql_test_context_destroy(&ctx); + return; + } + + pgsql_test_libpq_state.insert_failure_call = 2; + + result = pgsql_test_invoke_flush(&chunk, &ctx, &config); + TEST_CHECK(result == FLB_RETRY); + TEST_CHECK(pgsql_test_libpq_state.begin_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.commit_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.rollback_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.pqexec_prepared_calls == 2); + TEST_CHECK(pgsql_test_libpq_state.pqprepare_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.pqreset_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.active_results == 0); + TEST_CHECK(conn.status == CONNECTION_BAD); + + /* Recovery is attempted on the next flush once libpq reports BAD. */ + pgsql_test_libpq_state.insert_failure_call = 0; + + result = pgsql_test_invoke_flush(&chunk, &ctx, &config); + TEST_CHECK(result == FLB_OK); + TEST_CHECK(pgsql_test_libpq_state.begin_calls == 2); + TEST_CHECK(pgsql_test_libpq_state.commit_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.rollback_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.pqexec_prepared_calls == 4); + TEST_CHECK(pgsql_test_libpq_state.pqprepare_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.pqreset_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.active_results == 0); + TEST_CHECK(ctx.insert_statement_prepared == FLB_TRUE); + TEST_CHECK(conn.status == CONNECTION_OK); + + pgsql_test_destroy_log_chunk(&encoder, &chunk); + pgsql_test_context_destroy(&ctx); +} + +void test_cb_pgsql_flush_decoder_terminal_error_rolls_back(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + struct pg_conn conn; + struct flb_event_chunk chunk; + struct flb_config config; + msgpack_sbuffer sbuf; + int result; + + memset(&config, 0, sizeof(config)); + + pgsql_test_libpq_reset(); + + result = pgsql_test_context_init(&ctx, &ins, &plugin, &conn); + TEST_CHECK(result == 0); + if (result != 0) { + return; + } + + result = pgsql_test_create_invalid_root_chunk(&sbuf, &chunk); + TEST_CHECK(result == 0); + if (result != 0) { + pgsql_test_context_destroy(&ctx); + return; + } + + result = pgsql_test_invoke_flush(&chunk, &ctx, &config); + TEST_CHECK(result == FLB_ERROR); + TEST_CHECK(pgsql_test_libpq_state.begin_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.commit_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.rollback_calls == 1); + TEST_CHECK(pgsql_test_libpq_state.pqexec_prepared_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.active_results == 0); + + pgsql_test_destroy_invalid_root_chunk(&sbuf, &chunk); + pgsql_test_context_destroy(&ctx); +} + +void test_cb_pgsql_flush_decoder_init_terminal_error_returns_error(void) +{ + struct flb_pgsql_config ctx; + struct flb_output_instance ins; + struct flb_output_plugin plugin; + struct pg_conn conn; + struct flb_event_chunk chunk; + struct flb_config config; + struct flb_log_event_encoder encoder; + int result; + + memset(&config, 0, sizeof(config)); + + pgsql_test_libpq_reset(); + + result = pgsql_test_context_init(&ctx, &ins, &plugin, &conn); + TEST_CHECK(result == 0); + if (result != 0) { + return; + } + + result = pgsql_test_create_log_chunk(&encoder, &chunk, 1); + TEST_CHECK(result == 0); + if (result != 0) { + pgsql_test_context_destroy(&ctx); + return; + } + + pgsql_test_libpq_state.decoder_init_result = + FLB_EVENT_DECODER_ERROR_WRONG_ROOT_TYPE; + + result = pgsql_test_invoke_flush(&chunk, &ctx, &config); + + TEST_CHECK(result == FLB_ERROR); + TEST_CHECK(pgsql_test_libpq_state.begin_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.commit_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.rollback_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.pqexec_prepared_calls == 0); + TEST_CHECK(pgsql_test_libpq_state.active_results == 0); + + pgsql_test_destroy_log_chunk(&encoder, &chunk); + pgsql_test_context_destroy(&ctx); +} + +TEST_LIST = { + {"build_insert_query_postgres", test_pgsql_build_insert_query_postgres}, + {"build_insert_query_cockroach", test_pgsql_build_insert_query_cockroach}, + {"format_timestamp", test_pgsql_format_timestamp}, + {"format_body_json", test_pgsql_format_body_json}, + {"format_body_json_cleanup_contract", test_pgsql_format_body_json_cleanup_contract}, + {"translate_decoder_result", test_pgsql_translate_decoder_result}, + {"conn_status_string", test_pgsql_conn_status_string}, + {"next_connection_without_connection_fails", test_pgsql_next_connection_without_connection_fails}, + {"destroy_connections_null_safe", test_pgsql_destroy_connections_null_safe}, + {"create_connection_null_handle_fails", + test_pgsql_create_connection_null_handle_fails}, + {"cb_pgsql_flush_without_connection_retries", + test_cb_pgsql_flush_without_connection_retries}, + {"cb_pgsql_flush_mid_batch_insert_failure_retries_and_recovers", + test_cb_pgsql_flush_mid_batch_insert_failure_retries_and_recovers}, + {"cb_pgsql_flush_decoder_terminal_error_rolls_back", + test_cb_pgsql_flush_decoder_terminal_error_rolls_back}, + {"cb_pgsql_flush_decoder_init_terminal_error_returns_error", + test_cb_pgsql_flush_decoder_init_terminal_error_returns_error}, + {0} +};