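This document collects the main technical architecture diagrams of delphi-agent: the overall call chain, runtime architecture, Agent loop, tools and skills, sandbox, multi-node coordination, and data flow.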
```mermaid
flowchart TD
Client["Client / Platform"] -->|POST /api/chat/stream<br/>SSE| HttpApi["delphi-agent-http-api"]
subgraph HttpLayer["HTTP API Layer"]
HttpApi --> Identity["RuntimeIdentityResolver<br/>X-Tenant-Id / X-User-Id / namespace"]
Identity --> StreamController["StreamChatController<br/>prompt / command dispatch"]
StreamController --> CatalogController["CatalogController<br/>models / skills / prompts / resources"]
StreamController --> AuditUsage["AuditController / UsageController"]
end
subgraph RuntimeLayer["Runtime Layer"]
StreamController --> RunRuntime["AgentRunRuntime<br/>runId / queue / active run / SSE event"]
StreamController --> SessionCommandRuntime["SessionCommandRuntime<br/>exclusive session commands"]
RunRuntime --> TenantGuard["TenantRuntimeGuard<br/>namespace / quota / queue capacity"]
RunRuntime --> LiveRegistry["LiveRunRegistry<br/>active run / abort / steer"]
RunRuntime --> QueueManager["RunQueueManager<br/>interrupt / followup / steer / drop / reject"]
RunRuntime --> SessionRuntime["AgentSessionRuntime<br/>session lifecycle / persistence / compact / fork"]
SessionRuntime --> ToolFactory["AgentToolFactory<br/>tool inventory + policy"]
end
subgraph CoreLayer["Agent Core Layer"]
SessionRuntime --> Agent["Agent.runLoop<br/>ReAct turn loop"]
Agent --> AgentState["AgentState<br/>messages / tools / model / streaming"]
Agent --> ToolCalls["AgentTool.execute<br/>tool call execution"]
Agent --> AiRuntime["AiRuntime.streamSimple"]
end
subgraph ProviderLayer["Provider Layer"]
AiRuntime --> ProviderRegistry["ApiProviderRegistry"]
ProviderRegistry --> SpringAiProvider["SpringAiChatModelProvider"]
SpringAiProvider --> ModelProvider["Model Provider<br/>DeepSeek Anthropic-compatible endpoint"]
end
subgraph ToolLayer["Tool / Sandbox Layer"]
ToolFactory --> BuiltinTools["Builtin Tools<br/>read / write / edit / bash / grep / find / ls"]
ToolFactory --> SkillTools["SkillAgentTool<br/>instructional / executable"]
ToolFactory --> SubagentTools["Subagent Tools<br/>spawn / status / result / abort"]
BuiltinTools --> Sandbox["ExecutionBackend"]
SkillTools --> Sandbox
SubagentTools --> SubagentRuntime["SubagentRuntime"]
end
subgraph StorageLayer["Storage / Catalog"]
SessionRuntime --> Mongo["MongoDB<br/>sessions / entries / audit / usage"]
LiveRegistry --> RedisLease["Redis<br/>active run lease / fencing token"]
QueueManager --> RedisQueue["Redis Stream<br/>run queue / consumer group"]
RunRuntime --> WorkspaceStorage["WorkspaceStorage<br/>local / snapshot"]
WorkspaceStorage --> ObjectStorage["S3-compatible storage<br/>workspace snapshot"]
ToolFactory --> Catalog["ResourceCatalogService<br/>skills / prompts / resources"]
Catalog --> SkillDirs["skills/public<br/>skills/namespaces/<namespace>"]
end
```
```mermaid
sequenceDiagram
autonumber
participant C as Client
participant API as StreamChatController
participant ID as RuntimeIdentityResolver
participant RR as AgentRunRuntime
participant TG as TenantRuntimeGuard
participant LR as LiveRunRegistry
participant RQ as RunQueueManager
participant SR as AgentSessionRuntime
participant AF as AgentToolFactory
participant A as Agent
participant LLM as AiRuntime / Provider
participant T as AgentTool
participant S as ExecutionBackend
participant DB as MongoDB
C->>API: POST /api/chat/stream
API->>ID: resolve tenant/user/namespace
ID-->>API: RuntimeIdentity
API->>SR: ensureSessionId(create if absent)
SR->>DB: save/load session
API->>RR: stream(AgentRunRequest)
RR->>TG: validateRunContext + quota
TG-->>RR: allowed
RR->>LR: register active run
LR-->>RR: fencingToken
RR-->>C: SSE run_started
RR->>SR: configureRunTools
SR->>AF: createTools(ToolRuntimeContext)
AF-->>SR: builtin + skill + subagent tools after policy
RR->>SR: prompt(sessionId, namespace, prompt, runId, fencingToken)
SR->>A: Agent.prompt()
A->>LLM: streamSimple(model, context, tool definitions)
LLM-->>A: text deltas + tool calls
A-->>C: SSE message_delta
A->>T: execute(toolCallId, args)
T->>S: execute/readFile/writeFile if needed
S-->>T: ExecutionResult / file content
T-->>A: AgentToolResult
A->>LLM: continue with tool result
LLM-->>A: final answer
A-->>SR: AgentEnd
SR->>DB: persist messages by runId + sequence
RR-->>C: SSE run_completed
RR->>LR: complete active run
RR->>RQ: poll next queued run
```
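When the same session already has an active run:

- INTERRUPT: the new run is first added to the Redis Stream queue, and then the old run is interrupted through the node that owns the active run.
- FOLLOWUP: the new run is written to the Redis Stream and waits until the current run completes, at which point the owner node polls it.
- STEER: the message is injected into the current run through the owner node's command channel.
- DROP / REJECT: the new run is not placed in the execution queue.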
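How a busy session handles a new run can be expressed as a small decision table. The following minimal Java sketch shows that table. The `QueueMode` and `QueueAction` types are hypothetical, not the project's actual classes. The text above does not say how DROP and REJECT differ, so treating REJECT as an error surfaced to the caller is an assumption.

```java
// Hypothetical sketch: what a node does with a new run when the session already has an active run.
final class QueuePolicySketch {
    private QueuePolicySketch() {}

    static QueueAction decideWhenBusy(QueueMode mode) {
        return switch (mode) {
            case INTERRUPT -> QueueAction.ENQUEUE_THEN_INTERRUPT; // XADD first, then ask the owner to abort the old run
            case FOLLOWUP -> QueueAction.ENQUEUE;                 // owner polls it once the current run completes
            case STEER -> QueueAction.STEER_VIA_OWNER;            // injected through the owner's command channel
            case DROP -> QueueAction.DISCARD;                     // never queued
            case REJECT -> QueueAction.REFUSE;                    // never queued; assumed to surface an error
        };
    }

    public static void main(String[] args) {
        for (QueueMode mode : QueueMode.values()) {
            System.out.println(mode + " -> " + decideWhenBusy(mode));
        }
    }
}

enum QueueMode { INTERRUPT, FOLLOWUP, STEER, DROP, REJECT }

enum QueueAction { ENQUEUE_THEN_INTERRUPT, ENQUEUE, STEER_VIA_OWNER, DISCARD, REFUSE }
```

INTERRUPT still enqueues before interrupting. This is so the new run is already durable in the stream if the abort or the owner node fails midway.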
```mermaid
stateDiagram-v2
[*] --> AgentStart
AgentStart --> TurnStart
TurnStart --> AddPendingMessages: user prompt / steering / follow-up
AddPendingMessages --> StreamLLM
StreamLLM --> EmitMessageDelta: text / thinking delta
EmitMessageDelta --> StreamLLM
StreamLLM --> AssistantDone
AssistantDone --> HasToolCalls
HasToolCalls --> ExecuteTools: yes
ExecuteTools --> ValidateArgs
ValidateArgs --> BeforeToolHook
BeforeToolHook --> ToolExecute
ToolExecute --> AfterToolHook
AfterToolHook --> AppendToolResults
AppendToolResults --> TurnEnd
HasToolCalls --> TurnEnd: no
TurnEnd --> CheckSteering
CheckSteering --> TurnStart: steering exists
CheckSteering --> CheckFollowUp: no steering
CheckFollowUp --> TurnStart: follow-up exists
CheckFollowUp --> AgentEnd: empty
AgentEnd --> [*]
StreamLLM --> AgentEnd: error / aborted
```
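Key points:

- `Agent.runLoop()` is the core ReAct loop. In every turn the model may return text and tool calls.
- Before a tool runs, its arguments are validated against the JSON Schema and then passed through the `beforeToolCall` hook.
- After a tool runs, the `afterToolCall` hook is applied, and the result is fed back into the context as a tool result message.
- Steering messages take priority over follow-up messages. The run ends when both are empty and the model makes no further tool calls.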
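Below is a minimal, non-streaming Java sketch of the loop semantics above. It assumes hypothetical `ModelReply` and `ToolCall` records and uses plain strings as messages. The real `Agent.runLoop()` works with typed messages and streaming deltas, and it also runs the `afterToolCall` hook, which is omitted here. The sketch shows three things:

- tool results are appended before the next turn;
- steering is consumed before follow-ups;
- the loop ends only when there are no tool calls, no steering and no follow-ups.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of the turn loop; these types are not the project's real API.
final class AgentLoopSketch {
    final List<String> context = new ArrayList<>();
    final Deque<String> steering = new ArrayDeque<>();
    final Deque<String> followUps = new ArrayDeque<>();
    private final Function<List<String>, ModelReply> model;
    private final Function<ToolCall, String> tools;
    private final Predicate<ToolCall> beforeToolCall; // stands in for schema validation + beforeToolCall hook

    AgentLoopSketch(Function<List<String>, ModelReply> model,
                    Function<ToolCall, String> tools,
                    Predicate<ToolCall> beforeToolCall) {
        this.model = model;
        this.tools = tools;
        this.beforeToolCall = beforeToolCall;
    }

    void run(String prompt) {
        List<String> pending = new ArrayList<>(List.of("user: " + prompt));
        while (true) {
            context.addAll(pending);                      // AddPendingMessages
            pending.clear();
            ModelReply reply = model.apply(context);      // StreamLLM (deltas omitted)
            context.add("assistant: " + reply.text());
            for (ToolCall call : reply.toolCalls()) {     // ExecuteTools
                String result = beforeToolCall.test(call) ? tools.apply(call) : "blocked by policy";
                context.add("tool[" + call.name() + "]: " + result); // fed back as a tool result message
            }
            // TurnEnd: steering is checked first, follow-ups only when nothing else remains
            if (!steering.isEmpty()) { pending.add("steer: " + steering.poll()); continue; }
            if (!reply.toolCalls().isEmpty()) continue;   // let the model see its tool results
            if (!followUps.isEmpty()) { pending.add("user: " + followUps.poll()); continue; }
            return;                                       // AgentEnd
        }
    }

    public static void main(String[] args) {
        int[] turns = {0};
        AgentLoopSketch loop = new AgentLoopSketch(
                ctx -> turns[0]++ == 0
                        ? new ModelReply("listing files", List.of(new ToolCall("ls", "{}")))
                        : new ModelReply("done", List.of()),
                call -> "a.txt",
                call -> true);
        loop.run("what is in the workspace?");
        loop.context.forEach(System.out::println);
    }
}

record ToolCall(String name, String args) {}

record ModelReply(String text, List<ToolCall> toolCalls) {}
```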
```mermaid
flowchart TD
AgentRuntime["AgentSessionRuntime.configureRunTools"] --> Factory["AgentToolFactory"]
Factory --> Inventory["ToolInventory.collect"]
Inventory --> BuiltinFactory["BuiltinToolFactory"]
BuiltinFactory --> ReadOnly["READONLY<br/>read / grep / find / ls"]
BuiltinFactory --> Mutating["MUTATING<br/>write / edit"]
BuiltinFactory --> Executable["EXECUTABLE<br/>bash"]
Inventory --> SkillsResolver["SkillsResolver"]
SkillsResolver --> PublicSkills["skills/public/**"]
SkillsResolver --> NamespaceSkills["skills/namespaces/<namespace>/**"]
PublicSkills --> SkillAgentTool["SkillAgentTool"]
NamespaceSkills --> SkillAgentTool
SkillAgentTool --> Instructional["Instructional Skill<br/>return SKILL.md as context"]
SkillAgentTool --> ExecutableSkill["Executable Skill<br/>copy to workspace/.skills + run entrypoint"]
Inventory --> TaskPlanning["TaskPlanningTool<br/>planner / orchestrator"]
Factory --> SubagentToolFactory["SubagentOrchestrationToolFactory"]
SubagentToolFactory --> SubagentTools["subagent_spawn / status / result / abort"]
ReadOnly --> Policy["ToolPolicyPipeline"]
Mutating --> Policy
Executable --> Policy
Instructional --> Policy
ExecutableSkill --> Policy
TaskPlanning --> Policy
SubagentTools --> Policy
Policy --> Resolver["TenantToolPolicyResolver<br/>role + tenant + depth"]
Resolver --> WrappedTools["ToolExecutionWrapper / ToolAuditWrapper"]
WrappedTools --> AgentTools["Agent.state().tools"]
```
The tool policy trims the tool set by role:

| Role | Allowed categories |
|---|---|
| ORCHESTRATOR | All by default; in strict mode only READONLY, INSTRUCTIONAL, ORCHESTRATION |
| PLANNER / RESEARCHER | READONLY, INSTRUCTIONAL |
| REVIEWER | READONLY |
| CODER | READONLY, MUTATING, EXECUTABLE, INSTRUCTIONAL |
| TESTER | READONLY, EXECUTABLE, INSTRUCTIONAL |
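Role-based trimming is a set-membership filter over tool categories. The Java sketch below encodes the table above, under these assumptions:

- `AgentRole`, `ToolCategory` and `ToolSpec` are hypothetical types;
- the subagent tools belong to ORCHESTRATION;
- the tenant and depth inputs that `TenantToolPolicyResolver` also considers are left out.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of role-based tool filtering; tenant and depth rules are omitted.
final class RolePolicySketch {
    private RolePolicySketch() {}

    static Set<ToolCategory> allowed(AgentRole role, boolean strict) {
        return switch (role) {
            case ORCHESTRATOR -> strict
                    ? EnumSet.of(ToolCategory.READONLY, ToolCategory.INSTRUCTIONAL, ToolCategory.ORCHESTRATION)
                    : EnumSet.allOf(ToolCategory.class);
            case PLANNER, RESEARCHER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.INSTRUCTIONAL);
            case REVIEWER -> EnumSet.of(ToolCategory.READONLY);
            case CODER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.MUTATING,
                    ToolCategory.EXECUTABLE, ToolCategory.INSTRUCTIONAL);
            case TESTER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.EXECUTABLE, ToolCategory.INSTRUCTIONAL);
        };
    }

    // Keep only the tools whose category the role may use.
    static List<ToolSpec> filter(List<ToolSpec> inventory, AgentRole role, boolean strict) {
        Set<ToolCategory> ok = allowed(role, strict);
        return inventory.stream().filter(t -> ok.contains(t.category())).toList();
    }

    public static void main(String[] args) {
        List<ToolSpec> inventory = List.of(
                new ToolSpec("read", ToolCategory.READONLY),
                new ToolSpec("write", ToolCategory.MUTATING),
                new ToolSpec("bash", ToolCategory.EXECUTABLE),
                new ToolSpec("subagent_spawn", ToolCategory.ORCHESTRATION));
        for (AgentRole role : AgentRole.values()) {
            System.out.println(role + " -> " + filter(inventory, role, false).stream().map(ToolSpec::name).toList());
        }
    }
}

enum ToolCategory { READONLY, MUTATING, EXECUTABLE, INSTRUCTIONAL, ORCHESTRATION }

enum AgentRole { ORCHESTRATOR, PLANNER, RESEARCHER, REVIEWER, CODER, TESTER }

record ToolSpec(String name, ToolCategory category) {}
```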
```mermaid
flowchart TD
Tool["bash / executable skill / file tools"] --> Backend["ExecutionBackend"]
Backend --> Context["ExecutionContext<br/>namespace / sessionId / userId"]
Backend --> Options["ExecutionOptions<br/>timeoutMs / maxOutputBytes / envVars"]
Context --> Workspace["Workspace Resolver<br/>workspaces/<namespace>/<sessionId>"]
Backend --> Profile{"Spring Profile"}
Profile -->|default !local-dev| DockerBackend["DockerIsolatedBackend"]
Profile -->|local-dev| LocalBackend["LocalIsolatedBackend"]
subgraph DockerSandbox["Docker Sandbox"]
DockerBackend --> DockerCmd["docker run --rm"]
DockerCmd --> NoNetwork["--network none"]
DockerCmd --> NonRoot["--user 65534:65534"]
DockerCmd --> ReadOnlyRoot["--read-only rootfs"]
DockerCmd --> ResourceLimit["--memory / --cpu-quota / --pids-limit"]
DockerCmd --> Tmpfs["--tmpfs /tmp:rw,noexec,nosuid,size=64m"]
DockerCmd --> Mount["-v workspace:/workspace:rw"]
DockerCmd --> Workdir["-w /workspace"]
ResourceLimit --> TenantQuota["TenantQuotaManager<br/>per-tenant override"]
end
subgraph LocalSandbox["Local Dev Sandbox"]
LocalBackend --> ProcessBuilder["ProcessBuilder bash -c"]
ProcessBuilder --> LocalWorkdir["pb.directory(workspace)"]
ProcessBuilder --> LocalTimeout["timeout control"]
end
Workspace --> PathGuard["Path Guards"]
PathGuard --> SegmentGuard["namespace/sessionId reject<br/>.. / slash / backslash / null byte"]
PathGuard --> NormalizeGuard["normalize + startsWith(workspace)"]
NormalizeGuard --> FileOps["readFile / writeFile / resolvePathInWorkspace"]
DockerBackend --> Result["ExecutionResult<br/>exitCode / stdout / stderr / duration / timeout / truncated"]
LocalBackend --> Result
```
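The two path guards can be sketched with `java.nio.file.Path`. This minimal illustration assumes the workspace layout `workspaces/<namespace>/<sessionId>` from the diagram. The method names are hypothetical. A lexical `normalize()` check like this one does not follow symlinks, so a production guard may additionally need `toRealPath()` or an equivalent check.

```java
import java.nio.file.Path;

// Hypothetical sketch of SegmentGuard + NormalizeGuard; not the project's actual resolver.
final class PathGuardSketch {
    private PathGuardSketch() {}

    // SegmentGuard: namespace and sessionId must be single, plain path segments.
    static String checkSegment(String segment) {
        if (segment == null || segment.isEmpty() || segment.contains("..")
                || segment.indexOf('/') >= 0 || segment.indexOf('\\') >= 0 || segment.indexOf('\0') >= 0) {
            throw new IllegalArgumentException("invalid path segment: " + segment);
        }
        return segment;
    }

    // Builds workspaces/<namespace>/<sessionId> from vetted segments only.
    static Path workspace(Path root, String namespace, String sessionId) {
        return root.resolve(checkSegment(namespace)).resolve(checkSegment(sessionId)).toAbsolutePath().normalize();
    }

    // NormalizeGuard: resolve, normalize, then require the result to stay under the workspace.
    static Path resolveInWorkspace(Path workspace, String relativePath) {
        Path base = workspace.toAbsolutePath().normalize();
        Path candidate = base.resolve(relativePath).normalize();
        if (!candidate.startsWith(base)) { // component-wise prefix check, so /ws/s1 does not match /ws/s10
            throw new SecurityException("path escapes workspace: " + relativePath);
        }
        return candidate;
    }

    public static void main(String[] args) {
        Path ws = workspace(Path.of("/srv/workspaces"), "team-a", "s1");
        System.out.println(resolveInWorkspace(ws, "src/Main.java"));
        try {
            resolveInWorkspace(ws, "../s2/secret.txt");
        } catch (SecurityException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```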
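Security boundaries of the Docker sandbox:

- Process isolation: every execution runs in its own `docker run --rm`.
- Network isolation: `--network none`.
- User isolation: inside the container, commands run as the non-root user `65534:65534`.
- Filesystem isolation: the root filesystem is read-only, and only the current session's workspace is mounted.
- Resource limits: CPU, memory and PIDs are configurable and support per-tenant overrides.
- Output limits: stdout and stderr are truncated at `maxOutputBytes`.

The local-dev backend only provides path-level and timeout-level protection, with no container isolation. It is suitable only for development and debugging.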
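The flags listed above map directly onto a `docker run` argument list. Below is a minimal Java sketch of how such a list could be assembled. It assumes a hypothetical `SandboxLimits` record, a placeholder image name, and an image that ships `bash`; it is not `DockerIsolatedBackend` itself. The sketch builds a `List<String>` for `ProcessBuilder` rather than one shell string, so the user command is only ever interpreted by the shell inside the container.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the docker run invocation; image name and limits are placeholders.
final class DockerArgsSketch {
    private DockerArgsSketch() {}

    static List<String> dockerRunArgs(String image, Path workspace, SandboxLimits limits, String command) {
        List<String> args = new ArrayList<>(List.of("docker", "run", "--rm")); // one container per execution
        args.addAll(List.of("--network", "none"));                            // no network access
        args.addAll(List.of("--user", "65534:65534"));                        // non-root (conventionally nobody)
        args.add("--read-only");                                              // read-only root filesystem
        args.addAll(List.of("--memory", limits.memory()));
        args.addAll(List.of("--cpu-quota", Long.toString(limits.cpuQuota())));
        args.addAll(List.of("--pids-limit", Integer.toString(limits.pidsLimit())));
        args.addAll(List.of("--tmpfs", "/tmp:rw,noexec,nosuid,size=64m"));   // writable scratch, not executable
        args.addAll(List.of("-v", workspace.toAbsolutePath().normalize() + ":/workspace:rw")); // this session only
        args.addAll(List.of("-w", "/workspace"));
        args.add(image);
        args.addAll(List.of("bash", "-c", command));                          // assumes the image ships bash
        return args;
    }

    public static void main(String[] args) {
        List<String> cmd = dockerRunArgs("sandbox-image:latest", Path.of("/srv/workspaces/team-a/s1"),
                new SandboxLimits("512m", 50_000, 128), "ls -la");
        System.out.println(String.join(" ", cmd));
        // new ProcessBuilder(cmd).start() would launch it; omitted so the sketch runs without Docker.
    }
}

record SandboxLimits(String memory, long cpuQuota, int pidsLimit) {}
```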
```mermaid
flowchart TD
Config["pi.resources.* / env vars"] --> Catalog["ResourceCatalogService.reload"]
Catalog --> SkillScan["scan skills dirs<br/>find SKILL.md"]
Catalog --> PromptScan["scan prompts dirs<br/>*.prompt.md / *.prompt.txt / *.prompt"]
Catalog --> ResourceScan["scan resources dirs<br/>regular files"]
SkillScan --> ParseSkill["parse YAML frontmatter<br/>description / entrypoint / args_schema / timeout_ms"]
ParseSkill --> SkillInfo["SkillInfo"]
SkillInfo --> ScopeMatch["skillsByScope(scope)"]
ScopeMatch --> Public["public"]
ScopeMatch --> Namespace["namespaces/<namespace>"]
Public --> Resolve["SkillsResolver.resolveSkills(namespace)"]
Namespace --> Resolve
Resolve --> Cache["namespace cache"]
Cache --> ToolInventory["ToolInventory"]
PromptScan --> PromptInfo["PromptTemplateInfo"]
ResourceScan --> ResourceInfo["ResourceInfo"]
```
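The frontmatter step can be illustrated with a deliberately small parser. This Java sketch only understands flat `key: value` lines between the leading `---` markers. A real implementation would more likely use a YAML library such as SnakeYAML. The following are assumptions for illustration:

- the `ParsedSkill` record, a simplified stand-in for `SkillInfo`;
- the 30-second default timeout;
- the rule that a non-empty `entrypoint` marks an executable skill.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified SKILL.md frontmatter reader (flat keys only, no real YAML).
final class SkillFrontmatterSketch {
    private SkillFrontmatterSketch() {}

    // Reads flat "key: value" pairs between the leading "---" lines; indented (nested) lines are skipped.
    static Map<String, String> parse(String skillMd) {
        String[] lines = skillMd.split("\\R");
        Map<String, String> fields = new LinkedHashMap<>();
        if (lines.length == 0 || !lines[0].strip().equals("---")) return fields;
        for (int i = 1; i < lines.length; i++) {
            String line = lines[i];
            if (line.strip().equals("---")) return fields;                         // end of frontmatter
            if (line.isBlank() || Character.isWhitespace(line.charAt(0))) continue; // nested value, e.g. args_schema
            int colon = line.indexOf(':');
            if (colon <= 0) continue;
            String value = line.substring(colon + 1).strip();
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1);
            }
            fields.put(line.substring(0, colon).strip(), value);
        }
        return new LinkedHashMap<>(); // no closing "---": treat as having no frontmatter
    }

    static ParsedSkill toSkillInfo(String name, Map<String, String> fields) {
        long timeoutMs = Long.parseLong(fields.getOrDefault("timeout_ms", "30000")); // default is hypothetical
        return new ParsedSkill(name, fields.getOrDefault("description", ""),
                fields.getOrDefault("entrypoint", ""), timeoutMs);
    }

    public static void main(String[] args) {
        String md = """
                ---
                description: Summarize build logs
                entrypoint: run.sh
                timeout_ms: 5000
                ---
                # Usage
                """;
        ParsedSkill info = toSkillInfo("log-summary", parse(md));
        System.out.println(info + " executable=" + info.executable());
    }
}

record ParsedSkill(String name, String description, String entrypoint, long timeoutMs) {
    // Assumption: an entrypoint distinguishes executable skills from instructional ones.
    boolean executable() { return !entrypoint.isEmpty(); }
}
```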
```mermaid
flowchart TD
SessionRuntime["AgentSessionRuntime"] --> Lifecycle["SessionLifecycleManager<br/>live agent cache / eviction"]
SessionRuntime --> PromptQueue["SessionPromptQueue<br/>serialize same-session prompts"]
SessionRuntime --> SessionRepo["SessionRepository"]
SessionRuntime --> EntryRepo["SessionEntryRepository"]
SessionRepo --> SessionDoc["SessionDocument<br/>namespace / model / headEntryId<br/>version / fencingToken / lastCommittedRunId"]
EntryRepo --> EntryDoc["SessionEntryDocument<br/>message / branch_summary / compaction<br/>runId + sequence idempotency"]
SessionRuntime --> Prompt["prompt / cont"]
Prompt --> Persist["persistConversation"]
Persist --> EntryRepo
Persist --> SessionRepo
SessionRuntime --> Fork["forkSession"]
Fork --> CopyPath["copy path entries to new session"]
CopyPath --> EntryRepo
SessionRuntime --> Navigate["navigateTree"]
Navigate --> BranchSummary["summarize abandoned branch"]
BranchSummary --> EntryRepo
SessionRuntime --> Compact["compact"]
Compact --> LlmSummary["LLM summary"]
Compact --> HeuristicSummary["fallback heuristic summary"]
LlmSummary --> Persist
HeuristicSummary --> Persist
```
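Persistence semantics:

- `SessionDocument.version` is Mongo's optimistic-locking version field.
- `fencingToken` records the active-run token of the last successful commit. Writes that carry a lower token are rejected.
- `SessionEntryDocument` has a unique index on `sessionId + runId + sequence`, which makes run retries idempotent.
- `headEntryId` is updated only after the message entries have been persisted, so the session head never points at a message that is not yet stored.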
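The persistence rules can be modeled in memory to show how they interact. In this hypothetical Java sketch, a set of `sessionId|runId|sequence` keys stands in for the unique index. `commitRun` applies the fencing-token and version checks before moving the head. In MongoDB these checks would more plausibly be a single conditional update filtered on `version` and `fencingToken`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the session persistence rules.
final class SessionStoreSketch {
    long version;               // SessionDocument.version (optimistic lock)
    long fencingToken;          // token of the last successful commit
    String headEntryId;
    String lastCommittedRunId;
    final List<String> entries = new ArrayList<>();
    private final Set<String> entryKeys = new HashSet<>(); // unique (sessionId, runId, sequence)

    // Returns false when a retried run re-sends an entry that is already stored.
    synchronized boolean appendEntry(String sessionId, String runId, int sequence, String message) {
        if (!entryKeys.add(sessionId + "|" + runId + "|" + sequence)) return false;
        entries.add(runId + "#" + sequence + ": " + message);
        return true;
    }

    // Called only after the run's entries are stored, so headEntryId never points at a missing entry.
    synchronized void commitRun(String runId, long token, long expectedVersion, String newHeadEntryId) {
        if (token < fencingToken) {
            throw new IllegalStateException("stale fencing token " + token + " < " + fencingToken);
        }
        if (expectedVersion != version) {
            throw new IllegalStateException("concurrent update, expected version " + expectedVersion);
        }
        fencingToken = token;
        version++;
        headEntryId = newHeadEntryId;
        lastCommittedRunId = runId;
    }

    public static void main(String[] args) {
        SessionStoreSketch store = new SessionStoreSketch();
        store.appendEntry("s1", "run-1", 0, "user: hi");
        store.appendEntry("s1", "run-1", 1, "assistant: hello");
        store.appendEntry("s1", "run-1", 1, "assistant: hello"); // retried duplicate, skipped
        store.commitRun("run-1", 7, 0, "run-1#1");
        System.out.println(store.entries.size() + " entries, head=" + store.headEntryId);
    }
}
```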
```mermaid
sequenceDiagram
autonumber
participant N as Node
participant R as Redis
participant M as MongoDB
N->>R: Lua acquire active-run lease
R->>R: check session busy
R->>R: check tenant/user quota zset
R->>R: INCR session fencing key
R->>R: HSET active run + SET by-session + ZADD quota
R-->>N: fencingToken
N->>M: run with token, persist entries idempotently
N->>N: persist workspace snapshot
N-->>N: emit run_completed
N->>R: release lease if owner node matches
```
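The Lua script's job is to perform the busy check, the quota check and the `INCR` as one atomic step. Below is a minimal in-memory Java model of that behavior, in which `synchronized` stands in for Redis running the script atomically. The class, the return codes and the per-tenant limit are hypothetical. Lease TTLs and per-user quotas are omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the Lua acquire/release scripts.
final class LeaseSketch {
    static final long BUSY = -1;
    static final long QUOTA_EXCEEDED = -2;

    private record Lease(String tenantId, String runId, String ownerNodeId, long fencingToken) {}

    private final Map<String, Lease> bySession = new HashMap<>();            // by-session key / active run hash
    private final Map<String, Long> fencingCounters = new HashMap<>();       // per-session fencing key
    private final Map<String, Set<String>> activeByTenant = new HashMap<>(); // stands in for the quota zset
    private final int maxActiveRunsPerTenant;

    LeaseSketch(int maxActiveRunsPerTenant) {
        this.maxActiveRunsPerTenant = maxActiveRunsPerTenant;
    }

    // All checks and writes happen under one lock, like a single Lua script execution.
    synchronized long acquire(String tenantId, String sessionId, String runId, String nodeId) {
        if (bySession.containsKey(sessionId)) return BUSY;
        Set<String> active = activeByTenant.computeIfAbsent(tenantId, k -> new HashSet<>());
        if (active.size() >= maxActiveRunsPerTenant) return QUOTA_EXCEEDED;
        long token = fencingCounters.merge(sessionId, 1L, Long::sum); // INCR: strictly increasing per session
        bySession.put(sessionId, new Lease(tenantId, runId, nodeId, token));
        active.add(runId);
        return token;
    }

    // Only the owner node may release; another node's release is a no-op.
    synchronized boolean release(String sessionId, String nodeId) {
        Lease lease = bySession.get(sessionId);
        if (lease == null || !lease.ownerNodeId().equals(nodeId)) return false;
        bySession.remove(sessionId);
        activeByTenant.get(lease.tenantId()).remove(lease.runId());
        return true;
    }

    public static void main(String[] args) {
        LeaseSketch leases = new LeaseSketch(10);
        long token = leases.acquire("tenant-a", "s1", "run-1", "node-a");
        System.out.println("token=" + token + ", second acquire=" + leases.acquire("tenant-a", "s1", "run-2", "node-b"));
        System.out.println("foreign release=" + leases.release("s1", "node-b")
                + ", owner release=" + leases.release("s1", "node-a"));
    }
}
```

The fencing counter is never reset on release. As a result, every later run on the same session gets a larger token than any earlier one, which is what lets MongoDB reject writes from a stale owner.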
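Crash analysis:

| Crash point | Outcome | Recovery semantics |
|---|---|---|
| Before acquire | No state change | The client can simply retry |
| After acquire, before execution | The session is briefly busy | The lease can be re-acquired once its TTL expires |
| After a partial entry write | MongoDB holds some of the run's entries | `runId + sequence` lets the retry skip entries already written |
| After the session head update | The session is already committed | `lastCommittedRunId` can be used for diagnosis and for re-emitting the terminal event |
| Before the workspace snapshot | Redis is still busy; `run_completed` has not been sent | The lease is released only after the snapshot succeeds; on failure the run ends with `run_failed` |
| Before release | Redis is still busy; the workspace is already committed | Recovers after the lease TTL expires or the owner releases it |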
```mermaid
sequenceDiagram
autonumber
participant B as Node B
participant R as Redis Stream
participant A as Owner Node A
B->>R: XADD queued run
A->>R: XREADGROUP next run
R-->>A: pending stream record
A->>A: schedule run
A->>R: XACK + XDEL after schedule accepted
Note over A,R: if A crashes before XACK, the record remains pending
B->>R: recovery XAUTOCLAIM stale pending
B->>R: XACK + XDEL old record, XADD replacement
```
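Why a Redis Stream rather than a List:

- A queued run is a short-lived command that must be recoverable. It should not be lost once it has been popped.
- The pending state of a consumer group naturally describes "handed to a node but not yet acknowledged".
- `XAUTOCLAIM` can transfer pending records that have timed out to a live node.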
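Here is a small in-memory Java model of a consumer group that makes the pending-entry argument concrete. Delivered records stay in a pending list until they are acknowledged. An autoclaim step hands records that have been idle too long to another consumer. This is a hypothetical simulation of the `XADD` / `XREADGROUP` / `XACK` / `XAUTOCLAIM` semantics, not a Redis client, and it skips details such as delivery counters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one stream with one consumer group.
final class StreamGroupSketch {
    private record Pending(String consumer, long deliveredAtMs) {}

    private final Map<Long, String> entries = new LinkedHashMap<>(); // stream records: id -> payload
    private final Map<Long, Pending> pending = new LinkedHashMap<>(); // pending entries list (PEL)
    private long nextId = 1;
    private long lastDeliveredId = 0;                                 // group cursor

    // XADD: append a record.
    synchronized long xadd(String payload) {
        long id = nextId++;
        entries.put(id, payload);
        return id;
    }

    // XREADGROUP ">": hand out the next never-delivered record and track it as pending.
    synchronized Long readGroup(String consumer, long nowMs) {
        for (long id : entries.keySet()) {
            if (id > lastDeliveredId) {
                lastDeliveredId = id;
                pending.put(id, new Pending(consumer, nowMs));
                return id;
            }
        }
        return null;
    }

    // XACK + XDEL: the record leaves both the PEL and the stream.
    synchronized void ackAndDelete(long id) {
        pending.remove(id);
        entries.remove(id);
    }

    // XAUTOCLAIM: reassign records idle for at least minIdleMs to a live consumer.
    synchronized List<Long> autoClaim(String consumer, long minIdleMs, long nowMs) {
        List<Long> claimed = new ArrayList<>();
        for (Map.Entry<Long, Pending> e : pending.entrySet()) {
            if (nowMs - e.getValue().deliveredAtMs() >= minIdleMs) {
                e.setValue(new Pending(consumer, nowMs));
                claimed.add(e.getKey());
            }
        }
        return claimed;
    }

    synchronized String payload(long id) { return entries.get(id); }

    synchronized int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        StreamGroupSketch group = new StreamGroupSketch();
        long id = group.xadd("run-42");
        group.readGroup("nodeA", 0);                 // node A takes it, then crashes before XACK
        System.out.println("claimed by B: " + group.autoClaim("nodeB", 60_000, 90_000));
        group.ackAndDelete(id);                      // B acknowledges after handling it
        System.out.println("pending after ack: " + group.pendingCount());
    }
}
```

With a List, the record would be gone after node A's pop. Here it survives A's crash, and B can still claim it. According to the diagram above, the actual recovery path also re-adds the run as a fresh record (`XADD replacement`), which this model does not show.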
```mermaid
sequenceDiagram
autonumber
participant C as Client
participant B as Node B
participant R as Redis
participant A as Owner Node A
C->>B: command abort / steer
B->>R: read active run owner
R-->>B: ownerNodeId
B->>R: publish owner command channel
A->>A: verify local active run
A->>A: abort or steer local Agent
A->>R: publish SSE event + append session event stream
```
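`continue` does not go through the session command lock. Instead it enters `AgentRunRuntime` as `AgentRunOperation.CONTINUE` and shares the active-run lease, fencing token, queue policy and SSE lifecycle with ordinary prompts.

`compact`, `fork` and `navigate` do not modify the local session runtime directly. They go through `SessionCommandRuntime` first:

- In cluster mode, if the session has an active run, the command is rejected immediately to avoid concurrent writes with the running Agent.
- If there is no active run, a Redis session lock is acquired before the MongoDB/session writes are performed.
- The lock is deleted only if it still holds the caller's owner value, so a node cannot accidentally delete a newer lock held by another node.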
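The owner-value rule is the standard compare-and-delete lock pattern. Below is a minimal Java sketch. It uses `ConcurrentMap.putIfAbsent` and `remove(key, value)` as in-memory analogues of `SET ... NX` and the Lua compare-and-delete script. The class and `runExclusive` are hypothetical names, the `lock:` key prefix is illustrative, and lock expiry is omitted.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory analogue of the Redis session lock (TTL omitted).
final class SessionLockSketch {
    // Redis equivalent for reference: delete only if the stored value is still our token.
    static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    // Like SET lock:<sessionId> <token> NX: returns the owner token, or null if already held.
    String tryLock(String sessionId) {
        String token = UUID.randomUUID().toString();
        return locks.putIfAbsent("lock:" + sessionId, token) == null ? token : null;
    }

    // Compare-and-delete: a stale holder cannot remove a lock that another node now owns.
    boolean unlock(String sessionId, String token) {
        return locks.remove("lock:" + sessionId, token);
    }

    // compact / fork / navigate path: reject if a run is active, otherwise write under the lock.
    boolean runExclusive(String sessionId, boolean hasActiveRun, Runnable sessionWrite) {
        if (hasActiveRun) return false;
        String token = tryLock(sessionId);
        if (token == null) return false;
        try {
            sessionWrite.run();
            return true;
        } finally {
            unlock(sessionId, token);
        }
    }

    public static void main(String[] args) {
        SessionLockSketch lockSvc = new SessionLockSketch();
        System.out.println("busy session: " + lockSvc.runExclusive("s1", true, () -> System.out.println("compact")));
        System.out.println("idle session: " + lockSvc.runExclusive("s1", false, () -> System.out.println("compact")));
    }
}
```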
```mermaid
flowchart LR
User["User / Platform"] --> Server["delphi-agent-server<br/>Spring Boot"]
Server --> Mongo["MongoDB"]
Server --> Redis["Redis<br/>lease / stream / command / SSE"]
Server --> Docker["Docker Engine<br/>default sandbox"]
Server --> DeepSeek["DeepSeek Anthropic-compatible API"]
Server --> SkillRoot["Skill Roots<br/>./skills, ~/.codex/skills"]
Server --> WorkspaceRoot["Workspace Root<br/>./workspaces or PI_WORKSPACES_ROOT"]
Server --> ObjectStorage["S3-compatible storage<br/>snapshot workspace"]
Docker --> Containers["One-off sandbox containers"]
Containers --> WorkspaceMount["Mounted session workspace"]
```
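Minimum runtime dependencies:

- JDK 21 + Maven 3.9+
- MongoDB
- `DEEPSEEK_API_KEY`
- Docker (default profile)

Additional dependencies for multi-node deployments:

- Redis 6.2+, for Streams, consumer groups and `XAUTOCLAIM`.
- A unique `PI_CLUSTER_NODE_ID` per node, ideally injected from `POD_NAME` or `HOSTNAME`.
- `PI_WORKSPACE_STORAGE=snapshot` is recommended, together with S3-compatible object storage, so that a session's workspace is not lost when the session moves to another node.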
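A startup check along these lines could enforce the node-id requirement. This is a hypothetical Java sketch. The text above only recommends injecting `PI_CLUSTER_NODE_ID` from `POD_NAME` or `HOSTNAME`, so the in-code fallback order shown here is an assumption, not documented server behavior.

```java
import java.util.List;
import java.util.Map;

// Hypothetical node-id resolution; the fallback order is an assumption for illustration.
final class NodeIdSketch {
    private NodeIdSketch() {}

    static final List<String> CANDIDATES = List.of("PI_CLUSTER_NODE_ID", "POD_NAME", "HOSTNAME");

    // The first non-blank value wins; fail fast if none is set, since lease ownership depends on it.
    static String resolveNodeId(Map<String, String> env) {
        for (String key : CANDIDATES) {
            String value = env.get(key);
            if (value != null && !value.isBlank()) return value.strip();
        }
        throw new IllegalStateException("set a unique PI_CLUSTER_NODE_ID for every node");
    }

    public static void main(String[] args) {
        try {
            System.out.println("node id: " + resolveNodeId(System.getenv()));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```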