Skip to content

Commit a0aa168

Browse files
committed
http_server: run internal server on worker thread safely
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent 3e1d016 commit a0aa168

7 files changed

Lines changed: 193 additions & 56 deletions

File tree

include/fluent-bit/http_server/flb_hs.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
#include <fluent-bit/flb_sds.h>
2727
#include <fluent-bit/http_server/flb_http_server.h>
2828
#include <monkey/mk_core.h>
29+
#include <pthread.h>
2930

3031
/*
3132
* HTTP buffers that contains certain cached data to be used
3233
* by end-points.
3334
*/
3435
struct flb_hs_buf {
36+
pthread_mutex_t lock;
3537
int users;
3638
int pending_free;
3739
flb_sds_t data;
@@ -81,6 +83,7 @@ struct flb_hs {
8183
struct flb_config *config;
8284
struct mk_list routes;
8385
struct mk_list health_metrics;
86+
pthread_mutex_t health_metrics_lock;
8487
struct flb_health_check_metrics_counter health_counter;
8588

8689
struct flb_hs_buf metrics;
@@ -102,6 +105,8 @@ int flb_hs_push_storage_metrics(struct flb_hs *hs, void *data, size_t size);
102105
int flb_hs_destroy(struct flb_hs *ctx);
103106
int flb_hs_start(struct flb_hs *hs);
104107
void flb_hs_cmt_buffer_destroy(void *data);
108+
int flb_hs_buf_acquire(struct flb_hs_buf *buffer, int require_data,
109+
int require_raw_data);
105110
void flb_hs_buf_release(struct flb_hs_buf *buffer, void (*raw_free)(void *));
106111
int flb_hs_register_endpoint(struct flb_hs *hs,
107112
const char *path,

src/http_server/api/v1/health.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,15 @@ int flb_hs_health_state_get(struct flb_hs *hs, struct flb_hs_health_state *state
4848
}
4949

5050
memset(state, 0, sizeof(struct flb_hs_health_state));
51+
pthread_mutex_lock(&hs->health_metrics_lock);
52+
5153
state->error_limit = hs->health_counter.error_limit;
5254
state->retry_failure_limit = hs->health_counter.retry_failure_limit;
5355
state->period_limit = hs->health_counter.period_limit;
5456

5557
if (mk_list_is_empty(&hs->health_metrics) == 0) {
5658
state->healthy = FLB_TRUE;
59+
pthread_mutex_unlock(&hs->health_metrics_lock);
5760
return 0;
5861
}
5962

@@ -79,10 +82,12 @@ int flb_hs_health_state_get(struct flb_hs *hs, struct flb_hs_health_state *state
7982
if (state->errors > hs->health_counter.error_limit ||
8083
state->retries_failed > hs->health_counter.retry_failure_limit) {
8184
state->healthy = FLB_FALSE;
85+
pthread_mutex_unlock(&hs->health_metrics_lock);
8286
return 0;
8387
}
8488

8589
state->healthy = FLB_TRUE;
90+
pthread_mutex_unlock(&hs->health_metrics_lock);
8691

8792
return 0;
8893
}

src/http_server/api/v1/metrics.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@
3737
/* Return the newest metrics buffer */
3838
static struct flb_hs_buf *metrics_get_latest(struct flb_hs *hs)
3939
{
40-
if (hs->metrics.data == NULL || hs->metrics.raw_data == NULL) {
40+
if (flb_hs_buf_acquire(&hs->metrics, FLB_TRUE, FLB_TRUE) != 0) {
4141
return NULL;
4242
}
43+
4344
return &hs->metrics;
4445
}
4546

@@ -159,9 +160,6 @@ static int cb_metrics_prometheus(struct flb_hs *hs,
159160
return flb_http_response_commit(response);
160161
}
161162

162-
/* ref count */
163-
buf->users++;
164-
165163
/* Compose outgoing buffer string */
166164
sds = flb_sds_create_size(1024);
167165
if (!sds) {
@@ -416,8 +414,6 @@ static int cb_metrics(struct flb_hs *hs,
416414
return flb_http_response_commit(response);
417415
}
418416

419-
buf->users++;
420-
421417
flb_hs_response_set_payload(response, 200,
422418
FLB_HS_CONTENT_TYPE_JSON,
423419
buf->data, flb_sds_len(buf->data));

src/http_server/api/v1/storage.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030
/* Return the newest storage metrics buffer */
3131
static struct flb_hs_buf *storage_metrics_get_latest(struct flb_hs *hs)
3232
{
33-
if (hs->storage_metrics.data == NULL) {
33+
if (flb_hs_buf_acquire(&hs->storage_metrics, FLB_TRUE, FLB_FALSE) != 0) {
3434
return NULL;
3535
}
36+
3637
return &hs->storage_metrics;
3738
}
3839

@@ -51,8 +52,6 @@ static int cb_storage(struct flb_hs *hs,
5152
return flb_http_response_commit(response);
5253
}
5354

54-
buf->users++;
55-
5655
flb_hs_response_set_payload(response, 200,
5756
FLB_HS_CONTENT_TYPE_JSON,
5857
buf->data, flb_sds_len(buf->data));

src/http_server/api/v2/metrics.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@
3636
/* Return the newest metrics buffer */
3737
static struct flb_hs_buf *metrics_get_latest(struct flb_hs *hs)
3838
{
39-
if (hs->metrics_v2.raw_data == NULL) {
39+
if (flb_hs_buf_acquire(&hs->metrics_v2, FLB_FALSE, FLB_TRUE) != 0) {
4040
return NULL;
4141
}
42+
4243
return &hs->metrics_v2;
4344
}
4445

@@ -59,7 +60,6 @@ static int cb_metrics_prometheus(struct flb_hs *hs,
5960
return flb_http_response_commit(response);
6061
}
6162

62-
buf->users++;
6363
cmt = (struct cmt *) buf->raw_data;
6464

6565
/* convert CMetrics to text */
@@ -97,7 +97,6 @@ static int cb_metrics(struct flb_hs *hs,
9797
return flb_http_response_commit(response);
9898
}
9999

100-
buf->users++;
101100
cmt = (struct cmt *) buf->raw_data;
102101

103102
/* convert CMetrics to text */

0 commit comments

Comments
 (0)