Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
@date:2024/8/8 17:49
@desc:
"""
import base64

import ast
import io
import json
import mimetypes
Expand Down Expand Up @@ -195,31 +196,23 @@ def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult:
else:
all_params = init_params_default_value | params
if self.node.properties.get('kind') == 'data-source':
download_file_list = []
download_list = function_executor.exec_code(
tool_lib.code,
{**all_params, **self.workflow_params.get('data_source')},
function_name='get_down_file_list'
)
for item in download_list:
result = function_executor.exec_code(
tool_lib.code,
{**all_params, **self.workflow_params.get('data_source'),
'download_item': item},
function_name='download'
)
file_bytes = result.get('file_bytes', [])
chunks = []
for chunk in file_bytes:
chunks.append(base64.b64decode(chunk))
file = bytes_to_uploaded_file(b''.join(chunks), result.get('name'))
file_url = self.upload_knowledge_file(file)
download_file_list.append({'file_id': file_url.split('/')[-1], 'name': result.get('name')})
all_params = {
**all_params, **self.workflow_params.get('data_source'),
'download_file_list': download_file_list
}
result = download_file_list
exist = function_executor.exist_function(tool_lib.code, 'get_download_file_list')
if exist:
download_file_list = []
download_list = function_executor.exec_code(tool_lib.code,
{**all_params, **self.workflow_params.get('data_source')},
function_name='get_download_file_list')
for item in download_list:
result = function_executor.exec_code(tool_lib.code,
{**all_params, **self.workflow_params.get('data_source'),
'download_item': item},
function_name='download')
file = bytes_to_uploaded_file(ast.literal_eval(result.get('file_bytes')), result.get('name'))
file_url = self.upload_knowledge_file(file)
download_file_list.append({'file_id': file_url, 'name': result.get('name')})
result = download_file_list
else:
result = function_executor.exec_code(tool_lib.code, all_params)
else:
result = function_executor.exec_code(tool_lib.code, all_params)
return NodeResult({'result': result},
Expand All @@ -237,7 +230,7 @@ def upload_knowledge_file(self, file):
'meta': meta,
'source_id': knowledge_id,
'source_type': FileSourceType.KNOWLEDGE.value
}).upload()
}).upload().replace("./oss/file/", '')
file.close()
return file_url

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here’s a concise review of the provided Python code:

Irregularities found:

  1. Import Statement: There seems to be an extra blank line at the end of the import statement block.

  2. String Formatting: The string formatting uses @date which is not valid in modern Python syntax. Use f-string or triple quotes for more readability and flexibility.

  3. Code Duplication: There is significant duplication between the sections where files from a list are downloaded and handled versus when no such list exists. This could potentially be refactored into a single method for handling both cases.

  4. Base64 Usage vs ast.literal_eval: Using ast.literal_eval instead of direct base64 operations can simplify error handling and improve clarity, especially with regard to parsing potential JSON data safely.

  5. Functionality Comments: The comments explain what each part does, but they could be slightly refined for better understanding.

Potential Issues:

  1. Security Vulnerabilities: Direct use of base64 decoding without proper sanitation can lead to security vulnerabilities. Ensure that file_bytes are properly validated and sanitized before using them.

  2. Performance Improvements: Consider optimizing memory management and processing efficiency, particularly in larger datasets.

Optimization Suggestions:

  1. Efficient Download Handling: Instead of converting data chunks to bytes one by one and joining them piecemeal, assemble the final byte array once and pass it directly to bytes_to_uploaded_file.

  2. Error Handling: Enhance exception handling across multiple points to ensure robustness and prevent errors from crashing the system abruptly.

  3. Reusability Enhancements: Factor out reusable code snippets into separate functions or classes, making the codebase cleaner and easier to maintain.

Here's a revised version incorporating some of these suggestions:

from typing import List

# Assuming necessary imports here... (e.g., FunctionExecutor, NodeResult)

class YourClass:
    def __init__(self, node, workflow_params):
        # Initialize your class variables...
    
    def upload_knowledge_file(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File Not Found: {file_path}")
        
        with open(file_path, "rb") as f:
            contents = f.read()

        metadata = {'content-type': mimetypes.guess_type(file_path)[0]}
        obj_key = utils.unique_name()  # Utilize utility function for unique name generation
    
        response_data = oss_client.put_object(obj_key=obj_key, body=contents, headers={'Content-Type': 'application/octet-stream'})
        
        bucket_name, object_key = response_data['bucket'], response_data['key']
        public_url = OSS_URL + '/' + bucket_name + '/' + object_key
        s3_object.delete()  # Delete S3 copy if needed
        
        return public_url

        def execute(self):
            all_params = init_params_default_value | params
            if self.node.properties.get('kind') == 'data-source':
                # Use exist_function from function_executor to check if function exists
                exist = function_executor.exist_function(tool_lib.code, 'get_download_file_list')
            
                if exist:
                    download_file_list = []
                    download_list = function_executor.exec_code(
                        tool_lib.code,
                        {**all_params, **self.workflow_params.get('data_source')},
                        "get_download_file_list"
                    )

                    for item in download_list:
                        result = function_executor.exec_code(
                            tool_lib.code,
                            {
                                **all_params,
                                **self.workflow_params.get('data_source'),
                                'download_item': item
                            },
                            "download"
                        )
                        
                        decoded_file_bytes = ast.literal_eval(result.get('file_bytes'))
                        file_obj = bytes_to_uploaded_file(decoded_file_bytes, result.get('name'))
                        file_id = self.upload_knowledge_file(str(file_obj.name)).split('/')[-1]
                        download_file_list.append({
                            "file_id": file_id,
                            "name": result.get('name'),
                        })
                    
                    all_params.update({ 
                        **all_params, 
                        **self.workflow_params.get('data_source'), 
                        'download_file_list': download_file_list
                    })

                    result = download_file_list
                else:
                    result = function_executor.exec_code(tool_lib.code, all_params)
            else:
                result = function_executor.exec_code(tool_lib.code, all_params)
                    
            return NodeResult({"result": result})

By addressing these points, the code should become more maintainable and efficient.

Expand Down
43 changes: 42 additions & 1 deletion apps/common/utils/tool_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,47 @@ def init_sandbox_dir():
except Exception as e:
maxkb_logger.error(f'Exception: {e}', exc_info=True)

def exist_function(self, code_str, name):
    """Check, inside the sandbox, whether executing ``code_str`` defines a function/name ``name``.

    Builds a small Python script that execs ``code_str`` and reports via JSON on stdout
    whether ``name`` ended up in the exec'd locals; the script is written to a temp file
    and run through ``self._exec``. Returns the boolean result, or raises on any failure.

    NOTE(review): ``uuid.uuid7`` is only in the stdlib from Python 3.14 (or via a
    backport) — confirm the runtime provides it.
    """
# Unique tag used to locate this run's result line in the subprocess stdout.
_id = str(uuid.uuid7())
python_paths = CONFIG.get_sandbox_python_package_paths().split(',')
# When sandboxing is enabled, drop privileges to _run_user inside the child script
# before executing untrusted code; otherwise inject nothing.
set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else ''
# IMPORTANT: everything between the triple quotes is the literal script text that runs
# in the sandboxed subprocess. It strips app/site-packages paths, execs the user code,
# then writes "\n<_id>:<json>" with code 200 and data = whether `name` is defined,
# or code 500 with the error message on failure (MemoryError is rewritten to point
# at the configured per-process memory limit).
_exec_code = f"""
try:
import os, sys, json
path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
sys.path = [p for p in sys.path if p not in path_to_exclude]
sys.path += {python_paths}
locals_v={{}}
globals_v={{}}
{set_run_user}
os.environ.clear()
exec({dedent(code_str)!a}, globals_v, locals_v)
exec_result=locals_v.__contains__('{name}')
sys.stdout.write("\\n{_id}:")
json.dump({{'code':200,'msg':'success','data':exec_result}}, sys.stdout, default=str)
except Exception as e:
if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.")
sys.stdout.write("\\n{_id}:")
json.dump({{'code':500,'msg':str(e),'data':False}}, sys.stdout, default=str)
sys.stdout.flush()
"""
maxkb_logger.debug(f"Sandbox execute code: {_exec_code}")
# Write the generated script to a temp file and execute it while the file still
# exists (delete=True removes it when the `with` block exits).
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f:
f.write(_exec_code)
f.flush()
subprocess_result = self._exec(f.name)
if subprocess_result.returncode != 0:
raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred")
# Find the tagged result line; anything the user code printed is ignored.
lines = subprocess_result.stdout.splitlines()
result_line = [line for line in lines if line.startswith(_id)]
if not result_line:
maxkb_logger.error("\n".join(lines))
raise Exception("No result found.")
# Take the last tagged line and parse the JSON payload after "<_id>:".
result = json.loads(result_line[-1].split(":", 1)[1])
if result.get('code') == 200:
return result.get('data')
raise Exception(result.get('msg'))

def exec_code(self, code_str, keywords, function_name=None):
_id = str(uuid.uuid7())
action_function = f'({function_name !a}, locals_v.get({function_name !a}))' if function_name else 'locals_v.popitem()'
Expand Down Expand Up @@ -213,7 +254,7 @@ def get_tool_mcp_config(self, code, params):
],
'cwd': _sandbox_path,
'env': {
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
},
'transport': 'stdio',
}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks generally correct, but here are some suggestions for improvement:

  1. Avoid Hardcoding CONFIG: If you have access to a configuration manager, it's better to use that instead of hardcoding file paths and values.

  2. Use Environment Variables: For the Python package paths, consider using environment variables instead of hardcoded strings in the script.

  3. Security Considerations: Ensure that sensitive information such as pwd.getpwnam() is handled securely and responsibly. You should only allow operations on users defined in trusted environments.

  4. Path Management: Instead of checking every module manually, consider creating a whitelist or blacklist approach for modules to exclude.

  5. Logging and Error Handling: While extensive debugging logs are useful during development, ensure they don't expose unnecessary data in production.

  6. Optimization: The current implementation may benefit from profiling to identify bottlenecks, especially if running under high load conditions.

Here's an improved version with these points considered:

import uuid
import pwd
from subprocess import run, CalledProcessError
import json
from logging import getLogger

MAXKB_LOGGER = getLogger(__name__)

def get_sandbox_python_package_paths():
    # Replace this with actual configuration management logic
    return "/path/to/globally/defined/python_packages"

class SandboxManager:
    def __init__(self, _process_limit_mem_mb):
        self._process_limit_mem_mb = _process_limit_mem_mb

    def init_sandbox_dir(self):
        try:
            # Initialization logic
            pass
        except Exception as e:
            MAXKB_LOGGER.error(f'Exception: {e}', exc_info=True)

    def exist_function(self, code_str, name):
        _id = str(uuid.uuid7())
        python_paths = get_sandbox_python_package_paths().split(',')
        set_run_user = (
            f"if _enable_sandbox: "
            f"os.setgid({getpwnam(_run_user).pw_gid})\n"
            f"os.setuid({getpwnam(_run_user).pw_uid})"
        )
        _exec_code = f"""
