This document provides a detailed explanation of the Genesis Function RPC (Remote Procedure Call) system, primarily used for communication between Agents and Function providers (Services). The core design principle is dynamic discovery: callers (Agents) do not need prior knowledge of available functions, their names, or their specific parameters. All necessary information is discovered at runtime through DDS (Data Distribution Service) mechanisms.
sequenceDiagram
participant User
participant Agent as GenesisAgent
participant FReg as FunctionRegistry
participant LLM1 as Fast-LLM
participant LLM2 as Full-LLM
participant Calc as CalculatorService
User->>Agent: "What is 34 * (12 / 4)?"
Agent->>FReg: list()
FReg-->>Agent: 24 funcs
Agent->>LLM1: classify(request, funcs)
LLM1-->>Agent: [calc.multiply, calc.divide]
Agent->>LLM2: request + subset
LLM2-->>Agent: call(calc.divide, 12,4)
Agent->>Calc: RPC divide(12,4)
Calc-->>Agent: 3
Agent->>LLM2: result=3
LLM2-->>Agent: call(calc.multiply, 34,3)
Agent->>Calc: RPC multiply(34,3)
Calc-->>Agent: 102
Agent->>LLM2: result=102
LLM2-->>Agent: "The answer is 102."
Agent-->>User: 102
This example demonstrates:
- Dynamic Discovery: Agent queries FunctionRegistry for available functions
- Classification: Fast-LLM narrows down relevant functions from the full registry
- Tool Execution: Full-LLM uses classified functions to solve the problem
- RPC Communication: Agent makes RPC calls to CalculatorService
- Multi-step Reasoning: LLM chains multiple function calls to reach the answer
This system leverages RTI Connext DDS and its RPC capabilities (rti.rpc) but builds significant abstractions on top for function registration, discovery, monitoring, and standardized request/reply handling, particularly within the GenesisRPCService and EnhancedServiceBase classes.
Understanding this system is crucial for maintaining existing function services and for designing the new Interface-to-Agent RPC system, aiming to reuse its successful patterns while adhering to the desired communication hierarchy (Interface <-> Agent <-> Function).
These Python classes, decorated with @idl.struct, define the data structures exchanged over DDS for function RPC. They map directly to DDS types defined potentially in XML (datamodel.xml) but are used programmatically as Python classes.
-
Function: Represents the definition of a single function available for remote calls.name: str: The name of the function (e.g., "add", "get_weather").description: str: A human-readable description of what the function does.parameters: str: Crucially, this is a JSON string representing the JSON Schema definition of the function's expected parameters (following OpenAI's function/tool schema format). This schema is used by the Agent to understand how to format arguments and potentially by the service for validation.strict: bool: IfTrue, the service attempts to validate incoming arguments against theparametersschema.
-
Tool: An OpenAI-style container, currently always of type "function".type: str: Always "function".function: Function: Embeds theFunctiondefinition. Services registerToolobjects.
-
FunctionCall: Represents the details of a specific function invocation within a request.name: str: The name of the function to be called.arguments: str: A JSON string containing the actual arguments for the function call, formatted according to the schema defined in the correspondingFunction'sparameters.
-
FunctionRequest: The main structure sent by a caller (Agent) to invoke a function.id: str: A unique identifier for this specific request, often used for correlation (though RTI RPC handles correlation implicitly).type: str: Always "function".function: FunctionCall: Embeds theFunctionCalldetails specifying which function to call and with what arguments.
-
FunctionReply: The main structure sent back by the service (Function provider) to the caller (Agent).result_json: str: A JSON string containing the result of the function execution. Can be complex JSON.success: bool: Indicates whether the function executed successfully.error_message: str: IfsuccessisFalse, this contains a description of the error.
Implementation Note: Consistent use of these specific classes and their fields is vital. Arguments and results are always JSON strings within these structures. Parsing and serialization happen within the RPC service logic.
GenesisRPCService: The fundamental base class for creating RPC services that expose functions.-
__init__(self, service_name: str):- Initializes the DDS
DomainParticipant. - Creates an
rti.rpc.Replierinstance. This is the core DDS entity that listens for incoming requests and sends replies.request_type: Set byself.get_request_type(), defaults toFunctionRequest.reply_type: Set byself.get_reply_type(), defaults toFunctionReply.participant: The DDS participant.service_name: This name is used byrti.rpc.Replierprimarily to derive the default DDS Topic names for the request and reply types (e.g.,CalculatorServiceRequest,CalculatorServiceReply). While important for DDS discovery of the RPC endpoint itself, it's not used by callers to discover what functions the service provides. That happens via the separate Function Discovery mechanism (see Section 3.1). It serves mainly for debugging and identifying the RPC service endpoint within DDS tools.
- Initializes
self.functions: Dict[str, Dict[str, Any]] = {}: This dictionary is crucial. It maps registered function names (strings) to dictionaries containing their implementation ("implementation": Callable), their schema/description ("tool": Tool), and potentially other metadata.
- Initializes the DDS
-
get_request_type(self)/get_reply_type(self): Return the Python classes (FunctionRequest,FunctionReply) used for this RPC service. Allows potential overriding but is standard for function services. -
register_function(self, func, description, parameters, ...):- Takes the Python function (
func), its description, and its parameter schema (as a Pythondict). - Validates the parameter schema (
validate_schema). - Creates the
FunctionandToolobjects fromdatamodel.py. - Stores the function implementation and its
Tooldefinition in theself.functionsdictionary, keyed by the function's name (func.__name__).
- Takes the Python function (
-
async run(self):- The main asynchronous loop that handles incoming requests.
- Enters an infinite loop waiting for requests using
self.replier.receive_requests(...). This blocks until requests arrive or the timeout occurs. - Iterates through received
request_samples. - Extracts the
request = request_sample.data(which is an instance ofFunctionRequest) andrequest_info = request_sample.info(containing DDS metadata like the caller's identity). - Retrieves
function_nameandarguments_jsonfromrequest.function. - Looks up the
function_namein itsself.functionsdictionary. - If found:
- Retrieves the actual Python function implementation (
func = self.functions[function_name]["implementation"]). - Parses the
arguments_jsonstring into a Python dictionary (args_data). - (Optional) If
strict=Truein the function'sTooldefinition, validatesargs_dataagainst the schema stored in theTool. - Injects the
request_infointo the arguments dictionary (args_data["request_info"] = request_info). This allows the function implementation to know about the caller if needed (e.g., for monitoring). - Calls the actual function implementation:
result = func(**args_data). Handles both regular functions andasyncfunctions. - Serializes the
resultback into a JSON string (result_json). - Creates a
FunctionReplyinstance withsuccess=Trueand theresult_json.
- Retrieves the actual Python function implementation (
- If the function name is not found, or if argument parsing/validation/execution fails:
- Logs the error appropriately.
- Creates a
FunctionReplyinstance withsuccess=Falseand a descriptiveerror_message.
- Sends the
FunctionReplyback to the specific caller usingself.replier.send_reply(reply, request_sample.info). Therequest_sample.infoensures the reply is correlated to the original request by the underlying RTI RPC mechanism. - Includes
try...exceptblocks for robust error handling during the request processing lifecycle.
-
close(self): Cleans up DDS resources (Replier,Participant).
-
Implementation Note: The run method's logic for looking up functions in self.functions, parsing/validating arguments, calling the implementation, and handling results/errors is the core request-processing pipeline.
EnhancedServiceBase: Inherits fromGenesisRPCServiceand adds significant capabilities, primarily function discovery/advertisement and enhanced monitoring. Services likeCalculatorServicetypically inherit from this class.-
__init__(self, service_name, capabilities, participant=None, domain_id=0, registry: FunctionRegistry = None):- Calls
super().__init__(service_name). - Sets up DDS entities for monitoring (writers for
MonitoringEvent,ComponentLifecycleEvent,ChainEvent,LivelinessUpdate). UsesDynamicDataand types loaded from XML viadds.QosProvider. - Initializes
FunctionRegistry(fromgenesis_lib/function_discovery.py) if not provided. The registry is responsible for interacting with the DDS layer for advertising and discoveringFunctionCapabilitydata. - Creates and sets an
EnhancedFunctionCapabilityListeneron the registry's reader to react to discovery events (e.g., logging matches, potentially triggering actions when other services/functions appear). - Stores
self.app_guid(derived from the registry's DDS writer instance handle) for unique service identification in monitoring. - Calls
self._auto_register_decorated_functions()to automatically register methods decorated with@genesis_function.
- Calls
-
_auto_register_decorated_functions(self): Scans the class instance for methods decorated with@genesis_function(seegenesis_lib/decorators.py) and callsregister_enhanced_functionfor each. -
register_enhanced_function(self, func, description, parameters, ...):- A wrapper around the base class's
register_function. - Logs the enhanced registration process.
- Calls
self.function_wrapper(func_name)(func)to wrap the user's function implementation with monitoring logic before registering it. - Calls the base
self.register_functionwith the wrapped function.
- A wrapper around the base class's
-
function_wrapper(self, func_name): (Implemented as a decorator factory)- This is a key part of the enhanced monitoring. It returns a decorator that wraps the actual function implementation.
- The
wrapperfunction inside the decorator:- Extracts
request_infofrom the called function'skwargs. - Extracts call arguments (
call_data). - Publishes
ComponentLifecycleEvent(state -> BUSY) andChainEvent(type -> CALL_START) before calling the original function. Includes correlation IDs (chain_id,call_id). - Calls the original function implementation (
result = func(*args, **kwargs)). - Publishes
ChainEvent(type -> CALL_COMPLETE) andComponentLifecycleEvent(state -> READY) after the function returns successfully. - If the function raises an exception:
- Publishes
ChainEvent(type -> CALL_ERROR) andComponentLifecycleEvent(state -> DEGRADED). - Re-raises the exception so the base
runmethod can catch it and send an errorFunctionReply.
- Publishes
- Extracts
-
_publish_monitoring_event(...)/publish_component_lifecycle_event(...): Helper methods to construct and publish various DDS monitoring messages using the dedicated monitoring writers. They handle creatingDynamicData, populating fields (timestamps, IDs, states, metadata), and writing to the appropriate DDS Topic. -
_advertise_functions(self):- This method orchestrates the advertisement of the service's registered functions so that Agents can discover them. Crucially, this uses a separate mechanism (
FunctionCapabilityDDS Topic) from the RPC endpoint discovery. - Publishes initial monitoring events indicating the service is joining/initializing/discovering (
ComponentLifecycleEventwith categoriesNODE_DISCOVERY,AGENT_INIT). - Iterates through
self.functions. For each function:- Retrieves its schema, description, capabilities, etc.
- Publishes
ComponentLifecycleEvent(categoryNODE_DISCOVERY) for the function itself (using a unique ID for the function node). - Publishes
ComponentLifecycleEvent(categoryEDGE_DISCOVERY) to represent the link between the service application (self.app_guid) and the function node it hosts. - Calls
self.registry.register_function(...). This translates the function's details into aFunctionCapabilitystructure (defined ingenesis_lib/function_discovery.py, similar todatamodel.xmldefinition) and publishes it on theFunctionCapabilityDDS topic. This is the core advertisement step for discovery by Agents.
- After advertising all functions, publishes final monitoring events indicating the service is READY (
ComponentLifecycleEventcategoryAGENT_READY). - Sets
self._functions_advertised = True.
- This method orchestrates the advertisement of the service's registered functions so that Agents can discover them. Crucially, this uses a separate mechanism (
-
async run(self): Overrides the baserun. It ensures_advertise_functions()is called once before callingsuper().run()to start the request handling loop. Includesfinallyblock for cleanup. -
handle_function_discovery(...)/handle_function_removal(...): Methods intended to be called by theFunctionCapabilityListenerwhen other functions are discovered or removed from the network. Allows the service to react to changes in available functions elsewhere. -
close(self): Extends the basecloseto also close theFunctionRegistry.
-
Implementation Note: Services providing functions should inherit from EnhancedServiceBase, use the @genesis_function decorator (or call register_enhanced_function directly), and the base class handles advertisement and monitored execution.
While not directly part of the RPC call mechanism, this is essential for the dynamic nature of the system.
FunctionRegistry: Manages the advertisement and discovery of functions via DDS.- Creates DDS DataWriters for publishing
FunctionCapabilitydata. - Creates DDS DataReaders for discovering
FunctionCapabilitydata published by other services. - Provides
register_functionwhich takes function details and publishes them asFunctionCapability. - Provides ways to query discovered functions.
- Creates DDS DataWriters for publishing
FunctionCapability: The data structure (defined viadatamodel.xmland used withDynamicData) that describes a function's capabilities, including its name, description, provider ID (the service'sapp_guid), parameter schema (as a JSON string), and the crucialservice_name. Thisservice_namewithinFunctionCapabilitytells the Agent which RPC service endpoint (theservice_nameused when initializing theReplier/Requester) to target when it wants to call this specific function.FunctionCapabilityListener: A DDS listener attached to theFunctionCapabilityDataReader. Itson_data_availablemethod is triggered when new functions are discovered or existing ones update/disappear.EnhancedServiceBaseuses a custom version (EnhancedFunctionCapabilityListener) to triggerhandle_function_discovery/handle_function_removaland publish monitoring events.
Implementation Note: Agents use a FunctionRegistry (or similar discovery mechanism) to find functions. They read FunctionCapability samples from DDS. The parameter_schema tells the Agent how to structure the arguments JSON in the FunctionRequest, and the service_name field tells the Agent which RPC service endpoint to send the FunctionRequest to.
rti.rpc.Replier: Used by the service (GenesisRPCService/EnhancedServiceBase). Listens on DDS topics derived from theservice_nameandrequest_type. Receives requests, allows the application (runloop) to process them, and sends replies back usingsend_reply, ensuring correlation.rti.rpc.Requester: Used by the caller (Agent). Sends requests (FunctionRequest) usingsend_requesttargeted at a specificservice_name. Receives replies (FunctionReply) usingreceive_repliesorread_replies/take_replies. Handles request-reply correlation automatically using DDS mechanisms.
- A service (e.g.,
CalculatorService) inherits fromEnhancedServiceBase. EnhancedServiceBase.__init__is called, setting up DDS, monitoring, and theFunctionRegistry.- Functions are registered:
- Using
@genesis_functiondecorator on methods (preferred). - Or by explicitly calling
service.register_enhanced_function(method, description, params_schema).
- Using
- The decorator/
register_enhanced_functioncallsEnhancedServiceBase.function_wrapperto wrap the method with monitoring logic. - The wrapped function is registered with the base
GenesisRPCService.register_function, storing it inself.functions. - The service's
async run()method is called. EnhancedServiceBase.run()calls_advertise_functions()._advertise_functions():- Publishes initial "joining" monitoring events.
- Iterates through functions in
self.functions. - For each function, calls
self.registry.register_function(). registry.register_function()creates aFunctionCapabilityDDS sample containing the function's name, description, parameter schema (JSON string), provider ID (self.app_guid), and theservice_nameof the hosting RPC service.- The
FunctionCapabilitysample is published on the globally knownFunctionCapabilityDDS topic. - Monitoring events (
NODE_DISCOVERYfor the function,EDGE_DISCOVERYlinking function to service) are published. - Publishes final "ready" monitoring events.
EnhancedServiceBase.run()callssuper().run(), starting theGenesisRPCService.run()request loop. The service now listens forFunctionRequestmessages via itsReplier.
- An Agent initializes its own DDS Participant and a
FunctionRegistryinstance (or equivalent discovery logic). - The Agent's
FunctionRegistrycreates a DDS DataReader for theFunctionCapabilitytopic with a Listener (FunctionCapabilityListener). - As services advertise their functions (Step 3.1.8), the Agent's Listener receives
FunctionCapabilityDDS samples. - The Listener's
on_data_availablecallback processes these samples. - The Agent now knows:
- A function named
Xexists. - It's described by
description. - It expects parameters defined by the JSON schema in
parameter_schema. - It is provided by the service instance identified by
provider_id. - To call it, send a
FunctionRequestto the RPC endpoint identified byservice_name(extracted from theFunctionCapabilitysample).
- A function named
- The Agent typically stores this discovered function information locally (e.g., in a dictionary mapping function names to their capabilities and target service name).
- The Agent decides to call a discovered function (e.g., "add").
- It retrieves the function's details (parameter schema, target
service_name) from its discovered function cache. - It constructs the arguments dictionary (e.g.,
{'a': 1, 'b': 2}) according to the discoveredparameter_schema. - It serializes the arguments dictionary into a JSON string:
'{"a": 1, "b": 2}'. - It creates a
FunctionCallinstance:FunctionCall(name="add", arguments='{"a": 1, "b": 2}'). - It creates a
FunctionRequestinstance, embedding theFunctionCall:FunctionRequest(id=unique_id, type="function", function=call_details). - The Agent uses an
rti.rpc.Requesterinstance, configured to communicate with the targetservice_name(discovered in Step 3.2.5). - The Agent sends the request:
request_id = requester.send_request(function_request_object). - The request travels via DDS to the target service's
Replier. - The Service's
GenesisRPCService.run()loop receives the request viareplier.receive_requests(). - The
runloop looks up "add" inself.functions. - It retrieves the wrapped implementation function.
- It parses the
argumentsJSON string back into a dictionary. - It calls the wrapper function (created by
EnhancedServiceBase.function_wrapper). - The wrapper publishes "CALL_START" monitoring events.
- The wrapper calls the original
add(a=1, b=2, request_info=...)implementation. - The
addfunction executes and returns the result (e.g.,3). - The wrapper receives the result.
- The wrapper publishes "CALL_COMPLETE" monitoring events.
- The wrapper returns the result (
3) to therunloop. - The
runloop serializes the result into a JSON string:'3'(or'{"result": 3}'depending on formatting). - It creates a
FunctionReply:FunctionReply(result_json='3', success=True, error_message=""). - It sends the reply back using
replier.send_reply(reply, request_sample.info). - The Agent, waiting for a reply on its
Requester(e.g., usingreplies = requester.receive_replies(related_request_id=request_id)), receives theFunctionReplysample. - The Agent extracts the
FunctionReplyobject. - It checks
reply.success. - If successful, it parses
reply.result_jsonto get the final result (e.g.,json.loads('3') -> 3).
- Dynamic Discovery is Paramount: Agents MUST rely solely on discovered
FunctionCapabilitydata. They should not have hardcoded knowledge of function names, parameter structures, or target service names. The discovery mechanism provides all necessary runtime information. - Separation of Concerns:
datamodel.py: Defines the data being exchanged. Stable and fundamental.rpc_service.py: Provides the basic RPC mechanics (listening, routing calls based on name, argument parsing, result serialization, basic error handling).enhanced_service_base.py: Adds advanced features on top (monitoring, advertisement viaFunctionRegistry, function wrapping).function_discovery.py: Handles the DDS interactions for advertising and discovering theFunctionCapabilitydata.rti.rpc: The underlying DDS transport for request/reply.
- Role of
service_name: Used byrti.rpc.Replierandrti.rpc.Requesterto establish the underlying DDS communication channel (Topics). It identifies the RPC endpoint. Agents discover whichservice_nameendpoint hosts a specific function by reading theservice_namefield within the discoveredFunctionCapabilitydata for that function. - JSON Everywhere: Arguments (
FunctionCall.arguments) and results (FunctionReply.result_json) are transported as JSON strings within the RPC data structures. Services need to parse incoming arguments from JSON; callers need to parse results from JSON. Schemas (Function.parameters) are also JSON strings (representing JSON Schema). - Class Structures are Key:
- Always use the classes defined in
genesis_lib/datamodel.py(FunctionRequest,FunctionReply, etc.) when constructing requests/replies. - Function implementations receive arguments as standard Python types after the
runloop parses the JSON. They should return standard Python objects; therunloop handles serializing the return value to JSON for theFunctionReply. request_infois injected into the function callkwargsbyGenesisRPCService.runif the function implementation needs DDS-level info about the caller.
- Always use the classes defined in
- Monitoring:
EnhancedServiceBaseprovides robust monitoring via its function wrapper and lifecycle event publishing. This is crucial for observing system behavior. Chain and call IDs help correlate events across different components. - Error Handling: The
runloop inGenesisRPCServiceincludes broad exception handling. Errors during argument parsing, validation, or function execution result in aFunctionReplywithsuccess=Falseand anerror_message. Thefunction_wrapperinEnhancedServiceBaseadds error-specific monitoring events.
The Genesis Function RPC system provides a robust, dynamic, and monitored way for Agents to interact with Function services without requiring compile-time knowledge. It achieves this through:
- Clear data models (
datamodel.py). - A layered service implementation (
rpc_service.py,enhanced_service_base.py). - A dedicated DDS-based discovery mechanism (
FunctionCapability,FunctionRegistry). - Leveraging the underlying RTI Connext RPC primitives (
rti.rpc.Replier,rti.rpc.Requester).
When designing the new Interface-to-Agent RPC system, consider adopting similar patterns:
- Define clear
AgentRequest/AgentReplydata structures (likely indatamodel.pyor a new file). - Use a base RPC class structure (perhaps adapting
GenesisRPCServiceor creating a parallel hierarchy) for handling the DDSReplier/Requesterinteractions. - Implement a discovery mechanism (if needed beyond simple service name matching) for Agents to advertise their capabilities or specific endpoints.
- Integrate monitoring using patterns from
EnhancedServiceBase(wrappers, lifecycle events). - Strictly enforce the communication hierarchy (Interfaces only talk to Agents). This might involve checks within the RPC layers or discovery mechanisms.
- Pay close attention to data serialization (likely JSON) within the request/reply structures.
(c) 2025 Copyright, Real-Time Innovations, Inc. (RTI) All rights reserved.
RTI grants Licensee a license to use, modify, compile, and create derivative works of the Software. Licensee has the right to distribute object form only for use with RTI products. The Software is provided "as is", with no warranty of any type, including any warranty for fitness for any purpose. RTI is under no obligation to maintain or support the Software. RTI shall not be liable for any incidental or consequential damages arising out of the use or inability to use the software.