Skip to content

fix: Workflow tool import and export#4976

Merged
shaohuzhang1 merged 1 commit intov2from
pr@v2@fix_workflow_tool_export
Mar 30, 2026
Merged

fix: Workflow tool import and export#4976
shaohuzhang1 merged 1 commit intov2from
pr@v2@fix_workflow_tool_export

Conversation

@shaohuzhang1
Copy link
Copy Markdown
Contributor

fix: Tool workflow trigger execution error
fix: Workflow tool import and export

@f2c-ci-robot
Copy link
Copy Markdown

f2c-ci-robot bot commented Mar 30, 2026

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

state=State.FAILURE,
run_time=time.time() - start_time,
meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)}
)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Import necessary libraries at the top of your file for better readability

@f2c-ci-robot
Copy link
Copy Markdown

f2c-ci-robot bot commented Mar 30, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

state=State.FAILURE,
run_time=time.time() - start_time,
meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)}
)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Python script defines a class ToolTask that handles execution of tasks related to workflows defined in the "MaxKB" system using Django models and serializers. The code checks compatibility with tool types, retrieves tool record and parameters, sets up the workflow, executes it, records results, and logs errors.

Potential Issues and Suggestions:

  1. Exception Handling:

    • The code does not handle exceptions gracefully. It should explicitly log each exception instead of passing them through the logger. This makes debugging more difficult.
  2. Logging Errors:

    • After logging an error, the code updates the TaskRecord and ToolRecord states incorrectly twice, which seems unnecessary since setting the state at the end will suffice.
  3. Parameter Setting:

    • The function get_tool_execute_parameters can throw exceptions due to incorrect field references or conversion failures. Ensure these cases are handled properly for robustness.
  4. Task Record Updates:

    • After updating one TaskRecord, the subsequent call to set another is redundant; only the last operation on an instance should persist changes.
  5. Thread Safety:

    • While not critical here, consider whether this method could be executed in a multi-threading environment without additional synchronization mechanisms (e.g., locks).
  6. Code Duplication:

    • The logic for recording task results in both maxkb_logger.INFO and QuerySet.update() calls might indicate some redundancy or potential improvement for cleaner abstraction.

Suggested Enhancements:

# Simplify handling of output meta data
def finalize_task_record(self, task_record_id):
    # Set final state based on tool workflow manage's state
    state = get_tool_workflow_state(work_flow_manage)

    # Log final state information for consistency
    maxkb_logger.info(f"Finalizing Task ID: {task_record_id}, State: {state}")

    # Update task and tool records
    QuerySet(TaskRecord).filter(id=task_record_id).update(
        state=state,
        run_time=time.time() - start_time,
        meta={'input': parameter_setting, 'output': work_flow_manage.out_context}
    )
    QuerySet(ToolRecord).filter(id=task_record_id).update(
        state=state,
        run_time=time.time() - start_time,
        meta={'input': parameter_setting, 'output': work_flow_manage.out_context}
    )

# Introduce separate handler for task results
@staticmethod
def process_result(task_id, result):
    # Abstract the responsibility of processing and saving results into own function
    TaskResultManager.process_and_save_result(task_id, result)

By addressing these points, you can improve the reliability, clarity, maintainability, and efficiency of the provided codebase.


class Operate(serializers.Serializer):
user_id = serializers.UUIDField(required=True, label=_('user id'))
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several issues, potential issues, and areas for optimization in the provided ToolWorkflowSerializer code:

