Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# MongoDB Atlas
MONGODB_URI="mongodb+srv://user:password@cluster.mongodb.net/?retryWrites=true&w=majority"

# OpenAI
OPENAI_API_KEY="sk-..."

# Optional: SSL CA bundle (corporate proxy)
# SSL_CERT_FILE="/path/to/cert.pem"
# REQUESTS_CA_BUNDLE="/path/to/cert.pem"

# Optional: force expired policy for drift scenario
# POLICY_FORCE_OLD_VERSION=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.env
.venv/
__pycache__/
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""CRM Ops Desk — LangGraph multi-agent demo for SDOT instrumentation."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from app.agents.records import records_node
from app.agents.policy import policy_node
from app.agents.action import action_node
from app.agents.audit import audit_node

__all__ = ["records_node", "policy_node", "action_node", "audit_node"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from __future__ import annotations

import json
from typing import Any

from colorama import Fore, Style
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode

from app.models.action_output import ActionOutput, ToolReceipt
from app.tools.crm_tools import CRM_TOOLS

MAX_TOOL_ROUNDS = 3


async def action_node(state: dict, config: RunnableConfig) -> dict:
"""Action Agent — LLM decides which CRM tools to call and executes them.

Runs the full tool-calling loop inside a single graph node so that
the SDOT instrumentor emits exactly one ``invoke_agent`` span for the
Action Agent (matching the original CRM app's telemetry shape).

Accepts ``config`` so that graph-level callbacks (Galileo, SDOT) propagate
into the inner LLM and tool calls — this is required for tool outputs to
be captured by evaluation scorers.
"""
print(f"{Fore.YELLOW}-> Action Agent: Starting{Style.RESET_ALL}")
records = state.get("records", {})
policies = state.get("policies", [])

# Build context summary for the LLM
context_parts = []
requests = records.get("requests", [])
tickets = records.get("tickets", [])
orders = records.get("orders", [])

if requests:
context_parts.append(
f"Existing refund requests ({len(requests)}):\n"
+ json.dumps(requests[:2], indent=2, default=str)
)
if tickets:
context_parts.append(
f"Existing support tickets ({len(tickets)}):\n"
+ json.dumps(tickets[:2], indent=2, default=str)
)
if orders:
context_parts.append(
f"Customer orders ({len(orders)}):\n"
+ json.dumps(orders[:2], indent=2, default=str)
)
if policies:
policy_summary = [
{"version": p.get("version"), "region": p.get("region"),
"refund_window_days": p.get("refund_window_days"),
"effective_until": str(p.get("effective_until", "current"))}
for p in policies[:3]
]
context_parts.append(f"Applicable policies:\n{json.dumps(policy_summary, indent=2)}")

context_block = "\n\n".join(context_parts) if context_parts else "No prior records found."

system_prompt = f"""You are the Action Agent in a CRM Operations Desk.
Your job is to decide what actions to take for a customer request and execute
the appropriate tools. You MUST follow the policies and use the records
provided below — do not invent data or ignore the context.

Customer ID: {state['user_id']}

== Context from Records & Policy agents ==
{context_block}

== Instructions ==
1. Analyse the customer's sentiment (negative/neutral/positive) based on
their message tone and word choice.
2. If the customer is angry or very negative, escalate the ticket immediately.
3. If the customer wants a refund or return:
a. Check if a refund request already exists in the context above.
b. If not, create a refund request using the actual order amount and
product details from the records.
c. Also explain the refund state to keep the customer informed.
4. If the customer is asking about an order, explain the order state using
the actual order data from the records above.
5. Always create or update a support ticket to document the interaction.
6. IMPORTANT: Use ONLY data from the context above. Pass the customer's
actual order amounts, ticket IDs, product names, and refund request
data. Do NOT fabricate any IDs, amounts, or product details.
7. Select the MOST RELEVANT tools for this specific request. Not every
request needs every tool. Choose precisely.
8. Call ALL relevant tools in a SINGLE response. Do NOT repeat tool calls.
9. After receiving tool results, provide a brief summary of what was done.
Reference the specific actions taken (ticket created, refund initiated, etc.)."""

user_msg = state["user_query"]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)
llm_with_tools = llm.bind_tools(CRM_TOOLS)
tool_node = ToolNode(CRM_TOOLS)

