"The workflow should adapt to the task, not force every task through the same pipeline"
Different tasks require different levels of sophistication, human-in-the-loop (HITL) involvement, and agent combinations.
Example: "What is LangGraph?", "Who created Python?"
Workflow:
query_analyst → retrieval_specialist → writer → END
Agents: 3
HITL: optional (only if human_in_loop=True and the user wants control)
Estimated Time: ~10-15 seconds
Characteristics:
- Single, clear question
- Factual answer expected
- Minimal context needed
- Skip re-ranking (use top results)
- Skip QA (writer handles quality)
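In LangGraph terms, this fast path compiles to a plain linear graph. A minimal sketch, assuming the three node functions already exist:

```python
from langgraph.graph import END, START, MessagesState, StateGraph

workflow = StateGraph(MessagesState)
workflow.add_node("query_analyst", query_analyst_node)                 # parse the question
workflow.add_node("retrieval_specialist", retrieval_specialist_node)  # fetch top results
workflow.add_node("writer", writer_node)                               # draft the answer

workflow.add_edge(START, "query_analyst")
workflow.add_edge("query_analyst", "retrieval_specialist")
workflow.add_edge("retrieval_specialist", "writer")
workflow.add_edge("writer", END)

graph = workflow.compile()  # no checkpointer needed when HITL is off
```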
Example: "Write a comprehensive guide on LangGraph state management"
Workflow:
query_analyst
→ [HITL: Approve research plan]
→ retrieval_specialist (multi-source, deep)
→ [HITL: Review sources]
→ re_ranker (comprehensive filtering)
→ [HITL: Approve context]
→ writer (long-form content)
→ [HITL: Review draft]
→ quality_assurance (thorough checks)
→ [HITL: Final approval]
→ END
Agents: all 5
HITL: all 5 checkpoints
Estimated Time: 2-5 minutes (with human feedback)
Characteristics:
- Complex, multi-faceted topic
- Requires deep research
- Multiple sources needed
- Quality critical
- Iterative refinement expected
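Each [HITL: …] checkpoint can be implemented with LangGraph's `interrupt` primitive, which pauses the run until a human resumes it. A minimal sketch of the first checkpoint; the node name and payload fields are assumptions:

```python
from langchain_core.messages import HumanMessage
from langgraph.graph import MessagesState
from langgraph.types import interrupt

def _after_query_node(self, state: MessagesState) -> dict:
    """Pause the graph until a human approves the proposed research plan."""
    decision = interrupt({
        "checkpoint": "after_query",
        "plan": state["messages"][-1].content,
        "question": "Approve this research plan, or suggest changes?",
    })
    # The value the caller passes via Command(resume=...) becomes `decision` here.
    return {"messages": [HumanMessage(content=f"Reviewer feedback: {decision}")]}
```

The payload surfaces to the client as the interrupt value, and the run resumes from this node when the caller re-invokes the graph with `Command(resume=...)`.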
Example: "Explain how async/await works in Python"
Workflow:
query_analyst
→ retrieval_specialist (focus on tutorials, examples)
→ [HITL: Verify explanation approach]
→ re_ranker (prioritize clarity)
→ writer (educational formatting)
→ [HITL: Review explanation]
→ END
Agents: 4 (skip QA; the writer handles quality)
HITL: 2 checkpoints (after retrieval, after writer)
Estimated Time: ~30-60 seconds
Characteristics:
- Educational purpose
- Needs examples and analogies
- Clarity over comprehensiveness
- Some back-and-forth expected
Example: "Show me how to implement a LangGraph checkpointer"
Workflow:
query_analyst (identify code requirements)
→ retrieval_specialist (code-focused search)
→ [HITL: Verify relevant code examples]
→ writer (code formatting, syntax highlighting)
→ [HITL: Test/verify code]
→ END
Agents: 3 (skip re-ranker and QA)
HITL: 2 checkpoints (verify examples, verify generated code)
Estimated Time: ~20-30 seconds
Characteristics:
- Code-centric query
- Working examples critical
- Syntax and imports matter
- Testing/verification needed
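The code-focused search hint (the `retrieval_focus: "code_examples"` knob defined in TASK_WORKFLOWS below) can be honored inside the retrieval node. A sketch under the assumption that indexed chunks carry a `doc_type` metadata field; the internals are illustrative, and metadata filter support varies by vector store:

```python
from langchain_core.messages import AIMessage
from langgraph.graph import MessagesState

def _retrieval_specialist_node(self, state: MessagesState) -> dict:
    """Retrieval node that honors the task's retrieval focus (illustrative internals)."""
    query = state["messages"][-1].content
    if self.workflow_config.get("retrieval_focus") == "code_examples":
        # Bias the search toward code chunks via a hypothetical metadata filter.
        docs = self.vector_store.similarity_search(query, k=8, filter={"doc_type": "code"})
    else:
        docs = self.vector_store.similarity_search(query, k=8)
    joined = "\n\n".join(doc.page_content for doc in docs)
    return {"messages": [AIMessage(content=joined)]}
```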
Example: "Verify if LangGraph supports custom checkpointers"
Workflow:
query_analyst (identify claim to verify)
→ retrieval_specialist (authoritative sources)
→ re_ranker (prioritize official docs)
→ quality_assurance (cross-reference sources)
→ [HITL: Review evidence]
→ writer (verdict with citations)
→ END
Agents: 5 (heavy on QA)
HITL: 1 checkpoint (review evidence before the verdict)
Estimated Time: ~30-45 seconds
Characteristics:
- Verification-focused
- Source authority critical
- Cross-referencing needed
- Evidence-based conclusion
Example: "Compare LangGraph vs. CrewAI for agent orchestration"
Workflow:
query_analyst (identify comparison dimensions)
→ [HITL: Approve comparison criteria]
→ retrieval_specialist (balanced multi-source)
→ [HITL: Verify source coverage]
→ re_ranker (balanced representation)
→ writer (comparative analysis)
→ [HITL: Review analysis]
→ END
Agents: 4 (skip QA for opinion-based content)
HITL: 3 checkpoints (criteria, sources, analysis)
Estimated Time: ~60-90 seconds
Characteristics:
- Multiple perspectives needed
- Balanced representation
- Comparative structure
- Opinion/analysis component
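The six task types map naturally onto an enum. A minimal sketch; the member values are assumptions:

```python
from enum import Enum

class TaskType(Enum):
    SIMPLE_QA = "simple_qa"
    RESEARCH_ARTICLE = "research_article"
    CONCEPT_EXPLANATION = "concept_explanation"
    CODE_GENERATION = "code_generation"
    FACT_CHECKING = "fact_checking"
    MULTI_SOURCE_SYNTHESIS = "multi_source_synthesis"
```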
The QueryAnalystAgent classifies each incoming query using keyword heuristics:
```python
def classify_task_type(query: str) -> TaskType:
    """
    Classify a query into a task type based on keyword patterns.

    Classification heuristics:
    - Question words (what, who, when, where) → SIMPLE_QA
    - "write", "create", "comprehensive guide" → RESEARCH_ARTICLE
    - "explain", "how does", "understand" → CONCEPT_EXPLANATION
    - "code", "implement", "example", "function" → CODE_GENERATION
    - "verify", "check", "is it true", "confirm" → FACT_CHECKING
    - "compare", "contrast", "difference", "vs" → MULTI_SOURCE_SYNTHESIS
    """
    query_lower = query.lower()

    # Code generation patterns (checked first so "implement X" beats "explain X")
    if any(word in query_lower for word in ["code", "implement", "function", "class", "script"]):
        return TaskType.CODE_GENERATION

    # Fact-checking patterns
    if any(word in query_lower for word in ["verify", "check", "is it true", "confirm", "validate"]):
        return TaskType.FACT_CHECKING

    # Comparison patterns
    if any(word in query_lower for word in ["compare", "contrast", "vs", "versus", "difference between"]):
        return TaskType.MULTI_SOURCE_SYNTHESIS

    # Research article patterns
    if any(word in query_lower for word in ["write", "create", "comprehensive", "guide", "article", "essay"]):
        return TaskType.RESEARCH_ARTICLE

    # Concept explanation patterns
    if any(word in query_lower for word in ["explain", "how does", "understand", "learn about", "teach"]):
        return TaskType.CONCEPT_EXPLANATION

    # Default to simple Q&A
    return TaskType.SIMPLE_QA
```
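A few examples of how these heuristics route the queries from the sections above:

```python
assert classify_task_type("Show me how to implement a LangGraph checkpointer") is TaskType.CODE_GENERATION
assert classify_task_type("Compare LangGraph vs. CrewAI for agent orchestration") is TaskType.MULTI_SOURCE_SYNTHESIS
assert classify_task_type("Explain how async/await works in Python") is TaskType.CONCEPT_EXPLANATION
assert classify_task_type("What is LangGraph?") is TaskType.SIMPLE_QA
```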
Users can explicitly specify a task type to override automatic detection:

```python
# In UI or API
result = coordinator.execute(
    query="What is LangGraph?",
    task_type=TaskType.RESEARCH_ARTICLE,  # Override automatic detection
    thread_id="project_123",
)
```

Each task type maps to a workflow configuration listing its agents, HITL checkpoints, and any task-specific knobs:

```python
TASK_WORKFLOWS = {
    TaskType.SIMPLE_QA: {
        "agents": ["query_analyst", "retrieval_specialist", "writer"],
        "hitl_points": [],  # No HITL for simple Q&A (unless forced)
        "skip_quality_assurance": True,
        "skip_re_ranker": True,
    },
    TaskType.RESEARCH_ARTICLE: {
        "agents": ["query_analyst", "retrieval_specialist", "re_ranker", "writer", "quality_assurance"],
        "hitl_points": ["after_query", "after_retrieval", "after_reranker", "after_writer", "after_qa"],
        "skip_quality_assurance": False,
        "skip_re_ranker": False,
    },
    TaskType.CONCEPT_EXPLANATION: {
        "agents": ["query_analyst", "retrieval_specialist", "re_ranker", "writer"],
        "hitl_points": ["after_retrieval", "after_writer"],
        "skip_quality_assurance": True,
        "skip_re_ranker": False,
    },
    TaskType.CODE_GENERATION: {
        "agents": ["query_analyst", "retrieval_specialist", "writer"],
        "hitl_points": ["after_retrieval", "after_writer"],
        "skip_quality_assurance": True,
        "skip_re_ranker": True,
        "retrieval_focus": "code_examples",
    },
    TaskType.FACT_CHECKING: {
        "agents": ["query_analyst", "retrieval_specialist", "re_ranker", "quality_assurance", "writer"],
        "hitl_points": ["after_qa"],
        "skip_quality_assurance": False,
        "skip_re_ranker": False,
        "qa_emphasis": "verification",
    },
    TaskType.MULTI_SOURCE_SYNTHESIS: {
        "agents": ["query_analyst", "retrieval_specialist", "re_ranker", "writer"],
        "hitl_points": ["after_query", "after_retrieval", "after_writer"],
        "skip_quality_assurance": True,
        "skip_re_ranker": False,
        "retrieval_strategy": "balanced_multi_source",
    },
}
```

The coordinator then builds each graph dynamically from this configuration, routing through HITL checkpoints only where they are configured and enabled:

```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph

# Maps each agent to the HITL checkpoint that follows it in the pipeline.
HITL_AFTER_AGENT = {
    "query_analyst": "after_query",
    "retrieval_specialist": "after_retrieval",
    "re_ranker": "after_reranker",
    "writer": "after_writer",
    "quality_assurance": "after_qa",
}

def _build_task_adaptive_graph(self, task_type: TaskType) -> StateGraph:
    """
    Build the workflow dynamically based on task type.

    This creates a specialized graph for the specific task,
    including only the necessary agents and HITL points.
    """
    workflow_config = TASK_WORKFLOWS[task_type]
    workflow = StateGraph(MessagesState)

    # Add only the necessary agent nodes
    for agent_name in workflow_config["agents"]:
        workflow.add_node(agent_name, getattr(self, f"_{agent_name}_node"))

    # Add HITL nodes at the specified points
    if self.human_in_loop:
        for hitl_point in workflow_config["hitl_points"]:
            workflow.add_node(hitl_point, getattr(self, f"_{hitl_point}_node"))

    # Build edges from the agent sequence
    agents = workflow_config["agents"]
    hitl_points = workflow_config["hitl_points"] if self.human_in_loop else []

    # Connect agents, routing through HITL checkpoints where configured
    current_node = START
    for agent in agents:
        workflow.add_edge(current_node, agent)
        hitl_after_agent = HITL_AFTER_AGENT.get(agent)
        if hitl_after_agent in hitl_points:
            workflow.add_edge(agent, hitl_after_agent)
            current_node = hitl_after_agent
        else:
            current_node = agent

    # Final edge to END
    workflow.add_edge(current_node, END)
    return workflow.compile(checkpointer=MemorySaver())
```
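Because the compiled graph checkpoints its state, a run paused at a HITL checkpoint can be resumed with a `Command`. A minimal driver sketch; the private builder is called directly here only for illustration:

```python
from langgraph.types import Command

graph = coordinator._build_task_adaptive_graph(TaskType.RESEARCH_ARTICLE)
config = {"configurable": {"thread_id": "project_123"}}

# Runs until the first HITL checkpoint raises an interrupt.
graph.invoke({"messages": [("user", "Write a guide on LangGraph state management")]}, config)

# After the human reviews the plan, resume from the checkpoint with their decision.
graph.invoke(Command(resume="approved"), config)
```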
In the Streamlit UI, the task type is exposed as an explicit selector:

```python
# In Streamlit UI
import streamlit as st

st.subheader("🎯 Task Type")
task_type_option = st.selectbox(
    "What type of task is this?",
    options=[
        ("🤔 Auto-detect", None),
        ("❓ Simple Question", TaskType.SIMPLE_QA),
        ("📄 Research Article", TaskType.RESEARCH_ARTICLE),
        ("💡 Concept Explanation", TaskType.CONCEPT_EXPLANATION),
        ("💻 Code Generation", TaskType.CODE_GENERATION),
        ("✅ Fact Checking", TaskType.FACT_CHECKING),
        ("⚖️ Compare Multiple Sources", TaskType.MULTI_SOURCE_SYNTHESIS),
    ],
    format_func=lambda x: x[0],
)
task_type = task_type_option[1] if task_type_option else None

# Show workflow stages based on task type
if task_type == TaskType.SIMPLE_QA:
    st.info("📊 Workflow: Analysis → Retrieval → Answer (Fast)")
elif task_type == TaskType.RESEARCH_ARTICLE:
    st.info("📊 Workflow: Full Pipeline with 5 Review Points (Thorough)")
elif task_type == TaskType.CODE_GENERATION:
    st.info("📊 Workflow: Analysis → Code Search → Generation (Code-Focused)")
```

Implementation checklist:

- Define TaskType enum
- Implement classification logic in QueryAnalystAgent
- Add task type to state
- Define TASK_WORKFLOWS configuration
- Implement dynamic graph builder
- Test each task type workflow (see the test sketch after this list)
- Conditional HITL based on task type
- Task-specific feedback options
- Dynamic interrupt points
- Task type selector
- Dynamic workflow visualization
- Task-appropriate feedback forms
- Progress indicators per task type
- Test all 6 task types
- Validate workflow efficiency
- User feedback integration
- Performance optimization
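A lightweight consistency test over the configuration table, using the HITL_AFTER_AGENT map from the builder above (a pytest sketch):

```python
import pytest

@pytest.mark.parametrize("task_type", list(TaskType))
def test_workflow_config_is_consistent(task_type):
    config = TASK_WORKFLOWS[task_type]
    # Every pipeline starts with query analysis.
    assert config["agents"][0] == "query_analyst"
    # Skip flags must agree with the agent list.
    if config["skip_re_ranker"]:
        assert "re_ranker" not in config["agents"]
    if config["skip_quality_assurance"]:
        assert "quality_assurance" not in config["agents"]
    # Each HITL checkpoint must follow an agent that is actually in the pipeline.
    reachable_checkpoints = {HITL_AFTER_AGENT[a] for a in config["agents"]}
    assert set(config["hitl_points"]) <= reachable_checkpoints
```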
Benefits:

- Efficiency: Simple questions get fast answers
- Quality: Complex tasks get thorough treatment
- Flexibility: Workflow adapts to task needs
- User Control: Choose automation vs. HITL level
- Scalability: Easy to add new task types
- Transparency: Clear workflow for each task
- Performance: Optimal resource usage per task
- Simple Q&A: < 15 seconds
- Research Article: 2-5 minutes (with HITL)
- Concept Explanation: < 60 seconds
- Code Generation: < 30 seconds
- Fact Checking: < 45 seconds
- Multi-Source Synthesis: < 90 seconds
Next steps:

- ✅ Document design (this file)
- Implement task classification in QueryAnalystAgent
- Create workflow definitions (TASK_WORKFLOWS)
- Build dynamic graph constructor
- Test each task type individually
- Integrate with UI for task selection
- Add telemetry to track task type performance (see the sketch below)
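For the telemetry item, per-task-type latency can be captured with a small wrapper around the coordinator. A sketch; the wrapper name and in-memory storage are assumptions, and a real deployment would export to a metrics backend:

```python
import time
from collections import defaultdict

latencies_by_task: dict[str, list[float]] = defaultdict(list)

def timed_execute(coordinator, query: str, task_type: TaskType, **kwargs):
    """Run the coordinator and record wall-clock latency per task type."""
    start = time.perf_counter()
    try:
        return coordinator.execute(query=query, task_type=task_type, **kwargs)
    finally:
        latencies_by_task[task_type.value].append(time.perf_counter() - start)
```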