-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathai_helpers.py
More file actions
684 lines (586 loc) · 22.9 KB
/
ai_helpers.py
File metadata and controls
684 lines (586 loc) · 22.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
import logging
import traceback
import typing as t
from enum import Enum
from threading import Timer
import ipywidgets as widgets
import markdown2 as md
import openai
from IPython.display import Code, display, display_html
from openai.types.chat import (
ChatCompletionMessage,
ChatCompletionMessageParam,
ParsedChatCompletionMessage,
)
from pydantic import BaseModel
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_fixed,
wait_random,
)
from .exceptions import (
APIConnectionError,
InvalidAPIKeyError,
InvalidModelError,
UnexpectedAPIError,
ValidationResult,
)
if t.TYPE_CHECKING:
from .helpers import IPytestResult
# Module-level logger shared by every class in this file.
# NOTE(review): `getLogger()` is called without a name, which per the stdlib
# docs returns the root logger rather than a module-specific one — confirm
# this is intentional before switching to `getLogger(__name__)`.
logger = logging.getLogger()
# One entry of `Explanation.steps`.
# NOTE(review): pydantic usually exposes the class docstring as the schema
# "description", so the docstring below may be sent to the model — confirm
# before rewording it.
class ExplanationStep(BaseModel):
    """A single step in the explanation"""

    # Heading of the step; may be None. The renderer in this file falls back
    # to "Step <n>" when the title is falsy.
    title: t.Optional[str]
    # Body of the step; the renderer converts it from Markdown to HTML.
    content: str
# One entry of `Explanation.code_snippets`.
# NOTE(review): pydantic usually exposes the class docstring as the schema
# "description", so the docstring below may be sent to the model — confirm
# before rewording it.
class CodeSnippet(BaseModel):
    """A code snippet with optional description"""

    # Source text of the snippet; the renderer displays it as Python code.
    code: str
    # Free-text description of the snippet; may be None.
    description: t.Optional[str]
# Top-level structured answer: this class is what `get_chat_response` passes
# as `response_format` to the chat-completions `parse` call.
# NOTE(review): pydantic usually exposes the class docstring as the schema
# "description", so the docstring below may be sent to the model — confirm
# before rewording it.
class Explanation(BaseModel):
    """A structured explanation with steps, code snippets, and hints"""

    # Short overview; rendered as the heading of the explanation.
    summary: str
    # Ordered steps; each one is rendered in its own accordion.
    steps: t.List[ExplanationStep]
    # Code examples; each one is rendered in its own accordion.
    code_snippets: t.List[CodeSnippet]
    # Hints; rendered together as a bullet list.
    hints: t.List[str]
