Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions apps/application/flow/knowledge_workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from django.utils.translation import get_language

from application.flow.common import Workflow
from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer
from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer, NodeResult
from application.flow.workflow_manage import WorkflowManage
from common.handle.base_to_response import BaseToResponse
from common.handle.impl.response.system_to_response import SystemToResponse
Expand Down Expand Up @@ -90,7 +90,15 @@ def hand_node_result(self, current_node, node_result_future):
# 阻塞获取结果
list(result)
if current_node.status == 500:
return None
enableException = current_node.node.properties.get('enableException')
if not enableException:
return None
current_node.context['exception_message'] = current_node.err_message
current_node.context['branch_id'] = 'exception'
r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
_is_interrupt=lambda node, step_variable, global_variable: False)
r.write_context(current_node, self)
return r
if self.is_the_task_interrupted():
current_node.status = 201
return None
Expand All @@ -100,6 +108,15 @@ def hand_node_result(self, current_node, node_result_future):
self.status = 500
current_node.get_write_error_context(e)
self.answer += str(e)
if self.is_the_task_interrupted():
current_node.status = 201
return None
enableException = current_node.node.properties.get('enableException')
if enableException:
current_node.context['exception_message'] = current_node.err_message
current_node.context['branch_id'] = 'exception'
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
_is_interrupt=lambda node, step_variable, global_variable: False)
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(state=State.FAILURE)
finally:
current_node.node_chunk.end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ def loop(workflow_manage_new_instance, node: INode, generate_loop):
instance._cleanup()
if break_outer:
break
if instance.is_the_task_interrupted():
break
node.context['is_interrupt_exec'] = is_interrupt_exec
node.context['loop_node_data'] = loop_node_data
node.context['loop_answer_data'] = loop_answer_data
Expand Down
4 changes: 1 addition & 3 deletions apps/application/flow/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,7 @@ def get_workflow_resource(workflow, node_handle):
lambda instance: [instance.stt_model_id] if instance.stt_model_id else []]
}
knowledge_instance_field_call_dict = {
'MODEL': [lambda instance: [instance.model_id] if instance.model_id else [],
lambda instance: [instance.tts_model_id] if instance.tts_model_id else [],
lambda instance: [instance.stt_model_id] if instance.stt_model_id else []],
'MODEL': [lambda instance: [instance.embedding_model_id] if instance.embedding_model_id else []],
}


Expand Down
27 changes: 19 additions & 8 deletions apps/application/flow/workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,17 +475,29 @@ def hand_event_node_result(self, current_node, node_result_future):
else:
list(result)
if current_node.status == 500:
enableException = current_node.node.properties.get('enableException')
if not enableException:
return None
current_node.context['exception_message'] = current_node.err_message
current_node.context['branch_id'] = 'exception'
r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
_is_interrupt=lambda node, step_variable, global_variable: False)
r.write_context(current_node, self)
return r
if self.is_the_task_interrupted():
current_node.status = 201
return None
return current_result
except Exception as e:
# 添加节点
maxkb_logger.error(f'Exception: {e}', exc_info=True)
if not current_node.node.properties.get('enableException'):
enableException = current_node.node.properties.get('enableException')
current_node.get_write_error_context(e)
self.status = 500
if self.is_the_task_interrupted():
current_node.status = 201
return None
if not enableException:
chunk = self.base_to_response.to_stream_chunk_response(self.params.get('chat_id'),
self.params.get('chat_id'),
current_node.id,
Expand All @@ -499,13 +511,12 @@ def hand_event_node_result(self, current_node, node_result_future):
'real_node_id': real_node_id,
'node_status': 'ERROR'})
current_node.node_chunk.add_chunk(chunk)
current_node.get_write_error_context(e)
self.status = 500
current_node.context['exception_message'] = current_node.err_message
current_node.context['branch_id'] = 'exception'
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
_is_interrupt=lambda node, step_variable, global_variable: False)

