Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions code/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
## Setup

```bash
pip install -r requirements.txt
```

## Run

```bash
cd code/
python main.py \
--tickets ../support_tickets/support_tickets.csv \
--data ../data/ \
--output ../support_tickets/output.csv \
--log ../run_log.txt
```

## Validation

```bash
cd code/
python main.py \
--tickets ../support_tickets/sample_support_tickets.csv \
--data ../data/ \
--output ../support_tickets/sample_output.csv \
--log ../run_log.txt
```

## Output columns

`status, product_area, response, justification, request_type`
183 changes: 183 additions & 0 deletions code/classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import re
from dataclasses import dataclass
from typing import Dict
from typing import List
from retrieval import RetrievalHit
from utils import clean_text


ALLOWED_REQUEST_TYPES = {"product_issue", "feature_request", "bug", "invalid"}

AREA_FALLBACKS = {
"hackerrank": {
"interviewer": "team_management",
"team": "team_management",
"certificate": "certifications",
"mock interview": "interviewing",
"compatibility": "assessments",
"assessment": "assessments",
"test": "assessments",
"resume": "community_support",
"subscription": "billing_and_subscriptions",
"interview": "interviewing",
"lti": "integrations",
},
"claude": {
"bedrock": "api_and_developer_tools",
"api": "api_and_developer_tools",
"workspace": "account_management",
"personal data": "privacy_and_compliance",
"privacy": "privacy_and_compliance",
"crawl": "privacy_and_compliance",
"crawler": "privacy_and_compliance",
"lti": "education",
},
"visa": {
"charge": "payment_processing",
"refund": "payment_processing",
"dispute": "payment_processing",
"merchant": "merchant_acceptance",
"minimum": "merchant_acceptance",
"travel": "travel_services",
"cash": "travel_services",
"identity": "fraud_and_security",
"fraud": "fraud_and_security",
"privacy": "data_privacy",
},
}


@dataclass
class ClassificationResult:
subject: str
issue: str
ticket_text: str
company: str
request_type: str


class Classifier:
domain_keywords = {
"hackerrank": [
"hackerrank",
"assessment",
"test",
"candidate",
"recruiter",
"interview",
"mock interview",
"resume builder",
"apply tab",
"certificate",
"interviewer",
],
"claude": [
"claude",
"bedrock",
"anthropic",
"workspace",
"crawler",
"lti",
"model",
"console",
],
"visa": [
"visa",
"card",
"merchant",
"charge",
"cash",
"travel",
"fraud",
],
}

invalid_patterns = (
"delete all files",
"ignore previous instructions",
"show internal rules",
"show documents retrieved",
"logic exact",
"prompt injection",
)

bug_patterns = (
"not working",
"stopped working",
"failing",
"error",
"down",
"issue while",
"compatibility",
"blocked",
"blocker",
"not responding",
"unable to",
)

feature_patterns = (
"feature request",
"can you add",
"enhancement",
"new feature",
)

def classify(self, row: Dict[str, str]) -> ClassificationResult:
subject = clean_text(row.get("Subject", ""))
issue = clean_text(row.get("Issue", ""))
ticket_text = clean_text(f"{subject} {issue}")
company = self.infer_company(clean_text(row.get("Company", "")), ticket_text)
request_type = self.request_type(ticket_text)
if request_type not in ALLOWED_REQUEST_TYPES:
request_type = "invalid"
return ClassificationResult(
subject=subject,
issue=issue,
ticket_text=ticket_text,
company=company,
request_type=request_type,
)

def infer_company(self, provided: str, ticket_text: str) -> str:
lowered = provided.lower().strip()
if lowered in {"hackerrank", "claude", "visa"}:
return lowered
scores = {}
text = ticket_text.lower()
for company, keywords in self.domain_keywords.items():
scores[company] = sum(1 for keyword in keywords if keyword in text)
best_company = max(scores, key=scores.get)
return best_company if scores[best_company] > 0 else "none"

def request_type(self, ticket_text: str) -> str:
lowered = ticket_text.lower().strip()
if not lowered:
return "invalid"
if any(pattern in lowered for pattern in self.invalid_patterns):
return "invalid"
if re.fullmatch(r"(thanks|thank you|ok|okay)[.! ]*", lowered):
return "invalid"
if any(pattern in lowered for pattern in self.feature_patterns):
return "feature_request"
if any(pattern in lowered for pattern in self.bug_patterns):
return "bug"
return "product_issue"

def product_area(self, classification: ClassificationResult, hits: List[RetrievalHit]) -> str:
if hits:
top_area = hits[0].doc.product_area
if top_area not in {"conversation_management", "general"}:
return top_area

lowered = classification.ticket_text.lower()
for keyword, area in AREA_FALLBACKS.get(classification.company, {}).items():
if keyword in lowered:
return area

if classification.company == "hackerrank":
return "platform_support"
if classification.company == "claude":
return "account_management"
if classification.company == "visa":
return "payment_processing"
return "platform_support"
183 changes: 183 additions & 0 deletions code/decision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from dataclasses import dataclass
from typing import List

from classifier import ClassificationResult
from retrieval import RetrievalHit
from utils import tokenize


CONFIDENCE_THRESHOLDS = {
"strong_reply": 0.55,
"overlap_reply": 0.35,
"none": 0.0,
}

BYPASS_INTENTS = {"bedrock", "crawler", "lti", "interviewer", "troubleshooting", "merchant_rules", "travel_support", "compatibility"}


