-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathloader.py
More file actions
340 lines (280 loc) · 11.3 KB
/
Copy pathloader.py
File metadata and controls
340 lines (280 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
"""Policy pack loader.
Resolves the active PolicyIndex at startup. Policies are fetched
exclusively from the governance backend (``api/v1/policy``); there is
no local compiled fallback. When the backend is unavailable, the
access token is unset, or the fetch times out, the loader returns an
empty PolicyIndex and the agent runs without any rules.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from collections import Counter
import yaml
from uipath.core.governance.config import is_governance_enabled
from uipath.runtime.governance.config import EnforcementMode, set_enforcement_mode
from uipath.runtime.governance.native._yaml_to_index import build_policy_index_from_yaml
from uipath.runtime.governance.native.backend_client import ENV_ACCESS_TOKEN
from uipath.runtime.governance.native.models import PolicyIndex
from uipath.runtime.governance.native.policy_api_client import (
ENV_ORGANIZATION_ID,
ENV_TENANT_ID,
POLICY_API_TIMEOUT_SECONDS,
fetch_policy_response,
resolve_organization_id,
resolve_tenant_id,
)
logger = logging.getLogger(__name__)
# Pack name aliases for backward compatibility
PACK_ALIASES: dict[str, str] = {
"owasp": "owasp_agentic",
"hipaa": "hipaa_runtime",
"soc2": "soc2_runtime",
"nist": "nist_ai_rmf_runtime",
"eu_ai": "eu_ai_act_runtime",
"iso": "iso42001_runtime",
}
# Module-level cache
_policy_index: PolicyIndex | None = None
# Background-prefetch coordination. ``_prefetch_event`` is set once the
# background load_policy_index() call finishes (success OR failure);
# callers of ``get_policy_index()`` wait on it. ``_prefetch_lock``
# protects the start-once semantics so concurrent ``prefetch`` calls
# don't kick off duplicate threads.
_prefetch_event: threading.Event | None = None
_prefetch_lock = threading.Lock()
# Default wait when ``get_policy_index()`` blocks on an in-flight
# prefetch. Matched to the policy-API HTTP timeout so a stuck backend
# bounds the total time spent waiting at first hook fire to
# ~POLICY_API_TIMEOUT_SECONDS. If the wait expires we return an empty
# PolicyIndex — the agent runs without any policies rather than
# blocking further or retrying.
_PREFETCH_WAIT_SECONDS = POLICY_API_TIMEOUT_SECONDS
def prefetch_policy_index() -> None:
"""Kick off a background load of the policy index.
Non-blocking. Designed to be called as early as possible (at
``GovernanceRuntime.__init__``) so the HTTP call to the governance
backend overlaps with the rest of agent setup. The result lands in
the same module cache that ``get_policy_index()`` reads from;
``get_policy_index()`` waits on this prefetch when it's in flight.
Idempotent: subsequent calls while the first is running are no-ops,
and calls after completion are no-ops. Skipped entirely when the
governance feature flag is OFF so no network call is made.
"""
global _prefetch_event
if not is_governance_enabled():
return
with _prefetch_lock:
if _policy_index is not None:
return # already loaded
if _prefetch_event is not None:
return # already in flight
event = threading.Event()
_prefetch_event = event
def _worker() -> None:
global _policy_index
try:
loaded = load_policy_index()
except Exception as exc: # noqa: BLE001 - logged; first hook will retry sync
logger.warning("Policy prefetch failed: %s", exc)
else:
with _prefetch_lock:
_policy_index = loaded
finally:
event.set()
threading.Thread(
target=_worker,
name="governance-policy-prefetch",
daemon=True,
).start()
def get_policy_index() -> PolicyIndex:
"""Get the cached policy index, loading if necessary.
Resolution order on first call:
1. If the governance feature flag is OFF, return an empty
PolicyIndex (cached). No network call.
2. If a prefetch (see :func:`prefetch_policy_index`) is in flight,
wait for it to complete (bounded by ``_PREFETCH_WAIT_SECONDS``).
3. Governance backend at ``api/v1/policy`` (one HTTP GET, cached).
4. Empty PolicyIndex when the backend is unavailable or times out.
Result is cached for the process lifetime; per-hook evaluation never
touches the network. Call :func:`clear_policy_cache` to force a
refetch (mainly for tests).
"""
global _policy_index
if _policy_index is not None:
return _policy_index
if not is_governance_enabled():
logger.info(
"Governance feature flag is OFF; returning empty PolicyIndex. "
"No rules will fire. Set EnablePythonGovernanceChecker=True to enable."
)
_policy_index = PolicyIndex()
return _policy_index
event = _prefetch_event
if event is not None:
completed = event.wait(timeout=_PREFETCH_WAIT_SECONDS)
if completed and _policy_index is not None:
return _policy_index
if not completed:
logger.warning(
"Policy prefetch did not complete in %.1fs; "
"agent will run without any policies",
_PREFETCH_WAIT_SECONDS,
)
else:
# Distinguish from the timeout path so production triage
# can tell "prefetch hung" from "prefetch returned empty"
# (auth failure, server error, parse failure).
logger.warning(
"Policy prefetch completed but produced no PolicyIndex "
"(see prior WARN for the root cause); agent will run "
"without any policies"
)
_policy_index = PolicyIndex()
return _policy_index
# No prefetch was started (direct callers / tests). Sync load — bounded
# by the HTTP timeout in the API client.
_policy_index = load_policy_index()
return _policy_index
def load_policy_index(pack_name: str | None = None) -> PolicyIndex:
"""Load the active PolicyIndex from the governance backend.
Args:
pack_name: Ignored. Pack selection is controlled entirely by the
backend.
Returns:
PolicyIndex parsed from the backend response. Empty PolicyIndex
when the backend is unavailable, the token is unset, the YAML
is malformed, or the response yields zero rules.
"""
start = time.perf_counter()
api_index = _load_from_api()
if api_index is not None:
_log_index_summary(api_index)
logger.info(
"Policy index ready: source=backend, total_ms=%.1f",
(time.perf_counter() - start) * 1000,
)
return api_index
reason = _empty_index_reason()
logger.info(
"Policy index ready: source=empty (%s), total_ms=%.1f",
reason,
(time.perf_counter() - start) * 1000,
)
return PolicyIndex()
def _empty_index_reason() -> str:
"""Diagnose why the policy fetch produced nothing."""
if not resolve_organization_id():
return (
f"UiPathConfig.organization_id unavailable — set {ENV_ORGANIZATION_ID} "
"or install uipath-platform; backend API not contacted"
)
if not resolve_tenant_id():
return (
f"UiPathConfig.tenant_id unavailable — set {ENV_TENANT_ID} "
"or install uipath-platform; backend API not contacted"
)
if not os.environ.get(ENV_ACCESS_TOKEN):
return f"{ENV_ACCESS_TOKEN} unset — backend API not contacted"
return "backend returned no policies (timeout / error / empty body)"
def _apply_enforcement_mode(mode_str: str | None) -> None:
"""Map a backend-supplied mode string onto :class:`EnforcementMode`.
Unknown values log a warning and leave the existing mode untouched.
"""
if not mode_str:
return
try:
mode = EnforcementMode(mode_str.lower())
except ValueError:
logger.warning(
"Backend returned unknown enforcement mode %r; keeping current mode",
mode_str,
)
return
set_enforcement_mode(mode)
logger.info("Enforcement mode set from backend: %s", mode.value)
def _load_from_api() -> PolicyIndex | None:
"""Fetch and parse the policy index from the governance backend.
Applies the backend-supplied enforcement mode as a side effect.
Returns ``None`` when the backend skips/errors, when the YAML is
malformed, or when the resulting index has no rules — caller returns
an empty PolicyIndex in those cases.
"""
start = time.perf_counter()
response = fetch_policy_response()
if response is None:
return None
# Apply the platform-controlled enforcement mode before building the
# index, so anything that reads ``get_enforcement_mode()`` during
# index compilation already sees the right value.
_apply_enforcement_mode(response.mode)
if not response.policy:
logger.warning(
"Policy fetch returned empty policy field; "
"agent will run without any policies"
)
return None
try:
index = build_policy_index_from_yaml(response.policy)
except yaml.YAMLError as exc:
logger.warning("Policy YAML from backend was malformed: %s", exc)
return None
except Exception as exc: # noqa: BLE001 - never let load break agent startup
logger.warning("Failed to build PolicyIndex from backend YAML: %s", exc)
return None
if index.total_rules == 0:
logger.warning(
"Policy YAML from backend yielded zero rules; "
"agent will run without any policies"
)
return None
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"Loaded policy index from backend: packs=%s, rules=%d, elapsed_ms=%.1f",
index.pack_names,
index.total_rules,
elapsed_ms,
)
return index
def _backend_base_url() -> str:
"""Return the backend base URL for logging; imported lazily to avoid cycles."""
try:
from uipath.runtime.governance.native.backend_client import (
get_backend_base_url,
)
return get_backend_base_url()
except Exception: # noqa: BLE001
return "backend"
def _log_index_summary(index: PolicyIndex) -> None:
"""Log summary of loaded policy index."""
# Count rules by hook
hook_counts: Counter[str] = Counter()
for rule in index.all_rules:
hook_counts[rule.hook.value] += 1
logger.debug(
"Policy packs: %s, total rules: %d, by hook: %s",
index.pack_names,
index.total_rules,
dict(hook_counts),
)
def get_available_packs() -> list[str]:
"""Get list of pack names from the currently loaded policy index.
Returns whatever the backend supplied on the most recent load.
Empty list if no index has been loaded yet or the backend yielded
no packs.
"""
if _policy_index is None:
return []
return _policy_index.pack_names
def clear_policy_cache() -> None:
"""Clear the cached policy index and any in-flight prefetch state.
Next call to ``get_policy_index()`` will refetch from the backend.
"""
global _policy_index, _prefetch_event
with _prefetch_lock:
_policy_index = None
_prefetch_event = None
logger.debug("Policy index cache cleared")
# Backward compatibility alias
reset_policy_index = clear_policy_cache