Skip to content

Commit 32ec2f2

Browse files
Shidfar and claude committed
feat: add endpoint persistence and cross-project protocol matching
Task 2: Add protocol_endpoints table to store DDL, implement cbm_persist_endpoints() in pass_crossrepolinks.c, wire endpoint lifecycle into pipeline.c (allocate, persist after dump, free on cleanup). 4 new persistence tests. Task 4: Add cbm_cross_project_link() that scans cache_dir for project DBs, loads protocol_endpoints, matches producers to consumers across project boundaries (exact=0.95, normalized=0.85 confidence), writes results to _crosslinks.db. Wired into pipeline after persist. 6 new cross-project link tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 641d9b9 commit 32ec2f2

7 files changed

Lines changed: 862 additions & 2 deletions

File tree

Makefile.cbm

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ PIPELINE_SRCS = \
205205
src/pipeline/servicelink_nats.c \
206206
src/pipeline/servicelink_redis_pubsub.c \
207207
src/pipeline/servicelink_trpc.c \
208-
src/pipeline/pass_communities.c
208+
src/pipeline/pass_communities.c \
209+
src/pipeline/pass_crossrepolinks.c
209210

210211
# SimHash / MinHash module
211212
SIMHASH_SRCS = src/simhash/minhash.c
@@ -378,8 +379,10 @@ TEST_SERVICELINK_REDIS_PUBSUB_SRCS = tests/test_servicelink_redis_pubsub.c
378379
TEST_SERVICELINK_TRPC_SRCS = tests/test_servicelink_trpc.c
379380
TEST_COMMUNITIES_SRCS = tests/test_communities.c
380381
TEST_ENDPOINT_REGISTRY_SRCS = tests/test_endpoint_registry.c
382+
TEST_ENDPOINT_PERSISTENCE_SRCS = tests/test_endpoint_persistence.c
383+
TEST_CROSS_PROJECT_LINKS_SRCS = tests/test_cross_project_links.c
381384

382-
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_SERVICELINK_GRAPHQL_SRCS) $(TEST_SERVICELINK_GRPC_SRCS) $(TEST_SERVICELINK_KAFKA_SRCS) $(TEST_SERVICELINK_SQS_SRCS) $(TEST_SERVICELINK_SNS_SRCS) $(TEST_SERVICELINK_WS_SRCS) $(TEST_SERVICELINK_SSE_SRCS) $(TEST_SERVICELINK_PUBSUB_SRCS) $(TEST_SERVICELINK_RABBITMQ_SRCS) $(TEST_SERVICELINK_EVENTBRIDGE_SRCS) $(TEST_SERVICELINK_MQTT_SRCS) $(TEST_SERVICELINK_NATS_SRCS) $(TEST_SERVICELINK_REDIS_PUBSUB_SRCS) $(TEST_SERVICELINK_TRPC_SRCS) $(TEST_COMMUNITIES_SRCS) $(TEST_ENDPOINT_REGISTRY_SRCS)
385+
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_SERVICELINK_GRAPHQL_SRCS) $(TEST_SERVICELINK_GRPC_SRCS) $(TEST_SERVICELINK_KAFKA_SRCS) $(TEST_SERVICELINK_SQS_SRCS) $(TEST_SERVICELINK_SNS_SRCS) $(TEST_SERVICELINK_WS_SRCS) $(TEST_SERVICELINK_SSE_SRCS) $(TEST_SERVICELINK_PUBSUB_SRCS) $(TEST_SERVICELINK_RABBITMQ_SRCS) $(TEST_SERVICELINK_EVENTBRIDGE_SRCS) $(TEST_SERVICELINK_MQTT_SRCS) $(TEST_SERVICELINK_NATS_SRCS) $(TEST_SERVICELINK_REDIS_PUBSUB_SRCS) $(TEST_SERVICELINK_TRPC_SRCS) $(TEST_COMMUNITIES_SRCS) $(TEST_ENDPOINT_REGISTRY_SRCS) $(TEST_ENDPOINT_PERSISTENCE_SRCS) $(TEST_CROSS_PROJECT_LINKS_SRCS)
383386

384387
# ── Build directories ────────────────────────────────────────────
385388

