Skip to content

Commit f947d79

Browse files
reckless-rpc: open socket for listening to streaming logs
1 parent 3d0a90f commit f947d79

2 files changed

Lines changed: 114 additions & 3 deletions

File tree

plugins/recklessrpc.c

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
*/
33

44
#include "config.h"
5+
#include <arpa/inet.h>
56
#include <ccan/array_size/array_size.h>
67
#include <ccan/io/io.h>
78
#include <ccan/pipecmd/pipecmd.h>
@@ -10,8 +11,10 @@
1011
#include <common/json_stream.h>
1112
#include <common/memleak.h>
1213
#include <errno.h>
14+
#include <netinet/in.h>
1315
#include <plugins/libplugin.h>
1416
#include <signal.h>
17+
#include <sys/socket.h>
1518
#include <unistd.h>
1619

1720
static struct plugin *plugin;
@@ -21,12 +24,17 @@ struct reckless {
2124
int stdinfd;
2225
int stdoutfd;
2326
int stderrfd;
27+
int logfd;
2428
char *stdoutbuf;
2529
char *stderrbuf;
30+
char *logbuf;
2631
size_t stdout_read; /* running total */
2732
size_t stdout_new; /* new since last read */
2833
size_t stderr_read;
2934
size_t stderr_new;
35+
size_t log_read;
36+
size_t log_new;
37+
char* log_to_process;
3038
pid_t pid;
3139
char *process_failed;
3240
};
@@ -51,7 +59,7 @@ static void reckless_send_yes(struct reckless *reckless)
5159
static struct io_plan *read_more(struct io_conn *conn, struct reckless *rkls)
5260
{
5361
rkls->stdout_read += rkls->stdout_new;
54-
if (rkls->stdout_read == tal_count(rkls->stdoutbuf))
62+
if (rkls->stdout_read * 2 > tal_count(rkls->stdoutbuf))
5563
tal_resize(&rkls->stdoutbuf, rkls->stdout_read * 2);
5664
return io_read_partial(conn, rkls->stdoutbuf + rkls->stdout_read,
5765
tal_count(rkls->stdoutbuf) - rkls->stdout_read,
@@ -196,7 +204,7 @@ static struct io_plan *stderr_read_more(struct io_conn *conn,
196204
struct reckless *rkls)
197205
{
198206
rkls->stderr_read += rkls->stderr_new;
199-
if (rkls->stderr_read == tal_count(rkls->stderrbuf))
207+
if (rkls->stderr_read * 2 > tal_count(rkls->stderrbuf))
200208
tal_resize(&rkls->stderrbuf, rkls->stderr_read * 2);
201209
if (strends(rkls->stderrbuf, "[Y] to create one now.\n")) {
202210
plugin_log(plugin, LOG_DBG, "confirming config creation");
@@ -233,6 +241,82 @@ static bool is_single_arg_cmd(const char *command) {
233241
return false;
234242
}
235243

244+
static void log_conn_finish(struct io_conn *conn, struct reckless *reckless)
245+
{
246+
io_close(conn);
247+
close(reckless->logfd);
248+
249+
}
250+
251+
static struct io_plan *log_read_more(struct io_conn *conn,
252+
struct reckless *rkls)
253+
{
254+
rkls->log_read += rkls->log_new;
255+
256+
if (rkls->log_read*2 >= tal_count(rkls->logbuf))
257+
tal_resize(&rkls->logbuf, rkls->log_read * 2);
258+
259+
int unprocessed = rkls->log_read - (rkls->log_to_process - rkls->logbuf);
260+
char *lineend = memchr(rkls->log_to_process, 0x0A, unprocessed);
261+
262+
while (lineend != NULL) {
263+
char * note;
264+
note = tal_strndup(tmpctx, rkls->log_to_process,
265+
lineend - rkls->log_to_process);
266+
/* FIXME: Add notification for the utility logs. */
267+
plugin_log(plugin, LOG_DBG, "RECKLESS UTILITY: %s", note);
268+
rkls->log_to_process = lineend + 1;
269+
unprocessed = rkls->log_read - (rkls->log_to_process - rkls->logbuf);
270+
lineend = memchr(rkls->log_to_process, 0x0A, unprocessed);
271+
}
272+
273+
return io_read_partial(conn, rkls->logbuf + rkls->log_read,
274+
tal_count(rkls->logbuf) - rkls->log_read,
275+
&rkls->log_new, log_read_more, rkls);
276+
}
277+
278+
static struct io_plan *log_conn_init(struct io_conn *conn, struct reckless *rkls)
279+
{
280+
io_set_finish(conn, log_conn_finish, rkls);
281+
return log_read_more(conn, rkls);
282+
}
283+
284+
static int open_socket(int *port)
285+
{
286+
int sock;
287+
sock = socket(AF_INET, SOCK_STREAM, 0);
288+
if (sock < 0) {
289+
plugin_log(plugin, LOG_UNUSUAL, "could not open socket for "
290+
"streaming logs");
291+
return -1;
292+
}
293+
struct sockaddr_in ai;
294+
ai.sin_family = AF_INET;
295+
ai.sin_port = htons(0);
296+
inet_pton(AF_INET, "127.0.0.1", &ai.sin_addr);
297+
298+
if (bind(sock, (struct sockaddr *)&ai, sizeof(ai)) < 0) {
299+
plugin_log(plugin, LOG_UNUSUAL, "failed to bind socket: %s", strerror(errno));
300+
close(sock);
301+
return -1;
302+
}
303+
304+
socklen_t len = sizeof(ai);
305+
if (getsockname(sock, (struct sockaddr *)&ai, &len) < 0) {
306+
plugin_log(plugin, LOG_DBG, "couldn't retrieve socket port");
307+
return -1;
308+
}
309+
*port = ntohs(ai.sin_port);
310+
311+
if (listen(sock, 64) != 0) {
312+
plugin_log(plugin, LOG_UNUSUAL, "failed to listen on socket: %s", strerror(errno));
313+
close(sock);
314+
return -1;
315+
}
316+
317+
return sock;
318+
}
319+
236320
static struct command_result *reckless_call(struct command *cmd,
237321
const char *subcommand,
238322
const char *target,
@@ -242,6 +326,13 @@ static struct command_result *reckless_call(struct command *cmd,
242326
if (!subcommand || !target)
243327
return command_fail(cmd, PLUGIN_ERROR, "invalid reckless call");
244328
}
329+
int sock;
330+
int *port = tal(tmpctx, int);
331+
sock = open_socket(port);
332+
if (sock < 0)
333+
plugin_log(plugin, LOG_BROKEN, "not streaming logs "
334+
"from reckless utility");
335+
245336
char **my_call;
246337
my_call = tal_arrz(tmpctx, char *, 0);
247338
tal_arr_expand(&my_call, "reckless");
@@ -251,6 +342,11 @@ static struct command_result *reckless_call(struct command *cmd,
251342
tal_arr_expand(&my_call, lconfig.lightningdir);
252343
tal_arr_expand(&my_call, "--network");
253344
tal_arr_expand(&my_call, lconfig.network);
345+
if (sock > 0) {
346+
tal_arr_expand(&my_call, "--logging-port");
347+
tal_arr_expand(&my_call, tal_fmt(tmpctx, "%i", *port));
348+
}
349+
254350
if (lconfig.config) {
255351
tal_arr_expand(&my_call, "--conf");
256352
tal_arr_expand(&my_call, lconfig.config);
@@ -266,11 +362,17 @@ static struct command_result *reckless_call(struct command *cmd,
266362
reckless->cmd = cmd;
267363
reckless->stdoutbuf = tal_arrz(reckless, char, 4096);
268364
reckless->stderrbuf = tal_arrz(reckless, char, 4096);
365+
reckless->logbuf = tal_arrz(reckless, char, 4096);
269366
reckless->stdout_read = 0;
270367
reckless->stdout_new = 0;
271368
reckless->stderr_read = 0;
272369
reckless->stderr_new = 0;
370+
reckless->log_read = 0;
371+
reckless->log_new = 0;
372+
reckless->log_to_process = reckless->logbuf;
273373
reckless->process_failed = NULL;
374+
reckless->logfd = sock;
375+
274376
char * full_cmd;
275377
full_cmd = tal_fmt(tmpctx, "calling:");
276378
for (int i=0; i<tal_count(my_call); i++)
@@ -281,9 +383,13 @@ static struct command_result *reckless_call(struct command *cmd,
281383
reckless->pid = pipecmdarr(&reckless->stdinfd, &reckless->stdoutfd,
282384
&reckless->stderrfd, my_call);
283385

386+
if (sock > 0)
387+
io_new_listener(reckless, reckless->logfd,
388+
log_conn_init, reckless);
284389
/* FIXME: fail if invalid pid*/
285390
io_new_conn(reckless, reckless->stdoutfd, conn_init, reckless);
286391
io_new_conn(reckless, reckless->stderrfd, stderr_conn_init, reckless);
392+
287393
tal_free(my_call);
288394
return command_still_pending(cmd);
289395
}

tools/reckless

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,13 @@ class Logger:
6262
try:
6363
self.socket.connect(('localhost', port))
6464
except Exception as e:
65-
logging.warning(f'socket failed to connect with {e}')
6665
self.socket = None
66+
if logging.root.level <= logging.WARNING:
67+
msg = f'socket failed to connect with {e}'
68+
if self.capture:
69+
self.json_output['log'].append(self.str_esc(msg))
70+
else:
71+
logging.warning(msg)
6772

6873
def str_esc(self, raw_string: str) -> str:
6974
assert isinstance(raw_string, str)

0 commit comments

Comments
 (0)