Firebase Integration

📋 Document Summary

What This Document Covers:

  • SimpleFirebaseService singleton architecture
  • Firestore collections (wallets, launches, funding, trends)
  • Reactive Firebase watchers for real-time updates
  • Cache-first strategies for performance
  • Data models and schemas
  • Integration with orchestrators and workflows

Context Tags: #firebase #firestore #realtime #cache #persistence #watchers


Overview

LaunchAgencyBot uses Firebase Firestore as the primary persistence layer for configuration data, wallet management, launch scheduling, and trend tracking. Firebase provides real-time updates via reactive watchers, serverless infrastructure, automatic scaling, and built-in caching.

Primary Use Cases:

  1. Wallet Management - Portfolio tracking, balance caching, funding history
  2. Launch Configuration - Pre-launch planning, scheduling, validation
  3. Funding Operations - Scheduled CEX funding, execution tracking
  4. Trend Tracking - Real-time trend data with reactive watchers
  5. Session Management - Generation sessions, progress tracking

Architecture: Web UI/Workflows → Orchestrators → SimpleFirebaseService (singleton) → Firestore, with reactive watchers providing real-time cache updates.


Service Architecture

Location: src/firebase/simple_firebase_service.py

Singleton Pattern

Purpose: Prevent duplicate Firebase connections and wasted resources by using a standard Python singleton built on __new__ and an _initialized flag.

Benefits:

  • Single connection - All orchestrators share one Firestore client
  • Resource efficiency - No duplicate connections
  • Watcher management - Centralized watcher lifecycle
  • Consistent state - Shared cache across components
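The __new__ plus _initialized approach described above can be sketched as follows. This is a minimal illustration, not the actual SimpleFirebaseService source: the class name SingletonService, the lock, and the placeholder client attribute are assumptions made for the example.

```python
import threading


class SingletonService:
    """Illustrative stand-in for SimpleFirebaseService (not the real class)."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Always hand back the same object, creating it only once.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, credentials_path=None):
        # __init__ runs on every SingletonService(...) call, so guard it.
        if getattr(self, "_initialized", False):
            return
        self.credentials_path = credentials_path
        self.client = object()  # placeholder for the shared Firestore client
        self._initialized = True


first = SingletonService(credentials_path="creds.json")
second = SingletonService(credentials_path="ignored.json")
```

Both names refer to the same object, and the second call's arguments are ignored because the _initialized guard short-circuits __init__. That silent ignore is a known trade-off of this pattern and one reason to obtain the service from a single place rather than constructing it ad hoc.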

Core Methods

SimpleFirebaseService provides standard Firestore operations:

| Method | Parameters | Returns | Description |
|---|---|---|---|
| get_collection(collection_name) | str | CollectionReference | Get collection reference for queries/watchers |
| create_document(collection_name, document_id, data) | str, str, dict | None | Create document with specific ID |
| get_document(collection_name, document_id) | str, str | dict or None | Read document, returns None if not exists |
| update_document(collection_name, document_id, data) | str, str, dict | None | Update existing document fields |
| delete_document(collection_name, document_id) | str, str | None | Delete document permanently |
| query_collection(collection_name, filters, limit, order_by) | str, List[Tuple], int, str | List[dict] | Query with filters, ordering, and limit |

Query Filters: List of tuples (field, operator, value) where operator is ==, !=, <, <=, >, >=, in, array_contains.

Example Usage:

# Get all FUNDED wallets, sorted by creation date, limit 10
wallets = firebase.query_collection(
    collection_name="wallets",
    filters=[("status", "==", "FUNDED")],
    order_by="created_at",
    limit=10
)
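To make the (field, operator, value) semantics concrete, here is a small in-memory evaluator. It is only a sketch of what the filter tuples mean: match_filters and query_in_memory are hypothetical helpers, and the real service presumably forwards the tuples to Firestore rather than filtering in Python.

```python
import operator

# Operator strings from the list above mapped to plain Python comparisons.
OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "in": lambda field_value, allowed: field_value in allowed,
    "array_contains": lambda field_value, item: item in field_value,
}


def match_filters(doc, filters):
    """Return True if doc satisfies every (field, operator, value) tuple."""
    for field, op, value in filters:
        if field not in doc:
            return False  # in this sketch, a missing field never matches
        if not OPERATORS[op](doc[field], value):
            return False
    return True


def query_in_memory(docs, filters=(), order_by=None, limit=None):
    """Filter, then sort, then truncate, mirroring query_collection's arguments."""
    matched = [d for d in docs if match_filters(d, filters)]
    if order_by is not None:
        matched.sort(key=lambda d: d[order_by])
    return matched[:limit] if limit is not None else matched


sample = [
    {"name": "a", "status": "FUNDED", "created_at": 3},
    {"name": "b", "status": "FRESH", "created_at": 1},
    {"name": "c", "status": "FUNDED", "created_at": 2},
]
funded = query_in_memory(
    sample, [("status", "==", "FUNDED")], order_by="created_at", limit=10
)
```

Multiple tuples combine with AND, which is why a single list is enough to express the FUNDED query shown earlier.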

Firestore Collections

Collection Overview

| Collection | Purpose | Key Indexes |
|---|---|---|
| wallets | Wallet configurations, balances, portfolio | status, created_at |
| launch_configurations | Pre-launch planning and scheduling | status, launch_type, created_at |
| funding_operations | Scheduled SOL funding with execution tracking | status, scheduled_timestamp, wallet_address |
| potential_trends, confirmed_trends | Real-time trend tracking with reactive watchers | memecoin_count, created_at, updated_at |
| generation_sessions | AI generation progress and results | status, started_at |

Unified Schema Pattern

Common Fields (all collections):

  • _id: Primary identifier (wallet address, UUID6, or auto-generated)
  • created_at: Unix timestamp of creation
  • updated_at: Unix timestamp of last modification (where applicable)
  • status: Current state (varies by collection)
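One way to keep created_at and updated_at consistent across collections is to stamp them in a single helper before writing. The function below is a hypothetical sketch (stamp_document is not part of the documented service) and assumes integer Unix timestamps, as in the field list above.

```python
import time


def stamp_document(data, existing=None, now=None):
    """Return a copy of data with created_at/updated_at set as Unix timestamps."""
    now = int(time.time()) if now is None else now
    stamped = dict(data)
    if existing is None:
        stamped["created_at"] = now  # first write
    else:
        stamped["created_at"] = existing["created_at"]  # preserve creation time
        stamped["updated_at"] = now
    return stamped


created = stamp_document({"status": "FRESH"}, now=100)
updated = stamp_document({"status": "FUNDED"}, existing=created, now=200)
```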

Collection-Specific Fields:

| Collection | Key Fields | Notes |
|---|---|---|
| wallets | name, sol_balance, last_refresh, portfolio, funding_history | Status: FRESH, UNFUNDED, FUNDED, INACTIVE |
| launch_configurations | launch_name, launch_type, dev_wallet_address, token_name, token_symbol, metadata, meme_uuid, scheduled_for | Status: DRAFT, READY, SCHEDULED, COMPLETED, FAILED |
| funding_operations | wallet_address, sol_amount, scheduled_timestamp, executed_at, transaction_signature, error_message | Status: PENDING, EXECUTING, COMPLETED, FAILED, ARCHIVED |
| potential_trends, confirmed_trends | trend_name, trend_description, memecoin_count, memecoins[] | Each memecoin: token_address, token_name, ticker, image_url, creation_timestamp |
| generation_sessions | progress, current_stage, stage_description, estimated_remaining_seconds, request, results[], started_at, completed_at | Status: queued, processing, completed, failed |

Example Document (wallets collection):

{
    "_id": "9xQe8ZF...",
    "name": "Dev Wallet 1",
    "status": "FUNDED",
    "sol_balance": 2.45,
    "last_refresh": 1697500000,
    "portfolio": {"tokens": [...], "nfts": []},
    "funding_history": [{"amount": 0.5, "timestamp": 1697400000, "tx_signature": "4hXF..."}],
    "created_at": 1697300000
}

Usage by Orchestrator:

  • Wallets: WalletsOrchestrator (cache-first), FundingOrchestrator (validation), LaunchConfigurationOrchestrator (wallet selection)
  • Launch Configurations: LaunchConfigurationOrchestrator (CRUD), Web UI (planning interface)
  • Funding Operations: FundingOrchestrator (scheduled execution, retry), Web UI (management)
  • Trends: TrendOrchestrator (reactive watchers, in-memory cache), Web UI (real-time display)
  • Generation Sessions: GenerationOrchestrator (session management, progress tracking), Web UI (progress display)

Reactive Watchers

Purpose: Real-time data synchronization with in-memory caching

Implementation Pattern

Watchers provide automatic cache updates when Firebase data changes:

import logging

logger = logging.getLogger(__name__)

class TrendOrchestrator:
    def _setup_firebase_watchers(self):
        # Register a watcher on the collection; the service owns its lifecycle
        collection_ref = self.firebase.get_collection("potential_trends")
        self.firebase.watch_collection(
            collection_ref=collection_ref,
            on_snapshot=self._on_trends_update,
            watcher_id="potential_trends_watcher"
        )

    def _on_trends_update(self, snapshot):
        # Called on every change: replace the cached list, then recompute stats
        logger.info(f"📊 Trends updated: {len(snapshot)} docs")
        self.cache["potential"] = [doc.to_dict() for doc in snapshot]
        self._update_statistics()

Active Watchers:

| Orchestrator | Collection | Watcher ID | Cache Key | Post-Update Action |
|---|---|---|---|---|
| TrendOrchestrator | potential_trends | potential_trends_watcher | cache["potential"] | Recalculate statistics |
| TrendOrchestrator | confirmed_trends | confirmed_trends_watcher | cache["confirmed"] | Recalculate statistics |
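The watcher-to-cache flow can be exercised without Firestore by simulating snapshot delivery. Everything in this sketch is a stand-in: WatcherRegistry, FakeDoc, and TrendCache are hypothetical names, and emit() plays the role of the backend pushing a change. It only illustrates centralized registration by watcher_id and wholesale cache replacement.

```python
class FakeDoc:
    """Stand-in for a Firestore document snapshot; only to_dict() is modeled."""

    def __init__(self, data):
        self._data = data

    def to_dict(self):
        return dict(self._data)


class WatcherRegistry:
    """Hypothetical in-memory registry keyed by watcher_id."""

    def __init__(self):
        self._callbacks = {}

    def watch(self, watcher_id, on_snapshot):
        if watcher_id in self._callbacks:
            raise ValueError(f"watcher already registered: {watcher_id}")
        self._callbacks[watcher_id] = on_snapshot

    def unwatch(self, watcher_id):
        self._callbacks.pop(watcher_id, None)

    def emit(self, watcher_id, snapshot):
        # Simulates the backend pushing a fresh snapshot to one watcher.
        callback = self._callbacks.get(watcher_id)
        if callback is not None:
            callback(snapshot)


class TrendCache:
    def __init__(self, registry):
        self.cache = {"potential": [], "confirmed": []}
        registry.watch("potential_trends_watcher", self._on_potential)

    def _on_potential(self, snapshot):
        # Replace the cached list wholesale on every snapshot.
        self.cache["potential"] = [doc.to_dict() for doc in snapshot]


registry = WatcherRegistry()
trends = TrendCache(registry)
registry.emit(
    "potential_trends_watcher",
    [FakeDoc({"trend_name": "cats", "memecoin_count": 3})],
)
```

Keying callbacks by watcher_id makes duplicate registration detectable and gives shutdown code one place to detach every listener.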

Benefits

  • Real-Time Sync: Firebase changes → instant cache updates → immediate UI reflection
  • Performance: Cache reads ~1-5ms (in-memory), Firebase updates ~100-300ms (network + trigger), total UI latency ~100-300ms
  • No Polling: Lower latency, reduced API calls
  • Use Cases: Trends dashboard (real-time counts), wallet balances (live updates), launch status (execution tracking)

Cache Strategies

Cache-First Pattern

Strategy: Always read from Firebase cache (100-300ms), refresh from blockchain on demand (2000-4000ms).

Flow:

  1. Fast Read: get_wallets() returns Firebase cached data immediately (roughly 13-20x faster than a blockchain query)
  2. Refresh: refresh_wallet(address) queries Solana blockchain, updates Firebase cache, returns fresh data
  3. User Control: Explicit refresh only when needed to minimize expensive RPC calls

Performance Benefits:

  • Cache reads: ~100-300ms (Firebase)
  • Blockchain queries: ~2000-4000ms (Solana RPC)
  • Speedup: roughly 13-20x for cached data (2000/100 = 20x at the fast end of both ranges, 4000/300 ≈ 13x at the slow end)
  • RPC load: Reduced by 95% (only on-demand refreshes)

Implementation: WalletsOrchestrator uses query_collection("wallets") for reads, update_document() after blockchain refresh with last_refresh timestamp.
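The read and refresh paths can be sketched with injected fakes in place of Firebase and Solana. CacheFirstWallets, the dict-backed store, and fake_fetch_balance are assumptions for illustration; only the method names get_wallets and refresh_wallet come from the description above.

```python
import time


class CacheFirstWallets:
    """Hypothetical cache-first reader; store and chain access are injected fakes."""

    def __init__(self, store, fetch_balance):
        self.store = store                  # dict: address -> cached document
        self.fetch_balance = fetch_balance  # slow call, e.g. a Solana RPC query

    def get_wallets(self):
        # Fast path: serve whatever is cached, never touching the chain.
        return list(self.store.values())

    def refresh_wallet(self, address):
        # Slow path: query the chain, write back to the cache, return fresh data.
        balance = self.fetch_balance(address)
        doc = self.store[address]
        doc["sol_balance"] = balance
        doc["last_refresh"] = int(time.time())
        return doc


rpc_calls = []


def fake_fetch_balance(address):
    rpc_calls.append(address)  # record each simulated RPC call
    return 2.45


store = {"addr1": {"name": "Dev Wallet 1", "sol_balance": 0.0, "last_refresh": 0}}
wallets = CacheFirstWallets(store, fake_fetch_balance)
cached = wallets.get_wallets()
calls_after_read = len(rpc_calls)
fresh = wallets.refresh_wallet("addr1")
```

Reads never trigger an RPC call in this sketch; the chain is touched only when refresh_wallet is called explicitly, which is where the reduction in RPC load would come from.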


Data Models

Pydantic Models for Type Safety

from pydantic import BaseModel, Field
from typing import List, Optional

class Wallet(BaseModel):
    """Wallet data model"""
    wallet_address: str = Field(..., description="Solana public key")
    name: str
    status: str  # FRESH, UNFUNDED, FUNDED, INACTIVE
    sol_balance: float
    last_refresh: int  # Unix timestamp
    portfolio: dict
    funding_history: List[dict]
    created_at: int

class LaunchConfiguration(BaseModel):
    """Launch configuration data model"""
    launch_id: str
    launch_name: str
    launch_type: str  # DRAFT, COMPLETE
    status: str  # DRAFT, READY, SCHEDULED, COMPLETED, FAILED
    dev_wallet_address: str
    dev_sol_amount: float
    token_name: Optional[str] = None
    token_symbol: Optional[str] = None
    metadata: Optional[dict] = None
    meme_uuid: Optional[str] = None
    created_at: int
    updated_at: int
    scheduled_for: Optional[int] = None

class FundingOperation(BaseModel):
    """Funding operation data model"""
    funding_id: str
    wallet_address: str
    sol_amount: float
    status: str  # PENDING, EXECUTING, COMPLETED, FAILED, ARCHIVED
    scheduled_timestamp: int
    created_at: int
    executed_at: Optional[int] = None
    transaction_signature: Optional[str] = None
    error_message: Optional[str] = None
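The models above type status as a plain str and list the allowed values only in comments. One possible way to enforce those values is an Enum, sketched here with the standard library only. WalletStatus and parse_wallet_status are hypothetical names and are not taken from the codebase.

```python
from enum import Enum


class WalletStatus(str, Enum):
    FRESH = "FRESH"
    UNFUNDED = "UNFUNDED"
    FUNDED = "FUNDED"
    INACTIVE = "INACTIVE"


def parse_wallet_status(raw):
    """Convert a stored string to WalletStatus, with a clear error on bad data."""
    try:
        return WalletStatus(raw)
    except ValueError:
        allowed = ", ".join(s.value for s in WalletStatus)
        raise ValueError(
            f"unknown wallet status {raw!r}; expected one of: {allowed}"
        ) from None


status = parse_wallet_status("FUNDED")
```

Because the enum subclasses str, its members still compare equal to the raw strings stored in Firestore. Pydantic accepts Enum types as field annotations, so a field declared as status: WalletStatus would likely give the same check at model construction time.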

Orchestrator Integration

Shared Firebase Service (Registry Injection)

Pattern: Registry creates singleton Firebase service, injects into all orchestrators

import os

class OrchestratorRegistry:
    async def initialize_all_orchestrators(self):
        # Create singletons
        firebase_service = SimpleFirebaseService(credentials_path=os.getenv("FIREBASE_CREDENTIALS"))
        solana_service = SimpleSolanaService(rpc_url=os.getenv("SOLANA_RPC_URL"))

        # Inject into orchestrators
        self.wallets_orchestrator = WalletsOrchestrator(firebase_service, solana_service)
        # Similar injection for funding, launch_config, trend orchestrators

Orchestrator Dependencies:

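| Orchestrator | Firebase | Solana | Other |
|---|---|---|---|
| WalletsOrchestrator | Yes | Yes | - |
| FundingOrchestrator | Yes | Yes | - |
| LaunchConfigurationOrchestrator | Yes | - | - |
| TrendOrchestrator | Yes | - | - |
| GenerationOrchestrator | Yes | - | RAG service |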

Orchestrator Usage

Orchestrators use Firebase for CRUD operations and Solana for blockchain verification:

import time

class WalletsOrchestrator:
    async def create_wallet(self, wallet_address: str, name: str) -> Wallet:
        # Blockchain verification
        balance = await self.solana.get_balance(wallet_address)

        # Firebase persistence
        self.firebase.create_document("wallets", wallet_address, {
            "name": name, "status": "FRESH", "sol_balance": balance,
            "created_at": int(time.time()),
            # ... remaining fields elided
        })

Cross-Service Flow: Orchestrator receives request → validates with Solana → persists to Firebase → returns typed model


Best Practices

  1. Use Singleton Service - Get service from registry, never create new instances (causes duplicate connections)
  2. Always Use Pydantic Models - Type-safe models for Firebase data, avoid raw dicts
  3. Handle Missing Documents - Always check for None before accessing, raise clear errors for missing data
  4. Use Reactive Watchers - Setup watchers for automatic updates instead of polling Firebase repeatedly
  5. Cache Expensive Operations - Cache blockchain data in Firebase for roughly 13-20x faster reads on subsequent requests
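For practice 3, a small wrapper can turn the None returned by get_document into an explicit error. This is a sketch under assumptions: DocumentNotFoundError, require_document, and FakeService are hypothetical, and only the get_document(collection_name, document_id) signature comes from the Core Methods table.

```python
class DocumentNotFoundError(LookupError):
    """Raised when an expected document is absent (hypothetical error type)."""


def require_document(service, collection_name, document_id):
    """Wrap get_document so callers never receive None by accident."""
    doc = service.get_document(collection_name, document_id)
    if doc is None:
        raise DocumentNotFoundError(
            f"{collection_name}/{document_id} does not exist"
        )
    return doc


class FakeService:
    """In-memory stand-in exposing only get_document, for illustration."""

    def __init__(self, docs):
        self._docs = docs

    def get_document(self, collection_name, document_id):
        return self._docs.get((collection_name, document_id))


service = FakeService({("wallets", "addr1"): {"name": "Dev Wallet 1"}})
found = require_document(service, "wallets", "addr1")
```

Naming the collection and document ID in the message makes a missing wallet or launch configuration easy to trace from logs.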

Implementation References

Key Files

| Component | File Path |
|---|---|
| Firebase Service | src/firebase/simple_firebase_service.py |
| Wallets Orchestrator | src/orchestrators/wallets/wallets_orchestrator.py |
| Funding Orchestrator | src/orchestrators/funding/funding_orchestrator.py |
| Launch Config Orchestrator | src/orchestrators/launch_config/launch_config_orchestrator.py |
| Trend Orchestrator | src/orchestrators/trends/trend_orchestrator.py |
| Orchestrator Registry | src/orchestrators/registry.py |

Document Status: Complete
Last Updated: October 16, 2025
Implementation: Singleton Firebase service with reactive watchers and cache-first strategies