src/pipeline/pass_crossrepolinks.c

Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
* pass_crossrepolinks.c — Cross-project protocol endpoint matching.
3+
*
4+
* Two entry points:
5+
* 1. cbm_persist_endpoints() — write discovered endpoints to a project's .db
6+
* 2. cbm_cross_project_link() — scan all project DBs, match producers to
7+
* consumers across project boundaries, write to _crosslinks.db
8+
*/
9+
#include "servicelink.h"
10+
#include "foundation/log.h"
11+
#include "foundation/platform.h"
12+
#include "foundation/compat.h"
13+
#include <store/store.h>
14+
#include <sqlite3.h>
15+
#include <stdio.h>
16+
#include <stdlib.h>
17+
#include <string.h>
18+
#include <ctype.h>
19+
#include <dirent.h>
20+
#include <time.h>
21+
22+
/* Thread-local int-to-string helper (same pattern as pipeline.c itoa_buf). */
23+
static const char *itoa_buf(int val) {
24+
static CBM_TLS char bufs[4][32];
25+
static CBM_TLS int idx = 0;
26+
int i = idx;
27+
idx = (idx + 1) & 3;
28+
snprintf(bufs[i], sizeof(bufs[i]), "%d", val);
29+
return bufs[i];
30+
}
31+
32+
/* ── Endpoint Persistence ─────────────────────────────────────────── */
33+
34+
int cbm_persist_endpoints(const char *db_path, const char *project,
35+
const cbm_sl_endpoint_list_t *endpoints) {
36+
if (!db_path || !project || !endpoints || endpoints->count == 0) return 0;
37+
38+
cbm_store_t *store = cbm_store_open_path(db_path);
39+
if (!store) {
40+
cbm_log_warn("persist_endpoints.open_failed", "path", db_path);
41+
return -1;
42+
}
43+
44+
/* Ensure table exists (for DBs created before this feature) */
45+
cbm_store_exec(store,
46+
"CREATE TABLE IF NOT EXISTS protocol_endpoints ("
47+
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
48+
" project TEXT NOT NULL,"
49+
" protocol TEXT NOT NULL,"
50+
" role TEXT NOT NULL,"
51+
" identifier TEXT NOT NULL,"
52+
" node_qn TEXT NOT NULL,"
53+
" file_path TEXT NOT NULL,"
54+
" extra TEXT DEFAULT '{}',"
55+
" UNIQUE(project, protocol, role, identifier, node_qn)"
56+
");");
57+
58+
/* Clear stale endpoints for this project */
59+
{
60+
sqlite3_stmt *del = NULL;
61+
sqlite3_prepare_v2(cbm_store_get_db(store),
62+
"DELETE FROM protocol_endpoints WHERE project = ?;", -1, &del, NULL);
63+
if (del) {
64+
sqlite3_bind_text(del, 1, project, -1, SQLITE_STATIC);
65+
sqlite3_step(del);
66+
sqlite3_finalize(del);
67+
}
68+
}
69+
70+
/* Insert all endpoints using prepared statement */
71+
cbm_store_exec(store, "BEGIN TRANSACTION;");
72+
sqlite3_stmt *ins = NULL;
73+
sqlite3_prepare_v2(cbm_store_get_db(store),
74+
"INSERT OR IGNORE INTO protocol_endpoints "
75+
"(project, protocol, role, identifier, node_qn, file_path, extra) "
76+
"VALUES (?,?,?,?,?,?,?);", -1, &ins, NULL);
77+
78+
if (ins) {
79+
for (int i = 0; i < endpoints->count; i++) {
80+
const cbm_sl_endpoint_t *ep = &endpoints->items[i];
81+
sqlite3_bind_text(ins, 1, ep->project, -1, SQLITE_STATIC);
82+
sqlite3_bind_text(ins, 2, ep->protocol, -1, SQLITE_STATIC);
83+
sqlite3_bind_text(ins, 3, ep->role, -1, SQLITE_STATIC);
84+
sqlite3_bind_text(ins, 4, ep->identifier, -1, SQLITE_STATIC);
85+
sqlite3_bind_text(ins, 5, ep->node_qn, -1, SQLITE_STATIC);
86+
sqlite3_bind_text(ins, 6, ep->file_path, -1, SQLITE_STATIC);
87+
sqlite3_bind_text(ins, 7, ep->extra, -1, SQLITE_STATIC);
88+
sqlite3_step(ins);
89+
sqlite3_reset(ins);
90+
}
91+
sqlite3_finalize(ins);
92+
}
93+
cbm_store_exec(store, "COMMIT;");
94+
95+
cbm_log_info("persist_endpoints.done", "count", itoa_buf(endpoints->count));
96+
cbm_store_close(store);
97+
return endpoints->count;
98+
}
99+
100+
/* ── Cross-Project Matching ──────────────────────────────────────── */
101+
102+
/*
 * One endpoint collected from a project DB's protocol_endpoints table.
 * Every member is a fixed-size, NUL-terminated string buffer.
 */
