What This Document Covers:
- Event-driven domain architecture (InputSource → Processor → Executor)
- Multi-stage processing pipelines with StageGraph orchestration
- 30+ specialized processing stages
- ProcessingContext system for stage coordination
- Integration patterns and workflows
Sections in This Document:
- Core Architecture Pattern
- Module Structure
- Processing Stages
- ProcessingContext System
- StageGraph Orchestration
- Error Handling
- Integration Patterns
Related Documentation:
- → ORCHESTRATORS.md - Orchestrator coordination layer
- → VECTOR_DATABASE.md - Vector database architecture
- → ../../.claude/ARCHITECTURE.md - System-wide architecture
- → ../../src/workflows/ - Workflow implementations
Context Tags: #architecture #domain #processors #stages #event-driven #workflow-framework

    InputSource → Processor → Executor
         ↓            ↓           ↓
      Events    →  Actions  →  Execution

- InputSource: Generates domain events from various sources (WebSocket streams, filesystem, HTTP requests)
- Processor: Transforms events through multi-stage pipelines to produce actions
- Executor: Executes actions (database insertion, file output, console logging)
The framework ensures:
- Separation of concerns: Data generation, processing, and execution are independent
- Composability: Components can be mixed and matched for different workflows
- Testability: Each component can be tested in isolation
- Flexibility: New sources, processors, and executors can be added without modifying existing code
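
A minimal sketch of the flow above, under stated assumptions: events and actions are plain strings, and `ListInputSource`, `UppercaseProcessor`, `CollectingExecutor`, and `run_pipeline` are hypothetical names invented for illustration, not the framework's classes. It only shows how the three roles stay independent and meet through an async callback.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

# Hypothetical stand-ins: events and actions are plain strings here.
EventCallback = Callable[[str], Awaitable[None]]


class ListInputSource:
    """Emits a fixed list of events through an async callback."""

    def __init__(self, events: List[str]) -> None:
        self._events = events
        self._on_new_event: Optional[EventCallback] = None

    async def initialize(self, callback: Optional[EventCallback] = None) -> None:
        self._on_new_event = callback

    async def run(self) -> None:
        for event in self._events:
            if self._on_new_event is not None:
                await self._on_new_event(event)


class UppercaseProcessor:
    """Turns an event into an action, or returns None to drop it."""

    async def process(self, event: str) -> Optional[str]:
        return event.upper() if event else None


class CollectingExecutor:
    """Executes actions by recording them in a list."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    async def execute(self, action: str) -> None:
        self.executed.append(action)


async def run_pipeline(events: List[str]) -> List[str]:
    source = ListInputSource(events)
    processor = UppercaseProcessor()
    executor = CollectingExecutor()

    async def on_event(event: str) -> None:
        action = await processor.process(event)
        if action is not None:  # dropped events never reach the executor
            await executor.execute(action)

    await source.initialize(on_event)
    await source.run()
    return executor.executed
```

Because the source only knows about a callback and the executor only knows about actions, either end can be swapped without touching the processor.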
Location: src/domain/workflow/
DefaultWorkflow provides complete lifecycle management for domain workflows.
Features:
- Initialize → Run → Stop lifecycle
- Async event handling with callbacks
- Component coordination (InputSource ↔ Processor ↔ Executor)
- Graceful shutdown with cleanup
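
The lifecycle listed above can be sketched as follows. `LifecycleSketch` and its `stop()` method are hypothetical and written only for illustration; the real DefaultWorkflow may coordinate shutdown differently. The point is the ordering and the fact that cleanup runs in a `finally` block.

```python
import asyncio
from typing import List


class LifecycleSketch:
    """Hypothetical workflow shell showing the initialize -> run -> stop order."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self._stop_requested = asyncio.Event()

    async def initialize(self) -> None:
        self.log.append("initialized")

    async def run_indefinitely(self) -> None:
        self.log.append("running")
        try:
            # Block until stop() is called.
            await self._stop_requested.wait()
        finally:
            # Cleanup runs on a normal stop and on cancellation alike.
            self.log.append("cleaned up")

    async def stop(self) -> None:
        self._stop_requested.set()


async def demo() -> List[str]:
    workflow = LifecycleSketch()
    await workflow.initialize()
    runner = asyncio.create_task(workflow.run_indefinitely())
    await asyncio.sleep(0)  # give the runner a chance to start
    await workflow.stop()
    await runner
    return workflow.log
```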
Usage Pattern:

    workflow = DefaultWorkflow(input_source, processor, executor)
    await workflow.initialize()
    await workflow.run_indefinitely()

Location: src/domain/input_source/
Event generators produce domain events from various sources. All input sources implement the base InputSource interface.
InputSource is the abstract interface for event generation, with async callback support.
Interface:

    class InputSource(ABC):
        _on_new_event: Optional[AsyncCallback[Event]] = None

        async def initialize(self, callback: Optional[AsyncCallback[Event]] = None):
            self._on_new_event = callback

| Input Source | Purpose | Event Type |
|---|---|---|
| TokenLaunchSource | Real-time WebSocket token monitoring from pump.fun | TokenLaunchEvent |
| GenerationFileInputSource | File-based AI generation requests | GenerationRequestEvent |
| FilesystemMemecoinInputSource | Filesystem-based memecoin data processing | Custom events |
| WebGenerationInputSource | HTTP API generation requests | WebGenerationRequestEvent |
Location: src/domain/processor/
Processors transform input events through multi-stage pipelines using StageGraph orchestration.
BaseStageProcessor is the generic base class for all processors, with type-safe input and output.
Interface:

    class BaseStageProcessor(ABC, Generic[IN, OUT]):
        def __init__(
            self,
            processor_name: Optional[str] = None,
            llm_service: Optional[LiteLLMService] = None,
            stage_graph: Optional[StageGraph] = None,
            executor_router=None,
        ) -> None: ...

        async def initialize(self, **kwargs) -> None: ...

        async def process(self, input_data: IN, action_handler: callable = None) -> Optional[OUT]: ...

        @abstractmethod
        def _extract_output(self, context: ProcessingContext[IN]) -> Optional[OUT]: ...

Key Responsibilities:
- Initialize and manage StageGraph execution
- Coordinate LLM service dependencies
- Extract final output from processing context
- Handle early termination scenarios
| Processor | Stages | Purpose | Input → Output |
|---|---|---|---|
| LaunchDetectionProcessor | 5 stages | AI-enhanced token processing with metadata resolution | TokenLaunchEvent → MemecoinAction |
| SimpleLaunchDetectionProcessor | 3 stages | Non-AI file storage for quick token saves | TokenLaunchEvent → File output |
| RagGenerationProcessor | 5 stages | RAG-enhanced generation with vector search | GenerationRequestEvent → Generated files |
| RAGMemecoinInsertionProcessor | 8 stages | Source-agnostic AI processing pipeline | Memecoin data → MemecoinAction |
| BackupProcessingProcessor | Basic | Basic backup data processing | Backup files → Actions |
| EnhancedBackupProcessingProcessor | Enhanced | Enhanced backup processing with validation | Backup files → Validated actions |
Location: src/domain/executor/
Executors perform the final actions produced by processors (database writes, file creation, console output).
Executor is the abstract interface for action execution.
Interface:

    class Executor(ABC):
        async def execute(self, action: Action):
            pass  # Override this

| Executor | Purpose | Action Types |
|---|---|---|
| MemecoinInsertionExecutor | Database storage for memecoins | MemecoinAction |
| SimpleFileExecutor | File-only output executor | File actions |
| ExecutorRouter | Multi-action routing for complex workflows | Multiple action types |
| OutputFileExecutor | Generated file output handling | OutputFileAction |
| GeneratedTokenMetadataExecutor | Token metadata file output | GeneratedTokenMetadataAction |
| GeneratedImageExecutor | Image file output handling | GeneratedImageAction |
| SimilarExamplesExecutor | RAG context file output | SimilarExamplesAction |
| BackupCleanupExecutor | Backup data cleanup operations | Cleanup actions |
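
The multi-action routing attributed to ExecutorRouter can be pictured with a type-keyed dispatch table. This is a sketch under assumptions: `RouterSketch`, `register`, `FileAction`, and `DbAction` are hypothetical names, and the real ExecutorRouter may select executors by other means.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Type


@dataclass
class FileAction:
    """Hypothetical action describing a file to write."""
    path: str


@dataclass
class DbAction:
    """Hypothetical action describing a row to insert."""
    record_id: int


Handler = Callable[[Any], Awaitable[None]]


class RouterSketch:
    """Dispatches each action to the handler registered for its exact type."""

    def __init__(self) -> None:
        self._routes: Dict[Type[Any], Handler] = {}

    def register(self, action_type: Type[Any], handler: Handler) -> None:
        self._routes[action_type] = handler

    async def execute(self, action: Any) -> None:
        handler = self._routes.get(type(action))
        if handler is None:
            raise LookupError(f"No executor registered for {type(action).__name__}")
        await handler(action)


async def demo() -> List[str]:
    calls: List[str] = []

    async def write_file(action: FileAction) -> None:
        calls.append(f"file:{action.path}")

    async def insert_row(action: DbAction) -> None:
        calls.append(f"db:{action.record_id}")

    router = RouterSketch()
    router.register(FileAction, write_file)
    router.register(DbAction, insert_row)
    for action in (FileAction("out.json"), DbAction(7)):
        await router.execute(action)
    return calls
```

Raising on an unregistered type, rather than silently dropping the action, is a design choice of this sketch; it makes a missing route visible during testing.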
Location: src/domain/model/
Domain models define events (inputs) and actions (outputs) with Pydantic validation.
Events:

| Event | Purpose | Source |
|---|---|---|
| BaseEvent | Abstract event interface | - |
| TokenLaunchEvent | Real-time token launch data | TokenLaunchSource |
| GenerationRequestEvent | AI generation requests | GenerationFileInputSource |
| WebGenerationRequestEvent | HTTP API generation requests | WebGenerationInputSource |

Actions:

| Action | Purpose | Executor |
|---|---|---|
| BaseAction | Abstract action interface | - |
| MemecoinAction | Unified memecoin data for console/database output | MemecoinInsertionExecutor |
| GeneratedTokenAction | AI-generated token metadata | Various executors |
RAG Generation Actions (rag_generation_actions.py):
- QueryEmbeddingAction: Query embedding results
- SimilarExamplesAction: Retrieved similar examples
- GeneratedTokenMetadataAction: Generated token metadata
- GeneratedImageAction: Generated image data
- OutputFileAction: File output results
Location: src/domain/processor/stages/
Processing stages are modular units that transform data through the pipeline. Each stage operates on ProcessingContext and can trigger early termination.
BaseStage is the abstract base class for all processing stages.
Interface:

    class BaseStage(ABC, Generic[TInput, TOutput]):
        def __init__(self, stage_name: Optional[str] = None) -> None: ...

        @abstractmethod
        async def process(
            self,
            context: ProcessingContext[TInput],
        ) -> ProcessingContext[TOutput]: ...  # Override this

        def get_stage_name(self) -> str: ...

        async def initialize(self) -> None: ...

        async def cleanup(self) -> None: ...

        async def can_skip(self, context: ProcessingContext[TInput]) -> bool: ...

Key Features:
- Generic input/output typing
- Async processing with context transformation
- Optional skip conditions
- Lifecycle hooks (initialize, cleanup)
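
The skip condition and lifecycle hooks listed above might interact as in the sketch below. `Context` is a simplified stand-in for ProcessingContext, `WordCountStage` is a hypothetical stage, and the call order inside `run_stage` is an assumption about how a driver could use the hooks, not a description of the framework's actual driver.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Context:
    """Simplified stand-in for ProcessingContext."""
    input_data: str
    results: Dict[str, Any] = field(default_factory=dict)


class WordCountStage:
    """Hypothetical stage: counts words unless a count is already present."""

    def __init__(self) -> None:
        self.ready = False

    def get_stage_name(self) -> str:
        return "word_count"

    async def initialize(self) -> None:
        self.ready = True  # e.g. load a model or open a connection

    async def cleanup(self) -> None:
        self.ready = False  # release whatever initialize() acquired

    async def can_skip(self, context: Context) -> bool:
        return "word_count" in context.results

    async def process(self, context: Context) -> Context:
        context.results["word_count"] = len(context.input_data.split())
        return context


async def run_stage(stage: WordCountStage, context: Context) -> Context:
    """Assumed driver order: initialize, skip check, process, cleanup."""
    await stage.initialize()
    try:
        if not await stage.can_skip(context):
            context = await stage.process(context)
        return context
    finally:
        await stage.cleanup()
```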

Data preparation and storage stages:

| Stage | Purpose | Input → Output |
|---|---|---|
| TokenMetadataStage | Token parsing + IPFS metadata resolution | Token data → Resolved metadata |
| ImagePreparationStage | Image download + Base64 conversion + resizing | URL → Processed image |
| SimpleFileStorageStage | Atomic file operations for JSON + JPG pairs | Memecoin data → File paths |

AI classification, embedding, and generation stages:

| Stage | Purpose | Technology |
|---|---|---|
| CategoryClassificationStage | Dynamic subcategory filtering for tag classification | Gemini API |
| TagClassificationStage | AI-powered multimodal tag classification (356 tags, 19 categories) | Gemini 2.0 Flash Thinking |
| ImageCaptioningStage | AI caption generation with 4-part structure (entity, context, visual, emotions) | Gemini Vision API |
| TagsRefinementStage | Multimodal LLM tag evaluation with configurable conservativeness and confidence-based filtering | Gemini 2.0 Flash Thinking |
| MemecoinEmbeddingStage | CLIP text + image embeddings (768-dimensional) | CLIP model |
| EntityExtractionStage | Extract entities from prompts for accurate generation | LLM |
| ImageGenerationStage | AI image generation with unified visual context | Gemini 2.5 Flash |
| MemeImageGenerationStage | Specialized meme image generation | Meme generation API |
| MemeTextMetadataGenerationStage | Token metadata generation with RAG context | LLM |
Tag Classification Details:
- 356 predefined tags across 19 categories
- Multimodal analysis (text + image)
- Gemini 2.0 Flash Thinking for advanced reasoning
- Dynamic subcategory filtering reduces API token usage
Image Captioning Structure:
- Entity: What/who is in the image
- Context: Background, setting, relationships
- Visual: Colors, composition, style
- Emotions: Mood, tone, feeling
Tags Refinement:
- Configurable conservativeness levels (1-10)
- Confidence-based filtering
- Removes low-confidence or irrelevant tags
- Multimodal validation against image content
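
As a rough illustration of confidence-based filtering driven by a conservativeness level, the sketch below maps levels 1-10 to a confidence threshold. The linear mapping, the function names, and the example scores are all assumptions made for this example; the document does not state how TagsRefinementStage derives its threshold.

```python
from typing import Dict, List


def confidence_threshold(conservativeness: int) -> float:
    """Assumed linear mapping: level 1 -> 0.1, level 10 -> 1.0."""
    if not 1 <= conservativeness <= 10:
        raise ValueError("conservativeness must be between 1 and 10")
    return conservativeness / 10


def refine_tags(scored_tags: Dict[str, float], conservativeness: int) -> List[str]:
    """Keep tags whose confidence meets the threshold, highest confidence first."""
    threshold = confidence_threshold(conservativeness)
    kept = [tag for tag, score in scored_tags.items() if score >= threshold]
    return sorted(kept, key=lambda tag: scored_tags[tag], reverse=True)


# Hypothetical confidence scores, as an LLM evaluation step might return them.
example_scores = {"cartoon": 0.6, "dog": 0.95, "space": 0.2}
```

With these example scores, a higher conservativeness level keeps fewer tags, which is the behavior the bullets above describe.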

RAG stages:

| Stage | Purpose | Technology |
|---|---|---|
| RAGQueryEnhancementStage | Transform user prompts into structured 4-part queries (entity/context/visual/emotions) | LLM |
| RAGRetrievalStage | Multi-part vector similarity search across 4 caption collections with weighted ranking | CLIP + ChromaDB |
| SimilarMemesRetrievalStage | Find similar examples for generation context | Vector search |
| RagValidationStage | Validate RAG generation inputs | Pydantic validation |
| RagOutputStage | RAG workflow output handling | File system |
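
The weighted ranking that RAGRetrievalStage applies across the four caption collections could be approximated as a weighted sum of per-part similarity scores. Everything specific here is an assumption for illustration: the weights, the `rank_candidates` helper, and the input shape are hypothetical, and no vector database is queried.

```python
from typing import Dict, List, Optional, Tuple

# Assumed weights for the four caption parts; the real values are not given here.
DEFAULT_PART_WEIGHTS: Dict[str, float] = {
    "entity": 0.4,
    "context": 0.2,
    "visual": 0.2,
    "emotions": 0.2,
}


def rank_candidates(
    per_part_scores: Dict[str, Dict[str, float]],
    weights: Optional[Dict[str, float]] = None,
    top_k: int = 3,
) -> List[Tuple[str, float]]:
    """Merge per-collection similarity scores into one weighted ranking.

    per_part_scores maps part name -> {candidate_id: similarity in [0, 1]}.
    A candidate missing from a collection contributes 0 for that part.
    """
    part_weights = DEFAULT_PART_WEIGHTS if weights is None else weights
    totals: Dict[str, float] = {}
    for part, scores in per_part_scores.items():
        weight = part_weights.get(part, 0.0)
        for candidate_id, similarity in scores.items():
            totals[candidate_id] = totals.get(candidate_id, 0.0) + weight * similarity
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_k]
```

A candidate that matches strongly on one heavily weighted part can outrank one that matches weakly on several, so the choice of weights directly shapes which examples reach the generation stages.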

Backup and deduplication stages:

| Stage | Purpose | Input → Output |
|---|---|---|
| ExtractedBackupMetadataStage | Parse backup JSON + PNG files | File paths → Parsed data |
| DeduplicationStage | Remove duplicates with conflict resolution | Dataset → Deduplicated dataset |
| BackupValidationStage | Validate backup data completeness | Backup data → Validated data |
| DuplicateCheckStage | Check for existing duplicates in database | Memecoin data → Duplicate status |
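
One plausible reading of "conflict resolution" in DeduplicationStage is to keep, for each identity key, the record with the most populated fields. The sketch below implements that policy; the `mint` key, the completeness heuristic, and the function names are assumptions, and the actual stage may resolve conflicts differently.

```python
from typing import Any, Dict, Iterable, List


def completeness(record: Dict[str, Any]) -> int:
    """Number of fields holding a non-empty value."""
    return sum(1 for value in record.values() if value not in (None, "", [], {}))


def deduplicate(records: Iterable[Dict[str, Any]], key: str = "mint") -> List[Dict[str, Any]]:
    """Keep one record per key; on conflict, prefer the more complete record.

    Ties keep the record seen first, and first-seen order of keys is preserved.
    """
    best: Dict[Any, Dict[str, Any]] = {}
    for record in records:
        identity = record[key]
        current = best.get(identity)
        if current is None or completeness(record) > completeness(current):
            best[identity] = record
    return list(best.values())
```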

Output stages:

| Stage | Purpose | Output Type |
|---|---|---|
| ConsoleActionStage | Final validation + MemecoinAction creation | MemecoinAction |
| BackupConsoleActionStage | Backup-specific action creation | Backup actions |
| FileOutputStage | File output with timestamped directories | File system |
Location: src/domain/processor/stages/processing_context.py
The framework uses a rich processing context for stage coordination and data flow.

    @dataclass
    class ProcessingContext(Generic[TData]):
        input_data: TData
        results: Dict[str, Any] = field(default_factory=dict)   # Stage results accumulation
        metadata: Dict[str, Any] = field(default_factory=dict)  # Processing metadata
        should_continue_processing: bool = True                 # Early termination control
        termination_reason: Optional[str] = None                # Failure reason tracking

        def get_result(self, key: str, default: Any = None) -> Any:
            """Get result by key with optional default"""
            return self.results.get(key, default)

Design Note: ProcessingContext intentionally remains a @dataclass (not migrated to Pydantic) due to its role as a lightweight processing state container with generic typing support. The dataclass approach provides better performance and simpler generic type handling.

Early termination:

    if not self._validate_input(data):
        context.should_continue_processing = False
        context.termination_reason = "Invalid input data"
        return context

Result accumulation:

    result = await self._process_data()
    context.results["stage_output"] = result

Dependency checking:

    previous_result = context.get_result("previous_stage_output")
    if not previous_result:
        context.should_continue_processing = False
        context.termination_reason = "Missing dependency"
        return context

Key Features:
- Type-safe: Generic TData ensures input data type safety
- Accumulative: Results dictionary accumulates outputs from all stages
- Metadata tracking: Additional metadata can be stored without polluting results
- Early termination: should_continue_processing flag stops pipeline execution
- Failure tracking: termination_reason provides debugging context
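
To make the early-termination and accumulation behavior concrete, here is a small self-contained sketch. The `ProcessingContext` below is a trimmed local copy of the dataclass shown earlier, while `validate`, `measure`, and `run_stages` are hypothetical helpers; the framework itself drives stages through StageGraph rather than a plain list.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, TypeVar

TData = TypeVar("TData")


@dataclass
class ProcessingContext(Generic[TData]):
    """Trimmed local copy of the dataclass described in this section."""
    input_data: TData
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True
    termination_reason: Optional[str] = None


StageFn = Callable[[ProcessingContext], Awaitable[ProcessingContext]]


async def validate(context: ProcessingContext) -> ProcessingContext:
    """Stops the pipeline when the input is empty."""
    if not context.input_data:
        context.should_continue_processing = False
        context.termination_reason = "Invalid input data"
    return context


async def measure(context: ProcessingContext) -> ProcessingContext:
    """Adds a result that later stages could read back."""
    context.results["length"] = len(context.input_data)
    return context


async def run_stages(context: ProcessingContext, stages: List[StageFn]) -> ProcessingContext:
    """Hypothetical runner: stop as soon as a stage clears the flag."""
    for stage in stages:
        context = await stage(context)
        if not context.should_continue_processing:
            break
    return context
```

On empty input, `measure` never runs and `termination_reason` records why, so no work is spent on data that already failed validation.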
Location: src/domain/processor/stage_graph.py
StageGraph provides directed graph orchestration of processing stages with support for conditional branching, skip conditions, and parallel execution paths.
StageNode is the Pydantic model for stage graph node configuration (migrated from dataclass in October 2025).

    class StageNode(BaseModel):
        """Pydantic model for stage graph node configuration"""
        stage: BaseStage
        stage_id: str = ""
        next_stages: List[str] = Field(default_factory=list)
        skip_condition: Optional[Callable[[ProcessingContext], bool]] = None

        model_config = ConfigDict(
            arbitrary_types_allowed=True,  # Allow BaseStage and callable types
            validate_assignment=False,     # Performance optimization
        )

        @model_validator(mode='after')
        def set_default_stage_id(self) -> 'StageNode':
            """Auto-generate stage_id from stage name if not provided"""
            if not self.stage_id:
                self.stage_id = self.stage.get_stage_name()
            return self

        def should_skip(self, context: ProcessingContext) -> bool: ...

Migration Note: StageNode was migrated from @dataclass to Pydantic BaseModel in October 2025 for better validation and configuration management, while maintaining full backward compatibility with 13+ processor dependencies.

    class StageGraph:
        def __init__(
            self,
            nodes: List[StageNode],
            entry_point: Optional[str] = None,
        ) -> None: ...

        async def execute(
            self,
            context: ProcessingContext,
        ) -> ProcessingContext: ...

Key Features:
- Directed graph execution: Nodes define execution order via next_stages
- Conditional branching: Skip conditions allow dynamic path selection
- Early termination: Respects the context.should_continue_processing flag
- Entry point: Optional custom entry point (defaults to first node)
- Type-safe: Pydantic validation for node configuration
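
The traversal behavior listed above can be approximated in a few lines. This is a simplified sketch, not the StageGraph implementation: `Ctx` and `Node` are stand-ins for ProcessingContext and StageNode, `execute_graph` and `make_stage` are hypothetical helpers, and the walk is sequential and breadth-first, whereas the real graph may schedule branches differently.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Deque, Dict, List, Optional, Set


@dataclass
class Ctx:
    """Simplified stand-in for ProcessingContext."""
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True


StageFn = Callable[[Ctx], Awaitable[Ctx]]


@dataclass
class Node:
    """Simplified stand-in for StageNode."""
    stage_id: str
    run: StageFn
    next_stages: List[str] = field(default_factory=list)
    skip_condition: Optional[Callable[[Ctx], bool]] = None


def make_stage(name: str) -> StageFn:
    """Builds a stage that records its own name in results['order']."""
    async def run(context: Ctx) -> Ctx:
        context.results.setdefault("order", []).append(name)
        return context
    return run


async def execute_graph(nodes: List[Node], context: Ctx, entry_point: Optional[str] = None) -> Ctx:
    """Walk the graph from the entry point (default: first node)."""
    by_id = {node.stage_id: node for node in nodes}
    queue: Deque[str] = deque([entry_point or nodes[0].stage_id])
    visited: Set[str] = set()
    while queue and context.should_continue_processing:
        stage_id = queue.popleft()
        if stage_id in visited:
            continue  # never run the same stage twice
        visited.add(stage_id)
        node = by_id[stage_id]
        if node.skip_condition is None or not node.skip_condition(context):
            context = await node.run(context)
        queue.extend(node.next_stages)  # successors are followed even when a node is skipped
    return context
```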
Example Usage:

    stage_graph = StageGraph(
        nodes=[
            StageNode(stage=ValidationStage(), stage_id="validation", next_stages=["processing"]),
            StageNode(stage=ProcessingStage(), stage_id="processing", next_stages=["output"]),
            StageNode(stage=OutputStage(), stage_id="output"),
        ],
        entry_point="validation",
    )

    context = ProcessingContext(input_data=my_data)
    result_context = await stage_graph.execute(context)

Location: src/domain/exceptions/
Simplified error handling with retry logic and exponential backoff.
| Exception | Purpose | Use Case |
|---|---|---|
| ValidationError | Input validation and data format errors | Malformed input data, missing required fields |
| APIServiceError | External API service failures | LLM API errors, IPFS failures, vector DB errors |
RetryManager provides retry logic with exponential backoff and fallback support.
Features:
- Exponential backoff with configurable max attempts
- Fallback function support for degraded operation
- Predefined configurations for common scenarios
- Async/await support
Predefined Retry Configs:
- API: 3 attempts, exponential backoff (for LLM APIs)
- NETWORK: 5 attempts, exponential backoff (for HTTP requests)
- RATE_LIMIT: 3 attempts, longer backoff (for rate-limited APIs)
- DATABASE: 2 attempts, short backoff (for database operations)
- RESOURCE: 4 attempts, exponential backoff (for resource contention)
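
For readers unfamiliar with the pattern, the sketch below shows exponential backoff with a fallback in plain asyncio. It is not the RetryManager implementation: `RetrySketchConfig`, its field names and defaults, and `retry_with_backoff` are hypothetical and chosen only to illustrate the idea.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class RetrySketchConfig:
    """Hypothetical config; field names and defaults are illustrative."""
    max_attempts: int = 3
    base_delay: float = 0.5      # seconds to wait before the second attempt
    backoff_factor: float = 2.0  # multiplier applied after each failure
    max_delay: float = 30.0      # upper bound on any single wait


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    config: RetrySketchConfig = RetrySketchConfig(),
    fallback: Optional[Callable[[], T]] = None,
) -> T:
    """Retry `operation`; after the last failure use `fallback` or re-raise."""
    delay = config.base_delay
    for attempt in range(1, config.max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == config.max_attempts:
                if fallback is not None:
                    return fallback()  # degraded result instead of an error
                raise
            await asyncio.sleep(min(delay, config.max_delay))
            delay *= config.backoff_factor
    raise ValueError("max_attempts must be at least 1")
```

With the defaults above, the waits between attempts are 0.5 s and then 1.0 s; a real configuration would typically also add jitter and restrict which exception types are retried.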
Usage Example:

    from src.domain.exceptions import RetryManager, API_RETRY_CONFIG

    retry_manager = RetryManager(config=API_RETRY_CONFIG)
    result = await retry_manager.execute_with_retry(
        lambda: llm_service.generate(prompt),
        fallback=lambda: default_response,
    )

Assembling a workflow:

    from src.domain.workflow import DefaultWorkflow
    from src.domain.input_source import TokenLaunchSource
    from src.domain.processor import LaunchDetectionProcessor
    from src.domain.executor import MemecoinInsertionExecutor

    # Create components
    input_source = TokenLaunchSource(platform="pump.fun")
    processor = LaunchDetectionProcessor(
        llm_service=llm_service,
        stage_graph=stage_graph,
    )
    executor = MemecoinInsertionExecutor()

    # Initialize workflow
    workflow = DefaultWorkflow(input_source, processor, executor)
    await workflow.initialize()
    await workflow.run_indefinitely()

Creating a custom stage:

    from src.domain.processor.stages import BaseStage
    from src.domain.processor.stages.processing_context import ProcessingContext

    class CustomStage(BaseStage[MyInput, MyOutput]):
        async def process(
            self,
            context: ProcessingContext[MyInput],
        ) -> ProcessingContext[MyOutput]:
            # Check if previous stages failed
            if not context.should_continue_processing:
                return context

            # Your processing logic
            try:
                result = await self._do_work(context.input_data)
                context.results["my_stage_result"] = result
            except Exception as e:
                context.should_continue_processing = False
                context.termination_reason = f"Processing failed: {e}"

            return context

Creating a custom processor:

    from typing import Optional

    from src.domain.processor import BaseStageProcessor
    from src.domain.processor.stage_graph import StageGraph, StageNode
    from src.domain.processor.stages.processing_context import ProcessingContext

    class CustomProcessor(BaseStageProcessor[MyInput, MyOutput]):
        def __init__(self):
            stage_graph = StageGraph(
                nodes=[
                    StageNode(stage=Stage1(), next_stages=["stage2"]),
                    StageNode(stage=Stage2(), stage_id="stage2", next_stages=["stage3"]),
                    StageNode(stage=Stage3(), stage_id="stage3"),
                ]
            )
            super().__init__(
                processor_name="CustomProcessor",
                stage_graph=stage_graph,
            )

        def _extract_output(
            self,
            context: ProcessingContext[MyInput],
        ) -> Optional[MyOutput]:
            # A terminated pipeline produces no output
            if not context.should_continue_processing:
                return None
            return context.get_result("final_output")

Separation of concerns:
- InputSources generate events, don't process them
- Processors transform data, don't fetch or store it
- Executors perform actions, don't transform data

Early termination:
- Stages can stop pipeline execution immediately
- Failure reasons are tracked for debugging
- No wasted computation on invalid data

Context accumulation:
- Each stage adds results to shared context
- Later stages can access earlier stage outputs
- No global state or side effects

Type safety:
- Generic types let static type checkers catch mismatches before runtime
- Pydantic validation for models
- Type hints throughout

Composability:
- Mix and match components
- Create custom workflows without modifying framework
- Reuse stages across processors

Testability:
- Each component can be tested in isolation
- Mock-friendly interfaces
- No hard dependencies

Core components:

| Component | File Path |
|---|---|
| Workflow Base | src/domain/workflow/default_workflow.py |
| InputSource Base | src/domain/input_source/base_input_source.py |
| Processor Base | src/domain/processor/base_stage_processor.py |
| Executor Base | src/domain/executor/base_executor.py |
| Stage Base | src/domain/processor/stages/base_stage.py |
| StageGraph | src/domain/processor/stage_graph.py |
| ProcessingContext | src/domain/processor/stages/processing_context.py |
| Events | src/domain/model/events/ |
| Actions | src/domain/model/actions/ |
| Exceptions | src/domain/exceptions/ |

Processor implementations:

| Processor | File Path |
|---|---|
| LaunchDetectionProcessor | src/domain/processor/launch_detection_processor.py |
| SimpleLaunchDetectionProcessor | src/domain/processor/simple_launch_detection_processor.py |
| RagGenerationProcessor | src/domain/processor/rag_generation_processor.py |
| RAGMemecoinInsertionProcessor | src/domain/processor/rag_memecoin_insertion_processor.py |
| BackupProcessingProcessor | src/domain/processor/backup_processing_processor.py |
| EnhancedBackupProcessingProcessor | src/domain/processor/enhanced_backup_processing_processor.py |

Document Status: Complete
Last Updated: October 15, 2025
Architecture Version: Event-driven domain framework with 30+ stages