Skip to content

Commit 973e048

Browse files
committed
out_logdna: remove promoted keys from line field
Signed-off-by: lecaros <lecaros@chronosphere.io>
1 parent 86ec64a commit 973e048

2 files changed

Lines changed: 147 additions & 39 deletions

File tree

plugins/out_logdna/logdna.c

Lines changed: 146 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626

2727
#include "logdna.h"
2828

29+
#define LOGDNA_META_KEY "meta"
30+
#define LOGDNA_LEVEL_KEY "level"
31+
#define LOGDNA_SEVERITY_KEY "severity"
32+
#define LOGDNA_FILE_KEY "file"
33+
#define LOGDNA_APP_KEY "app"
34+
2935
static inline int primary_key_check(msgpack_object k, char *name, int len)
3036
{
3137
if (k.type != MSGPACK_OBJECT_STR) {
@@ -44,83 +50,121 @@ static inline int primary_key_check(msgpack_object k, char *name, int len)
4450
}
4551

4652
/*
47-
* This function looks for the following keys and add them to the buffer
53+
* This function looks for the following primary keys and promotes them to
54+
* the top-level line object:
4855
*
4956
* - level or severity
5057
* - file
5158
* - app
5259
* - meta
60+
*
61+
* When line_pck is not NULL, non-primary keys are packed into it for use
62+
* as the "line" body (excluding the promoted keys).
5363
*/
5464
static int record_append_primary_keys(struct flb_logdna *ctx,
5565
msgpack_object *map,
56-
msgpack_packer *mp_sbuf)
66+
msgpack_packer *mp_sbuf,
67+
msgpack_packer *line_pck)
5768
{
5869
int i;
5970
int c = 0;
71+
int is_primary;
72+
int line_count = 0;
6073
msgpack_object *level = NULL;
6174
msgpack_object *file = NULL;
6275
msgpack_object *app = NULL;
6376
msgpack_object *meta = NULL;
6477
msgpack_object k;
6578
msgpack_object v;
6679

67-
for (i = 0; i < map->via.array.size; i++) {
80+
if (line_pck) {
81+
for (i = 0; i < map->via.map.size; i++) {
82+
k = map->via.map.ptr[i].key;
83+
if (primary_key_check(k, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1) == FLB_TRUE ||
84+
primary_key_check(k, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1) == FLB_TRUE ||
85+
primary_key_check(k, LOGDNA_SEVERITY_KEY, sizeof(LOGDNA_SEVERITY_KEY) - 1) == FLB_TRUE ||
86+
primary_key_check(k, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1) == FLB_TRUE ||
87+
primary_key_check(k, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1) == FLB_TRUE) {
88+
continue;
89+
}
90+
line_count++;
91+
}
92+
msgpack_pack_map(line_pck, line_count);
93+
}
94+
95+
for (i = 0; i < map->via.map.size; i++) {
6896
k = map->via.map.ptr[i].key;
6997
v = map->via.map.ptr[i].val;
70-
71-
/* Level - optional */
72-
if (!level &&
73-
(primary_key_check(k, "level", 5) == FLB_TRUE ||
74-
primary_key_check(k, "severity", 8) == FLB_TRUE)) {
75-
level = &k;
76-
msgpack_pack_str(mp_sbuf, 5);
77-
msgpack_pack_str_body(mp_sbuf, "level", 5);
78-
msgpack_pack_object(mp_sbuf, v);
79-
c++;
98+
is_primary = FLB_FALSE;
99+
100+
/* Level - optional (both "level" and "severity" are primary) */
101+
if (primary_key_check(k, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1) == FLB_TRUE ||
102+
primary_key_check(k, LOGDNA_SEVERITY_KEY, sizeof(LOGDNA_SEVERITY_KEY) - 1) == FLB_TRUE) {
103+
is_primary = FLB_TRUE;
104+
if (!level) {
105+
level = &k;
106+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_LEVEL_KEY) - 1);
107+
msgpack_pack_str_body(mp_sbuf, LOGDNA_LEVEL_KEY, sizeof(LOGDNA_LEVEL_KEY) - 1);
108+
msgpack_pack_object(mp_sbuf, v);
109+
c++;
110+
}
80111
}
81112

82113
/* Meta - optional */
83-
if (!meta && primary_key_check(k, "meta", 4) == FLB_TRUE) {
84-
meta = &k;
85-
msgpack_pack_str(mp_sbuf, 4);
86-
msgpack_pack_str_body(mp_sbuf, "meta", 4);
87-
msgpack_pack_object(mp_sbuf, v);
88-
c++;
114+
if (primary_key_check(k, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1) == FLB_TRUE) {
115+
is_primary = FLB_TRUE;
116+
if (!meta) {
117+
meta = &k;
118+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_META_KEY) - 1);
119+
msgpack_pack_str_body(mp_sbuf, LOGDNA_META_KEY, sizeof(LOGDNA_META_KEY) - 1);
120+
msgpack_pack_object(mp_sbuf, v);
121+
c++;
122+
}
89123
}
90124

91125
/* File */
92-
if (!file && primary_key_check(k, "file", 4) == FLB_TRUE) {
93-
file = &k;
94-
msgpack_pack_str(mp_sbuf, 4);
95-
msgpack_pack_str_body(mp_sbuf, "file", 4);
96-
msgpack_pack_object(mp_sbuf, v);
97-
c++;
126+
if (primary_key_check(k, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1) == FLB_TRUE) {
127+
is_primary = FLB_TRUE;
128+
if (!file) {
129+
file = &k;
130+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_FILE_KEY) - 1);
131+
msgpack_pack_str_body(mp_sbuf, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1);
132+
msgpack_pack_object(mp_sbuf, v);
133+
c++;
134+
}
98135
}
99136

