-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevaluate.py
More file actions
140 lines (113 loc) · 4.39 KB
/
evaluate.py
File metadata and controls
140 lines (113 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
evaluate.py — Concurrent batch evaluation of all tickets with full audit logging.
The PDF requires:
- Tickets processed CONCURRENTLY (not one-by-one)
- audit_log.json covering all 20 tickets
- Tool calls, reasoning, and outcomes logged
Usage:
python evaluate.py # run all tickets concurrently
python evaluate.py --save # save audit_log.json
python evaluate.py TKT-001 TKT-008 # specific tickets only
"""
import json
import sys
import time
import threading
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
from agent import run_agent
from data_loader import load_all_data
DATA = load_all_data()
LOG_LOCK = threading.Lock()
def process_one_ticket(ticket: dict, results: list, index: int, total: int):
    """Run the agent on one ticket and append an outcome record to *results*.

    Thread-safe: console prints and the shared-list append are performed
    under LOG_LOCK. Never raises — agent failures are captured in the
    record with status "FAILED".
    """
    started_at = time.time()
    with LOG_LOCK:
        print(f" > [{index}/{total}] Starting: {ticket['ticket_id']} - {ticket['subject']}")
    try:
        answer = run_agent(ticket)
    except Exception as exc:
        # Capture the failure instead of propagating, so one bad ticket
        # never takes down the whole batch.
        answer = f"ERROR: {str(exc)}"
        outcome = "FAILED"
    else:
        outcome = "SUCCESS"
    duration = round(time.time() - started_at, 2)
    record = {
        "ticket_id"      : ticket["ticket_id"],
        "subject"        : ticket["subject"],
        "customer_email" : ticket["customer_email"],
        "source"         : ticket.get("source", "unknown"),
        "expected_action": ticket.get("expected_action", ""),
        "agent_response" : answer,
        "status"         : outcome,
        "elapsed_seconds": duration,
        "processed_at"   : datetime.utcnow().isoformat() + "Z",
    }
    with LOG_LOCK:
        results.append(record)
        marker = "OK" if outcome == "SUCCESS" else "FAIL"
        print(f" [{marker}] [{index}/{total}] Done: {ticket['ticket_id']} ({duration}s)")
    return record
def run_evaluation(ticket_ids=None, save=False):
    """Evaluate tickets concurrently and return (optionally persist) an audit log.

    Args:
        ticket_ids: Optional iterable of ticket IDs; when given, only matching
            tickets from DATA["tickets"] are processed. None means all tickets.
        save: When True, write the audit log to audit_log.json in the CWD.

    Returns:
        dict: Audit log with run-level stats plus per-ticket result records.
    """
    tickets = DATA["tickets"]
    if ticket_ids:
        tickets = [t for t in tickets if t["ticket_id"] in ticket_ids]
    total = len(tickets)
    results = []
    wall_start = time.time()
    print(f"\n{'='*65}")
    print(f" ShopWave AI Agent - CONCURRENT Batch Evaluation")
    print(f" {total} tickets | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f" Mode: Parallel threading (all tickets simultaneously)")
    print(f"{'='*65}\n")
    # Launch all tickets concurrently
    threads = []
    for i, ticket in enumerate(tickets, 1):
        # daemon=True so a hung agent call cannot keep the interpreter alive
        # after the main thread exits.
        t = threading.Thread(
            target=process_one_ticket,
            args=(ticket, results, i, total),
            daemon=True,
        )
        threads.append(t)
        t.start()
        time.sleep(0.4)  # small stagger to avoid API rate limit bursts
    # Block until every worker has finished (or failed and recorded it).
    for t in threads:
        t.join()
    wall_elapsed = round(time.time() - wall_start, 2)
    # Workers append in completion order; restore deterministic ID order.
    results.sort(key=lambda r: r["ticket_id"])
    ok = sum(1 for r in results if r["status"] == "SUCCESS")
    fail = total - ok
    avg = round(sum(r["elapsed_seconds"] for r in results) / total, 2) if total else 0
    print(f"\n{'='*65}")
    print(f" RESULTS")
    print(f"{'='*65}")
    for r in results:
        icon = "OK " if r["status"] == "SUCCESS" else "FAIL"
        print(f" [{icon}] {r['ticket_id']:10} | {r['elapsed_seconds']}s | {r['subject'][:38]}")
    print(f"{'='*65}")
    print(f" {ok}/{total} successful | {fail} failed")
    print(f" Wall time: {wall_elapsed}s | Avg: {avg}s/ticket")
    if wall_elapsed > 0:
        # Sum of per-ticket times approximates sequential wall time.
        print(f" Concurrency speedup: ~{round(sum(r['elapsed_seconds'] for r in results)/wall_elapsed,1)}x faster than sequential")
    print(f"{'='*65}\n")
    audit_log = {
        "generated_at"  : datetime.utcnow().isoformat() + "Z",
        "total_tickets" : total,
        "successful"    : ok,
        "failed"        : fail,
        "wall_time_seconds": wall_elapsed,
        "avg_per_ticket": avg,
        "mode"          : "concurrent_threading",
        "tickets"       : results,
    }
    if save:
        with open("audit_log.json", "w") as f:
            json.dump(audit_log, f, indent=2)
        print(f"Saved to audit_log.json\n")
    return audit_log
if __name__ == "__main__":
args = sys.argv[1:]
save_flag = "--save" in args
ids = [a for a in args if a.startswith("TKT-")]
run_evaluation(ticket_ids=ids or None, save=save_flag)