-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm.py
More file actions
173 lines (169 loc) · 6.77 KB
/
Copy pathllm.py
File metadata and controls
173 lines (169 loc) · 6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
from fastapi import APIRouter, HTTPException, Query, Request, Depends
from app.utils.make_meta import make_meta
from app.utils.db import get_db_connection_direct
from app.utils.api_key_auth import get_api_key
# Router on which this module's GET /llm and POST /llm handlers are registered.
router = APIRouter()
def _llm_row_to_record(row) -> dict:
    """Convert one ``llm`` row tuple into the dict shape returned by GET /llm.

    ``row`` must follow the column order used by the SELECT statements in
    ``get_llm_records``: id, prompt, completion, duration, time, data, model,
    prospect_id, search_vector.
    """
    return {
        "id": row[0],
        "prompt": row[1],
        "completion": row[2],
        "duration": row[3],
        # A falsy/NULL timestamp is reported as None instead of calling isoformat().
        "time": row[4].isoformat() if row[4] else None,
        "data": row[5],
        "model": row[6],
        "prospect_id": row[7],
        # Stringified when present -- presumably so the tsvector value
        # serialises cleanly in the response; confirm against the DB driver.
        "search_vector": str(row[8]) if row[8] is not None else None,
    }


@router.get("/llm")
def get_llm_records(
    request: Request,
    page: int = Query(1, ge=1, description="Page number (1-based)"),
    page_size: int = Query(10, ge=1, le=100, description="Records per page"),
    prospect_id: int = Query(None, description="Filter by prospect_id"),
    api_key: str = Depends(get_api_key)
) -> dict:
    """GET /llm: list stored LLM completions.

    When ``prospect_id`` is supplied, every row for that prospect is returned
    (highest id first) and ``page`` / ``page_size`` are ignored. Otherwise one
    page of all rows is returned, highest id first, together with paging totals.

    Args:
        request: Incoming request object (not read by this handler).
        page: 1-based page number for the unfiltered listing.
        page_size: Rows per page for the unfiltered listing (1-100).
        prospect_id: Optional filter; switches to the unpaginated lookup.
        api_key: Value resolved by the ``get_api_key`` dependency; not read
            in the body (presumably present to enforce authentication).

    Returns:
        A dict with ``meta`` (built by ``make_meta``) and ``data``:
        a list of records for a ``prospect_id`` lookup, a dict with ``page``,
        ``page_size``, ``total``, ``pages`` and ``data`` for the paginated
        listing, or an empty dict when any exception occurred.
    """
    try:
        conn = get_db_connection_direct()
        try:
            cur = conn.cursor()
            try:
                if prospect_id is not None:
                    # No pagination for single prospect_id lookup
                    select_query = """
                        SELECT id, prompt, completion, duration, time, data, model, prospect_id, search_vector
                        FROM llm
                        WHERE prospect_id = %s
                        ORDER BY id DESC
                    """
                    cur.execute(select_query, (prospect_id,))
                    records = [_llm_row_to_record(row) for row in cur.fetchall()]
                    if records:
                        meta = make_meta("success", f"Found {len(records)} record(s) for prospect_id {prospect_id}")
                    else:
                        meta = make_meta("warning", f"No records found for prospect_id {prospect_id}")
                    return {"meta": meta, "data": records}

                offset = (page - 1) * page_size
                cur.execute("SELECT COUNT(*) FROM llm;")
                count_row = cur.fetchone()
                total = count_row[0] if count_row and count_row[0] is not None else 0
                cur.execute(
                    """
                    SELECT id, prompt, completion, duration, time, data, model, prospect_id, search_vector
                    FROM llm
                    ORDER BY id DESC
                    LIMIT %s OFFSET %s;
                    """,
                    (page_size, offset),
                )
                records = [_llm_row_to_record(row) for row in cur.fetchall()]
                meta = make_meta("success", f"LLM {len(records)} records (page {page})")
                return {
                    "meta": meta,
                    "data": {
                        "page": page,
                        "page_size": page_size,
                        "total": total,
                        # Ceiling division: number of pages needed for `total` rows.
                        "pages": (total + page_size - 1) // page_size,
                        "data": records,
                    },
                }
            finally:
                # Release the cursor on every path, including query failures.
                cur.close()
        finally:
            # Release the connection on every path, including query failures.
            conn.close()
    except Exception as e:
        # Failures are reported in the response envelope rather than raised.
        meta = make_meta("error", f"DB error: {str(e)}")
        return {"meta": meta, "data": {}}