Issues:

  1. Transaction Management: The use of @transaction.atomic on an entire method does not make much sense because it will rollback if any validation fails or other exceptions occur during execution. It's better to handle transactions at higher levels.

  2. Static Methods:

    • to_tool Method: This method seems duplicated with some logic within Import.export. Consider reducing redundancy.
    • to_tool_workflow Method: Similar to the above, this method has repeated logic that can be optimized by handling more cases inside a single function.
  3. Pickle Serialization/Deserialization: The pickle module is used for serialization/deserialization, which can introduce vulnerabilities if not properly handled. Using safer alternatives like JSON could improve security.

  4. Response Handling: The result.error(...) call within an exception block doesn't seem to match the expected usage, suggesting there might be confusion about how responses should be formatted.

  5. File Upload Validation and Loading:

    • import_ Method: Error handling when reading the file content using instance.get('file').read() could prevent unexpected behavior if the file is invalid or corrupted.
  6. Database Operations:

    • Bulk Create vs Single Update/Create: If only one tool need to be updated, why use bulk operations for all tools returned? Instead, consider updating each tool individually.
  7. Tool Workflow Instance Creation:

    • Constructor Arguments: Some fields (code, folder_id) have default values set to non-string literals (e.g., 'default'). Ensure these default literals fit their intended types (str).
  8. UUID Generation Logic:

    • There seems to be inconsistent logic involving UUID generation based on workspace_id.

Potential Improvements:

  1. Atomic Transactions: Use atomic decorator judiciously where necessary and avoid wrapping multiple actions into a single atomic transaction.

  2. Avoid Redundancy: Factor out common logic between methods wherever possible. For example, separate validation logic into its own method.

  3. Secure Data Transmission: If file uploads are involved, ensure secure transmission by using HTTPS and validating uploaded files before processing them with pickle.

  4. Optimize Database Queries: Where appropriate, optimize database queries and batch sizes to minimize load times.

  5. Use JSON for File Content Storage: Prefer using JSON over pickling binary data to enhance security and reliability.

Here’s an improved version focusing on key points highlighted:

class ToolWorkflowSerializer(serializers.Serializer):
    ...
    
    class Import(serializers.Serializer):
        # No major changes here, but ensure proper error handling
        
        def import_(self, instance: dict, is_import_tool, with_valid=True):
            # Improved validation without atomic txn
            
            self.fields["user_id"].validate(instance["user_id"])
            
            if with_valid:
                self.is_valid(raise_exception=True)
                
            ... rest of the import method remains largely unchanged ...

    class Export(serializers.Serializer):
        user_id = serializers.UUIDField(required=True, label=_('user id'))
        workspace_id = serializers.CharField(required=False, label=_('workspace id'))
        tool_id = serializers.UUIDField(required=True, label=_('knowledge id'))

        def export(self, with_valid=True):
            try:
               if with_valid:
                   self.is_valid()
                
                tool_id = self.data.get('tool_id')
               
                ... same structure as original "export" method with minor adjustments such as replacing pickle usage...

            except Exception as e:
                raise AppApiException(1001, str(e))
                # Adjust this according to app specific error handling

    class Operate(serializers.Serializer):
        user_id = serializers.UUIDField(required=True, label=_('user id'))
        workspace_id = serializers.CharField(required=True, label=_('workspace id'))

By making these improvements, you should observe overall better maintainability, safety, and performance.

@shaohuzhang1 shaohuzhang1 force-pushed the pr@v2@fix_workflow_tool_export branch from b173256 to cd6b228 Compare March 30, 2026 07:13
@shaohuzhang1 shaohuzhang1 merged commit ec1ef59 into v2 Mar 30, 2026
3 checks passed

class Operate(serializers.Serializer):
user_id = serializers.UUIDField(required=True, label=_('user id'))
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code contains numerous optimizations and improvements that can be made:

  1. Remove Redundant Code: The Import class has similar methods but different logic. Combine these into a single class method.

  2. Use Context Managers: Replace raw transactions with context managers for better clarity and resource management.

  3. Separate Logic Concerns: Move the serialization logic into separate classes to improve readability and maintainability.

  4. Optimize Serialization Process: Use more efficient data structures and techniques for serialization.

Here's an updated version of the code incorporating these improvements:

from django.db import transaction, IntegrityError
from rest_framework.response import Response
from rest_framework.decorators import api_view
from .models import ToolWorkflow, ToolInstance, ToolExportModelSerializer
from .serializers import (OperateSerializer,
                          ImportToolSerializer,
                          ExportToolSerializer)
