forked from HKUDS/OpenSpace
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathscrubber.py
More file actions
183 lines (161 loc) · 6.36 KB
/
Copy pathscrubber.py
File metadata and controls
183 lines (161 loc) · 6.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""PII / secret scrubber for OpenViking feedback sessions.
Regex-based redaction applied before OpenSpace writes session content
to OpenViking. Not a full DLP solution — it catches the common classes
of secrets and identifiers that should never end up in a shared team
memory store. For enterprise deployments, consider layering a proper
DLP / entity-recognition stack on top.
Design goals:
* Zero external dependencies (only stdlib ``re``)
* Conservative placeholders that preserve prompt structure
* Idempotent: scrubbing already-scrubbed text produces the same result
* Opt-out via ``OPENVIKING_SCRUB_PII=false`` (not recommended)
Covered patterns (non-exhaustive):
* API keys & bearer tokens (Anthropic, OpenAI, OpenRouter, GitHub, AWS, GCP, Slack)
* Basic-auth URLs (``https://user:pass@host``)
* Email addresses
* Phone numbers (E.164 with a leading ``+`` only)
* Credit cards (with Luhn check)
* SSN (US)
* IPv4 addresses
* JWT tokens
* RSA / SSH private key headers
"""
from __future__ import annotations
import re
from typing import Any, Dict, Iterable, List, Optional
# ---------------------------------------------------------------------------
# Regex patterns
# ---------------------------------------------------------------------------
# API key / token patterns — ordered from most specific to most generic so
# that "sk-ant-..." is caught before the generic "sk-..." fallback.
# Ordered table of ``(replacement, compiled_pattern)`` pairs. ``scrub_text``
# applies each entry in sequence with ``pattern.sub(replacement, text)``, so
# earlier entries see the text first and replacements may use regex
# backreferences (see the basic-auth entry).
_PATTERNS: List[tuple[str, re.Pattern]] = [
    # Anthropic API key
    ("[REDACTED_ANTHROPIC_KEY]", re.compile(r"sk-ant-[A-Za-z0-9\-_]{32,}")),
    # OpenAI / OpenAI-compatible
    ("[REDACTED_OPENAI_KEY]", re.compile(r"sk-proj-[A-Za-z0-9\-_]{32,}")),
    # Generic "sk-" fallback: no hyphens allowed after the prefix, so it does
    # not fire on the hyphenated vendor-specific keys handled elsewhere.
    ("[REDACTED_OPENAI_KEY]", re.compile(r"sk-[A-Za-z0-9]{40,}")),
    # GitHub classic + fine-grained
    ("[REDACTED_GITHUB_TOKEN]", re.compile(r"gh[pousr]_[A-Za-z0-9]{36,}")),
    ("[REDACTED_GITHUB_TOKEN]", re.compile(r"github_pat_[A-Za-z0-9_]{40,}")),
    # AWS access key IDs (long-lived "AKIA", temporary "ASIA")
    ("[REDACTED_AWS_ACCESS_KEY]", re.compile(r"\bAKIA[0-9A-Z]{16}\b")),
    ("[REDACTED_AWS_ACCESS_KEY]", re.compile(r"\bASIA[0-9A-Z]{16}\b")),
    # GCP API key (prefix "AIza")
    ("[REDACTED_GCP_API_KEY]", re.compile(r"\bAIza[0-9A-Za-z\-_]{35}\b")),
    # Slack
    ("[REDACTED_SLACK_TOKEN]", re.compile(r"xox[baprs]-[A-Za-z0-9\-]{10,}")),
    # OpenRouter
    ("[REDACTED_OPENROUTER_KEY]", re.compile(r"sk-or-[A-Za-z0-9\-_]{32,}")),
    # JWT (three base64url segments joined by dots)
    (
        "[REDACTED_JWT]",
        re.compile(r"\beyJ[A-Za-z0-9_\-]{10,}\.[A-Za-z0-9_\-]{10,}\.[A-Za-z0-9_\-]{10,}\b"),
    ),
    # Bearer token in Authorization header context. The character class
    # covers the full RFC 6750 token alphabet (including "~", "+" and "/")
    # so standard-base64 tokens are redacted whole instead of being cut at
    # the first "+" or "/". "[" is not in the class, so an already-redacted
    # header is left alone.
    (
        "Authorization: Bearer [REDACTED_BEARER]",
        re.compile(r"Authorization:\s*Bearer\s+[A-Za-z0-9\-_.~+/=]+", re.IGNORECASE),
    ),
    # Basic-auth URL: https://user:pass@host
    # Group 1 (the scheme) is kept via the ``\1`` backreference; only the
    # ``user:pass`` part is replaced.
    (
        r"\1[REDACTED_CREDS]@",
        re.compile(r"(https?://)[^\s/@:]+:[^\s/@]+@"),
    ),
    # Private key blocks: everything from the BEGIN header through the
    # matching END footer (DOTALL lets ``.*?`` span the key's line breaks).
    (
        "[REDACTED_PRIVATE_KEY]",
        re.compile(
            r"-----BEGIN (?:RSA |EC |OPENSSH |DSA |ENCRYPTED )?PRIVATE KEY-----"
            r".*?"
            r"-----END (?:RSA |EC |OPENSSH |DSA |ENCRYPTED )?PRIVATE KEY-----",
            re.DOTALL,
        ),
    ),
    # Email
    (
        "[REDACTED_EMAIL]",
        re.compile(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b"),
    ),
    # Phone (E.164). Requires leading '+' so bare numeric strings (order
    # numbers, IDs) are not swept up. Credit cards are not handled here:
    # scrub_text Luhn-checks and redacts them before this table is applied.
    (
        "[REDACTED_PHONE]",
        re.compile(r"\+[1-9]\d{6,14}\b(?![0-9])"),
    ),
    # SSN (US)
    ("[REDACTED_SSN]", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    # IPv4 — for simplicity every dotted quad with octets in 0-255 is
    # scrubbed, including localhost and RFC1918 addresses.
    (
        "[REDACTED_IP]",
        re.compile(
            r"\b(?:25[0-5]|2[0-4]\d|[01]?\d\d?)"
            r"(?:\.(?:25[0-5]|2[0-4]\d|[01]?\d\d?)){3}\b"
        ),
    ),
]
# Candidate credit-card number: 13-19 digits, with at most one space or
# hyphen between consecutive digits. The match always starts and ends on a
# digit, so a separator that merely follows the number is never part of the
# match and is not swallowed when the number is replaced. Candidates are
# Luhn-checked before being redacted.
_CC_PATTERN = re.compile(r"\b\d(?:[ -]?\d){12,18}\b")
def _luhn_ok(digits: str) -> bool:
    """Return True when ``digits`` passes the Luhn checksum.

    ``digits`` must contain only decimal digit characters; anything else
    makes ``int()`` raise ``ValueError``. An empty string sums to zero and
    therefore passes.
    """

    def contribution(position: int, char: str) -> int:
        value = int(char)
        # Counting from the rightmost digit (position 0), every second digit
        # is doubled; a two-digit result is folded back by subtracting 9.
        if position % 2 == 0:
            return value
        doubled = value * 2
        return doubled - 9 if doubled > 9 else doubled

    checksum = sum(
        contribution(position, char)
        for position, char in enumerate(reversed(digits))
    )
    return checksum % 10 == 0
def _scrub_credit_cards(text: str) -> str:
    """Replace Luhn-valid card numbers in ``text`` with ``[REDACTED_CC]``.

    Every ``_CC_PATTERN`` match is a candidate. Spaces and hyphens are
    stripped from it, and it is redacted only when 13-19 digits remain and
    they pass ``_luhn_ok``. Candidates that fail are left exactly as found.
    """

    def _redact_if_valid(match: re.Match) -> str:
        candidate = match.group(0)
        bare_digits = re.sub(r"[ -]", "", candidate)
        is_card = 13 <= len(bare_digits) <= 19 and _luhn_ok(bare_digits)
        return "[REDACTED_CC]" if is_card else candidate

    return _CC_PATTERN.sub(_redact_if_valid, text)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def scrub_text(text: Optional[str]) -> str:
    """Return ``text`` with secrets and PII replaced by placeholders.

    ``None`` and the empty string both yield ``""``.

    Intended to be idempotent: running it again over its own output should
    change nothing.

    Order matters. Credit cards are Luhn-checked and redacted first, so a
    long digit run is not partially consumed by the phone pattern; the
    ``_PATTERNS`` table is then applied entry by entry, in order.
    """
    if not text:
        return ""

    # Pass 1: Luhn-validated credit cards.
    scrubbed = _scrub_credit_cards(text)

    # Pass 2: every (placeholder, regex) pair, in table order.
    for placeholder, regex in _PATTERNS:
        scrubbed = regex.sub(placeholder, scrubbed)
    return scrubbed
def scrub_iterable(items: Iterable[str]) -> List[str]:
    """Return a list holding ``scrub_text(item)`` for each item, in order."""
    return list(map(scrub_text, items))
def _scrub_value(value: Any) -> Any:
    """Scrub ``value``, recursing through nested dicts and lists."""
    if isinstance(value, str):
        return scrub_text(value)
    if isinstance(value, list):
        return [_scrub_value(item) for item in value]
    if isinstance(value, dict):
        return {key: _scrub_value(item) for key, item in value.items()}
    # Numbers, booleans, None and any other type are passed through as-is.
    return value


def scrub_feedback_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``record`` with every string value scrubbed.

    The record is walked recursively: strings are passed through
    ``scrub_text``, and dicts and lists are rebuilt at any nesting depth, so
    shapes such as ``{"messages": [{"content": "..."}]}`` are scrubbed too.
    The input is never mutated. Dictionary keys are not scrubbed, and values
    of any other type are carried over unchanged (the same object, not a
    copy).

    This function always scrubs; whether to call it at all (e.g. honoring
    ``OPENVIKING_SCRUB_PII``) is the caller's decision.

    Args:
        record: The feedback record to scrub.

    Returns:
        A new dict with scrubbed contents. If ``record`` is not a dict it is
        returned unchanged.
    """
    if not isinstance(record, dict):
        return record
    return {key: _scrub_value(value) for key, value in record.items()}