Skip to content

Commit 79248c1

Browse files
committed
proxy: input plugins can now emit metrics and traces event types
Read event_type from the proxy definition and dispatch to the appropriate append function (flb_input_metrics_append or flb_input_trace_append) in the collect callback, mirroring how the output proxy side already handles event_type. Metrics and traces are decoded from msgpack using cmt_decode_msgpack and ctr_decode_msgpack respectively before being appended. Logs remain the default (event_type == 0) to preserve backwards compatibility. Error handling mirrors the pattern in fw_prot.c:append_log: return values are checked, resources are freed on failure, and ctr ownership is not assumed on the traces success path since flb_input_trace_append takes ownership of the context. Fixes: #11914
1 parent f617ed1 commit 79248c1

1 file changed

Lines changed: 50 additions & 5 deletions

File tree

src/flb_plugin_proxy.c

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333
#include <fluent-bit/flb_utils.h>
3434
#include <fluent-bit/flb_plugin_proxy.h>
3535
#include <fluent-bit/flb_input_log.h>
36+
#include <fluent-bit/flb_input_metric.h>
37+
#include <fluent-bit/flb_input_trace.h>
38+
#include <fluent-bit/flb_input_event.h>
3639
#include <fluent-bit/flb_custom.h>
40+
#include <cmetrics/cmt_decode_msgpack.h>
41+
#include <ctraces/ctr_decode_msgpack.h>
3742

3843
/* Proxies */
3944
#include "proxy/go/go.h"
@@ -75,8 +80,12 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
7580
struct flb_config *config, void *in_context)
7681
{
7782
int ret = FLB_OK;
83+
int event_type;
7884
size_t len = 0;
85+
size_t offset = 0;
7986
void *data = NULL;
87+
struct cmt *cmt = NULL;
88+
struct ctrace *ctr = NULL;
8089
struct flb_plugin_input_proxy_context *ctx = (struct flb_plugin_input_proxy_context *) in_context;
8190

8291
#ifdef FLB_HAVE_PROXY_GO
@@ -85,7 +94,7 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
8594
ret = proxy_go_input_collect(ctx, &data, &len);
8695

8796
if (len == 0) {
88-
flb_trace("[GO] No logs are ingested");
97+
flb_trace("[GO] No data ingested");
8998
return -1;
9099
}
91100

@@ -94,17 +103,53 @@ static int flb_proxy_input_cb_collect(struct flb_input_instance *ins,
94103
return -1;
95104
}
96105

97-
flb_input_log_append(ins, NULL, 0, data, len);
106+
event_type = ctx->proxy->def->event_type;
107+
108+
if (event_type == FLB_INPUT_METRICS) {
109+
ret = cmt_decode_msgpack_create(&cmt, (char *) data, len, &offset);
110+
if (ret != CMT_DECODE_MSGPACK_SUCCESS) {
111+
flb_error("[proxy] failed to decode metrics msgpack (error %d)", ret);
112+
proxy_go_input_cleanup(ctx, data);
113+
return -1;
114+
}
115+
ret = flb_input_metrics_append(ins, NULL, 0, cmt);
116+
if (ret != 0) {
117+
flb_error("[proxy] could not append metrics, ret=%d", ret);
118+
cmt_decode_msgpack_destroy(cmt);
119+
proxy_go_input_cleanup(ctx, data);
120+
return -1;
121+
}
122+
cmt_decode_msgpack_destroy(cmt);
123+
}
124+
else if (event_type == FLB_INPUT_TRACES) {
125+
ret = ctr_decode_msgpack_create(&ctr, (char *) data, len, &offset);
126+
if (ret != CTR_DECODE_MSGPACK_SUCCESS) {
127+
flb_error("[proxy] failed to decode traces msgpack (error %d)", ret);
128+
proxy_go_input_cleanup(ctx, data);
129+
return -1;
130+
}
131+
ret = flb_input_trace_append(ins, NULL, 0, ctr);
132+
if (ret != 0) {
133+
flb_error("[proxy] could not append traces, ret=%d", ret);
134+
ctr_decode_msgpack_destroy(ctr);
135+
proxy_go_input_cleanup(ctx, data);
136+
return -1;
137+
}
138+
/* flb_input_trace_append takes ownership of ctr and destroys it on success */
139+
}
140+
else {
141+
/* default: logs */
142+
ret = flb_input_log_append(ins, NULL, 0, data, len);
143+
}
98144

99-
ret = proxy_go_input_cleanup(ctx, data);
100-
if (ret == -1) {
145+
if (proxy_go_input_cleanup(ctx, data) == -1) {
101146
flb_errno();
102147
return -1;
103148
}
104149
}
105150
#endif
106151

107-
return 0;
152+
return ret;
108153
}
109154

110155
static int flb_proxy_input_cb_init(struct flb_input_instance *ins,

0 commit comments

Comments
 (0)