Skip to content

Latest commit

 

History

History
314 lines (259 loc) · 14.7 KB

File metadata and controls

314 lines (259 loc) · 14.7 KB

Thread Management System for LangGraph State Persistence

Created: 2025-10-28
Status: ✅ Implemented
Based on: LangGraph Official Documentation

📚 Overview

This document describes the Thread Management System implemented for managing stateful conversations in LangGraph workflows, following official LangGraph best practices.

🎯 Key Concepts from LangGraph Documentation

1. Thread IDs (thread_id)

  • Purpose: Unique identifier for a specific execution flow or session
  • Scope: All interactions with the same thread_id maintain continuity
  • Format: Passed in config: config = {"configurable": {"thread_id": "session_123"}}
  • Isolation: Each thread has isolated state

2. Checkpointers

  • Purpose: Automatically save state after each node execution
  • Benefits:
    • Enable resuming workflows from specific points
    • Allow debugging and inspection of intermediate states
    • Required for state persistence
  • Types: MemorySaver (in-memory), SqliteSaver (persistent), etc.

3. State Management

  • Short-Term Memory: Managed within state and checkpoints (session-specific)
  • Long-Term Memory: Stored externally in databases/vector stores (cross-session)
  • State Scoping: State is scoped to a specific thread_id

🏗️ Implementation

Architecture

┌──────────────────────────────────────────────────────────────┐
│                     ThreadManager                             │
├──────────────────────────────────────────────────────────────┤
│  • create_new_session()  - Generate unique thread_id         │
│  • get_current_config()  - Get LangGraph config with thread  │
│  • update_activity()     - Track message count & last active │
│  • load_session()        - Resume previous session           │
│  • get_session_history() - View all sessions                 │
└──────────────────────────────────────────────────────────────┘
                            │
        ┌───────────────────┴───────────────────┐
        │                                       │
┌───────▼──────────┐                  ┌────────▼────────────┐
│   RAG Chat UI    │                  │  Development UI     │
├──────────────────┤                  ├─────────────────────┤
│ • rag_thread_    │                  │ • dev_thread_       │
│   manager        │                  │   manager           │
│ • Session view   │                  │ • Session view      │
│ • History UI     │                  │ • History UI        │
└──────────────────┘                  └─────────────────────┘
        │                                       │
        │                                       │
        ▼                                       ▼
┌────────────────────┐              ┌──────────────────────┐
│ RAGSwarmCoordinator│              │ DevelopmentContext   │
│                    │              │ Agent                │
│ config = {         │              │                      │
│   "configurable": {│              │ config = {           │
│     "thread_id":   │              │   "configurable": {  │
│     "rag_xxx"      │              │     "thread_id":     │
│   }                │              │     "dev_xxx"        │
│ }                  │              │   }                  │
└────────────────────┘              │ }                    │
                                    └──────────────────────┘

Core Components

1. ThreadManager Class (utils/thread_manager.py)

from utils.thread_manager import ThreadManager

# Initialize for RAG chat
manager = ThreadManager(session_type="rag", prefix="rag_chat")

# Get config for LangGraph
config = manager.get_current_config()
# Returns: {"configurable": {"thread_id": "rag_chat_a1b2c3d4"}}

# Execute graph with persistent state
result = await graph.ainvoke({"messages": [...]}, config=config)

# Track activity
manager.update_activity(message_count_delta=1)

# Create new session
manager.create_new_session()

# Load previous session
manager.load_session("rag_chat_a1b2c3d4")

2. ThreadSession Dataclass

@dataclass
class ThreadSession:
    thread_id: str
    created_at: datetime
    last_active: datetime
    message_count: int = 0
    session_type: str = "chat"
    metadata: Dict = field(default_factory=dict)

UI Integration

RAG Chat UI (apps/rag_management_app.py)

Features:

  • ✅ Persistent thread_id across all messages in a conversation
  • ✅ Session history view
  • ✅ Load previous sessions
  • ✅ Activity tracking (message count, last active time)
  • ✅ Session statistics

UI Elements:

┌─────────────────────────────────────────────────────────────┐
│ 🔗 Session Management                                       │
├─────────────────────────────────────────────────────────────┤
│ [Thread: rag_chat_a1b2c3d4] [🆕 New] [Sessions: 3] [📜]    │
└─────────────────────────────────────────────────────────────┘

Implementation:

# Initialize ThreadManager
if 'rag_thread_manager' not in st.session_state:
    from utils.thread_manager import create_thread_manager
    st.session_state.rag_thread_manager = create_thread_manager(
        session_type="rag",
        prefix="rag_chat"
    )

# Use in RAG Swarm execution
config = st.session_state.rag_thread_manager.get_current_config()
result = asyncio.run(swarm.execute(user_input, config=config))

# Update activity after message
st.session_state.rag_thread_manager.update_activity(message_count_delta=1)

Development Workflow UI (apps/streamlit_app.py)

Features:

  • ✅ Persistent development sessions with full context
  • ✅ Resume interrupted development workflows
  • ✅ Session history for iterative development
  • ✅ Activity tracking per development session

UI Elements:

┌─────────────────────────────────────────────────────────────┐
│ 🔗 Development Session                                      │
├─────────────────────────────────────────────────────────────┤
│ [Session: dev_session_x7y8z9] [🆕 New] [Sessions: 5] [📜] │
└─────────────────────────────────────────────────────────────┘

Implementation:

# Initialize ThreadManager for development
if 'dev_thread_manager' not in st.session_state:
    from utils.thread_manager import create_thread_manager
    st.session_state.dev_thread_manager = create_thread_manager(
        session_type="development",
        prefix="dev_session"
    )

# Use in development workflow
config = st.session_state.dev_thread_manager.get_current_config()
result = asyncio.run(graph.ainvoke(initial_state, config=config))

# Update activity
st.session_state.dev_thread_manager.update_activity(message_count_delta=1)

🔍 How It Works

Conversation Flow with Thread Management

User Session Start
│
├─> ThreadManager.create_new_session()
│   └─> Generate unique thread_id: "rag_chat_a1b2c3d4"
│
├─> User Message 1: "How does MCP work?"
│   ├─> config = manager.get_current_config()
│   │   └─> {"configurable": {"thread_id": "rag_chat_a1b2c3d4"}}
│   ├─> graph.ainvoke({"messages": [msg]}, config=config)
│   │   └─> LangGraph saves state to checkpointer with thread_id
│   └─> manager.update_activity(1)
│
├─> User Message 2: "Can you show an example?"
│   ├─> SAME config (same thread_id!)
│   ├─> graph.ainvoke({"messages": [msg]}, config=config)
│   │   └─> LangGraph loads previous state from checkpointer
│   │   └─> Agent has full context of Message 1!
│   └─> manager.update_activity(1)
│
└─> New Session Button Clicked
    └─> manager.create_new_session()
        └─> New thread_id: "rag_chat_b2c3d4e5"
        └─> Fresh state, no context from previous session

State Persistence Flow

┌────────────────────────────────────────────────────────────┐
│                    Message 1                                │
├────────────────────────────────────────────────────────────┤
│ thread_id: "rag_chat_a1b2c3d4"                            │
│ state: {"messages": [{"role": "user", "content": "..."}]} │
└─────────────────┬──────────────────────────────────────────┘
                  │
                  ▼
         ┌────────────────────┐
         │  MemorySaver       │ (Checkpointer)
         │  (Checkpointer)    │
         ├────────────────────┤
         │ thread_id: state   │
         │ "rag_chat_a1b2c3d4"│ ◄─── State saved here
         └────────┬───────────┘
                  │
                  │ (State persists)
                  │
                  ▼
┌────────────────────────────────────────────────────────────┐
│                    Message 2                                │
├────────────────────────────────────────────────────────────┤
│ thread_id: "rag_chat_a1b2c3d4" (SAME!)                    │
│ state: LOADED from checkpointer                            │
│   {"messages": [                                           │
│     {"role": "user", "content": "Message 1"},              │
│     {"role": "assistant", "content": "Response 1"},        │
│     {"role": "user", "content": "Message 2"}  ◄─── New    │
│   ]}                                                       │
└────────────────────────────────────────────────────────────┘

🎨 UI Examples

Session History Dropdown

┌────────────────────────────────────────────────────────────┐
│ 📜 Session History                                         │
├────────────────────────────────────────────────────────────┤
│ ✅ rag_chat_a1b2c3d4    │  5 msgs • 14:23  │          │
│    rag_chat_x7y8z9w0    │  3 msgs • 14:10  │  [Load]  │
│    rag_chat_m4n5o6p7    │  8 msgs • 13:45  │  [Load]  │
└────────────────────────────────────────────────────────────┘

Session Statistics

┌───────────────────────┐
│ Sessions: 3           │
│ Total Messages: 16    │
│ Current: rag_chat_a1b │
└───────────────────────┘

✅ Best Practices Followed

Based on LangGraph documentation:

  1. ✅ Consistent thread_id Usage: Each session has a unique, persistent thread_id
  2. ✅ Efficient State Design: State includes only necessary information (messages, context)
  3. ✅ Clear Memory Strategy:
    • Short-term: Thread state in checkpointer
    • Long-term: Documents in Qdrant vector store
  4. ✅ Checkpointer Integration: Always use checkpointer for stateful workflows
  5. ✅ Config Format: Standard LangGraph format: {"configurable": {"thread_id": "..."}}

🚀 Benefits

For Users

  • Continuity: Conversations maintain context across messages
  • History: View and resume previous sessions
  • Isolation: Each session is independent
  • Transparency: Clear visibility of current session

For Developers

  • Standard Pattern: Follows LangGraph best practices
  • Reusable: ThreadManager works for any workflow
  • Debuggable: State snapshots for debugging
  • Scalable: Easy to add new session types

📖 References

🔮 Future Enhancements

  • Persistent Storage: Use SqliteSaver for long-term session persistence
  • Session Metadata: Add tags, descriptions, and custom metadata
  • Session Export: Export/import sessions for sharing
  • Session Analytics: Track session metrics and patterns
  • Multi-User Support: User-specific session management