Merged
6 changes: 1 addition & 5 deletions apps/common/event/listener_manage.py
@@ -238,11 +238,8 @@ def update_status(query_set: QuerySet, taskType: TaskType, state: State):
        for key in params_dict:
            _value_ = params_dict[key]
            exec_sql = exec_sql.replace(key, str(_value_))
-       lock.acquire()
-       try:
+       with lock:
            native_update(query_set, exec_sql)
-       finally:
-           lock.release()

@staticmethod
def embedding_by_document(document_id, embedding_model: Embeddings, state_list=None):
@@ -272,7 +269,6 @@ def is_the_task_interrupted():
        ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), TaskType.EMBEDDING,
                                         State.STARTED)

-
        # Vectorize by paragraph
        page_desc(QuerySet(Paragraph)
                  .annotate(
Here's a review of the code with suggestions for improvements:

Potential Issues

  1. SQL Injection: exec_sql is built by substituting raw parameter values into the SQL text with str.replace, which can lead to SQL injection. Prefer parameterized queries so the driver binds the values instead.

  2. Concurrency Handling: Serializing every update behind a single in-process lock can become a bottleneck. For better throughput, consider database-level concurrency (a connection pool with short transactions) or an asynchronous approach with Python's asyncio.

  3. Code Complexity: The function embedding_by_document lacks proper error handling and logging. Consider adding exception handling and log statements.

  4. Lock Release: The old lock.acquire() / try / finally pattern released the lock correctly, but it is verbose and easy to break during refactoring. The with lock: context manager introduced in this PR guarantees release even when native_update raises, and is the preferred idiom.
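
To illustrate point 1, a parameterized query keeps user data out of the SQL text entirely. This is a minimal sketch using the standard-library sqlite3 driver; the table and column names are illustrative, not taken from this PR:

```python
import sqlite3

def update_status_safe(conn, doc_id, state):
    # "?" placeholders let the driver bind the values; the SQL string
    # never contains the data itself, so injection is not possible.
    conn.execute(
        "UPDATE document SET status = ? WHERE id = ?",
        (state, doc_id),
    )
    conn.commit()

# Demo against an in-memory database (schema is illustrative)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE document (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO document VALUES (1, 'PENDING')")
update_status_safe(conn, 1, "STARTED")
print(conn.execute("SELECT status FROM document WHERE id = 1").fetchone()[0])  # STARTED
```

The same placeholder style works with Django's connection.cursor().execute(sql, params) and with SQLAlchemy's bound parameters; only the placeholder syntax differs per driver.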

Optimization Suggestions

  1. Parameter Validation: Before building the query, validate params_dict to ensure it only contains necessary fields.

  2. Batch Processing: If multiple documents need embedding at once, consider processing them in batches to reduce overhead.

  3. Database Connection Pooling: Reuse connections via SQLAlchemy's built-in connection pool, or Django's persistent connections (CONN_MAX_AGE), to manage database connections more efficiently.

  4. Locks: For high-concurrency scenarios, consider optimistic locking (e.g. a version column checked at update time) instead of a pessimistic in-process lock. Optimistic locking works on any backend; row-level pessimistic locks (SELECT ... FOR UPDATE) are not supported by SQLite, so they would require PostgreSQL or another database that supports them.

  5. Error Handling: Implement robust error handling to manage exceptions during document embedding tasks and log errors to improve troubleshooting.

  6. Logging: Add comprehensive logging to track the flow and status of document embeddings, which helps in debugging and monitoring.
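
The optimistic-locking idea in point 4 can be sketched with a version column: an update only succeeds if the row still carries the version the writer originally read, so concurrent writers detect conflicts without ever holding a lock. The table and column names below are illustrative:

```python
import sqlite3

def update_with_version(conn, doc_id, new_status, expected_version):
    # The WHERE clause makes the update conditional on the version we read;
    # if a concurrent writer bumped it first, rowcount is 0 and we lost.
    cur = conn.execute(
        "UPDATE document SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_status, doc_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE document (id INTEGER PRIMARY KEY, status TEXT, version INTEGER)")
conn.execute("INSERT INTO document VALUES (1, 'PENDING', 0)")
print(update_with_version(conn, 1, 'STARTED', 0))   # True: version matched
print(update_with_version(conn, 1, 'FINISHED', 0))  # False: version is now 1
```

A caller that gets False re-reads the row and retries (or reports a conflict), which is why this strategy pays off when conflicts are rare.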

# Suggested Optimized Version:

from sqlalchemy import func, insert, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
import asyncio
import json

# Sample configuration -- pick an async driver for your database
# (e.g. sqlite+aiosqlite, postgresql+asyncpg)
DATABASE_URI = 'sqlite+aiosqlite:///example.db'

def setup_database() -> AsyncEngine:
    return create_async_engine(DATABASE_URI)

class DocumentEmbeddingService:
    """Embeds documents in batches inside a single transaction.

    `document_embedded_data` is assumed to be a SQLAlchemy Table (or
    mapped model) defined elsewhere in the application.
    """

    def __init__(self, engine: AsyncEngine):
        self.engine = engine

    async def process_documents(self, documents: list, embedding_model: Embeddings) -> None:
        batch_size = 100  # Adjust based on your application needs
        # engine.begin() commits on success and rolls back on error
        async with self.engine.begin() as conn:
            for i in range(0, len(documents), batch_size):
                chunk = documents[i:i + batch_size]
                # Embed the whole batch in one call
                vectors = embedding_model.embed_documents([doc.text for doc in chunk])
                rows = [
                    {'text': doc.text,
                     'embedding': json.dumps(vector)}  # Ensure JSON serialization works correctly
                    for doc, vector in zip(chunk, vectors)
                ]
                # A list of parameter dicts gives an executemany-style bulk insert
                await conn.execute(insert(document_embedded_data), rows)

async def main():
    engine = setup_database()
    try:
        await DocumentEmbeddingService(engine).process_documents(docs_to_embed, embedder)
    finally:
        await engine.dispose()

asyncio.run(main())

This version batches the documents, embeds each batch with a single embed_documents call, and writes the results with one bulk insert per batch, all inside one transaction: engine.begin() commits on success and rolls back if anything raises. Make sure to adjust batch_size, the table definitions, and DATABASE_URI based on your specific requirements and environment.