return None
else:
current_node.context['exception_message'] = current_node.err_message
current_node.context['branch_id'] = 'exception'
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
_is_interrupt=lambda node, step_variable, global_variable: False)
finally:
current_node.node_chunk.end()
# 归还链接到连接池
Expand Down
36 changes: 4 additions & 32 deletions apps/knowledge/serializers/knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def one(self):
lambda application_id: all_application_list.__contains__(application_id),
[
str(
application_knowledge_mapping.application_id
application_knowledge_mapping.source_id
) for application_knowledge_mapping in
QuerySet(ResourceMapping).filter(source_type='APPLICATION',
target_type='KNOWLEDGE',
Expand All @@ -381,7 +381,6 @@ def edit(self, instance: Dict, select_one=True):
KnowledgeEditRequest(data=instance).is_valid(knowledge=knowledge)
if 'embedding_model_id' in instance:
knowledge.embedding_model_id = instance.get('embedding_model_id')
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
if "name" in instance:
knowledge.name = instance.get("name")
if 'desc' in instance:
Expand All @@ -394,37 +393,8 @@ def edit(self, instance: Dict, select_one=True):
knowledge.file_size_limit = instance.get('file_size_limit')
if 'file_count_limit' in instance:
knowledge.file_count_limit = instance.get('file_count_limit')
if 'application_id_list' in instance and instance.get('application_id_list') is not None:
application_id_list = instance.get('application_id_list')
# 当前用户可修改关联的知识库列表
application_knowledge_id_list = [
str(knowledge_dict.get('id')) for knowledge_dict in self.list_application(with_valid=False)
]
for knowledge_id in application_id_list:
if not application_knowledge_id_list.__contains__(knowledge_id):
raise AppApiException(
500,
_(
'Unknown application id {knowledge_id}, cannot be associated'
).format(knowledge_id=knowledge_id)
)

QuerySet(ResourceMapping).filter(
source_id__in=application_knowledge_id_list,
source_type='APPLICATION',
target_type='KNOWLEDGE',
target_id=self.data.get("knowledge_id")
).delete()
# 插入
QuerySet(ResourceMapping).bulk_create([
ResourceMapping(
source_id=application_id,
source_type='APPLICATION',
target_type='KNOWLEDGE',
target_id=self.data.get('knowledge_id')
) for application_id in application_id_list
]) if len(application_id_list) > 0 else None
knowledge.save()
update_resource_mapping_by_knowledge(str(knowledge.id))
if select_one:
return self.one()
return None
Expand Down Expand Up @@ -601,6 +571,7 @@ def save_base(self, instance, with_valid=True):
'user_id': self.data.get('user_id'),
'auth_target_type': AuthTargetType.KNOWLEDGE.value
}).auth_resource(str(knowledge_id))
update_resource_mapping_by_knowledge(str(knowledge_id))
return {
**KnowledgeModelSerializer(knowledge).data,
'user_id': self.data.get('user_id'),
Expand Down Expand Up @@ -642,6 +613,7 @@ def save_web(self, instance: Dict, with_valid=True):
}).auth_resource(str(knowledge_id))

sync_web_knowledge.delay(str(knowledge_id), instance.get('source_url'), instance.get('selector'))
update_resource_mapping_by_knowledge(str(knowledge_id))
return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []}

class SyncWeb(serializers.Serializer):
Expand Down
3 changes: 2 additions & 1 deletion apps/knowledge/serializers/knowledge_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ def import_(self, instance: dict, is_import_tool, with_valid=True):
'auth_target_type': AuthTargetType.TOOL.value
}).auth_resource_batch([t.id for t in tool_model_list])
return True
update_resource_mapping_by_knowledge(knowledge_id)

@staticmethod
def to_knowledge_workflow(knowledge_workflow, update_tool_map):
Expand Down Expand Up @@ -429,7 +430,6 @@ def publish(self, with_valid=True):
QuerySet(KnowledgeWorkflow).filter(
knowledge_id=self.data.get("knowledge_id")
).update(is_publish=True, publish_time=timezone.now())
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
return True

def edit(self, instance: Dict):
Expand All @@ -446,6 +446,7 @@ def edit(self, instance: Dict):
defaults={
'work_flow': instance.get('work_flow')
})
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
return self.one()
if instance.get("work_flow_template"):
template_instance = instance.get('work_flow_template')
Expand Down