-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv_launch_detection_workflow.py
More file actions
186 lines (150 loc) · 6.66 KB
/
csv_launch_detection_workflow.py
File metadata and controls
186 lines (150 loc) · 6.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
"""
CSV launch detection workflow using domain architecture
Processes historical token data from CSV file with same pipeline as live detection
"""
import asyncio
import logging
import signal
import sys
import platform
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Add project root to Python path and import constants
from src.constants import PROJECT_ROOT
sys.path.insert(0, str(PROJECT_ROOT))
from src.domain.workflow.default_workflow import DefaultWorkflow
from src.domain.input_source.csv_token_input_source import CSVTokenInputSource
from src.domain.processor.simple_launch_detection_processor import (
SimpleLaunchDetectionProcessor,
)
from src.domain.executor.simple_file_executor import SimpleFileExecutor
# Configure logging with detailed error tracking: timestamp, level, logger name
# and message on every record, root level INFO.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
)

# Pin the key workflow components to INFO explicitly. A logger with its own
# level no longer inherits the root logger's level, so these stay at INFO even
# if the root level is changed elsewhere.
for _component_logger_name in (
    "src.domain.processor.simple_launch_detection_processor",
    "src.domain.processor.stages.token_metadata_stage",
    "src.domain.processor.stages.simple_file_storage_stage",
    "src.domain.input_source.csv_token_input_source",
    "src.domain.executor.simple_file_executor",
):
    logging.getLogger(_component_logger_name).setLevel(logging.INFO)
# Drop the loop variable so the module namespace matches a plain call sequence.
del _component_logger_name

logger = logging.getLogger(__name__)
class CSVLaunchDetectionWorkflow:
    """CSV launch detection workflow orchestrator - processes historical token data.

    Wires a CSVTokenInputSource, a SimpleLaunchDetectionProcessor and a
    SimpleFileExecutor into a DefaultWorkflow and drives it.

    Attributes:
        csv_file: Path of the CSV file handed to the input source.
        batch_size: Rows read and processed per batch.
        workflow: The DefaultWorkflow, or None until initialize() has run.
        input_source: The CSVTokenInputSource, or None until initialize() has run.
        running: True while start() is awaiting the workflow run.
    """

    def __init__(self, csv_file: str, batch_size: int = 100):
        """
        Initialize CSV launch detection workflow.

        No components are created here; that happens in initialize().

        Args:
            csv_file: Path to CSV file containing token data
            batch_size: Number of rows to read and process at a time (default: 100)
        """
        self.csv_file = csv_file
        self.batch_size = batch_size
        self.workflow = None
        self.input_source = None
        self.running = False

    async def initialize(self):
        """Create and wire all workflow components.

        Builds the CSV input source, the processor and the file executor
        (awaiting the executor's own initialize()), then assembles them into a
        DefaultWorkflow stored on ``self.workflow``.
        """
        logger.info("🔧 Initializing CSV Launch Detection Workflow...")

        # Create CSV input source
        self.input_source = CSVTokenInputSource(
            csv_file=self.csv_file, batch_size=self.batch_size
        )
        # Create simplified processor (no AI services needed)
        processor = SimpleLaunchDetectionProcessor()
        # Create simple file executor (no database operations)
        executor = SimpleFileExecutor()
        await executor.initialize()

        # Create workflow; remaining initialization happens in DefaultWorkflow
        self.workflow = DefaultWorkflow(
            input_source=self.input_source, processor=processor, executor=executor
        )
        logger.info(
            "✅ CSV Launch Detection Workflow initialized with 3-stage pipeline: "
            "metadata extraction + image processing + file storage (no AI processing)"
        )

    async def start(self):
        """Run the workflow, initializing it first if needed.

        Prints a startup banner, then awaits DefaultWorkflow.run_indefinitely().
        ``running`` is cleared in a ``finally`` so it does not stay True after
        the run returns or raises.
        """
        # Compare against None explicitly so a falsy workflow object is not
        # mistaken for "not yet initialized".
        if self.workflow is None:
            await self.initialize()

        logger.info("🚀 Starting CSV Launch Detection Workflow...")
        print("🚀 CSV Token Launch Detection Workflow Starting...")
        print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
        print(f"📁 CSV File: {self.csv_file}")
        print(f"📦 Batch size: {self.batch_size} rows")
        print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
        print("🎯 Processing historical token launches from CSV...")
        print("📁 File storage enabled - memecoins will be saved as JSON + JPG pairs")
        print("🖼️ Images processed to 512x512px JPEG with 90% quality")
        print("💾 Target directory: res/memecoins/pending/")
        print("🗑️ Processed rows will be deleted from CSV file")
        print("❌ AI processing disabled - no tags, captions, or embeddings")
        print("❌ Database storage disabled - only file operations")
        print("📊 Complete token data will be stored as files for later processing")
        print("🛑 Press Ctrl+C to stop")
        print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")

        self.running = True
        try:
            # Use the DefaultWorkflow's built-in initialization and run
            await self.workflow.run_indefinitely()
        finally:
            # Reset even on error/cancellation so the flag reflects reality.
            self.running = False

    async def stop(self):
        """Stop the workflow (if it was created) and clear the running flag."""
        logger.info("🛑 Stopping CSV Launch Detection Workflow...")
        self.running = False
        if self.workflow is not None:
            await self.workflow.stop()
        logger.info("✅ CSV Launch Detection Workflow stopped")
