- Version: 1.0
- Date: 2025-08-16
- Author: Winston, System Architect
- Status: Design Phase
- Related Documents: PyFlowGraph PRD v1.0
This document defines the technical architecture for implementing Command Pattern-based undo/redo functionality and Shared Process Execution system in PyFlowGraph, positioning it as a professional workflow automation platform. The design maintains backward compatibility with existing PySide6 architecture while delivering enterprise-grade automation capabilities including high-performance data processing, API integrations, and workflow orchestration.
Key Architecture Decisions:
- Command Pattern implementation integrated into existing NodeGraph operations
- Shared Process Execution Model replacing isolated subprocess-per-node for 10-100x performance gains
- Direct object passing between nodes without JSON serialization overhead
- Memory-efficient command history with configurable depth (default 50, max 200)
- Backward-compatible file format extensions preserving existing .md workflow
- Extensible node type system for integration connectors (HTTP, Database, Cloud)
- Event-driven architecture supporting webhooks and real-time data processing
PyFlowGraph follows a layered desktop application architecture built on PySide6:
┌─────────────────────────────────────────────────────────────┐
│ Presentation Layer │
├─────────────────────────────────────────────────────────────┤
│ NodeEditorWindow (QMainWindow) │
│ ├── NodeEditorView (QGraphicsView) │
│ ├── CodeEditorDialog (Modal) │
│ ├── NodePropertiesDialog │
│ └── Various Dock Widgets │
├─────────────────────────────────────────────────────────────┤
│ Business Logic Layer │
├─────────────────────────────────────────────────────────────┤
│ NodeGraph (QGraphicsScene) │
│ ├── Node Management (create_node, remove_node) │
│ ├── Connection Management (create_connection, remove_connection) │
│ ├── Serialization (serialize, deserialize) │
│ └── Clipboard Operations (copy_selected, paste) │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer │
├─────────────────────────────────────────────────────────────┤
│ Node (QGraphicsItem) - Pin generation from Python parsing │
│ Connection (QGraphicsItem) - Bezier curve connections │
│ Pin (QGraphicsItem) - Type-safe connection points │
│ RerouteNode (QGraphicsItem) - Connection organization │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
├─────────────────────────────────────────────────────────────┤
│ SharedProcessManager - Shared process pool for execution │
│ GraphExecutor - Node execution coordination │
│ FlowFormat - Markdown serialization │
│ EventSystem - Event-driven execution │
│ FileOperations - File I/O management │
└─────────────────────────────────────────────────────────────┘
Primary Integration Point: NodeGraph (src/node_graph.py)
- Central hub for all graph operations
- Current methods provide natural command implementation points:
create_node()→ CreateNodeCommandremove_node()→ DeleteNodeCommandcreate_connection()→ CreateConnectionCommandremove_connection()→ DeleteConnectionCommand
Secondary Integration Points:
- NodeEditorWindow: Menu integration, keyboard shortcuts, UI controls
- FlowFormat: File format extensions for group metadata
- Node/Connection classes: Enhanced serialization for state preservation
The Command Pattern implementation provides a robust, extensible foundation for undo/redo functionality while integrating seamlessly with existing NodeGraph operations.
┌─────────────────────────────────────────────────────────────┐
│ Command Pattern Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ CommandBase │ │ CommandHistory │ │
│ │ (Abstract) │ │ (Manager) │ │
│ │ │ │ │ │
│ │ + execute() │ │ - commands[] │ │
│ │ + undo() │ │ - current_index │ │
│ │ + get_desc() │ │ - max_depth │ │
│ └─────────────────┘ │ │ │
│ ▲ │ + execute_cmd() │ │
│ │ │ + undo() │ │
│ │ │ + redo() │ │
│ ┌─────────────────┐ │ + clear() │ │
│ │ Concrete Commands│ └─────────────────┘ │
│ │ │ │
│ │ CreateNodeCmd │ ┌─────────────────┐ │
│ │ DeleteNodeCmd │ │ NodeGraph │ │
│ │ MoveNodeCmd │ │ (Modified) │ │
│ │ CreateConnCmd │ │ │ │
│ │ DeleteConnCmd │ │ + command_hist │ │
│ │ PropertyCmd │ │ + execute_cmd() │ │
│ │ CodeChangeCmd │ │ │ │
│ │ CompositeCmd │ │ [integrate all │ │
│ │ GroupCmd │ │ operations] │ │
│ │ UngroupCmd │ └─────────────────┘ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
# src/commands/command_base.py
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class CommandBase(ABC):
"""Abstract base class for all undoable commands."""
def __init__(self, description: str):
self.description = description
self.timestamp = time.time()
self._executed = False
@abstractmethod
def execute(self) -> bool:
"""Execute the command. Returns True if successful."""
pass
@abstractmethod
def undo(self) -> bool:
"""Undo the command. Returns True if successful."""
pass
def get_description(self) -> str:
"""Get human-readable description for UI display."""
return self.description
def can_merge_with(self, other: 'CommandBase') -> bool:
"""Check if this command can be merged with another."""
return False
def merge_with(self, other: 'CommandBase') -> Optional['CommandBase']:
"""Merge with another command if possible."""
return None# src/commands/command_history.py
from typing import List, Optional
from .command_base import CommandBase
class CommandHistory:
"""Manages command execution history and undo/redo operations."""
def __init__(self, max_depth: int = 50):
self.commands: List[CommandBase] = []
self.current_index: int = -1
self.max_depth = max_depth
self._memory_usage = 0
self._memory_limit = 50 * 1024 * 1024 # 50MB as per NFR3
def execute_command(self, command: CommandBase) -> bool:
"""Execute a command and add to history."""
if not command.execute():
return False
# Remove any commands ahead of current position
if self.current_index < len(self.commands) - 1:
self.commands = self.commands[:self.current_index + 1]
# Add command to history
self.commands.append(command)
self.current_index += 1
# Maintain depth limit and memory constraints
self._enforce_limits()
return True
def undo(self) -> Optional[str]:
"""Undo the last command. Returns description if successful."""
if not self.can_undo():
return None
command = self.commands[self.current_index]
if command.undo():
self.current_index -= 1
return command.get_description()
return None
def redo(self) -> Optional[str]:
"""Redo the next command. Returns description if successful."""
if not self.can_redo():
return None
command = self.commands[self.current_index + 1]
if command.execute():
self.current_index += 1
return command.get_description()
return None
def can_undo(self) -> bool:
return self.current_index >= 0
def can_redo(self) -> bool:
return self.current_index < len(self.commands) - 1
def _enforce_limits(self):
"""Enforce depth and memory limits."""
# Remove oldest commands if over depth limit
while len(self.commands) > self.max_depth:
removed = self.commands.pop(0)
self.current_index -= 1
self._memory_usage -= self._estimate_command_size(removed)
# Enforce memory limit (NFR3)
while (self._memory_usage > self._memory_limit and
len(self.commands) > 0):
removed = self.commands.pop(0)
self.current_index -= 1
self._memory_usage -= self._estimate_command_size(removed)# src/commands/node_commands.py
class CreateNodeCommand(CommandBase):
"""Command for creating nodes with full state preservation."""
def __init__(self, node_graph, node_type: str, position: QPointF,
node_id: str = None):
super().__init__(f"Create {node_type} node")
self.node_graph = node_graph
self.node_type = node_type
self.position = position
self.node_id = node_id or self._generate_id()
self.created_node = None
def execute(self) -> bool:
"""Create the node and add to graph."""
self.created_node = self.node_graph._create_node_internal(
self.node_type, self.position, self.node_id)
return self.created_node is not None
def undo(self) -> bool:
"""Remove the created node."""
if self.created_node and self.created_node in self.node_graph.nodes:
self.node_graph._remove_node_internal(self.created_node)
return True
return False
class DeleteNodeCommand(CommandBase):
"""Command for deleting nodes with complete state preservation."""
def __init__(self, node_graph, node):
super().__init__(f"Delete {node.title}")
self.node_graph = node_graph
self.node = node
self.node_state = None
self.affected_connections = []
def execute(self) -> bool:
"""Delete node after preserving complete state."""
# Preserve full node state
self.node_state = {
'id': self.node.id,
'position': self.node.pos(),
'title': self.node.title,
'code': self.node.code,
'properties': self.node.get_properties(),
'pin_data': self.node.serialize_pins()
}
# Preserve affected connections
self.affected_connections = []
for conn in list(self.node_graph.connections):
if (conn.output_pin.node == self.node or
conn.input_pin.node == self.node):
self.affected_connections.append({
'connection': conn,
'output_node_id': conn.output_pin.node.id,
'output_pin_index': conn.output_pin.index,
'input_node_id': conn.input_pin.node.id,
'input_pin_index': conn.input_pin.index
})
# Perform deletion
self.node_graph._remove_node_internal(self.node)
return True
def undo(self) -> bool:
"""Restore node with complete state and reconnections."""
# Recreate node with preserved state
restored_node = self.node_graph._create_node_internal(
self.node_state['title'],
self.node_state['position'],
self.node_state['id']
)
if not restored_node:
return False
# Restore node properties
restored_node.code = self.node_state['code']
restored_node.set_properties(self.node_state['properties'])
restored_node.deserialize_pins(self.node_state['pin_data'])
# Restore connections
for conn_data in self.affected_connections:
output_node = self.node_graph.get_node_by_id(
conn_data['output_node_id'])
input_node = self.node_graph.get_node_by_id(
conn_data['input_node_id'])
if output_node and input_node:
self.node_graph._create_connection_internal(
output_node.output_pins[conn_data['output_pin_index']],
input_node.input_pins[conn_data['input_pin_index']]
)
return True# src/commands/composite_command.py
class CompositeCommand(CommandBase):
"""Command that groups multiple operations as single undo unit."""
def __init__(self, description: str, commands: List[CommandBase]):
super().__init__(description)
self.commands = commands
self.executed_commands = []
def execute(self) -> bool:
"""Execute all commands, rolling back on failure."""
self.executed_commands = []
for command in self.commands:
if command.execute():
self.executed_commands.append(command)
else:
# Rollback executed commands
for executed in reversed(self.executed_commands):
executed.undo()
return False
return True
def undo(self) -> bool:
"""Undo all executed commands in reverse order."""
success = True
for command in reversed(self.executed_commands):
if not command.undo():
success = False
return successThe Single Process Execution Architecture replaces the current isolated subprocess-per-node model with a single persistent Python interpreter, delivering 100-1000x performance improvements for ML/data science workflows while respecting GPU memory constraints through intelligent sequential scheduling.
Node A → [Subprocess A] → JSON → Node B → [Subprocess B] → JSON → Node C
↑ ↑ ↑ ↑
100ms Serialize 100ms Serialize
startup overhead startup overhead
Node A → [Same Python Interpreter] → Direct Reference → Node B → [Same Interpreter] → Node C
↑ ↑ ↑ ↑
0ms No overhead Zero-copy 0ms
startup Same namespace Native objects startup
# src/execution/single_process_executor.py
class SingleProcessExecutor:
"""Manages execution in a single persistent Python interpreter."""
def __init__(self):
self.namespace: Dict[str, Any] = {} # Persistent namespace
self.object_store: Dict[str, Any] = {} # Direct object storage
self.memory_monitor = MemoryMonitor()
self.gpu_monitor = GPUMonitor()
self.execution_queue = ExecutionQueue()
def execute_node(self, node: Node, inputs: Dict[str, Any]) -> Any:
"""Execute node directly in current interpreter."""
# Check memory/GPU constraints before execution
self._check_resources(node, inputs)
# Prepare execution environment
exec_globals = {**self.namespace, **inputs}
# Execute node code directly
try:
exec(node.code, exec_globals)
result = exec_globals[node.function_name](**inputs)
# Store result directly (no serialization)
self._store_result(node, result)
return result
except Exception as e:
self._handle_execution_error(node, e)
raise
def _check_resources(self, node: Node, inputs: Dict[str, Any]):
"""Check if sufficient resources available before execution."""
# Estimate memory requirements
memory_required = self._estimate_memory_usage(inputs)
# Check GPU memory if using GPU tensors
if self._uses_gpu(inputs):
gpu_memory_required = self._estimate_gpu_memory(inputs)
if not self.gpu_monitor.has_available_memory(gpu_memory_required):
self._cleanup_gpu_memory()
def _store_result(self, node: Node, result: Any):
"""Store result directly in object store."""
# No serialization - direct Python object reference
self.object_store[f"node_{node.id}_result"] = result
# Update persistent namespace with common imports/variables
if hasattr(result, '__module__'):
module_name = result.__module__
if module_name not in self.namespace:
self.namespace[module_name] = __import__(module_name)# src/execution/sequential_scheduler.py
class SequentialScheduler:
"""GPU-aware sequential execution scheduler."""
def __init__(self, executor: SingleProcessExecutor):
self.executor = executor
self.dependency_graph = DependencyGraph()
self.resource_monitor = ResourceMonitor()
def schedule_execution(self, nodes: List[Node]) -> ExecutionPlan:
"""Create execution plan respecting dependencies and resources."""
# Build dependency graph
execution_order = self._topological_sort(nodes)
# Add resource constraints
execution_plan = ExecutionPlan()
for node in execution_order:
# Check if node requires GPU resources
if self._is_gpu_intensive(node):
execution_plan.add_gpu_checkpoint(node)
execution_plan.add_node(node)
return execution_plan
def _is_gpu_intensive(self, node: Node) -> bool:
"""Detect if node will use significant GPU memory."""
gpu_keywords = ['torch.', 'cuda', 'gpu', 'tensorflow', 'jax.device']
return any(keyword in node.code.lower() for keyword in gpu_keywords)
def execute_plan(self, plan: ExecutionPlan) -> Dict[Node, Any]:
"""Execute nodes sequentially according to plan."""
results = {}
for step in plan.steps:
if step.is_gpu_checkpoint:
# Clean up GPU memory before heavy operation
self._cleanup_gpu_memory()
result = self.executor.execute_node(step.node, step.inputs)
results[step.node] = result
return results# src/execution/gpu_memory_manager.py
class GPUMemoryManager:
"""Manages GPU memory for optimal utilization."""
def __init__(self):
self.device_monitors = {}
self.memory_pool = {}
self.cleanup_strategies = [
TensorCleanupStrategy(),
ModelCleanupStrategy(),
CacheCleanupStrategy()
]
def check_available_memory(self, required_bytes: int) -> bool:
"""Check if sufficient GPU memory available."""
try:
import torch
if torch.cuda.is_available():
free_memory = torch.cuda.get_device_properties(0).total_memory
free_memory -= torch.cuda.memory_allocated(0)
return free_memory >= required_bytes
except ImportError:
pass
return True # Assume available if no GPU libs
def cleanup_memory(self):
"""Aggressive GPU memory cleanup."""
for strategy in self.cleanup_strategies:
strategy.cleanup()
# Force garbage collection
import gc
gc.collect()
# PyTorch specific cleanup
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
except ImportError:
pass| Operation | Current (Subprocess) | New (Single Process) | Improvement |
|---|---|---|---|
| Small node execution | 100-200ms | <1ms | 100-200x faster |
| Large tensor passing | 500ms-2s | 0ms (direct reference) | ∞x faster |
| ML pipeline (10 nodes) | 5-10 seconds | 10-50ms | 100-1000x faster |
| PyTorch model inference | 2-5 seconds overhead | 0ms overhead | No overhead |
- Direct object references: No copying or serialization ever
- Persistent namespace: Imports and common objects stay loaded
- GPU memory optimization: Intelligent cleanup prevents OOM
- Shared computation graphs: ML frameworks can optimize across nodes
- Clean break from subprocess model - no backward compatibility
- All data passing uses direct Python object references
- Existing graphs require one-time conversion (automated)
- Focus on maximum performance rather than compatibility
- Sequential execution prevents VRAM conflicts
- Automatic cleanup before memory-intensive operations
- Real-time monitoring prevents out-of-memory crashes
- Support for multi-GPU workloads with device affinity
The Node Grouping system creates a hierarchical abstraction layer enabling management of complex graphs through collapsible containers while maintaining full compatibility with existing execution and serialization systems.
┌─────────────────────────────────────────────────────────────┐
│ Node Grouping Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ NodeGroup │ │ GroupManager │ │
│ │ (QGraphicsItem) │ │ (Controller) │ │
│ │ │ │ │ │
│ │ + child_nodes[] │ │ + groups[] │ │
│ │ + interface_pins│ │ + depth_limit │ │
│ │ + is_collapsed │ │ │ │
│ │ + group_bounds │ │ + create_group()│ │
│ │ │ │ + expand_group()│ │
│ │ + collapse() │ │ + validate_sel()│ │
│ │ + expand() │ │ + check_cycles()│ │
│ │ + generate_pins()│ │ + save_template()│ │
│ └─────────────────┘ └─────────────────┘ │
│ ▲ │
│ │ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ GroupPin │ │ GroupTemplate │ │
│ │ (Special) │ │ (Serialized) │ │
│ │ │ │ │ │
│ │ + internal_conn │ │ + metadata │ │
│ │ + external_conn │ │ + node_data[] │ │
│ │ + pin_type │ │ + interface_def │ │
│ └─────────────────┘ │ + version │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
# src/grouping/node_group.py
from PySide6.QtWidgets import QGraphicsItemGroup, QGraphicsItem
from PySide6.QtCore import QRectF, QPointF
from typing import List, Dict, Any, Optional
class NodeGroup(QGraphicsItemGroup):
"""Hierarchical container for organizing nodes into manageable groups."""
def __init__(self, name: str, description: str = ""):
super().__init__()
self.group_id = self._generate_id()
self.name = name
self.description = description
self.is_collapsed = False
self.depth_level = 0
self.max_depth = 10 # NFR7 requirement
# Child management
self.child_nodes: List[QGraphicsItem] = []
self.child_groups: List['NodeGroup'] = []
self.parent_group: Optional['NodeGroup'] = None
# Interface pins for external connectivity
self.interface_pins: List['GroupPin'] = []
self.external_connections: List[Dict] = []
# Visual properties
self.group_bounds = QRectF()
self.collapsed_size = QSizeF(200, 100)
self.expanded_bounds = QRectF()
self.setFlag(QGraphicsItem.ItemIsMovable, True)
self.setFlag(QGraphicsItem.ItemIsSelectable, True)
def add_child_node(self, node) -> bool:
"""Add node to group with validation."""
if self._would_create_cycle(node):
return False
self.child_nodes.append(node)
self.addToGroup(node)
node.parent_group = self
self._update_bounds()
return True
def add_child_group(self, group: 'NodeGroup') -> bool:
"""Add nested group with depth validation."""
if self.depth_level + 1 >= self.max_depth:
return False
if self._would_create_cycle(group):
return False
self.child_groups.append(group)
group.parent_group = self
group.depth_level = self.depth_level + 1
self.addToGroup(group)
self._update_bounds()
return True
def collapse(self) -> bool:
"""Collapse group to single node representation."""
if self.is_collapsed:
return True
# Store expanded positions
self.expanded_bounds = self.group_bounds
for node in self.child_nodes:
node.expanded_position = node.pos()
# Generate interface pins
self._generate_interface_pins()
# Hide internal nodes
for node in self.child_nodes:
node.setVisible(False)
for group in self.child_groups:
group.setVisible(False)
# Set collapsed visual state
self.is_collapsed = True
self._update_collapsed_appearance()
return True
def expand(self) -> bool:
"""Expand group to show internal nodes."""
if not self.is_collapsed:
return True
# Restore node positions
for node in self.child_nodes:
if hasattr(node, 'expanded_position'):
node.setPos(node.expanded_position)
node.setVisible(True)
for group in self.child_groups:
group.setVisible(True)
# Restore interface connections
self._restore_internal_connections()
self.is_collapsed = False
self._update_expanded_appearance()
return True
def _generate_interface_pins(self):
"""Analyze external connections and generate interface pins."""
self.interface_pins.clear()
self.external_connections.clear()
input_types = {}
output_types = {}
# Analyze all connections crossing group boundary
for node in self.child_nodes:
for pin in node.input_pins:
for conn in pin.connections:
if conn.output_pin.node not in self.child_nodes:
# External input connection
pin_type = pin.pin_type
if pin_type not in input_types:
input_types[pin_type] = []
input_types[pin_type].append({
'connection': conn,
'internal_pin': pin,
'external_pin': conn.output_pin
})
for pin in node.output_pins:
for conn in pin.connections:
if conn.input_pin.node not in self.child_nodes:
# External output connection
pin_type = pin.pin_type
if pin_type not in output_types:
output_types[pin_type] = []
output_types[pin_type].append({
'connection': conn,
'internal_pin': pin,
'external_pin': conn.input_pin
})
# Create interface pins
for pin_type, connections in input_types.items():
interface_pin = GroupPin(self, 'input', pin_type, connections)
self.interface_pins.append(interface_pin)
for pin_type, connections in output_types.items():
interface_pin = GroupPin(self, 'output', pin_type, connections)
self.interface_pins.append(interface_pin)
def serialize(self) -> Dict[str, Any]:
"""Serialize group for file persistence."""
return {
'group_id': self.group_id,
'name': self.name,
'description': self.description,
'is_collapsed': self.is_collapsed,
'depth_level': self.depth_level,
'group_bounds': {
'x': self.group_bounds.x(),
'y': self.group_bounds.y(),
'width': self.group_bounds.width(),
'height': self.group_bounds.height()
},
'child_node_ids': [node.id for node in self.child_nodes],
'child_group_ids': [group.group_id for group in self.child_groups],
'interface_pins': [pin.serialize() for pin in self.interface_pins],
'external_connections': self.external_connections
}# src/grouping/group_pin.py
class GroupPin:
"""Special pin type for group external interface."""
def __init__(self, parent_group: NodeGroup, direction: str,
pin_type: str, connections: List[Dict]):
self.parent_group = parent_group
self.direction = direction # 'input' or 'output'
self.pin_type = pin_type
self.internal_connections = connections
self.position = QPointF()
self.external_connection = None
def connect_external(self, external_pin) -> bool:
"""Connect this interface pin to external node."""
if not self._validate_connection(external_pin):
return False
self.external_connection = external_pin
# Route through to internal connections
for conn_data in self.internal_connections:
internal_pin = conn_data['internal_pin']
original_conn = conn_data['connection']
# Create new connection from external pin to internal pin
if self.direction == 'input':
new_conn = Connection(external_pin, internal_pin)
else:
new_conn = Connection(internal_pin, external_pin)
# Update node graph
self.parent_group.scene().create_connection(new_conn)
return True
def serialize(self) -> Dict[str, Any]:
"""Serialize interface pin data."""
return {
'direction': self.direction,
'pin_type': self.pin_type,
'position': {'x': self.position.x(), 'y': self.position.y()},
'internal_connections': [
{
'node_id': conn['internal_pin'].node.id,
'pin_index': conn['internal_pin'].index,
'external_node_id': conn['external_pin'].node.id,
'external_pin_index': conn['external_pin'].index
}
for conn in self.internal_connections
]
}# src/grouping/group_manager.py
class GroupManager:
"""Central controller for all group operations and validation."""
def __init__(self, node_graph):
self.node_graph = node_graph
self.groups: List[NodeGroup] = []
self.max_depth = 10
self.group_templates: Dict[str, 'GroupTemplate'] = {}
def create_group(self, selected_nodes: List, name: str,
description: str = "") -> Optional[NodeGroup]:
"""Create new group from selected nodes with validation."""
# Validation (FR5)
if not self._validate_group_creation(selected_nodes):
return None
# Create group
group = NodeGroup(name, description)
# Add nodes to group
for node in selected_nodes:
if not group.add_child_node(node):
return None
# Generate interface pins (FR6)
group._generate_interface_pins()
# Add to management
self.groups.append(group)
self.node_graph.addItem(group)
return group
def expand_group(self, group: NodeGroup) -> bool:
"""Expand group with position restoration (FR8)."""
if not group.is_collapsed:
return True
return group.expand()
def save_group_template(self, group: NodeGroup,
metadata: Dict[str, Any]) -> bool:
"""Save group as reusable template (FR9)."""
template = GroupTemplate(group, metadata)
if not template.validate():
return False
template_id = f"{metadata['name']}_{metadata['version']}"
self.group_templates[template_id] = template
# Persist to file system
return template.save_to_file()
def _validate_group_creation(self, nodes: List) -> bool:
"""Validate group creation preventing circular dependencies."""
if len(nodes) < 2:
return False
# Check for existing group membership conflicts
for node in nodes:
if hasattr(node, 'parent_group') and node.parent_group:
return False
# Check for circular dependencies
return not self._would_create_circular_dependency(nodes)
def _would_create_circular_dependency(self, nodes: List) -> bool:
"""Check if grouping would create circular dependency."""
# Implement cycle detection algorithm
# This is simplified - real implementation would use DFS
visited = set()
for node in nodes:
if self._has_cycle_from_node(node, visited, nodes):
return True
return FalseThe architecture leverages existing PySide6 patterns while adding new UI components for undo/redo and grouping functionality.
# src/node_editor_window.py - Enhanced menu system
class NodeEditorWindow(QMainWindow):
def __init__(self):
super().__init__()
self.command_history = CommandHistory()
self.group_manager = GroupManager(self.node_graph)
self._setup_enhanced_menus()
def _setup_enhanced_menus(self):
"""Setup menus with undo/redo and grouping support."""
edit_menu = self.menuBar().addMenu("Edit")
# Undo/Redo actions
self.undo_action = QAction("Undo", self)
self.undo_action.setShortcut(QKeySequence.Undo)
self.undo_action.triggered.connect(self.undo_operation)
self.redo_action = QAction("Redo", self)
self.redo_action.setShortcut(QKeySequence.Redo)
self.redo_action.triggered.connect(self.redo_operation)
edit_menu.addAction(self.undo_action)
edit_menu.addAction(self.redo_action)
edit_menu.addSeparator()
# Grouping actions
self.group_action = QAction("Group Selected", self)
self.group_action.setShortcut(QKeySequence("Ctrl+G"))
self.group_action.triggered.connect(self.create_group)
self.ungroup_action = QAction("Ungroup", self)
self.ungroup_action.setShortcut(QKeySequence("Ctrl+Shift+G"))
self.ungroup_action.triggered.connect(self.ungroup_selected)
edit_menu.addAction(self.group_action)
edit_menu.addAction(self.ungroup_action)
def undo_operation(self):
"""Execute undo with UI feedback."""
description = self.command_history.undo()
if description:
self.statusBar().showMessage(f"Undid: {description}", 2000)
self._update_menu_states()
def redo_operation(self):
"""Execute redo with UI feedback."""
description = self.command_history.redo()
if description:
self.statusBar().showMessage(f"Redid: {description}", 2000)
self._update_menu_states()
def _update_menu_states(self):
"""Update menu item enabled states."""
self.undo_action.setEnabled(self.command_history.can_undo())
self.redo_action.setEnabled(self.command_history.can_redo())
# Update descriptions with next operation
if self.command_history.can_undo():
next_undo = self.command_history.get_undo_description()
self.undo_action.setText(f"Undo {next_undo}")
else:
self.undo_action.setText("Undo")# src/ui/undo_history_dialog.py
class UndoHistoryDialog(QDialog):
"""Visual undo history timeline (FR4)."""
def __init__(self, command_history: CommandHistory, parent=None):
super().__init__(parent)
self.command_history = command_history
self.setWindowTitle("Undo History")
self.setModal(True)
self._setup_ui()
def _setup_ui(self):
layout = QVBoxLayout(self)
# History list
self.history_list = QListWidget()
self._populate_history()
layout.addWidget(self.history_list)
# Buttons
button_layout = QHBoxLayout()
self.undo_to_button = QPushButton("Undo To Selected")
self.undo_to_button.clicked.connect(self._undo_to_selected)
button_layout.addWidget(self.undo_to_button)
close_button = QPushButton("Close")
close_button.clicked.connect(self.accept)
button_layout.addWidget(close_button)
layout.addLayout(button_layout)
# src/ui/group_creation_dialog.py
class GroupCreationDialog(QDialog):
"""Dialog for group creation with metadata input."""
def __init__(self, selected_nodes: List, parent=None):
super().__init__(parent)
self.selected_nodes = selected_nodes
self.setWindowTitle("Create Node Group")
self.setModal(True)
self._setup_ui()
def _setup_ui(self):
layout = QFormLayout(self)
# Group name
self.name_edit = QLineEdit()
self.name_edit.setText(f"Group_{len(self.selected_nodes)}_nodes")
layout.addRow("Name:", self.name_edit)
# Description
self.description_edit = QTextEdit()
self.description_edit.setMaximumHeight(80)
layout.addRow("Description:", self.description_edit)
# Preview selected nodes
preview_label = QLabel(f"Selected Nodes ({len(self.selected_nodes)}):")
layout.addRow(preview_label)
node_list = QListWidget()
node_list.setMaximumHeight(100)
for node in self.selected_nodes:
node_list.addItem(f"{node.title} (ID: {node.id})")
layout.addRow(node_list)
# Buttons
button_box = QDialogButtonBox(
QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addRow(button_box)# src/node_graph.py - Modified for command integration
class NodeGraph(QGraphicsScene):
def __init__(self):
super().__init__()
self.command_history = CommandHistory()
self.group_manager = GroupManager(self)
# ... existing initialization
def execute_command(self, command: CommandBase) -> bool:
"""Central command execution with history tracking."""
success = self.command_history.execute_command(command)
if success:
self.commandExecuted.emit(command.get_description())
return success
def create_node(self, node_type: str, position: QPointF) -> bool:
"""Create node via command pattern."""
command = CreateNodeCommand(self, node_type, position)
return self.execute_command(command)
def remove_node(self, node) -> bool:
"""Remove node via command pattern."""
command = DeleteNodeCommand(self, node)
return self.execute_command(command)
def create_group_from_selection(self) -> Optional[NodeGroup]:
"""Create group from currently selected nodes."""
selected_nodes = [item for item in self.selectedItems()
if isinstance(item, Node)]
if len(selected_nodes) < 2:
return None
# Show group creation dialog
dialog = GroupCreationDialog(selected_nodes)
if dialog.exec() == QDialog.Accepted:
command = CreateGroupCommand(
self.group_manager,
selected_nodes,
dialog.name_edit.text(),
dialog.description_edit.toPlainText()
)
if self.execute_command(command):
return command.created_group
return NoneThe architecture addresses specific performance requirements (NFR1-NFR3) through targeted optimization strategies across all system layers.
# src/commands/performance_optimizations.py
class OptimizedCommandHistory(CommandHistory):
"""Performance-optimized command history implementation."""
def __init__(self, max_depth: int = 50):
super().__init__(max_depth)
self._memory_monitor = MemoryMonitor(50 * 1024 * 1024) # 50MB limit
self._execution_timer = ExecutionTimer()
def execute_command(self, command: CommandBase) -> bool:
"""Execute with performance monitoring."""
with self._execution_timer.measure() as timer:
success = super().execute_command(command)
# Verify NFR1: Individual operations < 100ms
if timer.elapsed_ms() > 100:
logger.warning(
f"Command {command.get_description()} exceeded 100ms: "
f"{timer.elapsed_ms():.1f}ms"
)
return success
def _estimate_command_size(self, command: CommandBase) -> int:
"""Accurate memory estimation for commands."""
if isinstance(command, DeleteNodeCommand):
# Estimate based on node complexity
node_state = command.node_state
base_size = 1024 # Base overhead
code_size = len(node_state.get('code', '')) * 2 # Unicode
props_size = len(str(node_state.get('properties', {}))) * 2
connections_size = len(command.affected_connections) * 200
return base_size + code_size + props_size + connections_size
elif isinstance(command, CompositeCommand):
return sum(self._estimate_command_size(cmd)
for cmd in command.commands)
else:
return 512 # Conservative estimate for simple commands
class MemoryMonitor:
"""Real-time memory usage monitoring."""
def __init__(self, limit_bytes: int):
self.limit_bytes = limit_bytes
self.current_usage = 0
def check_limit(self) -> bool:
"""Check if current usage exceeds limit."""
return self.current_usage > self.limit_bytes
def add_usage(self, bytes_used: int):
"""Track additional memory usage."""
self.current_usage += bytes_used
def remove_usage(self, bytes_freed: int):
"""Track freed memory."""
self.current_usage = max(0, self.current_usage - bytes_freed)# src/grouping/performance_optimized_group.py
class PerformanceOptimizedNodeGroup(NodeGroup):
"""Group implementation optimized for large node counts."""
def __init__(self, name: str, description: str = ""):
super().__init__(name, description)
self._cached_bounds = None
self._bounds_dirty = True
self._pin_generation_cache = {}
def add_child_node(self, node) -> bool:
"""Optimized node addition with deferred updates."""
start_time = time.perf_counter()
success = super().add_child_node(node)
if success:
# Mark caches as dirty instead of immediate recalculation
self._bounds_dirty = True
self._invalidate_pin_cache()
# Verify NFR2: 10ms per node for creation
elapsed_ms = (time.perf_counter() - start_time) * 1000
if elapsed_ms > 10:
logger.warning(
f"Node addition exceeded 10ms target: {elapsed_ms:.1f}ms"
)
return success
def _generate_interface_pins(self):
"""Cached pin generation for performance."""
cache_key = self._get_pin_cache_key()
if cache_key in self._pin_generation_cache:
self.interface_pins = self._pin_generation_cache[cache_key]
return
# Generate pins with optimized algorithm
start_time = time.perf_counter()
# Use sets for O(1) lookup instead of lists
internal_node_set = set(self.child_nodes)
input_connections = {}
output_connections = {}
# Single pass through all connections
for node in self.child_nodes:
for pin in node.input_pins:
for conn in pin.connections:
if conn.output_pin.node not in internal_node_set:
pin_type = pin.pin_type
if pin_type not in input_connections:
input_connections[pin_type] = []
input_connections[pin_type].append(conn)
for pin in node.output_pins:
for conn in pin.connections:
if conn.input_pin.node not in internal_node_set:
pin_type = pin.pin_type
if pin_type not in output_connections:
output_connections[pin_type] = []
output_connections[pin_type].append(conn)
# Create interface pins
self.interface_pins = []
for pin_type, conns in input_connections.items():
self.interface_pins.append(GroupPin(self, 'input', pin_type, conns))
for pin_type, conns in output_connections.items():
self.interface_pins.append(GroupPin(self, 'output', pin_type, conns))
# Cache results
self._pin_generation_cache[cache_key] = self.interface_pins
elapsed_ms = (time.perf_counter() - start_time) * 1000
logger.debug(f"Pin generation took {elapsed_ms:.1f}ms for "
f"{len(self.child_nodes)} nodes")
def expand(self) -> bool:
"""Optimized expansion with batch operations."""
start_time = time.perf_counter()
if not self.is_collapsed:
return True
# Batch visibility updates to reduce redraws
self.scene().blockSignals(True)
try:
# Restore positions in batch
for node in self.child_nodes:
if hasattr(node, 'expanded_position'):
node.setPos(node.expanded_position)
node.setVisible(True)
for group in self.child_groups:
group.setVisible(True)
self.is_collapsed = False
self._update_expanded_appearance()
finally:
self.scene().blockSignals(False)
self.scene().update() # Single update instead of per-item
elapsed_ms = (time.perf_counter() - start_time) * 1000
# Verify NFR2: 5ms per node for expansion
target_ms = len(self.child_nodes) * 5
if elapsed_ms > target_ms:
logger.warning(
f"Group expansion exceeded target ({target_ms}ms): "
f"{elapsed_ms:.1f}ms for {len(self.child_nodes)} nodes"
)
return True# src/performance/large_graph_optimizations.py
class LargeGraphOptimizer:
"""Optimization strategies for graphs with 1000+ nodes."""
def __init__(self, node_graph):
self.node_graph = node_graph
self.viewport_culling = ViewportCulling(node_graph)
self.level_of_detail = LevelOfDetail(node_graph)
def optimize_for_size(self, node_count: int):
"""Apply size-appropriate optimizations."""
if node_count > 1000:
# Activate aggressive optimizations
self.viewport_culling.enable()
self.level_of_detail.enable()
self._enable_render_caching()
elif node_count > 500:
# Moderate optimizations
self.viewport_culling.enable()
self.level_of_detail.set_mode('moderate')
else:
# Minimal optimizations for small graphs
self.viewport_culling.disable()
self.level_of_detail.disable()
class ViewportCulling:
"""Cull items outside visible viewport."""
def __init__(self, node_graph):
self.node_graph = node_graph
self.enabled = False
def enable(self):
"""Enable viewport culling."""
self.enabled = True
self.node_graph.view.viewportChanged.connect(self._update_visibility)
def _update_visibility(self):
"""Update item visibility based on viewport."""
if not self.enabled:
return
visible_rect = self.node_graph.view.mapToScene(
self.node_graph.view.viewport().rect()).boundingRect()
# Expand visible area for smooth scrolling
margin = 100
visible_rect.adjust(-margin, -margin, margin, margin)
for item in self.node_graph.items():
if isinstance(item, (Node, NodeGroup)):
item.setVisible(visible_rect.intersects(item.boundingRect()))The architecture maintains 100% backward compatibility with existing .md files while extending the format to support new group metadata.
# src/flow_format.py - Enhanced for grouping support
class EnhancedFlowFormat(FlowFormat):
"""Extended flow format supporting groups while maintaining compatibility."""
FORMAT_VERSION = "1.1" # Incremental version for new features
def serialize_graph(self, node_graph) -> str:
"""Serialize graph with optional group data."""
# Generate base markdown (compatible with v1.0)
base_markdown = super().serialize_graph(node_graph)
# Add group metadata if groups exist
if node_graph.group_manager.groups:
group_metadata = self._serialize_groups(node_graph.group_manager.groups)
base_markdown += "\n\n<!-- GROUP_METADATA_V1.1\n"
base_markdown += json.dumps(group_metadata, indent=2)
base_markdown += "\n-->\n"
return base_markdown
def deserialize_graph(self, markdown_content: str, node_graph):
"""Deserialize with group support and version detection."""
# Extract group metadata if present
group_metadata = self._extract_group_metadata(markdown_content)
# Remove group metadata for base parsing
clean_content = self._remove_group_metadata(markdown_content)
# Parse base graph (maintains v1.0 compatibility)
super().deserialize_graph(clean_content, node_graph)
# Apply group data if available
if group_metadata:
self._apply_group_metadata(group_metadata, node_graph)
def _serialize_groups(self, groups: List[NodeGroup]) -> Dict[str, Any]:
"""Serialize group data to metadata format."""
return {
'format_version': self.FORMAT_VERSION,
'groups': [group.serialize() for group in groups],
'group_hierarchy': self._build_hierarchy_map(groups),
'compatibility_notes': [
'This file contains node grouping data',
'Groups will be ignored when opened in PyFlowGraph < v0.8.0',
'All node and connection data remains fully compatible'
]
}
def _extract_group_metadata(self, content: str) -> Optional[Dict[str, Any]]:
"""Extract group metadata from markdown comments."""
import re
pattern = r'<!-- GROUP_METADATA_V[\d\.]+\n(.*?)\n-->'
match = re.search(pattern, content, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
logger.warning("Invalid group metadata found, ignoring")
return None
return None
def _apply_group_metadata(self, metadata: Dict[str, Any], node_graph):
"""Apply group metadata to recreate group structure."""
if metadata.get('format_version', '1.0') < '1.1':
logger.info("Unsupported group metadata version, skipping")
return
# Create groups in dependency order
created_groups = {}
for group_data in metadata['groups']:
group = NodeGroup(
group_data['name'],
group_data['description']
)
# Restore group properties
group.group_id = group_data['group_id']
group.is_collapsed = group_data['is_collapsed']
group.depth_level = group_data['depth_level']
# Set bounds
bounds_data = group_data['group_bounds']
group.group_bounds = QRectF(
bounds_data['x'], bounds_data['y'],
bounds_data['width'], bounds_data['height']
)
created_groups[group.group_id] = group
node_graph.group_manager.groups.append(group)
node_graph.addItem(group)
# Restore group relationships and node assignments
for group_data in metadata['groups']:
group = created_groups[group_data['group_id']]
# Add child nodes
for node_id in group_data['child_node_ids']:
node = node_graph.get_node_by_id(node_id)
if node:
group.add_child_node(node)
# Add child groups
for child_group_id in group_data['child_group_ids']:
child_group = created_groups.get(child_group_id)
if child_group:
group.add_child_group(child_group)# src/compatibility/version_handler.py
class FileVersionHandler:
"""Handle file format versions and migrations."""
SUPPORTED_VERSIONS = ['1.0', '1.1']
CURRENT_VERSION = '1.1'
def detect_version(self, file_content: str) -> str:
"""Detect file format version."""
# Check for group metadata
if 'GROUP_METADATA_V1.1' in file_content:
return '1.1'
# Default to v1.0 for compatibility
return '1.0'
def ensure_compatibility(self, file_content: str,
target_version: str = None) -> str:
"""Ensure file content is compatible with target version."""
current_version = self.detect_version(file_content)
target_version = target_version or self.CURRENT_VERSION
if current_version == target_version:
return file_content
# Migration logic
if current_version == '1.0' and target_version == '1.1':
# No migration needed - v1.1 is backward compatible
return file_content
elif current_version == '1.1' and target_version == '1.0':
# Downgrade by removing group metadata
return self._remove_group_metadata(file_content)
else:
raise ValueError(f"Unsupported version migration: "
f"{current_version} -> {target_version}")
def _remove_group_metadata(self, content: str) -> str:
"""Remove group metadata for v1.0 compatibility."""
import re
pattern = r'\n\n<!-- GROUP_METADATA_V[\d\.]+\n.*?\n-->\n'
return re.sub(pattern, '', content, flags=re.DOTALL)Based on the PRD epic structure, implementation follows a carefully planned sequence ensuring continuous integration and testing.
Duration: 2-3 weeks Deliverables:
- CommandBase abstract class with execution framework
- CommandHistory manager with memory constraints
- Basic node operation commands (Create, Delete)
- Connection operation commands (Create, Delete)
- Integration into NodeGraph operations
- Keyboard shortcut implementation (Ctrl+Z, Ctrl+Y)
Technical Milestones:
- All node/connection operations execute via commands
- Undo/redo functionality working for basic operations
- Memory usage stays under 50MB limit (NFR3)
- Individual operations complete under 100ms (NFR1)
Duration: 2 weeks Deliverables:
- Node movement and property change commands
- Code modification undo support
- Composite commands for multi-operation transactions
- Copy/paste operation undo
- Undo History UI dialog
- Menu integration with dynamic descriptions
Technical Milestones:
- All graph operations are undoable
- Composite operations group correctly
- UI shows appropriate undo/redo states
- Bulk operations complete under 500ms (NFR1)
Duration: 3-4 weeks Deliverables:
- NodeGroup class with hierarchy support
- GroupPin interface system
- Group creation from selection
- Collapse/expand functionality
- Basic group visual representation
- Group validation logic
Technical Milestones:
- Groups collapse to single node representation
- Interface pins route connections correctly
- Group operations scale linearly (NFR2)
- Nested groups work up to 10 levels (NFR7)
Duration: 2-3 weeks Deliverables:
- Group/ungroup commands for undo system
- Nested group support with navigation
- Group template system
- Template management UI
- Complete file format integration
- Performance optimizations
Technical Milestones:
- All grouping operations are undoable
- Template save/load functionality works
- File format maintains backward compatibility
- Large graphs (1000+ nodes) perform acceptably (NFR6)
Duration: Throughout development Deliverables:
- Comprehensive test suite additions
- Performance benchmarking
- UI polish and user experience refinement
- Documentation updates
- Bug fixes and stability improvements
Technical Milestones:
- Test coverage > 90% for new functionality
- All performance requirements met (NFR1-NFR7)
- Zero regression in existing functionality
- Professional UI consistency maintained
1. Command History Memory Management (NFR3)
- Risk: Command history exceeding 50MB limit with complex operations
- Mitigation:
- Implement aggressive memory monitoring
- Use lazy serialization for large command data
- Provide manual history clearing options
- Add memory usage indicators in UI
2. Large Group Performance (NFR2, NFR6)
- Risk: Group operations becoming unusably slow with 200+ nodes
- Mitigation:
- Implement viewport culling for large groups
- Use cached bounds calculation
- Provide performance warnings and degradation modes
- Add progress indicators for long operations
3. Backward Compatibility Maintenance
- Risk: File format changes breaking existing workflows
- Mitigation:
- Extensive compatibility testing with existing files
- Version detection and migration tools
- Fallback modes for unsupported features
- Clear communication about format evolution
4. Qt Graphics Performance with Deep Nesting
- Risk: QGraphicsItemGroup performance degradation with deep hierarchy
- Mitigation:
- Benchmark Qt performance with deep nesting
- Implement custom rendering for collapsed groups
- Provide flattening options for performance
5. Undo/Redo State Consistency
- Risk: Complex operations leaving system in inconsistent state
- Mitigation:
- Implement ACID properties for all commands (NFR5)
- Add state validation after each operation
- Provide recovery mechanisms for corruption
# tests/test_command_system.py - Example test structure
class TestCommandSystem:
"""Comprehensive command system testing."""
def test_memory_limits_enforcement(self):
"""Verify NFR3: Memory usage under 50MB."""
command_history = CommandHistory(max_depth=200)
# Create memory-intensive commands
for i in range(100):
large_node_command = self._create_large_node_command()
command_history.execute_command(large_node_command)
# Verify memory constraint
memory_usage = command_history._memory_monitor.current_usage
assert memory_usage < 50 * 1024 * 1024, \
f"Memory usage {memory_usage} exceeds 50MB limit"
def test_performance_requirements(self):
"""Verify NFR1: Operation timing requirements."""
node_graph = self._create_test_graph()
# Test individual operation timing
start_time = time.perf_counter()
command = CreateNodeCommand(node_graph, "TestNode", QPointF(0, 0))
success = node_graph.execute_command(command)
elapsed_ms = (time.perf_counter() - start_time) * 1000
assert success, "Command execution failed"
assert elapsed_ms < 100, \
f"Individual operation took {elapsed_ms:.1f}ms, exceeds 100ms limit"
def test_group_scaling_performance(self):
"""Verify NFR2: Group operation scaling."""
node_graph = self._create_test_graph_with_nodes(100)
nodes = list(node_graph.nodes)
start_time = time.perf_counter()
group = node_graph.group_manager.create_group(nodes, "TestGroup")
creation_time = time.perf_counter() - start_time
# Should be ~10ms per node
expected_max_ms = len(nodes) * 10
actual_ms = creation_time * 1000
assert actual_ms < expected_max_ms, \
f"Group creation took {actual_ms:.1f}ms for {len(nodes)} nodes, "
f"exceeds {expected_max_ms}ms target"The architecture extends PyFlowGraph to support enterprise workflow automation scenarios through specialized node types and execution models.
# Base class for integration nodes
class IntegrationNode(Node):
"""Base class for external system integration nodes."""
def __init__(self):
super().__init__()
self.authentication = None
self.connection_pool = None
self.retry_policy = RetryPolicy()
def configure_authentication(self, auth_config):
"""Configure authentication for external services."""
pass
def execute_with_retry(self, operation):
"""Execute operation with retry and error handling."""
pass- HTTP/REST Nodes: Request builders, response parsers, authentication handlers
- Database Nodes: Connection pooling, query builders, transaction management
- Message Queue Nodes: Publishers, subscribers, acknowledgment handling
- File System Nodes: Watchers, processors, batch operations
- Cloud Service Nodes: S3, Azure Blob, GCS with native SDK integration
# Enhanced event system for workflow automation
class WorkflowEventSystem:
"""Event system for webhook and trigger-based execution."""
def register_webhook(self, endpoint: str, graph_id: str):
"""Register webhook endpoint for graph trigger."""
pass
def schedule_workflow(self, graph_id: str, cron_expression: str):
"""Schedule periodic workflow execution."""
pass
def handle_external_trigger(self, trigger_type: str, payload: dict):
"""Process external triggers (webhooks, file changes, etc.)."""
pass- Data Mappers: Field mapping, type conversion, schema transformation
- Aggregators: Group by, sum, average, count operations
- Filters: Conditional filtering, validation, data quality checks
- Formatters: JSON, XML, CSV, Excel converters with templates
- Stream processing for large datasets
- Batch processing with configurable chunk sizes
- Memory-efficient data handling
- Parallel processing for independent branches
class WorkflowExecutor(GraphExecutor):
"""Enhanced executor for workflow automation."""
def __init__(self):
super().__init__()
self.scheduler = WorkflowScheduler()
self.monitor = ExecutionMonitor()
self.error_handler = ErrorHandler()
def execute_with_orchestration(self, graph):
"""Execute with full orchestration capabilities."""
# Scheduling, monitoring, error handling, retry logic
pass- Error Handling: Try-catch nodes, error routing, dead letter queues
- Retry Policies: Exponential backoff, max attempts, retry conditions
- Transaction Support: Rollback capabilities, compensation workflows
- Monitoring: Execution metrics, performance tracking, alerting
This technical architecture provides a comprehensive foundation for implementing Command Pattern-based undo/redo functionality and Node Grouping system in PyFlowGraph, while positioning it as a professional workflow automation platform. The design carefully balances performance requirements, backward compatibility, and extensibility while maintaining the application's existing architectural patterns and enabling enterprise-grade automation capabilities.
Key Success Factors:
- Incremental Implementation: Phased approach ensures continuous integration
- Performance-First Design: Architecture optimized for specified performance requirements
- Backward Compatibility: File format evolution maintains existing workflow compatibility
- Extensible Foundation: Command Pattern enables future feature expansion
- Qt Integration: Leverages existing PySide6 patterns and optimizations
- Enterprise Ready: Integration architecture supports production automation scenarios
- Developer Friendly: Python-native approach enables unlimited customization
The architecture enables PyFlowGraph to transition from "interesting prototype" to "professional workflow automation platform" by addressing critical competitive gaps while establishing a foundation for enterprise-grade automation capabilities.
Document Status: Ready for Development Phase Implementation
Next Phase: Begin Epic 1 - Command Pattern Foundation Development