messages: list = [SystemMessage(content=system_prompt), HumanMessage(content=user_msg)]

# Tool-calling loop (capped at MAX_TOOL_ROUNDS)
# Pass config so graph-level callbacks propagate to LLM and tool calls
for round_num in range(MAX_TOOL_ROUNDS):
response = await llm_with_tools.ainvoke(messages, config=config)
messages.append(response)

if not response.tool_calls:
break

tool_names = [tc["name"] for tc in response.tool_calls]
print(f" {Fore.YELLOW}LLM chose tools: {', '.join(tool_names)}{Style.RESET_ALL}")

# Execute tools via ToolNode — pass config for callback propagation
result = await tool_node.ainvoke({"messages": messages}, config=config)
messages.extend(result["messages"])

# Collect tool results into ActionOutput
tool_receipts: list[dict] = []
for msg in messages:
if isinstance(msg, ToolMessage):
try:
content = json.loads(msg.content) if isinstance(msg.content, str) else msg.content
except (json.JSONDecodeError, TypeError):
content = {"raw": str(msg.content)}
if not isinstance(content, dict):
content = {"raw": str(content)}
tool_receipts.append(
ToolReceipt(
tool=msg.name or "unknown",
status=content.get("status", 200),
latency_ms=0,
response=content,
).model_dump()
)

# Determine resolution
if not tool_receipts:
resolution = "no_action_required"
else:
successful = [r["tool"] for r in tool_receipts if 200 <= r["status"] < 300]
if "create_refund_request" in successful:
resolution = "refund_request_created"
elif "escalate_ticket" in successful:
resolution = "ticket_escalated"
elif "update_ticket" in successful:
resolution = "ticket_updated"
elif "create_ticket" in successful:
resolution = "ticket_created"
elif "explain_refund_state" in successful:
resolution = "refund_state_explained"
else:
resolution = "action_failed"

tool_costs = {
"create_ticket": 0.0015,
"update_ticket": 0.001,
"escalate_ticket": 0.003,
"create_refund_request": 0.0015,
"explain_refund_state": 0.0005,
"explain_order_state": 0.0005,
}
total_cost = 0.002 # base cost for LLM calls
for r in tool_receipts:
total_cost += tool_costs.get(r["tool"], 0.001)

action_output = ActionOutput(
resolution=resolution,
tool_receipts=[ToolReceipt(**r) for r in tool_receipts],
cost_token_usd=total_cost,
)
print(f" {Fore.YELLOW}Resolution: {resolution} ({len(tool_receipts)} tools){Style.RESET_ALL}")
return {"action_output": action_output.model_dump()}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

import time
from datetime import datetime

from colorama import Fore, Style

from app.models.action_output import ToolReceipt
from app.models.audit_output import AuditOutput, Citation


async def audit_node(state: dict) -> dict:
"""Audit Agent — generate human-readable rationale and citations."""
print(f"{Fore.BLUE}-> Audit Agent: Starting{Style.RESET_ALL}")
policies = state.get("policies", [])
action_raw = state.get("action_output", {})
records = state.get("records", {})

citations = [
Citation(
source="policy_database",
doc_id=str(p.get("_id", "")),
version=p.get("version", "?"),
relevance_score=0.95,
)
for p in policies
]

tool_receipts = [ToolReceipt(**r) for r in action_raw.get("tool_receipts", [])]

# Build rationale
parts: list[str] = []
parts.append(
f"Customer {state['user_id']} submitted: "
f"'{state['user_query'][:100]}{'...' if len(state['user_query']) > 100 else ''}'"
)

