diff --git a/apps/application/flow/knowledge_workflow_manage.py b/apps/application/flow/knowledge_workflow_manage.py index 1a696675cbc..4a19803a367 100644 --- a/apps/application/flow/knowledge_workflow_manage.py +++ b/apps/application/flow/knowledge_workflow_manage.py @@ -14,7 +14,7 @@ from django.utils.translation import get_language from application.flow.common import Workflow -from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer +from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer, NodeResult from application.flow.workflow_manage import WorkflowManage from common.handle.base_to_response import BaseToResponse from common.handle.impl.response.system_to_response import SystemToResponse @@ -90,7 +90,15 @@ def hand_node_result(self, current_node, node_result_future): # 阻塞获取结果 list(result) if current_node.status == 500: - return None + enableException = current_node.node.properties.get('enableException') + if not enableException: + return None + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) + r.write_context(current_node, self) + return r if self.is_the_task_interrupted(): current_node.status = 201 return None @@ -100,6 +108,15 @@ def hand_node_result(self, current_node, node_result_future): self.status = 500 current_node.get_write_error_context(e) self.answer += str(e) + if self.is_the_task_interrupted(): + current_node.status = 201 + return None + enableException = current_node.node.properties.get('enableException') + if enableException: + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(state=State.FAILURE) finally: current_node.node_chunk.end() diff --git a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py index cd75272e264..6d08e54776a 100644 --- a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py +++ b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py @@ -200,6 +200,8 @@ def loop(workflow_manage_new_instance, node: INode, generate_loop): instance._cleanup() if break_outer: break + if instance.is_the_task_interrupted(): + break node.context['is_interrupt_exec'] = is_interrupt_exec node.context['loop_node_data'] = loop_node_data node.context['loop_answer_data'] = loop_answer_data diff --git a/apps/application/flow/tools.py b/apps/application/flow/tools.py index d9907ebc9c9..6cbe5555216 100644 --- a/apps/application/flow/tools.py +++ b/apps/application/flow/tools.py @@ -496,9 +496,7 @@ def get_workflow_resource(workflow, node_handle): lambda instance: [instance.stt_model_id] if instance.stt_model_id else []] } knowledge_instance_field_call_dict = { - 'MODEL': [lambda instance: [instance.model_id] if instance.model_id else [], - lambda instance: [instance.tts_model_id] if instance.tts_model_id else [], - lambda instance: [instance.stt_model_id] if instance.stt_model_id else []], + 'MODEL': [lambda instance: [instance.embedding_model_id] if instance.embedding_model_id else []], } diff --git a/apps/application/flow/workflow_manage.py b/apps/application/flow/workflow_manage.py index d7633c6bebc..f9c418b0ff0 100644 --- a/apps/application/flow/workflow_manage.py +++ b/apps/application/flow/workflow_manage.py @@ -475,17 +475,29 @@ def hand_event_node_result(self, current_node, node_result_future): else: list(result) if current_node.status == 500: + enableException = current_node.node.properties.get('enableException') + if not enableException: + return None current_node.context['exception_message'] = current_node.err_message current_node.context['branch_id'] = 'exception' r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, _is_interrupt=lambda node, step_variable, global_variable: False) r.write_context(current_node, self) return r + if self.is_the_task_interrupted(): + current_node.status = 201 + return None return current_result except Exception as e: # 添加节点 maxkb_logger.error(f'Exception: {e}', exc_info=True) - if not current_node.node.properties.get('enableException'): + enableException = current_node.node.properties.get('enableException') + current_node.get_write_error_context(e) + self.status = 500 + if self.is_the_task_interrupted(): + current_node.status = 201 + return None + if not enableException: chunk = self.base_to_response.to_stream_chunk_response(self.params.get('chat_id'), self.params.get('chat_id'), current_node.id, @@ -499,13 +511,12 @@ def hand_event_node_result(self, current_node, node_result_future): 'real_node_id': real_node_id, 'node_status': 'ERROR'}) current_node.node_chunk.add_chunk(chunk) - current_node.get_write_error_context(e) - self.status = 500 - current_node.context['exception_message'] = current_node.err_message - current_node.context['branch_id'] = 'exception' - return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, - _is_interrupt=lambda node, step_variable, global_variable: False) - + return None + else: + current_node.context['exception_message'] = current_node.err_message + current_node.context['branch_id'] = 'exception' + return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {}, + _is_interrupt=lambda node, step_variable, global_variable: False) finally: current_node.node_chunk.end() # 归还链接到连接池 diff --git a/apps/knowledge/serializers/knowledge.py b/apps/knowledge/serializers/knowledge.py index b895d0dfe95..0c997f96d78 100644 --- a/apps/knowledge/serializers/knowledge.py +++ b/apps/knowledge/serializers/knowledge.py @@ -365,7 +365,7 @@ def one(self): lambda application_id: all_application_list.__contains__(application_id), [ str( - application_knowledge_mapping.application_id + application_knowledge_mapping.source_id ) for application_knowledge_mapping in QuerySet(ResourceMapping).filter(source_type='APPLICATION', target_type='KNOWLEDGE', @@ -381,7 +381,6 @@ def edit(self, instance: Dict, select_one=True): KnowledgeEditRequest(data=instance).is_valid(knowledge=knowledge) if 'embedding_model_id' in instance: knowledge.embedding_model_id = instance.get('embedding_model_id') - update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) if "name" in instance: knowledge.name = instance.get("name") if 'desc' in instance: @@ -394,37 +393,8 @@ def edit(self, instance: Dict, select_one=True): knowledge.file_size_limit = instance.get('file_size_limit') if 'file_count_limit' in instance: knowledge.file_count_limit = instance.get('file_count_limit') - if 'application_id_list' in instance and instance.get('application_id_list') is not None: - application_id_list = instance.get('application_id_list') - # 当前用户可修改关联的知识库列表 - application_knowledge_id_list = [ - str(knowledge_dict.get('id')) for knowledge_dict in self.list_application(with_valid=False) - ] - for knowledge_id in application_id_list: - if not application_knowledge_id_list.__contains__(knowledge_id): - raise AppApiException( - 500, - _( - 'Unknown application id {knowledge_id}, cannot be associated' - ).format(knowledge_id=knowledge_id) - ) - - QuerySet(ResourceMapping).filter( - source_id__in=application_knowledge_id_list, - source_type='APPLICATION', - target_type='KNOWLEDGE', - target_id=self.data.get("knowledge_id") - ).delete() - # 插入 - QuerySet(ResourceMapping).bulk_create([ - ResourceMapping( - source_id=application_id, - source_type='APPLICATION', - target_type='KNOWLEDGE', - target_id=self.data.get('knowledge_id') - ) for application_id in application_id_list - ]) if len(application_id_list) > 0 else None knowledge.save() + update_resource_mapping_by_knowledge(str(knowledge.id)) if select_one: return self.one() return None @@ -601,6 +571,7 @@ def save_base(self, instance, with_valid=True): 'user_id': self.data.get('user_id'), 'auth_target_type': AuthTargetType.KNOWLEDGE.value }).auth_resource(str(knowledge_id)) + update_resource_mapping_by_knowledge(str(knowledge_id)) return { **KnowledgeModelSerializer(knowledge).data, 'user_id': self.data.get('user_id'), @@ -642,6 +613,7 @@ def save_web(self, instance: Dict, with_valid=True): }).auth_resource(str(knowledge_id)) sync_web_knowledge.delay(str(knowledge_id), instance.get('source_url'), instance.get('selector')) + update_resource_mapping_by_knowledge(str(knowledge_id)) return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []} class SyncWeb(serializers.Serializer): diff --git a/apps/knowledge/serializers/knowledge_workflow.py b/apps/knowledge/serializers/knowledge_workflow.py index ad21dc5a0d7..b9f7fce0663 100644 --- a/apps/knowledge/serializers/knowledge_workflow.py +++ b/apps/knowledge/serializers/knowledge_workflow.py @@ -348,6 +348,7 @@ def import_(self, instance: dict, is_import_tool, with_valid=True): 'auth_target_type': AuthTargetType.TOOL.value }).auth_resource_batch([t.id for t in tool_model_list]) return True + update_resource_mapping_by_knowledge(knowledge_id) @staticmethod def to_knowledge_workflow(knowledge_workflow, update_tool_map): @@ -429,7 +430,6 @@ def publish(self, with_valid=True): QuerySet(KnowledgeWorkflow).filter( knowledge_id=self.data.get("knowledge_id") ).update(is_publish=True, publish_time=timezone.now()) - update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) return True def edit(self, instance: Dict): @@ -446,6 +446,7 @@ def edit(self, instance: Dict): defaults={ 'work_flow': instance.get('work_flow') }) + update_resource_mapping_by_knowledge(self.data.get("knowledge_id")) return self.one() if instance.get("work_flow_template"): template_instance = instance.get('work_flow_template')