SDP library is implementing IUDFAgent
for communicating with Kapacitor via Kapacitor's UDF RPC protocol. UDFs use
socket based approach
to communicate with Kapacitor, but UDFAgent is ready to work with child
process based approach if needed.
We use uvw library, which is libuv wrapper, in order to provide asynchronous I/O. It also allows to write code in event-based approach.
Every UDF requires separate
RequestHandler
interface implementation -- it handles incoming RPC calls from Kapacitor and
sends responses back via IUDFAgent.
As long as we are using Apache Arrow library for data processing we need to
store, handle data and convert it between Kapacitor's points and
Arrow's RecordBatches formats. This functionality is provided by
corresponding objects:
IPointsStoragefor storing;RecordBatchHandlerfrom the main part of the library for data handling;PointsConverterfor data converting.
Therefore, the most important part of every UDF is a
RecordBatchHandler that is doing all useful work. See the
full list of currently implemented
RecordBatchHandlers.
If you want to write your own UDF using SDP library you could follow the
basic steps. As example, you can refer to
streamAggregateUDF's
and batchAggregateUDF's
RequestHandeler implementations (items 3, 4, 6) and to the
udf_agent_client_factory.h
file (item 7).
- Check out if your UDF is expressed through one of
RecordBatchHandlers or their composition. If it's not then implement newRecordBatchHandlerthat has minimal missing functionality for your UDF. - Decide which type does your UDF have: does it consume stream or batch
data? Choose appropriate base class for your UDF's
RequestHandler: for wants-batch you should probably useBatchRecordBatchRequestHandlerBase, for wants-stream one ofStreamRecordBatchRequestHandlerBaseandTimerRecordBatchRequestHandlerBaseshould be suitable for you. - Decide which parameters your UDF should have in terms of Kapacitor RPC
Protocol. Implement
RequestHandler::infomethod according to UDF's parameters and type. - Create instrument for parsing UDF's parameters from init RPC message.
- If you need additional pre-processing for converting Kapacitor's points to
Arrow's
RecordBatches, you can implement your ownPointsConverterdecorator. - Implement
RequestHandler::initmethod. There you should:- use your parameters parser from item 3 to parse
agent::InitMessageRPC message; - create
RecordBatchHandlerfrom item 1 according to your UDF's functionality; - create
PointsConverterwith possibly implemented decorator from item 5; - create
PointsStorageinjecting just createdRecordBatchHandlerandPointsConverterand set it usingsetPointsStorageprotected method.
- use your parameters parser from item 3 to parse
- Implement
UnixSocketClientFactorythat will be creatingAgentClients withUDFAgentand just implementedRequestHandlerin it. Use this factory to generate connections toUnixSocketServer.