Skip to content

Commit 12f99ac

Browse files
fix: explicitly close AsyncOpenAI embeddings client in try/finally
Wraps the �mbeddings_client = AsyncOpenAI(...) usage in process_files() inside a try/finally that calls �wait embeddings_client.close() so the underlying httpx connection pool is released, addressing reviewer feedback about leaked HTTP connections / unclosed-client warnings. Applied to both: - infra/scripts/index_scripts/03_cu_process_data_text.py - infra/scripts/index_scripts/04_cu_process_custom_data.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5757229 commit 12f99ac

2 files changed

Lines changed: 196 additions & 188 deletions

File tree

infra/scripts/index_scripts/03_cu_process_data_text.py

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -331,68 +331,72 @@ async def process_files():
331331
base_url=embeddings_base_url,
332332
api_key=embeddings_token_provider(),
333333
)
334-
for path in paths:
335-
file_client = file_system_client.get_file_client(path.name)
336-
data_file = file_client.download_file()
337-
data = data_file.readall()
338-
try:
339-
response = cu_client.begin_analyze(ANALYZER_ID, file_location="", file_data=data)
340-
result = cu_client.poll_result(response)
341-
file_name = path.name.split('/')[-1].replace("%3A", "_")
342-
if USE_CASE == 'telecom':
343-
start_time = file_name.replace(".json", "")[-19:]
344-
timestamp_format = "%Y-%m-%d %H_%M_%S"
345-
else:
346-
start_time = file_name.replace(".json", "")[-16:]
347-
timestamp_format = "%Y-%m-%d%H%M%S"
348-
start_timestamp = datetime.strptime(start_time, timestamp_format)
349-
conversation_id = file_name.split('convo_', 1)[1].split('_')[0]
350-
conversationIds.append(conversation_id)
351-
fields = result['result']['contents'][0]['fields']
352-
duration_str = get_field_value(fields, 'Duration', '0')
334+
try:
335+
for path in paths:
336+
file_client = file_system_client.get_file_client(path.name)
337+
data_file = file_client.download_file()
338+
data = data_file.readall()
353339
try:
354-
duration = int(duration_str)
355-
except (ValueError, TypeError):
356-
duration = 0
357-
end_timestamp = str(start_timestamp + timedelta(seconds=duration)).split(".")[0]
358-
start_timestamp = str(start_timestamp).split(".")[0]
359-
summary = get_field_value(fields, 'summary')
360-
satisfied = get_field_value(fields, 'satisfied')
361-
sentiment = get_field_value(fields, 'sentiment')
362-
topic = get_field_value(fields, 'topic')
363-
key_phrases = get_field_value(fields, 'keyPhrases')
364-
complaint = get_field_value(fields, 'complaint')
365-
content = get_field_value(fields, 'content')
366-
367-
# Collect record for batch insert
368-
processed_records.append({
369-
'ConversationId': conversation_id,
370-
'EndTime': end_timestamp,
371-
'StartTime': start_timestamp,
372-
'Content': content,
373-
'summary': summary,
374-
'satisfied': satisfied,
375-
'sentiment': sentiment,
376-
'topic': topic,
377-
'key_phrases': key_phrases,
378-
'complaint': complaint
379-
})
380-
381-
docs.extend(await prepare_search_doc(content, conversation_id, path.name, embeddings_client))
382-
counter += 1
383-
except Exception: # Skip files that fail processing
384-
pass
385-
if docs != [] and counter % 10 == 0:
340+
response = cu_client.begin_analyze(ANALYZER_ID, file_location="", file_data=data)
341+
result = cu_client.poll_result(response)
342+
file_name = path.name.split('/')[-1].replace("%3A", "_")
343+
if USE_CASE == 'telecom':
344+
start_time = file_name.replace(".json", "")[-19:]
345+
timestamp_format = "%Y-%m-%d %H_%M_%S"
346+
else:
347+
start_time = file_name.replace(".json", "")[-16:]
348+
timestamp_format = "%Y-%m-%d%H%M%S"
349+
start_timestamp = datetime.strptime(start_time, timestamp_format)
350+
conversation_id = file_name.split('convo_', 1)[1].split('_')[0]
351+
conversationIds.append(conversation_id)
352+
fields = result['result']['contents'][0]['fields']
353+
duration_str = get_field_value(fields, 'Duration', '0')
354+
try:
355+
duration = int(duration_str)
356+
except (ValueError, TypeError):
357+
duration = 0
358+
end_timestamp = str(start_timestamp + timedelta(seconds=duration)).split(".")[0]
359+
start_timestamp = str(start_timestamp).split(".")[0]
360+
summary = get_field_value(fields, 'summary')
361+
satisfied = get_field_value(fields, 'satisfied')
362+
sentiment = get_field_value(fields, 'sentiment')
363+
topic = get_field_value(fields, 'topic')
364+
key_phrases = get_field_value(fields, 'keyPhrases')
365+
complaint = get_field_value(fields, 'complaint')
366+
content = get_field_value(fields, 'content')
367+
368+
# Collect record for batch insert
369+
processed_records.append({
370+
'ConversationId': conversation_id,
371+
'EndTime': end_timestamp,
372+
'StartTime': start_timestamp,
373+
'Content': content,
374+
'summary': summary,
375+
'satisfied': satisfied,
376+
'sentiment': sentiment,
377+
'topic': topic,
378+
'key_phrases': key_phrases,
379+
'complaint': complaint
380+
})
381+
382+
docs.extend(await prepare_search_doc(content, conversation_id, path.name, embeddings_client))
383+
counter += 1
384+
except Exception: # Skip files that fail processing
385+
pass
386+
if docs != [] and counter % 10 == 0:
387+
search_client.upload_documents(documents=docs)
388+
docs = []
389+
if docs:
386390
search_client.upload_documents(documents=docs)
387-
docs = []
388-
if docs:
389-
search_client.upload_documents(documents=docs)
390-
391-
# Batch insert all processed records using optimized SQL script
392-
if processed_records:
393-
df_processed = pd.DataFrame(processed_records)
394-
columns = ['ConversationId', 'EndTime', 'StartTime', 'Content', 'summary', 'satisfied', 'sentiment', 'topic', 'key_phrases', 'complaint']
395-
generate_sql_insert_script(df_processed, 'processed_data', columns, 'processed_data_batch_insert.sql')
391+
392+
# Batch insert all processed records using optimized SQL script
393+
if processed_records:
394+
df_processed = pd.DataFrame(processed_records)
395+
columns = ['ConversationId', 'EndTime', 'StartTime', 'Content', 'summary', 'satisfied', 'sentiment', 'topic', 'key_phrases', 'complaint']
396+
generate_sql_insert_script(df_processed, 'processed_data', columns, 'processed_data_batch_insert.sql')
397+
finally:
398+
# Close the embeddings client to release the underlying httpx connection pool.
399+
await embeddings_client.close()
396400

397401
return conversationIds, counter
398402

0 commit comments

Comments
 (0)