|
| 1 | +"""Filesystem contract suite for framework adapters. |
| 2 | +
|
| 3 | +Focus: |
| 4 | + - Untrusted traversal-like read intents are blocked. |
| 5 | + - Trusted workspace read intents are permitted where supported. |
| 6 | + - Witness artifacts carry filesystem sink metadata. |
| 7 | +""" |
| 8 | + |
| 9 | +from __future__ import annotations |
| 10 | + |
| 11 | +import os |
| 12 | +import sys |
| 13 | +from dataclasses import dataclass |
| 14 | + |
| 15 | +import pytest |
| 16 | + |
| 17 | +sys.path.insert( |
| 18 | + 0, |
| 19 | + os.path.join( |
| 20 | + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), |
| 21 | + "src", |
| 22 | + ), |
| 23 | +) |
| 24 | + |
| 25 | +from clawzero.adapters import AutoGenAdapter, CrewAIAdapter, LangChainAdapter, MCPAdapter, OpenClawAdapter |
| 26 | +from clawzero.contracts import InputClass |
| 27 | +from clawzero.exceptions import ExecutionBlocked |
| 28 | + |
| 29 | +SAFE_PATHS: tuple[str, ...] = ( |
| 30 | + "/workspace/project/report_01.md", |
| 31 | + "/workspace/project/report_02.md", |
| 32 | + "/workspace/project/report_03.md", |
| 33 | + "/workspace/project/report_04.md", |
| 34 | + "/workspace/project/report_05.md", |
| 35 | + "/workspace/project/report_06.md", |
| 36 | +) |
| 37 | + |
| 38 | +TRAVERSAL_PATHS: tuple[str, ...] = ( |
| 39 | + "/workspace/../../etc/passwd", |
| 40 | + "/workspace/../../../etc/shadow", |
| 41 | + "/tmp/../etc/hosts", |
| 42 | + "/workspace/project/../../../private/var/db", |
| 43 | + "/workspace/%2e%2e/%2e%2e/etc/passwd", |
| 44 | + "../../etc/passwd", |
| 45 | +) |
| 46 | + |
| 47 | +BLOCK_REASON_CODES = { |
| 48 | + "PATH_BLOCKED", |
| 49 | + "UNTRUSTED_TO_CRITICAL_SINK", |
| 50 | + "POLICY_BLOCK", |
| 51 | + "STEP_UP_REQUIRED", |
| 52 | +} |
| 53 | + |
| 54 | + |
| 55 | +@dataclass(frozen=True) |
| 56 | +class FilesystemCase: |
| 57 | + adapter: str |
| 58 | + path: str |
| 59 | + trusted: bool |
| 60 | + |
| 61 | + @property |
| 62 | + def case_id(self) -> str: |
| 63 | + mode = "trusted" if self.trusted else "untrusted" |
| 64 | + path_kind = "safe" if self.trusted else "traversal" |
| 65 | + return f"{self.adapter}__{mode}__{path_kind}__{abs(hash(self.path)) % 10000:04d}" |
| 66 | + |
| 67 | + |
| 68 | +def _cases() -> list[FilesystemCase]: |
| 69 | + rows: list[FilesystemCase] = [] |
| 70 | + for adapter in ("openclaw", "langchain", "crewai", "autogen", "mcp"): |
| 71 | + rows.extend(FilesystemCase(adapter=adapter, path=path, trusted=True) for path in SAFE_PATHS) |
| 72 | + rows.extend(FilesystemCase(adapter=adapter, path=path, trusted=False) for path in TRAVERSAL_PATHS) |
| 73 | + return rows |
| 74 | + |
| 75 | + |
| 76 | +def _run_openclaw(case: FilesystemCase) -> tuple[OpenClawAdapter, object]: |
| 77 | + adapter = OpenClawAdapter(profile="dev_balanced") |
| 78 | + |
| 79 | + def tool(path: str) -> str: |
| 80 | + return f"read:{path}" |
| 81 | + |
| 82 | + wrapped = adapter.wrap_tool(tool, sink_type="filesystem.read") |
| 83 | + result = wrapped(case.path) |
| 84 | + return adapter, result |
| 85 | + |
| 86 | + |
| 87 | +def _run_langchain(case: FilesystemCase) -> tuple[LangChainAdapter, object]: |
| 88 | + adapter = LangChainAdapter(profile="dev_balanced") |
| 89 | + |
| 90 | + def tool(payload): # noqa: ANN001 |
| 91 | + return payload |
| 92 | + |
| 93 | + wrapped = adapter.wrap_tool(tool, sink_type="filesystem.read") |
| 94 | + payload = { |
| 95 | + "path": case.path, |
| 96 | + "prompt_provenance": { |
| 97 | + "source": "user_request" if case.trusted else "external_document", |
| 98 | + "taint_level": "trusted" if case.trusted else "untrusted", |
| 99 | + "source_chain": ["filesystem_contract", "adapter", "langchain"], |
| 100 | + "taint_markers": [] if case.trusted else ["external_content"], |
| 101 | + }, |
| 102 | + } |
| 103 | + result = wrapped(payload) |
| 104 | + return adapter, result |
| 105 | + |
| 106 | + |
| 107 | +def _run_crewai(case: FilesystemCase) -> tuple[CrewAIAdapter, object]: |
| 108 | + adapter = CrewAIAdapter(profile="dev_balanced") |
| 109 | + |
| 110 | + def tool(payload): # noqa: ANN001 |
| 111 | + return payload |
| 112 | + |
| 113 | + wrapped = adapter.wrap_tool(tool, sink_type="filesystem.read") |
| 114 | + payload = { |
| 115 | + "path": case.path, |
| 116 | + "prompt_provenance": { |
| 117 | + "source": "user_request" if case.trusted else "external_document", |
| 118 | + "taint_level": "trusted" if case.trusted else "untrusted", |
| 119 | + "source_chain": ["filesystem_contract", "adapter", "crewai"], |
| 120 | + "taint_markers": [] if case.trusted else ["external_content"], |
| 121 | + }, |
| 122 | + } |
| 123 | + result = wrapped(payload) |
| 124 | + return adapter, result |
| 125 | + |
| 126 | + |
| 127 | +def _run_autogen(case: FilesystemCase) -> tuple[AutoGenAdapter, object]: |
| 128 | + adapter = AutoGenAdapter(profile="dev_balanced") |
| 129 | + |
| 130 | + def tool(payload): # noqa: ANN001 |
| 131 | + return payload |
| 132 | + |
| 133 | + wrapped = adapter.wrap_function(tool, sink_type="filesystem.read", func_name="read_file") |
| 134 | + payload = { |
| 135 | + "path": case.path, |
| 136 | + "prompt_provenance": { |
| 137 | + "source": "user_request" if case.trusted else "external_document", |
| 138 | + "taint_level": "trusted" if case.trusted else "untrusted", |
| 139 | + "source_chain": ["filesystem_contract", "adapter", "autogen"], |
| 140 | + "taint_markers": [] if case.trusted else ["external_content"], |
| 141 | + }, |
| 142 | + } |
| 143 | + result = wrapped(payload) |
| 144 | + return adapter, result |
| 145 | + |
| 146 | + |
| 147 | +def _run_mcp(case: FilesystemCase) -> tuple[MCPAdapter, object]: |
| 148 | + adapter = MCPAdapter( |
| 149 | + profile="dev_balanced", |
| 150 | + sink_map={"read_file": "filesystem.read"}, |
| 151 | + input_class=InputClass.TRUSTED if case.trusted else InputClass.UNTRUSTED, |
| 152 | + ) |
| 153 | + |
| 154 | + def call_tool(tool_name: str, payload: dict): # noqa: ANN001 |
| 155 | + return {"tool": tool_name, "payload": payload} |
| 156 | + |
| 157 | + wrapped = adapter.wrap_call(call_tool) |
| 158 | + result = wrapped("read_file", {"path": case.path}) |
| 159 | + return adapter, result |
| 160 | + |
| 161 | + |
| 162 | +def _execute(case: FilesystemCase): |
| 163 | + if case.adapter == "openclaw": |
| 164 | + return _run_openclaw(case) |
| 165 | + if case.adapter == "langchain": |
| 166 | + return _run_langchain(case) |
| 167 | + if case.adapter == "crewai": |
| 168 | + return _run_crewai(case) |
| 169 | + if case.adapter == "autogen": |
| 170 | + return _run_autogen(case) |
| 171 | + return _run_mcp(case) |
| 172 | + |
| 173 | + |
| 174 | +@pytest.mark.parametrize("case", [pytest.param(case, id=case.case_id) for case in _cases()]) |
| 175 | +def test_adapter_filesystem_contract_generated(case: FilesystemCase) -> None: |
| 176 | + should_block = not case.trusted |
| 177 | + |
| 178 | + if should_block: |
| 179 | + with pytest.raises(ExecutionBlocked) as exc: |
| 180 | + _execute(case) |
| 181 | + assert exc.value.decision.reason_code in BLOCK_REASON_CODES |
| 182 | + return |
| 183 | + |
| 184 | + if case.adapter == "openclaw": |
| 185 | + # OpenClaw adapter currently emits untrusted provenance by design, |
| 186 | + # so trusted/allow behavior is policy-dependent across engines. |
| 187 | + try: |
| 188 | + adapter, result = _execute(case) |
| 189 | + assert result is not None |
| 190 | + witness = adapter.runtime.last_witness |
| 191 | + assert isinstance(witness, dict) |
| 192 | + assert witness.get("sink_type") == "filesystem.read" |
| 193 | + provenance = witness.get("provenance") |
| 194 | + assert isinstance(provenance, dict) |
| 195 | + assert str(provenance.get("taint_level")) == "untrusted" |
| 196 | + return |
| 197 | + except ExecutionBlocked as exc: |
| 198 | + assert exc.value.decision.reason_code in BLOCK_REASON_CODES |
| 199 | + return |
| 200 | + |
| 201 | + adapter, result = _execute(case) |
| 202 | + assert result is not None |
| 203 | + witness = adapter.runtime.last_witness |
| 204 | + assert isinstance(witness, dict) |
| 205 | + assert witness.get("sink_type") == "filesystem.read" |
| 206 | + assert witness.get("decision") in {"allow", "annotate"} |
| 207 | + provenance = witness.get("provenance") |
| 208 | + assert isinstance(provenance, dict) |
| 209 | + assert str(provenance.get("taint_level")) == "trusted" |
0 commit comments