class OpenAIWrapper:
    """A simple API wrapper adapted for IPython environments.

    The class behaves as a singleton: ``__new__`` always returns the same
    object, and both ``__init__`` and ``create_validated`` skip their setup
    once the instance already carries a ``client`` attribute.
    """

    # These are the models we can use: they must support structured responses
    GPT_MODELS = (
        "gpt-4o",
        "gpt-4o-mini",
        "gpt-4.1",
        "gpt-4.1-mini",
        "gpt-4.1-nano",
        "o4-mini",
    )
    DEFAULT_MODEL = "gpt-4o-mini"
    DEFAULT_LANGUAGE = "English"

    # The single shared instance, created lazily by __new__
    _instance = None

    def __new__(cls, *args, **kwargs) -> "OpenAIWrapper":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def create_validated(
        cls,
        api_key: str,
        model: t.Optional[str] = None,
        language: t.Optional[str] = None,
    ) -> t.Tuple["OpenAIWrapper", ValidationResult]:
        """Build (or reuse) the wrapper without raising on an invalid model.

        Unlike ``__init__``, this constructor does not call
        ``validate_api_key`` and reports the model validation as a return
        value instead of raising.

        Args:
            api_key: The OpenAI API key handed to the client.
            model: Model name; ``DEFAULT_MODEL`` is used when falsy.
            language: Response language; ``DEFAULT_LANGUAGE`` when falsy.

        Returns:
            The shared instance and the result of validating its model.
        """
        instance = cls.__new__(cls)

        # Only initialize if not already: an existing instance keeps its
        # original key, model and language.
        if not hasattr(instance, "client"):
            instance.api_key = api_key
            instance.language = language or cls.DEFAULT_LANGUAGE
            instance.model = model or cls.DEFAULT_MODEL
            instance.client = openai.OpenAI(api_key=api_key)

        # Validate the model
        model_validation = instance.validate_model(instance.model)
        return instance, model_validation

    @classmethod
    def validate_api_key(cls, api_key: t.Optional[str]) -> ValidationResult:
        """Validate the OpenAI API key.

        A missing key is rejected immediately; otherwise a throwaway client
        lists the available models as the cheapest authenticated request.

        Args:
            api_key: The key to check; may be ``None`` or empty.

        Returns:
            A ``ValidationResult`` whose ``error`` and ``message`` describe
            the failure when ``is_valid`` is ``False``.
        """
        if not api_key:
            return ValidationResult(
                is_valid=False,
                error=InvalidAPIKeyError("API key is missing."),
                message="OpenAI API key is not provided.",
            )

        try:
            client = openai.OpenAI(api_key=api_key)
            client.models.list()  # the simplest API call to verify the API
        except openai.AuthenticationError:
            return ValidationResult(
                is_valid=False,
                error=InvalidAPIKeyError("The provided API key is invalid."),
                message="Invalid OpenAI API key. Please, double check it.",
            )
        except openai.APIConnectionError:
            return ValidationResult(
                is_valid=False,
                error=APIConnectionError("Unable to connect to OpenAI."),
                message="Could not connect to OpenAI. Please, check your internet connection.",
            )
        except Exception as e:
            # Boundary catch-all: the failure is reported, never propagated
            return ValidationResult(
                is_valid=False,
                error=UnexpectedAPIError(f"Unexpected error: {e}"),
                message="An unexpected error occurred while validating API key.",
            )
        else:
            return ValidationResult(is_valid=True)

    def validate_model(self, model: t.Optional[str]) -> ValidationResult:
        """Validate the model selection against ``GPT_MODELS``.

        Args:
            model: The model name to check.

        Returns:
            A valid ``ValidationResult`` when ``model`` is one of
            ``GPT_MODELS``, otherwise an invalid one carrying the error.
        """
        try:
            if model not in self.GPT_MODELS:
                return ValidationResult(
                    is_valid=False,
                    error=InvalidModelError(),
                    message=f"Invalid model: {model}. Available models: {' '.join(self.GPT_MODELS)}",
                )
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                error=UnexpectedAPIError(f"Error validating model: {e}"),
                message="Unexpected error during model validation",
            )

        return ValidationResult(is_valid=True)

    def __init__(
        self,
        api_key: t.Optional[str],
        model: t.Optional[str] = None,
        language: t.Optional[str] = None,
    ) -> None:
        """Initialize the wrapper for OpenAI API with logging and checks.

        Args:
            api_key: The OpenAI API key; validated before anything is stored.
            model: Model name; ``DEFAULT_MODEL`` is used when falsy.
            language: Response language; ``DEFAULT_LANGUAGE`` when falsy.

        Raises:
            The ``error`` carried by the failed ``ValidationResult`` when
            either the API key or the model does not validate.
        """
        # Avoid reinitializing the client (the instance is shared)
        if hasattr(self, "client"):
            return

        # Validate the API key
        validation = self.validate_api_key(api_key)
        if not validation.is_valid:
            assert validation.error is not None  # for type checking
            raise validation.error

        self.api_key = api_key
        self.language = language or self.DEFAULT_LANGUAGE
        self.client = openai.OpenAI(api_key=self.api_key)
        self.model = model or self.DEFAULT_MODEL

        model_validation = self.validate_model(self.model)
        if not model_validation.is_valid:
            assert model_validation.error is not None  # type checking
            raise model_validation.error

    def change_model(self, model: str) -> None:
        """Change the active OpenAI model in use.

        Args:
            model: The new model name; must be one of ``GPT_MODELS``.

        Raises:
            The ``error`` carried by the failed ``ValidationResult`` when the
            model is rejected; ``self.model`` is left unchanged in that case.
        """
        validation = self.validate_model(model)
        if not validation.is_valid:
            assert validation.error is not None  # type checking
            # No exception is being handled here, so `logger.exception` would
            # have no traceback to attach; log a plain error instead.
            logger.error("Error changing model to %r: %s", model, validation.error)
            raise validation.error

        self.model = model
        logger.info("Model changed to %s", self.model)

    # Rate-limit errors are retried up to 3 attempts, 10-15 seconds apart.
    # NOTE(review): `reraise=True` is not set, so tenacity presumably raises
    # its own RetryError once the attempts are exhausted — confirm whether
    # callers would prefer the original RateLimitError.
    @retry(
        retry=retry_if_exception_type(openai.RateLimitError),
        stop=stop_after_attempt(3),
        wait=wait_fixed(10) + wait_random(0, 5),
    )
    def get_chat_response(
        self, query: str, *args, **kwargs
    ) -> ParsedChatCompletionMessage | ChatCompletionMessage:
        """Fetch a completion from the chat model.

        Args:
            query: The user message sent after the fixed system prompt.
            *args: Accepted for compatibility; not used.
            **kwargs: Forwarded verbatim to the ``parse`` call
                (e.g. ``temperature``).

        Returns:
            The message of the first choice in the response.

        Raises:
            openai.APIError: Logged and re-raised.
            openai.LengthFinishReasonError: Logged and re-raised.
        """
        system_prompt = (
            "As an expert Python developer, provide clear and concise explanations of error tracebacks, "
            "focusing on the root cause for users with minimal Python experience. "
            "Follow these guidelines strictly:\n"
            "- Offer hints, even for trivial errors.\n"
            "- Take into account the number of attempts made by providing increasingly detailed hints after a failed attempt.\n"
            "- Do not provide verbatim solutions, only guidance.\n"
            f"- Respond in {self.language}.\n"
            "- Any text or string must be written in Markdown."
        )

        messages: t.List[ChatCompletionMessageParam] = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]

        try:
            # Ask for a response already parsed into an `Explanation`
            response = self.client.beta.chat.completions.parse(
                model=self.model,
                messages=messages,
                response_format=Explanation,
                **kwargs,
            )
        except openai.APIError:
            logger.exception("API error encountered.")
            raise
        except openai.LengthFinishReasonError:
            logger.exception("Input prompt has too many tokens.")
            raise
        else:
            return response.choices[0].message
