Skip to content

Commit 4bece46

Browse files
committed
Added database queue for memory operations
1 parent 7fee741 commit 4bece46

5 files changed

Lines changed: 188 additions & 129 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,3 +360,5 @@ dist
360360
!.yarn/versions
361361

362362
benchmark
363+
temp
364+
dashboard

backend/src/ai/mcp.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ export const create_mcp_srv = () => {
6767
metadata: z.record(z.any()).optional().describe('Arbitrary metadata blob'),
6868
user_id: z.string().trim().min(1).optional().describe('Associate the memory with a specific user identifier')
6969
}, async ({ content, tags, metadata, user_id }) => {
70+
// Note: add_hsg_memory internally queues writes to handle concurrent parallel calls
71+
// This prevents SQLite "transaction within transaction" errors when clients like
72+
// Gemini CLI make multiple simultaneous store requests
7073
const u = uid(user_id)
7174
const res = await add_hsg_memory(content, j(tags || []), metadata, u)
7275
if (u) update_user_summary(u).catch(err => console.error('[MCP] user summary update failed:', err))

backend/src/core/queue.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
export class job_q<t> {
2+
private q: t[] = []
3+
private busy = false
4+
5+
constructor(public delay = 0) { }
6+
7+
add(x: t): Promise<void> {
8+
return new Promise((ok, no) => {
9+
this.q.push({ item: x, ok, no } as any)
10+
this.run()
11+
})
12+
}
13+
14+
private async run() {
15+
if (this.busy) return
16+
this.busy = true
17+
while (this.q.length) {
18+
const job = this.q.shift()!
19+
if (this.delay > 0) await new Promise(r => setTimeout(r, this.delay))
20+
const { item, ok } = job as any
21+
ok(item)
22+
}
23+
this.busy = false
24+
}
25+
}
26+
27+
export class db_q {
28+
private q = new job_q<() => Promise<any>>(0)
29+
30+
async exec<t>(fn: () => Promise<t>): Promise<t> {
31+
return new Promise((ok, no) => {
32+
this.q.add(async () => {
33+
try {
34+
const res = await fn()
35+
ok(res)
36+
} catch (err) {
37+
no(err)
38+
}
39+
})
40+
})
41+
}
42+
}
43+
44+
export const dbq = new db_q()

backend/src/memory/hsg.ts

Lines changed: 110 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { canonical_token_set } from '../utils/text'
33
import { inc_q, dec_q } from './decay'
44
import { env, tier } from '../core/cfg'
55
import { cos_sim, buf_to_vec, vec_to_buf } from '../utils/index'
6+
import { dbq } from '../core/queue'
67
export interface sector_cfg {
78
model: string
89
decay_lambda: number
@@ -672,86 +673,88 @@ export async function add_hsg_memory(
672673
metadata?: any,
673674
user_id?: string
674675
): Promise<{ id: string, primary_sector: string, sectors: string[], chunks?: number, deduplicated?: boolean }> {
675-
const simhash = compute_simhash(content)
676-
const existing = await q.get_mem_by_simhash.get(simhash)
677-
if (existing && hamming_dist(simhash, existing.simhash) <= 3) {
678-
const now = Date.now()
679-
const boosted_sal = Math.min(1, existing.salience + 0.15)
680-
await q.upd_seen.run(existing.id, now, boosted_sal, now)
681-
return {
682-
id: existing.id,
683-
primary_sector: existing.primary_sector,
684-
sectors: [existing.primary_sector],
685-
deduplicated: true
686-
}
687-
}
688-
const id = crypto.randomUUID()
689-
const now = Date.now()
690-
const chunks = chunk_text(content)
691-
const use_chunking = chunks.length > 1
692-
const classification = classify_content(content, metadata)
693-
const all_sectors = [classification.primary, ...classification.additional]
694-
await transaction.begin()
695-
try {
696-
const max_seg_res = await q.get_max_segment.get()
697-
let cur_seg = max_seg_res?.max_seg ?? 0
698-
const seg_cnt_res = await q.get_segment_count.get(cur_seg)
699-
const seg_cnt = seg_cnt_res?.c ?? 0
700-
if (seg_cnt >= env.seg_size) {
701-
cur_seg++
702-
console.log(`[HSG] Rotated to segment ${cur_seg} (previous segment full: ${seg_cnt} memories)`)
703-
}
704-
const stored_content = extract_essence(content, classification.primary, env.summary_max_length)
705-
const sec_cfg = sector_configs[classification.primary]
706-
const init_sal = Math.max(0, Math.min(1, 0.4 + 0.1 * classification.additional.length))
707-
await q.ins_mem.run(
708-
id,
709-
user_id || null,
710-
cur_seg,
711-
stored_content,
712-
simhash,
713-
classification.primary,
714-
tags || null,
715-
JSON.stringify(metadata || {}),
716-
now,
717-
now,
718-
now,
719-
init_sal,
720-
sec_cfg.decay_lambda,
721-
1,
722-
null,
723-
null,
724-
null, // compressed_vec
725-
0 // feedback_score
726-
)
727-
const emb_res = await embedMultiSector(id, content, all_sectors, use_chunking ? chunks : undefined)
728-
for (const result of emb_res) {
729-
const vec_buf = vectorToBuffer(result.vector)
730-
await q.ins_vec.run(id, result.sector, vec_buf, result.dim)
676+
return dbq.exec(async () => {
677+
const simhash = compute_simhash(content)
678+
const existing = await q.get_mem_by_simhash.get(simhash)
679+
if (existing && hamming_dist(simhash, existing.simhash) <= 3) {
680+
const now = Date.now()
681+
const boosted_sal = Math.min(1, existing.salience + 0.15)
682+
await q.upd_seen.run(existing.id, now, boosted_sal, now)
683+
return {
684+
id: existing.id,
685+
primary_sector: existing.primary_sector,
686+
sectors: [existing.primary_sector],
687+
deduplicated: true
688+
}
731689
}
732-
const mean_vec = calc_mean_vec(emb_res, all_sectors)
733-
const mean_vec_buf = vectorToBuffer(mean_vec)
734-
await q.upd_mean_vec.run(id, mean_vec.length, mean_vec_buf)
690+
const id = crypto.randomUUID()
691+
const now = Date.now()
692+
const chunks = chunk_text(content)
693+
const use_chunking = chunks.length > 1
694+
const classification = classify_content(content, metadata)
695+
const all_sectors = [classification.primary, ...classification.additional]
696+
await transaction.begin()
697+
try {
698+
const max_seg_res = await q.get_max_segment.get()
699+
let cur_seg = max_seg_res?.max_seg ?? 0
700+
const seg_cnt_res = await q.get_segment_count.get(cur_seg)
701+
const seg_cnt = seg_cnt_res?.c ?? 0
702+
if (seg_cnt >= env.seg_size) {
703+
cur_seg++
704+
console.log(`[HSG] Rotated to segment ${cur_seg} (previous segment full: ${seg_cnt} memories)`)
705+
}
706+
const stored_content = extract_essence(content, classification.primary, env.summary_max_length)
707+
const sec_cfg = sector_configs[classification.primary]
708+
const init_sal = Math.max(0, Math.min(1, 0.4 + 0.1 * classification.additional.length))
709+
await q.ins_mem.run(
710+
id,
711+
user_id || null,
712+
cur_seg,
713+
stored_content,
714+
simhash,
715+
classification.primary,
716+
tags || null,
717+
JSON.stringify(metadata || {}),
718+
now,
719+
now,
720+
now,
721+
init_sal,
722+
sec_cfg.decay_lambda,
723+
1,
724+
null,
725+
null,
726+
null, // compressed_vec
727+
0 // feedback_score
728+
)
729+
const emb_res = await embedMultiSector(id, content, all_sectors, use_chunking ? chunks : undefined)
730+
for (const result of emb_res) {
731+
const vec_buf = vectorToBuffer(result.vector)
732+
await q.ins_vec.run(id, result.sector, vec_buf, result.dim)
733+
}
734+
const mean_vec = calc_mean_vec(emb_res, all_sectors)
735+
const mean_vec_buf = vectorToBuffer(mean_vec)
736+
await q.upd_mean_vec.run(id, mean_vec.length, mean_vec_buf)
735737

736-
// Store compressed vector for smart tier (for future query optimization)
737-
if (tier === 'smart' && mean_vec.length > 128) {
738-
const comp = compress_vec_for_storage(mean_vec, 128)
739-
const comp_buf = vectorToBuffer(comp)
740-
await q.upd_compressed_vec.run(comp_buf, id)
741-
}
738+
// Store compressed vector for smart tier (for future query optimization)
739+
if (tier === 'smart' && mean_vec.length > 128) {
740+
const comp = compress_vec_for_storage(mean_vec, 128)
741+
const comp_buf = vectorToBuffer(comp)
742+
await q.upd_compressed_vec.run(comp_buf, id)
743+
}
742744

743-
await create_single_waypoint(id, mean_vec, now)
744-
await transaction.commit()
745-
return {
746-
id,
747-
primary_sector: classification.primary,
748-
sectors: all_sectors,
749-
chunks: chunks.length
745+
await create_single_waypoint(id, mean_vec, now)
746+
await transaction.commit()
747+
return {
748+
id,
749+
primary_sector: classification.primary,
750+
sectors: all_sectors,
751+
chunks: chunks.length
752+
}
753+
} catch (error) {
754+
await transaction.rollback()
755+
throw error
750756
}
751-
} catch (error) {
752-
await transaction.rollback()
753-
throw error
754-
}
757+
})
755758
}
756759
export async function reinforce_memory(id: string, boost: number = 0.1): Promise<void> {
757760
const mem = await q.get_mem.get(id)
@@ -765,35 +768,37 @@ export async function update_memory(
765768
tags?: string[],
766769
metadata?: any
767770
): Promise<{ id: string, updated: boolean }> {
768-
const mem = await q.get_mem.get(id)
769-
if (!mem) throw new Error(`Memory ${id} not found`)
770-
const new_content = content !== undefined ? content : mem.content
771-
const new_tags = tags !== undefined ? j(tags) : (mem.tags || '[]')
772-
const new_meta = metadata !== undefined ? j(metadata) : (mem.meta || '{}')
773-
await transaction.begin()
774-
try {
775-
if (content !== undefined && content !== mem.content) {
776-
const chunks = chunk_text(new_content)
777-
const use_chunking = chunks.length > 1
778-
const classification = classify_content(new_content, metadata)
779-
const all_sectors = [classification.primary, ...classification.additional]
780-
await q.del_vec.run(id)
781-
const emb_res = await embedMultiSector(id, new_content, all_sectors, use_chunking ? chunks : undefined)
782-
for (const result of emb_res) {
783-
const vec_buf = vectorToBuffer(result.vector)
784-
await q.ins_vec.run(id, result.sector, vec_buf, result.dim)
771+
return dbq.exec(async () => {
772+
const mem = await q.get_mem.get(id)
773+
if (!mem) throw new Error(`Memory ${id} not found`)
774+
const new_content = content !== undefined ? content : mem.content
775+
const new_tags = tags !== undefined ? j(tags) : (mem.tags || '[]')
776+
const new_meta = metadata !== undefined ? j(metadata) : (mem.meta || '{}')
777+
await transaction.begin()
778+
try {
779+
if (content !== undefined && content !== mem.content) {
780+
const chunks = chunk_text(new_content)
781+
const use_chunking = chunks.length > 1
782+
const classification = classify_content(new_content, metadata)
783+
const all_sectors = [classification.primary, ...classification.additional]
784+
await q.del_vec.run(id)
785+
const emb_res = await embedMultiSector(id, new_content, all_sectors, use_chunking ? chunks : undefined)
786+
for (const result of emb_res) {
787+
const vec_buf = vectorToBuffer(result.vector)
788+
await q.ins_vec.run(id, result.sector, vec_buf, result.dim)
789+
}
790+
const mean_vec = calc_mean_vec(emb_res, all_sectors)
791+
const mean_vec_buf = vectorToBuffer(mean_vec)
792+
await q.upd_mean_vec.run(id, mean_vec.length, mean_vec_buf)
793+
await q.upd_mem_with_sector.run(new_content, classification.primary, new_tags, new_meta, Date.now(), id)
794+
} else {
795+
await q.upd_mem.run(new_content, new_tags, new_meta, Date.now(), id)
785796
}
786-
const mean_vec = calc_mean_vec(emb_res, all_sectors)
787-
const mean_vec_buf = vectorToBuffer(mean_vec)
788-
await q.upd_mean_vec.run(id, mean_vec.length, mean_vec_buf)
789-
await q.upd_mem_with_sector.run(new_content, classification.primary, new_tags, new_meta, Date.now(), id)
790-
} else {
791-
await q.upd_mem.run(new_content, new_tags, new_meta, Date.now(), id)
797+
await transaction.commit()
798+
return { id, updated: true }
799+
} catch (error) {
800+
await transaction.rollback()
801+
throw error
792802
}
793-
await transaction.commit()
794-
return { id, updated: true }
795-
} catch (error) {
796-
await transaction.rollback()
797-
throw error
798-
}
803+
})
799804
}

backend/src/ops/ingest.ts

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { add_hsg_memory } from '../memory/hsg'
22
import { q, transaction } from '../core/db'
33
import { rid, now, j } from '../utils'
44
import { extractText, ExtractionResult } from './extract'
5+
import { dbq } from '../core/queue'
56

67
const LG = 8000, SEC = 3000
78

@@ -28,19 +29,21 @@ const split = (t: string, sz: number): string[] => {
2829
}
2930

3031
const mkRoot = async (txt: string, ex: ExtractionResult, meta?: Record<string, unknown>) => {
31-
const sum = txt.length > 500 ? txt.slice(0, 500) + '...' : txt
32-
const cnt = `[Document: ${ex.metadata.content_type.toUpperCase()}]\n\n${sum}\n\n[Full content split across ${Math.ceil(txt.length / SEC)} sections]`
33-
const id = rid(), ts = now()
34-
await transaction.begin()
35-
try {
36-
await q.ins_mem.run(id, cnt, 'reflective', j([]), j({ ...meta, ...ex.metadata, is_root: true, ingestion_strategy: 'root-child', ingested_at: ts }), ts, ts, ts, 1.0, 0.1, 1, null, null)
37-
await transaction.commit()
38-
return id
39-
} catch (e) {
40-
console.error('[ERROR] Root failed:', e)
41-
await transaction.rollback()
42-
throw e
43-
}
32+
return dbq.exec(async () => {
33+
const sum = txt.length > 500 ? txt.slice(0, 500) + '...' : txt
34+
const cnt = `[Document: ${ex.metadata.content_type.toUpperCase()}]\n\n${sum}\n\n[Full content split across ${Math.ceil(txt.length / SEC)} sections]`
35+
const id = rid(), ts = now()
36+
await transaction.begin()
37+
try {
38+
await q.ins_mem.run(id, cnt, 'reflective', j([]), j({ ...meta, ...ex.metadata, is_root: true, ingestion_strategy: 'root-child', ingested_at: ts }), ts, ts, ts, 1.0, 0.1, 1, null, null)
39+
await transaction.commit()
40+
return id
41+
} catch (e) {
42+
console.error('[ERROR] Root failed:', e)
43+
await transaction.rollback()
44+
throw e
45+
}
46+
})
4447
}
4548

4649
const mkChild = async (txt: string, idx: number, tot: number, rid: string, meta?: Record<string, unknown>) => {
@@ -49,17 +52,19 @@ const mkChild = async (txt: string, idx: number, tot: number, rid: string, meta?
4952
}
5053

5154
const link = async (rid: string, cid: string, idx: number) => {
52-
const ts = now()
53-
await transaction.begin()
54-
try {
55-
await q.ins_waypoint.run(rid, cid, 1.0, ts, ts)
56-
await transaction.commit()
57-
console.log(`🔗 Link: ${rid.slice(0, 8)}${cid.slice(0, 8)} (${idx})`)
58-
} catch (e) {
59-
await transaction.rollback()
60-
console.error(`❌ Link failed for ${idx}:`, e)
61-
throw e
62-
}
55+
return dbq.exec(async () => {
56+
const ts = now()
57+
await transaction.begin()
58+
try {
59+
await q.ins_waypoint.run(rid, cid, 1.0, ts, ts)
60+
await transaction.commit()
61+
console.log(`🔗 Link: ${rid.slice(0, 8)}${cid.slice(0, 8)} (${idx})`)
62+
} catch (e) {
63+
await transaction.rollback()
64+
console.error(`❌ Link failed for ${idx}:`, e)
65+
throw e
66+
}
67+
})
6368
}
6469

6570
export async function ingestDocument(t: string, data: string | Buffer, meta?: Record<string, unknown>, cfg?: ingestion_cfg): Promise<IngestionResult> {

0 commit comments

Comments
 (0)