Skip to content

Commit 2b325bd

Browse files
committed
mgmtd, grpc: add Get, Execute and Subscribe support
Route gRPC Get(CONFIG) requests loaded in mgmtd through mgmtd's running datastore so the mgmtd gRPC endpoint returns central configuration rather than daemon-local process config. Route gRPC Execute requests loaded in mgmtd through the backend RPC transaction machinery so daemon-owned YANG RPCs can be reached from the mgmtd gRPC endpoint. Add the Subscribe RPC wire shape and implement ON_CHANGE notification delivery through mgmtd's frontend selector tree. STREAM sends local operational-state snapshots before registering for matching notifications, SAMPLE performs periodic local state reads, and quiet streams can emit optional heartbeats. Keep Subscribe streams bounded by closing slow consumers with OUT_OF_RANGE, and clean up subscriptions on normal FINISH as well as cancellation. Add focused topotests for mgmtd Get(CONFIG), mgmtd Execute dispatch, RIPD notification delivery, selector matching, validation errors, heartbeats, STREAM snapshots, SAMPLE periodic reads and Subscribe back-pressure. Signed-off-by: Eric Parsonage <eric@eparsonage.com>
1 parent 6412641 commit 2b325bd

23 files changed

Lines changed: 2519 additions & 70 deletions

doc/developer/grpc.rst

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,60 @@ To enable gRPC support one needs to add `--enable-grpc` when running
99
the gRPC module be loaded and which port to bind to. This can be done by adding
1010
`-M grpc:<port>` to the daemon's CLI arguments.
1111

12-
Currently there is no gRPC "routing" so you will need to bind your gRPC
13-
`channel` to the particular daemon's gRPC port to interact with that daemon's
14-
gRPC northbound interface.
12+
When gRPC is loaded directly into a protocol daemon, the gRPC northbound
13+
interface is process-local: callbacks registered in that daemon can be invoked
14+
from that daemon's gRPC port. When gRPC is loaded into ``mgmtd``,
15+
``Get(CONFIG)`` is served from mgmtd's running datastore through the
16+
northbound config-get dispatcher. Daemon-local gRPC keeps using the
17+
process-local running configuration.
18+
19+
When gRPC is loaded into ``mgmtd``, ``Execute`` uses mgmtd's backend RPC
20+
transaction machinery. The request is matched against the backend RPC xpath
21+
registry, sent to the daemon that owns the RPC, and the backend reply is
22+
returned to the gRPC client.
23+
24+
When gRPC is loaded into ``mgmtd``, ``Subscribe`` uses mgmtd's frontend
25+
notification selector machinery. A gRPC subscriber registers its requested
26+
YANG notification selectors as a virtual frontend subscription. Selectors must
27+
resolve to a loaded northbound node, or to a module-root shorthand such as
28+
``/frr-ripd``. Invalid selectors are rejected with ``INVALID_ARGUMENT``.
29+
Backend notifications received by mgmtd are matched with the same selector
30+
tree used for native frontend clients, then the already encoded notification
31+
payload is written to the gRPC stream.
32+
33+
``Subscribe`` ``STREAM`` mode first reads the requested operational-state data
34+
paths through the same northbound path used by local ``Get`` state requests.
35+
It writes those snapshots as ``SubscribeResponse.update`` messages, sends a
36+
``sync_response`` marker, then keeps the stream registered with mgmtd's
37+
frontend selector machinery for later matching notifications.
38+
39+
``Subscribe`` ``SAMPLE`` mode uses the same operational-state read path as the
40+
initial ``STREAM`` snapshot, but it does not register a notification selector.
41+
The request supplies ``sample_interval_ms`` and the gRPC handler schedules an
42+
FRR main-loop timer to enqueue a fresh state read at that cadence.
43+
44+
When ``heartbeat_interval_ms`` is non-zero, the handler schedules a second
45+
main-loop timer. Any real update resets the heartbeat window; if the window
46+
expires first, the handler enqueues an empty ``SubscribeResponse.heartbeat`` and
47+
keeps the stream open.
48+
49+
The Subscribe writer keeps a bounded pending queue. If the client cannot drain
50+
responses quickly enough, the handler unregisters any mgmtd selector, cancels
51+
its timers and closes the stream with ``OUT_OF_RANGE`` rather than allowing
52+
unbounded memory growth.
53+
54+
The ``Subscription`` object owns the mgmtd selector handle, timers and pending
55+
response queue for a Subscribe stream. Normal stream completion releases that
56+
object before issuing the gRPC ``Finish`` operation, because the completion
57+
queue's final ``FINISH`` tag only deletes the RPC tag. Cancellation and failed
58+
writes arrive on the gRPC completion-queue thread, so they schedule main-loop
59+
cleanup before the RPC tag is deleted. During module shutdown, cleanup avoids
60+
waiting on the main loop and leaves already queued completion tags to drain.
61+
62+
``Subscribe`` ``POLL`` is deliberately left unimplemented in this
63+
server-streaming RPC. A client-driven poll needs a client-streaming or
64+
bidirectional Subscribe shape so that the client can send poll requests after
65+
the stream has opened.
1566

