Domain Architecture Framework

📋 Document Summary

What This Document Covers:

  • Event-driven domain architecture (InputSource → Processor → Executor)
  • Multi-stage processing pipelines with StageGraph orchestration
  • 30+ specialized processing stages
  • ProcessingContext system for stage coordination
  • Integration patterns and workflows

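Sections in This Document:

  • Core Architecture Pattern
  • Module Structure
  • Processing Stages (30+ Implementations)
  • ProcessingContext System
  • StageGraph Orchestration
  • Error Handling
  • Integration Patterns
  • Design Principles
  • Implementation References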

Related Documentation:

Context Tags: #architecture #domain #processors #stages #event-driven #workflow-framework


Core Architecture Pattern

InputSource → Processor → Executor
    ↓           ↓           ↓
  Events   →  Actions  →  Execution

Flow Description

  1. InputSource: Generates domain events from various sources (WebSocket streams, filesystem, HTTP requests)
  2. Processor: Transforms events through multi-stage pipelines to produce actions
  3. Executor: Executes actions (database insertion, file output, console logging)

The framework ensures:

  • Separation of concerns: Data generation, processing, and execution are independent
  • Composability: Components can be mixed and matched for different workflows
  • Testability: Each component can be tested in isolation
  • Flexibility: New sources, processors, and executors can be added without modifying existing code
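To make the division of labour concrete, here is a minimal, self-contained sketch of the three roles. It is not framework code: ToyEvent, ToyAction, UppercaseProcessor, ListExecutor and run_pipeline are hypothetical names, and a plain list stands in for an InputSource. The processor only turns events into actions (or drops them), and the executor only performs actions.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyEvent:
    name: str  # hypothetical event payload


@dataclass
class ToyAction:
    symbol: str  # hypothetical action payload


class UppercaseProcessor:
    """Transforms events into actions; never fetches or stores data itself."""

    async def process(self, event: ToyEvent) -> Optional[ToyAction]:
        if not event.name:
            return None  # drop events that cannot produce an action
        return ToyAction(symbol=event.name.upper())


class ListExecutor:
    """Performs actions; here it only records them."""

    def __init__(self) -> None:
        self.executed: List[ToyAction] = []

    async def execute(self, action: ToyAction) -> None:
        self.executed.append(action)


async def run_pipeline(events: List[ToyEvent]) -> List[ToyAction]:
    processor, executor = UppercaseProcessor(), ListExecutor()
    for event in events:  # the list stands in for an InputSource
        action = await processor.process(event)
        if action is not None:
            await executor.execute(action)
    return executor.executed


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([ToyEvent("pepe"), ToyEvent(""), ToyEvent("doge")])))
```

Because each role only sees the type it consumes, any of the three classes can be swapped or unit-tested on its own.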

Module Structure

workflow/ - Orchestration

Location: src/domain/workflow/

DefaultWorkflow

Complete lifecycle management for domain workflows.

Features:

  • Initialize → Run → Stop lifecycle
  • Async event handling with callbacks
  • Component coordination (InputSource ↔ Processor ↔ Executor)
  • Graceful shutdown with cleanup

Usage Pattern:

# Run inside an async function, e.g. one started with asyncio.run()
workflow = DefaultWorkflow(input_source, processor, executor)
await workflow.initialize()
await workflow.run_indefinitely()
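The lifecycle listed above can be sketched roughly as follows. This is a hedged illustration, not DefaultWorkflow's actual code: it assumes the input source accepts a callback in initialize() (as BaseInputSource does), and MiniWorkflow, QueueInputSource, emit() and stop() are hypothetical names. An asyncio.Event serves as the stop signal, and cleanup runs in a finally block so shutdown stays graceful.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Callback = Callable[[Any], Awaitable[None]]


class QueueInputSource:
    """Hypothetical source whose events are pushed in by the caller."""

    def __init__(self) -> None:
        self._on_new_event: Optional[Callback] = None

    async def initialize(self, callback: Optional[Callback] = None) -> None:
        self._on_new_event = callback

    async def emit(self, event: Any) -> None:
        if self._on_new_event is not None:
            await self._on_new_event(event)


class MiniWorkflow:
    """Hypothetical stand-in for an initialize/run/stop lifecycle."""

    def __init__(self, input_source: QueueInputSource,
                 process: Callable[[Any], Awaitable[Any]],
                 execute: Callable[[Any], Awaitable[None]]) -> None:
        self.input_source = input_source
        self.process = process
        self.execute = execute
        self._stop = asyncio.Event()
        self.closed = False

    async def initialize(self) -> None:
        # Route every new event through processor -> executor
        await self.input_source.initialize(callback=self._handle_event)

    async def _handle_event(self, event: Any) -> None:
        action = await self.process(event)
        if action is not None:
            await self.execute(action)

    async def run_indefinitely(self) -> None:
        try:
            await self._stop.wait()  # idle until stop() is called
        finally:
            self.closed = True  # cleanup runs even if the task is cancelled

    def stop(self) -> None:
        self._stop.set()


async def demo() -> tuple:
    executed: list = []

    async def process(event: int) -> int:
        return event * 2

    async def execute(action: int) -> None:
        executed.append(action)

    source = QueueInputSource()
    workflow = MiniWorkflow(source, process, execute)
    await workflow.initialize()
    runner = asyncio.create_task(workflow.run_indefinitely())
    await source.emit(21)
    workflow.stop()
    await runner
    return executed, workflow.closed
```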

input_source/ - Event Generators

Location: src/domain/input_source/

Event generators produce domain events from various sources. All input sources implement the base InputSource interface (base_input_source.py).

BaseInputSource

Abstract interface for event generation with async callback support.

Interface:

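from abc import ABC
from typing import Optional

# AsyncCallback and Event are project types; their import paths are not shown here
class InputSource(ABC):
    _on_new_event: Optional[AsyncCallback[Event]] = None

    async def initialize(self, callback: Optional[AsyncCallback[Event]] = None):
        # Store the callback that will receive each generated event
        self._on_new_event = callback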
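As an illustration of the callback contract, the sketch below subclasses a simplified InputSource with a hypothetical ReplayInputSource that replays pre-loaded records (roughly how a file-based source might behave). The run() method name and the simplified AsyncCallback alias are assumptions, not the framework's API.

```python
import asyncio
from abc import ABC
from typing import Any, Awaitable, Callable, Iterable, List, Optional

# Simplified stand-in for the project's AsyncCallback[Event] type
AsyncCallback = Callable[[Any], Awaitable[None]]


class InputSource(ABC):
    _on_new_event: Optional[AsyncCallback] = None

    async def initialize(self, callback: Optional[AsyncCallback] = None):
        self._on_new_event = callback


class ReplayInputSource(InputSource):
    """Hypothetical source that replays pre-loaded records as events."""

    def __init__(self, records: Iterable[dict]) -> None:
        self._records: List[dict] = list(records)

    async def run(self) -> int:
        if self._on_new_event is None:
            raise RuntimeError("initialize() must be called with a callback first")
        for record in self._records:
            await self._on_new_event(record)  # hand each event to the workflow
        return len(self._records)


async def main() -> None:
    async def on_event(event: dict) -> None:
        print("received", event["name"])

    source = ReplayInputSource([{"name": "a"}, {"name": "b"}])
    await source.initialize(callback=on_event)
    await source.run()


if __name__ == "__main__":
    asyncio.run(main())
```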

Implementations

| Input Source | Purpose | Event Type |
|---|---|---|
| TokenLaunchSource | Real-time WebSocket token monitoring from pump.fun | TokenLaunchEvent |
| GenerationFileInputSource | File-based AI generation requests | GenerationRequestEvent |
| FilesystemMemecoinInputSource | Filesystem-based memecoin data processing | Custom events |
| WebGenerationInputSource | HTTP API generation requests | WebGenerationRequestEvent |

processor/ - Processing Framework

Location: src/domain/processor/

Processors transform input events through multi-stage pipelines using StageGraph orchestration.

BaseStageProcessor[IN, OUT]

Generic base class for all processors with type-safe input/output.

Interface:

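from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Callable, Generic, Optional, TypeVar

IN = TypeVar("IN")    # processor input type
OUT = TypeVar("OUT")  # processor output type

# Signature summary; LiteLLMService, StageGraph and ProcessingContext are project types
class BaseStageProcessor(ABC, Generic[IN, OUT]):
    def __init__(
        self,
        processor_name: Optional[str] = None,
        llm_service: Optional[LiteLLMService] = None,
        stage_graph: Optional[StageGraph] = None,
        executor_router=None,
    ): ...

    async def initialize(self, **kwargs) -> None: ...
    async def process(self, input_data: IN, action_handler: Optional[Callable] = None) -> Optional[OUT]: ...

    @abstractmethod
    def _extract_output(self, context: ProcessingContext[IN]) -> Optional[OUT]: ...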

Key Responsibilities:

  • Initialize and manage StageGraph execution
  • Coordinate LLM service dependencies
  • Extract final output from processing context
  • Handle early termination scenarios
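The responsibilities above suggest a process() flow roughly like the one below. This is a sketch under stated assumptions: stages run as a simple ordered list rather than a StageGraph, the context is a trimmed-down local dataclass, and MiniProcessor, ParseStage and the "final_output" key are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Ctx:
    """Trimmed-down stand-in for ProcessingContext."""
    input_data: Any
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True
    termination_reason: Optional[str] = None


class ParseStage:
    """Hypothetical stage that writes the 'final_output' result."""

    async def process(self, ctx: Ctx) -> Ctx:
        if not isinstance(ctx.input_data, str):
            ctx.should_continue_processing = False
            ctx.termination_reason = "input must be a string"
            return ctx
        ctx.results["final_output"] = ctx.input_data.strip().upper()
        return ctx


class MiniProcessor:
    def __init__(self, stages: List[Any]) -> None:
        self.stages = stages  # an ordered list instead of a StageGraph

    async def process(self, input_data: Any) -> Optional[Any]:
        ctx = Ctx(input_data=input_data)
        for stage in self.stages:
            ctx = await stage.process(ctx)
            if not ctx.should_continue_processing:
                break  # early termination: remaining stages are not run
        return self._extract_output(ctx)

    def _extract_output(self, ctx: Ctx) -> Optional[Any]:
        if not ctx.should_continue_processing:
            return None  # a terminated run yields no output
        return ctx.results.get("final_output")


if __name__ == "__main__":
    processor = MiniProcessor([ParseStage()])
    print(asyncio.run(processor.process("  pepe ")))
```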

Processor Implementations

| Processor | Stages | Purpose | Input → Output |
|---|---|---|---|
| LaunchDetectionProcessor | 5 stages | AI-enhanced token processing with metadata resolution | TokenLaunchEvent → MemecoinAction |
| SimpleLaunchDetectionProcessor | 3 stages | Non-AI file storage for quick token saves | TokenLaunchEvent → File output |
| RagGenerationProcessor | 5 stages | RAG-enhanced generation with vector search | GenerationRequestEvent → Generated files |
| RAGMemecoinInsertionProcessor | 8 stages | Source-agnostic AI processing pipeline | Memecoin data → MemecoinAction |
| BackupProcessingProcessor | Basic | Basic backup data processing | Backup files → Actions |
| EnhancedBackupProcessingProcessor | Enhanced | Enhanced backup processing with validation | Backup files → Validated actions |

executor/ - Action Executors

Location: src/domain/executor/

Executors perform the final actions produced by processors (database writes, file creation, console output).

BaseExecutor

Abstract interface for action execution.

Interface:

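from abc import ABC

# Action is the project's base action model (see model/actions/)
class Executor(ABC):
    async def execute(self, action: Action):
        pass  # Subclasses override this to perform the action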

Executor Implementations

| Executor | Purpose | Action Types |
|---|---|---|
| MemecoinInsertionExecutor | Database storage for memecoins | MemecoinAction |
| SimpleFileExecutor | File-only output executor | File actions |
| ExecutorRouter | Multi-action routing for complex workflows | Multiple action types |
| OutputFileExecutor | Generated file output handling | OutputFileAction |
| GeneratedTokenMetadataExecutor | Token metadata file output | GeneratedTokenMetadataAction |
| GeneratedImageExecutor | Image file output handling | GeneratedImageAction |
| SimilarExamplesExecutor | RAG context file output | SimilarExamplesAction |
| BackupCleanupExecutor | Backup data cleanup operations | Cleanup actions |
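ExecutorRouter is described as routing multiple action types. One plausible way to do that, sketched here as an assumption rather than the router's real implementation, is a lookup from action class to executor. TypeRouter, RecordingExecutor, FileAction and DbAction are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FileAction:
    path: str


@dataclass
class DbAction:
    row_id: int


class RecordingExecutor:
    """Hypothetical executor that records the actions it receives."""

    def __init__(self) -> None:
        self.seen: List[object] = []

    async def execute(self, action: object) -> None:
        self.seen.append(action)


class TypeRouter:
    """Dispatches each action to the executor registered for its type."""

    def __init__(self, routes: Dict[type, RecordingExecutor]) -> None:
        self.routes = routes

    async def execute(self, action: object) -> None:
        # Walk the MRO so subclasses of a registered action type also match
        for cls in type(action).__mro__:
            executor = self.routes.get(cls)
            if executor is not None:
                await executor.execute(action)
                return
        raise LookupError(f"No executor registered for {type(action).__name__}")


if __name__ == "__main__":
    files, db = RecordingExecutor(), RecordingExecutor()
    router = TypeRouter({FileAction: files, DbAction: db})
    asyncio.run(router.execute(FileAction("out.json")))
    print(files.seen, db.seen)
```

Failing loudly on an unregistered type is a deliberate choice in this sketch; a real router might log and skip instead.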

model/ - Domain Models

Location: src/domain/model/

Domain models define events (inputs) and actions (outputs) with Pydantic validation.

events/ - Input Events

| Event | Purpose | Source |
|---|---|---|
| BaseEvent | Abstract event interface | - |
| TokenLaunchEvent | Real-time token launch data | TokenLaunchSource |
| GenerationRequestEvent | AI generation requests | GenerationFileInputSource |
| WebGenerationRequestEvent | HTTP API generation requests | WebGenerationInputSource |

actions/ - Processing Results

| Action | Purpose | Executor |
|---|---|---|
| BaseAction | Abstract action interface | - |
| MemecoinAction | Unified memecoin data for console/database output | MemecoinInsertionExecutor |
| GeneratedTokenAction | AI-generated token metadata | Various executors |

RAG Generation Actions (rag_generation_actions.py):

  • QueryEmbeddingAction - Query embedding results
  • SimilarExamplesAction - Retrieved similar examples
  • GeneratedTokenMetadataAction - Generated token metadata
  • GeneratedImageAction - Generated image data
  • OutputFileAction - File output results

Processing Stages (30+ Implementations)

Location: src/domain/processor/stages/

Processing stages are modular units that transform data as it moves through the pipeline. Each stage operates on a ProcessingContext and can trigger early termination.

BaseStage[TInput, TOutput]

Abstract base class for all processing stages.

Interface:

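from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")

# Signature summary; ProcessingContext is defined in the stages package
class BaseStage(ABC, Generic[TInput, TOutput]):
    def __init__(self, stage_name: Optional[str] = None): ...

    @abstractmethod
    async def process(
        self,
        context: ProcessingContext[TInput]
    ) -> ProcessingContext[TOutput]: ...  # Override this

    def get_stage_name(self) -> str: ...
    async def initialize(self) -> None: ...
    async def cleanup(self) -> None: ...
    async def can_skip(self, context: ProcessingContext[TInput]) -> bool: ...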

Key Features:

  • Generic input/output typing
  • Async processing with context transformation
  • Optional skip conditions
  • Lifecycle hooks (initialize, cleanup)
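The sketch below shows a concrete stage that uses the hooks listed above (initialize, cleanup, can_skip). So that it runs standalone, it defines minimal local ProcessingContext and BaseStage stand-ins. Defaulting the stage name to the class name is an assumption, and the real base class may implement the hook defaults differently. NormalizeSymbolStage and run_stage are hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ProcessingContext:
    """Minimal local stand-in (the real one is generic and carries metadata)."""
    input_data: Any
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True
    termination_reason: Optional[str] = None


class BaseStage(ABC):
    """Simplified stand-in for the framework's BaseStage."""

    def __init__(self, stage_name: Optional[str] = None) -> None:
        self._stage_name = stage_name or type(self).__name__  # assumed default

    def get_stage_name(self) -> str:
        return self._stage_name

    async def initialize(self) -> None:
        pass  # e.g. open clients or load models

    async def cleanup(self) -> None:
        pass  # e.g. close clients

    async def can_skip(self, context: ProcessingContext) -> bool:
        return False

    @abstractmethod
    async def process(self, context: ProcessingContext) -> ProcessingContext: ...


class NormalizeSymbolStage(BaseStage):
    """Hypothetical stage that derives a ticker symbol from the input."""

    async def can_skip(self, context: ProcessingContext) -> bool:
        return "symbol" in context.results  # already produced upstream

    async def process(self, context: ProcessingContext) -> ProcessingContext:
        context.results["symbol"] = str(context.input_data).strip().upper()
        return context


async def run_stage(stage: BaseStage, context: ProcessingContext) -> ProcessingContext:
    await stage.initialize()
    try:
        if not await stage.can_skip(context):
            context = await stage.process(context)
    finally:
        await stage.cleanup()  # release resources even if process() raises
    return context


if __name__ == "__main__":
    ctx = asyncio.run(run_stage(NormalizeSymbolStage(), ProcessingContext(" pepe ")))
    print(ctx.results)
```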

Core Stages

| Stage | Purpose | Input → Output |
|---|---|---|
| TokenMetadataStage | Token parsing + IPFS metadata resolution | Token data → Resolved metadata |
| ImagePreparationStage | Image download + Base64 conversion + resizing | URL → Processed image |
| SimpleFileStorageStage | Atomic file operations for JSON + JPG pairs | Memecoin data → File paths |

AI Processing Stages

| Stage | Purpose | Technology |
|---|---|---|
| CategoryClassificationStage | Dynamic subcategory filtering for tag classification | Gemini API |
| TagClassificationStage | AI-powered multimodal tag classification (356 tags, 19 categories) | Gemini 2.0 Flash Thinking |
| ImageCaptioningStage | AI caption generation with 4-part structure (entity, context, visual, emotions) | Gemini Vision API |
| TagsRefinementStage | Multimodal LLM tag evaluation with configurable conservativeness and confidence-based filtering | Gemini 2.0 Flash Thinking |
| MemecoinEmbeddingStage | CLIP text + image embeddings (768-dimensional) | CLIP model |
| EntityExtractionStage | Extract entities from prompts for accurate generation | LLM |
| ImageGenerationStage | AI image generation with unified visual context | Gemini 2.5 Flash |
| MemeImageGenerationStage | Specialized meme image generation | Meme generation API |
| MemeTextMetadataGenerationStage | Token metadata generation with RAG context | LLM |

Tag Classification Details:

  • 356 predefined tags across 19 categories
  • Multimodal analysis (text + image)
  • Gemini 2.0 Flash Thinking for advanced reasoning
  • Dynamic subcategory filtering reduces API token usage
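Dynamic subcategory filtering can be pictured as a two-step narrowing. First the relevant categories are picked, then only those categories' tags go into the tag-classification prompt, which keeps the prompt short. In the real pipeline Gemini performs the first step. In this sketch a fixed category list replaces it, and the miniature taxonomy and prompt format are hypothetical placeholders.

```python
from typing import Dict, List

# Hypothetical miniature taxonomy (the real one has 356 tags in 19 categories)
TAXONOMY: Dict[str, List[str]] = {
    "animals": ["dog", "cat", "frog"],
    "finance": ["moon", "bull", "bear"],
    "food": ["pizza", "burger"],
}


def candidate_tags(selected_categories: List[str]) -> List[str]:
    # Keep only tags from categories chosen by the category-classification step
    tags: List[str] = []
    for category in selected_categories:
        tags.extend(TAXONOMY.get(category, []))
    return tags


def build_tag_prompt(description: str, selected_categories: List[str]) -> str:
    tags = candidate_tags(selected_categories)
    return f"Description: {description}\nChoose tags from: {', '.join(tags)}"


if __name__ == "__main__":
    full = sum(len(tags) for tags in TAXONOMY.values())
    narrowed = len(candidate_tags(["animals", "finance"]))
    print(f"{narrowed} of {full} tags sent to the model")
```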

Image Captioning Structure:

  1. Entity: What/who is in the image
  2. Context: Background, setting, relationships
  3. Visual: Colors, composition, style
  4. Emotions: Mood, tone, feeling

Tags Refinement:

  • Configurable conservativeness levels (1-10)
  • Confidence-based filtering
  • Removes low-confidence or irrelevant tags
  • Multimodal validation against image content
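One way to realize confidence-based filtering with a 1-10 conservativeness knob is to map the level to a minimum confidence threshold. The linear mapping below (level 1 → 0.50, level 10 → 0.95) is an assumption made for illustration, not TagsRefinementStage's documented behaviour. In the real stage the per-tag confidences come from the multimodal LLM.

```python
from typing import Dict, List


def min_confidence(conservativeness: int) -> float:
    # Assumed linear mapping: level 1 -> 0.50, level 10 -> 0.95 (integer math avoids float drift)
    if not 1 <= conservativeness <= 10:
        raise ValueError("conservativeness must be between 1 and 10")
    return (45 + 5 * conservativeness) / 100


def refine_tags(scored_tags: Dict[str, float], conservativeness: int) -> List[str]:
    threshold = min_confidence(conservativeness)
    # Keep tags at or above the threshold, highest confidence first
    kept = [tag for tag, conf in scored_tags.items() if conf >= threshold]
    return sorted(kept, key=lambda tag: scored_tags[tag], reverse=True)
```

A higher level discards more borderline tags, which matches the "more conservative" reading of the setting.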

RAG Workflow Stages

| Stage | Purpose | Technology |
|---|---|---|
| RAGQueryEnhancementStage | Transform user prompts into structured 4-part queries (entity/context/visual/emotions) | LLM |
| RAGRetrievalStage | Multi-part vector similarity search across 4 caption collections with weighted ranking | CLIP + ChromaDB |
| SimilarMemesRetrievalStage | Find similar examples for generation context | Vector search |
| RagValidationStage | Validate RAG generation inputs | Pydantic validation |
| RagOutputStage | RAG workflow output handling | File system |
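Multi-part retrieval with weighted ranking can be sketched as follows. Each candidate is scored against the query in every caption part, the per-part cosine similarities are combined with weights, and the candidates are sorted by the total. The weights, the toy vectors, and the in-memory dictionaries (standing in for CLIP embeddings and ChromaDB collections) are assumptions.

```python
import math
from typing import Dict, List, Sequence, Tuple

PARTS = ("entity", "context", "visual", "emotions")
# Hypothetical weights; the real stage's weighting is not documented here
WEIGHTS = {"entity": 0.4, "context": 0.2, "visual": 0.25, "emotions": 0.15}


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def weighted_rank(
    query: Dict[str, List[float]],                   # part -> query embedding
    collections: Dict[str, Dict[str, List[float]]],  # part -> {meme_id: embedding}
    top_k: int = 3,
) -> List[Tuple[str, float]]:
    scores: Dict[str, float] = {}
    for part in PARTS:
        for meme_id, emb in collections[part].items():
            # Accumulate each candidate's weighted similarity across all parts
            scores[meme_id] = scores.get(meme_id, 0.0) + WEIGHTS[part] * cosine(query[part], emb)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_k]
```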

Data Processing Stages

| Stage | Purpose | Input → Output |
|---|---|---|
| ExtractedBackupMetadataStage | Parse backup JSON + PNG files | File paths → Parsed data |
| DeduplicationStage | Remove duplicates with conflict resolution | Dataset → Deduplicated dataset |
| BackupValidationStage | Validate backup data completeness | Backup data → Validated data |
| DuplicateCheckStage | Check for existing duplicates in database | Memecoin data → Duplicate status |
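The sketch below shows deduplication with conflict resolution under two assumptions made only for illustration. Records are keyed by a "mint" field, and the conflict rule is "keep the record with the most non-empty fields, then the newest ts". DeduplicationStage's actual key and rule may differ.

```python
from typing import Dict, List


def completeness(record: dict) -> int:
    # Number of fields holding a non-empty value
    return sum(1 for v in record.values() if v not in (None, "", [], {}))


def deduplicate(records: List[dict], key: str = "mint") -> List[dict]:
    best: Dict[str, dict] = {}
    for record in records:
        current = best.get(record[key])
        # Prefer more complete records; break ties with the newer timestamp
        if current is None or (completeness(record), record.get("ts", 0)) > (
            completeness(current), current.get("ts", 0)
        ):
            best[record[key]] = record
    return list(best.values())  # first-seen key order is preserved
```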

Output Stages

| Stage | Purpose | Output Type |
|---|---|---|
| ConsoleActionStage | Final validation + MemecoinAction creation | MemecoinAction |
| BackupConsoleActionStage | Backup-specific action creation | Backup actions |
| FileOutputStage | File output with timestamped directories | File system |

ProcessingContext System

Location: src/domain/processor/stages/processing_context.py

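The framework passes a shared ProcessingContext through every stage. It carries the input data, accumulated results, metadata, and early-termination state that stages use to coordinate.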

ProcessingContext[TData]

from dataclasses import dataclass, field
from typing import Any, Dict, Generic, Optional, TypeVar

TData = TypeVar("TData")


@dataclass
class ProcessingContext(Generic[TData]):
    input_data: TData
    results: Dict[str, Any] = field(default_factory=dict)     # Stage results accumulation
    metadata: Dict[str, Any] = field(default_factory=dict)    # Processing metadata
    should_continue_processing: bool = True                   # Early termination control
    termination_reason: Optional[str] = None                  # Failure reason tracking

    def get_result(self, key: str, default: Any = None) -> Any:
        """Get result by key with optional default"""
        return self.results.get(key, default)

Design Note: ProcessingContext intentionally remains a @dataclass (not migrated to Pydantic) due to its role as a lightweight processing state container with generic typing support. The dataclass approach provides better performance and simpler generic type handling.

Context Usage Patterns

1. Early Termination

# Inside a stage's process(context) method
if not self._validate_input(context.input_data):
    context.should_continue_processing = False
    context.termination_reason = "Invalid input data"
    return context

2. Result Accumulation

result = await self._process_data()
context.results["stage_output"] = result

3. Stage Dependencies

previous_result = context.get_result("previous_stage_output")
if previous_result is None:  # a falsy but valid result (e.g. []) should not terminate
    context.should_continue_processing = False
    context.termination_reason = "Missing dependency"
    return context

Key Features:

  • Type-safe: Generic TData lets static type checkers verify the input data type (the dataclass performs no runtime check)
  • Accumulative: Results dictionary accumulates outputs from all stages
  • Metadata tracking: Additional metadata can be stored without polluting results
  • Early termination: should_continue_processing flag stops pipeline execution
  • Failure tracking: termination_reason provides debugging context
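The three usage patterns combine as follows in a tiny linear pipeline. The ProcessingContext here mirrors the dataclass above (without the generic parameter). The two stage functions, their result keys, and the run() loop are hypothetical stand-ins for real stages and StageGraph.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ProcessingContext:
    input_data: Any
    results: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True
    termination_reason: Optional[str] = None

    def get_result(self, key: str, default: Any = None) -> Any:
        return self.results.get(key, default)


async def parse_stage(context: ProcessingContext) -> ProcessingContext:
    if not isinstance(context.input_data, dict):  # pattern 1: early termination
        context.should_continue_processing = False
        context.termination_reason = "Invalid input data"
        return context
    context.results["parsed"] = context.input_data.get("name", "")  # pattern 2: accumulate
    return context


async def symbol_stage(context: ProcessingContext) -> ProcessingContext:
    parsed = context.get_result("parsed")  # pattern 3: depend on an earlier stage
    if parsed is None:
        context.should_continue_processing = False
        context.termination_reason = "Missing dependency"
        return context
    context.results["symbol"] = parsed.upper()
    context.metadata["last_stage"] = "symbol_stage"  # bookkeeping kept out of results
    return context


async def run(data: Any) -> ProcessingContext:
    context = ProcessingContext(input_data=data)
    for stage in (parse_stage, symbol_stage):
        context = await stage(context)
        if not context.should_continue_processing:
            break
    return context


if __name__ == "__main__":
    print(asyncio.run(run({"name": "pepe"})).results)
```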

StageGraph Orchestration

Location: src/domain/processor/stage_graph.py

StageGraph provides directed graph orchestration of processing stages with support for conditional branching, skip conditions, and parallel execution paths.

StageNode

Pydantic model for stage graph node configuration (migrated from dataclass in October 2025).

from typing import Callable, List, Optional

from pydantic import BaseModel, ConfigDict, Field, model_validator

from src.domain.processor.stages import BaseStage
from src.domain.processor.stages.processing_context import ProcessingContext


class StageNode(BaseModel):
    """Pydantic model for stage graph node configuration"""
    stage: BaseStage
    stage_id: str = ""
    next_stages: List[str] = Field(default_factory=list)
    skip_condition: Optional[Callable[[ProcessingContext], bool]] = None

    model_config = ConfigDict(
        arbitrary_types_allowed=True,  # Allow BaseStage and callable types
        validate_assignment=False       # Performance optimization
    )

    @model_validator(mode='after')
    def set_default_stage_id(self) -> 'StageNode':
        """Auto-generate stage_id from stage name if not provided"""
        if not self.stage_id:
            self.stage_id = self.stage.get_stage_name()
        return self

    def should_skip(self, context: ProcessingContext) -> bool: ...  # body omitted

Migration Note: StageNode migrated from @dataclass to Pydantic BaseModel in October 2025 for better validation and configuration management while maintaining full backward compatibility with 13+ processor dependencies.

StageGraph

from typing import List, Optional

# Signature summary (bodies omitted)
class StageGraph:
    def __init__(
        self,
        nodes: List[StageNode],
        entry_point: Optional[str] = None
    ): ...

    async def execute(
        self,
        context: ProcessingContext
    ) -> ProcessingContext: ...

Key Features:

  • Directed graph execution: Nodes define execution order via next_stages
  • Conditional branching: Skip conditions allow dynamic path selection
  • Early termination: Respects context.should_continue_processing flag
  • Entry point: Optional custom entry point (defaults to first node)
  • Type-safe: Pydantic validation for node configuration

Example Usage:

# Run inside an async function; ValidationStage, ProcessingStage and OutputStage are placeholders
stage_graph = StageGraph(
    nodes=[
        StageNode(stage=ValidationStage(), stage_id="validation", next_stages=["processing"]),
        StageNode(stage=ProcessingStage(), stage_id="processing", next_stages=["output"]),
        StageNode(stage=OutputStage(), stage_id="output"),
    ],
    entry_point="validation"  # must match a node's stage_id
)

context = ProcessingContext(input_data=my_data)
result_context = await stage_graph.execute(context)
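One way execute() might walk the graph is sketched below. It starts at the entry point (or the first node), honours each node's skip condition, stops once should_continue_processing turns false, and then follows next_stages. This is a simplified, sequential sketch with plain dataclasses instead of the Pydantic StageNode. Three details are assumptions about the real semantics: visiting each node at most once, following all successors in order, and still reaching a skipped node's successors. MiniStageGraph, Node and recorder are hypothetical names.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set


@dataclass
class Ctx:
    """Trimmed-down stand-in for ProcessingContext."""
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True


Stage = Callable[[Ctx], Awaitable[Ctx]]


@dataclass
class Node:
    """Plain-dataclass stand-in for StageNode."""
    stage_id: str
    stage: Stage
    next_stages: List[str] = field(default_factory=list)
    skip_condition: Optional[Callable[[Ctx], bool]] = None


class MiniStageGraph:
    def __init__(self, nodes: List[Node], entry_point: Optional[str] = None) -> None:
        self.nodes: Dict[str, Node] = {n.stage_id: n for n in nodes}
        self.entry_point = entry_point or nodes[0].stage_id  # default: first node

    async def execute(self, ctx: Ctx) -> Ctx:
        queue = deque([self.entry_point])
        visited: Set[str] = set()
        while queue and ctx.should_continue_processing:
            node = self.nodes[queue.popleft()]
            if node.stage_id in visited:
                continue  # run each node at most once
            visited.add(node.stage_id)
            if node.skip_condition is None or not node.skip_condition(ctx):
                ctx = await node.stage(ctx)
            queue.extend(node.next_stages)  # successors are still reached after a skip
        return ctx


def recorder(name: str) -> Stage:
    """Builds a hypothetical stage that appends its name to results['order']."""
    async def stage(ctx: Ctx) -> Ctx:
        ctx.results.setdefault("order", []).append(name)
        return ctx
    return stage


if __name__ == "__main__":
    graph = MiniStageGraph([
        Node("validation", recorder("validation"), ["processing"]),
        Node("processing", recorder("processing"), ["output"], skip_condition=lambda c: True),
        Node("output", recorder("output")),
    ])
    print(asyncio.run(graph.execute(Ctx())).results["order"])
```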

Error Handling

Location: src/domain/exceptions/

Simplified error handling with retry logic and exponential backoff.

Exception Types

| Exception | Purpose | Use Case |
|---|---|---|
| ValidationError | Input validation and data format errors | Malformed input data, missing required fields |
| APIServiceError | External API service failures | LLM API errors, IPFS failures, vector DB errors |

RetryManager

Retry logic with exponential backoff and fallback support.

Features:

  • Exponential backoff with configurable max attempts
  • Fallback function support for degraded operation
  • Predefined configurations for common scenarios
  • Async/await support

Predefined Retry Configs:

  • API: 3 attempts, exponential backoff (for LLM APIs)
  • NETWORK: 5 attempts, exponential backoff (for HTTP requests)
  • RATE_LIMIT: 3 attempts, longer backoff (for rate-limited APIs)
  • DATABASE: 2 attempts, short backoff (for database operations)
  • RESOURCE: 4 attempts, exponential backoff (for resource contention)

Usage Example:

from src.domain.exceptions import RetryManager, API_RETRY_CONFIG

retry_manager = RetryManager(config=API_RETRY_CONFIG)
result = await retry_manager.execute_with_retry(
    lambda: llm_service.generate(prompt),
    fallback=lambda: default_response
)
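The retry behaviour described above amounts to roughly the loop below. It is a standalone sketch, not the RetryManager source. The RetryConfig field names, the doubling backoff formula, and retrying on any Exception are all assumptions. Note that func must be a zero-argument callable (like the lambda in the example above) so that each attempt gets a fresh coroutine.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0       # seconds before the first retry
    backoff_factor: float = 2.0   # delay multiplier for each further retry


async def execute_with_retry(
    func: Callable[[], Awaitable[Any]],
    config: RetryConfig,
    fallback: Optional[Callable[[], Any]] = None,
) -> Any:
    if config.max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(config.max_attempts):
        try:
            return await func()  # func() builds a fresh coroutine per attempt
        except Exception as exc:
            last_error = exc
            if attempt < config.max_attempts - 1:
                # Exponential backoff: base, base*factor, base*factor**2, ...
                await asyncio.sleep(config.base_delay * config.backoff_factor ** attempt)
    if fallback is not None:
        return fallback()  # degraded result once every attempt has failed
    raise last_error
```

Under this sketch, a NETWORK-style profile would simply be RetryConfig(max_attempts=5).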

Integration Patterns

Basic Workflow Setup

from src.domain.workflow import DefaultWorkflow
from src.domain.input_source import TokenLaunchSource
from src.domain.processor import LaunchDetectionProcessor
from src.domain.executor import MemecoinInsertionExecutor

# Run inside an async function; llm_service and stage_graph must be created beforehand
# Create components
input_source = TokenLaunchSource(platform="pump.fun")
processor = LaunchDetectionProcessor(
    llm_service=llm_service,
    stage_graph=stage_graph
)
executor = MemecoinInsertionExecutor()

# Initialize workflow
workflow = DefaultWorkflow(input_source, processor, executor)
await workflow.initialize()
await workflow.run_indefinitely()

Custom Stage Implementation

from src.domain.processor.stages import BaseStage
from src.domain.processor.stages.processing_context import ProcessingContext

# MyInput, MyOutput and _do_work are placeholders for your own types and logic
class CustomStage(BaseStage[MyInput, MyOutput]):
    async def process(
        self,
        context: ProcessingContext[MyInput]
    ) -> ProcessingContext[MyOutput]:
        # Check if previous stages failed
        if not context.should_continue_processing:
            return context

        # Your processing logic
        try:
            result = await self._do_work(context.input_data)
            context.results["my_stage_result"] = result
        except Exception as e:
            context.should_continue_processing = False
            context.termination_reason = f"Processing failed: {e}"

        return context

Custom Processor Implementation

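from typing import Optional

from src.domain.processor import BaseStageProcessor
from src.domain.processor.stage_graph import StageGraph, StageNode
from src.domain.processor.stages.processing_context import ProcessingContext

# MyInput, MyOutput and Stage1-Stage3 are placeholders for your own types and stages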

class CustomProcessor(BaseStageProcessor[MyInput, MyOutput]):
    def __init__(self):
        stage_graph = StageGraph(
            nodes=[
                StageNode(stage=Stage1(), next_stages=["stage2"]),
                StageNode(stage=Stage2(), stage_id="stage2", next_stages=["stage3"]),
                StageNode(stage=Stage3(), stage_id="stage3"),
            ]
        )
        super().__init__(
            processor_name="CustomProcessor",
            stage_graph=stage_graph
        )

    def _extract_output(
        self,
        context: ProcessingContext[MyInput]
    ) -> Optional[MyOutput]:
        if not context.should_continue_processing:
            return None
        return context.get_result("final_output")

Design Principles

1. Separation of Concerns

  • InputSources generate events, don't process them
  • Processors transform data, don't fetch or store it
  • Executors perform actions, don't transform data

2. Early Termination

  • Stages can stop pipeline execution immediately
  • Failure reasons are tracked for debugging
  • No wasted computation on invalid data

3. Result Accumulation

  • Each stage adds results to shared context
  • Later stages can access earlier stage outputs
  • No shared global state; stages exchange data only through the context

4. Type Safety

  • Generic type parameters let static type checkers (such as mypy) catch mismatches before runtime
  • Pydantic validation for models
  • Type hints throughout

5. Composability

  • Mix and match components
  • Create custom workflows without modifying framework
  • Reuse stages across processors

6. Testability

  • Each component can be tested in isolation
  • Mock-friendly interfaces
  • Dependencies such as the LLM service and stage graph are injected through constructors

Implementation References

Key Files

| Component | File Path |
|---|---|
| Workflow Base | src/domain/workflow/default_workflow.py |
| InputSource Base | src/domain/input_source/base_input_source.py |
| Processor Base | src/domain/processor/base_stage_processor.py |
| Executor Base | src/domain/executor/base_executor.py |
| Stage Base | src/domain/processor/stages/base_stage.py |
| StageGraph | src/domain/processor/stage_graph.py |
| ProcessingContext | src/domain/processor/stages/processing_context.py |
| Events | src/domain/model/events/ |
| Actions | src/domain/model/actions/ |
| Exceptions | src/domain/exceptions/ |

Processor Implementations

| Processor | File Path |
|---|---|
| LaunchDetectionProcessor | src/domain/processor/launch_detection_processor.py |
| SimpleLaunchDetectionProcessor | src/domain/processor/simple_launch_detection_processor.py |
| RagGenerationProcessor | src/domain/processor/rag_generation_processor.py |
| RAGMemecoinInsertionProcessor | src/domain/processor/rag_memecoin_insertion_processor.py |
| BackupProcessingProcessor | src/domain/processor/backup_processing_processor.py |
| EnhancedBackupProcessingProcessor | src/domain/processor/enhanced_backup_processing_processor.py |

Document Status: Complete
Last Updated: October 15, 2025
Architecture Version: Event-driven domain framework with 30+ stages