Skip to content

Commit ecee05b

Browse files
author
Daniel
committed
fixing memor
1 parent 4a17f57 commit ecee05b

11 files changed

Lines changed: 498 additions & 268 deletions

File tree

go/cmd/engine/main.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/ForestHubAI/edge-agents/go/engine/backend"
1717
"github.com/ForestHubAI/edge-agents/go/engine/build"
1818
"github.com/ForestHubAI/edge-agents/go/engine/driver"
19-
"github.com/ForestHubAI/edge-agents/go/engine/local"
2019
"github.com/ForestHubAI/edge-agents/go/engine/memory"
2120
"github.com/ForestHubAI/edge-agents/go/engine/websearch"
2221
"github.com/ForestHubAI/edge-agents/go/llmproxy"
@@ -92,18 +91,17 @@ func main() {
9291
logging.Logger.Fatal().Err(err).Msg("initialising driver registry")
9392
}
9493

95-
// Memory subsystem: backed by the configured local dir, syncs through
96-
// either the backend (cloud mode) or a filesystem-backed local store
97-
// rooted at the same dir (standalone mode — declared memory survives
98-
// engine restarts without a backend). Restore is invoked on every Build
99-
// (deploy or initial), so no eager call here.
100-
var memoryStore engine.MemoryStore
94+
// Memory subsystem: the Manager owns durable local storage rooted at
95+
// cfg.MemoryDir (declared memory survives engine restarts with no
96+
// backend). The backend, when configured, is an optional remote mirror —
97+
// it hydrates an empty local copy on a cold start and receives best-effort
98+
// pushes; nil means local-only. Restore is invoked on every Build (deploy
99+
// or initial), so no eager call here.
100+
var memorySync engine.MemorySync
101101
if backendClient != nil {
102-
memoryStore = backendClient
103-
} else {
104-
memoryStore = local.NewMemoryStore(cfg.MemoryDir)
102+
memorySync = backendClient
105103
}
106-
memoryManager := memory.NewManager(cfg.MemoryDir, memoryStore)
104+
memoryManager := memory.NewManager(cfg.MemoryDir, memorySync)
107105

108106
// Optional web search provider. Built eagerly so a bad provider name fails
109107
// fatal at boot; absent api key leaves it nil and any WebSearchTool node

