fix: update file handling functions#4455

Merged
shaohuzhang1 merged 1 commit into v2 from pr@v2@fix_base_tool
Dec 8, 2025
Conversation

@shaohuzhang1
Contributor

fix: update file handling functions

@f2c-ci-robot

f2c-ci-robot bot commented Dec 8, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@f2c-ci-robot

f2c-ci-robot bot commented Dec 8, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Details

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit 8a9de59 into v2 Dec 8, 2025
3 of 6 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@v2@fix_base_tool branch December 8, 2025 06:26
}).upload().replace("./oss/file/", '')
file.close()
return file_url

Contributor Author


Here’s a concise review of the provided Python code:

Irregularities found:

  1. Import Statement: There seems to be an extra blank line at the end of the import statement block.

  2. String Formatting: The string formatting uses @date which is not valid in modern Python syntax. Use f-string or triple quotes for more readability and flexibility.

  3. Code Duplication: There is significant duplication between the sections where files from a list are downloaded and handled versus when no such list exists. This could potentially be refactored into a single method for handling both cases.

  4. Base64 Usage vs ast.literal_eval: Using ast.literal_eval instead of direct base64 operations can simplify error handling and improve clarity, especially with regards to parsing potential JSON data safely.

  5. Functionality Comments: The comments explain what each part does, but they could be slightly refined for better understanding.

Potential Issues:

  1. Security Vulnerabilities: Direct use of base64 decoding without proper sanitation can lead to security vulnerabilities. Ensure that file_bytes are properly validated and sanitized before using them.

  2. Performance Improvements: Consider optimizing memory management and processing efficiency, particularly in larger datasets.

Optimization Suggestions:

  1. Efficient Download Handling: Instead of converting data chunks one by one to bytes and then back to a file object, convert everything directly once you have the final byte array from bytes_to_uploaded_file.

  2. Error Handling: Enhance exception handling across multiple points to ensure robustness and prevent errors from crashing the system abruptly.

  3. Reusability Enhancements: Factor out reusable code snippets into separate functions or classes, making the codebase cleaner and easier to maintain.
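As a small illustration of the security point above, base64 input can be validated before it is ever written anywhere. This is only a sketch: the payload format and the 50 MB size cap are assumptions, not part of the original code.

```python
import base64
import binascii


def decode_file_bytes(raw, max_size=50 * 1024 * 1024):
    # Reject non-base64 input up front instead of passing garbage downstream.
    try:
        data = base64.b64decode(raw, validate=True)
    except (binascii.Error, ValueError) as e:
        raise ValueError(f"Invalid base64 payload: {e}")
    # Bound the decoded size so a hostile payload cannot exhaust memory.
    if len(data) > max_size:
        raise ValueError("Decoded payload exceeds the size limit")
    return data
```

A caller would then handle `ValueError` explicitly rather than letting a malformed payload surface as an opaque crash later in the pipeline.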

Here's a revised version incorporating some of these suggestions:

from typing import List
import ast
import mimetypes
import os

# Assuming necessary imports here... (e.g., FunctionExecutor, NodeResult,
# oss_client, utils, bytes_to_uploaded_file, OSS_URL)

class YourClass:
    def __init__(self, node, workflow_params):
        self.node = node
        self.workflow_params = workflow_params

    def upload_knowledge_file(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File Not Found: {file_path}")

        with open(file_path, "rb") as f:
            contents = f.read()

        content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
        obj_key = utils.unique_name()  # Utilize utility function for unique name generation

        response_data = oss_client.put_object(obj_key=obj_key, body=contents,
                                              headers={'Content-Type': content_type})

        bucket_name, object_key = response_data['bucket'], response_data['key']
        public_url = OSS_URL + '/' + bucket_name + '/' + object_key
        # Delete any temporary local or remote copy here if needed.
        return public_url

    def execute(self):
        # init_params_default_value and params are assumed to come from the surrounding context.
        all_params = init_params_default_value | params
        if self.node.properties.get('kind') == 'data-source':
            # Use exist_function from function_executor to check if the function exists
            exist = function_executor.exist_function(tool_lib.code, 'get_download_file_list')

            if exist:
                download_file_list = []
                download_list = function_executor.exec_code(
                    tool_lib.code,
                    {**all_params, **self.workflow_params.get('data_source')},
                    "get_download_file_list"
                )

                for item in download_list:
                    result = function_executor.exec_code(
                        tool_lib.code,
                        {
                            **all_params,
                            **self.workflow_params.get('data_source'),
                            'download_item': item
                        },
                        "download"
                    )

                    decoded_file_bytes = ast.literal_eval(result.get('file_bytes'))
                    file_obj = bytes_to_uploaded_file(decoded_file_bytes, result.get('name'))
                    file_id = self.upload_knowledge_file(str(file_obj.name)).split('/')[-1]
                    download_file_list.append({
                        "file_id": file_id,
                        "name": result.get('name'),
                    })

                all_params.update({
                    **self.workflow_params.get('data_source'),
                    'download_file_list': download_file_list
                })

                result = download_file_list
            else:
                result = function_executor.exec_code(tool_lib.code, all_params)
        else:
            result = function_executor.exec_code(tool_lib.code, all_params)

        return NodeResult({"result": result})

By addressing these points, the code should become more maintainable and efficient.

'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so',
},
'transport': 'stdio',
}
Contributor Author


The code looks generally correct, but here are some suggestions for improvement:

  1. Avoid Hardcoding CONFIG: If you have access to a configuration manager, it's better to use that instead of hardcoding file paths and values.

  2. Use Environment Variables: For the Python package paths, consider using environment variables instead of hardcoded strings in the script.

  3. Security Considerations: Ensure that privileged lookups such as pwd.getpwnam() are handled securely and responsibly; only allow operations on users defined in trusted environments.

  4. Path Management: Instead of excluding specific directories one by one, consider a whitelist (or blacklist) approach for module paths.

  5. Logging and Error Handling: While extensive debugging logs are useful during development, ensure they don't expose unnecessary data in production.

  6. Optimization: The current implementation may benefit from profiling to identify bottlenecks, especially if running under high load conditions.
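The whitelist idea in point 4 can be sketched as follows. The approved prefix is a placeholder; in practice it would come from configuration.

```python
import sys

# Illustrative whitelist; the real approved prefixes would come from configuration.
ALLOWED_PREFIXES = ("/opt/sandbox/site-packages",)


def restrict_sys_path(paths=None):
    # Keep only entries under an approved prefix, instead of maintaining
    # a manual exclusion list that must be updated for every new directory.
    paths = list(sys.path) if paths is None else paths
    return [p for p in paths if p.startswith(ALLOWED_PREFIXES)]
```

Assigning the result back to `sys.path` before executing untrusted code limits what the sandboxed script can import.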

Here's an improved version with these points considered:

import json
import pwd
import tempfile
import uuid
from logging import getLogger
from subprocess import run, TimeoutExpired
from textwrap import dedent

MAXKB_LOGGER = getLogger(__name__)

# Illustrative defaults; in practice these come from configuration.
_enable_sandbox = True
_run_user = 'sandbox'

def get_sandbox_python_package_paths():
    # Replace this with actual configuration management logic
    return "/path/to/globally/defined/python_packages"

class SandboxManager:
    def __init__(self, _process_limit_mem_mb):
        self._process_limit_mem_mb = _process_limit_mem_mb

    def init_sandbox_dir(self):
        try:
            # Initialization logic
            pass
        except Exception as e:
            MAXKB_LOGGER.error(f'Exception: {e}', exc_info=True)

    def exist_function(self, code_str, name):
        _id = str(uuid.uuid4())  # uuid.uuid7 requires Python 3.14+; uuid4 is portable
        python_paths = get_sandbox_python_package_paths().split(',')
        # Resolve uid/gid at generation time so the injected snippet stays a single line.
        set_run_user = (
            f"os.setgid({pwd.getpwnam(_run_user).pw_gid}); "
            f"os.setuid({pwd.getpwnam(_run_user).pw_uid})"
        ) if _enable_sandbox else "pass"
        _exec_code = f"""
try:
    import os, sys, json
    path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps']
    sys.path = [p for p in sys.path if p not in path_to_exclude]
    sys.path += {python_paths!r}
    locals_v = {{}}
    globals_v = {{}}
    {set_run_user}
    os.environ.clear()
    exec({dedent(code_str)!a}, globals_v, locals_v)
    exec_result = locals_v.get({name!r}) is not None
    sys.stdout.write("\\n{_id}:")
    json.dump({{'code': 200, 'msg': 'success', 'data': exec_result}}, sys.stdout)
except Exception as e:
    if isinstance(e, MemoryError):
        e = Exception("Cannot allocate more memory: exceeded the limit of {self._process_limit_mem_mb} MB.")
    sys.stdout.write("\\n{_id}:")
    json.dump({{'code': 500, 'msg': str(e), 'data': False}}, sys.stdout)
sys.stdout.flush()
"""
        MAXKB_LOGGER.debug(f"Sandbox execute code: {_exec_code}")
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f:
            f.write(_exec_code)
            f.flush()
            subprocess_result = self._exec(f.name)
        if subprocess_result.returncode != 0:
            raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred")

        lines = subprocess_result.stdout.splitlines()
        result_lines = [line for line in lines if line.startswith(_id)]
        if not result_lines:
            MAXKB_LOGGER.error("\n".join(lines))
            raise Exception("No result found.")

        result = json.loads(result_lines[-1].split(":", 1)[1])
        if result['code'] == 200:
            return result['data']
        raise Exception(result['msg'])

    @staticmethod
    def _exec(script_path):
        command = ["python", script_path]
        try:
            return run(command, capture_output=True, text=True, encoding="utf-8", timeout=60)
        except TimeoutExpired as e:
            raise Exception(e.stderr or e.stdout or f"The command '{command[0]}' timed out.")

# Example usage
# sm = SandboxManager(_process_limit_mem_mb=256)
# exists = sm.exist_function('def hello(): pass', 'hello')
# print(exists)

Key Changes:

  1. Environment Variable Lookup: Replaced hardcoded paths with calls to get_sandbox_python_package_paths.
  2. Whitelist Modules: Removed manual exclusion of specific directories.
  3. Better Logging and Errors: Used consistent formatting and error message handling across functions.
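The configuration lookup in Key Change 1 could itself read from the environment rather than returning a fixed string. The variable name `SANDBOX_PYTHON_PACKAGE_PATHS` and its default are assumptions for illustration only.

```python
import os


def get_sandbox_python_package_paths():
    # Prefer an environment variable over a hardcoded path; fall back to a
    # default only when the variable is unset. Both names here are illustrative.
    raw = os.environ.get("SANDBOX_PYTHON_PACKAGE_PATHS", "/opt/sandbox/site-packages")
    # Drop empty entries so a trailing comma does not inject "" into sys.path.
    return [p for p in raw.split(",") if p]
```

Returning a list directly also removes the `.split(',')` call from every caller.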
