Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions doc/developer/grpc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,60 @@ To enable gRPC support one needs to add `--enable-grpc` when running
the gRPC module be loaded and which port to bind to. This can be done by adding
`-M grpc:<port>` to the daemon's CLI arguments.

Currently there is no gRPC "routing" so you will need to bind your gRPC
`channel` to the particular daemon's gRPC port to interact with that daemon's
gRPC northbound interface.
When gRPC is loaded directly into a protocol daemon, the gRPC northbound
interface is process-local: callbacks registered in that daemon can be invoked
from that daemon's gRPC port. When gRPC is loaded into ``mgmtd``,
``Get(CONFIG)`` is served from mgmtd's running datastore through the
northbound config-get dispatcher. Daemon-local gRPC keeps using the
process-local running configuration.

When gRPC is loaded into ``mgmtd``, ``Execute`` uses mgmtd's backend RPC
transaction machinery. The request is matched against the backend RPC xpath
registry, sent to the daemon that owns the RPC, and the backend reply is
returned to the gRPC client.

When gRPC is loaded into ``mgmtd``, ``Subscribe`` uses mgmtd's frontend
notification selector machinery. A gRPC subscriber registers its requested
YANG notification selectors as a virtual frontend subscription. Selectors must
resolve to a loaded northbound node, or to a module-root shorthand such as
``/frr-ripd``. Invalid selectors are rejected with ``INVALID_ARGUMENT``.
Backend notifications received by mgmtd are matched with the same selector
tree used for native frontend clients, then the already encoded notification
payload is written to the gRPC stream.

``Subscribe`` ``STREAM`` mode first reads the requested operational-state data
paths through the same northbound path used by local ``Get`` state requests.
It writes those snapshots as ``SubscribeResponse.update`` messages, sends a
``sync_response`` marker, then keeps the stream registered with mgmtd's
frontend selector machinery for later matching notifications.

``Subscribe`` ``SAMPLE`` mode uses the same operational-state read path as the
initial ``STREAM`` snapshot, but it does not register a notification selector.
The request supplies ``sample_interval_ms`` and the gRPC handler schedules an
FRR main-loop timer to enqueue a fresh state read at that cadence.

When ``heartbeat_interval_ms`` is non-zero, the handler schedules a second
main-loop timer. Any real update resets the heartbeat window; if the window
expires first, the handler enqueues an empty ``SubscribeResponse.heartbeat`` and
keeps the stream open.

The Subscribe writer keeps a bounded pending queue. If the client cannot drain
responses quickly enough, the handler unregisters any mgmtd selector, cancels
its timers and closes the stream with ``OUT_OF_RANGE`` rather than allowing
unbounded memory growth.

The ``Subscription`` object owns the mgmtd selector handle, timers and pending
response queue for a Subscribe stream. Normal stream completion releases that
object before issuing the gRPC ``Finish`` operation, because the completion
queue's final ``FINISH`` tag only deletes the RPC tag. Cancellation and failed
writes arrive on the gRPC completion-queue thread, so they schedule main-loop
cleanup before the RPC tag is deleted. During module shutdown, cleanup avoids
waiting on the main loop and leaves already queued completion tags to drain.

``Subscribe`` ``POLL`` is deliberately left unimplemented in this
server-streaming RPC. A client-driven poll needs a client-streaming or
bidirectional Subscribe shape so that the client can send poll requests after
the stream has opened.

The minimum version of gRPC known to work is 1.16.1.

Expand Down
41 changes: 36 additions & 5 deletions doc/user/grpc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Northbound gRPC
.. program:: configure

*gRPC* provides a combined front end to all FRR daemons using the YANG
northbound. It is currently disabled by default due its experimental
northbound. It is currently disabled by default due to its experimental
stage, but it can be enabled with :option:`--enable-grpc` option in the
configure script.

Expand All @@ -17,16 +17,40 @@ configure script.
Northbound gRPC Features
========================

* Get/set configuration using JSON/XML/XPath encodings.
* Execute YANG RPC calls.
* Get/set configuration using JSON/XML/XPath encodings. When gRPC is loaded
into ``mgmtd``, ``Get(CONFIG)`` reads from mgmtd's running datastore.
* Execute YANG RPC calls. When gRPC is loaded into ``mgmtd``, daemon-owned
RPCs are routed through mgmtd to the backend daemon that registered the RPC
xpath.
* Subscribe to YANG notifications with ``Subscribe`` in ``ON_CHANGE`` mode.
* Subscribe to operational-state snapshots with a ``sync_response`` marker
using ``STREAM`` mode.
* Subscribe to periodic operational-state reads using ``SAMPLE`` mode and
``sample_interval_ms``.
* Request heartbeat messages on quiet Subscribe streams using
``heartbeat_interval_ms``.
* Lock/unlock configuration.
* Create/edit/load/update/commit candidate configuration.
* List/get transactions.


