From d8265a4c223f43603502f94e783c4383449b2070 Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Thu, 26 Feb 2026 03:40:11 +0530 Subject: [PATCH 1/2] feat: implement sandbox isolation with worker system --- shuffle_sdk/__init__.py | 24 +- shuffle_sdk/sandbox.py | 336 ++++++++++++++++++++++++++ shuffle_sdk/sandbox_worker.py | 429 ++++++++++++++++++++++++++++++++++ shuffle_sdk/shuffle_sdk.py | 205 +++++++++------- 4 files changed, 903 insertions(+), 91 deletions(-) create mode 100644 shuffle_sdk/sandbox.py create mode 100644 shuffle_sdk/sandbox_worker.py diff --git a/shuffle_sdk/__init__.py b/shuffle_sdk/__init__.py index 05d5ff4..4028f71 100755 --- a/shuffle_sdk/__init__.py +++ b/shuffle_sdk/__init__.py @@ -1,7 +1,25 @@ # __init__.py -from .shuffle_sdk import AppBase, csv_parse +# In Docker image, shuffle_sdk.py is renamed to app_base.py +try: + from .app_base import AppBase, csv_parse, shuffle_filters +except (ImportError, ModuleNotFoundError): + from .shuffle_sdk import AppBase, csv_parse, shuffle_filters -__all__ = ["AppBase", "csv_parse"] # Define the public API of your package +from .sandbox import run_python, run_bash, run_liquid, is_available, configure, SANDBOX_ENABLED + +__all__ = [ + "AppBase", + "csv_parse", + "shuffle_filters", + "run_python", + "run_bash", + "run_liquid", + "is_available", + "configure", + "SANDBOX_ENABLED", +] -#print("Initializing shuffle_sdk package...") __version__ = '0.0.26' + +import sys +print(f"[SHUFFLE_SDK] Initialized", file=sys.stderr, flush=True) diff --git a/shuffle_sdk/sandbox.py b/shuffle_sdk/sandbox.py new file mode 100644 index 0000000..cf46158 --- /dev/null +++ b/shuffle_sdk/sandbox.py @@ -0,0 +1,336 @@ +""" +Sandbox - Spawn isolated subprocess for code execution. + +HOW IT WORKS: +1. Caller provides code + type (python/bash/liquid) +2. We spawn sandbox_worker.py as a subprocess +3. Send code as JSON via stdin +4. Read result as JSON from stdout +5. 
Return result to caller + +ISOLATION PROVIDED: +- Fresh Python interpreter (no shared state between executions) +- Resource limits applied in worker (memory, CPU, files) +- Clean environment (no inherited secrets) +- Timeout enforcement +- Output size limits + +USAGE: + from shuffle_sdk.sandbox import run_python, run_bash + + result = run_python("print(1 + 1)") + # {"success": True, "result": "2"} + + result = run_bash("echo hello") + # {"success": True, "result": "hello"} +""" + +import os +import sys +import json +import subprocess + + +# ============================================================================= +# CONFIGURATION +# ============================================================================= + +# Path to the worker script (same directory as this file) +WORKER_PATH = os.path.join(os.path.dirname(__file__), "sandbox_worker.py") + +# SANDBOX MODE: Defaults to True (sandboxed execution enabled) +# Set to False to disable sandboxing and run code directly (NOT RECOMMENDED) +# You must explicitly set this to False to disable sandboxing +SANDBOX_ENABLED = True + +# Limits +# Use SHUFFLE_APP_SDK_TIMEOUT env var if available, otherwise 60 seconds +# But set to 55 seconds (5 less) to give worker time before parent timeout +_env_timeout = os.getenv("SHUFFLE_APP_SDK_TIMEOUT") +TIMEOUT_SECONDS = int(_env_timeout) - 5 if _env_timeout else 55 +MAX_OUTPUT_BYTES = 10 * 1024 * 1024 # 10MB + +# Print to stderr at module load time to ensure visibility +_msg = "=" * 80 +print(_msg, file=sys.stderr, flush=True) +print("[SANDBOX] SANDBOX MODE ENABLED BY DEFAULT", file=sys.stderr, flush=True) +print("[SANDBOX] All code execution is isolated in subprocesses", file=sys.stderr, flush=True) +print("[SANDBOX] To disable, set sandbox.SANDBOX_ENABLED = False (not recommended)", file=sys.stderr, flush=True) +print(_msg, file=sys.stderr, flush=True) + + +# ============================================================================= +# CONTEXT HELPERS +# 
============================================================================= + +def _extract_sdk_context(sdk_instance): + """ + Extract serializable context from SDK instance. + Instead of pickling, we extract key data and send as JSON. + Worker can reconstruct or use directly. + Returns None if extraction fails. + """ + if not sdk_instance: + return None + try: + context = { + "url": getattr(sdk_instance, "url", ""), + "base_url": getattr(sdk_instance, "base_url", ""), + "authorization": getattr(sdk_instance, "authorization", ""), + "current_execution_id": getattr(sdk_instance, "current_execution_id", ""), + "full_execution": getattr(sdk_instance, "full_execution", {}), + "action": getattr(sdk_instance, "action", {}), + "original_action": getattr(sdk_instance, "original_action", {}), + "start_time": getattr(sdk_instance, "start_time", 0), + "proxy_config": getattr(sdk_instance, "proxy_config", {}), + "local_storage": getattr(sdk_instance, "local_storage", []), + } + + # Try to include singul reference if available + try: + if hasattr(sdk_instance, "singul") and sdk_instance.singul: + context["has_singul"] = True + except: + pass + + return context + except Exception as e: + print(f"[SANDBOX] Failed to extract SDK context: {e}", file=sys.stderr, flush=True) + return None + + +# ============================================================================= +# CORE EXECUTION +# ============================================================================= + +def _run_worker(exec_type, code, sdk_instance=None, extra_context=None): + """ + Spawn worker subprocess and execute code. + + Args: + exec_type: "python", "bash", or "liquid" + code: Code/command/template to execute + sdk_instance: Optional SDK instance (will be pickled) + extra_context: Optional extra dict to merge into context + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + if not SANDBOX_ENABLED: + print("[SANDBOX] WARNING: SANDBOXING IS DISABLED! 
This is unsafe.", file=sys.stderr, flush=True) + print("[SANDBOX] Code is running in the main process without isolation.", file=sys.stderr, flush=True) + print("[SANDBOX] To re-enable, set: sandbox.SANDBOX_ENABLED = True", file=sys.stderr, flush=True) + # Build request with SDK context (extracted data instead of pickle) + sdk_context = _extract_sdk_context(sdk_instance) + request = { + "type": exec_type, + "code": code, + "sdk_context": sdk_context, + "extra_context": extra_context or {}, + } + request_json = json.dumps(request) + + # Clean environment for worker + # Put the app directory (parent of shuffle_sdk/) first in PYTHONPATH + # so the local shuffle_sdk package is found before the system-level one + app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + python_path = app_dir + ":" + ":".join(sys.path) + + env = { + "PATH": "/usr/local/bin:/usr/bin:/bin", + "HOME": "/tmp", + "TMPDIR": "/tmp", + "PYTHONPATH": python_path, + "LANG": "C.UTF-8", + "LC_ALL": "C.UTF-8", + } + + code_preview = code[:200] if len(code) > 200 else code + msg1 = f"[SANDBOX] Starting {exec_type} execution in subprocess" + msg2 = f"[SANDBOX] Type: {exec_type}" + msg3 = f"[SANDBOX] Code: {repr(code_preview)}{'...' 
if len(code) > 200 else ''}" + msg4 = f"[SANDBOX] Has SDK context: {sdk_instance is not None}" + + for msg in [msg1, msg2, msg3, msg4]: + print(msg, file=sys.stderr, flush=True) + + try: + # Spawn worker + msg = f"[SANDBOX] Spawning worker subprocess with worker script: {WORKER_PATH}" + print(msg, file=sys.stderr, flush=True) + + proc = subprocess.Popen( + [sys.executable, WORKER_PATH], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd="/tmp", + start_new_session=True, # Own process group for clean termination + ) + + msg = f"[SANDBOX] Worker process spawned (PID: {proc.pid})" + print(msg, file=sys.stderr, flush=True) + + # Send request, wait for response + try: + msg = f"[SANDBOX] Sending request to worker, timeout={TIMEOUT_SECONDS}s" + print(msg, file=sys.stderr, flush=True) + + stdout, stderr = proc.communicate( + input=request_json.encode("utf-8"), + timeout=TIMEOUT_SECONDS, + ) + + msg = f"[SANDBOX] Worker completed with return code: {proc.returncode}" + print(msg, file=sys.stderr, flush=True) + print(f"[SANDBOX] stdout length: {len(stdout)} bytes", file=sys.stderr, flush=True) + print(f"[SANDBOX] stderr length: {len(stderr)} bytes", file=sys.stderr, flush=True) + if stdout: + stdout_text = stdout.decode('utf-8', errors='replace') + print(f"[SANDBOX] stdout (JSON result): {stdout_text[:300]}", file=sys.stderr, flush=True) + if stderr: + stderr_text = stderr.decode('utf-8', errors='replace') + print(f"[SANDBOX] stderr (worker logs): {stderr_text[:200]}", file=sys.stderr, flush=True) + except subprocess.TimeoutExpired: + msg = f"[SANDBOX] Timeout! 
Killing process group after {TIMEOUT_SECONDS}s" + print(msg, file=sys.stderr, flush=True) + # Kill the entire process group + try: + os.killpg(os.getpgid(proc.pid), 9) + except: + proc.kill() + proc.wait() + return {"success": False, "error": f"Execution timed out after {TIMEOUT_SECONDS} seconds"} + + # Limit output size + if len(stdout) > MAX_OUTPUT_BYTES: + msg = f"[SANDBOX] Output truncated from {len(stdout)} to {MAX_OUTPUT_BYTES} bytes" + print(msg, file=sys.stderr, flush=True) + stdout = stdout[:MAX_OUTPUT_BYTES] + + # Parse result + if proc.returncode == 0 and stdout: + try: + result = json.loads(stdout.decode("utf-8")) + msg = f"[SANDBOX] Successfully parsed JSON result" + print(msg, file=sys.stderr, flush=True) + return result + except json.JSONDecodeError: + result_text = stdout.decode("utf-8", errors="replace") + msg = f"[SANDBOX] Output is not JSON, returning as text" + print(msg, file=sys.stderr, flush=True) + return {"success": True, "result": result_text} + else: + error_msg = stderr.decode("utf-8", errors="replace") if stderr else f"Exit code {proc.returncode}" + msg = f"[SANDBOX] Execution failed: {error_msg}" + print(msg, file=sys.stderr, flush=True) + return {"success": False, "error": error_msg} + + except FileNotFoundError: + msg = f"[SANDBOX] Worker script not found: {WORKER_PATH}" + print(msg, file=sys.stderr, flush=True) + return {"success": False, "error": f"Worker script not found: {WORKER_PATH}"} + except Exception as e: + msg = f"[SANDBOX] Unexpected error during execution: {e}" + print(msg, file=sys.stderr, flush=True) + return {"success": False, "error": f"Sandbox error: {e}"} + + +# ============================================================================= +# PUBLIC API +# ============================================================================= + +def run_python(code, sdk_instance=None): + """ + Execute Python code in isolated subprocess. 
+ + Args: + code: Python source code to execute + sdk_instance: Optional AppBase instance (its attributes are extracted to JSON, not pickled; the worker exposes them as 'self') + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + + Example: + result = run_python("print(2 + 2)") + # {"success": True, "result": "4"} + """ + return _run_worker("python", code, sdk_instance) + + +def run_bash(code, sdk_instance=None, shuffle_input=None): + """ + Execute bash command in isolated subprocess. + + Args: + code: Bash command to execute + sdk_instance: Optional AppBase instance (context extracted to JSON, not pickled; not passed on to the bash command) + shuffle_input: Optional string available as $SHUFFLE_INPUT + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + + Example: + result = run_bash("echo hello world") + # {"success": True, "result": "hello world"} + """ + extra = {"shuffle_input": shuffle_input} if shuffle_input else None + return _run_worker("bash", code, sdk_instance, extra) + + +def run_liquid(template, sdk_instance=None): + """ + Render Liquid template in isolated subprocess. 
+ + Args: + template: Liquid template string + sdk_instance: Optional AppBase instance (context extracted to JSON, not pickled) + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + + Example: + result = run_liquid("Hello {{ name }}") + # {"success": True, "result": "Hello ..."} + """ + # Extract action parameters from SDK instance to make them available in liquid template + extra_context = {} + if sdk_instance: + try: + # Get action parameters if they exist + if hasattr(sdk_instance, 'action') and isinstance(sdk_instance.action, dict): + # Add each parameter to context by name + for param in sdk_instance.action.get('parameters', []): + param_name = param.get('name', '') + param_value = param.get('value', '') + if param_name: + extra_context[param_name] = param_value + + # Also add env dict if it exists (for backward compatibility) + if hasattr(sdk_instance, 'env') and isinstance(sdk_instance.env, dict): + extra_context.update(sdk_instance.env) + except: + pass # Ignore errors extracting context + + return _run_worker("liquid", template, sdk_instance, extra_context if extra_context else None) + + +def is_available(): + """Check if sandbox worker exists.""" + return os.path.exists(WORKER_PATH) + + +def configure(timeout_seconds=None, max_output_bytes=None): + """ + Update sandbox configuration. + + Args: + timeout_seconds: Max execution time in seconds (default 55, or SHUFFLE_APP_SDK_TIMEOUT minus 5 when that env var is set) + max_output_bytes: Max output size (default 10MB) + """ + global TIMEOUT_SECONDS, MAX_OUTPUT_BYTES + if timeout_seconds is not None: + TIMEOUT_SECONDS = timeout_seconds + if max_output_bytes is not None: + MAX_OUTPUT_BYTES = max_output_bytes diff --git a/shuffle_sdk/sandbox_worker.py b/shuffle_sdk/sandbox_worker.py new file mode 100644 index 0000000..c733d28 --- /dev/null +++ b/shuffle_sdk/sandbox_worker.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +""" +Sandbox Worker - Isolated code execution via stdin/stdout. + +HOW IT WORKS: +1. Parent process spawns this worker as a subprocess +2. 
Worker reads JSON from stdin (contains code + context) +3. Worker applies resource limits (memory, CPU, etc.) +4. Worker executes the code in isolation +5. Worker writes JSON result to stdout + +ISOLATION PROVIDED: +- Fresh Python interpreter per execution (no shared state) +- Resource limits prevent runaway processes +- Clean environment (no inherited secrets) +- Runs as separate process (can't access parent memory) + +USAGE: + echo '{"type": "python", "code": "print(1+1)"}' | python sandbox_worker.py +""" + +import sys +import json +import resource +import subprocess +from io import StringIO + + +# ============================================================================= +# LOGGING +# ============================================================================= + +def log_stderr(msg): + """Log message to stderr with worker prefix""" + print(f"[EXECUTE_WORKER] {msg}", file=sys.stderr, flush=True) + + +# ============================================================================= +# RESOURCE LIMITS +# ============================================================================= + +def apply_limits(): + """ + Apply resource limits immediately when worker starts. 
+ + These limits prevent: + - Memory exhaustion (512MB max) + - CPU hogging (60 seconds max) + - Disk filling (50MB max file size) + - File descriptor exhaustion (100 max open files) + - Core dumps (disabled) + """ + import platform + + # Memory: 512MB max + try: + mem_bytes = 512 * 1024 * 1024 + resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes)) + except (ValueError, resource.error): + pass # Some systems don't support this + + # CPU: 60 seconds max + try: + resource.setrlimit(resource.RLIMIT_CPU, (60, 60)) + except (ValueError, resource.error): + pass + + # File size: 50MB max per file + try: + file_bytes = 50 * 1024 * 1024 + resource.setrlimit(resource.RLIMIT_FSIZE, (file_bytes, file_bytes)) + except (ValueError, resource.error): + pass + + # Open files: 100 max + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (100, 100)) + except (ValueError, resource.error): + pass + + # Process limit: 50 max (Linux only - causes issues on macOS) + if platform.system() == "Linux": + try: + resource.setrlimit(resource.RLIMIT_NPROC, (50, 50)) + except (ValueError, resource.error): + pass + + # No core dumps + try: + resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) + except (ValueError, resource.error): + pass + + +# Apply limits immediately when worker starts +apply_limits() + + +# ============================================================================= +# HELPERS +# ============================================================================= + +def _reconstruct_sdk(sdk_context): + """ + Reconstruct a minimal SDK object from context data. + This gives us access to sdk attributes without needing pickle. + Returns a simple object with the context attributes, or None if context is empty. 
+ """ + if not sdk_context: + return None + + try: + # Create a simple object to hold the context attributes + class SDKContextHolder: + pass + + sdk = SDKContextHolder() + + # Apply all context attributes + for key, value in sdk_context.items(): + setattr(sdk, key, value) + + # Try to initialize singul if the context indicates it exists + if sdk_context.get("has_singul"): + try: + # Try to create a fresh singul object + # This assumes the environment variables are set properly + from shuffle_sdk import AppBase + temp_app = AppBase() + sdk.singul = temp_app.singul + except: + # If we can't initialize singul, at least the context is there + pass + + return sdk + except Exception as e: + return None + + +# ============================================================================= +# EXECUTION FUNCTIONS +# ============================================================================= + +def execute_python(code, sdk, context): + """ + Execute Python code and capture output. + + Args: + code: Python source code to execute + sdk: SDK instance (available as 'self') + context: Extra context dict + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + try: + # Capture print() output + output = StringIO() + + def captured_print(*args, **kwargs): + kwargs["file"] = output + print(*args, **kwargs) + + # Build execution environment (mirrors what app.py provides) + exec_env = globals().copy() + exec_env["print"] = captured_print + exec_env["self"] = sdk + + # singul/shuffle point to the Singul API object (same as app.py) + if sdk: + try: + exec_env["singul"] = sdk.singul + exec_env["shuffle"] = sdk.singul + except: + pass + + # Execute the code with comprehensive error handling + try: + exec(code, exec_env) + except SystemExit: + pass # Allow exit() without crashing + except SyntaxError as e: + if "'return' outside function" in str(e): + return {"success": False, "error": "Use exit() instead of return at top level", "error_type": 
"SyntaxError"} + return {"success": False, "error": f"SyntaxError: {e}", "error_type": "SyntaxError"} + except IndentationError as e: + return {"success": False, "error": f"IndentationError: {e}", "error_type": "IndentationError"} + except TypeError as e: + return {"success": False, "error": f"TypeError: {e}", "error_type": "TypeError"} + except NameError as e: + return {"success": False, "error": f"NameError: {e}", "error_type": "NameError"} + except ValueError as e: + return {"success": False, "error": f"ValueError: {e}", "error_type": "ValueError"} + except Exception as e: + import traceback + etype = type(e).__name__ + # Log full traceback to stderr for debugging + print(f"[EXECUTE_PYTHON] UNEXPECTED ERROR: {etype}: {e}", file=sys.stderr, flush=True) + print(f"[EXECUTE_PYTHON] Traceback: {traceback.format_exc()}", file=sys.stderr, flush=True) + msg = f"There was an error executing your Python code. Type: {etype}. Details: {e}" + return {"success": False, "error": msg, "error_type": etype} + + # Get output + result = output.getvalue().strip() + + # Try to parse as JSON (common pattern) + try: + return {"success": True, "result": json.loads(result)} + except (json.JSONDecodeError, ValueError): + return {"success": True, "result": result} + + except Exception as e: + import traceback + print(f"[EXECUTE_PYTHON] OUTER ERROR: {type(e).__name__}: {e}", file=sys.stderr, flush=True) + print(f"[EXECUTE_PYTHON] Traceback: {traceback.format_exc()}", file=sys.stderr, flush=True) + return {"success": False, "error": str(e), "error_type": type(e).__name__} + + +def execute_bash(code, context): + """ + Execute bash command and capture output. 
+ + Args: + code: Bash command to execute + context: Dict with extra data (shuffle_input available as $SHUFFLE_INPUT) + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + try: + # Clean environment - no inherited secrets + env = { + "PATH": "/usr/local/bin:/usr/bin:/bin", + "HOME": "/tmp", + "TMPDIR": "/tmp", + "LANG": "C.UTF-8", + } + + # Add shuffle_input if provided + shuffle_input = context.get("shuffle_input", "") + if shuffle_input: + env["SHUFFLE_INPUT"] = shuffle_input + + # Run the command + proc = subprocess.Popen( + code, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd="/tmp", + ) + + stdout, stderr = proc.communicate(timeout=55) + + # Get output (prefer stdout, fall back to stderr) + output = stdout.decode("utf-8", errors="replace").strip() + if not output and stderr: + output = stderr.decode("utf-8", errors="replace").strip() + + if proc.returncode == 0: + return {"success": True, "result": output} + else: + return {"success": False, "error": output or f"Exit code {proc.returncode}"} + + except subprocess.TimeoutExpired: + proc.kill() + return {"success": False, "error": "Command timed out after 55 seconds"} + except Exception as e: + return {"success": False, "error": str(e)} + + +def execute_liquid(template, sdk, context): + """ + Render a Liquid template with proper exception handling. 
+ + Returns: + {"success": True, "result": ...} or {"success": False, "error": ..., "error_type": ...} + """ + try: + from liquid import Liquid + import jinja2 + + # Import filters + try: + from shuffle_sdk import shuffle_filters + except ImportError: + from walkoff_app_sdk import shuffle_filters + + # Build template globals + template_globals = {} + if context: + template_globals.update(context) + if sdk: + template_globals["self"] = sdk + try: + template_globals["singul"] = sdk.singul + template_globals["shuffle"] = sdk.singul + except: + pass + + # Render - let exceptions bubble up to be caught below + liq = Liquid(template, mode="wild", from_file=False, + filters=shuffle_filters.filters, globals=template_globals) + result = liq.render() + return {"success": True, "result": result} + + except jinja2.exceptions.TemplateNotFound as e: + msg = f"There was a Liquid input error (1). Details: {e}" + return {"success": False, "error": msg, "error_type": "TemplateNotFound"} + except SyntaxError as e: + msg = f"There was a syntax error in your Liquid input (2). Details: {e}" + return {"success": False, "error": msg, "error_type": "SyntaxError"} + except IndentationError as e: + msg = f"There was an indentation error in your Liquid input (2). Details: {e}" + return {"success": False, "error": msg, "error_type": "IndentationError"} + except jinja2.exceptions.TemplateSyntaxError as e: + msg = f"There was a syntax error in your Liquid input (2). Details: {e}" + return {"success": False, "error": msg, "error_type": "TemplateSyntaxError"} + except json.JSONDecodeError as e: + msg = f"There was a syntax error in your input JSON (2). This is typically an issue with escaping newlines. Details: {e}" + return {"success": False, "error": msg, "error_type": "JSONDecodeError"} + except TypeError as e: + msg = f"There was a type error in your Liquid input (2). 
Details: {e}" + return {"success": False, "error": msg, "error_type": "TypeError"} + except Exception as e: + import traceback + etype = type(e).__name__ + # Log full traceback to stderr so we can debug + log_stderr(f"UNEXPECTED ERROR in execute_liquid: {etype}: {e}") + log_stderr(f"Traceback: {traceback.format_exc()}") + msg = f"There was a general error in your Liquid input. Type: {etype}. Details: {e}" + return {"success": False, "error": msg, "error_type": etype} + + +# ============================================================================= +# MAIN ENTRY POINT +# ============================================================================= + +def main(): + """ + Read JSON from stdin, execute requested operation, output JSON result. + + Input format: + { + "type": "python" | "bash" | "liquid", + "code": "...", + "context": {...} # Optional SDK state + } + + Output format: + {"success": true, "result": ...} + or + {"success": false, "error": "..."} + """ + # Log to stderr so it doesn't interfere with stdout (which is JSON output) + import sys + def log_stderr(msg): + print(f"[SANDBOX_WORKER] {msg}", file=sys.stderr, flush=True) + + # log_stderr("Worker process started") + + try: + # Read input from stdin + # log_stderr("Reading request from stdin...") + raw_input = sys.stdin.read() + if not raw_input.strip(): + print(json.dumps({"success": False, "error": "No input provided"})) + sys.exit(1) + + # Parse input + try: + request = json.loads(raw_input) + # log_stderr("Request parsed successfully") + except json.JSONDecodeError as e: + # log_stderr(f"Failed to parse JSON: {e}") + print(json.dumps({"success": False, "error": f"Invalid JSON input: {e}"})) + sys.exit(1) + + # Extract fields + exec_type = request.get("type", "") + code = request.get("code", "") + sdk_context = request.get("sdk_context") # SDK context data (extracted instead of pickled) + extra_context = request.get("extra_context", {}) # Extra context data + + # log_stderr(f"Execution type: 
{exec_type}") + # log_stderr(f"Code length: {len(code)} bytes") + # Reconstruct SDK object from context data + sdk = _reconstruct_sdk(sdk_context) + + # log_stderr(f"Has SDK context: {sdk_context is not None}") + + # Validate + if not exec_type: + # log_stderr("ERROR: Missing 'type' field") + print(json.dumps({"success": False, "error": "Missing 'type' field"})) + sys.exit(1) + if not code: + # log_stderr("ERROR: Missing 'code' field") + print(json.dumps({"success": False, "error": "Missing 'code' field"})) + sys.exit(1) + + # Execute with reconstructed SDK + # log_stderr(f"Starting {exec_type} execution...") + if exec_type == "python": + result = execute_python(code, sdk, extra_context) + elif exec_type == "bash": + result = execute_bash(code, extra_context) + elif exec_type == "liquid": + result = execute_liquid(code, sdk, extra_context) + else: + # log_stderr(f"ERROR: Unknown execution type: {exec_type}") + result = {"success": False, "error": f"Unknown type: {exec_type}"} + + # Output result + # (debug logging disabled to keep stderr clean) + print(json.dumps(result)) + + except Exception as e: + import traceback + # Log full traceback to stderr so we can debug + print(f"[EXECUTE_WORKER] CRITICAL ERROR: {type(e).__name__}: {e}", file=sys.stderr, flush=True) + print(f"[EXECUTE_WORKER] Traceback:\n{traceback.format_exc()}", file=sys.stderr, flush=True) + print(json.dumps({"success": False, "error": f"Worker error: {e}"})) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/shuffle_sdk/shuffle_sdk.py b/shuffle_sdk/shuffle_sdk.py index f05ee2c..5f15547 100755 --- a/shuffle_sdk/shuffle_sdk.py +++ b/shuffle_sdk/shuffle_sdk.py @@ -322,6 +322,12 @@ def url_decode(base): return unquote_plus(base) +@shuffle_filters.register +def tojson(a): + """Convert to JSON string""" + return json.dumps(a) + + ### ### ### @@ -2968,9 +2974,13 @@ def get_json_value(execution_data, input_data): # Sending self as it's not a normal function def parse_liquid(template, self): 
- + + # Save the original template to return on errors + original_template = template errors = False error_msg = "" + error_type = None + print(f"[PARSE_LIQUID] Input template length: {len(template)}", file=sys.stderr, flush=True) try: if len(template) > 10000000: self.logger.info("[DEBUG] Skipping liquid - size too big (%d)" % len(template)) @@ -2983,14 +2993,14 @@ def parse_liquid(template, self): # New pattern fixer to help with bad liquid formats try: - newoutput = self.patternfix_string(template, + newoutput = self.patternfix_string(template, { "{{|": '{{ "" |', }, { r'\{\{\s*\$[^|}]+\s*\|': '{{ "" |', } - , + , inputtype="liquid" ) @@ -2998,67 +3008,44 @@ def parse_liquid(template, self): except Exception as e: print("[ERROR] Failed liquid parsing fix: %s" % e) - all_globals = globals() - all_globals["self"] = self - - try: - all_globals["singul"] = self.singul - all_globals["shuffle"] = self.singul - except Exception as e: - self.logger.info("[ERROR][%s] Failed to set singul in liquid: %s" % (self.current_execution_id, e)) - - run = Liquid(template, mode="wild", from_file=False, filters=shuffle_filters.filters, globals=all_globals) + # Sandboxed rendering: runs in isolated subprocess with + # wild mode, shuffle_filters, and self/singul/shuffle globals + result = self.run_liquid_sandboxed(template) - # Add locals that are missing to globals - ret = run.render() - return ret + if result.get("success"): + # Success - return the rendered template + return result.get("result", template) + else: + # Error from worker - set error and return original + error = True + error_msg = result.get("error", "Liquid rendering failed") + error_type = result.get("error_type", "") except jinja2.exceptions.TemplateNotFound as e: - self.logger.info(f"[ERROR] Liquid Template error: {e}") + print(f"[ERROR] Liquid Template error: {e}", file=sys.stderr, flush=True) error = True + error_type = "TemplateNotFound" error_msg = e - - self.action["parameters"].append({ - "name": 
"liquid_template_error", - "value": f"There was a Liquid input error (1). Details: {e}", - }) - - self.action_result["action"] = self.action except SyntaxError as e: self.logger.info(f"[ERROR] Liquid Syntax error: {e}") error = True + error_type = "SyntaxError" error_msg = e - self.action["parameters"].append({ - "name": "liquid_python_syntax_error", - "value": f"There was a syntax error in your Liquid input (2). Details: {e}", - }) - - self.action_result["action"] = self.action except IndentationError as e: self.logger.info(f"[ERROR] Liquid IndentationError: {e}") error = True + error_type = "IndentationError" error_msg = e - self.action["parameters"].append({ - "name": "liquid_indentiation_error", - "value": f"There was an indentation error in your Liquid input (2). Details: {e}", - }) - - self.action_result["action"] = self.action except jinja2.exceptions.TemplateSyntaxError as e: self.logger.info(f"[ERROR] Liquid Syntax error: {e}") error = True + error_type = "TemplateSyntaxError" error_msg = e - self.action["parameters"].append({ - "name": "liquid_syntax_error", - "value": f"There was a syntax error in your Liquid input (2). Details: {e}", - }) - - self.action_result["action"] = self.action except json.decoder.JSONDecodeError as e: self.logger.info(f"[ERROR] Liquid JSON Syntax error: {e}") - + replace = False skip_next = False newlines = [] @@ -3068,7 +3055,7 @@ def parse_liquid(template, self): if replace: skip_next = True else: - replace = not replace + replace = not replace if replace == True: thisline.append(line) @@ -3087,14 +3074,9 @@ def parse_liquid(template, self): return parse_liquid(new_template, self) else: error = True + error_type = "JSONDecodeError" error_msg = e - self.action["parameters"].append({ - "name": "liquid_json_error", - "value": f"There was a syntax error in your input JSON(2). This is typically an issue with escaping newlines. 
Details: {e}", - }) - - self.action_result["action"] = self.action except TypeError as e: try: if "string as left operand" in f"{e}": @@ -3113,61 +3095,72 @@ def parse_liquid(template, self): splititem = "%s \"%s\"" % (additem, splititem.strip()) parsed_template = template.replace(split_left[0], splititem) - run = Liquid(parsed_template, mode="wild", from_file=False) - return run.render(**globals()) + retry_result = self.run_liquid_sandboxed(parsed_template) + if retry_result.get("success"): + return retry_result.get("result", template) + else: + raise Exception(retry_result.get("error", "Liquid retry failed")) except Exception as e: - self.action["parameters"].append({ - "name": "liquid_general_error", - "value": f"There was general error Liquid input (2). Details: {e}", - }) - - self.action_result["action"] = self.action - #return template + self.logger.info(f"[ERROR] Liquid TypeError error: {e}") + error = True + error_type = "TypeError" + error_msg = e - self.logger.info(f"[ERROR] Liquid TypeError error: {e}") - error = True - error_msg = e except Exception as e: + print(f"[PARSE_LIQUID] CAUGHT EXCEPTION: {type(e).__name__}: {e}", file=sys.stderr, flush=True) self.logger.info(f"[ERROR] General exception for liquid: {e}") error = True + error_type = type(e).__name__ error_msg = e + print(f"[PARSE_LIQUID] Set error=True, error_msg={error_msg}", file=sys.stderr, flush=True) - self.action["parameters"].append({ - "name": "liquid_general_exception", - "value": f"There was general exception Liquid input (2). 
Details: {e}", - }) - self.action_result["action"] = self.action - - if "fmt" in error_msg and "liquid_date" in error_msg: - return template + if "fmt" in str(error_msg) and "liquid_date" in str(error_msg): + return original_template self.logger.info("Done in liquid") + print(f"[PARSE_LIQUID] Before error check: error={error}, error_msg={error_msg}", file=sys.stderr, flush=True) if error == True: - self.action_result["status"] = "FAILURE" - data = { - "success": False, - "reason": f"Failed to parse LiquidPy: {error_msg}", - "input": template, - } + # Restore backwards compatibility: add specific error parameter names + # based on error type for workflows that depend on these + error_type_name = None + + if error_type == "TemplateNotFound": + error_type_name = "liquid_template_error" + elif error_type == "SyntaxError": + error_type_name = "liquid_python_syntax_error" + elif error_type == "IndentationError": + error_type_name = "liquid_indentiation_error" # Keep the typo for compatibility + elif error_type == "TemplateSyntaxError": + error_type_name = "liquid_syntax_error" + elif error_type == "JSONDecodeError": + error_type_name = "liquid_json_error" + elif error_type == "TypeError": + error_type_name = "liquid_general_error" + else: + error_type_name = "liquid_general_exception" - try: - self.action_result["result"] = json.dumps(data) - except Exception as e: - self.action_result["result"] = f"Failed to parse LiquidPy: {error_msg}" + # Add the error parameter with specific name + if error_msg: + print(f"[PARSE_LIQUID] ERROR: {error_msg}", file=sys.stderr, flush=True) + self.action["parameters"].append({ + "name": error_type_name, + "value": f"{error_msg}", + }) - self.action_result["completed_at"] = int(time.time_ns()) - self.send_result(self.action_result, headers, stream_path) + # Also add generic error for workflows that expect it + self.action["parameters"].append({ + "name": "error", + "value": f"{error_msg}", + }) - self.logger.info(f"[ERROR] Sent FAILURE 
response to backend due to : {e}") - - if runtime == "run": - return template - else: - os.exit() + # Return the original template on error so user can see what they tried + print(f"[PARSE_LIQUID] Returning original template (length: {len(original_template)})", file=sys.stderr, flush=True) + return original_template + print(f"[PARSE_LIQUID] Returning rendered template (length: {len(template)})", file=sys.stderr, flush=True) return template def recurse_cleanup_script(data): @@ -4378,6 +4371,42 @@ async def parse_value(newres): return + # ========================================================================= + # SANDBOXED EXECUTION + # ========================================================================= + + def run_python_sandboxed(self, code): + """ + Execute Python code in isolated subprocess. + + The sandboxed code has access to 'self' (this SDK instance). + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + from . import sandbox + return sandbox.run_python(code, self) + + def run_bash_sandboxed(self, code, shuffle_input=None): + """ + Execute bash command in isolated subprocess. + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + from . import sandbox + return sandbox.run_bash(code, self, shuffle_input) + + def run_liquid_sandboxed(self, template): + """ + Render Liquid template in isolated subprocess. + + Returns: + {"success": True, "result": ...} or {"success": False, "error": ...} + """ + from . 
import sandbox + return sandbox.run_liquid(template, self) + @classmethod def run(cls, action=""): logging.basicConfig(format="{asctime} - {name} - {levelname}:{message}", style='{') From f4430545195ae71ba546f6f330aa3d3aa576301d Mon Sep 17 00:00:00 2001 From: Aditya <60684641+0x0elliot@users.noreply.github.com> Date: Fri, 27 Feb 2026 03:27:35 +0530 Subject: [PATCH 2/2] fix: increasing MAX_OUTPUT_BYTES to 250 MBs --- shuffle_sdk/sandbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shuffle_sdk/sandbox.py b/shuffle_sdk/sandbox.py index cf46158..7cf7b31 100644 --- a/shuffle_sdk/sandbox.py +++ b/shuffle_sdk/sandbox.py @@ -48,7 +48,7 @@ # But set to 55 seconds (5 less) to give worker time before parent timeout _env_timeout = os.getenv("SHUFFLE_APP_SDK_TIMEOUT") TIMEOUT_SECONDS = int(_env_timeout) - 5 if _env_timeout else 55 -MAX_OUTPUT_BYTES = 10 * 1024 * 1024 # 10MB +MAX_OUTPUT_BYTES = 250 * 1024 * 1024 # 250MB # Print to stderr at module load time to ensure visibility _msg = "=" * 80