Skip to content

Commit df0f39a

Browse files
SimplyLizclaude
andcommitted
feat(lip): push-driven health via long-lived subscribe
CKB used to re-probe IndexStatus on a 60 s TTL every time the rerank path checked whether the LIP index was mixed-models. The LIP daemon has pushed IndexChanged frames to all active sessions since v1.5.0, so the polling was pure debt. New internal/lip/subscribe.go opens a long-lived connection, pings IndexStatus every 3 s to flush the daemon's broadcast channel (the session loop drains queued pushes only after writing a response), reads every frame in a loop, and routes index_changed and index_status by type tag. Reconnects with exponential backoff to 30 s on daemon drop. Engine owns one subscriber, started in NewEngine and cancelled in Close. The cached availability/mixed flags are now written by the subscriber, not the query path — lipSemanticAvailable is lock-free RLock+read, no RPC. Worst-case staleness for the rerank gate drops from 60 s to ~3 s. Tests rewritten: the fake daemon now serves multiple requests per connection and tests wait for the first health frame before asserting, plus a new TestLipSubscriber_ReusesSingleConnection verifies the hot path issues zero requests. Also: CHANGELOG entries for #208 and #209 which landed on develop without one. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent db0c4ed commit df0f39a

File tree

5 files changed

+391
-61
lines changed

5 files changed

+391
-61
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ All notable changes to CKB will be documented in this file.
44

55
## [Unreleased]
66

7+
### Changed
8+
9+
- **LIP health: push-driven, not polled** — the Engine now opens a long-lived
10+
connection to the LIP daemon at startup (`internal/lip/subscribe.go`) and
11+
receives `index_changed` frames plus per-ping `index_status` snapshots
12+
instead of issuing a fresh `IndexStatus` RPC on a 60 s TTL. Worst-case
13+
staleness for the mixed-models gate drops from 60 s to ~3 s, the hot query
14+
path is lock-free (no RPC, no dial, no TTL check), and the subscriber
15+
reconnects with exponential backoff when the daemon restarts. The old
16+
`lipHealthTTL` constant is gone; callers read the cached flag directly.
17+
718
### Added
819

920
- Troubleshooting section in `docs/plans/review-cicd.md` covering shallow