1667
The minimum version of gRPC known to work is 1.16.1.
1768

doc/user/grpc.rst

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Northbound gRPC
77
.. program:: configure
88

99
*gRPC* provides a combined front end to all FRR daemons using the YANG
10-
northbound. It is currently disabled by default due its experimental
10+
northbound. It is currently disabled by default due to its experimental
1111
stage, but it can be enabled with :option:`--enable-grpc` option in the
1212
configure script.
1313

@@ -17,16 +17,40 @@ configure script.
1717
Northbound gRPC Features
1818
========================
1919

20-
* Get/set configuration using JSON/XML/XPath encodings.
21-
* Execute YANG RPC calls.
20+
* Get/set configuration using JSON/XML/XPath encodings. When gRPC is loaded
21+
into ``mgmtd``, ``Get(CONFIG)`` reads from mgmtd's running datastore.
22+
* Execute YANG RPC calls. When gRPC is loaded into ``mgmtd``, daemon-owned
23+
RPCs are routed through mgmtd to the backend daemon that registered the RPC
24+
xpath.
25+
* Subscribe to YANG notifications with ``Subscribe`` in ``ON_CHANGE`` mode.
26+
* Subscribe to operational-state snapshots with a ``sync_response`` marker
27+
using ``STREAM`` mode.
28+
* Subscribe to periodic operational-state reads using ``SAMPLE`` mode and
29+
``sample_interval_ms``.
30+
* Request heartbeat messages on quiet Subscribe streams using
31+
``heartbeat_interval_ms``.
2232
* Lock/unlock configuration.
2333
* Create/edit/load/update/commit candidate configuration.
2434
* List/get transactions.
2535

2636

2737
.. note::
2838

29-
There is currently no support for YANG notifications.
39+
``Subscribe`` currently supports ``ON_CHANGE`` notification delivery,
40+
``STREAM`` operational-state snapshots, ``SAMPLE`` periodic reads and
41+
optional heartbeats. The ``POLL`` mode is reserved for a future
42+
client-streaming Subscribe RPC shape.
43+
44+
``ON_CHANGE`` selectors must resolve to a loaded YANG node, or to a
45+
module-root shorthand such as ``/frr-ripd``. ``STREAM`` and ``SAMPLE``
46+
paths are operational-state data paths. Invalid selectors or paths are
47+
rejected with ``INVALID_ARGUMENT``.
48+
49+
Each ``Subscribe`` stream has a bounded pending response queue. If a
50+
client falls behind that bound, FRR closes the stream with ``OUT_OF_RANGE``.
51+
A client can reconnect, consume responses more quickly, or raise the
52+
``subscribe-pending-limit`` module option when a larger burst buffer is
53+
appropriate.
3054

3155

3256
.. note::
@@ -45,6 +69,8 @@ Daemon gRPC Configuration
4569
The *gRPC* module accepts the following run time option:
4670

4771
- ``port``: the port to listen to (defaults to ``50051``).
72+
- ``subscribe-pending-limit``: optional maximum queued ``Subscribe``
73+
responses per stream (defaults to ``128``).
4874

4975

5076
.. note::
@@ -55,7 +81,12 @@ The *gRPC* module accepts the following run time option:
5581

5682
To configure FRR daemons to listen to gRPC you need to append the
5783
following parameter to the daemon's command line: ``-M grpc``
58-
(optionally ``-M grpc:PORT`` to specify listening port).
84+
(optionally ``-M grpc:PORT`` to specify listening port, or
85+
``-M grpc:PORT,SUBSCRIBE-PENDING-LIMIT`` to tune the queued ``Subscribe``
86+
response bound).
87+
88+
For example, ``-M grpc:50051,128`` listens on port ``50051`` and bounds each
89+
``Subscribe`` stream to ``128`` queued responses.
5990

