Skip to content

Commit e98f424

Browse files
committed
fix: The image understanding and video understanding nodes in the workflow knowledge base are cancelled when the document import is executed. The loop will be completed, and the cancellation will not take effect
1 parent 5451a0b commit e98f424

File tree

6 files changed

+47
-46
lines changed

6 files changed

+47
-46
lines changed

apps/application/flow/knowledge_workflow_manage.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from django.utils.translation import get_language
1515

1616
from application.flow.common import Workflow
17-
from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer
17+
from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer, NodeResult
1818
from application.flow.workflow_manage import WorkflowManage
1919
from common.handle.base_to_response import BaseToResponse
2020
from common.handle.impl.response.system_to_response import SystemToResponse
@@ -90,7 +90,15 @@ def hand_node_result(self, current_node, node_result_future):
9090
# 阻塞获取结果
9191
list(result)
9292
if current_node.status == 500:
93-
return None
93+
enableException = current_node.node.properties.get('enableException')
94+
if not enableException:
95+
return None
96+
current_node.context['exception_message'] = current_node.err_message
97+
current_node.context['branch_id'] = 'exception'
98+
r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
99+
_is_interrupt=lambda node, step_variable, global_variable: False)
100+
r.write_context(current_node, self)
101+
return r
94102
if self.is_the_task_interrupted():
95103
current_node.status = 201
96104
return None
@@ -100,6 +108,15 @@ def hand_node_result(self, current_node, node_result_future):
100108
self.status = 500
101109
current_node.get_write_error_context(e)
102110
self.answer += str(e)
111+
if self.is_the_task_interrupted():
112+
current_node.status = 201
113+
return None
114+
enableException = current_node.node.properties.get('enableException')
115+
if enableException:
116+
current_node.context['exception_message'] = current_node.err_message
117+
current_node.context['branch_id'] = 'exception'
118+
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
119+
_is_interrupt=lambda node, step_variable, global_variable: False)
103120
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(state=State.FAILURE)
104121
finally:
105122
current_node.node_chunk.end()

apps/application/flow/step_node/loop_node/impl/base_loop_node.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ def loop(workflow_manage_new_instance, node: INode, generate_loop):
200200
instance._cleanup()
201201
if break_outer:
202202
break
203+
if instance.is_the_task_interrupted():
204+
break
203205
node.context['is_interrupt_exec'] = is_interrupt_exec
204206
node.context['loop_node_data'] = loop_node_data
205207
node.context['loop_answer_data'] = loop_answer_data

apps/application/flow/tools.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -496,9 +496,7 @@ def get_workflow_resource(workflow, node_handle):
496496
lambda instance: [instance.stt_model_id] if instance.stt_model_id else []]
497497
}
498498
knowledge_instance_field_call_dict = {
499-
'MODEL': [lambda instance: [instance.model_id] if instance.model_id else [],
500-
lambda instance: [instance.tts_model_id] if instance.tts_model_id else [],
501-
lambda instance: [instance.stt_model_id] if instance.stt_model_id else []],
499+
'MODEL': [lambda instance: [instance.embedding_model_id] if instance.embedding_model_id else []],
502500
}
503501

504502

apps/application/flow/workflow_manage.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -475,17 +475,29 @@ def hand_event_node_result(self, current_node, node_result_future):
475475
else:
476476
list(result)
477477
if current_node.status == 500:
478+
enableException = current_node.node.properties.get('enableException')
479+
if not enableException:
480+
return None
478481
current_node.context['exception_message'] = current_node.err_message
479482
current_node.context['branch_id'] = 'exception'
480483
r = NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
481484
_is_interrupt=lambda node, step_variable, global_variable: False)
482485
r.write_context(current_node, self)
483486
return r
487+
if self.is_the_task_interrupted():
488+
current_node.status = 201
489+
return None
484490
return current_result
485491
except Exception as e:
486492
# 添加节点
487493
maxkb_logger.error(f'Exception: {e}', exc_info=True)
488-
if not current_node.node.properties.get('enableException'):
494+
enableException = current_node.node.properties.get('enableException')
495+
current_node.get_write_error_context(e)
496+
self.status = 500
497+
if self.is_the_task_interrupted():
498+
current_node.status = 201
499+
return None
500+
if not enableException:
489501
chunk = self.base_to_response.to_stream_chunk_response(self.params.get('chat_id'),
490502
self.params.get('chat_id'),
491503
current_node.id,
@@ -499,13 +511,12 @@ def hand_event_node_result(self, current_node, node_result_future):
499511
'real_node_id': real_node_id,
500512
'node_status': 'ERROR'})
501513
current_node.node_chunk.add_chunk(chunk)
502-
current_node.get_write_error_context(e)
503-
self.status = 500
504-
current_node.context['exception_message'] = current_node.err_message
505-
current_node.context['branch_id'] = 'exception'
506-
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
507-
_is_interrupt=lambda node, step_variable, global_variable: False)
508-
514+
return None
515+
else:
516+
current_node.context['exception_message'] = current_node.err_message
517+
current_node.context['branch_id'] = 'exception'
518+
return NodeResult({'branch_id': 'exception', 'exception': current_node.err_message}, {},
519+
_is_interrupt=lambda node, step_variable, global_variable: False)
509520
finally:
510521
current_node.node_chunk.end()
511522
# 归还链接到连接池

