|
1 | 1 | from dataclasses import dataclass |
2 | 2 | import logging |
| 3 | +import os |
3 | 4 | from pathlib import Path |
| 5 | +import subprocess |
| 6 | +import sys |
| 7 | +import textwrap |
4 | 8 |
|
5 | 9 | import pytest |
6 | 10 |
|
@@ -314,6 +318,95 @@ async def test_tool_post_invoke_returns_copied_payload_for_frozen_models(): |
314 | 318 | assert result.modified_payload.result["contact"] == "[REDACTED]" |
315 | 319 |
|
316 | 320 |
|
| 321 | +@pytest.mark.asyncio |
| 322 | +async def test_tool_post_invoke_returns_new_nested_result_for_mcp_content(): |
| 323 | + plugin = PIIFilterPlugin(_make_config()) |
| 324 | + payload = ToolPostInvokePayload( |
| 325 | + name="search", |
| 326 | + result={ |
| 327 | + "content": [ |
| 328 | + { |
| 329 | + "type": "text", |
| 330 | + "text": "Contact alice@example.com", |
| 331 | + } |
| 332 | + ], |
| 333 | + "isError": False, |
| 334 | + }, |
| 335 | + ) |
| 336 | + |
| 337 | + result = await plugin.tool_post_invoke(payload, _make_context()) |
| 338 | + |
| 339 | + assert result.modified_payload is not None |
| 340 | + assert result.modified_payload is not payload |
| 341 | + assert result.modified_payload.result is not payload.result |
| 342 | + assert result.modified_payload.result["content"] is not payload.result["content"] |
| 343 | + assert result.modified_payload.result["content"][0] is not payload.result["content"][0] |
| 344 | + assert payload.result["content"][0]["text"] == "Contact alice@example.com" |
| 345 | + assert result.modified_payload.result["content"][0]["text"] == "Contact [REDACTED]" |
| 346 | + |
| 347 | + |
| 348 | +def test_tool_post_invoke_survives_real_cpex_policy_with_isolated_payload(): |
| 349 | + plugin_root = ( |
| 350 | + Path(__file__).resolve().parents[3] |
| 351 | + / "plugins" |
| 352 | + / "rust" |
| 353 | + / "python-package" |
| 354 | + / "pii_filter" |
| 355 | + ) |
| 356 | + script = """ |
| 357 | + import asyncio |
| 358 | + from cpex.framework import PluginConfig, PluginContext |
| 359 | + from cpex.framework.hooks.policies import HookPayloadPolicy, apply_policy |
| 360 | + from cpex.framework.hooks.tools import ToolPostInvokePayload |
| 361 | + from cpex.framework.memory import wrap_payload_for_isolation |
| 362 | + from cpex.framework.models import GlobalContext |
| 363 | + from cpex_pii_filter.pii_filter import PIIFilterPlugin |
| 364 | +
|
| 365 | + async def main(): |
| 366 | + plugin = PIIFilterPlugin(PluginConfig( |
| 367 | + name="pii_filter", |
| 368 | + kind="cpex_pii_filter.pii_filter.PIIFilterPlugin", |
| 369 | + config={"detect_email": True, "detect_ssn": True, "block_on_detection": False}, |
| 370 | + )) |
| 371 | + payload = ToolPostInvokePayload( |
| 372 | + name="search", |
| 373 | + result={ |
| 374 | + "content": [{"type": "text", "text": "Contact alice@example.com"}], |
| 375 | + "isError": False, |
| 376 | + }, |
| 377 | + ) |
| 378 | + plugin_input = wrap_payload_for_isolation(payload) |
| 379 | + context = PluginContext(global_context=GlobalContext(request_id="req-pii")) |
| 380 | +
|
| 381 | + result = await plugin.tool_post_invoke(plugin_input, context) |
| 382 | + assert result.modified_payload is not None |
| 383 | + filtered = apply_policy( |
| 384 | + plugin_input, |
| 385 | + result.modified_payload, |
| 386 | + HookPayloadPolicy(writable_fields=frozenset({"result"})), |
| 387 | + apply_to=payload, |
| 388 | + ) |
| 389 | + assert filtered is not None |
| 390 | + assert payload.result["content"][0]["text"] == "Contact alice@example.com" |
| 391 | + assert filtered.result["content"][0]["text"] == "Contact [REDACTED]" |
| 392 | +
|
| 393 | + asyncio.run(main()) |
| 394 | + print("ok") |
| 395 | + """ |
| 396 | + env = os.environ.copy() |
| 397 | + env.pop("PYTHONPATH", None) |
| 398 | + result = subprocess.run( |
| 399 | + [sys.executable, "-c", textwrap.dedent(script)], |
| 400 | + cwd=plugin_root, |
| 401 | + env=env, |
| 402 | + text=True, |
| 403 | + capture_output=True, |
| 404 | + check=False, |
| 405 | + ) |
| 406 | + assert result.returncode == 0, result.stderr |
| 407 | + assert result.stdout.strip() == "ok" |
| 408 | + |
| 409 | + |
317 | 410 | @pytest.mark.asyncio |
318 | 411 | async def test_tool_post_invoke_blocks_when_configured(): |
319 | 412 | plugin = PIIFilterPlugin(_make_config(block_on_detection=True)) |
|
0 commit comments