-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Expand file tree
/
Copy pathtrigger_task.py
More file actions
114 lines (96 loc) · 5.01 KB
/
trigger_task.py
File metadata and controls
114 lines (96 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: trigger_task.py
@date:2026/1/14 16:34
@desc:
"""
import os
from django.db import models
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from application.models import ChatRecord
from common.db.search import native_page_search, get_dynamics_model
from common.exception.app_exception import AppApiException
from common.utils.common import get_file_content
from maxkb.conf import PROJECT_DIR
from trigger.models import TriggerTask, TaskRecord
class ChatRecordSerializerModel(serializers.ModelSerializer):
class Meta:
model = ChatRecord
fields = ['id', 'chat_id', 'vote_status', 'vote_reason', 'vote_other_content', 'problem_text', 'answer_text',
'message_tokens', 'answer_tokens', 'const', 'improve_paragraph_id_list', 'run_time', 'index',
'answer_text_list', 'details',
'create_time', 'update_time']
class TriggerTaskResponse(serializers.ModelSerializer):
class Meta:
model = TriggerTask
fields = "__all__"
class TriggerTaskQuerySerializer(serializers.Serializer):
trigger_id = serializers.CharField(required=True, label=_("Trigger ID"))
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
def get_query_set(self):
query_set = QuerySet(TriggerTask).filter(workspace_id=self.data.get("workspace_id")).filter(
trigger_id=self.data.get("trigger_id"))
return query_set
def list(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
return [TriggerTaskResponse(row).data for row in self.get_query_set()]
class TriggerTaskRecordOperateSerializer(serializers.Serializer):
trigger_id = serializers.CharField(required=True, label=_("Trigger ID"))
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
trigger_task_id = serializers.CharField(required=True, label=_("Trigger task ID"))
trigger_task_record_id = serializers.CharField(required=True, label=_("Trigger task record ID"))
def get_execution_details(self, is_valid=True):
if is_valid:
self.is_valid(raise_exception=True)
task_record = QuerySet(TaskRecord).filter(trigger_id=self.data.get("trigger_id"),
trigger_task_id=self.data.get("trigger_task_id"),
id=self.data.get('trigger_task_record_id')).first()
if not task_record:
raise AppApiException(500, _('Trigger task record id does not exist'))
if task_record.source_type == 'APPLICATION':
chat_record = QuerySet(ChatRecord).filter(id=task_record.task_record_id).first()
return ChatRecordSerializerModel(chat_record).data
if task_record.source_type == 'TOOL':
pass
return None
class TriggerTaskRecordQuerySerializer(serializers.Serializer):
trigger_id = serializers.CharField(required=True, label=_("Trigger ID"))
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
state = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_('Trigger state'))
name = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_('Trigger name'))
order = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('Order field'))
def get_query_set(self):
trigger_query_set = QuerySet(
model=get_dynamics_model({
'ett.create_time': models.DateTimeField(),
'ett.state': models.CharField(),
'sdc.name': models.CharField(),
'ett.workspace_id': models.CharField(),
'ett.trigger_id': models.UUIDField(),
}))
trigger_query_set = trigger_query_set.filter(
**{'ett.trigger_id': self.data.get("trigger_id")})
if self.data.get("order"):
trigger_query_set = trigger_query_set.order_by(self.data.get("order"))
else:
trigger_query_set = trigger_query_set.order_by("-ett.create_time")
if self.data.get('state'):
trigger_query_set = trigger_query_set.filter(**{'ett.state': self.data.get('state')})
if self.data.get("name"):
trigger_query_set = trigger_query_set.filter(**{'sdc.name__contains': self.data.get('name')})
return trigger_query_set
def list(self, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
return [TriggerTaskResponse(row).data for row in self.get_query_set()]
def page(self, current_page, page_size, with_valid=True):
if with_valid:
self.is_valid(raise_exception=True)
return native_page_search(current_page, page_size, self.get_query_set(), get_file_content(
os.path.join(PROJECT_DIR, "apps", "trigger", "sql", 'get_trigger_task_record_page_list.sql')
))