6091
To do that in production you need to edit the ``/etc/frr/daemons`` file
6192
so the daemons get started with the command line argument. Example:

grpc/frr-northbound.proto

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@ service Northbound {
7676

7777
// Execute a YANG RPC.
7878
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}
79+
80+
// Subscribe to YANG notifications and/or operational-state snapshots on a
81+
// set of paths. The server keeps the stream open and pushes a
82+
// SubscribeResponse for every event matching the subscription. Modes:
83+
//
84+
// ON_CHANGE -- forward notifications as they happen.
85+
// STREAM -- send an initial snapshot of subscribed oper-state then
86+
// remain registered for matching notifications. A
87+
// SubscribeResponse with the sync_response field marks the
88+
// end of the snapshot.
89+
// SAMPLE -- periodically read subscribed oper-state at the
90+
// sample_interval_ms cadence and push the result.
91+
// POLL -- reserved for a future client-streaming Subscribe shape.
92+
//
93+
// Client cancellation closes the stream; server cleans up selectors.
94+
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {}
7995
}
8096

8197
// ----------------------- Parameters and return types -------------------------
@@ -387,6 +403,69 @@ message ExecuteResponse {
387403
repeated PathValue output = 1;
388404
}
389405

406+
//
407+
// RPC: Subscribe()
408+
//
409+
message SubscribeRequest {
410+
enum SubscriptionMode {
411+
ON_CHANGE = 0;
412+
STREAM = 1;
413+
SAMPLE = 2;
414+
POLL = 3;
415+
}
416+
417+
// Subscription mode. See the Subscribe RPC documentation for semantics.
418+
SubscriptionMode mode = 1;
419+
420+
// Wire encoding for SubscribeResponse update payloads.
421+
Encoding encoding = 2;
422+
423+
// YANG xpaths the subscriber is interested in. ON_CHANGE accepts
424+
// notification selectors that resolve to a loaded YANG node, or a
425+
// module-root shorthand such as "/frr-ripd". STREAM and SAMPLE use
426+
// operational-state data-tree paths. At least one path is required.
427+
repeated string path = 3;
428+
429+
// SAMPLE-only: cadence in milliseconds. Server enforces a minimum
430+
// (default 100 ms) and may reject sub-minimum values with
431+
// INVALID_ARGUMENT.
432+
uint32 sample_interval_ms = 4;
433+
434+
// Optional heartbeat interval in milliseconds. If non-zero the server
435+
// emits an empty SubscribeResponse.heartbeat at this cadence whenever no
436+
// update has flowed within the heartbeat window. Zero disables. Server
437+
// enforces the same minimum interval as SAMPLE.
438+
uint32 heartbeat_interval_ms = 5;
439+
}
440+
441+
message SubscribeResponse {
442+
// Return values:
443+
// - grpc::StatusCode::OK: Stream ended normally.
444+
// - grpc::StatusCode::INVALID_ARGUMENT: Bad selector, path, mode or
445+
// interval.
446+
// - grpc::StatusCode::OUT_OF_RANGE: Stream closed because the client
447+
// cannot drain updates quickly enough.
448+
449+
oneof response {
450+
// A notification or an oper-state update fragment.
451+
DataTree update = 1;
452+
453+
// Marker emitted once at the end of the STREAM-mode initial snapshot.
454+
SyncResponse sync_response = 2;
455+
456+
// Empty heartbeat tick (only when heartbeat_interval_ms > 0).
457+
Heartbeat heartbeat = 3;
458+
}
459+
}
460+
461+
message SyncResponse {
462+
// Empty. Carries no payload; presence is the signal.
463+
}
464+
465+
message Heartbeat {
466+
// Empty. Carries no payload; presence is the signal.
467+
}
468+
390469
// -------------------------------- Definitions --------------------------------
391470

392471
// YANG module.

lib/northbound.c

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ DEFINE_MTYPE_STATIC(LIB, NB_TRANS, "NB transaction");
2929
/* Running configuration - shouldn't be modified directly. */
3030
struct nb_config *running_config;
3131

32+
static nb_rpc_dispatch_cb nb_rpc_dispatcher;
33+
static nb_rpc_dispatch_async_cb nb_rpc_dispatcher_async;
34+
static nb_config_get_dispatch_cb nb_config_get_dispatcher;
35+
3236
/* Hash table of user pointers associated with configuration entries. */
3337
static struct hash *running_config_entries;
3438

