Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct flb_kube {
size_t podname_len;

/* Kubernetes Token from FLB_KUBE_TOKEN file */
char *namespace_file;
char *token_file;
char *token;
size_t token_len;
Expand Down
249 changes: 222 additions & 27 deletions plugins/filter_kubernetes/kube_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,20 @@ static int get_local_pod_info(struct flb_kube *ctx)
char *hostname;

/* Get the namespace name */
ret = file_to_buffer(FLB_KUBE_NAMESPACE, &ns, &ns_size);
ret = file_to_buffer(ctx->namespace_file, &ns, &ns_size);
if (ret == -1) {
/*
* If it fails, it's just informational, as likely the caller
* wanted to connect using the Proxy instead from inside a POD.
*/
flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_NAMESPACE);
flb_plg_warn(ctx->ins, "cannot open %s", ctx->namespace_file);
return FLB_FALSE;
}

/* Namespace */
while (ns_size > 0 && (ns[ns_size - 1] == '\n' || ns[ns_size - 1] == '\r')) {
ns[--ns_size] = '\0';
}
ctx->namespace = ns;
ctx->namespace_len = ns_size;

Expand Down Expand Up @@ -1951,6 +1954,85 @@ static inline int extract_pod_meta(struct flb_kube *ctx,
return 0;
}

static int set_local_namespace_meta(struct flb_kube *ctx,
struct flb_kube_meta *meta)
{
int n;

memset(meta, '\0', sizeof(struct flb_kube_meta));

if (ctx->namespace == NULL) {
return -1;
}

meta->namespace = flb_strndup(ctx->namespace, ctx->namespace_len);
if (meta->namespace == NULL) {
flb_errno();
return -1;
}
meta->namespace_len = ctx->namespace_len;

n = meta->namespace_len + 1;
meta->cache_key = flb_malloc(n);
if (meta->cache_key == NULL) {
flb_errno();
return -1;
}

memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
meta->cache_key[meta->namespace_len] = '\0';
meta->cache_key_len = meta->namespace_len;

return 0;
}

static int set_local_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *meta)
{
int n;
size_t off = 0;

memset(meta, '\0', sizeof(struct flb_kube_meta));

if (ctx->namespace == NULL || ctx->podname == NULL) {
return -1;
}

meta->namespace = flb_strndup(ctx->namespace, ctx->namespace_len);
if (meta->namespace == NULL) {
flb_errno();
return -1;
}
meta->namespace_len = ctx->namespace_len;
meta->fields++;

meta->podname = flb_strndup(ctx->podname, ctx->podname_len);
if (meta->podname == NULL) {
flb_errno();
return -1;
}
meta->podname_len = ctx->podname_len;
meta->fields++;

n = meta->namespace_len + 1 + meta->podname_len + 1;
meta->cache_key = flb_malloc(n);
if (meta->cache_key == NULL) {
flb_errno();
return -1;
}

memcpy(meta->cache_key, meta->namespace, meta->namespace_len);
off = meta->namespace_len;

meta->cache_key[off++] = ':';
memcpy(meta->cache_key + off, meta->podname, meta->podname_len);
off += meta->podname_len;

meta->cache_key[off] = '\0';
meta->cache_key_len = off;

return 0;
}

/*
* Given a fixed meta data (namespace), get API server information
* and merge buffers.
Expand Down Expand Up @@ -2314,12 +2396,10 @@ int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size)
return 0;
}

static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
static inline int lookup_pod_meta(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int id;
int ret;
Expand All @@ -2329,12 +2409,6 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
size_t hash_meta_size;
msgpack_unpacked result;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_pod_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

/* Check if we have some data associated to the cache key */
ret = flb_hash_table_get(ctx->hash_table,
meta->cache_key, meta->cache_key_len,
Expand All @@ -2359,8 +2433,20 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
* the outgoing buffer and size.
*/
flb_free(tmp_hash_meta_buf);
flb_hash_table_get_by_id(ctx->hash_table, id, meta->cache_key,
&hash_meta_buf, &hash_meta_size);
ret = flb_hash_table_get_by_id(ctx->hash_table, id,
meta->cache_key,
&hash_meta_buf, &hash_meta_size);
if (ret == -1) {
*out_buf = NULL;
*out_size = 0;
return 0;
}
}
else {
flb_free(tmp_hash_meta_buf);
*out_buf = NULL;
*out_size = 0;
return 0;
}
}

Expand All @@ -2375,7 +2461,13 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
msgpack_unpacked_init(&result);

/* Unpack to get the offset/bytes of the first item */
msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
ret = msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
*out_buf = NULL;
*out_size = 0;
return 0;
}

/* Set the pointer and proper size for the caller */
*out_buf = hash_meta_buf;
Expand All @@ -2394,9 +2486,40 @@ static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
return 0;
}

static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
static inline int flb_kube_pod_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int ret;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_pod_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