internal/lip/subscribe.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package lip
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"encoding/json"
7+
"errors"
8+
"io"
9+
"net"
10+
"sync"
11+
"time"
12+
)
13+
14+
// IndexChangedEvent carries the payload of a LIP `index_changed` push frame.
15+
// The daemon emits one per upsert with the total indexed-file count and the
16+
// URIs that were touched. `Pending` is populated opportunistically from the
17+
// piggybacked IndexStatus probe that keeps the push channel flushed — it may
18+
// be stale by a ping interval but is good enough for cache invalidation.
19+
type IndexChangedEvent struct {
20+
IndexedFiles int
21+
AffectedURIs []string
22+
}
23+
24+
// HealthEvent carries the index-health view fetched on each keepalive ping.
25+
// It is the push-equivalent of polling `IndexStatus`: the subscriber pulls
26+
// it once per ping interval instead of every query, and callers key off
27+
// `MixedModels` for the rerank gate.
28+
type HealthEvent struct {
29+
Available bool
30+
MixedModels bool
31+
IndexedFiles int
32+
ModelsInIndex []string
33+
}
34+
35+
// SubscribeHandler receives push events.
36+
// Implementations must be non-blocking — work should be dispatched to a
37+
// separate goroutine or written to a buffered channel.
38+
type SubscribeHandler interface {
39+
OnIndexChanged(IndexChangedEvent)
40+
OnHealth(HealthEvent)
41+
}
42+
43+
// pingInterval is how often the subscriber issues a cheap request to flush
44+
// the daemon's broadcast channel. The LIP daemon drains pending push frames
45+
// only after writing a response, so this caps the worst-case push latency.
46+
// Picked to give ~3 s freshness without generating meaningful daemon load.
47+
const pingInterval = 3 * time.Second
48+
49+
// reconnectMaxBackoff bounds the reconnect backoff so a long outage still
50+
// recovers within ~minute once the daemon returns.
51+
const reconnectMaxBackoff = 30 * time.Second
52+
53+
// Subscribe opens a long-lived connection to the LIP daemon, polls
54+
// `IndexStatus` on a short ticker as a keepalive, and dispatches every
55+
// `index_changed` frame the daemon pushes in between. It reconnects with
56+
// exponential backoff when the daemon is unavailable and returns when ctx
57+
// is cancelled.
58+
//
59+
// The daemon drains queued push frames only after writing a reply to a
60+
// client request (see `session.rs` run loop), so a purely passive reader
61+
// would never observe pushes. The ticker doubles as a health probe: every
62+
// response carries the current `IndexStatusInfo`, so callers get a
63+
// `HealthEvent` per interval for free — replacing the per-query
64+
// `IndexStatus` RPC the engine used to issue.
65+
func Subscribe(ctx context.Context, h SubscribeHandler) {
66+
backoff := 500 * time.Millisecond
67+
for {
68+
if ctx.Err() != nil {
69+
return
70+
}
71+
err := runSession(ctx, h)
72+
if ctx.Err() != nil {
73+
return
74+
}
75+
// Emit a final "unavailable" health event so consumers fail closed
76+
// quickly when the daemon drops.
77+
h.OnHealth(HealthEvent{Available: false})
78+
if err == nil {
79+
backoff = 500 * time.Millisecond
80+
}
81+
select {
82+
case <-ctx.Done():
83+
return
84+
case <-time.After(backoff):
85+
}
86+
backoff *= 2
87+
if backoff > reconnectMaxBackoff {
88+
backoff = reconnectMaxBackoff
89+
}
90+
}
91+
}
92+
93+
// runSession holds a single connection open: it writes a keepalive on
94+
// `pingInterval`, reads every incoming frame, and routes them by `type`.
95+
// Returns on any I/O error so the caller can reconnect.
96+
func runSession(ctx context.Context, h SubscribeHandler) error {
97+
conn, err := net.DialTimeout("unix", SocketPath(), 500*time.Millisecond)
98+
if err != nil {
99+
return err
100+
}
101+
defer conn.Close()
102+
103+
sessCtx, cancel := context.WithCancel(ctx)
104+
defer cancel()
105+
106+
// Writer goroutine: pings until the session is torn down.
107+
var writeMu sync.Mutex
108+
ping := func() error {
109+
writeMu.Lock()
110+
defer writeMu.Unlock()
111+
return writeFrame(conn, map[string]any{"type": "index_status"})
112+
}
113+
if err := ping(); err != nil {
114+
return err
115+
}
116+
117+
var wg sync.WaitGroup
118+
wg.Go(func() {
119+
t := time.NewTicker(pingInterval)
120+
defer t.Stop()
121+
for {
122+
select {
123+
case <-sessCtx.Done():
124+
return
125+
case <-t.C:
126+
if err := ping(); err != nil {
127+
cancel()
128+
return
129+
}
130+
}
131+
}
132+
})
133+
134+
// Reader loop: any I/O error tears down the session.
135+
for {
136+
if sessCtx.Err() != nil {
137+
break
138+
}
139+
frame, err := readFrame(conn)
140+
if err != nil {
141+
cancel()
142+
wg.Wait()
143+
return err
144+
}
145+
dispatchFrame(frame, h)
146+
}
147+
wg.Wait()
148+
return nil
149+
}
150+
151+
// dispatchFrame routes a decoded server frame to the handler by `type` tag.
152+
// The LIP wire format nests ServerMessage inside a `ServerResponse` envelope
153+
// with `ok`/`error` fields.
154+
func dispatchFrame(frame map[string]json.RawMessage, h SubscribeHandler) {
155+
// Drop envelope: ServerResponse { ok: ServerMessage, error: Option<String> }
156+
var inner map[string]json.RawMessage
157+
if raw, ok := frame["ok"]; ok && len(raw) > 0 && string(raw) != "null" {
158+
_ = json.Unmarshal(raw, &inner)
159+
} else {
160+
inner = frame
161+
}
162+
var kind string
163+
_ = json.Unmarshal(inner["type"], &kind)
164+
165+
switch kind {
166+
case "index_changed":
167+
var ev struct {
168+
IndexedFiles int `json:"indexed_files"`
169+
AffectedURIs []string `json:"affected_uris"`
170+
}
171+
if b, ok := marshalInner(inner); ok {
172+
_ = json.Unmarshal(b, &ev)
173+
}
174+
h.OnIndexChanged(IndexChangedEvent{
175+
IndexedFiles: ev.IndexedFiles,
176+
AffectedURIs: ev.AffectedURIs,
177+
})
178+
case "index_status":
179+
var s indexStatusResp
180+
if b, ok := marshalInner(inner); ok {
181+
_ = json.Unmarshal(b, &s)
182+
}
183+
h.OnHealth(HealthEvent{
184+
Available: true,
185+
MixedModels: s.MixedModels,
186+
IndexedFiles: s.IndexedFiles,
187+
ModelsInIndex: s.ModelsInIndex,
188+
})
189+
}
190+
}
191+
192+
func marshalInner(m map[string]json.RawMessage) ([]byte, bool) {
193+
b, err := json.Marshal(m)
194+
if err != nil {
195+
return nil, false
196+
}
197+
return b, true
198+
}
199+
200+
// writeFrame encodes a single length-prefixed JSON frame.
201+
func writeFrame(conn net.Conn, payload any) error {
202+
b, err := json.Marshal(payload)
203+
if err != nil {
204+
return err
205+
}
206+
lenBuf := make([]byte, 4)
207+
binary.BigEndian.PutUint32(lenBuf, uint32(len(b)))
208+
_ = conn.SetWriteDeadline(time.Now().Add(2 * time.Second))
209+
if _, err := conn.Write(append(lenBuf, b...)); err != nil {
210+
return err
211+
}
212+
return nil
213+
}
214+
215+
// readFrame reads one length-prefixed JSON frame with no overall deadline —
216+
// the ping loop detects dead connections via write failures instead, so the
217+
// reader can block indefinitely between pushes.
218+
func readFrame(conn net.Conn) (map[string]json.RawMessage, error) {
219+
_ = conn.SetReadDeadline(time.Time{})
220+
lenBuf := make([]byte, 4)
221+
if _, err := io.ReadFull(conn, lenBuf); err != nil {
222+
return nil, err
223+
}
224+
respLen := binary.BigEndian.Uint32(lenBuf)
225+
if respLen == 0 || respLen > 64<<20 {
226+
return nil, errors.New("lip: frame length out of range")
227+
}
228+
buf := make([]byte, respLen)
229+
if _, err := io.ReadFull(conn, buf); err != nil {
230+
return nil, err
231+
}
232+
var out map[string]json.RawMessage
233+
if err := json.Unmarshal(buf, &out); err != nil {
234+
return nil, err
235+
}
236+
return out, nil
237+
}

