-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathai_analyser.py
More file actions
504 lines (416 loc) · 18.6 KB
/
ai_analyser.py
File metadata and controls
504 lines (416 loc) · 18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
"""
ai_analyser.py — Makes Anthropic API calls and returns AICommentary.
Takes a WorkbookDigest and WorkbookAnalysisResult and produces AI-generated
commentary across four optional features. Any individual feature failure is
captured in AICommentary.api_error; other features continue.
"""
from __future__ import annotations
import json
import logging
from typing import Optional
import settings
from models import (
AICommentary,
FormulaExplanation,
SheetNarrative,
WorkbookAnalysisResult,
WorkbookDigest,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared system prompt
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT = """\
You are an expert actuarial model reviewer with deep experience in life \
insurance, general insurance, and financial modelling in Excel. You are \
reviewing an Excel workbook on behalf of a qualified actuary. Your role is \
to provide clear, concise, technically accurate commentary that helps the \
actuary quickly understand the workbook's purpose, structure, and risks.
Write in plain English. Be specific and direct. Do not pad responses with \
generic statements. If you are uncertain about something, say so briefly and \
explain what additional information would resolve the uncertainty. Never \
fabricate values or claim to have seen things not present in the data \
provided to you.\
"""
# Rough token budget per prompt (chars / 4 ≈ tokens)
_TOKEN_BUDGET_CHARS = 6000 * 4
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _call_api(client, user_prompt: str) -> str:
"""Make a single API call and return the response text."""
response = client.messages.create(
model=settings.AI_MODEL,
max_tokens=settings.AI_MAX_TOKENS,
system=_SYSTEM_PROMPT,
messages=[{"role": "user", "content": user_prompt}],
)
return response.content[0].text
def _append_error(commentary: AICommentary, msg: str) -> None:
"""Append msg to api_error, preserving any existing error text."""
if commentary.api_error:
commentary.api_error += f" | {msg}"
else:
commentary.api_error = msg
# ---------------------------------------------------------------------------
# Feature 1 — Findings Narrative
# ---------------------------------------------------------------------------
def _build_findings_prompt(
digest: WorkbookDigest,
result: WorkbookAnalysisResult,
) -> str:
severity_order = {"High": 0, "Medium": 1, "Low": 2}
sorted_findings = sorted(
result.findings,
key=lambda f: (severity_order.get(f.severity, 3), f.check_id),
)
lines: list[str] = []
low_extra = 0
for f in sorted_findings:
if len(lines) >= 40:
if f.severity == "Low":
low_extra += 1
continue
location = f.sheet_name
if f.cell_ref:
location += f", Cell: {f.cell_ref}"
lines.append(
f"[{f.severity}] {f.category} — {f.description} (Sheet: {location})"
)
findings_block = "\n".join(lines)
if low_extra:
findings_block += f"\n(plus {low_extra} additional Low findings not shown)"
counts = result.count_by_severity()
return f"""\
Workbook: {digest.file_name}
RAG Rating: {result.rag_rating} (Risk Score: {result.total_score})
Findings: {counts['High']} High, {counts['Medium']} Medium, {counts['Low']} Low
Findings detail:
{findings_block}
---
Write a findings narrative for the executive summary of a model governance \
report. The narrative should:
- Open with an overall characterisation of the risk level and the most \
important issue(s) to address
- Group related findings into themes rather than listing them one by one
- Call out any findings on actuarially sensitive sheets specifically
- Close with a brief statement of what actions are recommended before the \
workbook is used in production
- Length: 150 to 250 words
- Tone: professional, direct, suitable for a senior actuary or risk manager\
"""
# ---------------------------------------------------------------------------
# Feature 2 — Workbook Purpose and Sheet Narratives
# ---------------------------------------------------------------------------
def _build_purpose_prompt(digest: WorkbookDigest) -> str:
sheet_list_parts: list[str] = []
for ss in digest.sheet_summaries:
flag = " [SENSITIVE]" if ss.is_sensitive else ""
sheet_list_parts.append(f"{ss.name}{flag}")
sheet_list = ", ".join(sheet_list_parts)
# Per-sheet detail block, capped at AI_MAX_SHEETS_FOR_NARRATIVE
detail_lines: list[str] = []
for ss in digest.sheet_summaries[: settings.AI_MAX_SHEETS_FOR_NARRATIVE]:
detail_lines.append(f"\nSheet: {ss.name}")
detail_lines.append(f" Dimensions: {ss.row_count} rows x {ss.col_count} cols")
if ss.headers:
detail_lines.append(f" Headers/labels: {', '.join(ss.headers[:20])}")
if ss.top_formulas:
detail_lines.append(f" Top formulas (up to 5): {'; '.join(ss.top_formulas[:5])}")
if ss.named_ranges:
detail_lines.append(f" Named ranges: {', '.join(ss.named_ranges)}")
if ss.references_sheets:
detail_lines.append(f" References sheets: {', '.join(ss.references_sheets)}")
if len(digest.sheet_summaries) > settings.AI_MAX_SHEETS_FOR_NARRATIVE:
extra = len(digest.sheet_summaries) - settings.AI_MAX_SHEETS_FOR_NARRATIVE
detail_lines.append(f"\n(plus {extra} additional sheets not detailed above)")
per_sheet_block = "\n".join(detail_lines)
# Named ranges — names only to avoid token bloat
named_ranges_list = (
", ".join(list(digest.workbook_named_ranges.keys())[:40])
or "(none)"
)
if digest.vba_present:
if digest.vba_module_names:
vba_summary = f"Present. Modules: {', '.join(digest.vba_module_names)}"
else:
vba_summary = "Present (module names not available)"
else:
vba_summary = "Not present"
return f"""\
Workbook: {digest.file_name}
Sheets (in order): {sheet_list}
Sheet details:
{per_sheet_block}
Workbook-level named ranges: {named_ranges_list}
VBA: {vba_summary}
---
Provide two things:
1. WORKBOOK PURPOSE (100–180 words)
A plain-English description of what this workbook does, the type of model \
or analysis it represents, the likely business context, and the overall \
calculation flow from inputs to outputs. If the workbook type is uncertain, \
state the most likely interpretation and briefly note what is ambiguous.
2. SHEET NARRATIVES
For each sheet, write 2–4 sentences describing: what the sheet contains or \
calculates, its role in the overall model, and any notable structural \
observations (e.g. it appears to be a lookup table, a results output, a \
control sheet, an assumption repository). If a sheet name is ambiguous, \
say so.
Format your response as valid JSON matching this schema exactly:
{{
"workbook_purpose": "<string>",
"sheet_narratives": {{
"<sheet_name>": "<narrative string>",
...
}}
}}
Return only the JSON object. No preamble, no markdown fences.\
"""
# ---------------------------------------------------------------------------
# Feature 3 — Key Formula Explanations
# ---------------------------------------------------------------------------
def _collect_top_formulas(digest: WorkbookDigest) -> list[tuple[str, str, str]]:
"""
Return up to 25 (sheet_name, cell_address, formula) tuples.
First 15 slots from sensitive sheets, remainder from others.
"""
from models import SheetSummary # avoid circular at module level
import settings as s
def _formula_complexity(f: str) -> int:
return len(f) + f.count("(") * 5
sensitive: list[tuple[int, str, str, str]] = []
other: list[tuple[int, str, str, str]] = []
for ss in digest.sheet_summaries:
for entry in ss.top_formulas:
# entry format: "ADDR: =FORMULA..." or "ADDR: =FORMULA......"
if ": " in entry:
addr, formula = entry.split(": ", 1)
else:
addr, formula = "", entry
score = _formula_complexity(formula)
if ss.is_sensitive:
sensitive.append((score, ss.name, addr, formula))
else:
other.append((score, ss.name, addr, formula))
sensitive.sort(key=lambda x: x[0], reverse=True)
other.sort(key=lambda x: x[0], reverse=True)
chosen: list[tuple[str, str, str]] = []
for score, sheet, addr, formula in sensitive[:15]:
chosen.append((sheet, addr, formula))
for score, sheet, addr, formula in other[: max(0, 25 - len(chosen))]:
chosen.append((sheet, addr, formula))
return chosen[:25]
def _build_formulas_prompt(digest: WorkbookDigest) -> str:
top = _collect_top_formulas(digest)
lines: list[str] = []
for sheet, addr, formula in top:
trunc = formula[: settings.AI_MAX_FORMULA_CHARS]
if len(formula) > settings.AI_MAX_FORMULA_CHARS:
trunc += "..."
lines.append(f"{sheet}::{addr} — {trunc}")
formula_list = "\n".join(lines) or "(no formulas found)"
return f"""\
Workbook: {digest.file_name}
The following are the most structurally complex formulas in this workbook, \
selected by formula length and nesting depth:
{formula_list}
(Format: SHEET::CELL — FORMULA)
---
For each formula, provide a plain-English explanation of what it calculates. \
Your explanation should:
- Describe the purpose of the calculation in business/actuarial terms where \
inferable from context (sheet name, cell address, surrounding named ranges)
- Identify the key inputs and what they represent
- Flag any structural concerns (e.g. deeply nested IFs that are hard to audit, \
approximate-match lookups, volatile functions)
- Be 2–5 sentences per formula
Format your response as valid JSON matching this schema exactly:
[
{{
"sheet_name": "<string>",
"cell_address": "<string>",
"formula": "<string>",
"explanation": "<string>"
}},
...
]
Return only the JSON array. No preamble, no markdown fences.\
"""
# ---------------------------------------------------------------------------
# Feature 4 — Assumption / Input Commentary
# ---------------------------------------------------------------------------
def _build_assumption_prompt(digest: WorkbookDigest) -> str:
# Identify sheets referenced by other sheets (likely input/assumption sheets)
all_referenced: set[str] = set()
for ss in digest.sheet_summaries:
for ref in ss.references_sheets:
all_referenced.add(ref)
# Formula density per sheet: formula cells / total top_formulas count is a proxy
# Use top_formulas count as proxy for formula density — fewer formulas = more input
def _formula_count(ss) -> int:
return len(ss.top_formulas)
# Sort by ascending formula count (formula-free = likely input)
sheets_by_density = sorted(digest.sheet_summaries, key=_formula_count)
input_candidates = [
ss for ss in sheets_by_density
if ss.name in all_referenced or _formula_count(ss) == 0
][:10]
# Also include 5 most formula-free sheets not already in candidates
formula_free_names = {ss.name for ss in input_candidates}
for ss in sheets_by_density:
if len(input_candidates) >= 10:
break
if ss.name not in formula_free_names:
input_candidates.append(ss)
formula_free_names.add(ss.name)
# Build detail block
lines: list[str] = []
for ss in input_candidates:
sensitive_flag = " [SENSITIVE]" if ss.is_sensitive else ""
referenced_flag = " [referenced by other sheets]" if ss.name in all_referenced else ""
lines.append(f"\nSheet: {ss.name}{sensitive_flag}{referenced_flag}")
if ss.headers:
lines.append(f" Headers/labels: {', '.join(ss.headers[:20])}")
if ss.named_ranges:
lines.append(f" Named ranges: {', '.join(ss.named_ranges)}")
input_block = "\n".join(lines) or "(no clear input sheets identified)"
# Named ranges with values
nr_lines: list[str] = []
for name, val in list(digest.workbook_named_ranges.items())[:40]:
nr_lines.append(f" {name} = {val[:100]}")
named_ranges_block = "\n".join(nr_lines) or "(none)"
return f"""\
Workbook: {digest.file_name}
The following information has been extracted from sheets that appear to \
contain input parameters or assumptions (identified by low formula density \
and/or being referenced by other sheets):
{input_block}
Named ranges with values:
{named_ranges_block}
---
Provide assumption and input parameter commentary covering:
1. PARAMETER DISCOVERY
Identify what appear to be the key input parameters or assumption sets in \
this workbook. Do not restrict yourself to actuarial assumptions — identify \
whatever inputs drive the calculations, regardless of domain. Group them \
logically (e.g. "rate assumptions", "volume inputs", "expense parameters").
2. PLAUSIBILITY COMMENTARY
For each parameter group identified, comment on whether the values appear \
internally consistent (e.g. rates within a plausible range, consistent \
units, no obvious sign errors, no suspicious round numbers that might \
indicate placeholders). Be specific about any values that look anomalous. \
If you cannot assess plausibility from the data available, say so briefly.
3. STRUCTURAL OBSERVATIONS
Comment on how assumptions flow into calculations — e.g. whether they are \
centralised in one sheet or scattered, whether named ranges are used \
consistently, whether there are hardcoded values in formula sheets that \
should instead be parameterised.
Length: 200–350 words total.
Tone: professional, advisory, suitable for a model governance review.\
"""
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def run_ai_analysis(
digest: WorkbookDigest,
findings_result: WorkbookAnalysisResult,
api_key: str,
run_findings_narrative: bool = True,
run_purpose_and_sheets: bool = True,
run_formula_explanations: bool = True,
run_assumption_commentary: bool = True,
) -> AICommentary:
"""
Run enabled AI features against the workbook digest and return AICommentary.
Individual feature failures are captured in AICommentary.api_error and do
not prevent the remaining features from running.
Args:
digest: WorkbookDigest from digest_builder.
findings_result: WorkbookAnalysisResult from deterministic pass.
api_key: Anthropic API key (session-only, never logged).
run_findings_narrative: Enable Feature 1.
run_purpose_and_sheets: Enable Feature 2.
run_formula_explanations: Enable Feature 3.
run_assumption_commentary: Enable Feature 4.
Returns:
AICommentary with all available fields populated.
"""
import anthropic
commentary = AICommentary()
try:
client = anthropic.Anthropic(api_key=api_key)
except Exception as exc:
_append_error(commentary, f"Failed to initialise Anthropic client: {exc}")
return commentary
# ------------------------------------------------------------------
# Feature 1 — Findings Narrative
# ------------------------------------------------------------------
if run_findings_narrative:
try:
prompt = _build_findings_prompt(digest, findings_result)
commentary.findings_narrative = _call_api(client, prompt)
except Exception as exc:
_append_error(commentary, f"Feature 1 (Findings Narrative) failed: {exc}")
# ------------------------------------------------------------------
# Feature 2 — Workbook Purpose + Sheet Narratives
# ------------------------------------------------------------------
if run_purpose_and_sheets:
try:
prompt = _build_purpose_prompt(digest)
raw = _call_api(client, prompt)
try:
parsed = json.loads(raw)
commentary.workbook_purpose = parsed.get("workbook_purpose", "")
sheet_narr_dict: dict = parsed.get("sheet_narratives", {})
commentary.sheet_narratives = [
SheetNarrative(sheet_name=k, narrative=v)
for k, v in sheet_narr_dict.items()
if isinstance(k, str) and isinstance(v, str)
]
except json.JSONDecodeError:
# Fallback: store raw text in workbook_purpose
commentary.workbook_purpose = raw
commentary.sheet_narratives = []
except Exception as exc:
_append_error(commentary, f"Feature 2 (Purpose & Sheets) failed: {exc}")
# ------------------------------------------------------------------
# Feature 3 — Key Formula Explanations
# ------------------------------------------------------------------
if run_formula_explanations:
try:
prompt = _build_formulas_prompt(digest)
raw = _call_api(client, prompt)
try:
items = json.loads(raw)
commentary.formula_explanations = [
FormulaExplanation(
sheet_name=item.get("sheet_name", ""),
cell_address=item.get("cell_address", ""),
formula=item.get("formula", ""),
explanation=item.get("explanation", ""),
)
for item in items
if isinstance(item, dict)
]
except json.JSONDecodeError:
commentary.formula_explanations = [
FormulaExplanation(
sheet_name="Parse Error",
cell_address="",
formula="",
explanation=raw,
)
]
except Exception as exc:
_append_error(commentary, f"Feature 3 (Formula Explanations) failed: {exc}")
# ------------------------------------------------------------------
# Feature 4 — Assumption / Input Commentary
# ------------------------------------------------------------------
if run_assumption_commentary:
try:
prompt = _build_assumption_prompt(digest)
commentary.assumption_commentary = _call_api(client, prompt)
except Exception as exc:
_append_error(commentary, f"Feature 4 (Assumption Commentary) failed: {exc}")
return commentary