Skip to content

Commit 5b65d28

Browse files
committed
out_logdna: remove promoted keys from line field
Signed-off-by: lecaros <lecaros@chronosphere.io>
1 parent 230eb3c commit 5b65d28

4 files changed

Lines changed: 784 additions & 24 deletions

File tree

plugins/out_logdna/logdna.c

Lines changed: 124 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626

2727
#include "logdna.h"
2828

29+
#define LOGDNA_META_KEY "meta"
30+
#define LOGDNA_LEVEL_KEY "level"
31+
#define LOGDNA_SEVERITY_KEY "severity"
32+
#define LOGDNA_FILE_KEY "file"
33+
#define LOGDNA_APP_KEY "app"
34+
2935
static inline int primary_key_check(msgpack_object k, char *name, int len)
3036
{
3137
if (k.type != MSGPACK_OBJECT_STR) {
@@ -44,83 +50,114 @@ static inline int primary_key_check(msgpack_object k, char *name, int len)
4450
}
4551

4652
/*
47-
* This function looks for the following keys and add them to the buffer
53+
* This function looks for the following primary keys and promotes them to
54+
* the top-level line object:
4855
*
4956
* - level or severity
5057
* - file
5158
* - app
5259
* - meta
60+
*
61+
* When line_pck is not NULL, non-primary keys are packed into it for use
62+
* as the "line" body (excluding the promoted keys).
5363
*/
5464
static int record_append_primary_keys(struct flb_logdna *ctx,
5565
msgpack_object *map,
56-
msgpack_packer *mp_sbuf)
66+
msgpack_packer *mp_sbuf,
67+
msgpack_packer *line_pck)
5768
{
5869
int i;
5970
int c = 0;
71+
int is_primary;
72+
int line_count = 0;
6073
msgpack_object *level = NULL;
6174
msgpack_object *file = NULL;
6275
msgpack_object *app = NULL;
6376
msgpack_object *meta = NULL;
6477
msgpack_object k;
6578
msgpack_object v;
6679

67-
for (i = 0; i < map->via.array.size; i++) {
80+
if (line_pck) {
81+
for (i = 0; i < map->via.map.size; i++) {
82+
k = map->via.map.ptr[i].key;
83+
if (primary_key_check(k, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1) == FLB_TRUE ||
84+
primary_key_check(k, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1) == FLB_TRUE ||
85+
primary_key_check(k, LOGDNA_SEVERITY_KEY, sizeof(LOGDNA_SEVERITY_KEY) - 1) == FLB_TRUE ||
86+
primary_key_check(k, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1) == FLB_TRUE ||
87+
primary_key_check(k, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1) == FLB_TRUE) {
88+
continue;
89+
}
90+
line_count++;
91+
}
92+
msgpack_pack_map(line_pck, line_count);
93+
}
94+
95+
for (i = 0; i < map->via.map.size; i++) {
6896
k = map->via.map.ptr[i].key;
6997
v = map->via.map.ptr[i].val;
98+
is_primary = FLB_FALSE;
7099

71100
/* Level - optional */
72101
if (!level &&
73-
(primary_key_check(k, "level", 5) == FLB_TRUE ||
74-
primary_key_check(k, "severity", 8) == FLB_TRUE)) {
102+
(primary_key_check(k, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1) == FLB_TRUE ||
103+
primary_key_check(k, LOGDNA_SEVERITY_KEY, sizeof(LOGDNA_SEVERITY_KEY) - 1) == FLB_TRUE)) {
75104
level = &k;
76-
msgpack_pack_str(mp_sbuf, 5);
77-
msgpack_pack_str_body(mp_sbuf, "level", 5);
105+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_LEVEL_KEY) - 1);
106+
msgpack_pack_str_body(mp_sbuf, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1);
78107
msgpack_pack_object(mp_sbuf, v);
79108
c++;
109+
is_primary = FLB_TRUE;
80110
}
81111

82112
/* Meta - optional */
83-
if (!meta && primary_key_check(k, "meta", 4) == FLB_TRUE) {
113+
if (!meta && primary_key_check(k, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1) == FLB_TRUE) {
84114
meta = &k;
85-
msgpack_pack_str(mp_sbuf, 4);
86-
msgpack_pack_str_body(mp_sbuf, "meta", 4);
115+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_META_KEY) - 1);
116+
msgpack_pack_str_body(mp_sbuf, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1);
87117
msgpack_pack_object(mp_sbuf, v);
88118
c++;
119+
is_primary = FLB_TRUE;
89120
}
90121