.. note::

There is currently no support for YANG notifications.
``Subscribe`` currently supports ``ON_CHANGE`` notification delivery,
``STREAM`` operational-state snapshots, ``SAMPLE`` periodic reads and
optional heartbeats. The ``POLL`` mode is reserved for a future
client-streaming Subscribe RPC shape.

``ON_CHANGE`` selectors must resolve to a loaded YANG node, or to a
module-root shorthand such as ``/frr-ripd``. ``STREAM`` and ``SAMPLE``
paths are operational-state data paths. Invalid selectors or paths are
rejected with ``INVALID_ARGUMENT``.

Each ``Subscribe`` stream has a bounded pending response queue. If a
client falls behind that bound, FRR closes the stream with ``OUT_OF_RANGE``.
A client can reconnect, consume responses more quickly, or raise the
``subscribe-pending-limit`` module option when a larger burst buffer is
appropriate.


.. note::
Expand All @@ -45,6 +69,8 @@ Daemon gRPC Configuration
The *gRPC* module accepts the following run time option:

- ``port``: the port to listen to (defaults to ``50051``).
- ``subscribe-pending-limit``: optional maximum queued ``Subscribe``
responses per stream (defaults to ``128``).


.. note::
Expand All @@ -55,7 +81,12 @@ The *gRPC* module accepts the following run time option:

To configure FRR daemons to listen to gRPC you need to append the
following parameter to the daemon's command line: ``-M grpc``
(optionally ``-M grpc:PORT`` to specify listening port).
(optionally ``-M grpc:PORT`` to specify listening port, or
``-M grpc:PORT,SUBSCRIBE-PENDING-LIMIT`` to tune the queued ``Subscribe``
response bound).

For example, ``-M grpc:50051,128`` listens on port ``50051`` and bounds each
``Subscribe`` stream to ``128`` queued responses.

To do that in production you need to edit the ``/etc/frr/daemons`` file
so the daemons get started with the command line argument. Example:
Expand Down
79 changes: 79 additions & 0 deletions grpc/frr-northbound.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ service Northbound {

// Execute a YANG RPC.
rpc Execute(ExecuteRequest) returns (ExecuteResponse) {}

// Subscribe to YANG notifications and/or operational-state snapshots on a
// set of paths. The server keeps the stream open and pushes a
// SubscribeResponse for every event matching the subscription. Modes:
//
// ON_CHANGE -- forward notifications as they happen.
// STREAM -- send an initial snapshot of subscribed oper-state then
// remain registered for matching notifications. A
// SubscribeResponse with the sync_response field marks the
// end of the snapshot.
// SAMPLE -- periodically read subscribed oper-state at the
// sample_interval_ms cadence and push the result.
// POLL -- reserved for a future client-streaming Subscribe shape.
//
// Client cancellation closes the stream; server cleans up selectors.
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {}
}

// ----------------------- Parameters and return types -------------------------
Expand Down Expand Up @@ -387,6 +403,69 @@ message ExecuteResponse {
repeated PathValue output = 1;
}

//
// RPC: Subscribe()
//
message SubscribeRequest {
enum SubscriptionMode {
ON_CHANGE = 0;
STREAM = 1;
SAMPLE = 2;
POLL = 3;
}

// Subscription mode. See the Subscribe RPC documentation for semantics.
SubscriptionMode mode = 1;

// Wire encoding for SubscribeResponse update payloads.
Encoding encoding = 2;

// YANG xpaths the subscriber is interested in. ON_CHANGE accepts
// notification selectors that resolve to a loaded YANG node, or a
// module-root shorthand such as "/frr-ripd". STREAM and SAMPLE use
// operational-state data-tree paths. At least one path is required.
repeated string path = 3;

// SAMPLE-only: cadence in milliseconds. Server enforces a minimum
// (default 100 ms) and may reject sub-minimum values with
// INVALID_ARGUMENT.
uint32 sample_interval_ms = 4;

// Optional heartbeat interval in milliseconds. If non-zero the server
// emits an empty SubscribeResponse.heartbeat at this cadence whenever no
// update has flowed within the heartbeat window. Zero disables. Server
// enforces the same minimum interval as SAMPLE.
uint32 heartbeat_interval_ms = 5;
}

