-
Notifications
You must be signed in to change notification settings - Fork 261
Expand file tree
/
Copy path03_cu_process_data_text.py
More file actions
654 lines (558 loc) · 26.9 KB
/
03_cu_process_data_text.py
File metadata and controls
654 lines (558 loc) · 26.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
"""
Data processing script for conversation knowledge mining.
This module processes call transcripts using Azure Content Understanding,
generates embeddings, and stores processed data in SQL Server and Azure Search.
"""
import argparse
import asyncio
import json
import logging
import os
import re
import struct
from datetime import datetime, timedelta
from urllib.parse import urlparse
# Suppress informational warnings from agent_framework about runtime
# tool/structured_output overrides not being supported by AzureAIClient.
logging.getLogger("agent_framework.azure").setLevel(logging.ERROR)
import pandas as pd
import pyodbc
from azure.ai.inference.aio import EmbeddingsClient
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import PromptAgentDefinition
from azure.identity.aio import AzureCliCredential as AsyncAzureCliCredential
from azure.identity import AzureCliCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.storage.filedatalake import DataLakeServiceClient
from agent_framework.azure import AzureAIProjectAgentProvider
from content_understanding_client import AzureContentUnderstandingClient
# Get parameters from command line
p = argparse.ArgumentParser()
p.add_argument("--search_endpoint", required=True)
p.add_argument("--ai_project_endpoint", required=True)
p.add_argument("--deployment_model", required=True)
p.add_argument("--embedding_model", required=True)
p.add_argument("--storage_account_name", required=True)
p.add_argument("--sql_server", required=True)
p.add_argument("--sql_database", required=True)
p.add_argument("--cu_endpoint", required=True)
p.add_argument("--cu_api_version", required=True)
p.add_argument("--usecase", required=True)
p.add_argument("--solution_name", required=True)
args = p.parse_args()
SEARCH_ENDPOINT = args.search_endpoint
AI_PROJECT_ENDPOINT = args.ai_project_endpoint
DEPLOYMENT_MODEL = args.deployment_model
EMBEDDING_MODEL = args.embedding_model
STORAGE_ACCOUNT_NAME = args.storage_account_name
SQL_SERVER = args.sql_server
SQL_DATABASE = args.sql_database
CU_ENDPOINT = args.cu_endpoint
CU_API_VERSION = args.cu_api_version
USE_CASE = args.usecase
SOLUTION_NAME = args.solution_name
# Construct agent names from solution name (matching 01_create_agents.py pattern)
TOPIC_MINING_AGENT_NAME = f"KM-TopicMiningAgent-{SOLUTION_NAME}"
TOPIC_MAPPING_AGENT_NAME = f"KM-TopicMappingAgent-{SOLUTION_NAME}"
FILE_SYSTEM_CLIENT_NAME = "data"
DIRECTORY = 'call_transcripts'
INDEX_NAME = "call_transcripts_index"
if USE_CASE == "telecom":
SAMPLE_IMPORT_FILE = 'infra/data/telecom/sample_search_index_data.json'
SAMPLE_PROCESSED_DATA_FILE = 'infra/data/telecom/sample_processed_data.json'
SAMPLE_PROCESSED_DATA_KEY_PHRASES_FILE = 'infra/data/telecom/sample_processed_data_key_phrases.json'
else:
SAMPLE_IMPORT_FILE = 'infra/data/IT_helpdesk/sample_search_index_data.json'
SAMPLE_PROCESSED_DATA_FILE = 'infra/data/IT_helpdesk/sample_processed_data.json'
SAMPLE_PROCESSED_DATA_KEY_PHRASES_FILE = 'infra/data/IT_helpdesk/sample_processed_data_key_phrases.json'
# Azure DataLake setup
account_url = f"https://{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net"
credential = AzureCliCredential(process_timeout=30)
service_client = DataLakeServiceClient(account_url, credential=credential, api_version='2023-01-03')
file_system_client = service_client.get_file_system_client(FILE_SYSTEM_CLIENT_NAME)
directory_name = DIRECTORY
paths = list(file_system_client.get_paths(path=directory_name))
# Azure Search setup
search_credential = AzureCliCredential(process_timeout=30)
search_client = SearchClient(SEARCH_ENDPOINT, INDEX_NAME, search_credential)
index_client = SearchIndexClient(endpoint=SEARCH_ENDPOINT, credential=search_credential)
# SQL Server setup
try:
driver = "{ODBC Driver 18 for SQL Server}"
token_bytes = credential.get_token("https://database.windows.net/.default").token.encode("utf-16-LE")
token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)
SQL_COPT_SS_ACCESS_TOKEN = 1256
connection_string = f"DRIVER={driver};SERVER={SQL_SERVER};DATABASE={SQL_DATABASE};"
conn = pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
cursor = conn.cursor()
except Exception:
driver = "{ODBC Driver 17 for SQL Server}"
token_bytes = credential.get_token("https://database.windows.net/.default").token.encode("utf-16-LE")
token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)
SQL_COPT_SS_ACCESS_TOKEN = 1256
connection_string = f"DRIVER={driver};SERVER={SQL_SERVER};DATABASE={SQL_DATABASE};"
conn = pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
cursor = conn.cursor()
# SQL data type mapping for pandas to SQL conversion
sql_data_types = {
'int64': 'INT',
'float64': 'DECIMAL(10,2)',
'object': 'NVARCHAR(MAX)',
'bool': 'BIT',
'datetime64[ns]': 'DATETIME2(6)',
'timedelta[ns]': 'TIME'
}
# Helper function to generate and execute optimized SQL insert scripts
def generate_sql_insert_script(df, table_name, columns, sql_file_name):
"""
Generate and execute optimized SQL INSERT script from DataFrame.
Args:
df: pandas DataFrame with data to insert
table_name: Target SQL table name
columns: List of column names
sql_file_name: Output SQL file name
Returns:
Number of records inserted
"""
if df.empty:
print(f"No data to insert into {table_name}.")
return 0
# Prepare output directory
sql_output_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'index_scripts', 'sql_files'))
os.makedirs(sql_output_dir, exist_ok=True)
output_file_path = os.path.join(sql_output_dir, sql_file_name)
# Generate INSERT statements
insert_sql = f"INSERT INTO {table_name} ([{'],['.join(columns)}]) VALUES "
values_list = []
sql_commands = []
count = 0
for _, row in df.iterrows():
values = []
for value in row:
if pd.isna(value) or value is None:
values.append('NULL')
elif isinstance(value, str):
str_value = value.replace("'", "''")
values.append(f"'{str_value}'")
elif isinstance(value, bool):
values.append("1" if value else "0")
else:
values.append(str(value))
count += 1
values_list.append(f"({', '.join(values)})")
# Batch inserts in groups of 1000 for performance
if count == 1000:
insert_sql += ",\n".join(values_list) + ";\n"
sql_commands.append(insert_sql)
# Reset for next batch
insert_sql = f"INSERT INTO {table_name} ([{'],['.join(columns)}]) VALUES "
values_list = []
count = 0
# Handle remaining records
if values_list:
insert_sql += ",\n".join(values_list) + ";\n"
sql_commands.append(insert_sql)
# Write SQL script to file
with open(output_file_path, 'w', encoding='utf-8') as f:
f.write("\n".join(sql_commands))
# Execute SQL script
with open(output_file_path, 'r', encoding='utf-8') as f:
sql_script = f.read()
cursor.execute(sql_script)
conn.commit()
record_count = len(df)
print(f"Inserted {record_count} records into {table_name} using optimized SQL script.")
return record_count
# Content Understanding client
cu_credential = AzureCliCredential(process_timeout=30)
cu_token_provider = get_bearer_token_provider(cu_credential, "https://cognitiveservices.azure.com/.default")
cu_client = AzureContentUnderstandingClient(
endpoint=CU_ENDPOINT,
api_version=CU_API_VERSION,
token_provider=cu_token_provider
)
ANALYZER_ID = "ckm-json"
# Azure AI Foundry (Inference) embeddings client (async)
inference_endpoint = f"https://{urlparse(AI_PROJECT_ENDPOINT).netloc}/models"
# Utility functions
async def get_embeddings_async(text: str, embeddings_client):
"""Get embeddings using async EmbeddingsClient."""
try:
resp = await embeddings_client.embed(model=EMBEDDING_MODEL, input=[text])
return resp.data[0].embedding
except Exception as e:
print(f"Error getting embeddings: {e}")
raise
# Function: Clean Spaces with Regex
def clean_spaces_with_regex(text):
# Use a regular expression to replace multiple spaces with a single space
cleaned_text = re.sub(r'\s+', ' ', text)
# Use a regular expression to replace consecutive dots with a single dot
cleaned_text = re.sub(r'\.{2,}', '.', cleaned_text)
return cleaned_text
def chunk_data(text, tokens_per_chunk=1024):
text = clean_spaces_with_regex(text)
sentences = text.split('. ') # Split text into sentences
chunks = []
current_chunk = ''
current_chunk_token_count = 0
# Iterate through each sentence
for sentence in sentences:
# Split sentence into tokens
tokens = sentence.split()
# Check if adding the current sentence exceeds tokens_per_chunk
if current_chunk_token_count + len(tokens) <= tokens_per_chunk:
# Add the sentence to the current chunk
if current_chunk:
current_chunk += '. ' + sentence
else:
current_chunk += sentence
current_chunk_token_count += len(tokens)
else:
# Add current chunk to chunks list and start a new chunk
chunks.append(current_chunk)
current_chunk = sentence
current_chunk_token_count = len(tokens)
# Add the last chunk
if current_chunk:
chunks.append(current_chunk)
return chunks
async def prepare_search_doc(content, document_id, path_name, embeddings_client):
chunks = chunk_data(content)
docs = []
for idx, chunk in enumerate(chunks, 1):
chunk_id = f"{document_id}_{str(idx).zfill(2)}"
try:
v_contentVector = await get_embeddings_async(str(chunk), embeddings_client)
except Exception:
await asyncio.sleep(30)
try:
v_contentVector = await get_embeddings_async(str(chunk), embeddings_client)
except Exception:
v_contentVector = []
docs.append({
"id": chunk_id,
"chunk_id": chunk_id,
"content": chunk,
"sourceurl": path_name.split('/')[-1],
"contentVector": v_contentVector
})
return docs
# Database table creation
def create_tables():
cursor.execute('DROP TABLE IF EXISTS processed_data')
cursor.execute("""CREATE TABLE processed_data (
ConversationId varchar(255) NOT NULL PRIMARY KEY,
EndTime varchar(255),
StartTime varchar(255),
Content varchar(max),
summary varchar(3000),
satisfied varchar(255),
sentiment varchar(255),
topic varchar(255),
key_phrases nvarchar(max),
complaint varchar(255),
mined_topic varchar(255)
);""")
cursor.execute('DROP TABLE IF EXISTS processed_data_key_phrases')
cursor.execute("""CREATE TABLE processed_data_key_phrases (
ConversationId varchar(255),
key_phrase varchar(500),
sentiment varchar(255),
topic varchar(255),
StartTime varchar(255)
);""")
conn.commit()
create_tables()
def get_field_value(fields, field_name, default=""):
field = fields.get(field_name, {})
return field.get('valueString', default)
# Process files and insert into DB and Search
async def process_files():
"""Process all files with async embeddings client."""
conversationIds, docs, counter = [], [], 0
processed_records = [] # Collect all records for batch insert
# Create embeddings client for entire processing session
async with (
AsyncAzureCliCredential(process_timeout=30) as async_cred,
EmbeddingsClient(
endpoint=inference_endpoint,
credential=async_cred,
credential_scopes=["https://ai.azure.com/.default"],
) as embeddings_client
):
for path in paths:
file_client = file_system_client.get_file_client(path.name)
data_file = file_client.download_file()
data = data_file.readall()
try:
response = cu_client.begin_analyze(ANALYZER_ID, file_location="", file_data=data)
result = cu_client.poll_result(response)
file_name = path.name.split('/')[-1].replace("%3A", "_")
if USE_CASE == 'telecom':
start_time = file_name.replace(".json", "")[-19:]
timestamp_format = "%Y-%m-%d %H_%M_%S"
else:
start_time = file_name.replace(".json", "")[-16:]
timestamp_format = "%Y-%m-%d%H%M%S"
start_timestamp = datetime.strptime(start_time, timestamp_format)
conversation_id = file_name.split('convo_', 1)[1].split('_')[0]
conversationIds.append(conversation_id)
fields = result['result']['contents'][0]['fields']
duration_str = get_field_value(fields, 'Duration', '0')
try:
duration = int(duration_str)
except (ValueError, TypeError):
duration = 0
end_timestamp = str(start_timestamp + timedelta(seconds=duration)).split(".")[0]
start_timestamp = str(start_timestamp).split(".")[0]
summary = get_field_value(fields, 'summary')
satisfied = get_field_value(fields, 'satisfied')
sentiment = get_field_value(fields, 'sentiment')
topic = get_field_value(fields, 'topic')
key_phrases = get_field_value(fields, 'keyPhrases')
complaint = get_field_value(fields, 'complaint')
content = get_field_value(fields, 'content')
# Collect record for batch insert
processed_records.append({
'ConversationId': conversation_id,
'EndTime': end_timestamp,
'StartTime': start_timestamp,
'Content': content,
'summary': summary,
'satisfied': satisfied,
'sentiment': sentiment,
'topic': topic,
'key_phrases': key_phrases,
'complaint': complaint
})
docs.extend(await prepare_search_doc(content, conversation_id, path.name, embeddings_client))
counter += 1
except Exception: # Skip files that fail processing
pass
if docs != [] and counter % 10 == 0:
search_client.upload_documents(documents=docs)
docs = []
if docs:
search_client.upload_documents(documents=docs)
# Batch insert all processed records using optimized SQL script
if processed_records:
df_processed = pd.DataFrame(processed_records)
columns = ['ConversationId', 'EndTime', 'StartTime', 'Content', 'summary', 'satisfied', 'sentiment', 'topic', 'key_phrases', 'complaint']
generate_sql_insert_script(df_processed, 'processed_data', columns, 'processed_data_batch_insert.sql')
return conversationIds, counter
conversationIds, counter = asyncio.run(process_files())
print(f"✓ Processed {counter} files")
# Load sample data to search index and database
def bulk_import_json_to_table(json_file, table_name):
with open(file=json_file, mode="r", encoding='utf-8') as f:
data = json.load(f)
if not data:
return
df = pd.DataFrame(data)
generate_sql_insert_script(df, table_name, list(df.columns), f'sample_import_{table_name}.sql')
with open(file=SAMPLE_IMPORT_FILE, mode='r', encoding='utf-8') as file:
documents = json.load(file)
batch = [{"@search.action": "upload", **doc} for doc in documents]
search_client.upload_documents(documents=batch)
bulk_import_json_to_table(SAMPLE_PROCESSED_DATA_FILE, 'processed_data')
bulk_import_json_to_table(SAMPLE_PROCESSED_DATA_KEY_PHRASES_FILE, 'processed_data_key_phrases')
print(f"✓ Loaded {len(documents)} sample records")
# Topic mining and mapping
cursor.execute('SELECT distinct topic FROM processed_data')
rows = [tuple(row) for row in cursor.fetchall()]
column_names = [i[0] for i in cursor.description]
df = pd.DataFrame(rows, columns=column_names)
cursor.execute('DROP TABLE IF EXISTS km_mined_topics')
cursor.execute("""CREATE TABLE km_mined_topics (
label varchar(255) NOT NULL PRIMARY KEY,
description varchar(255)
);""")
conn.commit()
topics_str = ', '.join(df['topic'].tolist())
# Extract common topics from previously loaded sample data
cursor.execute('SELECT distinct mined_topic FROM processed_data')
rows = [tuple(row) for row in cursor.fetchall()]
column_names = [i[0] for i in cursor.description]
df = pd.DataFrame(rows, columns=column_names)
common_topics_str = ', '.join(df['mined_topic'].dropna().tolist())
if not common_topics_str:
common_topics_str = "parental controls, billing issues"
# Create agents for topic mining and mapping
print("Creating topic mining and mapping agents...")
# Topic Mining Agent instruction
TOPIC_MINING_AGENT_INSTRUCTION = f'''You are a data analysis assistant specialized in natural language processing and topic modeling.
Your task is to analyze conversation topics and identify distinct categories.
Rules:
1. Identify key topics using topic modeling techniques
2. Choose the right number of topics based on data (try to keep it up to 8 topics)
3. Assign clear and concise labels to each topic
4. Provide brief descriptions for each topic
5. Include common topics like {common_topics_str} if relevant
6. If data is insufficient, indicate more data is needed
7. Return topics in JSON format with 'topics' array containing objects with 'label' and 'description' fields
8. Return ONLY the JSON, no other text or markdown formatting
'''
# Topic Mapping Agent instruction
TOPIC_MAPPING_AGENT_INSTRUCTION = '''You are a data analysis assistant that maps conversation topics to the closest matching category.
Return ONLY the matching topic EXACTLY as written in the list (case-sensitive)
Do not add any explanatory text, punctuation, quotes, or formatting
Do not create, rephrase, abbreviate, or pluralize topics
If no topic is a perfect match, choose the closest one from the list ONLY
'''
# Create async project client and agents
async def create_agents():
"""Create topic mining and mapping agents asynchronously."""
async with (
AsyncAzureCliCredential(process_timeout=30) as async_cred,
AIProjectClient(endpoint=AI_PROJECT_ENDPOINT, credential=async_cred) as project_client,
):
topic_mining_agent = await project_client.agents.create_version(
agent_name=TOPIC_MINING_AGENT_NAME,
definition=PromptAgentDefinition(
model=DEPLOYMENT_MODEL,
instructions=TOPIC_MINING_AGENT_INSTRUCTION,
),
)
topic_mapping_agent = await project_client.agents.create_version(
agent_name=TOPIC_MAPPING_AGENT_NAME,
definition=PromptAgentDefinition(
model=DEPLOYMENT_MODEL,
instructions=TOPIC_MAPPING_AGENT_INSTRUCTION,
),
)
return topic_mining_agent, topic_mapping_agent
topic_mining_agent, topic_mapping_agent = asyncio.run(create_agents())
print(f"✓ Created agents: {topic_mining_agent.name}, {topic_mapping_agent.name}")
try:
async def call_topic_mining_agent(topics_str1):
"""Use Topic Mining Agent with Agent Framework to analyze and categorize topics."""
async with (
AsyncAzureCliCredential(process_timeout=30) as async_cred,
AIProjectClient(endpoint=AI_PROJECT_ENDPOINT, credential=async_cred) as project_client,
):
# Create provider for agent management
provider = AzureAIProjectAgentProvider(project_client=project_client)
# Get agent using provider
agent = await provider.get_agent(name=TOPIC_MINING_AGENT_NAME)
# Query with the topics string
query = f"Analyze these conversation topics and identify distinct categories: {topics_str1}"
result = await agent.run(query)
res = result.text
# Clean up markdown formatting if present
res = res.replace("```json", '').replace("```", '').strip()
return json.loads(res)
res = asyncio.run(call_topic_mining_agent(topics_str))
for object1 in res['topics']:
cursor.execute("INSERT INTO km_mined_topics (label, description) VALUES (?,?)", (object1['label'], object1['description']))
conn.commit()
cursor.execute('SELECT label FROM km_mined_topics')
rows = [tuple(row) for row in cursor.fetchall()]
column_names = [i[0] for i in cursor.description]
df_topics = pd.DataFrame(rows, columns=column_names)
mined_topics_list = df_topics['label'].tolist()
print(f"✓ Mined {len(mined_topics_list)} topics")
async def call_topic_mapping_agent(agent, input_text, list_of_topics):
"""Use Topic Mapping Agent with Agent Framework to map topic to category."""
query = f"""Find the closest topic for this text: '{input_text}' from this list of topics: {list_of_topics}"""
result = await agent.run(query)
return result.text.strip()
cursor.execute('SELECT * FROM processed_data')
rows = [tuple(row) for row in cursor.fetchall()]
column_names = [i[0] for i in cursor.description]
df_processed_data = pd.DataFrame(rows, columns=column_names)
df_processed_data = df_processed_data[df_processed_data['ConversationId'].isin(conversationIds)]
# Map topics using agent asynchronously
async def map_all_topics():
"""Map all topics to categories using agent."""
# Create credential, project client, provider, and agent once for reuse
async with (
AsyncAzureCliCredential(process_timeout=30) as async_cred,
AIProjectClient(endpoint=AI_PROJECT_ENDPOINT, credential=async_cred) as project_client,
):
# Create provider for agent management
provider = AzureAIProjectAgentProvider(project_client=project_client)
# Get agent using provider
agent = await provider.get_agent(name=TOPIC_MAPPING_AGENT_NAME)
# Process all rows using the same agent instance
for _, row in df_processed_data.iterrows():
mined_topic_str = await call_topic_mapping_agent(agent, row['topic'], str(mined_topics_list))
cursor.execute("UPDATE processed_data SET mined_topic = ? WHERE ConversationId = ?", (mined_topic_str, row['ConversationId']))
conn.commit()
asyncio.run(map_all_topics())
# Update processed data for RAG
cursor.execute('DROP TABLE IF EXISTS km_processed_data')
cursor.execute("""CREATE TABLE km_processed_data (
ConversationId varchar(255) NOT NULL PRIMARY KEY,
StartTime varchar(255),
EndTime varchar(255),
Content varchar(max),
summary varchar(max),
satisfied varchar(255),
sentiment varchar(255),
keyphrases nvarchar(max),
complaint varchar(255),
topic varchar(255)
);""")
conn.commit()
cursor.execute('''select ConversationId, StartTime, EndTime, Content, summary, satisfied, sentiment,
key_phrases as keyphrases, complaint, mined_topic as topic from processed_data''')
rows = cursor.fetchall()
columns = ["ConversationId", "StartTime", "EndTime", "Content", "summary", "satisfied", "sentiment",
"keyphrases", "complaint", "topic"]
df_km = pd.DataFrame([list(row) for row in rows], columns=columns)
generate_sql_insert_script(df_km, 'km_processed_data', columns, 'processed_km_data_with_mined_topics.sql')
# Update processed_data_key_phrases table
cursor.execute('''select ConversationId, key_phrases, sentiment, mined_topic as topic, StartTime from processed_data''')
rows = [tuple(row) for row in cursor.fetchall()]
column_names = [i[0] for i in cursor.description]
df = pd.DataFrame(rows, columns=column_names)
df = df[df['ConversationId'].isin(conversationIds)]
# Collect all key phrase records for batch insert
key_phrase_records = []
for _, row in df.iterrows():
key_phrases = row['key_phrases'].split(',')
for key_phrase in key_phrases:
key_phrase = key_phrase.strip()
key_phrase_records.append({
'ConversationId': row['ConversationId'],
'key_phrase': key_phrase,
'sentiment': row['sentiment'],
'topic': row['topic'],
'StartTime': row['StartTime']
})
# Batch insert using optimized SQL script
if key_phrase_records:
df_key_phrases = pd.DataFrame(key_phrase_records)
columns = ['ConversationId', 'key_phrase', 'sentiment', 'topic', 'StartTime']
generate_sql_insert_script(df_key_phrases, 'processed_data_key_phrases', columns, 'processed_new_key_phrases.sql')
# Adjust dates to current date
today = datetime.today()
cursor.execute("SELECT MAX(CAST(StartTime AS DATETIME)) FROM [dbo].[processed_data]")
max_start_time = cursor.fetchone()[0]
days_difference = (today.date() - max_start_time.date()).days - 1 if max_start_time else 0
if days_difference > 0:
cursor.execute("UPDATE [dbo].[processed_data] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), 'yyyy-MM-dd HH:mm:ss'), EndTime = FORMAT(DATEADD(DAY, ?, EndTime), 'yyyy-MM-dd HH:mm:ss')", (days_difference, days_difference))
cursor.execute("UPDATE [dbo].[km_processed_data] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), 'yyyy-MM-dd HH:mm:ss'), EndTime = FORMAT(DATEADD(DAY, ?, EndTime), 'yyyy-MM-dd HH:mm:ss')", (days_difference, days_difference))
cursor.execute("UPDATE [dbo].[processed_data_key_phrases] SET StartTime = FORMAT(DATEADD(DAY, ?, StartTime), 'yyyy-MM-dd HH:mm:ss')", (days_difference,))
conn.commit()
cursor.close()
conn.close()
print("✓ Data processing completed")
finally:
# Delete the agents after processing is complete
print("Deleting topic mining and mapping agents...")
try:
async def delete_agents():
"""Delete topic mining and mapping agents asynchronously."""
async with (
AsyncAzureCliCredential(process_timeout=30) as async_cred,
AIProjectClient(endpoint=AI_PROJECT_ENDPOINT, credential=async_cred) as project_client,
):
await project_client.agents.delete_version(topic_mining_agent.name, topic_mining_agent.version)
await project_client.agents.delete_version(topic_mapping_agent.name, topic_mapping_agent.version)
asyncio.run(delete_agents())
print(f"✓ Deleted agents: {topic_mining_agent.name}, {topic_mapping_agent.name}")
except Exception as e:
print(f"Warning: Could not delete agents: {e}")