📊 CSV Launch Detection Workflow

📋 Document Summary

What This Document Covers:

  • Batch processing of historical token launches from CSV files
  • 5-stage validation pipeline (same as live detection)
  • Automatic row deletion after processing
  • Configurable batch sizes for memory efficiency
  • File-based output ready for RAG insertion

Context Tags: #workflow #batch-processing #csv #launch-detection #historical-data


Batch processing variant of live launch detection for analyzing historical token launches from CSV files

📋 Overview

The CSV Launch Detection Workflow is a production-ready batch processor that analyzes historical token launch data from CSV files using the same pipeline as live detection.

This workflow provides a simplified approach to processing large datasets of token launches without requiring real-time WebSocket connections or AI processing. It's ideal for:

  • Historical analysis: Process thousands of past launches
  • Data migration: Import legacy token data into the system
  • Testing & validation: Test detection logic on known datasets
  • Performance benchmarking: Measure throughput on batch data

🎯 Core Innovation: Batch CSV Processing

Traditional Approach: Manual parsing → Custom processing → Database insertion

CSV Workflow Approach: CSV file → Standard pipeline → File storage → Ready for AI processing

Key Benefits:

  • Same 5-stage validation pipeline as live detection
  • Automatic row deletion after successful processing
  • Configurable batch sizes for memory efficiency
  • Cross-platform signal handling
  • File-based output ready for RAG insertion workflow

🏗️ Architecture

Component Architecture

┌─────────────────────┐    ┌──────────────────────────┐    ┌─────────────────────┐
│ CSVTokenInputSource │───▶│ SimpleLaunchDetection    │───▶│ SimpleFileExecutor  │
│                     │    │ Processor                │    │                     │
│ • Batch reading     │    │                          │    │ • JSON + JPG pairs  │
│ • Row deletion      │    │ • 5-stage validation     │    │ • 512x512px images  │
│ • Progress tracking │    │ • Metadata extraction    │    │ • pending/ directory│
│ • Error handling    │    │ • Image processing       │    │ • Atomic writes     │
└─────────────────────┘    └──────────────────────────┘    └─────────────────────┘

Core Assumption: CSV format matches pump.fun token schema (mint_address, name, symbol, uri, etc.)

Key Innovation: Reuses production validation logic while optimizing for batch throughput

Data Flow

CSV File (token_dataset.csv)
    ↓
Read batch (100 rows default)
    ↓
Parse token data
    ↓
5-stage validation pipeline
    ↓
Convert image to 512x512 JPEG
    ↓
Save as JSON + JPG pair in pending/
    ↓
Delete processed rows from CSV
    ↓
Repeat until CSV empty
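
The batch-reading step in the flow above can be sketched with the standard library. This is a minimal illustration, not the actual `CSVTokenInputSource` implementation: the `read_batches` helper and the sample rows are hypothetical, and only the column names and the default batch size of 100 come from this document.

```python
import csv
import io
from itertools import islice
from typing import Iterator, TextIO


def read_batches(handle: TextIO, batch_size: int = 100) -> Iterator[list[dict[str, str]]]:
    """Yield lists of at most batch_size row dicts from an open CSV file."""
    reader = csv.DictReader(handle)
    while True:
        # islice pulls the next batch_size rows without loading the whole file.
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


# Usage with an in-memory CSV of five made-up rows and a batch size of two.
sample = io.StringIO(
    "mint_address,name,symbol,uri,creator,timestamp\n"
    + "".join(
        f"mint{i},Name{i},SYM{i},https://example.invalid/{i},creator{i},{1699123456 + i}\n"
        for i in range(5)
    )
)
batch_sizes = [len(batch) for batch in read_batches(sample, batch_size=2)]
print(batch_sizes)
```

Because only one batch is held in memory at a time, lowering `batch_size` reduces peak memory at the cost of more frequent CSV rewrites.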

🔄 5-Stage Processing Pipeline

Stage 1: TokenDataExtractionStage (< 1ms)

  • Process: Extract token metadata from CSV row
  • Input: Raw CSV row dict
  • Output: Parsed token data (mint_address, name, symbol, uri, etc.)
  • Failure: Missing required fields → Skip row
  • Context updates: token_data, mint_address
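
As a rough sketch of what Stage 1 describes, the function below checks the required columns and skips rows with missing values. The name `extract_token_data` and the exact cleaning rules (whitespace stripping, integer timestamps) are assumptions for illustration; the real `TokenDataExtractionStage` may differ.

```python
from typing import Optional

# Required columns as listed in this document's CSV format.
REQUIRED_FIELDS = ("mint_address", "name", "symbol", "uri", "creator", "timestamp")


def extract_token_data(row: dict) -> Optional[dict]:
    """Return cleaned token data, or None when the row should be skipped."""
    cleaned = {key: (row.get(key) or "").strip() for key in REQUIRED_FIELDS}
    if any(value == "" for value in cleaned.values()):
        return None  # a required field is missing or empty: skip the row
    try:
        cleaned["timestamp"] = int(cleaned["timestamp"])
    except ValueError:
        return None  # timestamp is not a Unix integer: skip the row
    return cleaned


# Usage with made-up values.
good = extract_token_data({
    "mint_address": "ExampleMint111", "name": "Pepe the Frog", "symbol": "PEPE",
    "uri": "https://example.invalid/meta.json", "creator": "ExampleCreator",
    "timestamp": "1699123456",
})
bad = extract_token_data({"mint_address": "ExampleMint222", "name": ""})
```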

Stage 2: MetadataFetchStage (< 100ms)

  • Process: Fetch metadata JSON from IPFS/Arweave URI
  • Input: Token data with metadata URI
  • Output: Complete metadata (name, symbol, image_url)
  • Failure: Network errors → Retry with backoff, eventual skip
  • Context updates: metadata, image_url

Stage 3: ImageDownloadStage (< 500ms)

  • Process: Download image from metadata URL
  • Input: Image URL from metadata
  • Output: Raw image bytes
  • Failure: Download errors → Retry, eventual skip
  • Context updates: image_data

Stage 4: ImageProcessingStage (< 50ms)

  • Process: Convert image to 512x512 JPEG, 90% quality
  • Input: Raw image bytes
  • Output: Standardized JPEG bytes
  • Failure: Corrupt images → Skip token
  • Context updates: processed_image, image_format

Stage 5: SimpleFileStorageStage (< 10ms)

  • Process: Save JSON + JPG pair atomically
  • Input: Complete token data + processed image
  • Output: Files written to pending/ directory
  • Failure: I/O errors → Retry, eventual skip
  • Context updates: stored_files, file_paths
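
One common way to get the atomic writes mentioned in Stage 5 is to write to a temporary file in the target directory and rename it into place. The sketch below assumes that technique; `write_atomic` and `store_pair` are hypothetical helpers, not the actual `SimpleFileStorageStage` code.

```python
import json
import os
import tempfile
from pathlib import Path


def write_atomic(path: Path, data: bytes) -> None:
    """Write data to a temp file next to path, then rename it into place."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_name, path)  # atomic when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def store_pair(pending_dir: Path, token: dict, jpeg_bytes: bytes) -> tuple[Path, Path]:
    """Write {token_address}.jpg first, then {token_address}.json."""
    pending_dir.mkdir(parents=True, exist_ok=True)
    stem = token["token_address"]
    jpg_path = pending_dir / f"{stem}.jpg"
    json_path = pending_dir / f"{stem}.json"
    write_atomic(jpg_path, jpeg_bytes)
    write_atomic(json_path, json.dumps(token, indent=2).encode("utf-8"))
    return json_path, jpg_path


# Usage in a throwaway directory; the bytes are a placeholder, not a real JPEG.
with tempfile.TemporaryDirectory() as tmp_dir:
    json_path, jpg_path = store_pair(
        Path(tmp_dir) / "pending",
        {"token_address": "ExampleMint111", "ticker": "PEPE"},
        b"\xff\xd8placeholder",
    )
    stored = json.loads(json_path.read_text(encoding="utf-8"))
    names = sorted(p.name for p in json_path.parent.iterdir())
```

Each file is atomic on its own, but the pair is not. Writing the image before the JSON means a consumer that discovers tokens by their `.json` file will always find the matching `.jpg`.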

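Total Pipeline Time: up to ~660ms per token when every stage succeeds on the first attempt (the sum of the stage budgets above; metadata fetch and image download account for about 600ms of it). Retries and slow gateways add to this.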
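
The total follows directly from the per-stage budgets. The arithmetic below assumes tokens are processed one at a time with no retries, which this document does not state explicitly, so treat the batch estimate as a rough upper bound.

```python
# Per-stage upper bounds in milliseconds, taken from the stage headings above.
STAGE_BUDGET_MS = {
    "TokenDataExtractionStage": 1,
    "MetadataFetchStage": 100,
    "ImageDownloadStage": 500,
    "ImageProcessingStage": 50,
    "SimpleFileStorageStage": 10,
}

total_ms = sum(STAGE_BUDGET_MS.values())
network_ms = STAGE_BUDGET_MS["MetadataFetchStage"] + STAGE_BUDGET_MS["ImageDownloadStage"]
network_share = network_ms / total_ms

# Assumption: sequential processing of a default batch of 100 rows.
batch_seconds = 100 * total_ms / 1000

print(f"total: {total_ms} ms, network share: {network_share:.0%}, batch of 100: {batch_seconds:.1f} s")
```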

🚀 Running the Workflow

Prerequisites

# Ensure CSV file exists (default location)
ls res/token_dataset.csv

# CSV format required columns:
# mint_address, name, symbol, uri, creator, timestamp

Standalone Execution

# Default: res/token_dataset.csv with batch size 100
PYTHONPATH=. python src/workflows/csv_launch_detection_workflow.py

# Custom CSV file
PYTHONPATH=. python src/workflows/csv_launch_detection_workflow.py \
    --csv-file path/to/custom.csv

# Custom batch size (for memory constraints)
PYTHONPATH=. python src/workflows/csv_launch_detection_workflow.py \
    --batch-size 50

Expected Output

🚀 CSV Token Launch Detection Workflow Starting...
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📁 CSV File: res/token_dataset.csv
📦 Batch size: 100 rows
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 Processing historical token launches from CSV...
📁 File storage enabled - memecoins will be saved as JSON + JPG pairs
🖼️ Images processed to 512x512px JPEG with 90% quality
💾 Target directory: res/memecoins/pending/
🗑️ Processed rows will be deleted from CSV file
❌ AI processing disabled - no tags, captions, or embeddings
❌ Database storage disabled - only file operations
📊 Complete token data will be stored as files for later processing
🛑 Press Ctrl+C to stop
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

✅ Processed token: PEPE (addr: 4xjS...)
✅ Processed token: WOJAK (addr: 7mnK...)
...

Programmatic Usage

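import asyncio

from src.workflows.csv_launch_detection_workflow import CSVLaunchDetectionWorkflow


async def main() -> None:
    # Create workflow
    workflow = CSVLaunchDetectionWorkflow(
        csv_file="res/token_dataset.csv",
        batch_size=100,
    )

    # Initialize and run
    await workflow.initialize()
    try:
        await workflow.start()
    finally:
        # Graceful shutdown, even if start() raises or is interrupted
        await workflow.stop()


# await is only valid inside a coroutine, so drive it with asyncio.run()
asyncio.run(main())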

📁 CSV Input Format

Required CSV Columns

mint_address,name,symbol,uri,creator,timestamp
4xjSw2EK...,Pepe the Frog,PEPE,https://ipfs.io/ipfs/Qm...,8vN3k...,1699123456
7mnKpQ5L...,Wojak,WOJAK,https://arweave.net/abc123,2uY7m...,1699123457

Field Descriptions

| Column | Type | Required | Description |
|--------|------|----------|-------------|
| mint_address | string | Yes | Solana token address (unique ID) |
| name | string | Yes | Token name |
| symbol | string | Yes | Token ticker symbol |
| uri | string | Yes | Metadata URI (IPFS/Arweave) |
| creator | string | Yes | Creator wallet address |
| timestamp | int | Yes | Unix timestamp of creation |

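Note: CSV rows are deleted after successful processing, so the rows remaining in the file are the ones still to be processed. Keep a backup copy of the original CSV if you may need to reprocess it.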

📊 Output Format

File Structure

res/memecoins/pending/
├── 4xjSw2EK....json         # Token metadata
├── 4xjSw2EK....jpg          # Processed image (512x512)
├── 7mnKpQ5L....json
├── 7mnKpQ5L....jpg
└── ...

JSON Metadata Example

{
  "token_address": "4xjSw2EK...",
  "token_name": "Pepe the Frog",
  "ticker": "PEPE",
  "description": "The original meme frog",
  "created_at": 1699123456,
  "image_uri": "https://ipfs.io/ipfs/Qm...",
  "creator": "8vN3k...",
  "pending_processing": true
}

Image Processing

  • Format: JPEG
  • Dimensions: 512x512px
  • Quality: 90%
  • Color space: RGB
  • File naming: {token_address}.jpg

🔧 Configuration

Workflow Configuration

# Default configuration
workflow = CSVLaunchDetectionWorkflow(
    csv_file="res/token_dataset.csv",
    batch_size=100
)

# Memory-constrained configuration
workflow = CSVLaunchDetectionWorkflow(
    csv_file="res/token_dataset.csv",
    batch_size=25  # Reduced for low-memory systems
)

# High-throughput configuration
workflow = CSVLaunchDetectionWorkflow(
    csv_file="res/token_dataset.csv",
    batch_size=200  # Increased for fast systems
)

Environment Variables

| Variable | Required | Description |
|----------|----------|-------------|
| None | - | No external APIs needed |

Note: This workflow does not require API keys (no AI processing)

🛡️ Error Handling

Critical Errors

  • CSV file not found: Immediate exit with error message
  • Invalid CSV format: Skip malformed rows, continue processing
  • Disk full: Graceful failure with clear error message

Retry Mechanisms

  • Network errors: 3 retries with exponential backoff (1s, 2s, 4s)
  • IPFS/Arweave timeouts: 10-second timeout per request
  • Image download failures: Skip token after retries
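
The retry schedule above (one initial attempt, then three retries after waits of 1s, 2s, and 4s) can be sketched as follows. `retry_with_backoff` is a hypothetical helper, and catching `OSError` is an assumption chosen because socket timeouts and `urllib` network errors derive from it; the workflow's own retry code is not shown in this document.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation; on OSError wait base_delay * 2**attempt, then retry."""
    for attempt in range(retries + 1):
        try:
            return operation()
        except OSError:
            if attempt == retries:
                raise  # retries exhausted: the caller skips the token
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Usage: a simulated fetch that times out three times, then succeeds.
# Delays are recorded instead of slept so the example runs instantly.
delays: list[float] = []
calls = {"count": 0}


def flaky_fetch() -> str:
    calls["count"] += 1
    if calls["count"] <= 3:
        raise TimeoutError("simulated gateway timeout")
    return "metadata"


result = retry_with_backoff(flaky_fetch, sleep=delays.append)
print(result, delays)
```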

Row Deletion Policy

  • Success: Row deleted from CSV immediately after file write
  • Failure: Row retained in CSV for retry in next run
  • Partial processing: Row retained until complete success
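
A simple way to implement this policy is to rewrite the CSV without the successfully processed rows and swap the new file into place. The sketch below assumes that approach; `delete_processed_rows` is a hypothetical helper and the sample data is made up.

```python
import csv
import os
import tempfile
from pathlib import Path


def delete_processed_rows(csv_path: Path, processed: set[str]) -> int:
    """Rewrite csv_path without rows whose mint_address is in processed.

    Returns the number of rows retained for the next run.
    """
    kept = 0
    fd, tmp_name = tempfile.mkstemp(dir=csv_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as tmp, \
                csv_path.open(newline="", encoding="utf-8") as src:
            reader = csv.DictReader(src)
            writer = csv.DictWriter(tmp, fieldnames=reader.fieldnames or [])
            writer.writeheader()
            for row in reader:
                if row["mint_address"] not in processed:
                    writer.writerow(row)  # failed or unprocessed: retain
                    kept += 1
        os.replace(tmp_name, csv_path)  # swap in the rewritten file
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return kept


# Usage: three rows, two processed successfully, one retained for retry.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "token_dataset.csv"
    path.write_text(
        "mint_address,name,symbol,uri,creator,timestamp\n"
        "mintA,A,AAA,https://example.invalid/a,creatorA,1699123456\n"
        "mintB,B,BBB,https://example.invalid/b,creatorB,1699123457\n"
        "mintC,C,CCC,https://example.invalid/c,creatorC,1699123458\n",
        encoding="utf-8",
    )
    kept = delete_processed_rows(path, {"mintA", "mintC"})
    with path.open(newline="", encoding="utf-8") as handle:
        remaining = [row["mint_address"] for row in csv.DictReader(handle)]
```

Replacing the file in one step means an interruption leaves either the old CSV or the new one on disk, never a half-written file.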

🎯 Use Cases

Use Case 1: Historical Token Analysis

Process thousands of past launches from pump.fun to build training datasets for AI models.

Workflow: CSV export from pump.fun → CSV workflow → Pending directory → RAG insertion workflow

Use Case 2: Data Migration

Import legacy token data from other platforms into LaunchAgencyBot format.

Workflow: External data → CSV conversion → CSV workflow → Standardized format

🔍 What's NOT Included (Simplified Design)

This workflow intentionally excludes:

  • AI Processing: No tags, captions, or embeddings (use RAG insertion workflow)
  • Database Storage: No vector DB insertion (files ready for insertion workflow)
  • Real-time Processing: Batch-only, not suitable for live monitoring
  • WebSocket Connections: File-based input only
  • CLIP Embeddings: Delegated to specialized RAG workflow

Design Philosophy: Do one thing well—batch CSV processing with standard validation

🔗 Integration with Other Workflows

Next Step: RAG Insertion Workflow

After CSV processing completes, run the RAG insertion workflow to add AI enhancements:

# Step 1: Process CSV
PYTHONPATH=. python src/workflows/csv_launch_detection_workflow.py

# Step 2: AI processing + vector DB insertion
PYTHONPATH=. python src/workflows/rag_memecoin_insertion_workflow.py

Flow: CSV Workflow → pending/ directory → RAG Workflow → approved/ directory → Vector DB

🔍 Troubleshooting

Common Issues

| Issue | Cause | Solution |
|-------|-------|----------|
| CSV file not found | Incorrect path | Check file path, use absolute path if needed |
| Out of memory | Batch size too large | Reduce batch_size to 25 or 50 |
| Network timeouts | Slow IPFS/Arweave | Wait and retry, or skip problematic tokens |
| Disk full | Too many pending files | Process pending/ with RAG workflow to clear |
| Permission errors | Write access denied | Check permissions on res/memecoins/pending/ |

Design Decisions & Tradeoffs

Decision: No AI Processing in CSV Workflow

  • Decision: Delegate AI processing to RAG insertion workflow
  • Tradeoff: Two-step process instead of one
  • Reasoning: Separation of concerns—CSV workflow focuses on batch I/O, RAG workflow handles AI complexity

Decision: Delete Rows After Processing

  • Decision: Remove processed rows from CSV file immediately
  • Tradeoff: Cannot easily reprocess without backup
  • Reasoning: Enables progress tracking and partial recovery from interruptions

Decision: Batch Reading with Configurable Size

  • Decision: Process CSV in configurable batches (default 100)
  • Tradeoff: Memory overhead vs. throughput
  • Reasoning: Balance between memory efficiency and processing speed

Status: Production-ready
Version: 1.0.0
Last Updated: 2025-10-12