91122
/* File */
92-
if (!file && primary_key_check(k, "file", 4) == FLB_TRUE) {
123+
if (!file && primary_key_check(k, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1) == FLB_TRUE) {
93124
file = &k;
94-
msgpack_pack_str(mp_sbuf, 4);
95-
msgpack_pack_str_body(mp_sbuf, "file", 4);
125+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_FILE_KEY) - 1);
126+
msgpack_pack_str_body(mp_sbuf, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1);
96127
msgpack_pack_object(mp_sbuf, v);
97128
c++;
129+
is_primary = FLB_TRUE;
98130
}
99131

100132
/* App */
101-
if (primary_key_check(k, "app", 3) == FLB_TRUE) {
133+
if (!app && primary_key_check(k, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1) == FLB_TRUE) {
102134
app = &k;
103-
msgpack_pack_str(mp_sbuf, 3);
104-
msgpack_pack_str_body(mp_sbuf, "app", 3);
135+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_APP_KEY) - 1);
136+
msgpack_pack_str_body(mp_sbuf, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1);
105137
msgpack_pack_object(mp_sbuf, v);
106138
c++;
139+
is_primary = FLB_TRUE;
140+
}
141+
142+
if (line_pck && is_primary == FLB_FALSE) {
143+
msgpack_pack_object(line_pck, k);
144+
msgpack_pack_object(line_pck, v);
107145
}
108146
}
109147

110148
/* Set the global file name if the record did not provided one */
111149
if (!file && ctx->file) {
112-
msgpack_pack_str(mp_sbuf, 4);
113-
msgpack_pack_str_body(mp_sbuf, "file", 4);
150+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_FILE_KEY) - 1);
151+
msgpack_pack_str_body(mp_sbuf, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1);
114152
msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->file));
115153
msgpack_pack_str_body(mp_sbuf, ctx->file, flb_sds_len(ctx->file));
116154
c++;
117155
}
118156

119-
120157
/* If no application name is set, set the default */
121158
if (!app) {
122-
msgpack_pack_str(mp_sbuf, 3);
123-
msgpack_pack_str_body(mp_sbuf, "app", 3);
159+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_APP_KEY) - 1);
160+
msgpack_pack_str_body(mp_sbuf, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1);
124161
msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->app));
125162
msgpack_pack_str_body(mp_sbuf, ctx->app, flb_sds_len(ctx->app));
126163
c++;
@@ -139,10 +176,14 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
139176
int total_lines;
140177
int array_size = 0;
141178
off_t map_off;
179+
size_t off;
142180
char *line_json;
143181
flb_sds_t json;
144182
msgpack_packer mp_pck;
145183
msgpack_sbuffer mp_sbuf;
184+
msgpack_packer mp_line_pck;
185+
msgpack_sbuffer mp_line_sbuf;
186+
msgpack_unpacked mp_line_result;
146187
struct flb_log_event_decoder log_decoder;
147188
struct flb_log_event log_event;
148189

@@ -178,10 +219,24 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
178219
msgpack_pack_map(&mp_pck, array_size);
179220

180221
/*
181-
* Append primary keys found, the return values is the number of appended
222+
* Append primary keys found, the return value is the number of appended
182223
* keys to the record, we use that to adjust the map header size.
224+
*
225+
* When exclude_promoted_keys is enabled, non-primary keys are packed
226+
* into mp_line_sbuf for use as the "line" body.
183227
*/
184-
ret = record_append_primary_keys(ctx, log_event.body, &mp_pck);
228+
if (ctx->exclude_promoted_keys) {
229+
msgpack_sbuffer_init(&mp_line_sbuf);
230+
msgpack_packer_init(&mp_line_pck, &mp_line_sbuf,
231+
msgpack_sbuffer_write);
232+
233+
ret = record_append_primary_keys(ctx, log_event.body,
234+
&mp_pck, &mp_line_pck);
235+
}
236+
else {
237+
ret = record_append_primary_keys(ctx, log_event.body,
238+
&mp_pck, NULL);
239+
}
185240
array_size += ret;
186241

