-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_file_executor.py
More file actions
42 lines (32 loc) · 1.29 KB
/
simple_file_executor.py
File metadata and controls
42 lines (32 loc) · 1.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
"""
Simple executor for launch detection workflow that just logs completion
"""
import logging
from typing import Optional, Any
from src.domain.executor.base_executor import Executor
logger = logging.getLogger(__name__)
class SimpleFileExecutor(Executor):
    """Executor that only logs the completion of file storage operations.

    Used by the SimpleLaunchDetectionProcessor workflow when files are saved
    by the SimpleFileStorageStage - no additional action is needed beyond
    logging.
    """

    async def initialize(self) -> None:
        """Log that the executor is initialized; no other setup is done."""
        logger.info("✅ SimpleFileExecutor initialized")

    async def execute(self, action: Any) -> None:
        """Report a completed file operation.

        Args:
            action: Usually a string containing the saved JSON file path from
                SimpleLaunchDetectionProcessor. A string is logged at INFO
                level and echoed to stdout; any other value is only logged
                at DEBUG level.

        Raises:
            Exception: Any error raised while logging or printing is logged
                (with its traceback) and then re-raised unchanged.
        """
        try:
            if isinstance(action, str):
                # Lazy %-style arguments let the logging module skip
                # formatting when the level is disabled.
                logger.info("✅ Token data saved to file: %s", action)
                print(f"💾 Saved: {action}")
            else:
                logger.debug("📝 Execution completed: %s", action)
        except Exception as e:
            # logger.exception logs at ERROR level like logger.error, but
            # also attaches the active traceback to the record.
            logger.exception("❌ Error in SimpleFileExecutor: %s", e)
            raise