
# Delphi Agent Technical Architecture

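This document collects the main technical architecture diagrams of delphi-agent. It covers the end-to-end call chain, the runtime architecture, the Agent loop, tools and skills, the sandbox, multi-node coordination, and data flow.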

## 1. Overall Architecture Flowchart

```mermaid
flowchart TD
    Client["Client / Platform"] -->|POST /api/chat/stream<br/>SSE| HttpApi["delphi-agent-http-api"]

    subgraph HttpLayer["HTTP API Layer"]
        HttpApi --> Identity["RuntimeIdentityResolver<br/>X-Tenant-Id / X-User-Id / namespace"]
        Identity --> StreamController["StreamChatController<br/>prompt / command dispatch"]
        StreamController --> CatalogController["CatalogController<br/>models / skills / prompts / resources"]
        StreamController --> AuditUsage["AuditController / UsageController"]
    end

    subgraph RuntimeLayer["Runtime Layer"]
        StreamController --> RunRuntime["AgentRunRuntime<br/>runId / queue / active run / SSE event"]
        StreamController --> SessionCommandRuntime["SessionCommandRuntime<br/>exclusive session commands"]
        RunRuntime --> TenantGuard["TenantRuntimeGuard<br/>namespace / quota / queue capacity"]
        RunRuntime --> LiveRegistry["LiveRunRegistry<br/>active run / abort / steer"]
        RunRuntime --> QueueManager["RunQueueManager<br/>interrupt / followup / steer / drop / reject"]
        RunRuntime --> SessionRuntime["AgentSessionRuntime<br/>session lifecycle / persistence / compact / fork"]
        SessionRuntime --> ToolFactory["AgentToolFactory<br/>tool inventory + policy"]
    end

    subgraph CoreLayer["Agent Core Layer"]
        SessionRuntime --> Agent["Agent.runLoop<br/>ReAct turn loop"]
        Agent --> AgentState["AgentState<br/>messages / tools / model / streaming"]
        Agent --> ToolCalls["AgentTool.execute<br/>tool call execution"]
        Agent --> AiRuntime["AiRuntime.streamSimple"]
    end

    subgraph ProviderLayer["Provider Layer"]
        AiRuntime --> ProviderRegistry["ApiProviderRegistry"]
        ProviderRegistry --> SpringAiProvider["SpringAiChatModelProvider"]
        SpringAiProvider --> ModelProvider["Model Provider<br/>DeepSeek Anthropic-compatible endpoint"]
    end

    subgraph ToolLayer["Tool / Sandbox Layer"]
        ToolFactory --> BuiltinTools["Builtin Tools<br/>read / write / edit / bash / grep / find / ls"]
        ToolFactory --> SkillTools["SkillAgentTool<br/>instructional / executable"]
        ToolFactory --> SubagentTools["Subagent Tools<br/>spawn / status / result / abort"]
        BuiltinTools --> Sandbox["ExecutionBackend"]
        SkillTools --> Sandbox
        SubagentTools --> SubagentRuntime["SubagentRuntime"]
    end

    subgraph StorageLayer["Storage / Catalog"]
        SessionRuntime --> Mongo["MongoDB<br/>sessions / entries / audit / usage"]
        LiveRegistry --> RedisLease["Redis<br/>active run lease / fencing token"]
        QueueManager --> RedisQueue["Redis Stream<br/>run queue / consumer group"]
        RunRuntime --> WorkspaceStorage["WorkspaceStorage<br/>local / snapshot"]
        WorkspaceStorage --> ObjectStorage["S3-compatible storage<br/>workspace snapshot"]
        ToolFactory --> Catalog["ResourceCatalogService<br/>skills / prompts / resources"]
        Catalog --> SkillDirs["skills/public<br/>skills/namespaces/&lt;namespace&gt;"]
    end
```

## 2. Request Runtime Sequence Diagram

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant API as StreamChatController
    participant ID as RuntimeIdentityResolver
    participant RR as AgentRunRuntime
    participant TG as TenantRuntimeGuard
    participant LR as LiveRunRegistry
    participant RQ as RunQueueManager
    participant SR as AgentSessionRuntime
    participant AF as AgentToolFactory
    participant A as Agent
    participant LLM as AiRuntime / Provider
    participant T as AgentTool
    participant S as ExecutionBackend
    participant DB as MongoDB

    C->>API: POST /api/chat/stream
    API->>ID: resolve tenant/user/namespace
    ID-->>API: RuntimeIdentity
    API->>SR: ensureSessionId(create if absent)
    SR->>DB: save/load session
    API->>RR: stream(AgentRunRequest)
    RR->>TG: validateRunContext + quota
    TG-->>RR: allowed
    RR->>LR: register active run
    LR-->>RR: fencingToken
    RR-->>C: SSE run_started
    RR->>SR: configureRunTools
    SR->>AF: createTools(ToolRuntimeContext)
    AF-->>SR: builtin + skill + subagent tools after policy
    RR->>SR: prompt(sessionId, namespace, prompt, runId, fencingToken)
    SR->>A: Agent.prompt()
    A->>LLM: streamSimple(model, context, tool definitions)
    LLM-->>A: text deltas + tool calls
    A-->>C: SSE message_delta
    A->>T: execute(toolCallId, args)
    T->>S: execute/readFile/writeFile if needed
    S-->>T: ExecutionResult / file content
    T-->>A: AgentToolResult
    A->>LLM: continue with tool result
    LLM-->>A: final answer
    A-->>SR: AgentEnd
    SR->>DB: persist messages by runId + sequence
    RR-->>C: SSE run_completed
    RR->>LR: complete active run
    RR->>RQ: poll next queued run
```

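When the same session already has an active run:

- INTERRUPT: the new run is first added to the Redis Stream queue. The active run's owner node then interrupts the old run.
- FOLLOWUP: the new run is written to the Redis Stream and waits for the current run to complete. The owner node then polls it.
- STEER: the message is injected into the current run through the owner node's command channel.
- DROP / REJECT: the run never enters the execution queue.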
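The five policies can be read as a small decision function over two inputs: whether an active run exists and which mode was requested. The following Java is a minimal, hypothetical sketch of that function. The names `QueueMode`, `Action` and `QueuePolicySketch` are illustrative and are not the project's `RunQueueManager` API. The sketch also assumes that a request with no active run simply starts.

```java
// Hypothetical model of the same-session queue policies; names are illustrative,
// not the project's RunQueueManager API.
enum QueueMode { INTERRUPT, FOLLOWUP, STEER, DROP, REJECT }

enum Action { START_NOW, ENQUEUE_THEN_INTERRUPT, ENQUEUE, STEER_ACTIVE_RUN, DROPPED, REJECTED }

final class QueuePolicySketch {
    /** Decides what happens to a new run request for a session. */
    static Action decide(boolean sessionHasActiveRun, QueueMode mode) {
        if (!sessionHasActiveRun) {
            return Action.START_NOW; // assumption: with no active run, the run starts directly
        }
        return switch (mode) {
            case INTERRUPT -> Action.ENQUEUE_THEN_INTERRUPT; // enqueue first, then the owner aborts the old run
            case FOLLOWUP -> Action.ENQUEUE;                 // the owner polls it after the current run completes
            case STEER -> Action.STEER_ACTIVE_RUN;           // injected via the owner node's command channel
            case DROP -> Action.DROPPED;                     // never enters the execution queue
            case REJECT -> Action.REJECTED;                  // never enters the execution queue
        };
    }

    public static void main(String[] args) {
        for (QueueMode mode : QueueMode.values()) {
            System.out.println(mode + " -> " + decide(true, mode));
        }
    }
}
```

INTERRUPT is modelled as "enqueue, then interrupt" rather than "interrupt, then enqueue". That ordering follows the bullet above: the new run is already durable in the stream before the old run is torn down.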

## 3. Agent Loop Technical Flow

```mermaid
stateDiagram-v2
    [*] --> AgentStart
    AgentStart --> TurnStart
    TurnStart --> AddPendingMessages: user prompt / steering / follow-up
    AddPendingMessages --> StreamLLM
    StreamLLM --> EmitMessageDelta: text / thinking delta
    EmitMessageDelta --> StreamLLM
    StreamLLM --> AssistantDone
    AssistantDone --> HasToolCalls

    HasToolCalls --> ExecuteTools: yes
    ExecuteTools --> ValidateArgs
    ValidateArgs --> BeforeToolHook
    BeforeToolHook --> ToolExecute
    ToolExecute --> AfterToolHook
    AfterToolHook --> AppendToolResults
    AppendToolResults --> TurnEnd

    HasToolCalls --> TurnEnd: no
    TurnEnd --> CheckSteering
    CheckSteering --> TurnStart: steering exists
    CheckSteering --> CheckFollowUp: no steering
    CheckFollowUp --> TurnStart: follow-up exists
    CheckFollowUp --> AgentEnd: empty
    AgentEnd --> [*]

    StreamLLM --> AgentEnd: error / aborted
```

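Key points:

- `Agent.runLoop()` is the core ReAct loop. In every turn the model may return both text and tool calls.
- Before a tool runs, its arguments are validated against the tool's JSON Schema and then passed through the `beforeToolCall` hook.
- After a tool runs, its result passes through the `afterToolCall` hook and is fed back into the context as a tool result message.
- Steering messages take priority over follow-ups. The run ends when both queues are empty and the model makes no further tool calls.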
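The turn structure and the priority rules above fit into a short loop. The following Java is a hypothetical, heavily simplified sketch and is not the project's `Agent.runLoop()`. The model and tools are plain functions, and the schema validation and before/after hooks are omitted. The sketch also assumes that all queued steering messages are drained at once and that follow-ups are taken one at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified turn loop; not the project's Agent.runLoop().
record ToolCall(String name, String argsJson) {}

record ModelReply(String text, List<ToolCall> toolCalls) {}

final class AgentLoopSketch {
    final Deque<String> steering = new ArrayDeque<>();
    final Deque<String> followUps = new ArrayDeque<>();
    final List<String> transcript = new ArrayList<>();

    void run(String prompt,
             Function<List<String>, ModelReply> model,
             Function<ToolCall, String> executeTool) {
        Deque<String> pending = new ArrayDeque<>(List.of(prompt));
        while (true) {
            // TurnStart: append pending user / steering / follow-up messages.
            while (!pending.isEmpty()) {
                transcript.add("user: " + pending.poll());
            }
            ModelReply reply = model.apply(transcript);
            transcript.add("assistant: " + reply.text());
            // Schema validation and the before/after tool hooks are omitted here.
            for (ToolCall call : reply.toolCalls()) {
                transcript.add("tool[" + call.name() + "]: " + executeTool.apply(call));
            }
            // TurnEnd: steering takes priority over follow-ups.
            if (!steering.isEmpty()) {
                pending.addAll(steering);
                steering.clear();
                continue;
            }
            if (!reply.toolCalls().isEmpty()) {
                continue; // the model still needs to see the tool results
            }
            if (!followUps.isEmpty()) {
                pending.add(followUps.poll());
                continue;
            }
            return; // AgentEnd: no tool calls, no steering, no follow-up
        }
    }
}
```

Here is how the loop behaves with a model that makes one tool call and then answers, and with one queued follow-up. The loop calls the model three times: once to issue the tool call, once to answer, and once more for the follow-up.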

## 4. Tools and Skills Architecture

```mermaid
flowchart TD
    AgentRuntime["AgentSessionRuntime.configureRunTools"] --> Factory["AgentToolFactory"]
    Factory --> Inventory["ToolInventory.collect"]

    Inventory --> BuiltinFactory["BuiltinToolFactory"]
    BuiltinFactory --> ReadOnly["READONLY<br/>read / grep / find / ls"]
    BuiltinFactory --> Mutating["MUTATING<br/>write / edit"]
    BuiltinFactory --> Executable["EXECUTABLE<br/>bash"]

    Inventory --> SkillsResolver["SkillsResolver"]
    SkillsResolver --> PublicSkills["skills/public/**"]
    SkillsResolver --> NamespaceSkills["skills/namespaces/&lt;namespace&gt;/**"]
    PublicSkills --> SkillAgentTool["SkillAgentTool"]
    NamespaceSkills --> SkillAgentTool
    SkillAgentTool --> Instructional["Instructional Skill<br/>return SKILL.md as context"]
    SkillAgentTool --> ExecutableSkill["Executable Skill<br/>copy to workspace/.skills + run entrypoint"]

    Inventory --> TaskPlanning["TaskPlanningTool<br/>planner / orchestrator"]
    Factory --> SubagentToolFactory["SubagentOrchestrationToolFactory"]
    SubagentToolFactory --> SubagentTools["subagent_spawn / status / result / abort"]

    ReadOnly --> Policy["ToolPolicyPipeline"]
    Mutating --> Policy
    Executable --> Policy
    Instructional --> Policy
    ExecutableSkill --> Policy
    TaskPlanning --> Policy
    SubagentTools --> Policy

    Policy --> Resolver["TenantToolPolicyResolver<br/>role + tenant + depth"]
    Resolver --> WrappedTools["ToolExecutionWrapper / ToolAuditWrapper"]
    WrappedTools --> AgentTools["Agent.state().tools"]
```

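The tool policy restricts the tool set according to the agent's role:

| Role | Allowed categories |
| --- | --- |
| ORCHESTRATOR | All categories by default; in strict mode only `READONLY`, `INSTRUCTIONAL`, `ORCHESTRATION` |
| PLANNER / RESEARCHER | `READONLY`, `INSTRUCTIONAL` |
| REVIEWER | `READONLY` |
| CODER | `READONLY`, `MUTATING`, `EXECUTABLE`, `INSTRUCTIONAL` |
| TESTER | `READONLY`, `EXECUTABLE`, `INSTRUCTIONAL` |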
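The role table maps directly onto a per-role `EnumSet` filter. The following Java is a minimal sketch of only that table. The enum, record and class names are hypothetical, and the tenant and depth dimensions that `TenantToolPolicyResolver` also considers are not modelled.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical encoding of the role table; tenant and depth rules are not modelled.
enum ToolCategory { READONLY, MUTATING, EXECUTABLE, INSTRUCTIONAL, ORCHESTRATION }

enum AgentRole { ORCHESTRATOR, PLANNER, RESEARCHER, REVIEWER, CODER, TESTER }

record ToolSpec(String name, ToolCategory category) {}

final class RolePolicySketch {
    /** Allowed categories per role, following the table above. */
    static Set<ToolCategory> allowed(AgentRole role, boolean strict) {
        return switch (role) {
            case ORCHESTRATOR -> strict
                    ? EnumSet.of(ToolCategory.READONLY, ToolCategory.INSTRUCTIONAL, ToolCategory.ORCHESTRATION)
                    : EnumSet.allOf(ToolCategory.class);
            case PLANNER, RESEARCHER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.INSTRUCTIONAL);
            case REVIEWER -> EnumSet.of(ToolCategory.READONLY);
            case CODER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.MUTATING,
                    ToolCategory.EXECUTABLE, ToolCategory.INSTRUCTIONAL);
            case TESTER -> EnumSet.of(ToolCategory.READONLY, ToolCategory.EXECUTABLE, ToolCategory.INSTRUCTIONAL);
        };
    }

    /** Keeps only the tools whose category the role may use. */
    static List<ToolSpec> filter(List<ToolSpec> tools, AgentRole role, boolean strict) {
        Set<ToolCategory> ok = allowed(role, strict);
        return tools.stream().filter(t -> ok.contains(t.category())).toList();
    }
}
```

The tool set is filtered by category rather than by tool name. As a result, a new builtin or skill tool picks up the right permissions as soon as it declares its category.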

## 5. Sandbox Architecture

```mermaid
flowchart TD
    Tool["bash / executable skill / file tools"] --> Backend["ExecutionBackend"]
    Backend --> Context["ExecutionContext<br/>namespace / sessionId / userId"]
    Backend --> Options["ExecutionOptions<br/>timeoutMs / maxOutputBytes / envVars"]
    Context --> Workspace["Workspace Resolver<br/>workspaces/&lt;namespace&gt;/&lt;sessionId&gt;"]

    Backend --> Profile{"Spring Profile"}
    Profile -->|default !local-dev| DockerBackend["DockerIsolatedBackend"]
    Profile -->|local-dev| LocalBackend["LocalIsolatedBackend"]

    subgraph DockerSandbox["Docker Sandbox"]
        DockerBackend --> DockerCmd["docker run --rm"]
        DockerCmd --> NoNetwork["--network none"]
        DockerCmd --> NonRoot["--user 65534:65534"]
        DockerCmd --> ReadOnlyRoot["--read-only rootfs"]
        DockerCmd --> ResourceLimit["--memory / --cpu-quota / --pids-limit"]
        DockerCmd --> Tmpfs["--tmpfs /tmp:rw,noexec,nosuid,size=64m"]
        DockerCmd --> Mount["-v workspace:/workspace:rw"]
        DockerCmd --> Workdir["-w /workspace"]
        ResourceLimit --> TenantQuota["TenantQuotaManager<br/>per-tenant override"]
    end

    subgraph LocalSandbox["Local Dev Sandbox"]
        LocalBackend --> ProcessBuilder["ProcessBuilder bash -c"]
        ProcessBuilder --> LocalWorkdir["pb.directory(workspace)"]
        ProcessBuilder --> LocalTimeout["timeout control"]
    end

    Workspace --> PathGuard["Path Guards"]
    PathGuard --> SegmentGuard["namespace/sessionId reject<br/>.. / slash / backslash / null byte"]
    PathGuard --> NormalizeGuard["normalize + startsWith(workspace)"]
    NormalizeGuard --> FileOps["readFile / writeFile / resolvePathInWorkspace"]

    DockerBackend --> Result["ExecutionResult<br/>exitCode / stdout / stderr / duration / timeout / truncated"]
    LocalBackend --> Result
```
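The path guards in the diagram have two layers. The first rejects dangerous `namespace` / `sessionId` segments. The second applies a normalize-then-`startsWith` check to every tool-supplied path. The following Java is a minimal sketch of both layers using only `java.nio.file.Path`. The class and method names are hypothetical, and the empty-segment check is an addition of this sketch.

```java
import java.nio.file.Path;

// Hypothetical helper mirroring the path guards in the diagram; names are illustrative.
final class PathGuardSketch {
    /** Rejects namespace / sessionId values that could break out of the workspace tree. */
    static String checkSegment(String segment) {
        if (segment == null || segment.isEmpty()      // empty check is an addition of this sketch
                || segment.contains("..")
                || segment.indexOf('/') >= 0
                || segment.indexOf('\\') >= 0
                || segment.indexOf('\0') >= 0) {
            throw new IllegalArgumentException("invalid path segment: " + segment);
        }
        return segment;
    }

    /** workspaces/<namespace>/<sessionId>, as an absolute, normalized path. */
    static Path workspace(Path root, String namespace, String sessionId) {
        return root.resolve(checkSegment(namespace)).resolve(checkSegment(sessionId))
                .toAbsolutePath().normalize();
    }

    /** normalize() + startsWith(workspace); normalize() is lexical and does not follow symlinks. */
    static Path resolveInWorkspace(Path workspace, String userPath) {
        Path ws = workspace.toAbsolutePath().normalize();
        Path resolved = ws.resolve(userPath).normalize();
        if (!resolved.startsWith(ws)) {
            throw new SecurityException("path escapes workspace: " + userPath);
        }
        return resolved;
    }
}
```

`Path.startsWith` compares whole name elements, so `/ws/ab` is not treated as being inside `/ws/a`. A plain `String.startsWith` check would get this case wrong. Symlinks inside the workspace are a separate concern, and this lexical check does not cover them.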

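Security boundaries of the Docker sandbox:

- Process isolation: every execution runs in its own `docker run --rm` container.
- Network isolation: `--network none`.
- User isolation: processes inside the container run as the non-root user `65534:65534`.
- Filesystem isolation: the root filesystem is read-only, and only the current session's workspace is mounted.
- Resource limits: CPU, memory, and PID limits are configurable and support per-tenant overrides.
- Output limits: stdout and stderr are truncated at `maxOutputBytes`.

The local-dev backend protects only at the path and timeout level and provides no container isolation. Use it only for development and debugging.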
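The isolation flags listed above combine into a single argument vector. The following Java is a minimal sketch that builds that vector for `ProcessBuilder`. The image name, the limit values, the `SandboxLimits` record and the assumption that the image ships `bash` are all hypothetical. The flags themselves are the ones shown in the sandbox diagram.

```java
import java.nio.file.Path;
import java.util.List;

// Hypothetical: limit values, image name, and bash inside the image are assumptions.
record SandboxLimits(String memory, long cpuQuota, int pidsLimit) {}

final class DockerCommandSketch {
    /** Builds the argument vector for one throw-away sandbox container. */
    static List<String> dockerRun(String image, Path workspace, SandboxLimits limits, String script) {
        return List.of(
                "docker", "run", "--rm",                        // one container per execution
                "--network", "none",                            // no network access
                "--user", "65534:65534",                        // non-root uid:gid
                "--read-only",                                  // read-only root filesystem
                "--memory", limits.memory(),
                "--cpu-quota", Long.toString(limits.cpuQuota()),
                "--pids-limit", Integer.toString(limits.pidsLimit()),
                "--tmpfs", "/tmp:rw,noexec,nosuid,size=64m",
                "-v", workspace.toAbsolutePath().normalize() + ":/workspace:rw", // only this session's workspace
                "-w", "/workspace",
                image, "bash", "-c", script);
    }

    public static void main(String[] args) {
        List<String> cmd = dockerRun("sandbox-image:latest", Path.of("workspaces/team-a/s1"),
                new SandboxLimits("512m", 50_000, 128), "ls -la");
        System.out.println(String.join(" ", cmd));
        // Executing it would be: new ProcessBuilder(cmd).start(), plus timeout and output capping.
    }
}
```

Because the command is passed as a list, no host shell parses it. Only `bash` inside the container interprets `script`. With Docker's default `--cpu-period` of 100000 µs, a `--cpu-quota` of 50000 corresponds to roughly half of one CPU.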

## 6. Catalog and Skill Loading Flow

```mermaid
flowchart TD
    Config["pi.resources.* / env vars"] --> Catalog["ResourceCatalogService.reload"]
    Catalog --> SkillScan["scan skills dirs<br/>find SKILL.md"]
    Catalog --> PromptScan["scan prompts dirs<br/>*.prompt.md / *.prompt.txt / *.prompt"]
    Catalog --> ResourceScan["scan resources dirs<br/>regular files"]

    SkillScan --> ParseSkill["parse YAML frontmatter<br/>description / entrypoint / args_schema / timeout_ms"]
    ParseSkill --> SkillInfo["SkillInfo"]
    SkillInfo --> ScopeMatch["skillsByScope(scope)"]
    ScopeMatch --> Public["public"]
    ScopeMatch --> Namespace["namespaces/&lt;namespace&gt;"]
    Public --> Resolve["SkillsResolver.resolveSkills(namespace)"]
    Namespace --> Resolve
    Resolve --> Cache["namespace cache"]
    Cache --> ToolInventory["ToolInventory"]

    PromptScan --> PromptInfo["PromptTemplateInfo"]
    ResourceScan --> ResourceInfo["ResourceInfo"]
```
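The `ParseSkill` step reads the YAML frontmatter at the top of `SKILL.md`. The following Java is a deliberately naive, dependency-free sketch of that step. It handles only flat top-level `key: value` lines and ignores nested values such as `args_schema`, which would need a real YAML parser. The `SkillMeta` record is hypothetical, as is the rule that a skill with an `entrypoint` is executable and any other skill is instructional.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, dependency-free reader for flat frontmatter keys only;
// nested values such as args_schema would need a real YAML parser.
record SkillMeta(String name, String description, Optional<String> entrypoint, long timeoutMs) {
    /** Assumption: a skill with an entrypoint is executable, otherwise instructional. */
    boolean executable() {
        return entrypoint.isPresent();
    }
}

final class SkillFrontmatterSketch {
    /** Reads top-level "key: value" lines between the leading '---' markers. */
    static Map<String, String> frontmatter(String skillMd) {
        Map<String, String> out = new LinkedHashMap<>();
        String[] lines = skillMd.split("\\R");
        if (lines.length == 0 || !lines[0].strip().equals("---")) {
            return out; // no frontmatter block
        }
        for (int i = 1; i < lines.length; i++) {
            String line = lines[i];
            if (line.strip().equals("---")) {
                break; // end of frontmatter
            }
            int colon = line.indexOf(':');
            boolean topLevel = !line.isEmpty() && !Character.isWhitespace(line.charAt(0));
            if (topLevel && colon > 0) {
                out.put(line.substring(0, colon).strip(), line.substring(colon + 1).strip());
            }
        }
        return out;
    }

    static SkillMeta parse(String name, String skillMd, long defaultTimeoutMs) {
        Map<String, String> fm = frontmatter(skillMd);
        String timeout = fm.get("timeout_ms");
        return new SkillMeta(
                name,
                fm.getOrDefault("description", ""),
                Optional.ofNullable(fm.get("entrypoint")).filter(s -> !s.isEmpty()),
                timeout == null || timeout.isEmpty() ? defaultTimeoutMs : Long.parseLong(timeout));
    }
}
```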

## 7. Session and Persistence Architecture

```mermaid
flowchart TD
    SessionRuntime["AgentSessionRuntime"] --> Lifecycle["SessionLifecycleManager<br/>live agent cache / eviction"]
    SessionRuntime --> PromptQueue["SessionPromptQueue<br/>serialize same-session prompts"]
    SessionRuntime --> SessionRepo["SessionRepository"]
    SessionRuntime --> EntryRepo["SessionEntryRepository"]

    SessionRepo --> SessionDoc["SessionDocument<br/>namespace / model / headEntryId<br/>version / fencingToken / lastCommittedRunId"]
    EntryRepo --> EntryDoc["SessionEntryDocument<br/>message / branch_summary / compaction<br/>runId + sequence idempotency"]

    SessionRuntime --> Prompt["prompt / cont"]
    Prompt --> Persist["persistConversation"]
    Persist --> EntryRepo
    Persist --> SessionRepo

    SessionRuntime --> Fork["forkSession"]
    Fork --> CopyPath["copy path entries to new session"]
    CopyPath --> EntryRepo

    SessionRuntime --> Navigate["navigateTree"]
    Navigate --> BranchSummary["summarize abandoned branch"]
    BranchSummary --> EntryRepo

    SessionRuntime --> Compact["compact"]
    Compact --> LlmSummary["LLM summary"]
    Compact --> HeuristicSummary["fallback heuristic summary"]
    LlmSummary --> Persist
    HeuristicSummary --> Persist
```

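Persistence semantics:

- `SessionDocument.version` is the MongoDB optimistic-locking version field.
- `fencingToken` records the active-run token of the last successful commit. Writes that carry a lower token are rejected.
- `SessionEntryDocument` has a unique index on `sessionId + runId + sequence`, which makes run retries idempotent.
- `headEntryId` is updated only after the message entries have been persisted. The session head therefore never points to a message that has not been stored.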
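The four rules above can be exercised together in memory. The following Java is a hypothetical stand-in for the two repositories. A `Set` plays the role of the unique index, a plain counter stands in for the Mongo `@Version` field, and the entry-id format `runId#sequence` is invented for illustration. The sketch shows that a retried run does not duplicate entries, that a stale token is refused, and that the head moves only after the entries are stored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for SessionRepository / SessionEntryRepository.
final class SessionStoreSketch {
    static final class Session {
        long version;
        long fencingToken;
        String headEntryId;
        String lastCommittedRunId;
    }

    record Entry(String id, String sessionId, String runId, int sequence, String payload) {}

    final Map<String, Session> sessions = new HashMap<>();
    final List<Entry> entries = new ArrayList<>();
    private final Set<String> uniqueKeys = new HashSet<>(); // emulates the sessionId + runId + sequence unique index

    synchronized void commitRun(String sessionId, long fencingToken, String runId, List<String> messages) {
        Session s = sessions.computeIfAbsent(sessionId, k -> new Session());
        if (fencingToken < s.fencingToken) {
            throw new IllegalStateException("stale fencing token " + fencingToken + " < " + s.fencingToken);
        }
        String lastId = null;
        for (int seq = 0; seq < messages.size(); seq++) {
            String id = runId + "#" + seq;
            // On a retry the key already exists, so the entry is skipped instead of duplicated.
            if (uniqueKeys.add(sessionId + "|" + runId + "|" + seq)) {
                entries.add(new Entry(id, sessionId, runId, seq, messages.get(seq)));
            }
            lastId = id;
        }
        // The head moves only after every entry of the run is stored.
        if (lastId != null) {
            s.headEntryId = lastId;
        }
        s.fencingToken = fencingToken;
        s.lastCommittedRunId = runId;
        s.version++; // with Mongo, an optimistic-lock version check would guard this update
    }
}
```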

## 8. Multi-Node Coordination Sequence Diagrams

### 8.1 Acquiring the Run Lease

```mermaid
sequenceDiagram
    autonumber
    participant N as Node
    participant R as Redis
    participant M as MongoDB

    N->>R: Lua acquire active-run lease
    R->>R: check session busy
    R->>R: check tenant/user quota zset
    R->>R: INCR session fencing key
    R->>R: HSET active run + SET by-session + ZADD quota
    R-->>N: fencingToken
    N->>M: run with token, persist entries idempotently
    N->>N: persist workspace snapshot
    N-->>N: emit run_completed
    N->>R: release lease if owner node matches
```

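Crash analysis:

| Crash point | Result | Recovery semantics |
| --- | --- | --- |
| Before acquire | No state change | The client simply retries |
| After acquire, before execution | The session is briefly busy | The lease can be reacquired once its TTL expires |
| After entries are partially written | Mongo holds part of the run's entries | `runId + sequence` lets the retry skip entries that were already written |
| After the session head is updated | The session is committed | `lastCommittedRunId` can be used for diagnosis and to re-emit the terminal event |
| Before the workspace snapshot | Redis is still busy; `run_completed` has not been sent | The lease is released only after the snapshot succeeds; if the snapshot fails, the run ends with `run_failed` |
| Before release | Redis is still busy; the workspace is already committed | Recovers once the lease TTL expires or the owner releases the lease |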
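The acquire script performs four steps as one atomic unit: the busy check, the quota check, the fencing `INCR` and the lease write. The following Java is a hypothetical single-process model of those steps, in which `synchronized` stands in for the atomicity that the project gets from running Lua inside Redis. Only a per-tenant quota is modelled, and the per-user quota zset is omitted. The `runId` check on release is an extra guard added by this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical single-process model of the Lua acquire script; `synchronized`
// stands in for the atomicity Redis gives a Lua script.
final class LeaseSketch {
    record Lease(String runId, String ownerNodeId, String tenantId, long fencingToken, long expiresAtMs) {}

    private final Map<String, Lease> bySession = new HashMap<>();   // active run + by-session key
    private final Map<String, Long> fencingKeys = new HashMap<>();  // per-session fencing counter
    private final int maxActivePerTenant;                            // stand-in for the quota zset

    LeaseSketch(int maxActivePerTenant) {
        this.maxActivePerTenant = maxActivePerTenant;
    }

    /** Returns a new fencing token, or empty when the session is busy or the tenant is at quota. */
    synchronized OptionalLong acquire(String sessionId, String tenantId, String runId,
                                      String nodeId, long nowMs, long ttlMs) {
        Lease current = bySession.get(sessionId);
        if (current != null && current.expiresAtMs() > nowMs) {
            return OptionalLong.empty(); // session busy
        }
        long active = bySession.values().stream()
                .filter(l -> l.tenantId().equals(tenantId) && l.expiresAtMs() > nowMs)
                .count();
        if (active >= maxActivePerTenant) {
            return OptionalLong.empty(); // tenant quota exhausted
        }
        long token = fencingKeys.merge(sessionId, 1L, Long::sum); // INCR session fencing key
        bySession.put(sessionId, new Lease(runId, nodeId, tenantId, token, nowMs + ttlMs));
        return OptionalLong.of(token);
    }

    /** Releases only if the caller still owns the lease (the runId check is an extra guard). */
    synchronized boolean release(String sessionId, String nodeId, String runId) {
        Lease l = bySession.get(sessionId);
        if (l == null || !l.ownerNodeId().equals(nodeId) || !l.runId().equals(runId)) {
            return false;
        }
        bySession.remove(sessionId);
        return true;
    }
}
```

The model also covers the "after acquire, before execution" row of the table. If a node crashes while holding a lease, the session stays busy until the TTL passes. The next acquire then receives a strictly larger fencing token, so any late writes from the crashed node are rejected by the persistence layer.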

### 8.2 Redis Stream Queue

```mermaid
sequenceDiagram
    autonumber
    participant B as Node B
    participant R as Redis Stream
    participant A as Owner Node A

    B->>R: XADD queued run
    A->>R: XREADGROUP next run
    R-->>A: pending stream record
    A->>A: schedule run
    A->>R: XACK + XDEL after schedule accepted
    A-->>R: on crash before ack, record remains pending
    B->>R: recovery XAUTOCLAIM stale pending
    B->>R: XACK + XDEL old record, XADD replacement
```

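Why a Redis Stream is used instead of a List:

- A queued run is a short-lived command that must be recoverable, so it must not be lost once it has been popped.
- The pending state of a consumer group directly represents "handed to a node but not yet acknowledged".
- `XAUTOCLAIM` can transfer pending records that have exceeded an idle timeout to a live node.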
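The key property is that delivery and acknowledgement are separate steps. The following Java is a hypothetical in-memory model of one consumer group that shows why a record survives a consumer crash. A record delivered by `readNext` stays in the pending map until `ack`, and `autoclaim` hands records that have been idle too long to another consumer. The method names only mirror `XADD` / `XREADGROUP` / `XACK` / `XAUTOCLAIM` and are not a Redis client API. `XDEL` and the project's "ack, delete, re-add replacement" step are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of one consumer group; not a Redis client API.
final class StreamQueueSketch {
    record Pending(String id, String payload, String consumer, long deliveredAtMs) {}

    private record QueuedRun(String id, String payload) {}

    private final Deque<QueuedRun> undelivered = new ArrayDeque<>();
    private final Map<String, Pending> pending = new LinkedHashMap<>(); // pending entries list (PEL)
    private long nextId = 0;

    /** Like XADD: append a queued run. */
    synchronized String xadd(String payload) {
        String id = Long.toString(++nextId);
        undelivered.add(new QueuedRun(id, payload));
        return id;
    }

    /** Like XREADGROUP with ">": deliver a new record; it stays pending until acknowledged. */
    synchronized Pending readNext(String consumer, long nowMs) {
        QueuedRun run = undelivered.poll();
        if (run == null) {
            return null;
        }
        Pending p = new Pending(run.id(), run.payload(), consumer, nowMs);
        pending.put(p.id(), p);
        return p;
    }

    /** Like XACK: the record is no longer pending. */
    synchronized boolean ack(String id) {
        return pending.remove(id) != null;
    }

    /** Like XAUTOCLAIM: move records idle for at least minIdleMs to another consumer. */
    synchronized List<Pending> autoclaim(String consumer, long nowMs, long minIdleMs) {
        List<Pending> claimed = new ArrayList<>();
        for (Map.Entry<String, Pending> e : pending.entrySet()) {
            Pending p = e.getValue();
            if (nowMs - p.deliveredAtMs() >= minIdleMs) {
                Pending moved = new Pending(p.id(), p.payload(), consumer, nowMs);
                e.setValue(moved);
                claimed.add(moved);
            }
        }
        return claimed;
    }
}
```

With a List and `LPOP`/`RPOP`, the record would disappear from Redis as soon as node A popped it. A crash before scheduling would then lose the queued run, and no other node would have anything to claim.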

### 8.3 Cross-Node Control Commands

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant B as Node B
    participant R as Redis
    participant A as Owner Node A

    C->>B: command abort / steer
    B->>R: read active run owner
    R-->>B: ownerNodeId
    B->>R: publish owner command channel
    A->>A: verify local active run
    A->>A: abort or steer local Agent
    A->>R: publish SSE event + append session event stream
```

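`continue` does not take the session command lock. Instead it enters `AgentRunRuntime` as `AgentRunOperation.CONTINUE` and shares the active-run lease, fencing token, queue policy, and SSE lifecycle with regular prompts.

`compact`, `fork`, and `navigate` do not modify the local session runtime directly. They first go through `SessionCommandRuntime`:

- In cluster mode, if the session has an active run, the command is rejected outright. This avoids concurrent writes with the running Agent.
- If there is no active run, the command acquires a Redis session lock and only then performs the Mongo/session writes.
- The lock is deleted only if it still holds this node's owner value. This prevents a node from accidentally deleting a newer lock that another node has taken.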
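The owner-value rule is the standard compare-and-delete lock pattern. In Redis it is usually written as `SET key <owner> NX PX <ttl>` to acquire and a small Lua script to release. That Lua script is shown as a reference constant below and is not executed. The rest of the Java is a hypothetical in-memory model of the pattern. `runExclusive` is an illustrative gate for `compact` / `fork` / `navigate` and is not the project's `SessionCommandRuntime` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of a Redis session lock with owner-value release.
final class SessionLockSketch {
    /** The usual Redis compare-and-delete script; shown for reference, not executed here. */
    static final String UNLOCK_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    private record Holder(String owner, long expiresAtMs) {}

    private final Map<String, Holder> locks = new HashMap<>();

    /** Like SET key owner NX PX ttl: returns a fresh owner value, or null if the lock is held. */
    synchronized String tryLock(String sessionId, long nowMs, long ttlMs) {
        Holder h = locks.get(sessionId);
        if (h != null && h.expiresAtMs() > nowMs) {
            return null;
        }
        String owner = UUID.randomUUID().toString();
        locks.put(sessionId, new Holder(owner, nowMs + ttlMs));
        return owner;
    }

    /** Compare-and-delete: only the current owner value may remove the lock. */
    synchronized boolean unlock(String sessionId, String owner) {
        Holder h = locks.get(sessionId);
        if (h == null || !h.owner().equals(owner)) {
            return false; // expired and re-acquired by someone else, or never held
        }
        locks.remove(sessionId);
        return true;
    }

    /** Hypothetical gate for compact / fork / navigate. */
    boolean runExclusive(String sessionId, boolean hasActiveRun, long nowMs, Runnable write) {
        if (hasActiveRun) {
            return false; // cluster mode: reject instead of racing the running Agent
        }
        String owner = tryLock(sessionId, nowMs, 30_000);
        if (owner == null) {
            return false; // another node holds the session lock
        }
        try {
            write.run();
            return true;
        } finally {
            unlock(sessionId, owner);
        }
    }
}
```

The important case is a lock that expires while its holder is still working. Another node then acquires the lock with a new owner value. When the first node finishes, its unlock call does nothing and cannot release the second node's lock.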

## 9. Deployment and Runtime View

```mermaid
flowchart LR
    User["User / Platform"] --> Server["delphi-agent-server<br/>Spring Boot"]
    Server --> Mongo["MongoDB"]
    Server --> Redis["Redis<br/>lease / stream / command / SSE"]
    Server --> Docker["Docker Engine<br/>default sandbox"]
    Server --> DeepSeek["DeepSeek Anthropic-compatible API"]
    Server --> SkillRoot["Skill Roots<br/>./skills, ~/.codex/skills"]
    Server --> WorkspaceRoot["Workspace Root<br/>./workspaces or PI_WORKSPACES_ROOT"]
    Server --> ObjectStorage["S3-compatible storage<br/>snapshot workspace"]

    Docker --> Containers["One-off sandbox containers"]
    Containers --> WorkspaceMount["Mounted session workspace"]
```

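Minimum runtime dependencies:

- JDK 21 + Maven 3.9+
- MongoDB
- `DEEPSEEK_API_KEY`
- Docker (for the default profile)

Additional dependencies for multi-node deployments:

- Redis 6.2+, for Streams, consumer groups, and `XAUTOCLAIM`.
- A unique `PI_CLUSTER_NODE_ID` per node, ideally injected from `POD_NAME` or `HOSTNAME`.
- `PI_WORKSPACE_STORAGE=snapshot` is recommended, together with S3-compatible object storage. Without it, a workspace can be lost when its session moves to another node.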