@@ -1958,6 +1962,50 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
19581962
return nb_node->cbs.rpc(&args);
19591963
}
19601964

1965+
void nb_rpc_dispatch_set(nb_rpc_dispatch_cb cb)
1966+
{
1967+
nb_rpc_dispatcher = cb;
1968+
}
1969+
1970+
void nb_rpc_dispatch_async_set(nb_rpc_dispatch_async_cb cb)
1971+
{
1972+
nb_rpc_dispatcher_async = cb;
1973+
}
1974+
1975+
int nb_rpc_dispatch(const char *xpath, const struct lyd_node *input,
1976+
struct lyd_node **output, char *errmsg, size_t errmsg_len)
1977+
{
1978+
if (!nb_rpc_dispatcher)
1979+
return -EOPNOTSUPP;
1980+
1981+
return nb_rpc_dispatcher(xpath, input, output, errmsg, errmsg_len);
1982+
}
1983+
1984+
int nb_rpc_dispatch_async(const char *xpath, const struct lyd_node *input,
1985+
nb_rpc_dispatch_done_cb done, void *arg, char *errmsg,
1986+
size_t errmsg_len)
1987+
{
1988+
if (!nb_rpc_dispatcher_async)
1989+
return -EOPNOTSUPP;
1990+
1991+
return nb_rpc_dispatcher_async(xpath, input, done, arg, errmsg,
1992+
errmsg_len);
1993+
}
1994+
1995+
void nb_config_get_dispatch_set(nb_config_get_dispatch_cb cb)
1996+
{
1997+
nb_config_get_dispatcher = cb;
1998+
}
1999+
2000+
int nb_config_get_dispatch(const char *xpath, struct lyd_node **result,
2001+
char *errmsg, size_t errmsg_len)
2002+
{
2003+
if (!nb_config_get_dispatcher)
2004+
return -EOPNOTSUPP;
2005+
2006+
return nb_config_get_dispatcher(xpath, result, errmsg, errmsg_len);
2007+
}
2008+
19612009
void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
19622010
struct lyd_node *dnode)
19632011
{
@@ -2518,6 +2566,9 @@ int nb_notification_send(const char *xpath, struct list *arguments)
25182566
DEFINE_HOOK(nb_notification_tree_send,
25192567
(const char *xpath, const struct lyd_node *tree), (xpath, tree));
25202568

2569+
static nb_notification_data_subscribe_cb notification_data_subscribe_cb;
2570+
static nb_notification_data_unsubscribe_cb notification_data_unsubscribe_cb;
2571+
25212572
int nb_notification_tree_send(const char *xpath, const struct lyd_node *tree)
25222573
{
25232574
int ret;
@@ -2532,6 +2583,36 @@ int nb_notification_tree_send(const char *xpath, const struct lyd_node *tree)
25322583
return ret;
25332584
}
25342585

2586+
void nb_notification_data_subscribe_set(nb_notification_data_subscribe_cb cb)
2587+
{
2588+
notification_data_subscribe_cb = cb;
2589+
}
2590+
2591+
void nb_notification_data_unsubscribe_set(nb_notification_data_unsubscribe_cb cb)
2592+
{
2593+
notification_data_unsubscribe_cb = cb;
2594+
}
2595+
2596+
int nb_notification_data_subscribe(const char *const *selectors,
2597+
size_t selector_count, LYD_FORMAT format,
2598+
nb_notification_data_cb cb, void *arg,
2599+
void **handle, char *errmsg,
2600+
size_t errmsg_len)
2601+
{
2602+
if (!notification_data_subscribe_cb)
2603+
return -EOPNOTSUPP;
2604+
2605+
return notification_data_subscribe_cb(selectors, selector_count, format,
2606+
cb, arg, handle, errmsg,
2607+
errmsg_len);
2608+
}
2609+
2610+
void nb_notification_data_unsubscribe(void *handle)
2611+
{
2612+
if (notification_data_unsubscribe_cb)
2613+
notification_data_unsubscribe_cb(handle);
2614+
}
2615+
25352616
/* Running configuration user pointers management. */
25362617
struct nb_config_entry {
25372618
char xpath[XPATH_MAXLEN];

0 commit comments

Comments
 (0)