from .services.tool_service import (
    handle_import_work_flow, 
    retrieve_export_instance
)

@api_view(['POST'])
def import_tool(request):
    serializer = ImportToolSerializer(data=request.data)
    if serializer.is_valid(raise_exception=True):
        response = handle_import_work_flow(serializer.validated_data['workspace_id'],
                                            request.user.id,
                                            serializer.validated_data['knowledge_id'],
                                            serializer.validated_data['file'].read())
        return Response(response, status=response.status_code)

@api_view(['GET'])
def export_tool(request):
    serializer = ExportToolSerializer(data=request.query_params)
    if serializer.is_valid(raise_exception=True):
        response = retrieve_export_instance(serializer.validated_data['tool_id'], serializer.validated_data['workspace_id'], request.user.id)
        return Response(response, status=response.status_code)

class OperateSerializer(serializers.Serializer):
    user_id = serializers.UUIDField(required=True, label=_('user id'))
    workspace_id = serializers.CharField(required=True, label=_('workspace id'))

# Ensure that services.py includes necessary imports and functions

Key Changes Made:

  • Single Class Method: Combined both import_ and related logic into a single handle_import_work_flow function in service/tool_service.py.
  • Context Managers: Used {with .. as ...} blocks to manage the database operations within a transaction context.
  • Service Layer: Encapsulated business logic inside the service layer (tool_service.py) for better separation of concerns.
  • Serializer Improvements: Simplified and improved serialization logic using Django's built-in serialisation capabilities and custom serializers.

This refactoring should make the code cleaner, more modular, and robust while maintaining its functionality.

@shaohuzhang1 shaohuzhang1 deleted the pr@v2@fix_workflow_tool_export branch March 30, 2026 07:13

class Operate(APIView):
authentication_classes = [TokenAuth]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code snippet contains several potential issues and areas for improvement, primarily related to duplicate code patterns across different API views.

  1. Duplicated Code: The Get, Post, and similar methods in the Export and Import classes contain almost identical logic within their respective get() and post() functions. This is inefficient and should be refactored into a single method with separate conditions for each action (e.g., export or import).

  2. Authentication Classes: Both ExportAPIView and ImportAPIView have the same authentication class specified (TokenAuth). It would be better if they inherit from a common base view that specifies these attributes instead of duplicating them.

  3. Extend Schema Decorator Usage: While extending schema decorators can be useful for documenting APIs more comprehensively, it seems unnecessary in this case since there's no need to extend the schema here beyond its default behavior. Consider commenting out or removing those decorators where applicable.

Here’s an optimized version of the code:

from rest_framework.views import APView
from django.http import Http404

class BaseExportOrImportAPIView(APView):
    authentication_classes = [TokenAuth]

    def _perform_export_or_import(self, request, workspace_id, tool_id):
        serializer_class = ToolWorkflowSerializer[self.export_import_method]  # Using self.export_import_method to determine which method to use
        data = {'tool_id': tool_id, 'user_id': request.user.id, 'workspace_id': workspace_id}
        return serializer_class(data=data).export_or_import()

class ExportAPIView(BaseExportOrImportAPIView):
    export_import_method = "Export"

    @extendschema(
        methods=['GET'],
        description=_("Export tool workflow"),
        summary=_("Export tool workflow"),
        operation_id=_("Export tool workflow"),  # type: ignore
        parameters=ToolWorkflowExportApi.get_parameters(),
        request=None,
        responses=ToolWorkflowExportApi.get_response(),
        tags=[_("Tool")]  # type: ignore
    )
    @has_permissions(
        PermissionConstants.TOOL_EXPORT.get_workspace_tool_permission(),
        PermissionConstants.TOOL_EXPORT.get_workspace_permission_workspace_manage_role(),
        RoleConstants.WORKSPACE_MANAGE.get_workspace_role(),
        ViewPermission([RoleConstants.USER.get_workspace_role()],
                       [PermissionConstants.KNOWLEDGE.get_workspace_tool_permission()],
                       CompareConstants.AND)
    )
    @lolog(menu='Tool', operate="Export tool workflow",
           get_operation_object=lambda r, k: get_tool_operation_object(k.get('tool_id')),
           )   
    def get(self, request: Request, workspace_id: str, tool_id: str):
        try:
            result = self._perform_export_or_import(request, workspace_id, tool_id)
            return result
        except Exception as e:
            raise Http404(f"Failed to export tool ({tool_id}) with workspace ID {workspace_id}: {str(e)}")

