Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added apps/application/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions apps/application/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions apps/application/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ApplicationConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'application'
Empty file.
8 changes: 8 additions & 0 deletions apps/application/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py
@date:2025/5/7 15:14
@desc:
"""
8 changes: 8 additions & 0 deletions apps/application/models/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: application.py
@date:2025/5/7 15:29
@desc:
"""
3 changes: 3 additions & 0 deletions apps/application/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.test import TestCase

# Create your tests here.
8 changes: 8 additions & 0 deletions apps/application/views/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py
@date:2025/5/9 18:51
@desc:
"""
1 change: 1 addition & 0 deletions apps/common/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def parse_md_image(content: str):
image_list = [match.group() for match in matches]
return image_list


def bulk_create_in_batches(model, data, batch_size=1000):
if len(data) == 0:
return
Expand Down
Empty file added apps/workflow/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions apps/workflow/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.
6 changes: 6 additions & 0 deletions apps/workflow/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class WorkflowConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'workflow'
Empty file.
8 changes: 8 additions & 0 deletions apps/workflow/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py
@date:2025/5/7 15:43
@desc:
"""
30 changes: 30 additions & 0 deletions apps/workflow/models/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: workflow.py
@date:2025/5/7 15:44
@desc:
"""
from django.db import models
import uuid_utils.compat as uuid


class WorkflowType(models.TextChoices):
# 应用
APPLICATION = "APPLICATION"
# 知识库
KNOWLEDGE = "KNOWLEDGE"
# ....


class Workflow(models.Model):
id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id")
workflow = models.JSONField(verbose_name="工作流数据", default=dict)
type = models.CharField(verbose_name="工作流类型", choices=WorkflowType.choices, default=WorkflowType.APPLICATION)
create_time = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)

class Meta:
db_table = "workflow"
ordering = ['update_time']
3 changes: 3 additions & 0 deletions apps/workflow/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.test import TestCase

# Create your tests here.
8 changes: 8 additions & 0 deletions apps/workflow/views/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py.py
@date:2025/5/7 15:43
@desc:
"""
8 changes: 8 additions & 0 deletions apps/workflow/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py
@date:2025/5/7 16:15
@desc:
"""
214 changes: 214 additions & 0 deletions apps/workflow/workflow/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: workflow.py
@date:2025/5/9 10:58
@desc:
"""
from typing import List, Dict
from queue import Queue, Empty

from common.utils.common import group_by


class Content:
def __init__(self, content: str, reasoning_content: str, **kwargs):
"""
内容
@param content: ai响应内容
@param reasoning_content:思考过程
@param kwargs: 其他参数
"""
self.content = content
self.reasoning_content = reasoning_content
for key in kwargs:
self.__setattr__(key, kwargs.get(key))


class Chunk:

def __init__(self, runtime_id: str, node_id: str, node_name: str, content: Content, node_data, children, loop_index,
**kwargs):
"""

@param runtime_id: 运行时id
@param node_id: 节点id
@param node_name: 节点名称
@param loop_index: 循环下标
@param children: 子块
@param node_data 节点数据
@param content: 内容
"""
self.runtime_id = runtime_id
self.node_id = node_id
self.node_name = node_name
self.loop_index = loop_index
self.children = children
self.content = content
self.node_data = node_data
for key in kwargs:
self.__setattr__(key, kwargs.get(key))


class Channel:
"""
对话管道
"""
messages = Queue()
is_end = False

def write(self, message):
if isinstance(message, Channel) | isinstance(message, Chunk):
if self.is_end:
raise "通道已关闭"
self.messages.put(message)
else:
raise "不支持的管道参数"

def end(self):
self.is_end = True
return self.messages.put(None)

def pop(self):
if self.is_end:
return self.messages.get_nowait()
return self.messages.get()

def generator(self):
while True:
try:
message = self.pop()
if message:
if isinstance(message, Channel):
for chunk in message.generator():
yield chunk
else:
yield message
except Empty:
return


class Node:

def __init__(self, _id: str, _type: str, x: int, y: int, properties: dict, **kwargs):
"""

@param _id: 节点id
@param _type: 类型
@param x: 节点x轴位置
@param y: 节点y轴位置
@param properties:
@param kwargs:
"""
self.id = _id
self.type = _type
self.x = x
self.y = y
self.properties = properties
for keyword in kwargs:
self.__setattr__(keyword, kwargs.get(keyword))