187242
/* Timestamp */
@@ -193,7 +248,23 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
193248
msgpack_pack_str(&mp_pck, 4);
194249
msgpack_pack_str_body(&mp_pck, "line", 4);
195250

196-
line_json = flb_msgpack_to_json_str(1024, log_event.body, config->json_escape_unicode);
251+
if (ctx->exclude_promoted_keys) {
252+
msgpack_unpacked_init(&mp_line_result);
253+
off = 0;
254+
msgpack_unpack_next(&mp_line_result,
255+
mp_line_sbuf.data, mp_line_sbuf.size, &off);
256+
257+
line_json = flb_msgpack_to_json_str(1024, &mp_line_result.data,
258+
config->json_escape_unicode);
259+
260+
msgpack_unpacked_destroy(&mp_line_result);
261+
msgpack_sbuffer_destroy(&mp_line_sbuf);
262+
}
263+
else {
264+
line_json = flb_msgpack_to_json_str(1024, log_event.body,
265+
config->json_escape_unicode);
266+
}
267+
197268
len = strlen(line_json);
198269
msgpack_pack_str(&mp_pck, len);
199270
msgpack_pack_str_body(&mp_pck, line_json, len);
@@ -584,11 +655,39 @@ static struct flb_config_map config_map[] = {
584655
"Name of the application generating the data (optional)"
585656
},
586657

658+
{
659+
FLB_CONFIG_MAP_BOOL, "exclude_promoted_keys", "false",
660+
0, FLB_TRUE, offsetof(struct flb_logdna, exclude_promoted_keys),
661+
"Exclude promoted keys (meta, level, app, file) from the line body"
662+
},
663+
587664
/* EOF */
588665
{0}
589666

590667
};
591668

669+
static int cb_logdna_format_test(struct flb_config *config,
670+
struct flb_input_instance *ins,
671+
void *plugin_context,
672+
void *flush_ctx,
673+
int event_type,
674+
const char *tag, int tag_len,
675+
const void *data, size_t bytes,
676+
void **out_data, size_t *out_size)
677+
{
678+
flb_sds_t json;
679+
struct flb_logdna *ctx = plugin_context;
680+
681+
json = logdna_compose_payload(ctx, data, bytes, tag, tag_len, config);
682+
if (!json) {
683+
return -1;
684+
}
685+
686+
*out_data = json;
687+
*out_size = flb_sds_len(json);
688+
return 0;
689+
}
690+
592691
/* Plugin reference */
593692
struct flb_output_plugin out_logdna_plugin = {
594693
.name = "logdna",
@@ -597,5 +696,6 @@ struct flb_output_plugin out_logdna_plugin = {
597696
.cb_flush = cb_logdna_flush,
598697
.cb_exit = cb_logdna_exit,
599698
.config_map = config_map,
699+
.test_formatter.callback = cb_logdna_format_test,
600700
.flags = FLB_OUTPUT_NET | FLB_IO_TLS,
601701
};

plugins/out_logdna/logdna.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ struct flb_logdna {
4141
flb_sds_t file;
4242
flb_sds_t app;
4343
struct mk_list *tags;
44+
int exclude_promoted_keys;
4445

4546
/* Internal */
4647
flb_sds_t _hostname;

tests/runtime/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ if(FLB_IN_LIB)
231231
FLB_RT_TEST(FLB_OUT_HTTP "out_http.c")
232232
FLB_RT_TEST(FLB_OUT_KAFKA "out_kafka.c")
233233
FLB_RT_TEST(FLB_OUT_LIB "out_lib.c")
234+
FLB_RT_TEST(FLB_OUT_LOGDNA "out_logdna.c")
234235
FLB_RT_TEST(FLB_OUT_LOKI "out_loki.c")
235236
FLB_RT_TEST(FLB_OUT_NULL "out_null.c")
236237
FLB_RT_TEST(FLB_OUT_PLOT "out_plot.c")

0 commit comments

Comments
 (0)