100137
/* App */
101-
if (primary_key_check(k, "app", 3) == FLB_TRUE) {
102-
app = &k;
103-
msgpack_pack_str(mp_sbuf, 3);
104-
msgpack_pack_str_body(mp_sbuf, "app", 3);
105-
msgpack_pack_object(mp_sbuf, v);
106-
c++;
138+
if (primary_key_check(k, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1) == FLB_TRUE) {
139+
is_primary = FLB_TRUE;
140+
if (!app) {
141+
app = &k;
142+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_APP_KEY) - 1);
143+
msgpack_pack_str_body(mp_sbuf, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1);
144+
msgpack_pack_object(mp_sbuf, v);
145+
c++;
146+
}
147+
}
148+
149+
if (line_pck && is_primary == FLB_FALSE) {
150+
msgpack_pack_object(line_pck, k);
151+
msgpack_pack_object(line_pck, v);
107152
}
108153
}
109154

110155
/* Set the global file name if the record did not provided one */
111156
if (!file && ctx->file) {
112-
msgpack_pack_str(mp_sbuf, 4);
113-
msgpack_pack_str_body(mp_sbuf, "file", 4);
157+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_FILE_KEY) - 1);
158+
msgpack_pack_str_body(mp_sbuf, LOGDNA_FILE_KEY, sizeof(LOGDNA_FILE_KEY) - 1);
114159
msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->file));
115160
msgpack_pack_str_body(mp_sbuf, ctx->file, flb_sds_len(ctx->file));
116161
c++;
117162
}
118163