return lookup_pod_meta(ctx, out_buf, out_size, meta, props);
}

static inline int flb_kube_local_pod_meta_get(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props)
{
int ret;

ret = set_local_pod_meta(ctx, meta);
if (ret != 0) {
return -1;
}

return lookup_pod_meta(ctx, out_buf, out_size, meta, props);
}

static inline int lookup_namespace_meta(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
Expand All @@ -2408,12 +2531,6 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
size_t hash_meta_size;
msgpack_unpacked result;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_namespace_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

/* Check if we have some data associated to the cache key */
ret = flb_hash_table_get(ctx->namespace_hash_table,
meta->cache_key, meta->cache_key_len,
Expand All @@ -2438,8 +2555,20 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
* the outgoing buffer and size.
*/
flb_free(tmp_hash_meta_buf);
flb_hash_table_get_by_id(ctx->namespace_hash_table, id, meta->cache_key,
&hash_meta_buf, &hash_meta_size);
ret = flb_hash_table_get_by_id(ctx->namespace_hash_table, id,
meta->cache_key,
&hash_meta_buf, &hash_meta_size);
if (ret == -1) {
*out_buf = NULL;
*out_size = 0;
return 0;
}
}
else {
flb_free(tmp_hash_meta_buf);
*out_buf = NULL;
*out_size = 0;
return 0;
}
}

Expand All @@ -2452,7 +2581,13 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
msgpack_unpacked_init(&result);

/* Unpack to get the offset/bytes of the first item */
msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
ret = msgpack_unpack_next(&result, hash_meta_buf, hash_meta_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
*out_buf = NULL;
*out_size = 0;
return 0;
}

/* Set the pointer and proper size for the caller */
*out_buf = hash_meta_buf;
Expand All @@ -2463,6 +2598,37 @@ static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
return 0;
}

static inline int flb_kube_namespace_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
int ret;

/* Get metadata from tag or record (cache key is the important one) */
ret = extract_namespace_meta(ctx, tag, tag_len, data, data_size, meta);
if (ret != 0) {
return -1;
}

return lookup_namespace_meta(ctx, out_buf, out_size, meta);
}

static inline int flb_kube_local_namespace_meta_get(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
struct flb_kube_meta *meta)
{
int ret;

ret = set_local_namespace_meta(ctx, meta);
if (ret != 0) {
return -1;
}

return lookup_namespace_meta(ctx, out_buf, out_size, meta);
}

int flb_kube_meta_get(struct flb_kube *ctx,
const char *tag, int tag_len,
const char *data, size_t data_size,
Expand Down Expand Up @@ -2495,6 +2661,35 @@ int flb_kube_meta_get(struct flb_kube *ctx,
return -1;
}

int flb_kube_meta_get_local(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
const char **namespace_out_buf,
size_t *namespace_out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta)
{
int ret_namespace_meta = -1;
int ret_pod_meta = -1;

if (ctx->namespace_labels == FLB_TRUE || ctx->namespace_annotations == FLB_TRUE) {
ret_namespace_meta = flb_kube_local_namespace_meta_get(ctx, namespace_out_buf,
namespace_out_size,
namespace_meta);
}

if (ctx->namespace_metadata_only == FLB_FALSE) {
ret_pod_meta = flb_kube_local_pod_meta_get(ctx, out_buf, out_size,
meta, props);
Comment thread
cosmo0920 marked this conversation as resolved.
}

if (ret_pod_meta == 0 || ret_namespace_meta == 0) {
return 0;
}

return -1;
}

int flb_kube_meta_release(struct flb_kube_meta *meta)
{
int r = 0;
Expand Down
7 changes: 7 additions & 0 deletions plugins/filter_kubernetes/kube_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ int flb_kube_meta_get(struct flb_kube *ctx,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta);
int flb_kube_meta_get_local(struct flb_kube *ctx,
const char **out_buf, size_t *out_size,
const char **namespace_out_buf,
size_t *namespace_out_size,
struct flb_kube_meta *meta,
struct flb_kube_props *props,
struct flb_kube_meta *namespace_meta);
int flb_kube_meta_release(struct flb_kube_meta *meta);
int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config);
int get_api_server_configmap(struct flb_kube *ctx, const char *namespace, const char *configmap, char **out_buf, size_t *out_size);
Expand Down
8 changes: 6 additions & 2 deletions plugins/filter_kubernetes/kube_property.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ int flb_kube_prop_set(struct flb_kube *ctx, struct flb_kube_meta *meta,
/* If the property is for a specific container, and this is not
* that container, bail out
*/
if (container && strncmp(container, meta->container_name, container_len)) {
return 0;
if (container) {
if (meta->container_name == NULL ||
container_len != (size_t) meta->container_name_len ||
strncmp(container, meta->container_name, container_len) != 0) {
return 0;
}
}

return function(ctx, meta,
Expand Down
Loading
Loading