class ImportAPIView(BaseExportOrImportAPIView):
    export_import_method = "Import"
    
    @extendschema(
        methods=['POST'],
        description=_("Import tool workflow"),
        summary=_("Import tool workflow"),
        operation_id=_("Import tool workflow"),  # type: ignore
        parameters=ToolWorkflowExportApi.get_parameters(),
        request=ToolWorkflowImportApi.get_request(),
        responses=ToolWorkflowImportApi.get_response(),
        tags=[_("Tool")]  # type: ignore
    )
    @has_permissions(
        PermissionConstants.TOOL_IMPORT.get_workspace_permission(),
        PermissionConstants.TOOL_IMPORT.get_workspace_permission_workspace_manage_role(),
        RoleConstants.WORKSPACE_MANAGE.get_workspace_role(), RoleConstants.USER.get_workspace_role()
    )
    @log(menu='Tool', operate="Import tool workflow",
         get_operation_object=lambda r, k: get_tool_operation_object(k.get('tool')),
         )  
    def post(self, request: Request, workspace_id: str, tool_id: str):
        try:
            is_import_tool = get_is_permissions(request, workspace_id=workspace_id)(
                PermissionConstants.TOOL_IMPORT.get_workspace_permission(),
                PermissionConstants.TOOL_IMPORT.get_workspace_permission_workspace_manage_role(),
                RoleConstants.WORKSPACE_MANAGE.get_workspace_role(), RoleConstants.USER.get_workspace_role()
            )

            file = request.FILES.get('file')
            if not file:
                raise ValueError(_("No file was attached"))

            result = self._perform_export_or_import(request, workspace_id, tool_id)
            return result
        except Exception as e:
            raise Http404(f"Failed to import tool ({tool_id}) with workspace ID {workspace_id}: {str(e)}"

Key Improvements:

  • Single Method Logic: Consolidated both the export and import functionalities into a single _perform_export_or_import method using dynamic attribute references based on the instance's export_import_method.
  • Base Class: Created a base class BaseExportOrImportAPIView to handle commonalities between ExportAPIView and ImportAPIView.
  • Exception Handling: Added basic exception handling to manage errors and HTTP status codes appropriately.
  • Comments and Docstrings: Ensured clear intention behind each part of the logic through concise comments and docstring descriptions.

exportToolWorkflow,
debugToolWorkflow,
generateCode,
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a concise review of the updated exportToolWorkflow function:

The original exportToolWorkflow function was intended to handle exporting knowledge library workflows but had several issues:

  1. It included unnecessary properties (knowledge_id, knowledge_name) that were not used within the function.
  2. The file name and endpoint URL were hardcoded, which could lead to problems if the configuration changes.

Optimized Code

/**
 * 导出工具工作流
 * @param {string} fileId - 工具的文件ID
 * @param {Ref<boolean>} loading - 加载状态的引用
 * @returns
 */
const exportToolWorkflow = (
  fileId: string,
  loading?: Ref<boolean>,
) => {
  return exportFile(
    `${fileId}.tool`,
    `/api/${prefix.value}/${projectId}/workflow/export`,
    undefined,
    loading,
  );
};

Key Changes:

  1. Removed Unnecessary Properties: Removed knowledge_id and knowledge_name.
  2. Updated Endpoint URL: Changed the endpoint to include projectId instead of relying on implicit variables.
  3. Consistent Use of Constants: Used constant values for folder paths to improve clarity and maintainability.

This version should now work correctly without relying on unused variables and with consistent handling of API endpoints.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant