Skip to content

Commit 6f8236c

Browse files
committed
feat: add cross-repo protocol linking and community detection
Cross-project matching: - Endpoint registry collects all producers/consumers during indexing - _crosslinks.db stores cross-project links with confidence scores (exact=0.95 for identical strings, normalized=0.85 for case/separator diffs) - cross_project_links MCP tool with protocol/project/identifier filters Community detection: - Louvain algorithm for discovering tightly-coupled node clusters - Per-protocol community assignment
1 parent 51d0e98 commit 6f8236c

4 files changed

Lines changed: 1003 additions & 0 deletions

File tree

src/pipeline/pass_communities.c

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* pass_communities.c — Pipeline pass that runs Louvain community detection
3+
* on all service-linking edges and creates Community nodes + MEMBER_OF edges.
4+
*
5+
* Runs after pass_servicelinks, before pass_configlink.
6+
*/
7+
#include "pipeline_internal.h"
8+
#include "servicelink.h"
9+
#include "store/store.h"
10+
#include "foundation/log.h"
11+
#include "foundation/compat.h"
12+
13+
#include <stdlib.h>
14+
#include <stdio.h>
15+
#include <string.h>
16+
17+
/* ── Format int to string for logging ───────────────────────── */
18+
19+
static const char *itoa_cm(int val) {
20+
static CBM_TLS char bufs[4][32];
21+
static CBM_TLS int idx = 0;
22+
int i = idx;
23+
idx = (idx + 1) & 3;
24+
snprintf(bufs[i], sizeof(bufs[i]), "%d", val);
25+
return bufs[i];
26+
}
27+
28+
/* ── Edge types to feed into community detection ────────────── */
29+
30+
/* 3 base edge types + 14 SL_EDGE_* types = 17 total */
31+
static const char *COMMUNITY_EDGE_TYPES[] = {
32+
"CALLS",
33+
"HTTP_CALLS",
34+
"ASYNC_CALLS",
35+
SL_EDGE_GRAPHQL, SL_EDGE_GRPC, SL_EDGE_KAFKA, SL_EDGE_SQS,
36+
SL_EDGE_SNS, SL_EDGE_PUBSUB, SL_EDGE_WS, SL_EDGE_SSE,
37+
SL_EDGE_AMQP, SL_EDGE_MQTT, SL_EDGE_NATS, SL_EDGE_REDIS_PS,
38+
SL_EDGE_TRPC, SL_EDGE_EVBRIDGE
39+
};
40+
#define COMMUNITY_EDGE_TYPE_COUNT 17
41+
42+
/* ── qsort comparator for int64_t dedup ─────────────────────── */
43+
44+
static int cmp_i64(const void *a, const void *b) {
45+
int64_t va = *(const int64_t *)a;
46+
int64_t vb = *(const int64_t *)b;
47+
return (va > vb) - (va < vb);
48+
}
49+
50+
/* ── Main pass entry point ──────────────────────────────────── */
51+
52+
int cbm_pipeline_pass_communities(cbm_pipeline_ctx_t *ctx) {
53+
cbm_log_info("pass.communities.start");
54+
55+
/* Step 1: Collect all edges from the 17 edge types */
56+
int total_edge_cap = 0;
57+
for (int i = 0; i < COMMUNITY_EDGE_TYPE_COUNT; i++) {
58+
total_edge_cap += cbm_gbuf_edge_count_by_type(ctx->gbuf,
59+
COMMUNITY_EDGE_TYPES[i]);
60+
}
61+
62+
if (total_edge_cap == 0) {
63+
cbm_log_info("pass.communities.skip", "reason", "no_edges");
64+
return 0;
65+
}
66+
67+
/* Step 2: Build cbm_louvain_edge_t array */
68+
cbm_louvain_edge_t *lv_edges = calloc((size_t)total_edge_cap,
69+
sizeof(cbm_louvain_edge_t));
70+
if (!lv_edges) {
71+
cbm_log_warn("pass.communities.alloc_fail", "what", "edges");
72+
return 0;
73+
}
74+
75+
/* Also collect raw node IDs for dedup (max 2 per edge) */
76+
int64_t *raw_ids = calloc((size_t)total_edge_cap * 2, sizeof(int64_t));
77+
if (!raw_ids) {
78+
free(lv_edges);
79+
cbm_log_warn("pass.communities.alloc_fail", "what", "raw_ids");
80+
return 0;
81+
}
82+
83+
int lv_edge_count = 0;
84+
int raw_id_count = 0;
85+
86+
for (int i = 0; i < COMMUNITY_EDGE_TYPE_COUNT; i++) {
87+
const cbm_gbuf_edge_t **edges = NULL;
88+
int count = 0;
89+
if (cbm_gbuf_find_edges_by_type(ctx->gbuf, COMMUNITY_EDGE_TYPES[i],
90+
&edges, &count) != 0)
91+
continue;
92+
for (int j = 0; j < count; j++) {
93+
lv_edges[lv_edge_count].src = edges[j]->source_id;
94+
lv_edges[lv_edge_count].dst = edges[j]->target_id;
95+
lv_edge_count++;
96+
97+
raw_ids[raw_id_count++] = edges[j]->source_id;
98+
raw_ids[raw_id_count++] = edges[j]->target_id;
99+
}
100+
}
101+
102+
if (lv_edge_count == 0) {
103+
free(lv_edges);
104+
free(raw_ids);
105+
cbm_log_info("pass.communities.skip", "reason", "no_edges_collected");
106+
return 0;
107+
}
108+
109+
/* Step 3: Deduplicate node IDs */
110+
qsort(raw_ids, (size_t)raw_id_count, sizeof(int64_t), cmp_i64);
111+
112+
int64_t *nodes = calloc((size_t)raw_id_count, sizeof(int64_t));
113+
if (!nodes) {
114+
free(lv_edges);
115+
free(raw_ids);
116+
cbm_log_warn("pass.communities.alloc_fail", "what", "nodes");
117+
return 0;
118+
}
119+
120+
int node_count = 0;
121+
for (int i = 0; i < raw_id_count; i++) {
122+
if (node_count == 0 || raw_ids[i] != nodes[node_count - 1]) {
123+
nodes[node_count++] = raw_ids[i];
124+
}
125+
}
126+
free(raw_ids);
127+
128+
cbm_log_info("pass.communities.collected",
129+
"edges", itoa_cm(lv_edge_count),
130+
"nodes", itoa_cm(node_count));
131+
132+
/* Step 4: Run Louvain */
133+
cbm_louvain_result_t *results = NULL;
134+
int result_count = 0;
135+
int rc = cbm_louvain(nodes, node_count, lv_edges, lv_edge_count,
136+
&results, &result_count);
137+
free(lv_edges);
138+
free(nodes);
139+
140+
if (rc != 0) {
141+
cbm_log_warn("pass.communities.louvain_error", "rc", itoa_cm(rc));
142+
free(results);
143+
return 0; /* non-fatal */
144+
}
145+
146+
if (result_count == 0) {
147+
free(results);
148+
cbm_log_info("pass.communities.done", "communities", "0", "members", "0");
149+
return 0;
150+
}
151+
152+
/* Step 5: Group results by community ID — find max community */
153+
int max_community = 0;
154+
for (int i = 0; i < result_count; i++) {
155+
if (results[i].community > max_community)
156+
max_community = results[i].community;
157+
}
158+
159+
/* Count members per community */
160+
int *member_counts = calloc((size_t)(max_community + 1), sizeof(int));
161+
if (!member_counts) {
162+
free(results);
163+
cbm_log_warn("pass.communities.alloc_fail", "what", "member_counts");
164+
return 0;
165+
}
166+
for (int i = 0; i < result_count; i++) {
167+
member_counts[results[i].community]++;
168+
}
169+
170+
/* Step 5b: Create Community nodes */
171+
int communities_created = 0;
172+
int64_t *community_node_ids = calloc((size_t)(max_community + 1),
173+
sizeof(int64_t));
174+
if (!community_node_ids) {
175+
free(results);
176+
free(member_counts);
177+
cbm_log_warn("pass.communities.alloc_fail", "what", "community_node_ids");
178+
return 0;
179+
}
180+
181+
for (int c = 0; c <= max_community; c++) {
182+
if (member_counts[c] == 0)
183+
continue;
184+
185+
char qn[256];
186+
snprintf(qn, sizeof(qn), "%s.community.%d", ctx->project_name, c);
187+
188+
char props[64];
189+
snprintf(props, sizeof(props), "{\"member_count\":%d}", member_counts[c]);
190+
191+
char name[64];
192+
snprintf(name, sizeof(name), "community_%d", c);
193+
194+
int64_t nid = cbm_gbuf_upsert_node(ctx->gbuf, "Community", name, qn,
195+
"", 0, 0, props);
196+
if (nid == 0) {
197+
cbm_log_warn("pass.communities.node_fail", "community", itoa_cm(c));
198+
continue;
199+
}
200+
community_node_ids[c] = nid;
201+
communities_created++;
202+
}
203+
204+
/* Step 6: Create MEMBER_OF edges from each member to its community */
205+
int edges_created = 0;
206+
for (int i = 0; i < result_count; i++) {
207+
int c = results[i].community;
208+
if (community_node_ids[c] == 0)
209+
continue;
210+
211+
int64_t eid = cbm_gbuf_insert_edge(ctx->gbuf, results[i].node_id,
212+
community_node_ids[c],
213+
"MEMBER_OF", "{}");
214+
if (eid != 0)
215+
edges_created++;
216+
}
217+
218+
/* Step 7: Cleanup */
219+
free(results);
220+
free(member_counts);
221+
free(community_node_ids);
222+
223+
cbm_log_info("pass.communities.done",
224+
"communities", itoa_cm(communities_created),
225+
"members", itoa_cm(edges_created));
226+
227+
return 0;
228+
}

0 commit comments

Comments
 (0)