-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathapplication_task.py
More file actions
122 lines (107 loc) · 4.44 KB
/
application_task.py
File metadata and controls
122 lines (107 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: application_task.py
@date:2026/1/14 19:14
@desc:
"""
import uuid_utils.compat as uuid
from django.db.models import QuerySet
from application.models import ChatUserType, Chat, ChatRecord, ChatSourceChoices
from chat.serializers.chat import ChatSerializers
from knowledge.models import State
from trigger.handler.base_task import BaseTriggerTask
from trigger.models import TaskRecord
def get_reference(fields, obj):
    """Resolve a nested value by following ``fields`` as a key path through ``obj``.

    Each entry of ``fields`` is looked up with ``.get()`` on the value found by
    the previous step, starting from ``obj``.

    :param fields: iterable of keys forming the path; ``None`` is treated as
        "no reference configured".
    :param obj: root container to start from (anything exposing ``.get``).
    :return: the value at the end of the path; ``obj`` itself when ``fields``
        is empty; ``None`` when ``fields`` is ``None``, when a key is missing
        or maps to ``None``, or when a value along the path cannot be
        descended into.
    """
    if fields is None:
        return None
    current = obj
    for field in fields:
        lookup = getattr(current, 'get', None)
        if lookup is None:
            # A scalar or list sits where a mapping was expected: the path
            # cannot be resolved, so report it the same way as a missing key
            # instead of raising AttributeError.
            return None
        current = lookup(field)
        if current is None:
            return None
    return current
def get_field_value(value, kwargs):
    """Resolve one field setting to its concrete value.

    :param value: field setting with a ``'source'`` entry and a ``'value'``
        entry.
    :param kwargs: data the reference path is resolved against when the
        source is not ``'custom'``.
    :return: the ``'value'`` entry as-is when ``'source'`` is ``'custom'``;
        otherwise the result of ``get_reference`` using the ``'value'`` entry
        as the key path into ``kwargs``.
    """
    is_literal = value.get('source') == 'custom'
    payload = value.get('value')
    return payload if is_literal else get_reference(payload, kwargs)
def get_application_execute_parameters(parameter_setting, kwargs):
    """Build the parameters for an application run from a trigger's parameter settings.

    :param parameter_setting: mapping of field settings; may contain
        ``'question'``, the attachment lists (``'image_list'``,
        ``'document_list'``, ``'audio_list'``, ``'video_list'``,
        ``'other_list'``), ``'api_input_field_list'`` and
        ``'user_input_field_list'``. ``None`` is treated as an empty mapping.
    :param kwargs: data that non-custom field settings are resolved against.
    :return: dict that always has a ``'form_data'`` dict, plus ``'message'``
        and one key per attachment list whose setting is present and truthy.
    """
    # A task without a parameter configuration yields None here; treat it as
    # "nothing configured" rather than failing on None.get(...).
    settings = parameter_setting or {}
    parameters = {'form_data': {}}

    question_setting = settings.get('question')
    if question_setting:
        parameters['message'] = get_field_value(question_setting, kwargs)

    for field in ('image_list', 'document_list', 'audio_list', 'video_list', 'other_list'):
        field_setting = settings.get(field)
        if field_setting:
            parameters[field] = get_field_value(field_setting, kwargs)

    # API inputs are written first and user inputs second, so a user input
    # overrides an API input that uses the same key.
    for group in ('api_input_field_list', 'user_input_field_list'):
        for key, value in (settings.get(group) or {}).items():
            parameters['form_data'][key] = get_field_value(value, kwargs)
    return parameters
def get_loop_workflow_node(node_list):
    """Collect the nodes stored inside the loop nodes of ``node_list``.

    Only entries whose ``'type'`` is ``'loop-node'`` are inspected. For each of
    them, every element of ``'loop_node_data'`` contributes all of its values
    to the result, in order. A missing or empty ``'loop_node_data'``
    contributes nothing.

    :param node_list: iterable of node dicts.
    :return: flat list of the inner node entries.
    """
    # NOTE(review): each element of 'loop_node_data' is presumably one loop
    # iteration keyed by inner node id -- confirm against the producer.
    inner_nodes = []
    for node in node_list:
        if node.get('type') != 'loop-node':
            continue
        for iteration in node.get('loop_node_data') or []:
            inner_nodes.extend(iteration.values())
    return inner_nodes
def get_workflow_state(details):
    """Derive the overall run state from a chat record's node details.

    The values of ``details`` and the nodes returned for them by
    ``get_loop_workflow_node`` are all inspected. The run is a failure when
    any of them has ``'status'`` equal to 500 while its ``'enableException'``
    entry is missing or falsy.

    :param details: mapping whose values are node dicts.
    :return: ``State.FAILURE`` if such a node exists, else ``State.SUCCESS``.
    """
    # NOTE(review): 'enableException' presumably marks nodes whose errors are
    # handled by the workflow itself -- confirm.
    top_level = details.values()
    nodes = list(top_level)
    nodes.extend(get_loop_workflow_node(top_level))
    unhandled_failures = [
        node for node in nodes
        if node.get('status') == 500 and not node.get('enableException')
    ]
    return State.FAILURE if unhandled_failures else State.SUCCESS
class ApplicationTask(BaseTriggerTask):
    """Trigger task that runs an application chat and records the outcome in a ``TaskRecord``."""

    def support(self, trigger_task, **kwargs):
        """Return True when ``trigger_task`` has ``source_type`` ``'APPLICATION'``."""
        return trigger_task.get('source_type') == 'APPLICATION'

    def execute(self, trigger_task, **kwargs):
        """Run the application referenced by ``trigger_task`` once and persist the result.

        Creates a ``Chat`` row and a ``TaskRecord`` in ``State.STARTED``, runs
        the chat through ``ChatSerializers``, then updates the ``TaskRecord``
        with the final state and, when a chat record exists, its run time.

        :param trigger_task: mapping with ``'parameter'`` (field settings) and
            ``'source_id'`` (the application id).
        :param kwargs: data the field settings are resolved against.
        :raises Exception: whatever the chat call raises is propagated after
            the ``TaskRecord`` has been updated.
        """
        parameter_setting = trigger_task.get('parameter')
        parameters = get_application_execute_parameters(parameter_setting, kwargs)
        parameters['re_chat'] = False
        parameters['stream'] = True
        chat_id = uuid.uuid7()
        chat_user_id = str(uuid.uuid7())
        chat_record_id = str(uuid.uuid7())
        parameters['chat_record_id'] = chat_record_id
        application_id = trigger_task.get('source_id')
        message = parameters.get('message')
        Chat.objects.get_or_create(id=chat_id, defaults={
            'application_id': application_id,
            'abstract': message,
            'chat_user_id': chat_user_id,
            'chat_user_type': ChatUserType.ANONYMOUS_USER.value,
            'asker': {'username': "游客"},
            'source': {
                'type': ChatSourceChoices.TRIGGER.value
            },
        })
        task_record_id = uuid.uuid7()
        TaskRecord(id=task_record_id, source_type="APPLICATION", source_id=application_id,
                   task_record_id=chat_record_id,
                   meta={'chat_id': chat_id},
                   state=State.STARTED).save()
        chat_failed = False
        try:
            # NOTE(review): with stream=True the chat call presumably returns
            # an iterable of chunks; list() iterates it to completion.
            list(ChatSerializers(data={
                "chat_id": chat_id,
                "chat_user_id": chat_user_id,
                'chat_user_type': ChatUserType.ANONYMOUS_USER.value,
                'application_id': application_id,
                'source': {
                    'type': ChatSourceChoices.TRIGGER.value
                },
                'debug': False
            }).chat(instance=parameters))
        except Exception:
            # Remember the failure for the bookkeeping below, then let the
            # original exception propagate unchanged.
            chat_failed = True
            raise
        finally:
            chat_record = QuerySet(ChatRecord).filter(id=chat_record_id).first()
            task_record_query = QuerySet(TaskRecord).filter(id=task_record_id)
            if chat_record is None:
                # first() returns None when no chat record was saved. Without
                # this guard an AttributeError would mask the real error and
                # leave the task stuck in STARTED.
                task_record_query.update(state=State.FAILURE)
            else:
                if chat_failed:
                    state = State.FAILURE
                else:
                    state = get_workflow_state(chat_record.details)
                task_record_query.update(state=state, run_time=chat_record.run_time)