class Edge:
def __init__(self, _id: str, _type: str, sourceNodeId: str, targetNodeId: str, **keywords):
"""
线
@param _id: 线id
@param _type: 线类型
@param sourceNodeId:
@param targetNodeId:
@param keywords:
"""
self.id = _id
self.type = _type
self.sourceNodeId = sourceNodeId
self.targetNodeId = targetNodeId
for keyword in keywords:
self.__setattr__(keyword, keywords.get(keyword))


class EdgeNode:
edge: Edge
node: Node

def __init__(self, edge, node):
self.edge = edge
self.node = node


class Workflow:
"""
节点列表
"""
nodes: List[Node]
"""
线列表
"""
edges: List[Edge]
"""
节点id:node
"""
node_map: Dict[str, Node]
"""
节点id:当前节点id上面的所有节点
"""
up_node_map: Dict[str, List[EdgeNode]]
"""
节点id:当前节点id下面的所有节点
"""
next_node_map: Dict[str, List[EdgeNode]]

def __init__(self, nodes: List[Node], edges: List[Edge]):
self.nodes = nodes
self.edges = edges
self.node_map = {node.id: node for node in nodes}

self.up_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.sourceNodeId)) for
edge in edges] for
key, edges in
group_by(edges, key=lambda edge: edge.targetNodeId).items()}

self.next_node_map = {key: [EdgeNode(edge, self.node_map.get(edge.targetNodeId)) for edge in edges] for
key, edges in
group_by(edges, key=lambda edge: edge.sourceNodeId).items()}

def get_node(self, node_id):
"""
根据node_id 获取节点信息
@param node_id: node_id
@return: 节点信息
"""
return self.node_map.get(node_id)

def get_up_edge_nodes(self, node_id) -> List[EdgeNode]:
"""
根据节点id 获取当前连接前置节点和连线
@param node_id: 节点id
@return: 节点连线列表
"""
return self.up_node_map.get(node_id)

def get_next_edge_nodes(self, node_id) -> List[EdgeNode]:
"""
根据节点id 获取当前连接目标节点和连线
@param node_id: 节点id
@return: 节点连线列表
"""
return self.next_node_map.get(node_id)

def get_up_nodes(self, node_id) -> List[Node]:
"""
根据节点id 获取当前连接前置节点
@param node_id: 节点id
@return: 节点列表
"""
return [en.node for en in self.up_node_map.get(node_id)]

def get_next_nodes(self, node_id) -> List[Node]:
"""
根据节点id 获取当前连接目标节点
@param node_id: 节点id
@return: 节点列表
"""
return [en.node for en in self.next_node_map.get(node_id, [])]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code appears to be a Python implementation of various object models related to workflows in MaxKB, which likely refers to an AI-driven knowledge base platform. Here’s a quick review and some general suggestions:

Code Issues and Corrections

Class Definitions

  • Many class definitions are missing method body definitions. If these classes should have more methods defined here, make sure they do.

Comments and Docstrings

  • The docstring comments lack descriptions for the parameters of many functions and classes. Adding comprehensive docstrings can improve readability and maintainability.

Variable Naming and Consistency

  • Some variable names, such as _id, _type, properties, etc., might not be fully consistent across classes or use cases. Consider using more descriptive variable names where appropriate.

Type Annotations

  • All arguments in function signatures are annotated properly with types from the typing module, which is good.

Method Implementations

  • Several methods like get_node, get_up_edge_nodes, etc., assume that certain keys exist in dictionaries (like up_node_map). Add checks to handle null or undefined values if necessary to avoid KeyError.

Edge Nodes Management

  • The way edge nodes are managed internally through mappings (next_node_map and up_node_map) could benefit from being refactored to ensure efficient lookups and additions/removals.

Error Handling

  • Ensure there is proper error handling both within individual functions and at the level where objects interact. This will make the system robust against invalid inputs.

Optimization Suggestions

  1. Efficient Data Structures:

    • Use more efficient data structures like sets when needed, especially if you frequently need unique items, as lists require searching each time.
  2. Lazy Evaluation in Generators:

    • For generators, consider using asynchronous generators async def. Asynchronous programming can significantly improve performance for large datasets.
  3. Thread Safety:

    • If this workflow component runs concurrently, ensure it is thread-safe. You may want to use locks or other concurrency control mechanisms depending on the environment.
  4. Caching Mechanisms:

    • To handle repetitive requests efficiently, implement caching layers around certain computations.
  5. Code Refactoring:

    • Break down complex logic into smaller, reusable modules wherever possible. Overly long functions can become difficult to understand and maintain.
  6. Performance Profiling:

    • Regularly profile your application to identify bottlenecks. Focus optimizations on areas that show high CPU or memory usage but do not affect functionality.
  7. Security Considerations:

    • Ensure that data handling and transmission processes respect security best practices to protect sensitive information.