async def main(csv_file: str = "res/token_dataset.csv", batch_size: int = 100):
    """Validate the CSV path, run the workflow, and stop it on shutdown.

    Args:
        csv_file: Path to CSV file containing token data (default: res/token_dataset.csv).
            Relative paths are resolved against PROJECT_ROOT.
        batch_size: Number of rows to read and process at a time (default: 100)

    Exits the process with status 1 if the CSV file does not exist.
    """
    # Verify CSV file exists
    csv_path = Path(csv_file)
    if not csv_path.is_absolute():
        csv_path = PROJECT_ROOT / csv_file
    if not csv_path.exists():
        logger.error(f"❌ CSV file not found: {csv_path}")
        logger.info(f"💡 Please ensure the CSV file exists at: {csv_path}")
        sys.exit(1)

    # Hand the workflow the exact path that was validated above. Passing the raw
    # relative string could point elsewhere if the input source resolves it
    # against the current working directory rather than PROJECT_ROOT.
    workflow = CSVLaunchDetectionWorkflow(csv_file=str(csv_path), batch_size=batch_size)

    # Strong references to in-flight stop() tasks: the event loop keeps only
    # weak references to tasks, so an unreferenced task may be collected early.
    stop_tasks = set()

    # Handle graceful shutdown with cross-platform signal handling
    def signal_handler():
        logger.info("🛑 Shutdown signal received")
        task = asyncio.create_task(workflow.stop())
        stop_tasks.add(task)
        task.add_done_callback(stop_tasks.discard)

    # Set up signal handlers (Unix/Linux/macOS only - Windows doesn't support loop.add_signal_handler)
    if platform.system() != "Windows":
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, signal_handler)
    else:
        # Windows: signal handling is done through KeyboardInterrupt exception
        logger.info(
            "🪟 Running on Windows - signal handling via KeyboardInterrupt only"
        )

    try:
        await workflow.start()
    except KeyboardInterrupt:
        await workflow.stop()
    except asyncio.CancelledError:
        # On Python 3.11+, asyncio.run() turns Ctrl+C (when no custom SIGINT
        # handler is installed) into cancellation of this task; still stop the
        # workflow, then propagate the cancellation.
        await workflow.stop()
        raise
    except Exception as e:
        logger.exception(f"❌ Unexpected error: {e}")
        await workflow.stop()
    finally:
        # asyncio.run() cancels leftover tasks on shutdown, so let any
        # signal-triggered stop() finish first.
        if stop_tasks:
            await asyncio.wait(set(stop_tasks))
def _run_from_command_line():
    """Run main() with its defaults and treat an escaping Ctrl+C as a clean exit."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Goodbye!")
        sys.exit(0)


if __name__ == "__main__":
    _run_from_command_line()