go/docs/ports.md

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# Engine Ports & Implementations
2+
3+
The engine never depends on a concrete external service. Everything it needs
4+
from "outside" is reached through an **interface — a port — declared in
5+
`engine/port.go`**. Concrete adapters satisfy those interfaces and map to
6+
their own wire forms privately, so the engine core compiles and runs without
7+
knowing whether it's talking to the fh-backend, a local file, or nothing at
8+
all.
9+
10+
There are two sources of implementation:
11+
12+
- **The fh-backend adapter** (`engine/backend`) — one HTTP client that
13+
satisfies every port, used when `FH_BACKEND_URL` is configured.
14+
- **Built-in / standalone behavior** — what happens with no backend. For some
15+
ports that's a real local implementation; for others it's "the port is
16+
nil and the engine does without."
17+
18+
`cmd/engine/main.go` is the only place that decides which adapter fills each
19+
seam, based on whether a backend client was constructed.
20+
21+
## The matrix
22+
23+
| Port | Methods | Required? | No backend (standalone) | fh-backend adapter |
24+
|------|---------|-----------|-------------------------|--------------------|
25+
| `LlmClient` | `Chat` | Required for agent nodes | Local providers via `llmproxy` (direct API keys) | Backend-routed provider fallback |
26+
| `Retriever` | `QueryRAG` | Required **only if** a retrieval node is deployed | **nil** → build rejects any Retriever node | Forwards to `/rag/query` |
27+
| `Supervisor` | `Register`, `Heartbeat` | Optional | **nil** → no registration/heartbeat | POSTs `/agents/bootCallback`, `/agents/heartbeat` |
28+
| `MemorySync` | `Hydrate`, `Push` | Optional (mirror only) | **nil** → local-only memory | HTTP `GET`/`PUT /agents/memory` |
29+
30+
Two capabilities deliberately are **not** ports:
31+
32+
- **Local memory persistence** — owned unconditionally by
33+
`engine/memory.Manager`. The device always has a durable local copy; see
34+
[Memory](#memorysync-optional-remote-mirror) below.
35+
- **Logging** — stderr is unconditional and the engine already depends on the
36+
`logging` package. Optional log shipping to the backend is a `logging`
37+
`HTTPWriter` wired by `main`, not a port.
38+
39+
## LlmClient — required for agent nodes
40+
41+
`Chat` is the chat-completion seam. The implementation is
42+
`llmproxy.Client`, which dispatches by model id across configured providers.
43+
44+
- **Standalone:** providers configured with direct API keys
45+
(`anthropic`/`openai`/`gemini`/`mistral`/`selfhosted`). This is the primary
46+
path, not a fallback.
47+
- **Backend:** any provider the backend exposes but the engine has no local
48+
key for is registered as a backend-routed stand-in, resolved by model id
49+
exactly as if it were local.
50+
- **Missing:** an `AgentNode` with a nil `LlmClient` **fails the build**
51+
(`engine/build/graph.go`). There is no silent default.
52+
53+
## Retriever — required only when used
54+
55+
`QueryRAG` is the RAG seam. There is **no built-in standalone implementation
56+
yet**.
57+
58+
- **Standalone:** `Retriever` is nil. A workflow that declares a Retriever
59+
node **fails to deploy** with a clear error, mirroring the WebSearch and
60+
Agent build checks. A workflow with no retrieval node deploys and runs
61+
fine. (This replaced a silent no-op that returned empty results.)
62+
- **Backend:** forwards to `/rag/query`.
63+
- **Planned:** a standalone pgvector-backed adapter (query embedding via
64+
`llmproxy` + similarity search + ingestion) will live in its own package
65+
(e.g. `engine/rag/pgvector`), not bundled with the trivial seams.
66+
67+
## Supervisor — optional outbound callbacks
68+
69+
`Supervisor` abstracts whoever receives this agent's callbacks: the
70+
registration sent at boot plus the periodic liveness heartbeat. It is a
71+
purely outbound seam — deploys/commands arrive the other way, through the
72+
engine's HTTP server.
73+
74+
- **Standalone:** nil. With no one to report to, the engine simply doesn't
75+
register or heartbeat. This is correct, not degraded — pull-based health
76+
endpoints are the standalone observability story, not a fake heartbeat.
77+
- **Backend:** `backend.Client` POSTs `/agents/bootCallback` and
78+
`/agents/heartbeat`. The retry/heartbeat loops live in
79+
`engine/lifecycle.go`.
80+
81+
## MemorySync — optional remote mirror
82+
83+
Memory is **local-first (edge-primary)**. `engine/memory.Manager` owns a
84+
durable directory of `<uid>.json` records and is the source of truth: it
85+
reads them at boot and writes through on every mutation. `MemorySync` is
86+
*only* the optional remote mirror.
87+
88+
- **`Hydrate`** — pulls the agent's accumulated content. Called by the
89+
Manager **only on a cold start** (empty local directory) to seed a fresh
90+
copy.
91+
- **`Push`** — mirrors each local write. **Best-effort**: the local write is
92+
the truth, so a push failure is logged and the agent keeps working.
93+
- **Standalone:** nil. Memory is purely local and durable across restarts
94+
(mount a persistent volume to survive container remounts).
95+
96+
### Reconciliation on Restore
97+
98+
`Restore(ctx, declared)` is called on every build with the memory files
99+
declared by the workflow. Content precedence is **local → cold-start mirror →
100+
declared seed**:
101+
102+
1. An existing **local** copy wins — this preserves the agent's accumulated
103+
edits across redeploys.
104+
2. Otherwise, on a cold start with a mirror, the **hydrated** content seeds
105+
the file.
106+
3. Otherwise the workflow's declared `MemoryFile.Content` is used.
107+
108+
So **`MemoryFile.Content` is initial content only — it never overwrites an
109+
existing file.** Declared *metadata* (label, description, size cap) is always
110+
authoritative; only content is preserved.
111+
112+
> **Durability boundary:** with `Push` but no reverse path, the backend holds
113+
> only what the engine has pushed; runtime edits are durable on the local
114+
> volume. There is no backend → device or device → device reconciliation yet.
115+
> Adding a push-back path is additive and doesn't disturb this design.
116+
117+
## Wiring (cmd/engine/main.go)
118+
119+
```
120+
FH_BACKEND_URL set?
121+
├─ yes → backend.Client satisfies LlmClient (fallback), Retriever,
122+
│ Supervisor, and MemorySync.
123+
└─ no → LlmClient: local providers only
124+
Retriever: nil (retrieval nodes fail the build)
125+
Supervisor: nil (no register/heartbeat)
126+
MemorySync: nil (local-only memory)
127+
```
128+
129+
## Adding an adapter
130+
131+
Group by **shared dependency**, not by port:
132+
133+
- An adapter that talks to the fh-backend belongs in `engine/backend` (it
134+
shares the one HTTP client).
135+
- A heavy standalone implementation (its own driver, ingestion, etc.) gets
136+
its **own package** — e.g. a pgvector `Retriever` under `engine/rag`.
137+
Don't bundle it with trivial seams.
138+
- Edit `port.go` only to add or change a seam; never widen a port with a
139+
method a single adapter needs but the engine core doesn't call.

go/engine/backend/memory.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,23 @@ type memoryUpsertBody struct {
1212
Content string `json:"content"`
1313
}
1414

15-
// Snapshot pulls every memory file owned by the calling agent. Called
16-
// once at engine boot to populate the local working copy.
17-
func (c *Client) Snapshot(ctx context.Context) ([]workflow.MemoryFile, error) {
15+
// Hydrate pulls every memory file owned by the calling agent. The Manager
16+
// calls this on a cold start to seed an empty local working copy.
17+
func (c *Client) Hydrate(ctx context.Context) ([]workflow.MemoryFile, error) {
1818
var out []workflow.MemoryFile
1919
if err := c.http.Do(ctx, http.MethodGet, "/agents/memory", nil, nil, &out); err != nil {
20-
return nil, fmt.Errorf("backend memory snapshot: %w", err)
20+
return nil, fmt.Errorf("backend memory hydrate: %w", err)
2121
}
2222
return out, nil
2323
}
2424

25-
// Upsert pushes new content for the memory file identified by uid.
26-
// The engine calls this synchronously after every successful local write.
25+
// Push mirrors new content for the memory file identified by uid to the
26+
// backend. The Manager calls this best-effort after every local write.
2727
// The backend rejects unknown uids (404) and oversized payloads (413).
28-
func (c *Client) Upsert(ctx context.Context, uid, content string) error {
28+
func (c *Client) Push(ctx context.Context, uid, content string) error {
2929
body := memoryUpsertBody{Content: content}
3030
if err := c.http.Do(ctx, http.MethodPut, "/agents/memory/"+uid, nil, body, nil); err != nil {
31-
return fmt.Errorf("backend memory upsert: %w", err)
31+
return fmt.Errorf("backend memory push: %w", err)
3232
}
3333
return nil
3434
}

go/engine/backend/memory_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16-
func TestSnapshot_Success(t *testing.T) {
16+
func TestHydrate_Success(t *testing.T) {
1717
var (
1818
gotKey string
1919
gotMethod string
@@ -34,7 +34,7 @@ func TestSnapshot_Success(t *testing.T) {
3434
defer srv.Close()
3535

3636
c := NewClient(srv.URL, "secret")
37-
got, err := c.Snapshot(context.Background())
37+
got, err := c.Hydrate(context.Background())
3838
require.NoError(t, err)
3939
assert.Equal(t, http.MethodGet, gotMethod)
4040
assert.Equal(t, "/agents/memory", gotPath)
@@ -45,19 +45,19 @@ func TestSnapshot_Success(t *testing.T) {
4545
assert.Equal(t, "uid-log", got[1].Id)
4646
}
4747

48-
func TestSnapshot_BackendError(t *testing.T) {
48+
func TestHydrate_BackendError(t *testing.T) {
4949
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
5050
w.WriteHeader(http.StatusInternalServerError)
5151
}))
5252
defer srv.Close()
5353

5454
c := NewClient(srv.URL, "secret")
55-
_, err := c.Snapshot(context.Background())
55+
_, err := c.Hydrate(context.Background())
5656
require.Error(t, err)
5757
assert.Contains(t, err.Error(), "500")
5858
}
5959

60-
func TestUpsert_Success(t *testing.T) {
60+
func TestPush_Success(t *testing.T) {
6161
var (
6262
gotKey string
6363
gotMethod string
@@ -74,21 +74,21 @@ func TestUpsert_Success(t *testing.T) {
7474
defer srv.Close()
7575

7676
c := NewClient(srv.URL, "secret")
77-
require.NoError(t, c.Upsert(context.Background(), "uid-notes", "hello world"))
77+
require.NoError(t, c.Push(context.Background(), "uid-notes", "hello world"))
7878
assert.Equal(t, http.MethodPut, gotMethod)
7979
assert.Equal(t, "/agents/memory/uid-notes", gotPath)
8080
assert.Equal(t, "secret", gotKey)
8181
assert.JSONEq(t, `{"content":"hello world"}`, string(gotBody))
8282
}
8383

84-
func TestUpsert_BackendError(t *testing.T) {
84+
func TestPush_BackendError(t *testing.T) {
8585
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
8686
w.WriteHeader(http.StatusNotFound)
8787
}))
8888
defer srv.Close()
8989

9090
c := NewClient(srv.URL, "secret")
91-
err := c.Upsert(context.Background(), "uid-missing", "x")
91+
err := c.Push(context.Background(), "uid-missing", "x")
9292
require.Error(t, err)
9393
assert.Contains(t, err.Error(), "404")
9494
}

go/engine/build/build.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ type Builder struct {
2828
// current deploy).
2929
func (b *Builder) Build(ctx context.Context, wf *workflow.Workflow, dm engine.DeploymentMapping, ext *engine.ExternalResources) (*engine.Runner, error) {
3030
if b.Memory != nil {
31-
if err := b.Memory.Restore(ctx); err != nil {
31+
declared, err := declaredMemoryFiles(wf)
32+
if err != nil {
33+
return nil, fmt.Errorf("memory: reading declared files: %w", err)
34+
}
35+
if err := b.Memory.Restore(ctx, declared); err != nil {
3236
return nil, fmt.Errorf("refreshing memory: %w", err)
3337
}
3438
}
@@ -44,6 +48,31 @@ func (b *Builder) Build(ctx context.Context, wf *workflow.Workflow, dm engine.De
4448
return runner, nil
4549
}
4650

51+
// declaredMemoryFiles extracts the MemoryFile declarations from a workflow,
52+
// skipping other memory kinds (e.g. VectorDatabase, consumed by Retriever
53+
// nodes). These are the canonical set of files the memory Manager restores.
54+
func declaredMemoryFiles(wf *workflow.Workflow) ([]workflow.MemoryFile, error) {
55+
if wf.Memory == nil {
56+
return nil, nil
57+
}
58+
var out []workflow.MemoryFile
59+
for i, m := range *wf.Memory {
60+
disc, err := m.Discriminator()
61+
if err != nil {
62+
return nil, fmt.Errorf("memory[%d]: %w", i, err)
63+
}
64+
if disc != string(workflow.MemoryFileTypeMemoryFile) {
65+
continue
66+
}
67+
mf, err := m.AsMemoryFile()
68+
if err != nil {
69+
return nil, fmt.Errorf("memory[%d]: %w", i, err)
70+
}
71+
out = append(out, mf)
72+
}
73+
return out, nil
74+
}
75+
4776
// buildContext holds the inputs shared across every graph build.
4877
type buildContext struct {
4978
ctx context.Context

go/engine/local/local.go

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)