119-
120164
/* If no application name is set, set the default */
121165
if (!app) {
122-
msgpack_pack_str(mp_sbuf, 3);
123-
msgpack_pack_str_body(mp_sbuf, "app", 3);
166+
msgpack_pack_str(mp_sbuf, sizeof(LOGDNA_APP_KEY) - 1);
167+
msgpack_pack_str_body(mp_sbuf, LOGDNA_APP_KEY, sizeof(LOGDNA_APP_KEY) - 1);
124168
msgpack_pack_str(mp_sbuf, flb_sds_len(ctx->app));
125169
msgpack_pack_str_body(mp_sbuf, ctx->app, flb_sds_len(ctx->app));
126170
c++;
@@ -139,10 +183,14 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
139183
int total_lines;
140184
int array_size = 0;
141185
off_t map_off;
186+
size_t off;
142187
char *line_json;
143188
flb_sds_t json;
144189
msgpack_packer mp_pck;
145190
msgpack_sbuffer mp_sbuf;
191+
msgpack_packer mp_line_pck;
192+
msgpack_sbuffer mp_line_sbuf;
193+
msgpack_unpacked mp_line_result;
146194
struct flb_log_event_decoder log_decoder;
147195
struct flb_log_event log_event;
148196

@@ -178,10 +226,24 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
178226
msgpack_pack_map(&mp_pck, array_size);
179227

180228
/*
181-
* Append primary keys found, the return values is the number of appended
229+
* Append primary keys found, the return value is the number of appended
182230
* keys to the record, we use that to adjust the map header size.
231+
*
232+
* When exclude_promoted_keys is enabled, non-primary keys are packed
233+
* into mp_line_sbuf for use as the "line" body.
183234
*/
184-
ret = record_append_primary_keys(ctx, log_event.body, &mp_pck);
235+
if (ctx->exclude_promoted_keys) {
236+
msgpack_sbuffer_init(&mp_line_sbuf);
237+
msgpack_packer_init(&mp_line_pck, &mp_line_sbuf,
238+
msgpack_sbuffer_write);
239+
240+
ret = record_append_primary_keys(ctx, log_event.body,
241+
&mp_pck, &mp_line_pck);
242+
}
243+
else {
244+
ret = record_append_primary_keys(ctx, log_event.body,
245+
&mp_pck, NULL);
246+
}
185247
array_size += ret;
186248

187249
/* Timestamp */
@@ -193,7 +255,23 @@ static flb_sds_t logdna_compose_payload(struct flb_logdna *ctx,
193255
msgpack_pack_str(&mp_pck, 4);
194256
msgpack_pack_str_body(&mp_pck, "line", 4);
195257

196-
line_json = flb_msgpack_to_json_str(1024, log_event.body, config->json_escape_unicode);
258+
if (ctx->exclude_promoted_keys) {
259+
msgpack_unpacked_init(&mp_line_result);
260+
off = 0;
261+
msgpack_unpack_next(&mp_line_result,
262+
mp_line_sbuf.data, mp_line_sbuf.size, &off);
263+
264+
line_json = flb_msgpack_to_json_str(1024, &mp_line_result.data,
265+
config->json_escape_unicode);
266+
267+
msgpack_unpacked_destroy(&mp_line_result);
268+
msgpack_sbuffer_destroy(&mp_line_sbuf);
269+
}
270+
else {
271+
line_json = flb_msgpack_to_json_str(1024, log_event.body,
272+
config->json_escape_unicode);
273+
}
274+
197275
len = strlen(line_json);
198276
msgpack_pack_str(&mp_pck, len);
199277
msgpack_pack_str_body(&mp_pck, line_json, len);
@@ -584,11 +662,39 @@ static struct flb_config_map config_map[] = {
584662
"Name of the application generating the data (optional)"
585663
},
586664

665+
{
666+
FLB_CONFIG_MAP_BOOL, "exclude_promoted_keys", "false",
667+
0, FLB_TRUE, offsetof(struct flb_logdna, exclude_promoted_keys),
668+
"Exclude promoted keys (meta, level, severity (promoted as level), app, file) from the line body"
669+
},
670+
587671
/* EOF */
588672
{0}
589673

590674
};
591675

676+
static int cb_logdna_format_test(struct flb_config *config,
677+
struct flb_input_instance *ins,
678+
void *plugin_context,
679+
void *flush_ctx,
680+
int event_type,
681+
const char *tag, int tag_len,
682+
const void *data, size_t bytes,
683+
void **out_data, size_t *out_size)
684+
{
685+
flb_sds_t json;
686+
struct flb_logdna *ctx = plugin_context;
687+
688+
json = logdna_compose_payload(ctx, data, bytes, tag, tag_len, config);
689+
if (!json) {
690+
return -1;
691+
}
692+
693+
*out_data = json;
694+
*out_size = flb_sds_len(json);
695+
return 0;
696+
}
697+
592698
/* Plugin reference */
593699
struct flb_output_plugin out_logdna_plugin = {
594700
.name = "logdna",
@@ -597,5 +703,6 @@ struct flb_output_plugin out_logdna_plugin = {
597703
.cb_flush = cb_logdna_flush,
598704
.cb_exit = cb_logdna_exit,
599705
.config_map = config_map,
706+
.test_formatter.callback = cb_logdna_format_test,
600707
.flags = FLB_OUTPUT_NET | FLB_IO_TLS,
601708
};

plugins/out_logdna/logdna.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ struct flb_logdna {
4141
flb_sds_t file;
4242
flb_sds_t app;
4343
struct mk_list *tags;
44+
int exclude_promoted_keys;
4445

4546
/* Internal */
4647
flb_sds_t _hostname;

0 commit comments

Comments
 (0)