RAG Memecoin Update Workflow

📋 Document Summary

What This Document Covers:

  • Full CLIP embedding regeneration workflow (all 5 embeddings: image + 4 captions)
  • Atomic 5-collection ChromaDB updates
  • Edit proposal acceptance workflow integration
  • Complete validation and error handling
  • MemeStorageOrchestrator integration

Context Tags: #workflow #vector-db #embedding #atomic-updates #edit-workflow


Complete vector database update pipeline that regenerates ALL 5 CLIP embeddings and atomically updates all 5 ChromaDB collections after memecoin metadata edits.

Overview

The RAG Memecoin Update Workflow provides atomic vector database updates through:

  • Full embedding regeneration - Regenerates all 5 CLIP embeddings (image + 4 captions)
  • Atomic updates - Updates all 5 collections in a single transaction
  • Complete pipeline - Validation → Embedding → Atomic execution
  • Consistency - Uses same MemecoinEmbeddingStage as insertion workflow

File: src/workflows/rag_memecoin_update_workflow.py

Architecture

Workflow Pattern: InputSource → Processor → Executor

MemoryMemecoinInputSource → RAGMemecoinUpdateProcessor → MemecoinUpdateExecutor
        ↓                           ↓                             ↓
Single memecoin          Validation + Embedding         Atomic vector DB update
  from memory            (all 5 embeddings)             (5 collections)

Key Design: This workflow receives edited memecoin data (from edit analysis workflow) and updates the vector database with fresh CLIP embeddings for all 5 collections.
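The InputSource → Processor → Executor pattern can be sketched in a few lines. This is a minimal illustration, not the project's implementation: `PipelineContext`, `validation_stage`, `embedding_stage`, and `run_pipeline` are hypothetical stand-ins, and only the `results`, `should_continue_processing`, and `termination_reason` attribute names are taken from this document.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple


@dataclass
class PipelineContext:
    """Carries data and termination state between stages (hypothetical shape)."""
    results: Dict[str, Any] = field(default_factory=dict)
    should_continue_processing: bool = True
    termination_reason: Optional[str] = None


Stage = Callable[[PipelineContext], Awaitable[None]]


async def validation_stage(context: PipelineContext) -> None:
    """Stand-in for MemecoinUpdateValidationStage: checks a single field."""
    entry = context.results["memecoin_entry"]
    if not entry.get("token_address"):
        context.should_continue_processing = False
        context.termination_reason = "Missing required field: token_address"


async def embedding_stage(context: PipelineContext) -> None:
    """Stand-in for MemecoinEmbeddingStage: stores a placeholder vector."""
    context.results["image_embedding"] = [0.0] * 768


async def run_pipeline(
    item: Dict[str, Any],
    stages: List[Stage],
    execute: Callable[[PipelineContext], Awaitable[None]],
) -> Tuple[bool, str]:
    # InputSource: the single memecoin is already in memory.
    context = PipelineContext(results={"memecoin_entry": item})
    # Processor: run stages in order and stop at the first termination.
    for stage in stages:
        await stage(context)
        if not context.should_continue_processing:
            return False, context.termination_reason or "Processing stopped"
    # Executor: apply the processed result (here, whatever `execute` does).
    await execute(context)
    return True, "Updated " + item["token_address"]
```

The executor only runs when every stage leaves `should_continue_processing` set, which is how a validation failure keeps the vector DB untouched.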

Processing Pipeline

RAGMemecoinUpdateProcessor - 2-stage update pipeline:

  1. MemecoinUpdateValidationStage:

    • Validates edited memecoin has required fields
    • Checks caption_structured structure (entity and context required; visual and emotions optional)
    • Ensures token_address matches target
    • Verifies metadata completeness
    • Extracts data to context.results for embedding stage
  2. MemecoinEmbeddingStage:

    • Generates ALL 5 CLIP embeddings in parallel:
      • Image embedding (from Base64 image data)
      • Entity caption embedding
      • Context caption embedding
      • Visual caption embedding (if present)
      • Emotions caption embedding (if present)
    • Same stage used in insertion workflow for consistency

Executor: MemecoinUpdateExecutor - Atomic 5-collection update

5-Collection Multi-Embedding Architecture

Collection Structure

5 Collections: All 5 collections are updated with fresh CLIP embeddings:

  • meme_image_embeddings
  • entity_caption_embeddings
  • context_caption_embeddings
  • visual_caption_embeddings
  • emotions_caption_embeddings

Update Strategy

Updates: All metadata (token_name, ticker, description, tags), caption text, and ALL 5 CLIP embeddings, regenerated from current data

Preserves: token_address (document ID only)

Usage

Integration

from src.workflows.rag_memecoin_update_workflow import RAGMemecoinUpdateWorkflow

# Initialize and run
workflow = RAGMemecoinUpdateWorkflow(clip_service, vector_store)
success, message = await workflow.run(updated_memecoin_data, token_address)
await workflow.cleanup()

Data Models

Input: Updated Memecoin Data

{
    "token_name": str,              # Updated name
    "ticker": str,                  # Updated ticker
    "description": str,             # Updated description
    "token_address": str,           # UNCHANGED (used as ID)
    "tags": List[str],              # Updated tags
    "image_base64": str,            # UNCHANGED image
    "image_mime_type": str,         # Image MIME type
    "caption_structured": {         # UPDATED captions
        "entity": str,              # NEW entity caption → NEW embedding
        "context": str,             # NEW context caption → NEW embedding
        "visual": str,              # NEW visual caption → NEW embedding
        "emotions": str             # NEW emotions caption → NEW embedding
    }
    # Note: All 5 embeddings will be regenerated during processing
}

Action: MemecoinUpdateAction

class MemecoinUpdateAction(Action):
    token_address: str              # Document ID for vector DB
    token_name: str
    ticker: str
    description: str
    tags: List[str]

    # Embeddings (ALL 5 REGENERATED)
    image_embedding: List[float]    # REGENERATED from image data
    entity_embedding: List[float]   # REGENERATED from caption
    context_embedding: List[float]  # REGENERATED from caption
    visual_embedding: Optional[List[float]]   # REGENERATED if caption present
    emotions_embedding: Optional[List[float]] # REGENERATED if caption present

    # Caption text
    caption_structured: Dict[str, str]
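The two optional embedding fields decide which collections an update touches. As a rough sketch, the mapping from an action to collection names might look like the following; `UpdateActionSketch` is a simplified stand-in for `MemecoinUpdateAction`, and `embeddings_by_collection` is a hypothetical helper. Only the collection names come from this document.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class UpdateActionSketch:
    """Simplified stand-in for MemecoinUpdateAction (embedding fields only)."""
    token_address: str
    image_embedding: List[float]
    entity_embedding: List[float]
    context_embedding: List[float]
    visual_embedding: Optional[List[float]] = None
    emotions_embedding: Optional[List[float]] = None


def embeddings_by_collection(action: UpdateActionSketch) -> Dict[str, List[float]]:
    """Map each collection name to its embedding, skipping absent optional ones."""
    candidates = {
        "meme_image_embeddings": action.image_embedding,
        "entity_caption_embeddings": action.entity_embedding,
        "context_caption_embeddings": action.context_embedding,
        "visual_caption_embeddings": action.visual_embedding,
        "emotions_caption_embeddings": action.emotions_embedding,
    }
    # Optional embeddings are None when the matching caption was not provided.
    return {name: vec for name, vec in candidates.items() if vec is not None}
```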

Processing Stages

1. MemecoinUpdateValidationStage

Purpose: Ensure edited memecoin has complete data

Validations:

# Required fields
✅ token_name (non-empty)
✅ ticker (non-empty)
✅ description (non-empty)
✅ token_address (non-empty)
✅ caption_structured (dict)

# Caption structure
✅ caption_structured["entity"] (required)
✅ caption_structured["context"] (required)
✅ caption_structured["visual"] (optional)
✅ caption_structured["emotions"] (optional)

Early Termination:

  • Missing required fields → Stop processing
  • Invalid caption structure → Stop processing
  • Empty token_address → Stop processing
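The validation rules and early-termination conditions above could be expressed as a single function that returns the first problem it finds. This is a sketch under assumptions: `find_validation_error` and its constants are hypothetical names, and only the "Missing required field: ..." and "caption_structured must be a dictionary" messages appear in this document; the others are invented for illustration.

```python
from typing import Any, Dict, Optional

REQUIRED_FIELDS = ("token_name", "ticker", "description", "token_address")
REQUIRED_CAPTIONS = ("entity", "context")
OPTIONAL_CAPTIONS = ("visual", "emotions")


def find_validation_error(data: Dict[str, Any], target_token_address: str) -> Optional[str]:
    """Return a termination reason for the first failed check, or None if valid."""
    # Required top-level fields must be non-empty strings.
    for name in REQUIRED_FIELDS:
        value = data.get(name)
        if not isinstance(value, str) or not value.strip():
            return f"Missing required field: {name}"
    # The edited entry must refer to the document being updated.
    if data["token_address"] != target_token_address:
        return "token_address does not match update target"
    captions = data.get("caption_structured")
    if not isinstance(captions, dict):
        return "caption_structured must be a dictionary"
    # entity and context are required caption parts.
    for part in REQUIRED_CAPTIONS:
        value = captions.get(part)
        if not isinstance(value, str) or not value.strip():
            return f"Missing required caption part: {part}"
    # visual and emotions may be absent, but must be strings when present.
    for part in OPTIONAL_CAPTIONS:
        if part in captions and not isinstance(captions[part], str):
            return f"Caption part must be a string: {part}"
    return None
```

A stage built on this would set `context.should_continue_processing = False` and copy the returned string into `context.termination_reason` whenever the result is not `None`.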

2. MemecoinEmbeddingStage

Purpose: Regenerate ALL 5 CLIP embeddings in parallel

Process:

import asyncio

# Retrieve memecoin entry and caption structure from context
memecoin_entry = context.results.get("memecoin_entry")
caption_structured = context.results.get("caption_structured")

async def embed_optional(text):
    # Optional captions (visual, emotions) yield None when missing or empty,
    # so no embedding is generated for an empty string
    return await clip_service.generate_text_embedding(text) if text else None

# Generate ALL 5 embeddings in PARALLEL using asyncio.gather()
(
    image_embedding,       # From Base64 image
    entity_embedding,      # From entity caption
    context_embedding,     # From context caption
    visual_embedding,      # From visual caption (None if absent)
    emotions_embedding,    # From emotions caption (None if absent)
) = await asyncio.gather(
    clip_service.generate_image_embedding(memecoin_entry.image_base64),
    clip_service.generate_text_embedding(caption_structured["entity"]),
    clip_service.generate_text_embedding(caption_structured["context"]),
    embed_optional(caption_structured.get("visual")),
    embed_optional(caption_structured.get("emotions")),
)

# All embeddings stored in memecoin_entry
memecoin_entry.image_embedding = image_embedding
memecoin_entry.entity_embedding = entity_embedding
memecoin_entry.context_embedding = context_embedding
memecoin_entry.visual_embedding = visual_embedding
memecoin_entry.emotions_embedding = emotions_embedding

CLIP Service Call:

  • Model: openai/clip-vit-large-patch14
  • Endpoint: Replicate API
  • Output: 768-dimensional embedding vector
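The parallel generation step can be tried without network access by swapping in a stub service. In the sketch below, `StubClipService` and `generate_all_embeddings` are hypothetical; only the two `generate_*_embedding` method names and the 768-dimension output come from this document. Returning `None` for an absent optional caption is an assumption about what "if present" means.

```python
import asyncio
from typing import Dict, List, Optional

EMBEDDING_DIM = 768  # CLIP output size stated above


class StubClipService:
    """Stand-in for the real CLIP service; returns constant vectors."""

    async def generate_image_embedding(self, image_base64: str) -> List[float]:
        await asyncio.sleep(0)  # yield control, as a real network call would
        return [1.0] * EMBEDDING_DIM

    async def generate_text_embedding(self, text: str) -> List[float]:
        await asyncio.sleep(0)
        return [float(len(text))] * EMBEDDING_DIM


async def _embed_optional(service: StubClipService, text: Optional[str]) -> Optional[List[float]]:
    # Assumption: an absent or empty optional caption produces no embedding.
    if not text:
        return None
    return await service.generate_text_embedding(text)


async def generate_all_embeddings(
    service: StubClipService, image_base64: str, captions: Dict[str, str]
) -> Dict[str, Optional[List[float]]]:
    """Run all embedding calls concurrently and check their dimensions."""
    image, entity, context, visual, emotions = await asyncio.gather(
        service.generate_image_embedding(image_base64),
        service.generate_text_embedding(captions["entity"]),
        service.generate_text_embedding(captions["context"]),
        _embed_optional(service, captions.get("visual")),
        _embed_optional(service, captions.get("emotions")),
    )
    result = {
        "image": image,
        "entity": entity,
        "context": context,
        "visual": visual,
        "emotions": emotions,
    }
    for name, vector in result.items():
        if vector is not None and len(vector) != EMBEDDING_DIM:
            raise ValueError(f"{name} embedding has {len(vector)} dimensions")
    return result
```

`asyncio.gather` returns results in the order the awaitables were passed, so the tuple unpacking lines up with the call order regardless of which request finishes first.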

Executor: Atomic Vector DB Update

MemecoinUpdateExecutor

Purpose: Atomically update all 5 ChromaDB collections

Process:

# Step 1: Update meme_image_embeddings (image collection)
vector_store.update_image_collection(
    token_address=token_address,
    embedding=image_embedding,  # REGENERATED
    metadata={
        "token_name": token_name,
        "ticker": ticker,
        "description": description,
        "tags": tags,
        "caption_entity": caption_structured["entity"],
        "caption_context": caption_structured["context"],
        # ... other caption parts
    }
)

# Step 2: Update entity_caption_embeddings
vector_store.update_entity_collection(
    token_address=token_address,
    embedding=entity_embedding,  # REGENERATED
    metadata={...}
)

# Step 3: Update context_caption_embeddings
vector_store.update_context_collection(
    token_address=token_address,
    embedding=context_embedding,  # REGENERATED
    metadata={...}
)

# Step 4: Update visual_caption_embeddings (if present)
if visual_embedding is not None:
    vector_store.update_visual_collection(...)

# Step 5: Update emotions_caption_embeddings (if present)
if emotions_embedding is not None:
    vector_store.update_emotions_collection(...)

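Atomicity:

  • All 5 collections are updated as a single logical unit
  • Any failure rolls the update back. ChromaDB's client API does not document cross-collection transactions, so this guarantee presumably comes from MemecoinUpdateExecutor / MemecoinVectorStore rather than from ChromaDB itself
  • All 5 collections stay consistent with one another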
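One way an executor could get all-or-nothing behaviour across several collections is to snapshot the existing records and restore them if any write fails. The sketch below illustrates that compensation technique on plain dictionaries. It is not the project's executor, `update_all_or_nothing` is a hypothetical name, and it gives no isolation: a process crash mid-update would still leave partial state.

```python
import copy
from typing import Any, Callable, Dict

Record = Dict[str, Any]
Collections = Dict[str, Dict[str, Record]]  # collection name -> {document id -> record}


def update_all_or_nothing(
    collections: Collections,
    token_address: str,
    new_records: Dict[str, Record],
    write: Callable[[Dict[str, Record], str, Record], None],
) -> None:
    """Write one record per collection; restore every snapshot if any write fails."""
    # Snapshot the current record (or None) in each collection we will touch.
    snapshots = {
        name: copy.deepcopy(collections[name].get(token_address))
        for name in new_records
    }
    try:
        for name, record in new_records.items():
            write(collections[name], token_address, record)
    except Exception:
        # Compensate: put every touched collection back to its snapshot.
        for name, previous in snapshots.items():
            if previous is None:
                collections[name].pop(token_address, None)
            else:
                collections[name][token_address] = previous
        raise
```

Re-raising after the restore lets the caller turn the failure into a `(False, message)` result, as the error-handling examples in this document do.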

Integration with Edit Analysis Workflow

Complete Edit Pipeline

1. User Feedback
   ↓
2. Edit Analysis Workflow (generate proposal)
   - LLM analyzes feedback
   - Regenerates affected fields
   - Returns edited MemecoinEntry
   ↓
3. Orchestrator caches proposal (5 min TTL)
   ↓
4. User accepts proposal
   ↓
5. Update Workflow (THIS WORKFLOW)
   - Validates edited data
   - Regenerates ALL 5 CLIP embeddings
   - Updates vector DB atomically
   ↓
6. Vector DB updated, frontend refreshes
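Step 3 relies on proposals expiring after five minutes. A minimal sketch of such a cache follows, assuming a plain in-memory dictionary keyed by proposal UUID; `ProposalCache` and its methods are hypothetical and not the orchestrator's actual `_edit_proposals` structure.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class ProposalCache:
    """In-memory proposal store whose entries expire after a fixed TTL."""

    def __init__(
        self,
        ttl_seconds: float = 300.0,  # 5 min TTL, as in step 3 above
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Dict[str, Any]]] = {}

    def put(self, proposal_uuid: str, proposal: Dict[str, Any]) -> None:
        # Store the proposal together with its absolute expiry time.
        self._entries[proposal_uuid] = (self._clock() + self._ttl, proposal)

    def pop(self, proposal_uuid: str) -> Optional[Dict[str, Any]]:
        """Remove and return the proposal, or None if unknown or expired."""
        entry = self._entries.pop(proposal_uuid, None)
        if entry is None:
            return None
        expires_at, proposal = entry
        return proposal if self._clock() < expires_at else None
```

Injecting the clock keeps expiry testable without sleeping, and `time.monotonic` avoids surprises when the wall clock is adjusted.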

Orchestrator Integration: accept_edit_proposal()

# In MemeStorageOrchestrator
async def accept_edit_proposal(
    self, proposal_uuid: str, token_address: str, edited_proposal: Optional[Dict] = None
) -> Tuple[bool, str]:

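    # Step 1: Retrieve proposal from cache (it may have expired after the 5 min TTL)
    proposal_data = self._edit_proposals.get(proposal_uuid)
    if proposal_data is None:
        return False, f"Edit proposal not found or expired: {proposal_uuid}"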

    # Step 2: Fetch original memecoin from vector DB
    memecoin_response = await self._database_service.get_memecoin_by_token_address(
        token_address
    )

    # Step 3: Merge proposal with original data
    updated_memecoin_data = {
        "token_name": proposal_data["token_name"],
        "ticker": proposal_data["ticker"],
        "description": proposal_data["description"],
        "tags": proposal_data["tags"],
        "caption_structured": proposal_data["caption_structured"],
        "image_base64": memecoin_response.image_data,
        "token_address": token_address
        # Note: All embeddings will be regenerated by MemecoinEmbeddingStage
    }

    # Step 4: Run UPDATE workflow
    workflow = RAGMemecoinUpdateWorkflow(
        clip_service=clip_service,
        vector_store=vector_store
    )

    success, message = await workflow.run(updated_memecoin_data, token_address)
    await workflow.cleanup()

    # Step 5: Remove proposal from cache
    del self._edit_proposals[proposal_uuid]

    return success, message

Error Handling

Validation Failures

# Missing required fields
context.should_continue_processing = False
context.termination_reason = "Missing required field: token_name"

# Invalid caption structure
context.should_continue_processing = False
context.termination_reason = "caption_structured must be a dictionary"

CLIP Service Errors

try:
    entity_embedding = await clip_service.generate_text_embedding(entity_text)
except Exception as e:
    logger.error(f"Failed to generate entity embedding: {e}")
    context.should_continue_processing = False
    context.termination_reason = f"Embedding generation failed: {e}"

Vector DB Update Errors

try:
    await executor.execute(update_action)
except Exception as e:
    logger.error(f"Vector DB update failed: {e}")
    # Rollback is automatic (ChromaDB transaction)
    return False, f"Update failed: {e}"

Configuration

CLIP Service

# src/res/config/clip_litellm.yaml
model_list:
  - model_name: "clip-vit-large-patch14"
    litellm_params:
      model: "replicate/openai/clip-vit-large-patch14"
      api_key: "os.environ/REPLICATE_API_KEY"

Vector Store

# ChromaDB configuration (managed by MemecoinVectorStore)
collections = [
    "meme_image_embeddings",
    "entity_caption_embeddings",
    "context_caption_embeddings",
    "visual_caption_embeddings",
    "emotions_caption_embeddings"
]

embedding_dimension = 768  # CLIP output size

Web UI Integration

API Endpoint: PATCH /api/database/memecoins/{token_address}

Request:

{
    "proposal_uuid": "550e8400-e29b-41d4-a716-446655440000",
    "edited_proposal": null  // Optional manual edits
}

Response:

{
    "status": 200,
    "success": true,
    "message": "Successfully updated ABC123..."
}
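For reference, a client could build the PATCH request with the standard library alone. This sketch only constructs the request object and sends nothing; `build_accept_request` is a hypothetical helper and the base URL passed to it is an assumption about the deployment.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional


def build_accept_request(
    base_url: str,
    token_address: str,
    proposal_uuid: str,
    edited_proposal: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build (but do not send) the PATCH request that accepts an edit proposal."""
    body = json.dumps(
        {"proposal_uuid": proposal_uuid, "edited_proposal": edited_proposal}
    ).encode("utf-8")
    # Percent-encode the token address so it is safe as a single path segment.
    path = "/api/database/memecoins/" + urllib.parse.quote(token_address, safe="")
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=body,
        headers={"Content-Type": "application/json"},
        method="PATCH",
    )
```

Passing the result to `urllib.request.urlopen` would send it; a `None` value for `edited_proposal` serializes to JSON `null`, matching the request shown above.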

Frontend Flow

  1. User reviews edit proposal
  2. Optionally edits proposal fields manually
  3. Clicks "Accept" button
  4. Frontend sends a PATCH request to /api/database/memecoins/{token_address}
  5. Backend runs update workflow:
    • Validates edited data
    • Regenerates all 5 CLIP embeddings
    • Updates vector DB atomically
  6. Frontend receives success response
  7. Database browser refreshes automatically

Testing

Test Pattern: Initialize workflow → provide edited_data with new captions → run workflow → assert success + verify vector DB updated with new token_name and caption embeddings regenerated.

Key Takeaways

✅ Atomic 5-Collection Update - All collections updated in a single transaction
✅ Full Embedding Regeneration - ALL 5 CLIP embeddings regenerated fresh
✅ Parallel Processing - All embeddings generated in parallel for speed
✅ 2-Stage Pipeline - Validation → Embedding → Action
✅ Workflow Consistency - Uses same MemecoinEmbeddingStage as insertion workflow
✅ Orchestrator Integration - Seamless accept proposal workflow
✅ Error Handling - Automatic rollback on failure
✅ Performance - 2-3 seconds total execution time
✅ Web UI Support - Complete update interface


For the edit proposal generation workflow, see RAG Memecoin Edit Analysis Workflow