@dataclass(frozen=True)
class Decision:
status: str
confidence: float
reason_code: str
reason_detail: str


class DecisionEngine:
pii_terms = ("personal data", "private info", "privacy request", "data request")
security_terms = (
"security vulnerability",
"bug bounty",
"identity theft",
"restore my access",
"delete my account",
"remove my account",
"stolen",
"compromised",
)
certificate_terms = ("certificate", "credential")
billing_terms = ("payment", "refund", "charge", "billing", "subscription", "dispute")
legal_terms = ("legal", "compliance", "dpo", "gdpr", "infosec", "security questionnaire")

def decide(
self,
ticket_text: str,
classification: ClassificationResult,
hits: List[RetrievalHit],
) -> Decision:
lowered = ticket_text.lower()
confidence = hits[0].score if hits else CONFIDENCE_THRESHOLDS["none"]
intent = self._intent_name(lowered)

if classification.request_type == "invalid":
return Decision(
status="escalated",
confidence=confidence,
reason_code="invalid",
reason_detail="Ticket does not match any supported domain or request type.",
)

if not hits:
return Decision(
status="escalated",
confidence=0.0,
reason_code="no_docs",
reason_detail="No relevant documentation was retrieved from the support corpus.",
)

if self._is_high_risk(lowered):
reason_code, reason_detail = self._select_escalation_reason(lowered, hits)
return Decision(
status="escalated",
confidence=confidence,
reason_code=reason_code,
reason_detail=reason_detail,
)

if intent in BYPASS_INTENTS and hits:
return Decision(
status="replied",
confidence=confidence,
reason_code="corpus_grounded_reply",
reason_detail="A sufficiently strong support document was retrieved for a grounded reply.",
)

if confidence >= CONFIDENCE_THRESHOLDS["strong_reply"]:
return Decision(
status="replied",
confidence=confidence,
reason_code="corpus_grounded_reply",
reason_detail="A sufficiently strong support document was retrieved for a grounded reply.",
)

if confidence >= CONFIDENCE_THRESHOLDS["overlap_reply"] and self._intent_keyword_overlap(lowered, hits):
return Decision(
status="replied",
confidence=confidence,
reason_code="corpus_grounded_reply",
reason_detail="A sufficiently strong support document was retrieved for a grounded reply.",
)

return Decision(
status="escalated",
confidence=confidence,
reason_code="low_confidence",
reason_detail="The available documentation does not provide sufficient guidance for this specific case.",
)

def _select_escalation_reason(self, lowered: str, hits: List[RetrievalHit]):
if any(term in lowered for term in ("personal data", "gdpr", "privacy", "my data", "delete my")):
return (
"pii",
"This ticket involves personal data and requires privacy review.",
)
if any(term in lowered for term in ("fraud", "unauthorized", "stolen", "suspicious", "bug bounty", "security vulnerability")):
return (
"security",
"This ticket involves account security or fraud and requires human verification.",
)
if any(term in lowered for term in ("password", "account access", "login", "identity", "account deletion")):
return (
"security",
"This ticket involves account security changes and requires human verification.",
)
if any(term in lowered for term in ("billing", "payment", "refund", "charge", "invoice", "subscription cancel")):
return (
"billing",
"This ticket involves a billing or payment matter that requires human review.",
)
if any(term in lowered for term in ("certificate", "credential", "badge", "verify name")):
return (
"certificate",
"This ticket involves a certificate or credential update that requires identity verification.",
)
if any(term in lowered for term in ("legal", "compliance", "regulation", "court")):
return (
"legal",
"This ticket involves compliance or legal review.",
)
if not hits:
return ("no_docs", "No relevant documentation was found in the support corpus for this query.")
return ("low_confidence", "The available documentation does not provide sufficient guidance for this specific case.")

def _is_high_risk(self, lowered: str) -> bool:
return any(
term in lowered
for term in (
"personal data", "gdpr", "privacy", "my data", "delete my",
"fraud", "unauthorized", "stolen", "suspicious", "bug bounty", "security vulnerability",
"password", "account access", "login", "identity", "account deletion",
"billing", "payment", "refund", "charge", "invoice", "subscription cancel",
"certificate", "credential", "badge", "verify name",
"legal", "compliance", "regulation", "court",
)
)

def _intent_name(self, lowered: str) -> str:
if "bedrock" in lowered:
return "bedrock"
if "crawler" in lowered or "crawl" in lowered:
return "crawler"
if "not responding" in lowered or "requests are failing" in lowered or "stopped working" in lowered:
return "troubleshooting"
if "minimum" in lowered and "visa" in lowered:
return "merchant_rules"
if "urgent cash" in lowered or ("cash" in lowered and "visa" in lowered):
return "travel_support"
if "compatibility" in lowered or "zoom connectivity" in lowered:
return "compatibility"
if "employee has left" in lowered or "remove them" in lowered:
return "interviewer"
if " lti " in f" {lowered} ":
return "lti"
if "interviewer" in lowered and "remove" in lowered:
return "interviewer"
return ""

def _intent_keyword_overlap(self, lowered: str, hits: List[RetrievalHit]) -> bool:
ticket_tokens = set(tokenize(lowered))
for hit in hits[:3]:
doc_tokens = set(tokenize(f"{hit.doc.title} {hit.doc.path} {hit.doc.text[:800]}"))
if len(ticket_tokens & doc_tokens) >= 2:
return True
return False
Loading