def _generate_with_fallback(client, prompt, model_names):
    """Try each model in order and return ``(completion, used_model, errors)``.

    Stops at the first model that yields non-empty text. ``errors`` maps each
    model that failed (raised, or returned no text) to a short reason.
    ``completion`` and ``used_model`` are ``None`` when no model succeeded.
    """
    errors = {}
    for model_name in model_names:
        try:
            response = client.models.generate_content(model=model_name, contents=prompt)
            completion = getattr(response, "text", None)
        except Exception as e:
            errors[model_name] = str(e)
            continue
        if completion:
            return completion, model_name, errors
        # Record silent failures too, so the final error message is never blank.
        errors[model_name] = "empty completion"
    return None, None, errors


def _store_llm_record(prompt, completion, duration, used_model, prospect_id):
    """Insert one row into ``llm`` and return its id (``None`` if none came back).

    Raises whatever the import, connection or query raises; the cursor and
    connection are closed on every path. The id is only returned after the
    commit has succeeded.
    """
    import json
    from app import __version__

    data_blob = json.dumps({"version": __version__})
    conn = get_db_connection_direct()
    try:
        cur = conn.cursor()
        try:
            # Generate tsvector from prompt and completion
            cur.execute(
                """
                INSERT INTO llm (prompt, completion, duration, data, model, prospect_id, search_vector)
                VALUES (%s, %s, %s, %s, %s, %s, to_tsvector('english', %s || ' ' || %s))
                RETURNING id;
                """,
                (prompt, completion, duration, data_blob, used_model, prospect_id, prompt, completion),
            )
            record_id_row = cur.fetchone()
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()
    return record_id_row[0] if record_id_row else None


@router.post("/llm")
def llm_post(payload: dict) -> dict:
    """POST /llm: send a prompt to Gemini via the google-genai SDK and return the completion.

    The request body must contain ``prompt``; ``prospect_id`` is optional and
    is stored with the record. Models are tried in a fixed order until one
    returns non-empty text. The result is then inserted into the ``llm``
    table on a best-effort basis: a failed insert is logged and the response
    still succeeds, with ``id`` set to ``None``.

    NOTE(review): unlike GET /llm in this file, this route declares no
    ``get_api_key`` dependency -- confirm that is intended.

    Args:
        payload: Request body as a dict.

    Returns:
        A dict with ``meta`` and ``data``. On success ``data`` holds ``id``,
        ``prompt`` and ``completion``; on a Gemini/SDK failure ``meta`` is an
        error and ``data`` is empty.

    Raises:
        HTTPException: 400 when ``prompt`` is missing or empty; 500 when the
            ``GEMINI_API_KEY`` environment variable is not set.
    """
    prompt = payload.get("prompt")
    prospect_id = payload.get("prospect_id")
    if not prompt:
        raise HTTPException(status_code=400, detail="Missing 'prompt' in request body.")
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise HTTPException(status_code=500, detail="Gemini API key not configured.")
    import logging
    try:
        # Imported inside the try so a missing SDK is reported as error meta.
        from google import genai
        import time as time_mod

        client = genai.Client(api_key=api_key)
        model_names = [
            "models/gemini-flash-latest",
            "models/gemini-1.5-pro",
            "models/gemini-1.5-flash",
            "models/gemini-1.0-pro",
            "models/gemini-pro",
            "models/gemini-pro-vision",
        ]
        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments. The duration covers all attempted models.
        start_time = time_mod.perf_counter()
        completion, used_model, errors = _generate_with_fallback(client, prompt, model_names)
        duration = time_mod.perf_counter() - start_time
        if not completion:
            error_details = " | ".join(f"{k}: {v}" for k, v in errors.items())
            raise RuntimeError(f"No available Gemini model succeeded for generate_content with your API key. Details: {error_details}")

        # Insert record into llm table
        record_id = None
        try:
            record_id = _store_llm_record(prompt, completion, duration, used_model, prospect_id)
        except Exception as db_exc:
            # Log DB error (with traceback) but do not fail the API response
            logging.exception("Failed to insert llm record: %s", db_exc)

        meta = make_meta("success", f"Gemini completion received from {used_model}")
        return {"meta": meta, "data": {"id": record_id, "prompt": prompt, "completion": completion}}
    except Exception as e:
        meta = make_meta("error", f"Gemini API error: {str(e)}")
        return {"meta": meta, "data": {}}