typedef struct {
    char project[256];         /* project column */
    char protocol[32];         /* protocol column */
    char role[16];             /* role column, e.g. "producer" / "consumer" */
    char identifier[256];      /* identifier exactly as stored */
    char node_qn[512];         /* node_qn column */
    char file_path[256];       /* file_path column */
    char identifier_norm[256]; /* identifier lowercased, separators stripped */
} xl_endpoint_t;
112+
113+
/*
 * Build the matching key for an identifier: ASCII upper-case letters are
 * lowercased and the separators '-', '_' and '.' are dropped; every other
 * byte is copied unchanged.
 *
 * src    — NUL-terminated input; NULL is treated as an empty string.
 * dst    — output buffer; receives at most dst_sz - 1 bytes plus a NUL.
 * dst_sz — capacity of dst in bytes.
 *
 * If dst is NULL or dst_sz is not positive the call does nothing, so the
 * terminator is never written outside the buffer.
 */
static void normalize_identifier(const char *src, char *dst, int dst_sz) {
    if (!dst || dst_sz <= 0) return;

    int out = 0;
    if (src) {
        for (const char *p = src; *p != '\0' && out < dst_sz - 1; p++) {
            char c = *p;
            if (c == '-' || c == '_' || c == '.') continue;
            if (c >= 'A' && c <= 'Z') c = (char)(c - 'A' + 'a');
            dst[out++] = c;
        }
    }
    dst[out] = '\0';
}
123+
124+
/* Load endpoints from a single project DB */
125+
static int load_endpoints_from_db(const char *db_path,
126+
xl_endpoint_t **out, int *out_count,
127+
int *out_cap) {
128+
sqlite3 *db = NULL;
129+
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, NULL) != SQLITE_OK) {
130+
return -1;
131+
}
132+
133+
/* Check if table exists */
134+
sqlite3_stmt *check = NULL;
135+
if (sqlite3_prepare_v2(db,
136+
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='protocol_endpoints';",
137+
-1, &check, NULL) != SQLITE_OK) {
138+
sqlite3_close(db);
139+
return -1;
140+
}
141+
int has_table = (sqlite3_step(check) == SQLITE_ROW);
142+
sqlite3_finalize(check);
143+
if (!has_table) {
144+
sqlite3_close(db);
145+
return 0; /* no table — old DB, skip silently */
146+
}
147+
148+
sqlite3_stmt *stmt = NULL;
149+
if (sqlite3_prepare_v2(db,
150+
"SELECT project, protocol, role, identifier, node_qn, file_path "
151+
"FROM protocol_endpoints;", -1, &stmt, NULL) != SQLITE_OK) {
152+
sqlite3_close(db);
153+
return -1;
154+
}
155+
156+
int added = 0;
157+
while (sqlite3_step(stmt) == SQLITE_ROW) {
158+
if (*out_count >= *out_cap) {
159+
int new_cap = (*out_cap == 0) ? 1024 : *out_cap * 2;
160+
xl_endpoint_t *new_buf = realloc(*out, (size_t)new_cap * sizeof(xl_endpoint_t));
161+
if (!new_buf) break;
162+
*out = new_buf;
163+
*out_cap = new_cap;
164+
}
165+
xl_endpoint_t *ep = &(*out)[*out_count];
166+
memset(ep, 0, sizeof(*ep));
167+
const char *col;
168+
col = (const char *)sqlite3_column_text(stmt, 0);
169+
if (col) snprintf(ep->project, sizeof(ep->project), "%s", col);
170+
col = (const char *)sqlite3_column_text(stmt, 1);
171+
if (col) snprintf(ep->protocol, sizeof(ep->protocol), "%s", col);
172+
col = (const char *)sqlite3_column_text(stmt, 2);
173+
if (col) snprintf(ep->role, sizeof(ep->role), "%s", col);
174+
col = (const char *)sqlite3_column_text(stmt, 3);
175+
if (col) snprintf(ep->identifier, sizeof(ep->identifier), "%s", col);
176+
col = (const char *)sqlite3_column_text(stmt, 4);
177+
if (col) snprintf(ep->node_qn, sizeof(ep->node_qn), "%s", col);
178+
col = (const char *)sqlite3_column_text(stmt, 5);
179+
if (col) snprintf(ep->file_path, sizeof(ep->file_path), "%s", col);
180+
181+
normalize_identifier(ep->identifier, ep->identifier_norm,
182+
(int)sizeof(ep->identifier_norm));
183+
(*out_count)++;
184+
added++;
185+
}
186+
sqlite3_finalize(stmt);
187+
sqlite3_close(db);
188+
return added;
189+
}
190+
191+
/* Write cross-links to _crosslinks.db */
192+
static int write_crosslinks(const char *cache_dir,
193+
const xl_endpoint_t *endpoints, int count) {
194+
char db_path[1024];
195+
snprintf(db_path, sizeof(db_path), "%s/_crosslinks.db", cache_dir);
196+
197+
sqlite3 *db = NULL;
198+
if (sqlite3_open(db_path, &db) != SQLITE_OK) {
199+
cbm_log_error("crosslink.open_failed", "path", db_path);
200+
return -1;
201+
}
202+
203+
/* Create schema */
204+
sqlite3_exec(db,
205+
"CREATE TABLE IF NOT EXISTS cross_links ("
206+
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
207+
" protocol TEXT NOT NULL,"
208+
" identifier TEXT NOT NULL,"
209+
" producer_project TEXT NOT NULL,"
210+
" producer_qn TEXT NOT NULL,"
211+
" producer_file TEXT NOT NULL,"
212+
" consumer_project TEXT NOT NULL,"
213+
" consumer_qn TEXT NOT NULL,"
214+
" consumer_file TEXT NOT NULL,"
215+
" confidence REAL NOT NULL,"
216+
" updated_at TEXT NOT NULL,"
217+
" UNIQUE(protocol, identifier, producer_qn, consumer_qn)"
218+
");", NULL, NULL, NULL);
219+
220+
/* Full rebuild */
221+
sqlite3_exec(db, "DELETE FROM cross_links;", NULL, NULL, NULL);
222+
223+
/* Get current timestamp */
224+
char timestamp[64];
225+
time_t now = time(NULL);
226+
struct tm *tm = gmtime(&now);
227+
strftime(timestamp, sizeof(timestamp), "%Y-%m-%dT%H:%M:%SZ", tm);
228+
229+
sqlite3_stmt *ins = NULL;
230+
sqlite3_prepare_v2(db,
231+
"INSERT OR IGNORE INTO cross_links "
232+
"(protocol, identifier, producer_project, producer_qn, producer_file, "
233+
" consumer_project, consumer_qn, consumer_file, confidence, updated_at) "
234+
"VALUES (?,?,?,?,?,?,?,?,?,?);", -1, &ins, NULL);
235+
236+
sqlite3_exec(db, "BEGIN TRANSACTION;", NULL, NULL, NULL);
237+
238+
int link_count = 0;
239+
240+
/* O(n^2) matching — acceptable for expected sizes (few thousand endpoints) */
241+
for (int pi = 0; pi < count; pi++) {
242+
if (strcmp(endpoints[pi].role, "producer") != 0) continue;
243+
const xl_endpoint_t *prod = &endpoints[pi];
244+
245+
for (int ci = 0; ci < count; ci++) {
246+
if (strcmp(endpoints[ci].role, "consumer") != 0) continue;
247+
const xl_endpoint_t *cons = &endpoints[ci];
248+
249+
/* Skip same project */
250+
if (strcmp(prod->project, cons->project) == 0) continue;
251+
/* Must be same protocol */
252+
if (strcmp(prod->protocol, cons->protocol) != 0) continue;
253+
254+
double confidence = 0.0;
255+
const char *match_ident = prod->identifier;
256+
257+
/* Exact match */
258+
if (strcmp(prod->identifier, cons->identifier) == 0) {
259+
confidence = 0.95;
260+
}
261+
/* Normalized match */
262+
else if (strcmp(prod->identifier_norm, cons->identifier_norm) == 0 &&
263+
prod->identifier_norm[0] != '\0') {
264+
confidence = 0.85;
265+
}
266+
267+
if (confidence > 0.0 && ins) {
268+
sqlite3_bind_text(ins, 1, prod->protocol, -1, SQLITE_STATIC);
269+
sqlite3_bind_text(ins, 2, match_ident, -1, SQLITE_STATIC);
270+
sqlite3_bind_text(ins, 3, prod->project, -1, SQLITE_STATIC);
271+
sqlite3_bind_text(ins, 4, prod->node_qn, -1, SQLITE_STATIC);
272+
sqlite3_bind_text(ins, 5, prod->file_path, -1, SQLITE_STATIC);
273+
sqlite3_bind_text(ins, 6, cons->project, -1, SQLITE_STATIC);
274+
sqlite3_bind_text(ins, 7, cons->node_qn, -1, SQLITE_STATIC);
275+
sqlite3_bind_text(ins, 8, cons->file_path, -1, SQLITE_STATIC);
276+
sqlite3_bind_double(ins, 9, confidence);
277+
sqlite3_bind_text(ins, 10, timestamp, -1, SQLITE_STATIC);
278+
sqlite3_step(ins);
279+
sqlite3_reset(ins);
280+
link_count++;
281+
}
282+
}
283+
}
284+
285+
sqlite3_exec(db, "COMMIT;", NULL, NULL, NULL);
286+
if (ins) sqlite3_finalize(ins);
287+
sqlite3_close(db);
288+
return link_count;
289+
}
290+
291+
/* Main entry point: scan cache_dir for *.db, load endpoints, match across projects */
292+
int cbm_cross_project_link(const char *cache_dir) {
293+
if (!cache_dir) return -1;
294+
295+
cbm_log_info("crosslink.start", "cache_dir", cache_dir);
296+
297+
DIR *dir = opendir(cache_dir);
298+
if (!dir) {
299+
cbm_log_warn("crosslink.opendir_failed", "dir", cache_dir);
300+
return -1;
301+
}
302+
303+
/* Collect all endpoints from all project DBs */
304+
xl_endpoint_t *all_endpoints = NULL;
305+
int total = 0, cap = 0;
306+
307+
struct dirent *ent;
308+
while ((ent = readdir(dir)) != NULL) {
309+
const char *name = ent->d_name;
310+
int len = (int)strlen(name);
311+
312+
/* Skip non-.db files */
313+
if (len < 4 || strcmp(name + len - 3, ".db") != 0) continue;
314+
/* Skip _crosslinks.db, tmp-*, _* */
315+
if (name[0] == '_' || strncmp(name, "tmp-", 4) == 0) continue;
316+
317+
char db_path[1024];
318+
snprintf(db_path, sizeof(db_path), "%s/%s", cache_dir, name);
319+
320+
int loaded = load_endpoints_from_db(db_path, &all_endpoints, &total, &cap);
321+
if (loaded > 0) {
322+
cbm_log_info("crosslink.loaded", "db", name,
323+
"endpoints", itoa_buf(loaded));
324+
}
325+
}
326+
closedir(dir);
327+
328+
if (total == 0) {
329+
cbm_log_info("crosslink.done", "links", "0", "reason", "no_endpoints");
330+
free(all_endpoints);
331+
return 0;
332+
}
333+
334+
/* Match across projects and write to _crosslinks.db */
335+
int links = write_crosslinks(cache_dir, all_endpoints, total);
336+
337+
cbm_log_info("crosslink.done", "total_endpoints", itoa_buf(total),
338+
"cross_links", itoa_buf(links));
339+
340+
free(all_endpoints);
341+
return links;
342+
}

0 commit comments

Comments
 (0)