if policies:
regions = list({p.get("region") for p in policies})
versions = list({p.get("version") for p in policies})
parts.append(f"Retrieved {len(policies)} policies (regions: {', '.join(regions)}, versions: {', '.join(versions)})")
expired = [p for p in policies if p.get("effective_until")]
if expired:
parts.append(f"Policy drift: expired policy {expired[0].get('version')}")

reqs = records.get("requests", [])
tkts = records.get("tickets", [])
if reqs:
total = sum(r.get("amount", 0) for r in reqs)
parts.append(f"Found {len(reqs)} refund requests (total: ${total:.2f})")
if tkts:
parts.append(f"Found {len(tkts)} support tickets")

ok_tools = [r.tool for r in tool_receipts if 200 <= r.status < 300]
fail_tools = [r.tool for r in tool_receipts if r.status >= 300]
if ok_tools:
parts.append(f"Executed: {', '.join(ok_tools)}")
if fail_tools:
parts.append(f"Failed: {', '.join(fail_tools)}")

resolution = action_raw.get("resolution", "unknown")
parts.append(f"Resolution: {resolution}")
cost = action_raw.get("cost_token_usd", 0)
if cost:
parts.append(f"Cost: ${cost:.4f}")

span_ids = [
f"policy_{int(time.time() * 1000)}",
f"records_{int(time.time() * 1000)}",
f"action_{int(time.time() * 1000)}",
f"audit_{int(time.time() * 1000)}",
]

audit = AuditOutput(
interaction_id=f"int_{int(time.time() * 1000)}",
span_ids=span_ids,
citations=citations,
tool_receipts=tool_receipts,
final_verdict=resolution,
rationale=" | ".join(parts),
created_at=datetime.utcnow().isoformat(),
)

print(f" {Fore.BLUE}Audit complete{Style.RESET_ALL}")
return {"audit_output": audit.model_dump(), "status": "completed"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

import os
from datetime import datetime

from colorama import Fore, Style

from app.rag.atlas_client import embed, get_db


async def policy_node(state: dict) -> dict:
"""Policy Agent — retrieve relevant policies via MongoDB Atlas vector search."""
print(f"{Fore.CYAN}-> Policy Agent: Starting{Style.RESET_ALL}")
user_query = state["user_query"]
records = state.get("records", {})
orders = records.get("orders", [])

# Determine region from first order's shipping address
if not orders:
region = "US"
else:
country = orders[0].get("shipping_address", {}).get("country", "").upper()
if country in ("US", "USA", "UNITED STATES"):
region = "US"
elif country in (
"UK", "GB", "UNITED KINGDOM", "FR", "FRANCE",
"DE", "GERMANY", "IT", "ITALY", "ES", "SPAIN",
):
region = "EU"
else:
region = "EU" # Default to EU for non-US countries

db = get_db()
try:
vec = embed([user_query])[0]
pipeline = [
{
"$vectorSearch": {
"index": "policy_vectors",
"path": "embedding",
"queryVector": vec,
"numCandidates": 50,
"limit": 10,
}
}
]
policies = list(db.policies.aggregate(pipeline))
except Exception as e:
print(f" {Fore.YELLOW}Vector search failed: {e}{Style.RESET_ALL}")
policies = list(db.policies.find().limit(5))

# Filter by region
policies = [p for p in policies if p.get("region") == region]

# Use old expired policies if drift toggle is set
if os.getenv("POLICY_FORCE_OLD_VERSION", "false").lower() == "true":
expired = [p for p in policies if p.get("effective_until") is not None]
if expired:
expired.sort(key=lambda x: x.get("effective_from", datetime.min), reverse=True)
policies = expired
print(f" {Fore.YELLOW}Drift mode: using expired policy {policies[0].get('version')}{Style.RESET_ALL}")

for p in policies:
p.pop("embedding", None)
p["_id"] = str(p["_id"])

print(f" {Fore.CYAN}Found {len(policies)} policies for region {region}{Style.RESET_ALL}")
return {"policies": policies}
Loading
Loading