try:
    import os, sys, json
    path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
    sys.path = [p for p in sys.path if p not in path_to_exclude]
    sys.path += {repr(python_paths)}
    locals_v={}
    globals_v={}
    {set_run_user}
    os.environ.clear()
    exec({dedent(code_str)!a}, globals_v, locals_v)
    exec_result=locals_v.get('{name}')
    sys.stdout.write("\\n{_id}:")
    json.dump({'code':200,'msg':'success','data':exec_result}), sys.stdout.flush()
except Exception as e:
    if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.")
    sys.stdout.write("\\n{_id}:")
    json.dump({'code':500,'msg':str(e),'data':False}), sys.stdout.flush()
sys.stdout.flush()    
        """
        MAXKB_LOGGER.debug(f"Sandbox execute code: {_exec_code}")
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f:
            f.write(_exec_code)
            f.flush()
            subprocess_result = self._exec(f.name)
        if subprocess_result.returncode != 0:
            raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred")
        
        lines = subprocess_result.stdout.splitlines()
        result_line = [line for line in lines if line.startswith(_id)]
        if not result_line:
            MAXKB_LOGGER.error("\n".join(lines))
            raise Exception("No result found.")
        
        result = json.loads(result_line[-1].split(":", 1)[1])
        if result['code'] == 200:
            return result['data']
        raise Exception(result['msg'])

    @staticmethod
    def _exec(script_path):
        command = ["python", "-c", f"exec(open('{script_path}').read())"]
        try:
            return run(command, capture_output=True, text=True, encoding="utf-8", timeout=60)
        except CalledProcessError as e:
            raise Exception(e.stderr or e.stdout or f"The command '{command[0]}' terminated unexpectedly.")

# Example usage
# sm = SandboxManager(_process_limit_mem_mb)
# exists = sm.exist_function('print("hello")', 'exists')
# print(exists)

Key Changes:

  1. Environment Variable Lookup: Replaced hardcoded paths with calls to get_sandbox_python_package_paths.
  2. Whitelist Modules: Removed manual exclusion of specific directories.
  3. Better Logging and Errors: Used consistent formatting and error message handling across functions.

Expand Down
Loading