-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_launch_detection_processor.py
More file actions
115 lines (95 loc) · 4.59 KB
/
simple_launch_detection_processor.py
File metadata and controls
115 lines (95 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import logging
from typing import Optional
from src.domain.processor.base_stage_processor import BaseStageProcessor
from src.models.ai.processing_models import ProcessingContext
from src.domain.processor.stage_graph import StageGraph, StageNode
from src.domain.model.events.token_launch_event import TokenLaunchEvent
from src.domain.processor.stages.token_metadata_stage import TokenMetadataStage
from src.domain.processor.stages.image_preparation_stage import ImagePreparationStage
from src.domain.processor.stages.simple_file_storage_stage import SimpleFileStorageStage
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SimpleLaunchDetectionProcessor(BaseStageProcessor[TokenLaunchEvent, str]):
    """
    Launch detection processor that runs without any AI components.

    The processor wires three stages into a linear stage graph:
    1. TokenMetadataStage - token metadata extraction
    2. ImagePreparationStage - image download and processing
       (512x512px JPEG with 90% quality)
    3. SimpleFileStorageStage - saves JSON + processed JPG file pairs to
       the pending directory

    Intentionally left out compared to an AI-backed pipeline:
    - tag classification
    - image captioning
    - CLIP embeddings
    - vector database insertion
    - console action stage (database operations)

    The processor output is the saved JSON file path, or None when
    processing was terminated or no path was recorded.
    """

    def __init__(self, processor_name: Optional[str] = None):
        """
        Set up the processor around its three-stage graph.

        Args:
            processor_name: Optional custom processor name; when falsy,
                "SimpleLaunchDetectionProcessor" is used.
        """
        # Pipeline: metadata -> image processing -> file storage
        graph = self._create_stage_graph()
        name = processor_name or "SimpleLaunchDetectionProcessor"
        # No LLM service is passed: none of the stages are AI stages.
        super().__init__(processor_name=name, llm_service=None, stage_graph=graph)

    def _create_stage_graph(self) -> StageGraph:
        """Build the 3-stage graph used for simplified launch detection."""
        # Each entry pairs a stage instance with the names of the stages that
        # follow it. Order matters: the stages are instantiated top to bottom
        # before any node is created.
        pipeline = (
            # 1. Extract metadata, resolve IPFS, create MemecoinEntry with basic data
            (TokenMetadataStage(), ["ImagePreparationStage"]),
            # 2. Download and process images to 512x512px JPEG with 90% quality
            (ImagePreparationStage(), ["SimpleFileStorageStage"]),
            # 3. Save as JSON + processed JPG file pairs to pending directory
            (SimpleFileStorageStage(), []),  # Final stage, no successors
        )
        nodes = [
            StageNode(stage=stage, next_stages=successors)
            for stage, successors in pipeline
        ]
        return StageGraph(nodes=nodes, entry_point="TokenMetadataStage")

    def _extract_output(
        self, context: ProcessingContext[TokenLaunchEvent]
    ) -> Optional[str]:
        """
        Pull the final output out of the processing context.

        Returns:
            The value stored under "saved_json_path", or None when the
            context carries a "processing_error" or no saved path.
        """
        results = context.results

        # A recorded processing error means the pipeline stopped early.
        error = results.get("processing_error")
        if error:
            logger.info(f"🛑 Processing terminated: {error}")
            return None

        # The saved JSON file path is the processor's output.
        saved_path = results.get("saved_json_path")
        if saved_path:
            return saved_path

        logger.error(
            "❌ No saved file path found in context after successful processing"
        )
        return None

    async def initialize(self, **kwargs) -> None:
        """
        Initialize the base processor, then every stage in the stage graph.

        Args:
            **kwargs: Forwarded unchanged to the base class and to each
                stage's ``initialize``.

        Raises:
            Exception: Any error raised while initializing a stage is logged
                and re-raised.
        """
        await super().initialize(**kwargs)

        # Walk the graph's stages, if a graph is present.
        stages = self._stage_graph.stages if self._stage_graph else ()
        for stage in stages:
            if not hasattr(stage, "initialize"):
                continue
            try:
                await stage.initialize(**kwargs)
                logger.debug(f"✅ Initialized stage: {stage.get_stage_name()}")
            except Exception as exc:
                logger.error(
                    f"❌ Failed to initialize stage {stage.get_stage_name()}: {exc}"
                )
                raise

        logger.info(
            "✅ SimpleLaunchDetectionProcessor initialized with 3-stage pipeline: metadata extraction + image processing + file storage (no AI processing)"
        )