class ButtonState(Enum):
    """Display modes of the "Get AI Explanation" button."""

    # Clickable: a new explanation can be requested
    READY = "ready"
    # A request is in flight
    LOADING = "loading"
    # Throttled: the button stays disabled until the countdown ends
    WAIT = "waiting"
class AIExplanation:
"""Class representing an AI-generated explanation"""
_STYLES = """
<style>
.ai-container {
margin-top: 1.5rem;
font-family: system-ui, -apple-system, sans-serif;
}
.ai-header {
display: flex;
align-items: center;
gap: 0.75rem;
margin-bottom: 1rem;
}
.ai-title {
font-size: 1.1rem;
font-weight: 500;
}
.ai-button {
background-color: #4b88ff;
color: white;
border: none;
padding: 0;
border-radius: 0.5rem;
font-weight: 500;
transition: all 0.2s ease;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
min-width: 200px;
white-space: nowrap;
text-overflow: ellipsis;
overflow: hidden;
}
.ai-button:hover:not(:disabled) {
background-color: #3b7bff;
transform: translateY(-1px);
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.15);
}
.ai-button:disabled {
background-color: #94a3b8;
cursor: not-allowed;
transform: none;
box-shadow: none;
}
.ai-timer {
font-size: 0.9rem;
color: #64748b;
margin-left: 1rem;
font-weight: 500;
}
.ai-content {
margin-top: 1rem;
border: 1px solid #e5e7eb;
border-radius: 0.5rem;
overflow: hidden;
}
.ai-explanation h3 {
margin: 0 0 1rem 0;
font-size: 1.1rem;
color: #1f2937;
}
.ai-explanation .jupyter-widgets.accordion {
margin: 0.75rem 0;
border: 1px solid #e5e7eb;
border-radius: 0.375rem;
overflow: hidden;
}
.ai-explanation .jupyter-widgets.accordion > .accordion-header {
background: #f9fafb;
padding: 0.75rem 1rem;
font-weight: 500;
}
.ai-explanation .jupyter-widgets.accordion > .accordion-content {
padding: 1rem;
background: white;
}
.ai-explanation code {
background: #f3f4f6;
padding: 0.2rem 0.4rem;
border-radius: 0.25rem;
font-size: 0.9em;
}
.ai-explanation pre {
background: #f8f9fa;
padding: 1rem;
border-radius: 0.375rem;
overflow-x: auto;
}
</style>
"""
def __init__(
self,
ipytest_result: "IPytestResult",
openai_client: "OpenAIWrapper",
exception: t.Optional[BaseException] = None,
wait_time: int = 60, # Wait time in seconds
) -> None:
"""Public constructor for an explanation widget"""
self.ipytest_result = ipytest_result
self.exception = exception
self.openai_client = openai_client
# The output widget for displaying the explanation
self._output = widgets.Output()
# Timer and state
self._timer: t.Optional[Timer] = None
self._is_throttled = False
self._wait_time = float(wait_time)
self._remaining_time = float(wait_time)
# The button widget for fetching the explanation
self._button_styles = {
ButtonState.READY: {
"description": "Get AI Explanation",
"icon": "search",
"disabled": False,
},
ButtonState.LOADING: {
"description": "Loading...",
"icon": "spinner",
"disabled": True,
},
ButtonState.WAIT: {
"description": "Please wait",
"icon": "hourglass-start",
"disabled": True,
},
}
# Set a default query
self._query_template = (
"I wrote the following Python function:\n\n"
"{function_code}\n\n"
"Whose docstring describes the purpose, arguments, and expected return values:\n\n"
"{docstring}\n\n"
"Running pytest on this function failed, and here is the error traceback I got:\n\n"
"{traceback}\n\n"
"Consider that this is my {attempt_number} attempt."
)
# Default query params
self._query_params = {
"function_code": "",
"docstring": "",
"traceback": "",
"attempt_number": self.ipytest_result.test_attempts,
}
# Create a header with button and timer
self._header = widgets.Box(
layout=widgets.Layout(
display="flex",
align_items="center",
margin="0 0 1rem 0",
)
)
# Create a timer display
self._timer_display = widgets.HTML(
value="", layout=widgets.Layout(margin="0 0 0 0.75rem")
)
# Initialize the button
self._current_state = ButtonState.READY
self._button = widgets.Button()
self._update_button_state(ButtonState.READY)
self._button.on_click(self._handle_click)
self._button.observe(self._handle_state_change, names=["disabled"])
def render(self) -> widgets.Widget:
"""Return a single widget containing all the components"""
style_html = widgets.HTML(self._STYLES)
header_html = widgets.HTML(
'<div class="ai-header">'
'<span class="ai-title">🤖 Explain With AI</span>'
"</div>"
)
button_container = widgets.Box(
[
self._button,
self._timer_display,
],
layout=widgets.Layout(display="flex", align_iterms="center"),
)
# Create the rendered container
container = widgets.VBox(
children=[
style_html,
header_html,
button_container,
self._output,
],
layout=widgets.Layout(margin="1rem 0 0 0", padding="0"),
)
return container
def set_query_template(self, template: str) -> None:
"""Set the query template"""
self._query_template = template
def query_params(self, *args, **kwargs: t.Any) -> None:
"""Add/update multiple query parameters"""
self._query_params.update(kwargs)
@property
def query(self) -> str:
"""Generate the query string"""
logger.debug("Building a query with parameters: %s", self._query_params)
try:
return self._query_template.format(**self._query_params)
except KeyError as e:
logger.exception("Missing key in query parameter")
raise ValueError from e
def _update_remaining_time(self):
"""Update the button label with remaining time"""
self._remaining_time = max(0, self._remaining_time - 1)
if self._is_throttled:
self._update_button_state(ButtonState.WAIT)
def _update_button_state(self, state: ButtonState) -> None:
"""Update the button state"""
self._current_state = state
style = self._button_styles[state].copy()
# Update the timer display
if state == ButtonState.WAIT:
self._timer_display.value = (
'<span class="ai-timer">Available in '
f"{int(self._remaining_time)} "
"seconds</span>"
)
else:
self._timer_display.value = ""
self._button.add_class("ai-button")
self._button.description = style["description"]
self._button.icon = style["icon"]
self._button.disabled = style["disabled"]
def _handle_state_change(self, change):
"""Handle the state change of the button"""
if change["new"]:
if self._is_throttled:
self._update_button_state(ButtonState.WAIT)
else:
self._is_throttled = False
self._update_button_state(ButtonState.READY)
def _enable_button(self):
"""Enable the button after a delay"""
self._button.disabled = False
self._timer = None
self._remaining_time = self._wait_time
def _handle_click(self, _) -> None:
"""Handle the button click event with throttling"""
if self._is_throttled:
self._update_button_state(ButtonState.WAIT)
return
self._is_throttled = True
self._button.disabled = True
self._remaining_time = self._wait_time
def update_timer():
if self._remaining_time > 0:
self._update_remaining_time()
self._timer = Timer(1, update_timer)
self._timer.start()
else:
self._enable_button()
self._timer = Timer(1.0, update_timer)
self._timer.start()
# Call the method to fetch the explanation
self._fetch_explanation()
def _fetch_explanation(self) -> None:
"""Fetch the explanation from OpenAI API"""
from .helpers import IPytestOutcome
logger.debug("Attempting to fetch explanation from OpenAI API.")
if not self.openai_client:
return
self._update_button_state(ButtonState.LOADING)
if self.exception:
traceback_str = "".join(traceback.format_exception_only(self.exception))
logger.debug("Formatted traceback: %s", traceback_str)
else:
traceback_str = "No traceback available."
with self._output:
self._output.clear_output()
try:
# assert self.ipytest_result.function is not None
match self.ipytest_result.status:
case IPytestOutcome.FINISHED if (
self.ipytest_result.function is not None
):
self.query_params(
function_code=self.ipytest_result.function.source_code,
docstring=self.ipytest_result.function.implementation.__doc__,
traceback=traceback_str,
)
case _:
self.query_params(
function_code=self.ipytest_result.cell_content,
docstring="(Find it in the function's definition above.)",
traceback=traceback_str,
)
response = self.openai_client.get_chat_response(
self.query,
temperature=0.2,
)
logger.debug("Received response: %s", response)
formatted_response = self._format_explanation(response)
logger.debug("Formatted response: %s", formatted_response)
if formatted_response:
display(widgets.VBox(children=formatted_response))
else:
display(widgets.HTML("<p>No explanation could be generated.</p>"))
except Exception as e:
logger.exception("An error occurred while fetching the explanation.")
display_html(f"<p>Failed to fetch explanation: {e}</p>", raw=True)
finally:
if self._is_throttled:
self._update_button_state(ButtonState.WAIT)
else:
self._update_button_state(ButtonState.READY)
def _format_explanation(
self, chat_response: ParsedChatCompletionMessage | ChatCompletionMessage
) -> t.Optional[t.List[t.Any]]:
"""Format the explanation response for display"""
# Initialize the Markdown to HTML converter
def to_html(text: t.Any) -> str:
"""Markdown to HTML converter"""
return md.markdown(str(text))
# Reset the explanation object
explanation = None
# A list to store all the widgets
widgets_list = []
if (
isinstance(chat_response, ParsedChatCompletionMessage)
and (explanation := chat_response.parsed) is not None
):
logger.debug("Response is a valid `Explanation` object that can be parsed.")
# A summary of the explanation
summary_widget = widgets.HTML(f"<h3>{to_html(explanation.summary)}</h3>")
widgets_list.append(summary_widget)
# Add steps as Accordion widgets
steps_widgets = []
for i, step in enumerate(explanation.steps, start=1):
step_title = step.title or f"Step {i}"
step_content = widgets.HTML(to_html(step.content))
step_accordion = widgets.Accordion(
children=[step_content], titles=(step_title,)
)
steps_widgets.append(step_accordion)
widgets_list.extend(steps_widgets)
# Add code snippets using Code widgets
if explanation.code_snippets:
for i, snippet in enumerate(explanation.code_snippets, start=1):
snippet_output = widgets.Output()
snippet_description = widgets.HTML(to_html(snippet.description))
snippet_output.append_display_data(snippet_description)
snippet_code = Code(language="python", data=snippet.code)
snippet_output.append_display_data(snippet_code)
snippet_accordion = widgets.Accordion(
children=[snippet_output], titles=(f"Code Snippet #{i}",)
)
widgets_list.append(snippet_accordion)
# Add hints as bullet points
if explanation.hints:
hints_html = (
"<ul>"
+ "".join(f"<li>{to_html(hint)}</li>" for hint in explanation.hints)
+ "</ul>"
)
hints_widget = widgets.Accordion(
children=[widgets.HTML(hints_html)],
titles=("Hints",),
)
widgets_list.append(hints_widget)
elif (
isinstance(chat_response, ChatCompletionMessage)
and (explanation := chat_response.content) is not None
):
logger.debug(
"Response is not a structured `Explanation` object, returning as-is."
)
explanation = (
explanation.removeprefix("```html").removesuffix("```").strip()
)
widgets_list.append(widgets.HTML(to_html(explanation)))
if explanation is not None:
# Wrap everything in a styled container
container = widgets.VBox(
children=[widgets.HTML('<div class="ai-explanation">')]
+ widgets_list
+ [widgets.HTML("</div>")]
)
return [container]
else:
logger.debug("Failed to parse explanation.")
return None