apps/knowledge/serializers/knowledge.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ def one(self):
365365
lambda application_id: all_application_list.__contains__(application_id),
366366
[
367367
str(
368-
application_knowledge_mapping.application_id
368+
application_knowledge_mapping.source_id
369369
) for application_knowledge_mapping in
370370
QuerySet(ResourceMapping).filter(source_type='APPLICATION',
371371
target_type='KNOWLEDGE',
@@ -381,7 +381,6 @@ def edit(self, instance: Dict, select_one=True):
381381
KnowledgeEditRequest(data=instance).is_valid(knowledge=knowledge)
382382
if 'embedding_model_id' in instance:
383383
knowledge.embedding_model_id = instance.get('embedding_model_id')
384-
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
385384
if "name" in instance:
386385
knowledge.name = instance.get("name")
387386
if 'desc' in instance:
@@ -394,37 +393,8 @@ def edit(self, instance: Dict, select_one=True):
394393
knowledge.file_size_limit = instance.get('file_size_limit')
395394
if 'file_count_limit' in instance:
396395
knowledge.file_count_limit = instance.get('file_count_limit')
397-
if 'application_id_list' in instance and instance.get('application_id_list') is not None:
398-
application_id_list = instance.get('application_id_list')
399-
# 当前用户可修改关联的知识库列表
400-
application_knowledge_id_list = [
401-
str(knowledge_dict.get('id')) for knowledge_dict in self.list_application(with_valid=False)
402-
]
403-
for knowledge_id in application_id_list:
404-
if not application_knowledge_id_list.__contains__(knowledge_id):
405-
raise AppApiException(
406-
500,
407-
_(
408-
'Unknown application id {knowledge_id}, cannot be associated'
409-
).format(knowledge_id=knowledge_id)
410-
)
411-
412-
QuerySet(ResourceMapping).filter(
413-
source_id__in=application_knowledge_id_list,
414-
source_type='APPLICATION',
415-
target_type='KNOWLEDGE',
416-
target_id=self.data.get("knowledge_id")
417-
).delete()
418-
# 插入
419-
QuerySet(ResourceMapping).bulk_create([
420-
ResourceMapping(
421-
source_id=application_id,
422-
source_type='APPLICATION',
423-
target_type='KNOWLEDGE',
424-
target_id=self.data.get('knowledge_id')
425-
) for application_id in application_id_list
426-
]) if len(application_id_list) > 0 else None
427396
knowledge.save()
397+
update_resource_mapping_by_knowledge(str(knowledge.id))
428398
if select_one:
429399
return self.one()
430400
return None
@@ -601,6 +571,7 @@ def save_base(self, instance, with_valid=True):
601571
'user_id': self.data.get('user_id'),
602572
'auth_target_type': AuthTargetType.KNOWLEDGE.value
603573
}).auth_resource(str(knowledge_id))
574+
update_resource_mapping_by_knowledge(str(knowledge_id))
604575
return {
605576
**KnowledgeModelSerializer(knowledge).data,
606577
'user_id': self.data.get('user_id'),
@@ -642,6 +613,7 @@ def save_web(self, instance: Dict, with_valid=True):
642613
}).auth_resource(str(knowledge_id))
643614

644615
sync_web_knowledge.delay(str(knowledge_id), instance.get('source_url'), instance.get('selector'))
616+
update_resource_mapping_by_knowledge(str(knowledge_id))
645617
return {**KnowledgeModelSerializer(knowledge).data, 'document_list': []}
646618

647619
class SyncWeb(serializers.Serializer):

apps/knowledge/serializers/knowledge_workflow.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ def import_(self, instance: dict, is_import_tool, with_valid=True):
348348
'auth_target_type': AuthTargetType.TOOL.value
349349
}).auth_resource_batch([t.id for t in tool_model_list])
350350
return True
351+
update_resource_mapping_by_knowledge(knowledge_id)
351352

352353
@staticmethod
353354
def to_knowledge_workflow(knowledge_workflow, update_tool_map):
@@ -429,7 +430,6 @@ def publish(self, with_valid=True):
429430
QuerySet(KnowledgeWorkflow).filter(
430431
knowledge_id=self.data.get("knowledge_id")
431432
).update(is_publish=True, publish_time=timezone.now())
432-
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
433433
return True
434434

435435
def edit(self, instance: Dict):
@@ -446,6 +446,7 @@ def edit(self, instance: Dict):
446446
defaults={
447447
'work_flow': instance.get('work_flow')
448448
})
449+
update_resource_mapping_by_knowledge(self.data.get("knowledge_id"))
449450
return self.one()
450451
if instance.get("work_flow_template"):
451452
template_instance = instance.get('work_flow_template')

0 commit comments

Comments
 (0)