-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlive_launch_detection_workflow.py
More file actions
151 lines (120 loc) · 5.39 KB
/
live_launch_detection_workflow.py
File metadata and controls
151 lines (120 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
"""
Simplified launch detection workflow using domain architecture
Real-time monitoring of new token launches on pump.fun with file-only storage
"""
import asyncio
import logging
import signal
import sys
import platform
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Add project root to Python path and import constants
from src.constants import PROJECT_ROOT
sys.path.insert(0, str(PROJECT_ROOT))
from src.domain.workflow.default_workflow import DefaultWorkflow
from src.domain.input_source.token_launch_source import TokenLaunchSource
from src.domain.processor.simple_launch_detection_processor import (
SimpleLaunchDetectionProcessor,
)
from src.domain.executor.simple_file_executor import SimpleFileExecutor
# Configure logging with detailed error tracking
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
)
# Set INFO level logging for key workflow components
logging.getLogger("src.domain.processor.simple_launch_detection_processor").setLevel(
logging.INFO
)
logging.getLogger("src.domain.processor.stages.token_metadata_stage").setLevel(
logging.INFO
)
logging.getLogger("src.domain.processor.stages.simple_file_storage_stage").setLevel(
logging.INFO
)
logging.getLogger("src.domain.input_source.token_launch_source").setLevel(logging.INFO)
logging.getLogger("src.domain.executor.simple_file_executor").setLevel(logging.INFO)
logger = logging.getLogger(__name__)
class LaunchDetectionWorkflow:
"""Simple launch detection workflow orchestrator - file storage only"""
def __init__(self):
self.workflow = None
self.input_source = None
self.running = False
async def initialize(self):
"""Initialize all workflow components"""
logger.info("🔧 Initializing Simplified Launch Detection Workflow...")
# Create input source (defaulting to pump.fun platform)
self.input_source = TokenLaunchSource(platform="pump.fun")
# Create simplified processor (no AI services needed)
processor = SimpleLaunchDetectionProcessor()
# Create simple file executor (no database operations)
executor = SimpleFileExecutor()
await executor.initialize()
# Create workflow
self.workflow = DefaultWorkflow(
input_source=self.input_source, processor=processor, executor=executor
)
# Initialization will happen in DefaultWorkflow
logger.info(
"✅ Simplified Launch Detection Workflow initialized with 3-stage pipeline: metadata extraction + image processing + file storage (no AI processing)"
)
async def start(self):
"""Start the workflow"""
if not self.workflow:
await self.initialize()
logger.info("🚀 Starting Simplified Launch Detection Workflow...")
print("🚀 Simplified Token Launch Detection Workflow Starting...")
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
print("📡 Connecting to token launch monitoring APIs...")
print("🎯 Monitoring for new token launches on pump.fun...")
print("📁 File storage enabled - memecoins will be saved as JSON + JPG pairs")
print("🖼️ Images processed to 512x512px JPEG with 90% quality")
print("💾 Target directory: res/memecoins/pending/")
print("❌ AI processing disabled - no tags, captions, or embeddings")
print("❌ Database storage disabled - only file operations")
print("📊 Complete token data will be stored as files for later processing")
print("🛑 Press Ctrl+C to stop")
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
self.running = True
# Use the DefaultWorkflow's built-in initialization and run
await self.workflow.run_indefinitely()
async def stop(self):
"""Stop the workflow"""
logger.info("🛑 Stopping Simplified Launch Detection Workflow...")
self.running = False
if self.workflow:
await self.workflow.stop()
logger.info("✅ Simplified Launch Detection Workflow stopped")
async def main():
"""Main function"""
workflow = LaunchDetectionWorkflow()
# Handle graceful shutdown with cross-platform signal handling
def signal_handler():
logger.info("🛑 Shutdown signal received")
asyncio.create_task(workflow.stop())
# Set up signal handlers (Unix/Linux/macOS only - Windows doesn't support loop.add_signal_handler)
if platform.system() != "Windows":
loop = asyncio.get_event_loop()
for sig in [signal.SIGTERM, signal.SIGINT]:
loop.add_signal_handler(sig, signal_handler)
else:
# Windows: signal handling is done through KeyboardInterrupt exception
logger.info(
"🪟 Running on Windows - signal handling via KeyboardInterrupt only"
)
try:
await workflow.start()
except KeyboardInterrupt:
await workflow.stop()
except Exception as e:
logger.error(f"❌ Unexpected error: {e}")
await workflow.stop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n👋 Goodbye!")
sys.exit(0)