-
Notifications
You must be signed in to change notification settings - Fork 270
Expand file tree
/
Copy pathsqldb_service.py
More file actions
362 lines (319 loc) · 14.5 KB
/
Copy pathsqldb_service.py
File metadata and controls
362 lines (319 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
from datetime import datetime
import struct
import pandas as pd
from pydantic import BaseModel
from api.models.input_models import ChartFilters
from common.config.config import Config
import logging
from helpers.azure_credential_utils import get_azure_credential_async
import pyodbc
class SQLTool(BaseModel):
    """Pydantic model that runs SQL text against an already-open pyodbc connection."""

    # pyodbc.Connection is not a pydantic-native type, so arbitrary types are allowed.
    model_config = {"arbitrary_types_allowed": True}
    # Connection used for every query; this class never closes it.
    conn: pyodbc.Connection

    async def get_sql_response(self, sql_query: str) -> str:
        """Execute *sql_query* and return all fetched rows as one string.

        Each row is converted with ``str()`` and the pieces are concatenated
        without a separator. The method contains no ``await``; the database
        calls run synchronously.

        Args:
            sql_query: SQL text passed unchanged to ``cursor.execute``.

        Returns:
            The concatenated rows, or an ``"Error executing SQL query: ..."``
            message if anything raised (the error is logged, not re-raised).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql_query)
            fetched_rows = cursor.fetchall()
            return ''.join(map(str, fetched_rows))
        except Exception as e:
            logging.error("Error executing SQL query: %s", e)
            return f"Error executing SQL query: {str(e)}"
        finally:
            # Only the cursor is released here; the connection stays open.
            if cursor:
                cursor.close()
async def get_db_connection():
    """Open a pyodbc connection to the configured SQL database.

    Authentication uses an access token obtained from the Azure credential
    for the ``https://database.windows.net/.default`` scope. ODBC Driver 18
    is tried first, then ODBC Driver 17; the first successful connection is
    returned.

    Returns:
        An open ``pyodbc.Connection``. The caller is responsible for closing it.

    Raises:
        RuntimeError: If the credential/token cannot be obtained or neither
            driver can connect. The underlying error is chained as the cause.
    """
    config = Config()
    server = config.sqldb_server
    database = config.sqldb_database
    mid_id = config.azure_client_id
    credential = None
    try:
        credential = await get_azure_credential_async(client_id=mid_id)
        token = await credential.get_token("https://database.windows.net/.default")

        # Pack the token as a 4-byte little-endian length followed by the
        # UTF-16-LE encoded token bytes.
        token_bytes = token.token.encode("utf-16-LE")
        token_struct = struct.pack(
            f"<I{len(token_bytes)}s",
            len(token_bytes),
            token_bytes
        )
        # Pre-connect attribute id used to hand the access token to the driver.
        SQL_COPT_SS_ACCESS_TOKEN = 1256

        # Try both ODBC Driver 18 and 17, remembering why each attempt failed
        # so the root cause is not lost when both are unavailable.
        last_error = None
        for driver in ("{ODBC Driver 18 for SQL Server}", "{ODBC Driver 17 for SQL Server}"):
            connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};"
            try:
                conn = pyodbc.connect(
                    connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
                )
            except pyodbc.Error as driver_error:
                logging.warning("Connection attempt with %s failed: %s", driver, driver_error)
                last_error = driver_error
                continue
            logging.info(f"Connected using Azure Credential with {driver}")
            return conn

        # Reaching this point means every driver attempt raised pyodbc.Error.
        raise RuntimeError(
            "Unable to connect using ODBC Driver 18 or 17 with Azure Credential"
        ) from last_error
    except Exception as e:
        logging.error("Failed with Azure Credential: %s", str(e))
        raise RuntimeError("Unable to connect to SQL database using Microsoft Entra authentication.") from e
    finally:
        # The credential is only needed to mint the token; release it either way.
        if credential and hasattr(credential, "close"):
            await credential.close()
async def adjust_processed_data_dates():
    """
    Shift the timestamps in processed_data, km_processed_data and
    processed_data_key_phrases forward so the data lines up with today.

    The shift is the number of days between today and the latest StartTime in
    processed_data, minus one. Nothing is updated when no StartTime exists or
    when the shift is not positive. All updates are committed together.
    """
    conn = await get_db_connection()
    cursor = None
    try:
        cursor = conn.cursor()
        now = datetime.today()
        cursor.execute(
            "SELECT MAX(CAST(StartTime AS DATETIME)) FROM [dbo].[processed_data]"
        )
        latest_start = cursor.fetchone()[0]
        if not latest_start:
            return

        shift_days = (now.date() - latest_start.date()).days - 1
        if shift_days <= 0:
            return

        # One (statement, parameters) pair per table, executed in this order.
        updates = (
            (
                "UPDATE [dbo].[processed_data] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), "
                "'yyyy-MM-dd HH:mm:ss'), EndTime = FORMAT(DATEADD(DAY, ?, EndTime), 'yyyy-MM-dd HH:mm:ss')",
                (shift_days, shift_days),
            ),
            (
                "UPDATE [dbo].[km_processed_data] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), "
                "'yyyy-MM-dd HH:mm:ss'), EndTime = FORMAT(DATEADD(DAY, ?, EndTime), 'yyyy-MM-dd HH:mm:ss')",
                (shift_days, shift_days),
            ),
            (
                "UPDATE [dbo].[processed_data_key_phrases] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), "
                "'yyyy-MM-dd HH:mm:ss')",
                (shift_days,),
            ),
        )
        for statement, statement_params in updates:
            cursor.execute(statement, statement_params)

        # Persist all three updates at once.
        conn.commit()
    finally:
        if cursor:
            cursor.close()
        conn.close()
async def fetch_filters_data():
    """
    Load the available filter options and return them grouped by filter.

    Returns a list with one dict per filter name, each shaped as
    ``{"filter_name": <name>, "filter_values": [{"displayValue": ..., "key": ...}, ...]}``.
    """
    conn = await get_db_connection()
    cursor = None
    try:
        cursor = conn.cursor()
        filters_query = '''select 'Topic' as filter_name, mined_topic as displayValue, mined_topic as key1 from
        (SELECT distinct mined_topic from processed_data) t
        union all
        select 'Sentiment' as filter_name, sentiment as displayValue, sentiment as key1 from
        (SELECT distinct sentiment from processed_data
        union all select 'all' as sentiment) t
        union all
        select 'Satisfaction' as filter_name, satisfied as displayValue, satisfied as key1 from
        (SELECT distinct satisfied from processed_data) t
        union all
        select 'DateRange' as filter_name, date_range as displayValue, date_range as key1 from
        (SELECT 'Last 7 days' as date_range
        union all SELECT 'Last 14 days' as date_range
        union all SELECT 'Last 90 days' as date_range
        union all SELECT 'Year to Date' as date_range
        ) t'''
        cursor.execute(filters_query)
        records = [tuple(record) for record in cursor.fetchall()]
        # Column names come from the cursor metadata of the query just run.
        headers = [column[0] for column in cursor.description]
        frame = pd.DataFrame(records, columns=headers).rename(columns={'key1': 'key'})

        def to_filter_entry(group):
            # group.name is the filter_name of this group; the grouping column
            # itself is excluded from the rows via include_groups=False.
            return {
                "filter_name": group.name,
                "filter_values": group.to_dict(orient="records")
            }

        grouped = frame.groupby("filter_name")
        return grouped.apply(to_filter_entry, include_groups=False).to_list()
    finally:
        if cursor:
            cursor.close()
        conn.close()
# Maps each supported DateRange filter label to the SQL predicate it selects.
# NOTE(review): 'Year to Date' is implemented as a rolling one-year window;
# kept unchanged from the existing behavior.
_DATE_RANGE_PREDICATES = {
    'Last 7 days': "StartTime >= DATEADD(day, -7, GETDATE())",
    'Last 14 days': "StartTime >= DATEADD(day, -14, GETDATE())",
    'Last 90 days': "StartTime >= DATEADD(day, -90, GETDATE())",
    'Year to Date': "StartTime >= DATEADD(year, -1, GETDATE())",
}

# Columns that identify one chart in every chart query result.
_CHART_KEY_COLUMNS = ['id', 'chart_name', 'chart_type']


def _build_where_clause(selected_filters, topic_column="mined_topic"):
    """Translate the selected filters into a parameterized WHERE clause.

    Filter values are never embedded in the SQL text; each becomes a ``?``
    placeholder with the value returned in the parameter list.

    Args:
        selected_filters: Mapping of filter name ('Topic', 'Sentiment',
            'Satisfaction', 'DateRange') to a list of selected values.
            Unknown filter names and empty value lists are ignored.
        topic_column: Column the 'Topic' filter applies to.

    Returns:
        ``(where_clause, params)``. ``where_clause`` is ``''`` when no filter
        applies, otherwise ``"where <conditions> "``; ``params`` holds the
        values for one occurrence of the clause, in placeholder order.
    """
    conditions = []
    params = []
    for filter_name, raw_values in (selected_filters or {}).items():
        values = list(raw_values or [])
        if filter_name == 'Topic':
            column = topic_column
        elif filter_name == 'Sentiment':
            # 'all' is a UI placeholder, not a stored sentiment value.
            values = [value for value in values if value != 'all']
            column = 'sentiment'
        elif filter_name == 'Satisfaction':
            column = 'satisfied'
        elif filter_name == 'DateRange':
            # Date ranges are fixed SQL fragments; unrecognized labels are skipped
            # so they cannot leave a dangling "and" in the clause.
            conditions.extend(
                _DATE_RANGE_PREDICATES[value]
                for value in values
                if value in _DATE_RANGE_PREDICATES
            )
            continue
        else:
            continue
        if values:
            placeholders = ", ".join("?" for _ in values)
            conditions.append(f"{column} in ({placeholders})")
            params.extend(values)
    if not conditions:
        return '', []
    joined = " and ".join(conditions)
    return f"where {joined} ", params


def _fetch_chart_records(cursor, value_columns):
    """Nest the cursor's current result set into one record per chart.

    Rows are grouped by id/chart_name/chart_type; the *value_columns* of each
    group become a list of dicts stored under ``chart_value``. Returns ``[]``
    when the result set is empty.
    """
    rows = [tuple(row) for row in cursor.fetchall()]
    column_names = [column[0] for column in cursor.description]
    df = pd.DataFrame(rows, columns=column_names)
    if df.empty:
        return []
    nested = (
        df.groupby(_CHART_KEY_COLUMNS)
        .apply(lambda group: group[value_columns].to_dict(orient='records'), include_groups=False)
        .reset_index()
    )
    nested.columns = _CHART_KEY_COLUMNS + ['chart_value']
    return nested.to_dict(orient='records')


async def fetch_chart_data(chart_filters: ChartFilters = ''):
    """
    Fetches chart data from the database based on the provided filters and organizes it into a nested JSON structure.

    Args:
        chart_filters: Filter model whose ``model_dump()`` yields a
            ``selected_filters`` mapping. Anything without ``model_dump``
            (such as the default ``''``) means "no filters".

    Returns:
        A list of dicts with keys ``id``, ``chart_name``, ``chart_type`` and
        ``chart_value``: the summary cards/charts first, then trending topics,
        then key phrases.
    """
    conn = await get_db_connection()
    cursor = None
    try:
        cursor = conn.cursor()

        selected_filters = {}
        if hasattr(chart_filters, 'model_dump'):
            try:
                selected_filters = chart_filters.model_dump().get('selected_filters') or {}
            except Exception as e:
                # Best effort: fall back to unfiltered data, but leave a trace.
                logging.warning("Ignoring chart filters that could not be read: %s", e)

        where_clause, params = _build_where_clause(selected_filters)

        # charts pt1: summary cards, sentiment donut and handling time by topic
        sql_stmt = (
            f'''select 'TOTAL_CALLS' as id, 'Total Calls' as chart_name, 'card' as chart_type,
            'Total Calls' as name, count(*) as value, '' as unit_of_measurement from [dbo].[processed_data] {where_clause}
            union all
            select 'AVG_HANDLING_TIME' as id, 'Average Handling Time' as chart_name, 'card' as chart_type,
            'Average Handling Time' as name,
            AVG(DATEDIFF(MINUTE, StartTime, EndTime)) as value, 'mins' as unit_of_measurement from [dbo].[processed_data] {where_clause}
            union all
            select 'SATISFIED' as id, 'Satisfied' as chart_name, 'card' as chart_type, 'Satisfied' as name,
            round((CAST(SUM(CASE WHEN satisfied = 'yes' THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) * 100), 2) as value, '%' as unit_of_measurement from [dbo].[processed_data]
            {where_clause}
            union all
            select 'SENTIMENT' as id, 'Topics Overview' as chart_name, 'donutchart' as chart_type,
            sentiment as name,
            (count(sentiment) * 100 / sum(count(sentiment)) over ()) as value,
            '' as unit_of_measurement from [dbo].[processed_data] {where_clause}
            group by sentiment
            union all
            select 'AVG_HANDLING_TIME_BY_TOPIC' as id, 'Average Handling Time By Topic' as chart_name, 'bar' as chart_type,
            mined_topic as name,
            AVG(DATEDIFF(MINUTE, StartTime, EndTime)) as value, '' as unit_of_measurement from [dbo].[processed_data] {where_clause}
            group by mined_topic
            ''')
        # The where clause appears five times above, so its parameters repeat five times.
        cursor.execute(sql_stmt, *(params * 5))
        result1 = _fetch_chart_records(cursor, ['name', 'value', 'unit_of_measurement'])

        # charts pt2: trending topics with their most frequent sentiment
        sql_stmt = f'''SELECT TOP 1 WITH TIES
            mined_topic as name, 'TOPICS' as id, 'Trending Topics' as chart_name, 'table' as chart_type,
            lower(sentiment) as average_sentiment,
            COUNT(*) AS call_frequency
            FROM [dbo].[processed_data]
            {where_clause}
            GROUP BY mined_topic, sentiment
            ORDER BY ROW_NUMBER() OVER (PARTITION BY mined_topic ORDER BY COUNT(*) DESC)
            '''
        cursor.execute(sql_stmt, *params)
        result2 = _fetch_chart_records(cursor, ['name', 'call_frequency', 'average_sentiment'])

        # charts pt3: key phrases; this table names the topic column 'topic'.
        kp_where_clause, kp_params = _build_where_clause(selected_filters, topic_column='topic')
        sql_stmt = f'''select top 15 key_phrase as text,
            'KEY_PHRASES' as id, 'Key Phrases' as chart_name, 'wordcloud' as chart_type,
            call_frequency as size, lower(average_sentiment) as average_sentiment from
            (
            SELECT TOP 1 WITH TIES
            key_phrase,
            sentiment as average_sentiment,
            COUNT(*) AS call_frequency from
            (
            select key_phrase, sentiment from [dbo].[processed_data_key_phrases]
            {kp_where_clause}
            ) t
            GROUP BY key_phrase, sentiment
            ORDER BY ROW_NUMBER() OVER (PARTITION BY key_phrase ORDER BY COUNT(*) DESC)
            ) t2
            order by call_frequency desc
            '''
        # The query already limits the result to 15 rows (top 15).
        cursor.execute(sql_stmt, *kp_params)
        result3 = _fetch_chart_records(cursor, ['text', 'size', 'average_sentiment'])

        return result1 + result2 + result3
    finally:
        if cursor:
            cursor.close()
        conn.close()
async def execute_sql_query(sql_query):
    """
    Run *sql_query* on a fresh database connection.

    Returns every fetched row converted with ``str()`` and concatenated
    without a separator, or ``None`` if executing or fetching raised (the
    error is logged). The cursor and connection are always closed.
    """
    conn = await get_db_connection()
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(sql_query)
        fetched_rows = cursor.fetchall()
        return ''.join(map(str, fetched_rows))
    except Exception as e:
        logging.error("Error executing SQL query: %s", e)
        return None
    finally:
        if cursor:
            cursor.close()
        conn.close()