Skip to content

Commit 37810d9

Browse files
authored
fix(collections): rehydrate in-process collections lazily after init failure (#468)
When NewInProcessBackend boots, it iterates every on-disk collection and calls newVectorEngine to construct its engine wrapper. For the postgres engine that constructor performs a "test embedding" probe that requires the embedding model to be reachable. If the embedding service is briefly unavailable at boot — e.g. the node hosting the embedding model has temporary NATS connectivity issues — the construction fails and the collection is silently dropped from the in-memory map. Subsequent operations against that collection (Upload / Search / ListEntries / …) all return "collection not found" indefinitely, even after the embedding service comes back, because nothing ever retries. Worse, the collection still exists on disk (its JSON sidecar) and its data still exists in the vector DB (e.g. the per-collection documents_col_<uuid> table in PostgreSQL). From the user's perspective their collection has silently disappeared. Two changes: 1. NewInProcessBackend always registers every on-disk collection in state.Collections — even when newVectorEngine returns nil. A nil entry is a placeholder meaning "known on disk, not yet loaded". 2. backendInProcess.lookup centralises the cache read for every operation. If the cache holds a placeholder, it retries newVectorEngine now, under the write lock. So as soon as the embedding service is reachable again, the next request to the collection will rehydrate it transparently. If init still fails or the collection isn't on disk at all, lookup returns (nil, false) and the operation surfaces "collection not found" as before. The state.EnsureCollection callback used by the internal RAG provider already handled the placeholder case (it re-inits whenever the cache entry is missing or nil), so it needs no change. This does not address the underlying probe-and-cache pattern in LocalRecall's NewPersistentPostgresCollection — read-only operations on existing collections still require an embedding probe at engine construction. That is a separate, deeper fix worth pursuing in LocalRecall directly.
1 parent c1a1231 commit 37810d9

1 file changed

Lines changed: 53 additions & 34 deletions

File tree

webui/collections/inprocess.go

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,39 @@ type backendInProcess struct {
6363

6464
var _ Backend = (*backendInProcess)(nil)
6565

66+
// lookup returns the cached collection KB for name. If the cache holds a
67+
// placeholder (nil entry — the engine init failed at startup, e.g. because
68+
// the embedding service was momentarily unreachable when iterating over
69+
// existing collections in NewInProcessBackend) it attempts to re-initialise
70+
// the engine now so a transient outage doesn't permanently 404 a collection
71+
// that still has data on disk / in the vector DB. Returns (nil, false) only
72+
// when the collection isn't known at all, or when re-init still fails.
73+
func (b *backendInProcess) lookup(name string) (*rag.PersistentKB, bool) {
74+
b.state.Mu.RLock()
75+
kb, exists := b.state.Collections[name]
76+
b.state.Mu.RUnlock()
77+
if !exists {
78+
return nil, false
79+
}
80+
if kb != nil {
81+
return kb, true
82+
}
83+
// Placeholder: collection is known on disk but its engine wrapper failed
84+
// to construct earlier. Retry under the write lock.
85+
b.state.Mu.Lock()
86+
defer b.state.Mu.Unlock()
87+
if kb, ok := b.state.Collections[name]; ok && kb != nil {
88+
return kb, true
89+
}
90+
kb = newVectorEngine(b.cfg.VectorEngine, b.openAIClient, b.cfg.LLMAPIURL, b.cfg.LLMAPIKey, name, b.cfg.CollectionDBPath, b.cfg.FileAssets, b.cfg.EmbeddingModel, b.cfg.DatabaseURL, b.cfg.MaxChunkingSize, b.cfg.ChunkOverlap)
91+
if kb == nil {
92+
return nil, false
93+
}
94+
b.state.Collections[name] = kb
95+
b.state.SourceManager.RegisterCollection(name, kb)
96+
return kb, true
97+
}
98+
6699
func (b *backendInProcess) ListCollections() ([]string, error) {
67100
return rag.ListAllCollections(b.cfg.CollectionDBPath), nil
68101
}
@@ -80,9 +113,7 @@ func (b *backendInProcess) CreateCollection(name string) error {
80113
}
81114

82115
func (b *backendInProcess) Upload(collection, filename string, fileBody io.Reader) (string, error) {
83-
b.state.Mu.RLock()
84-
kb, exists := b.state.Collections[collection]
85-
b.state.Mu.RUnlock()
116+
kb, exists := b.lookup(collection)
86117
if !exists {
87118
return "", fmt.Errorf("collection not found: %s", collection)
88119
}
@@ -108,29 +139,23 @@ func (b *backendInProcess) Upload(collection, filename string, fileBody io.Reade
108139
}
109140

110141
func (b *backendInProcess) ListEntries(collection string) ([]string, error) {
111-
b.state.Mu.RLock()
112-
kb, exists := b.state.Collections[collection]
113-
b.state.Mu.RUnlock()
142+
kb, exists := b.lookup(collection)
114143
if !exists {
115144
return nil, fmt.Errorf("collection not found: %s", collection)
116145
}
117146
return kb.ListDocuments(), nil
118147
}
119148

120149
func (b *backendInProcess) GetEntryContent(collection, entry string) (string, int, error) {
121-
b.state.Mu.RLock()
122-
kb, exists := b.state.Collections[collection]
123-
b.state.Mu.RUnlock()
150+
kb, exists := b.lookup(collection)
124151
if !exists {
125152
return "", 0, fmt.Errorf("collection not found: %s", collection)
126153
}
127154
return kb.GetEntryFileContent(entry)
128155
}
129156

130157
func (b *backendInProcess) Search(collection, query string, maxResults int) ([]SearchResult, error) {
131-
b.state.Mu.RLock()
132-
kb, exists := b.state.Collections[collection]
133-
b.state.Mu.RUnlock()
158+
kb, exists := b.lookup(collection)
134159
if !exists {
135160
return nil, fmt.Errorf("collection not found: %s", collection)
136161
}
@@ -159,22 +184,18 @@ func (b *backendInProcess) Search(collection, query string, maxResults int) ([]S
159184
}
160185

161186
func (b *backendInProcess) Reset(collection string) error {
162-
b.state.Mu.Lock()
163-
kb, exists := b.state.Collections[collection]
164-
if exists {
165-
delete(b.state.Collections, collection)
166-
}
167-
b.state.Mu.Unlock()
187+
kb, exists := b.lookup(collection)
168188
if !exists {
169189
return fmt.Errorf("collection not found: %s", collection)
170190
}
191+
b.state.Mu.Lock()
192+
delete(b.state.Collections, collection)
193+
b.state.Mu.Unlock()
171194
return kb.Reset()
172195
}
173196

174197
func (b *backendInProcess) DeleteEntry(collection, entry string) ([]string, error) {
175-
b.state.Mu.RLock()
176-
kb, exists := b.state.Collections[collection]
177-
b.state.Mu.RUnlock()
198+
kb, exists := b.lookup(collection)
178199
if !exists {
179200
return nil, fmt.Errorf("collection not found: %s", collection)
180201
}
@@ -186,9 +207,7 @@ func (b *backendInProcess) DeleteEntry(collection, entry string) ([]string, erro
186207
}
187208

188209
func (b *backendInProcess) AddSource(collection, url string, intervalMin int) error {
189-
b.state.Mu.RLock()
190-
kb, exists := b.state.Collections[collection]
191-
b.state.Mu.RUnlock()
210+
kb, exists := b.lookup(collection)
192211
if !exists {
193212
return fmt.Errorf("collection not found: %s", collection)
194213
}
@@ -201,9 +220,7 @@ func (b *backendInProcess) RemoveSource(collection, url string) error {
201220
}
202221

203222
func (b *backendInProcess) ListSources(collection string) ([]SourceInfo, error) {
204-
b.state.Mu.RLock()
205-
kb, exists := b.state.Collections[collection]
206-
b.state.Mu.RUnlock()
223+
kb, exists := b.lookup(collection)
207224
if !exists {
208225
return nil, fmt.Errorf("collection not found: %s", collection)
209226
}
@@ -220,19 +237,15 @@ func (b *backendInProcess) ListSources(collection string) ([]SourceInfo, error)
220237
}
221238

222239
func (b *backendInProcess) GetEntryFilePath(collection, entry string) (string, error) {
223-
b.state.Mu.RLock()
224-
kb, exists := b.state.Collections[collection]
225-
b.state.Mu.RUnlock()
240+
kb, exists := b.lookup(collection)
226241
if !exists {
227242
return "", fmt.Errorf("collection not found: %s", collection)
228243
}
229244
return kb.GetEntryFilePath(entry)
230245
}
231246

232247
func (b *backendInProcess) EntryExists(collection, entry string) bool {
233-
b.state.Mu.RLock()
234-
kb, exists := b.state.Collections[collection]
235-
b.state.Mu.RUnlock()
248+
kb, exists := b.lookup(collection)
236249
if !exists {
237250
return false
238251
}
@@ -257,8 +270,14 @@ func NewInProcessBackend(cfg *Config) (Backend, *State) {
257270
colls := rag.ListAllCollections(cfg.CollectionDBPath)
258271
for _, c := range colls {
259272
collection := newVectorEngine(cfg.VectorEngine, openAIClient, cfg.LLMAPIURL, cfg.LLMAPIKey, c, cfg.CollectionDBPath, cfg.FileAssets, cfg.EmbeddingModel, cfg.DatabaseURL, cfg.MaxChunkingSize, cfg.ChunkOverlap)
273+
// Register every on-disk collection — even when the engine wrapper
274+
// failed to construct (e.g. the embedding service was momentarily
275+
// unreachable). A nil entry marks "known on disk but not yet loaded";
276+
// backendInProcess.lookup will rehydrate lazily on first access so a
277+
// transient outage at boot doesn't permanently 404 collections whose
278+
// data is still on disk / in the vector DB.
279+
st.Collections[c] = collection
260280
if collection != nil {
261-
st.Collections[c] = collection
262281
st.SourceManager.RegisterCollection(c, collection)
263282
}
264283
}

0 commit comments

Comments
 (0)