From e98f424b9d30e7c990e67fc8afd499c7dce88344 Mon Sep 17 00:00:00 2001 From: shaohuzhang1 Date: Mon, 5 Jan 2026 13:46:22 +0800 Subject: [PATCH] fix: The image understanding and video understanding nodes in the workflow knowledge base are cancelled when the document import is executed. The loop will be completed, and the cancellation will not take effect --- .../flow/knowledge_workflow_manage.py | 21 +++++++++-- .../loop_node/impl/base_loop_node.py | 2 ++ apps/application/flow/tools.py | 4 +-- apps/application/flow/workflow_manage.py | 27 +++++++++----- apps/knowledge/serializers/knowledge.py | 36 +++---------------- .../serializers/knowledge_workflow.py | 3 +- 6 files changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/application/flow/knowledge_workflow_manage.py b/apps/application/flow/knowledge_workflow_manage.py index 1a696675cbc..4a19803a367 100644 --- a/apps/application/flow/knowledge_workflow_manage.py +++ b/apps/application/flow/knowledge_workflow_manage.py @@ -14,7 +14,7 @@ from django.utils.translation import get_language from application.flow.common import Workflow -from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer +from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer, NodeResult from application.flow.workflow_manage import WorkflowManage from common.handle.base_to_response import BaseToResponse from common.handle.impl.response.system_to_response import SystemToResponse @@ -90,7 +90,15 @@ def hand_node_result(self, current_node, node_result_future): # 阻塞获取结果 list(result) if current_node.status == 500: - return None + enableException = current_node.node.properties.get('enableException') + if not enableException: + return None + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) + r.write_context(current_node, self) + return r if self.is_the_task_interrupted(): current_node.status = 201 return None @@ -100,6 +108,15 @@ def hand_node_result(self, current_node, node_result_future): self.status = 500 current_node.get_write_error_context(e) self.answer += str(e) + if self.is_the_task_interrupted(): + current_node.status = 201 + return None + enableException = current_node.node.properties.get('enableException') + if enableException: + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(state=State.FAILURE) finally: current_node.node_chunk.end() diff --git a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py index cd75272e264..6d08e54776a 100644 --- a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py +++ b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py @@ -200,6 +200,8 @@ def loop(workflow_manage_new_instance, node: INode, generate_loop): instance._cleanup() if break_outer: break + if instance.is_the_task_interrupted(): + break node.context['is_interrupt_exec'] = is_interrupt_exec node.context['loop_node_data'] = loop_node_data node.context['loop_answer_data'] = loop_answer_data diff --git a/apps/application/flow/tools.py b/apps/application/flow/tools.py index d9907ebc9c9..6cbe5555216 100644 --- a/apps/application/flow/tools.py +++ b/apps/application/flow/tools.py @@ -496,9 +496,7 @@ def get_workflow_resource(workflow, node_handle): lambda instance: [instance.stt_model_id] if instance.stt_model_id else []] } knowledge_instance_field_call_dict = { - 'MODEL': [lambda instance: [instance.model_id] if instance.model_id else [], - lambda instance: [instance.tts_model_id] if instance.tts_model_id else [], - lambda instance: [instance.stt_model_id] if instance.stt_model_id else []], + 'MODEL': [lambda instance: [instance.embedding_model_id] if instance.embedding_model_id else []], } diff --git a/apps/application/flow/workflow_manage.py b/apps/application/flow/workflow_manage.py index d7633c6bebc..f9c418b0ff0 100644 --- a/apps/application/flow/workflow_manage.py +++ b/apps/application/flow/workflow_manage.py @@ -475,17 +475,29 @@ def hand_event_node_result(self, current_node, node_result_future): else: list(result) if current_node.status == 500: + enableException = current_node.node.properties.get('enableException') + if not enableException: + return None current_node.context['exception_message'] = current_node.err_message current_node.context['branch_id'] = 'exception' r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, _is_interrupt=lambda node, step_variable, global_variable: False) r.write_context(current_node, self) return r + if self.is_the_task_interrupted(): + current_node.status = 201 + return None return current_result except Exception as e: # 添加节点 maxkb_logger.error(f'Exception: {e}', exc_info=True) - if not current_node.node.properties.get('enableException'): + enableException = current_node.node.properties.get('enableException') + current_node.get_write_error_context(e) + self.status = 500 + if self.is_the_task_interrupted(): + current_node.status = 201 + return None + if not enableException: chunk = self.base_to_response.to_stream_chunk_response(self.params.get('chat_id'), self.params.get('chat_id'), current_node.id, @@ -499,13 +511,12 @@ def hand_event_node_result(self, current_node, node_result_future): 'real_node_id': real_node_id, 'node_status': 'ERROR'}) current_node.node_chunk.add_chunk(chunk) - current_node.get_write_error_context(e) - self.status = 500 - current_node.context['exception_message'] = current_node.err_message - current_node.context['branch_id'] = 'exception' - return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, - _is_interrupt=lambda node, step_variable, global_variable: False) - + return None + else: + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) finally: current_node.node_chunk.end() # 归还链接到连接池 diff --git a/apps/knowledge/serializers/knowledge.py b/apps/knowledge/serializers/knowledge.py index b895d0dfe95..0c997f96d78 100644 --- a/apps/knowledge/serializers/knowledge.py +++ b/apps/knowledge/serializers/knowledge.py @@ -365,7 +365,7 @@ def one(self): lambda application_id: all_application_list.__contains__(application_id), [ str( - application_knowledge_mapping.application_id + application_knowledge_mapping.source_id ) for application_knowledge_mapping in QuerySet(ResourceMapping).filter(source_type='APPLICATION', target_type='KNOWLEDGE', @@ -381,7 +381,6 @@ def edit(self, instance: Dict, select_one=True): KnowledgeEditRequest(data=instance).is_valid(knowledge=knowledge) if 'embedding_model_id' in instance: knowledge.embedding_model_id = instance.get('embedding_model_id') - update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) if "name" in instance: knowledge.name = instance.get("name") if 'desc' in instance: @@ -394,37 +393,8 @@ def edit(self, instance: Dict, select_one=True): knowledge.file_size_limit = instance.get('file_size_limit') if 'file_count_limit' in instance: knowledge.file_count_limit = instance.get('file_count_limit') - if 'application_id_list' in instance and instance.get('application_id_list') is not None: - application_id_list = instance.get('application_id_list') - # 当前用户可修改关联的知识库列表 - application_knowledge_id_list = [ - str(knowledge_dict.get('id')) for knowledge_dict in self.list_application(with_valid=False) - ] - for knowledge_id in application_id_list: - if not application_knowledge_id_list.__contains__(knowledge_id): - raise AppApiException( - 500, - _( - 'Unknown application id {knowledge_id}, cannot be associated' - ).format(knowledge_id=knowledge_id) - ) - - QuerySet(ResourceMapping).filter( - source_id__in=application_knowledge_id_list, - source_type='APPLICATION', - target_type='KNOWLEDGE', - target_id=self.data.get("knowledge_id") - ).delete() - # 插入 - QuerySet(ResourceMapping).bulk_create([ - ResourceMapping( - source_id=application_id, - source_type='APPLICATION', - target_type='KNOWLEDGE', - target_id=self.data.get('knowledge_id') - ) for application_id in application_id_list - ]) if len(application_id_list) > 0 else None knowledge.save() + update_resource_mapping_by_knowledge(str(knowledge.id)) if select_one: return self.one() return None @@ -601,6 +571,7 @@ def save_base(self, instance, with_valid=True): 'user_id': self.data.get('user_id'), 'auth_target_type': AuthTargetType.KNOWLEDGE.value }).auth_resource(str(knowledge_id)) + update_resource_mapping_by_knowledge(str(knowledge_id)) return { **KnowledgeModelSerializer(knowledge).data, 'user_id': self.data.get('user_id'), @@ -642,6 +613,7 @@ def save_web(self, instance: Dict, with_valid=True): }).auth_resource(str(knowledge_id)) sync_web_knowledge.delay(str(knowledge_id), instance.get('source_url'), instance.get('selector')) + update_resource_mapping_by_knowledge(str(knowledge_id)) return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []} class SyncWeb(serializers.Serializer): diff --git a/apps/knowledge/serializers/knowledge_workflow.py b/apps/knowledge/serializers/knowledge_workflow.py index ad21dc5a0d7..b9f7fce0663 100644 --- a/apps/knowledge/serializers/knowledge_workflow.py +++ b/apps/knowledge/serializers/knowledge_workflow.py @@ -348,6 +348,7 @@ def import_(self, instance: dict, is_import_tool, with_valid=True): 'auth_target_type': AuthTargetType.TOOL.value }).auth_resource_batch([t.id for t in tool_model_list]) return True + update_resource_mapping_by_knowledge(knowledge_id) @staticmethod def to_knowledge_workflow(knowledge_workflow, update_tool_map): @@ -429,7 +430,6 @@ def publish(self, with_valid=True): QuerySet(KnowledgeWorkflow).filter( knowledge_id=self.data.get("knowledge_id") ).update(is_publish=True, publish_time=timezone.now()) - update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) return True def edit(self, instance: Dict): @@ -446,6 +446,7 @@ def edit(self, instance: Dict): defaults={ 'work_flow': instance.get('work_flow') }) + update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) return self.one() if instance.get("work_flow_template"): template_instance = instance.get('work_flow_template')