These recommendations are general guidelines based on common issues identified during static analysis of similar systems. Depending on specific requirements and constraints, further customization and tuning might be needed.

54 changes: 54 additions & 0 deletions apps/workflow/workflow/i_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: i_node.py
@date:2025/5/7 16:41
@desc:
"""
import time
from abc import abstractmethod

from common.utils.common import get_sha256_hash
from workflow.workflow.common import Channel, Chunk


class INode:
# 当前节点支持的工作流类型
supported_workflow_type_list = []
# 节点类型
type = None
# 节点管道
channel = Channel()

def __init__(self, node, workflow_manage, chunk: Chunk = None, up_node_id_list=None, loop_index=None):
self.node = node
self.chunk = chunk
if chunk is not None:
self.context = chunk.node_data | {}
else:
self.context = {}
# 运行时id
self.runtime_node_id = get_sha256_hash("".join(up_node_id_list | []) + node.id + str(loop_index | ""))
self.workflow_manage = workflow_manage
self.node_serializer = self.get_node_serializer()(data=node.properties.get('node_data'))
self.is_valid()

def is_valid(self):
self.node_serializer.is_valid(raise_exception=True)

def execute(self, **kwargs):
pass

def run(self):
start_time = time.time()
self.context['start_time'] = start_time
self._run()
self.context['run_time'] = time.time() - start_time

def _run(self):
return self.execute(**self.node_serializer.data)

@abstractmethod
def get_node_serializer(self):
pass
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code provided appears to be a template for an INode class that forms part of the MaxKB project. Below are some observations, potential issues, and optimization suggestions:

Observations

  • The file contains documentation comments indicating it's part of the MaxKB project and includes information about authorship (@Author:虎虎) and date created (2025/5/7 16:41).
  • The class follows a similar structure to other classes with methods like execute, _run, and an __init__ method.

Potential Issues

  1. Abstract Method Overloading: The get_node_serializer method is marked as abstract but does not have any implementation defined outside of the base class. This means subclasses must implement this method.

  2. Context Initialization Logic: The context initialization logic uses bitwise OR operators on lists for concatenation, which can lead to unexpected behavior (e.g., empty strings being appended). It should use proper list handling, such as using .extend() instead of .|= if used within a pipeline framework where |= might be intended.

  3. Missing Return Value from execute: The execute method returns nothing, which could indicate a bug or intended lack thereof. If execute should return something meaningful, such as output data or exceptions, it needs to be explicitly defined.

  4. Code Style:

    • Using multiple blank lines between sections might improve readability.
    • Consistent indentation should be maintained throughout the codebase.

Optimization Suggestions

  1. Reduce Magic Numbers and Implied Parameters:

    • Instead of passing explicit parameters directly in the constructor, define default values that are typically inferred (such as no chunk or up_node_id_list). For example:
      def __init__(self,
                   node,
                   workflow_manage,
                   chunk=None,
                   up_node_id_list=(),
                   loop_index=None):
          """
          Initialize the INode instance
      
          Arguments:
              node (dict): Node configuration object
              workflow_manage (obj): Workflow management controller/object
              chunk (Chunk, optional): Data chunk associated with this execution; defaults to None
              up_node_id_list (tuple, optional): IDs of the upstream nodes; defaults to ()
              loop_index (int, optional): Index representing current loop iteration; defaults to None
          """
          ...
  2. Use Class Variables Wisely:

    • Since supported_workflow_type_list and type appear to be static across instances, they could be declared at the class level to prevent repeated instantiation overhead.
  3. Avoid Redundant Checks:

    • While checks within the is_valid method are useful, ensure they do not perform unnecessary computations that may slow down subsequent operations.
  4. Consider Exception Handling:

    • Ensure that error handling is comprehensive enough to deal with various edge cases and unexpected failures during execution.
  5. Implement Type Hinting:

    • Consider adding type hints to make the code more readable and maintainable.

By addressing these points, the code will become more robust, efficient, and easier to understand.

Loading
Loading