internal/query/engine.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,15 @@ type Engine struct {
6464
cachedState *RepoState
6565
stateComputedAt time.Time
6666

67-
// LIP health (cached; refreshed on a short TTL to avoid per-query RPCs).
67+
// LIP health, maintained by a background subscriber that keeps a long-lived
68+
// connection open and receives `index_changed` pushes plus per-ping health
69+
// snapshots. `lipHealthCheckedAt` is zero until the first frame arrives —
70+
// callers check it before trusting the flags.
6871
lipHealthMu sync.RWMutex
6972
cachedLipMixed bool
7073
cachedLipAvailable bool
7174
lipHealthCheckedAt time.Time
75+
lipSubCancel context.CancelFunc
7276

7377
// Cache stats
7478
cacheStatsMu sync.RWMutex
@@ -136,6 +140,8 @@ func NewEngine(repoRoot string, db *storage.DB, logger *slog.Logger, cfg *config
136140
// Don't fail - async operations will be unavailable
137141
}
138142

143+
engine.startLipSubscriber()
144+
139145
return engine, nil
140146
}
141147

@@ -440,6 +446,10 @@ func (e *Engine) DB() *storage.DB {
440446
func (e *Engine) Close() error {
441447
var lastErr error
442448

449+
if e.lipSubCancel != nil {
450+
e.lipSubCancel()
451+
}
452+
443453
// Stop job runner first
444454
if e.jobRunner != nil {
445455
if err := e.jobRunner.Stop(10 * time.Second); err != nil {

internal/query/lip_health.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,58 @@
11
package query
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/SimplyLiz/CodeMCP/internal/lip"
78
)
89

9-
// lipHealthTTL caps how often we re-probe the LIP daemon for index status.
10-
// IndexStatus is a 200 ms RPC, so we do not want this per-query.
11-
const lipHealthTTL = 60 * time.Second
12-
13-
// lipSemanticAvailable reports whether LIP semantic operations (rerank, semantic
14-
// search) can be trusted. Returns false when the daemon is unavailable OR when
15-
// the index contains vectors from more than one embedding model — cosine
16-
// similarity across different vector spaces is mathematically meaningless, so a
17-
// mixed-model index silently produces garbage rankings.
10+
// lipSemanticAvailable reports whether LIP semantic operations (rerank,
11+
// semantic search) can be trusted. The flag is maintained by the background
12+
// subscriber started in `startLipSubscriber` — no RPC happens here, so this
13+
// is safe to call on the hot query path. Returns false when:
14+
//
15+
// - the subscriber has not yet observed its first frame (daemon down at
16+
// startup, or engine created less than a ping ago),
17+
// - the most recent health frame reported the daemon unavailable,
18+
// - the index contains vectors from more than one embedding model, which
19+
// makes cross-vector cosine similarity mathematically meaningless.
1820
func (e *Engine) lipSemanticAvailable() bool {
1921
e.lipHealthMu.RLock()
20-
fresh := !e.lipHealthCheckedAt.IsZero() && time.Since(e.lipHealthCheckedAt) < lipHealthTTL
21-
avail, mixed := e.cachedLipAvailable, e.cachedLipMixed
22-
e.lipHealthMu.RUnlock()
23-
if fresh {
24-
return avail && !mixed
22+
defer e.lipHealthMu.RUnlock()
23+
if e.lipHealthCheckedAt.IsZero() {
24+
return false
2525
}
26+
return e.cachedLipAvailable && !e.cachedLipMixed
27+
}
2628

27-
status, _ := lip.IndexStatus()
28-
e.lipHealthMu.Lock()
29-
e.lipHealthCheckedAt = time.Now()
30-
if status == nil {
31-
e.cachedLipAvailable, e.cachedLipMixed = false, false
32-
} else {
33-
e.cachedLipAvailable, e.cachedLipMixed = true, status.MixedModels
34-
}
35-
avail, mixed = e.cachedLipAvailable, e.cachedLipMixed
36-
e.lipHealthMu.Unlock()
37-
return avail && !mixed
29+
// engineLipSubscriber is the adapter between `lip.Subscribe` and the
30+
// Engine's cached health flags. A dedicated type (instead of binding methods
31+
// to Engine) keeps the handler interface invisible to the rest of the
32+
// package.
33+
type engineLipSubscriber struct{ e *Engine }
34+
35+
func (s engineLipSubscriber) OnHealth(ev lip.HealthEvent) {
36+
s.e.lipHealthMu.Lock()
37+
defer s.e.lipHealthMu.Unlock()
38+
s.e.lipHealthCheckedAt = time.Now()
39+
s.e.cachedLipAvailable = ev.Available
40+
s.e.cachedLipMixed = ev.MixedModels
41+
}
42+
43+
func (s engineLipSubscriber) OnIndexChanged(_ lip.IndexChangedEvent) {
44+
// Pushes are handled opportunistically: the ping that follows carries the
45+
// refreshed IndexStatus, so we don't need to re-probe here. This hook
46+
// exists so future consumers (e.g. cache invalidation keyed on
47+
// AffectedURIs) can extend it without changing the transport.
48+
}
49+
50+
// startLipSubscriber launches the background subscriber goroutine. It is a
51+
// no-op-safe — if the daemon is absent, Subscribe backs off and retries
52+
// until Close is called. The first health frame lands within `pingInterval`
53+
// of daemon availability.
54+
func (e *Engine) startLipSubscriber() {
55+
ctx, cancel := context.WithCancel(context.Background())
56+
e.lipSubCancel = cancel
57+
go lip.Subscribe(ctx, engineLipSubscriber{e: e})
3858
}

0 commit comments

Comments
 (0)