Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ if(FLB_WINDOWS_DEFAULTS)
set(FLB_IN_LIB Yes)
set(FLB_IN_RANDOM Yes)
set(FLB_IN_SERIAL No)
set(FLB_IN_STDIN No)
set(FLB_IN_STDIN Yes)
set(FLB_IN_SYSLOG Yes)
set(FLB_IN_TAIL Yes)
set(FLB_IN_TCP Yes)
Expand Down
113 changes: 110 additions & 3 deletions plugins/in_stdin/in_stdin.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
#include <sys/stat.h>
#include <fcntl.h>

#ifdef FLB_SYSTEM_WINDOWS
#include <conio.h>
#include <io.h>
#endif

#include "in_stdin.h"

static inline void consume_bytes(char *buf, int bytes, int length)
Expand Down Expand Up @@ -179,6 +184,63 @@ static inline int pack_regex(struct flb_in_stdin_config *ctx,
return ret;
}

#ifdef FLB_SYSTEM_WINDOWS
static int stdin_win32_available(struct flb_in_stdin_config *ctx)
{
BOOL ret;
DWORD stdin_type;
DWORD available;
DWORD err;

stdin_type = ctx->stdin_type & ~FILE_TYPE_REMOTE;

if (stdin_type == FILE_TYPE_PIPE) {
ret = PeekNamedPipe(ctx->stdin_handle, NULL, 0, NULL, &available, NULL);
if (ret == FALSE) {
err = GetLastError();
if (err == ERROR_BROKEN_PIPE || err == ERROR_HANDLE_EOF) {
return 0;
Comment thread
cosmo0920 marked this conversation as resolved.
}

flb_plg_debug(ctx->ins, "could not query stdin pipe: win32 error=%lu", err);
return 0;
}

if (available == 0) {
return 0;
}

return 1;
}
else if (stdin_type == FILE_TYPE_CHAR) {
if (_kbhit() == 0) {
return 0;
}

return 1;
}

return 1;
}

static int stdin_read(struct flb_in_stdin_config *ctx, char *buf, size_t size)
{
int ret;

ret = stdin_win32_available(ctx);
if (ret <= 0) {
return ret;
}

return _read(ctx->fd, buf, (unsigned int) size);
}
#else
static int stdin_read(struct flb_in_stdin_config *ctx, char *buf, size_t size)
{
return read(ctx->fd, buf, size);
}
#endif

static int in_stdin_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
Expand All @@ -191,11 +253,17 @@ static int in_stdin_collect(struct flb_input_instance *ins,
struct flb_time out_time;
struct flb_in_stdin_config *ctx = in_context;

bytes = read(ctx->fd,
ctx->buf + ctx->buf_len,
ctx->buf_size - ctx->buf_len - 1);
bytes = stdin_read(ctx,
ctx->buf + ctx->buf_len,
ctx->buf_size - ctx->buf_len - 1);
flb_plg_trace(ctx->ins, "stdin read() = %i", bytes);

#ifdef FLB_SYSTEM_WINDOWS
if (bytes == 0 && (ctx->stdin_type & ~FILE_TYPE_REMOTE) == FILE_TYPE_PIPE) {
return 0;
}
#endif

if (bytes == 0) {
flb_plg_warn(ctx->ins, "end of file (stdin closed by remote end)");
}
Expand Down Expand Up @@ -310,6 +378,7 @@ static int in_stdin_config_init(struct flb_in_stdin_config *ctx,
ctx->buf = NULL;
ctx->buf_len = 0;
ctx->ins = in;
ctx->fd = -1;

ret = flb_input_config_map_set(in, (void *)ctx);
if (ret == -1) {
Expand Down Expand Up @@ -391,8 +460,26 @@ static int in_stdin_init(struct flb_input_instance *in,
goto init_error;
}

#ifdef FLB_SYSTEM_WINDOWS
ctx->stdin_handle = GetStdHandle(STD_INPUT_HANDLE);
if (ctx->stdin_handle == INVALID_HANDLE_VALUE || ctx->stdin_handle == NULL) {
flb_plg_error(ctx->ins, "could not open standard input handle");
goto init_error;
}
SetLastError(NO_ERROR);
ctx->stdin_type = GetFileType(ctx->stdin_handle);
if (ctx->stdin_type == FILE_TYPE_UNKNOWN && GetLastError() != NO_ERROR) {
flb_plg_error(ctx->ins, "could not detect standard input handle type");
goto init_error;
}
#endif

/* Clone the standard input file descriptor */
#ifdef FLB_SYSTEM_WINDOWS
fd = _dup(_fileno(stdin));
#else
fd = dup(STDIN_FILENO);
#endif
if (fd == -1) {
flb_errno();
flb_plg_error(ctx->ins, "Could not open standard input!");
Expand All @@ -407,11 +494,19 @@ static int in_stdin_init(struct flb_input_instance *in,
/* Set the context */
flb_input_set_context(in, ctx);

#ifdef FLB_SYSTEM_WINDOWS
ret = flb_input_set_collector_time(in,
in_stdin_collect,
0,
FLB_STDIN_WIN32_COLLECT_NSEC,
config);
#else
/* Collect upon data available on the standard input */
ret = flb_input_set_collector_event(in,
in_stdin_collect,
ctx->fd,
config);
#endif
if (ret == -1) {
flb_plg_error(ctx->ins, "Could not set collector for STDIN input plugin");
goto init_error;
Expand All @@ -421,6 +516,14 @@ static int in_stdin_init(struct flb_input_instance *in,
return 0;

init_error:
if (ctx->fd >= 0) {
#ifdef FLB_SYSTEM_WINDOWS
_close(ctx->fd);
#else
close(ctx->fd);
#endif
ctx->fd = -1;
}
in_stdin_config_destroy(ctx);

return -1;
Expand All @@ -436,7 +539,11 @@ static int in_stdin_exit(void *in_context, struct flb_config *config)
}

if (ctx->fd >= 0) {
#ifdef FLB_SYSTEM_WINDOWS
_close(ctx->fd);
#else
close(ctx->fd);
#endif
}
flb_pack_state_reset(&ctx->pack_state);
in_stdin_config_destroy(ctx);
Expand Down
10 changes: 10 additions & 0 deletions plugins/in_stdin/in_stdin.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#ifdef FLB_SYSTEM_WINDOWS
#include <windows.h>
#endif

#define DEFAULT_BUF_SIZE 16000
#define FLB_STDIN_WIN32_COLLECT_NSEC 100000000L

/* STDIN Input configuration & context */
struct flb_in_stdin_config {
Expand All @@ -41,6 +46,11 @@ struct flb_in_stdin_config {
struct flb_pack_state pack_state;
struct flb_input_instance *ins;
struct flb_log_event_encoder *log_encoder;

#ifdef FLB_SYSTEM_WINDOWS
HANDLE stdin_handle;
DWORD stdin_type;
#endif
};

extern struct flb_input_plugin in_stdin_plugin;
Expand Down
Loading