@@ -32,11 +32,6 @@ struct plugin_timer {
3232 void * cb_arg ;
3333};
3434
35- struct rpc_conn {
36- int fd ;
37- MEMBUF (char ) mb ;
38- };
39-
4035/* We can have more than one of these pending at once. */
4136struct jstream {
4237 struct list_node list ;
@@ -94,7 +89,7 @@ struct plugin {
9489 /* To write to lightningd */
9590 struct list_head js_list ;
9691
97- /* Asynchronous RPC interaction */
92+ /* Asynchronous RPC interaction. */
9893 struct io_conn * io_rpc_conn ;
9994 struct list_head rpc_js_list ;
10095 struct jsonrpc_io * jsonrpc_in ;
@@ -103,8 +98,9 @@ struct plugin {
10398 STRMAP (struct out_req * ) out_reqs ;
10499 u64 next_outreq_id ;
105100
106- /* Synchronous RPC interaction */
107- struct rpc_conn * rpc_conn ;
101+ /* Synchronous RPC interaction: sync_io is NULL if they didn't want it. */
102+ int sync_fd ;
103+ struct jsonrpc_io * sync_io ;
108104
109105 /* Plugin information details */
110106 enum plugin_restartability restartability ;
@@ -535,32 +531,6 @@ struct json_out *json_out_obj(const tal_t *ctx,
535531 return jout ;
536532}
537533
538- static int read_json_from_rpc (struct plugin * p )
539- {
540- char * end ;
541-
542- /* We rely on the double-\n marker which only terminates JSON top
543- * levels. Thanks lightningd! */
544- while ((end = memmem (membuf_elems (& p -> rpc_conn -> mb ),
545- membuf_num_elems (& p -> rpc_conn -> mb ), "\n\n" , 2 ))
546- == NULL ) {
547- ssize_t r ;
548-
549- /* Make sure we've room for at least READ_CHUNKSIZE. */
550- membuf_prepare_space (& p -> rpc_conn -> mb , READ_CHUNKSIZE );
551- r = read (p -> rpc_conn -> fd , membuf_space (& p -> rpc_conn -> mb ),
552- membuf_num_space (& p -> rpc_conn -> mb ));
553- /* lightningd goes away, we go away. */
554- if (r == 0 )
555- exit (0 );
556- if (r < 0 )
557- plugin_err (p , "Reading JSON input: %s" , strerror (errno ));
558- membuf_added (& p -> rpc_conn -> mb , r );
559- }
560-
561- return end + 2 - membuf_elems (& p -> rpc_conn -> mb );
562- }
563-
564534/* This closes a JSON response and writes it out. */
565535static void finish_and_send_json (int fd , struct json_out * jout )
566536{
@@ -720,40 +690,63 @@ void command_set_usage(struct command *cmd, const char *usage TAKES)
720690 cmd -> methodname );
721691}
722692
723- /* Reads rpc reply and returns tokens, setting contents to 'error' or
724- - * 'result' (depending on *error). */
725- static jsmntok_t * read_rpc_reply (const tal_t * ctx ,
726- struct plugin * plugin ,
727- const jsmntok_t * * contents ,
728- bool * error ,
729- int * reqlen )
693+ static const char * read_one_json_sync (struct plugin * p , const jsmntok_t * * toks )
730694{
731- jsmntok_t * toks ;
695+ for (;;) {
696+ const char * buf , * error ;
732697
733- do {
734- * reqlen = read_json_from_rpc (plugin );
698+ error = jsonrpc_io_parse (tmpctx , p -> sync_io , toks , & buf );
699+ if (error )
700+ plugin_err (p , "Parsing sync lightningd: %s" , error );
701+ if (* toks )
702+ return buf ;
735703
736- toks = json_parse_simple (ctx ,
737- membuf_elems (& plugin -> rpc_conn -> mb ),
738- * reqlen );
739- if (!toks )
740- plugin_err (plugin , "Malformed JSON reply '%.*s'" ,
741- * reqlen , membuf_elems (& plugin -> rpc_conn -> mb ));
704+ /* lightningd goes away, we go away. */
705+ if (!jsonrpc_sync_read (p -> sync_io , p -> sync_fd )) {
706+ if (errno == 0 )
707+ exit (0 );
708+ else
709+ plugin_err (p , "Reading sync lightningd: %s" ,
710+ strerror (errno ));
711+ }
712+ }
713+ }
714+
715+ /* Reads rpc reply and returns result tokens */
716+ static const jsmntok_t * read_sync_rpc_reply (const tal_t * ctx ,
717+ struct plugin * plugin ,
718+ const char * method ,
719+ const char * * final_buffer )
720+ {
721+ const jsmntok_t * errtok , * resulttok , * toks ;
722+ const char * buffer ;
723+
724+ for (;;) {
725+ buffer = read_one_json_sync (plugin , & toks );
742726 /* FIXME: Don't simply ignore notifications here! */
743- } while (!json_get_member (membuf_elems (& plugin -> rpc_conn -> mb ), toks ,
744- "id" ));
727+ if (json_get_member (buffer , toks , "id" ))
728+ break ;
729+ jsonrpc_io_parse_done (plugin -> sync_io );
730+ }
745731
746- * contents = json_get_member (membuf_elems (& plugin -> rpc_conn -> mb ), toks , "error" );
747- if (* contents )
748- * error = true;
749- else {
750- * contents = json_get_member (membuf_elems (& plugin -> rpc_conn -> mb ), toks ,
751- "result" );
752- if (!* contents )
753- plugin_err (plugin , "JSON reply with no 'result' nor 'error'? '%.*s'" ,
754- * reqlen , membuf_elems (& plugin -> rpc_conn -> mb ));
755- * error = false;
732+ errtok = json_get_member (buffer , toks , "error" );
733+ if (errtok ) {
734+ plugin_err (plugin , "Got error result to %s: '%.*s'" ,
735+ method ,
736+ json_tok_full_len (toks ),
737+ json_tok_full (buffer , toks ));
756738 }
739+ resulttok = json_get_member (buffer , toks , "result" );
740+ if (!resulttok ) {
741+ plugin_err (plugin , "JSON reply with no 'result' nor 'error'? '%.*s'" ,
742+ json_tok_full_len (toks ),
743+ json_tok_full (buffer , toks ));
744+ }
745+
746+ /* Make the returned pointers valid tal object */
747+ json_dup_contents (ctx , buffer , resulttok , final_buffer , & toks );
748+ jsonrpc_io_parse_done (plugin -> sync_io );
749+
757750 return toks ;
758751}
759752
@@ -764,13 +757,8 @@ static const jsmntok_t *sync_req(const tal_t *ctx,
764757 const struct json_out * params TAKES ,
765758 const char * * resp )
766759{
767- bool error ;
768- jsmntok_t * toks ;
769- const jsmntok_t * contents ;
770- int reqlen ;
771760 struct json_out * jout = json_out_new (tmpctx );
772761 const char * id = json_id (tmpctx , plugin , "init/" , method );
773- size_t num_toks ;
774762
775763 json_out_start (jout , NULL , '{' );
776764 json_out_addstr (jout , "jsonrpc" , "2.0" );
@@ -788,23 +776,15 @@ static const jsmntok_t *sync_req(const tal_t *ctx,
788776
789777 /* If we're past init, we may need a new fd (the old one
790778 * is being used for async comms). */
791- if (plugin -> rpc_conn -> fd == -1 )
792- plugin -> rpc_conn -> fd = rpc_open (plugin );
793-
794- finish_and_send_json (plugin -> rpc_conn -> fd , jout );
795-
796- toks = read_rpc_reply (ctx , plugin , & contents , & error , & reqlen );
797- if (error )
798- plugin_err (plugin , "Got error reply to %s: '%.*s'" ,
799- method , reqlen , membuf_elems (& plugin -> rpc_conn -> mb ));
779+ if (plugin -> sync_fd == -1 ) {
780+ plugin -> sync_fd = rpc_open (plugin );
781+ if (!plugin -> sync_io )
782+ plugin -> sync_io = jsonrpc_io_new (plugin );
783+ }
800784
801- * resp = membuf_consume ( & plugin -> rpc_conn -> mb , reqlen );
785+ finish_and_send_json ( plugin -> sync_fd , jout );
802786
803- /* Make the returned pointer the valid tal object of minimal length */
804- num_toks = json_next (contents ) - contents ;
805- memmove (toks , contents , num_toks * sizeof (* toks ));
806- tal_resize (& toks , num_toks );
807- return toks ;
787+ return read_sync_rpc_reply (ctx , plugin , method , resp );
808788}
809789
810790const jsmntok_t * jsonrpc_request_sync (const tal_t * ctx ,
@@ -813,7 +793,6 @@ const jsmntok_t *jsonrpc_request_sync(const tal_t *ctx,
813793 const struct json_out * params TAKES ,
814794 const char * * resp )
815795{
816-
817796 return sync_req (ctx , cmd -> plugin , method , params , resp );
818797}
819798
@@ -1509,7 +1488,6 @@ static struct command_result *handle_init(struct command *cmd,
15091488 size_t i ;
15101489 char * dir , * network ;
15111490 struct plugin * p = cmd -> plugin ;
1512- bool with_rpc ;
15131491 const char * err ;
15141492
15151493 configtok = json_get_member (buf , params , "configuration" );
@@ -1536,17 +1514,10 @@ static struct command_result *handle_init(struct command *cmd,
15361514 /* Only attempt to connect if the plugin has configured the rpc_conn
15371515 * already, if that's not the case we were told to run without an RPC
15381516 * connection, so don't even log an error. */
1539- if (p -> rpc_conn != NULL ) {
1540- p -> rpc_conn -> fd = rpc_open (p );
1541- if (p -> rpc_conn -> fd == -1 )
1542- with_rpc = false;
1543- else
1544- with_rpc = true;
1545-
1546- membuf_init (& p -> rpc_conn -> mb , tal_arr (p , char , READ_CHUNKSIZE ),
1547- READ_CHUNKSIZE , membuf_tal_resize );
1548- } else
1549- with_rpc = false;
1517+ if (p -> sync_io )
1518+ p -> sync_fd = rpc_open (p );
1519+ else
1520+ p -> sync_fd = -1 ;
15501521
15511522 opttok = json_get_member (buf , params , "options" );
15521523 json_for_each_obj (i , t , opttok ) {
@@ -1570,19 +1541,20 @@ static struct command_result *handle_init(struct command *cmd,
15701541 disable ));
15711542 }
15721543
1573- if (with_rpc ) {
1544+ /* Now set up async. */
1545+ if (p -> sync_fd != -1 ) {
15741546 struct out_req * req ;
15751547 struct command * aux_cmd = aux_command (cmd );
15761548
1577- io_new_conn (p , p -> rpc_conn -> fd , rpc_conn_init , p );
1549+ io_new_conn (p , p -> sync_fd , rpc_conn_init , p );
15781550 /* In case they intercept rpc_command, we can't do this sync. */
15791551 req = jsonrpc_request_start (aux_cmd , "listconfigs" ,
15801552 get_beglist , plugin_broken_cb , NULL );
15811553 json_add_string (req -> js , "config" , "i-promise-to-fix-broken-api-user" );
15821554 send_outreq (req );
15831555
15841556 /* We will open a new one if we want to be sync. */
1585- p -> rpc_conn -> fd = -1 ;
1557+ p -> sync_fd = -1 ;
15861558 }
15871559
15881560 return command_success (cmd , json_out_obj (cmd , NULL , NULL ));
@@ -2342,13 +2314,10 @@ static struct plugin *new_plugin(const tal_t *ctx,
23422314 p -> beglist = NULL ;
23432315
23442316 p -> desired_features = tal_steal (p , features );
2345- if (init_rpc ) {
2346- /* Sync RPC FIXME: maybe go full async ? */
2347- p -> rpc_conn = tal (p , struct rpc_conn );
2348- } else {
2349- p -> rpc_conn = NULL ;
2350- }
2351-
2317+ if (init_rpc )
2318+ p -> sync_io = jsonrpc_io_new (p );
2319+ else
2320+ p -> sync_io = NULL ;
23522321 p -> init = init ;
23532322 p -> manifested = p -> initialized = p -> exiting = false;
23542323 p -> restartability = restartability ;
0 commit comments