|
| 1 | +""" |
| 2 | +ACS middleware for the NVIDIA Agent Toolkit (NAT / NeMo Agent Toolkit). |
| 3 | +
|
| 4 | +Wires NAT's Middleware abstraction to an ACS Guardian. Intercepts every |
| 5 | +function (tool / sub-workflow / LLM / etc.) call configured to use this |
| 6 | +middleware, sends an ACS JSON-RPC request to the Guardian, and applies |
| 7 | +the verdict to NAT's invocation context. |
| 8 | +
|
| 9 | +Schema source: NAT public repo `packages/nvidia_nat_core/src/nat/middleware/`. |
| 10 | +
|
| 11 | +Requires: |
| 12 | + pip install nvidia-nat-core |
| 13 | + (and nvidia-nat-security if you also want to register alongside NAT's |
| 14 | + defense middleware suite) |
| 15 | +
|
| 16 | +Compatibility: |
| 17 | + - nvidia-nat-core >= 1.7 (public release). Block via raising |
| 18 | + ACSGuardianDenied; modify via setting context.modified_kwargs / output. |
| 19 | + - Future versions that expose InvocationAction.SKIP are also supported: |
| 20 | + if the symbol is importable, the adapter sets context.action instead |
| 21 | + of raising, which produces cleaner traces. |
| 22 | +
|
| 23 | +Usage in NAT YAML: |
| 24 | +
|
| 25 | + middleware: |
| 26 | + acs_guardian: |
| 27 | + _type: acs_middleware |
| 28 | + guardian_url: http://127.0.0.1:8787/acs |
| 29 | + target_function_or_group: <tool-or-group-or-workflow-name> |
| 30 | + default_deny: true |
| 31 | +
|
| 32 | + function_groups: |
| 33 | + my_tools: |
| 34 | + middleware: [acs_guardian] |
| 35 | +
|
| 36 | + workflow: |
| 37 | + _type: react_agent |
| 38 | + middleware: [acs_guardian] |
| 39 | +""" |
| 40 | +from __future__ import annotations |
| 41 | + |
| 42 | +import json |
| 43 | +import os |
| 44 | +import time |
| 45 | +import urllib.error |
| 46 | +import urllib.request |
| 47 | +import uuid |
| 48 | +from typing import Any, Optional |
| 49 | + |
| 50 | +try: |
| 51 | + from nat.middleware.function_middleware import FunctionMiddleware |
| 52 | + from nat.middleware.middleware import InvocationContext |
| 53 | + from nat.data_models.middleware import FunctionMiddlewareBaseConfig |
| 54 | + _NAT_AVAILABLE = True |
| 55 | +except ImportError: |
| 56 | + FunctionMiddleware = object # type: ignore[assignment, misc] |
| 57 | + InvocationContext = Any # type: ignore[assignment, misc] |
| 58 | + FunctionMiddlewareBaseConfig = object # type: ignore[assignment, misc] |
| 59 | + _NAT_AVAILABLE = False |
| 60 | + |
| 61 | +# InvocationAction.SKIP is on the dev branch; not in NAT 1.7.0 release. |
| 62 | +try: |
| 63 | + from nat.middleware.middleware import InvocationAction # type: ignore[attr-defined] |
| 64 | + _HAS_INVOCATION_ACTION = True |
| 65 | +except (ImportError, AttributeError): |
| 66 | + InvocationAction = None # type: ignore[assignment] |
| 67 | + _HAS_INVOCATION_ACTION = False |
| 68 | + |
| 69 | +try: |
| 70 | + from nat.cli.register_workflow import register_middleware |
| 71 | + _HAS_REGISTRATION = True |
| 72 | +except ImportError: |
| 73 | + register_middleware = None # type: ignore[assignment] |
| 74 | + _HAS_REGISTRATION = False |
| 75 | + |
| 76 | +try: |
| 77 | + from pydantic import BaseModel, Field |
| 78 | +except ImportError: |
| 79 | + BaseModel = object # type: ignore[assignment, misc] |
| 80 | + Field = lambda **kw: None # type: ignore[assignment, misc] |
| 81 | + |
| 82 | + |
| 83 | +ACS_VERSION = "0.1.0" |
| 84 | + |
| 85 | + |
| 86 | +class ACSGuardianDenied(Exception): |
| 87 | + """Raised by the ACS middleware to block a function call. |
| 88 | +
|
| 89 | + NAT's documented blocking mechanism is to raise from pre_invoke (the |
| 90 | + docstring: "Raises: Any exception to abort execution"). This custom |
| 91 | + exception type lets observers and tests distinguish a policy-driven |
| 92 | + block from unrelated errors. |
| 93 | + """ |
| 94 | + |
| 95 | + |
| 96 | +# ----- Config ----- |
| 97 | + |
| 98 | +if _NAT_AVAILABLE: |
| 99 | + |
| 100 | + class ACSMiddlewareConfig(FunctionMiddlewareBaseConfig, name="acs_guardian"): # type: ignore[misc, valid-type, call-arg] |
| 101 | + """Config schema for the ACS NAT middleware. |
| 102 | +
|
| 103 | + Registered with NAT under `_type: acs_guardian` (the `name=` class kwarg |
| 104 | + is NAT's TypedBaseModel registration mechanism — see |
| 105 | + `nat/data_models/common.py`). |
| 106 | + """ |
| 107 | + guardian_url: str = Field( |
| 108 | + default="http://127.0.0.1:8787/acs", |
| 109 | + description="ACS Guardian endpoint to POST requests to.", |
| 110 | + ) |
| 111 | + default_deny: bool = Field( |
| 112 | + default=True, |
| 113 | + description="Block the call when the Guardian is unreachable or returns malformed responses.", |
| 114 | + ) |
| 115 | + session_id: Optional[str] = Field( |
| 116 | + default=None, |
| 117 | + description="Session id sent on every request. Auto-generated per-process if absent.", |
| 118 | + ) |
| 119 | + timeout_s: float = Field( |
| 120 | + default=5.0, |
| 121 | + description="Per-request timeout for the Guardian round-trip.", |
| 122 | + ) |
| 123 | + target_function_or_group: Optional[str] = None |
| 124 | + target_location: str = "input" |
| 125 | + |
| 126 | + |
| 127 | +# ----- Middleware class ----- |
| 128 | + |
| 129 | +class ACSMiddleware(FunctionMiddleware): # type: ignore[misc, valid-type] |
| 130 | + """NAT middleware that defers each call's allow/deny/modify decision to an ACS Guardian.""" |
| 131 | + |
| 132 | + def __init__(self, config): |
| 133 | + if _NAT_AVAILABLE: |
| 134 | + super().__init__() |
| 135 | + self._config = config |
| 136 | + self._session_id = ( |
| 137 | + getattr(config, "session_id", None) |
| 138 | + or os.environ.get("ACS_SESSION_ID") |
| 139 | + or f"nat-{uuid.uuid4().hex[:16]}" |
| 140 | + ) |
| 141 | + |
| 142 | + @property |
| 143 | + def enabled(self) -> bool: |
| 144 | + return True |
| 145 | + |
| 146 | + async def pre_invoke(self, context): |
| 147 | + """Gate the function call. Block via raising or InvocationAction.SKIP; modify args in place.""" |
| 148 | + request = self._build_request( |
| 149 | + method="steps/toolCallRequest", |
| 150 | + tool_name=context.function_context.name, |
| 151 | + tool_arguments=dict(context.modified_kwargs or {}), |
| 152 | + ) |
| 153 | + |
| 154 | + try: |
| 155 | + response = self._call_guardian(request) |
| 156 | + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError) as e: |
| 157 | + if self._config.default_deny: |
| 158 | + return self._block(context, f"Guardian unreachable: {e}") |
| 159 | + return None # fail-open: proceed |
| 160 | + |
| 161 | + result = (response or {}).get("result", {}) |
| 162 | + decision = (result.get("decision") or "").lower() |
| 163 | + reasoning = result.get("reasoning", "") |
| 164 | + |
| 165 | + if decision == "allow": |
| 166 | + return None # proceed unchanged |
| 167 | + if decision == "deny": |
| 168 | + return self._block(context, reasoning or "denied by Guardian") |
| 169 | + if decision == "modify": |
| 170 | + mods = result.get("modifications", {}) |
| 171 | + overrides = mods.get("parameter_overrides") |
| 172 | + if isinstance(overrides, dict): |
| 173 | + context.modified_kwargs.update(overrides) |
| 174 | + return context |
| 175 | + return self._block(context, f"MODIFY substituted to DENY: {reasoning}") |
| 176 | + if decision in ("ask", "defer"): |
| 177 | + # NAT has no native pause-and-resume primitive on the middleware |
| 178 | + # boundary. Substitute block; deployments wanting ASK/DEFER |
| 179 | + # should compose with NAT's HITL middleware |
| 180 | + # (nat.middleware.hitl) and have the Guardian resolve before |
| 181 | + # responding. |
| 182 | + return self._block(context, f"{decision}: {reasoning}") |
| 183 | + |
| 184 | + # Unknown decision: apply fail posture |
| 185 | + if self._config.default_deny: |
| 186 | + return self._block(context, f"unknown disposition: {decision}") |
| 187 | + return None |
| 188 | + |
| 189 | + async def post_invoke(self, context): |
| 190 | + """Record the result. Optionally modify the output.""" |
| 191 | + request = self._build_request( |
| 192 | + method="steps/toolCallResult", |
| 193 | + tool_name=context.function_context.name, |
| 194 | + tool_arguments=dict(context.modified_kwargs or {}), |
| 195 | + result=context.output, |
| 196 | + ) |
| 197 | + |
| 198 | + try: |
| 199 | + response = self._call_guardian(request) |
| 200 | + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError): |
| 201 | + return None # post-hoc is best-effort |
| 202 | + |
| 203 | + result = (response or {}).get("result", {}) |
| 204 | + decision = (result.get("decision") or "").lower() |
| 205 | + if decision == "modify": |
| 206 | + mods = result.get("modifications", {}) |
| 207 | + modified_content = mods.get("modified_content") |
| 208 | + if modified_content is not None: |
| 209 | + context.output = modified_content |
| 210 | + return context |
| 211 | + return None |
| 212 | + |
| 213 | + # ----- helpers ----- |
| 214 | + |
| 215 | + def _block(self, context, reason: str): |
| 216 | + """Block the invocation. Prefer InvocationAction when available, |
| 217 | + fall back to raising for NAT releases that don't expose it.""" |
| 218 | + if _HAS_INVOCATION_ACTION: |
| 219 | + context.action = InvocationAction.SKIP # type: ignore[attr-defined] |
| 220 | + return context |
| 221 | + raise ACSGuardianDenied(reason) |
| 222 | + |
| 223 | + def _build_request( |
| 224 | + self, |
| 225 | + method: str, |
| 226 | + tool_name: str, |
| 227 | + tool_arguments: dict, |
| 228 | + result: Any = None, |
| 229 | + ) -> dict: |
| 230 | + params: dict[str, Any] = { |
| 231 | + "session_id": self._session_id, |
| 232 | + "step_id": str(uuid.uuid4()), |
| 233 | + "tool": {"name": tool_name, "arguments": tool_arguments}, |
| 234 | + } |
| 235 | + if result is not None: |
| 236 | + params["result"] = result if isinstance(result, (str, int, float, bool, dict, list)) else str(result) |
| 237 | + return { |
| 238 | + "jsonrpc": "2.0", |
| 239 | + "id": str(uuid.uuid4()), |
| 240 | + "method": method, |
| 241 | + "params": params, |
| 242 | + "acs_version": ACS_VERSION, |
| 243 | + "request_id": str(uuid.uuid4()), |
| 244 | + "timestamp": int(time.time() * 1000), |
| 245 | + "metadata": {"source": "acs-adapter-nat"}, |
| 246 | + } |
| 247 | + |
| 248 | + def _call_guardian(self, request: dict) -> dict: |
| 249 | + body = json.dumps(request).encode("utf-8") |
| 250 | + req = urllib.request.Request( |
| 251 | + self._config.guardian_url, |
| 252 | + data=body, |
| 253 | + headers={"Content-Type": "application/json"}, |
| 254 | + method="POST", |
| 255 | + ) |
| 256 | + with urllib.request.urlopen(req, timeout=self._config.timeout_s) as resp: |
| 257 | + return json.loads(resp.read().decode("utf-8")) |
| 258 | + |
| 259 | + |
| 260 | +# ----- NAT registration ----- |
| 261 | + |
| 262 | +if _NAT_AVAILABLE and _HAS_REGISTRATION: |
| 263 | + @register_middleware(config_type=ACSMiddlewareConfig) # type: ignore[misc] |
| 264 | + async def build_acs_middleware(config: "ACSMiddlewareConfig", builder): # type: ignore[name-defined] |
| 265 | + """NAT factory entry point. Yields the middleware instance for NAT to wire up.""" |
| 266 | + yield ACSMiddleware(config) |
0 commit comments