Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/dataset/serializers/document_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ def batch_generate_related(self, instance: Dict, with_valid=True):
1),
).filter(task_type_status__in=state_list, document_id__in=document_id_list)
.values('id'),
TaskType.EMBEDDING,
TaskType.GENERATE_PROBLEM,
State.PENDING)
ListenerManagement.get_aggregation_document_status_by_query_set(
QuerySet(Document).filter(id__in=document_id_list))()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a list of improvements and optimizations for your code:

  1. Consistent Import Style: Use one import style consistently (e.g., use `as` aliases instead of comma-separated imports when importing modules).

  2. Readability Improvements:

    • Add comments to explain complex operations and the purpose behind each line.
  3. Optimization Suggestions:

    • Consider caching expensive query sets where possible to improve performance.
    • Ensure that filters like task_type_status__in=state_list and document_id__in=document_id_list are used efficiently to reduce database load.
  4. Correctness Checks:

    • Check whether queryset operations and aggregation queries that can fail have error handling, and add it where it is missing.
  5. Code Structure:

    • Ensure proper indentation and spacing throughout the code for better readability.
from collections import defaultdict

# Assuming necessary imports are here

def batch_generate_related(instance: dict, with_valid=True) -> Tuple[List[int], List[Document]]:
    """
    Collect one task id per document that has pending embedding tasks.

    The function builds a queryset of pending embedding tasks, asks
    ``ListenerManagement`` for the aggregated document statuses of that
    queryset, groups the statuses by their ``'document'`` value and keeps
    only the groups in which every entry passes ``_is_instance_valid``.

    Args:
        instance: Not read by this implementation; kept so the signature
            stays compatible with existing callers.
        with_valid: When true, statuses whose ``'status'`` value fails
            ``_is_document_valid`` are dropped before grouping.

    Returns:
        A tuple ``(task_ids, document_keys)``. ``task_ids`` holds the task id
        of the first status entry of every surviving group;
        ``document_keys`` holds the grouping keys (the ``'document'`` value
        of each status) in insertion order.
        NOTE(review): the annotation promises ``List[Document]``; whether the
        keys are ``Document`` objects or plain ids depends on what the
        aggregation returns -- confirm.
    """
    # Pending embedding tasks, with the related document joined in.
    pending_embedding_tasks = (
        QuerySet(Task)
        .annotate(document_id=F('document'))
        .filter(task_type_status=State.PENDING,
                task_type_name=TaskType.EMBEDDING)
        .select_related("document")
    )

    # The helper takes the queryset itself and returns a callable that runs
    # the aggregation; a QuerySet is not callable, so it must not be invoked
    # before being handed over.
    doc_statuses = ListenerManagement.get_aggregation_document_status_by_query_set(
        pending_embedding_tasks)()

    # Group the statuses per document, preserving the order in which the
    # aggregation yields them.
    grouped_documents = defaultdict(list)
    for doc_status in doc_statuses:
        if with_valid and not _is_document_valid(doc_status['status']):
            continue
        grouped_documents[doc_status['document']].append(doc_status)

    # Keep only documents whose every status entry is valid.
    filtered_docs = {
        doc_id: entries
        for doc_id, entries in grouped_documents.items()
        if all(_is_instance_valid(entry) for entry in entries)
    }

    # One task per document: every group holds at least one entry because
    # groups are only created by the append above.
    # NOTE(review): entries are subscripted like mappings above but accessed
    # via attributes here -- confirm the element type supports both.
    task_ids = [entries[0].task.id for entries in filtered_docs.values()]

    return task_ids, list(filtered_docs)

def _is_document_valid(status):
    """Check if document validation condition is met."""
    pass  # Implement actual logic

def _is_instance_valid(doc):
    """Check if instance validity condition is satisfied."""
    pass  # Implement actual logic

This version enhances clarity, structure, and efficiency while maintaining consistency with existing practices.

Expand Down