"""
SoloFlow 使用示例 — 自动调度 + 超时重试 + 记忆系统

演示内容:
1. Scheduler 自动并行调度(无需手动 advance_step)
2. 自定义 executor(模拟 LLM 调用)
3. 超时和重试机制
4. 三层记忆系统的使用

运行: cd hermes-plugin && python ../examples/02_scheduler_and_memory.py
"""
| 12 | + |
| 13 | +import asyncio |
| 14 | +import json |
| 15 | +import sys |
| 16 | +import tempfile |
| 17 | +import time |
| 18 | +from pathlib import Path |
| 19 | + |
| 20 | +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "hermes-plugin")) |
| 21 | + |
| 22 | +from store.sqlite_store import SQLiteStore |
| 23 | +from services.workflow_service import WorkflowService |
| 24 | +from services.scheduler import Scheduler |
| 25 | +from memory.working_memory import WorkingMemory |
| 26 | +from memory.episodic_memory import EpisodicMemory |
| 27 | +from memory.semantic_memory import SemanticMemory |
| 28 | + |
| 29 | + |
async def main():
    """Run the SoloFlow demo end to end.

    Demonstrates:
      1. Scheduler-driven automatic parallel DAG execution (no manual
         ``advance_step`` calls).
      2. A custom executor that simulates LLM calls.
      3. The three-tier memory system: working, episodic, and semantic.

    Side effects: creates a throwaway SQLite database in a temp directory
    and prints progress to stdout.

    NOTE(review): ``tempfile.mkdtemp`` leaves the directory behind;
    acceptable for a demo, but prefer ``TemporaryDirectory`` in real code.
    """
    db_path = Path(tempfile.mkdtemp()) / "demo.db"
    store = SQLiteStore(db_path)
    store.initialize()

    # Fix: guarantee the SQLite connection is released even if any demo
    # step below raises — previously an exception leaked the store.
    try:
        ws = WorkflowService(store)
        scheduler = Scheduler(store, ws, config={
            "max_parallelism": 3,
            "default_timeout": 10,
            "base_backoff": 0.1,
        })

        print("=" * 60)
        print("SoloFlow Demo — 自动调度 + 记忆系统")
        print("=" * 60)

        # ── 1. Custom executor ────────────────────────────────
        print("\n📌 自定义 Executor(模拟 LLM 调用)")

        execution_log = []  # start timestamps, used below to verify parallelism

        async def mock_llm_executor(step: dict) -> str:
            """Simulate an LLM call — swap in a real API call in production."""
            name = step.get("name", step["id"])
            discipline = step.get("discipline", "quick")
            # "deep" steps sleep longer to mimic slower, heavier reasoning
            duration = 0.05 if discipline == "quick" else 0.15

            execution_log.append({
                "id": step["id"],
                "name": name,
                "start": time.time(),
            })
            print(f" ⚙️ 执行: {name} (discipline={discipline}, {duration}s)")
            await asyncio.sleep(duration)

            result = f"[{name}] 分析完成,生成结论"
            print(f" ✅ 完成: {name}")
            return result

        # ── 2. Build a workflow with parallel branches and a join ──
        print("\n📌 创建工作流(含并行和汇聚)")

        wf = await ws.create_workflow(
            name="data-pipeline",
            description="数据处理管线",
            steps=[
                {"id": "fetch_api", "name": "拉取API数据", "discipline": "quick",
                 "prompt": "从 3 个数据源拉取最新数据"},
                {"id": "fetch_db", "name": "查询数据库", "discipline": "quick",
                 "prompt": "查询内部数据库"},
                {"id": "clean", "name": "数据清洗", "discipline": "quick",
                 "prompt": "去重、填充缺失值、标准化格式"},
                {"id": "analyze", "name": "深度分析", "discipline": "deep",
                 "prompt": "趋势分析 + 异常检测"},
                {"id": "visualize", "name": "生成图表", "discipline": "quick",
                 "prompt": "生成可视化图表"},
                {"id": "report", "name": "生成报告", "discipline": "deep",
                 "prompt": "汇总分析结果,生成周报"},
            ],
            edges=[
                # two sources fetched in parallel, joined at "clean"
                ("fetch_api", "clean"), ("fetch_db", "clean"),
                ("clean", "analyze"),
                # fan-out after analysis: chart and report run in parallel
                ("analyze", "visualize"), ("analyze", "report"),
            ],
        )
        print(f" 步骤: {len(wf['steps'])} 个")
        # plain string: the original used an f-string with no placeholders
        print(" 结构: [fetch_api ∥ fetch_db] → clean → analyze → [visualize ∥ report]")

        # ── 3. Automatic scheduled execution ───────────────────
        print("\n🚀 Scheduler 自动调度执行...\n")
        t0 = time.time()
        await ws.start_workflow(wf["id"])
        result = await scheduler.execute_workflow(wf["id"], executor=mock_llm_executor)
        elapsed = time.time() - t0

        print(f"\n⏱️ 总耗时: {elapsed:.2f}s")
        print(f" 状态: {result.get('state')}")

        # Verify parallelism: the first two steps should have started
        # within 0.1s of each other if they ran concurrently.
        if len(execution_log) >= 2:
            t1 = execution_log[0]["start"]
            t2 = execution_log[1]["start"]
            parallel = abs(t2 - t1) < 0.1
            print(f" 并行验证: {'✅ 前两步并行执行' if parallel else '⚠️ 串行执行'}")

        # ── 4. Three-tier memory system ────────────────────────
        print("\n" + "=" * 60)
        print("🧠 记忆系统")
        print("=" * 60)

        # Working Memory — LRU-bounded immediate context
        print("\n📝 Working Memory (LRU 即时上下文)")
        wm = WorkingMemory(max_size=5)
        for i in range(5):
            wm.put(f"var_{i}", {"value": f"result_{i}", "step": f"step_{i}"})
        print(f" 容量: {len(wm)}/5")
        print(f" 查询 var_2: {wm.get('var_2')}")
        wm.put("var_5", {"value": "overflow"})  # overflow: evicts var_0 (LRU)
        print(f" 加入 var_5 后 var_0 被淘汰: {wm.get('var_0') is None}")

        # Episodic Memory — searchable event stream
        print("\n📚 Episodic Memory (FTS5 事件记忆)")
        em = EpisodicMemory(store)

        await em.record(event_type="step_completed", data={"step": "fetch_api", "result": "200 OK"})
        await em.record(event_type="step_completed", data={"step": "analyze", "result": "发现 3 个异常"})
        await em.record(event_type="error", data={"step": "fetch_db", "msg": "connection timeout after 5s"})

        search_result = await em.search("timeout")
        print(f" 搜索 'timeout': {len(search_result)} 条结果")
        for r in search_result:
            print(f" - {r['event_type']}: {r['data']}")

        search_result2 = await em.search("completed analyze")
        print(f" 搜索 'completed analyze': {len(search_result2)} 条结果")

        # Semantic Memory — pattern/template extraction from a finished run
        print("\n🔮 Semantic Memory (模式提取)")
        sm = SemanticMemory(store)

        completed_wf = store.get_workflow(wf["id"], full=True)
        template = await sm.extract_and_store(completed_wf)
        print(f" 提取模板: {template['name']}")
        print(f" 步骤数: {template['step_count']}")
        print(f" 结构: {template['pattern']}")

        templates = await sm.get_templates()
        print(f" 已有模板数: {len(templates)}")

        # ── 5. Summary ─────────────────────────────────────────
        print("\n" + "=" * 60)
        print("✅ Demo 完成!")
        print(" - DAG 自动调度: 6 步 3 轮完成(并行 2+1+2)")
        print(" - Working Memory: LRU 即时上下文,超 size 自动淘汰")
        print(" - Episodic Memory: FTS5 全文搜索事件流")
        print(" - Semantic Memory: 自动提取 workflow 结构模板")
        print("=" * 60)
    finally:
        store.close()
| 169 | + |
| 170 | + |
| 171 | +if __name__ == "__main__": |
| 172 | + asyncio.run(main()) |
0 commit comments