-
Notifications
You must be signed in to change notification settings - Fork 2.8k
feat: workflow init #3072
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: workflow init #3072
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from django.contrib import admin | ||
|
|
||
| # Register your models here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from django.apps import AppConfig | ||
|
|
||
|
|
||
| class ApplicationConfig(AppConfig): | ||
| default_auto_field = 'django.db.models.BigAutoField' | ||
| name = 'application' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: __init__.py | ||
| @date:2025/5/7 15:14 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: application.py | ||
| @date:2025/5/7 15:29 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from django.test import TestCase | ||
|
|
||
| # Create your tests here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: __init__.py | ||
| @date:2025/5/9 18:51 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from django.contrib import admin | ||
|
|
||
| # Register your models here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from django.apps import AppConfig | ||
|
|
||
|
|
||
| class WorkflowConfig(AppConfig): | ||
| default_auto_field = 'django.db.models.BigAutoField' | ||
| name = 'workflow' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: __init__.py | ||
| @date:2025/5/7 15:43 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: workflow.py | ||
| @date:2025/5/7 15:44 | ||
| @desc: | ||
| """ | ||
| from django.db import models | ||
| import uuid_utils.compat as uuid | ||
|
|
||
|
|
||
| class WorkflowType(models.TextChoices): | ||
| # 应用 | ||
| APPLICATION = "APPLICATION" | ||
| # 知识库 | ||
| KNOWLEDGE = "KNOWLEDGE" | ||
| # .... | ||
|
|
||
|
|
||
| class Workflow(models.Model): | ||
| id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id") | ||
| workflow = models.JSONField(verbose_name="工作流数据", default=dict) | ||
| type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION) | ||
| create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True) | ||
| update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True) | ||
|
|
||
| class Meta: | ||
| db_table = "workflow" | ||
| ordering = ['update_time'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from django.test import TestCase | ||
|
|
||
| # Create your tests here. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: __init__.py.py | ||
| @date:2025/5/7 15:43 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: __init__.py | ||
| @date:2025/5/7 16:15 | ||
| @desc: | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,214 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: workflow.py | ||
| @date:2025/5/9 10:58 | ||
| @desc: | ||
| """ | ||
| from typing import List, Dict | ||
| from queue import Queue, Empty | ||
|
|
||
| from common.utils.common import group_by | ||
|
|
||
|
|
||
| class Content: | ||
| def __init__(self, content: str, reasoning_content: str, **kwargs): | ||
| """ | ||
| 内容 | ||
| @param content: ai响应内容 | ||
| @param reasoning_content:思考过程 | ||
| @param kwargs: 其他参数 | ||
| """ | ||
| self.content = content | ||
| self.reasoning_content = reasoning_content | ||
| for key in kwargs: | ||
| self.__setattr__(key, kwargs.get(key)) | ||
|
|
||
|
|
||
| class Chunk: | ||
|
|
||
| def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index, | ||
| **kwargs): | ||
| """ | ||
|
|
||
| @param runtime_id: 运行时id | ||
| @param node_id: 节点id | ||
| @param node_name: 节点名称 | ||
| @param loop_index: 循环下标 | ||
| @param children: 子块 | ||
| @param node_data 节点数据 | ||
| @param content: 内容 | ||
| """ | ||
| self.runtime_id = runtime_id | ||
| self.node_id = node_id | ||
| self.node_name = node_name | ||
| self.loop_index = loop_index | ||
| self.children = children | ||
| self.content = content | ||
| self.node_data = node_data | ||
| for key in kwargs: | ||
| self.__setattr__(key, kwargs.get(key)) | ||
|
|
||
|
|
||
| class Channel: | ||
| """ | ||
| 对话管道 | ||
| """ | ||
| messages = Queue() | ||
| is_end = False | ||
|
|
||
| def write(self, message): | ||
| if isinstance(message, Channel) | isinstance(message, Chunk): | ||
| if self.is_end: | ||
| raise "通道已关闭" | ||
| self.messages.put(message) | ||
| else: | ||
| raise "不支持的管道参数" | ||
|
|
||
| def end(self): | ||
| self.is_end = True | ||
| return self.messages.put(None) | ||
|
|
||
| def pop(self): | ||
| if self.is_end: | ||
| return self.messages.get_nowait() | ||
| return self.messages.get() | ||
|
|
||
| def generator(self): | ||
| while True: | ||
| try: | ||
| message = self.pop() | ||
| if message: | ||
| if isinstance(message, Channel): | ||
| for chunk in message.generator(): | ||
| yield chunk | ||
| else: | ||
| yield message | ||
| except Empty: | ||
| return | ||
|
|
||
|
|
||
| class Node: | ||
|
|
||
| def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs): | ||
| """ | ||
|
|
||
| @param _id: 节点id | ||
| @param _type: 类型 | ||
| @param x: 节点x轴位置 | ||
| @param y: 节点y轴位置 | ||
| @param properties: | ||
| @param kwargs: | ||
| """ | ||
| self.id = _id | ||
| self.type = _type | ||
| self.x = x | ||
| self.y = y | ||
| self.properties = properties | ||
| for keyword in kwargs: | ||
| self.__setattr__(keyword, kwargs.get(keyword)) | ||
|
|
||
|
|
||
| class Edge: | ||
| def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords): | ||
| """ | ||
| 线 | ||
| @param _id: 线id | ||
| @param _type: 线类型 | ||
| @param sourceNodeId: | ||
| @param targetNodeId: | ||
| @param keywords: | ||
| """ | ||
| self.id = _id | ||
| self.type = _type | ||
| self.sourceNodeId = sourceNodeId | ||
| self.targetNodeId = targetNodeId | ||
| for keyword in keywords: | ||
| self.__setattr__(keyword, keywords.get(keyword)) | ||
|
|
||
|
|
||
| class EdgeNode: | ||
| edge: Edge | ||
| node: Node | ||
|
|
||
| def __init__(self, edge, node): | ||
| self.edge = edge | ||
| self.node = node | ||
|
|
||
|
|
||
| class Workflow: | ||
| """ | ||
| 节点列表 | ||
| """ | ||
| nodes: List[Node] | ||
| """ | ||
| 线列表 | ||
| """ | ||
| edges: List[Edge] | ||
| """ | ||
| 节点id:node | ||
| """ | ||
| node_map: Dict[str, Node] | ||
| """ | ||
| 节点id:当前节点id上面的所有节点 | ||
| """ | ||
| up_node_map: Dict[str, List[EdgeNode]] | ||
| """ | ||
| 节点id:当前节点id下面的所有节点 | ||
| """ | ||
| next_node_map: Dict[str, List[EdgeNode]] | ||
|
|
||
| def __init__(self, nodes: List[Node], edges: List[Edge]): | ||
| self.nodes = nodes | ||
| self.edges = edges | ||
| self.node_map = {node.id: node for node in nodes} | ||
|
|
||
| self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for | ||
| edge in edges] for | ||
| key, edges in | ||
| group_by(edges, key=lambda edge: edge.targetNodeId).items()} | ||
|
|
||
| self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for | ||
| key, edges in | ||
| group_by(edges, key=lambda edge: edge.sourceNodeId).items()} | ||
|
|
||
| def get_node(self, node_id): | ||
| """ | ||
| 根据node_id 获取节点信息 | ||
| @param node_id: node_id | ||
| @return: 节点信息 | ||
| """ | ||
| return self.node_map.get(node_id) | ||
|
|
||
| def get_up_edge_nodes(self, node_id) -> List[EdgeNode]: | ||
| """ | ||
| 根据节点id 获取当前连接前置节点和连线 | ||
| @param node_id: 节点id | ||
| @return: 节点连线列表 | ||
| """ | ||
| return self.up_node_map.get(node_id) | ||
|
|
||
| def get_next_edge_nodes(self, node_id) -> List[EdgeNode]: | ||
| """ | ||
| 根据节点id 获取当前连接目标节点和连线 | ||
| @param node_id: 节点id | ||
| @return: 节点连线列表 | ||
| """ | ||
| return self.next_node_map.get(node_id) | ||
|
|
||
| def get_up_nodes(self, node_id) -> List[Node]: | ||
| """ | ||
| 根据节点id 获取当前连接前置节点 | ||
| @param node_id: 节点id | ||
| @return: 节点列表 | ||
| """ | ||
| return [en.node for en in self.up_node_map.get(node_id)] | ||
|
|
||
| def get_next_nodes(self, node_id) -> List[Node]: | ||
| """ | ||
| 根据节点id 获取当前连接目标节点 | ||
| @param node_id: 节点id | ||
| @return: 节点列表 | ||
| """ | ||
| return [en.node for en in self.next_node_map.get(node_id, [])] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎虎 | ||
| @file: i_node.py | ||
| @date:2025/5/7 16:41 | ||
| @desc: | ||
| """ | ||
| import time | ||
| from abc import abstractmethod | ||
|
|
||
| from common.utils.common import get_sha256_hash | ||
| from workflow.workflow.common import Channel, Chunk | ||
|
|
||
|
|
||
| class INode: | ||
| # 当前节点支持的工作流类型 | ||
| supported_workflow_type_list = [] | ||
| # 节点类型 | ||
| type = None | ||
| # 节点管道 | ||
| channel = Channel() | ||
|
|
||
| def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None): | ||
| self.node = node | ||
| self.chunk = chunk | ||
| if chunk is not None: | ||
| self.context = chunk.node_data | {} | ||
| else: | ||
| self.context = {} | ||
| # 运行时id | ||
| self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | "")) | ||
| self.workflow_manage = workflow_manage | ||
| self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data')) | ||
| self.is_valid() | ||
|
|
||
| def is_valid(self): | ||
| self.node_serializer.is_valid(raise_exception=True) | ||
|
|
||
| def execute(self, **kwargs): | ||
| pass | ||
|
|
||
| def run(self): | ||
| start_time = time.time() | ||
| self.context['start_time'] = start_time | ||
| self._run() | ||
| self.context['run_time'] = time.time() - start_time | ||
|
|
||
| def _run(self): | ||
| return self.execute(**self.node_serializer.data) | ||
|
|
||
| @abstractmethod | ||
| def get_node_serializer(self): | ||
| pass | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code provided appears to be a template for an Observations
Potential Issues
Optimization Suggestions
By addressing these points, the code will become more robust, efficient, and easier to understand. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code appears to be a Python implementation of various object models related to workflows in MaxKB, which likely refers to an AI-driven knowledge base platform. Here’s a quick review and some general suggestions:
Code Issues and Corrections
Class Definitions
Comments and Docstrings
Variable Naming and Consistency
_id,_type,properties, etc., might not be fully consistent across classes or use cases. Consider using more descriptive variable names where appropriate.Type Annotations
typingmodule, which is good.Method Implementations
get_node,get_up_edge_nodes, etc., assume that certain keys exist in dictionaries (likeup_node_map). Add checks to handle null or undefined values if necessary to avoidKeyError.Edge Nodes Management
next_node_mapandup_node_map) could benefit from being refactored to ensure efficient lookups and additions/removals.Error Handling
Optimization Suggestions
Efficient Data Structures:
Lazy Evaluation in Generators:
async def. Asynchronous programming can significantly improve performance for large datasets.Thread Safety:
Caching Mechanisms:
Code Refactoring:
Performance Profiling:
Security Considerations:
These recommendations are general guidelines based on common issues identified during static analysis of similar systems. Depending on specific requirements and constraints, further customization and tuning might be needed.