message SubscribeResponse {
// Return values:
// - grpc::StatusCode::OK: Stream ended normally.
// - grpc::StatusCode::INVALID_ARGUMENT: Bad selector, path, mode or
// interval.
// - grpc::StatusCode::OUT_OF_RANGE: Stream closed because the client
// cannot drain updates quickly enough.

oneof response {
// A notification or an oper-state update fragment.
DataTree update = 1;

// Marker emitted once at the end of the STREAM-mode initial snapshot.
SyncResponse sync_response = 2;

// Empty heartbeat tick (only when heartbeat_interval_ms > 0).
Heartbeat heartbeat = 3;
}
}

message SyncResponse {
// Empty. Carries no payload; presence is the signal.
}

message Heartbeat {
// Empty. Carries no payload; presence is the signal.
}

// -------------------------------- Definitions --------------------------------

// YANG module.
Expand Down
81 changes: 81 additions & 0 deletions lib/northbound.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ DEFINE_MTYPE_STATIC(LIB, NB_TRANS, "NB transaction");
/* Running configuration - shouldn't be modified directly. */
struct nb_config *running_config;

static nb_rpc_dispatch_cb nb_rpc_dispatcher;
static nb_rpc_dispatch_async_cb nb_rpc_dispatcher_async;
static nb_config_get_dispatch_cb nb_config_get_dispatcher;

/* Hash table of user pointers associated with configuration entries. */
static struct hash *running_config_entries;

Expand Down Expand Up @@ -1958,6 +1962,50 @@ int nb_callback_rpc(const struct nb_node *nb_node, const char *xpath,
return nb_node->cbs.rpc(&args);
}

void nb_rpc_dispatch_set(nb_rpc_dispatch_cb cb)
{
nb_rpc_dispatcher = cb;
}

void nb_rpc_dispatch_async_set(nb_rpc_dispatch_async_cb cb)
{
nb_rpc_dispatcher_async = cb;
}

int nb_rpc_dispatch(const char *xpath, const struct lyd_node *input,
struct lyd_node **output, char *errmsg, size_t errmsg_len)
{
if (!nb_rpc_dispatcher)
return -EOPNOTSUPP;

return nb_rpc_dispatcher(xpath, input, output, errmsg, errmsg_len);
}

int nb_rpc_dispatch_async(const char *xpath, const struct lyd_node *input,
nb_rpc_dispatch_done_cb done, void *arg, char *errmsg,
size_t errmsg_len)
{
if (!nb_rpc_dispatcher_async)
return -EOPNOTSUPP;

return nb_rpc_dispatcher_async(xpath, input, done, arg, errmsg,
errmsg_len);
}

void nb_config_get_dispatch_set(nb_config_get_dispatch_cb cb)
{
nb_config_get_dispatcher = cb;
}

int nb_config_get_dispatch(const char *xpath, struct lyd_node **result,
char *errmsg, size_t errmsg_len)
{
if (!nb_config_get_dispatcher)
return -EOPNOTSUPP;

return nb_config_get_dispatcher(xpath, result, errmsg, errmsg_len);
}

void nb_callback_notify(const struct nb_node *nb_node, uint8_t op, const char *xpath,
struct lyd_node *dnode)
{
Expand Down Expand Up @@ -2518,6 +2566,9 @@ int nb_notification_send(const char *xpath, struct list *arguments)
DEFINE_HOOK(nb_notification_tree_send,
(const char *xpath, const struct lyd_node *tree), (xpath, tree));

static nb_notification_data_subscribe_cb notification_data_subscribe_cb;
static nb_notification_data_unsubscribe_cb notification_data_unsubscribe_cb;

int nb_notification_tree_send(const char *xpath, const struct lyd_node *tree)
{
int ret;
Expand All @@ -2532,6 +2583,36 @@ int nb_notification_tree_send(const char *xpath, const struct lyd_node *tree)
return ret;
}

void nb_notification_data_subscribe_set(nb_notification_data_subscribe_cb cb)
{
notification_data_subscribe_cb = cb;
}

void nb_notification_data_unsubscribe_set(nb_notification_data_unsubscribe_cb cb)
{
notification_data_unsubscribe_cb = cb;
}

int nb_notification_data_subscribe(const char *const *selectors,
size_t selector_count, LYD_FORMAT format,
nb_notification_data_cb cb, void *arg,
void **handle, char *errmsg,
size_t errmsg_len)
{
if (!notification_data_subscribe_cb)
return -EOPNOTSUPP;

return notification_data_subscribe_cb(selectors, selector_count, format,
cb, arg, handle, errmsg,
errmsg_len);
}

void nb_notification_data_unsubscribe(void *handle)
{
if (notification_data_unsubscribe_cb)
notification_data_unsubscribe_cb(handle);
}

/* Running configuration user pointers management. */
struct nb_config_entry {
char xpath[XPATH_MAXLEN];
Expand Down
Loading
Loading