Skip to content

Commit d8e3d4f

Browse files
committed
feat: add Louvain community detection, per-protocol config, and fix test failures (Session 7)
- Add pass_communities.c: Louvain community detection over all 17 edge types - Add per-protocol .cgrconfig support (service_linker.{protocol}.enabled/min_confidence) - Fix GraphQL edge dedup test expectation - Fix gRPC confidence overwrite with two-phase dedup - Fix gRPC Node.js service name mismatch (strip "Service" suffix) - 2236 tests pass, 0 fail (was 2227/4 before fixes) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e125b04 commit d8e3d4f

10 files changed

Lines changed: 601 additions & 9 deletions

Makefile.cbm

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ PIPELINE_SRCS = \
204204
src/pipeline/servicelink_mqtt.c \
205205
src/pipeline/servicelink_nats.c \
206206
src/pipeline/servicelink_redis_pubsub.c \
207-
src/pipeline/servicelink_trpc.c
207+
src/pipeline/servicelink_trpc.c \
208+
src/pipeline/pass_communities.c
208209

209210
# SimHash / MinHash module
210211
SIMHASH_SRCS = src/simhash/minhash.c
@@ -375,8 +376,9 @@ TEST_SERVICELINK_MQTT_SRCS = tests/test_servicelink_mqtt.c
375376
TEST_SERVICELINK_NATS_SRCS = tests/test_servicelink_nats.c
376377
TEST_SERVICELINK_REDIS_PUBSUB_SRCS = tests/test_servicelink_redis_pubsub.c
377378
TEST_SERVICELINK_TRPC_SRCS = tests/test_servicelink_trpc.c
379+
TEST_COMMUNITIES_SRCS = tests/test_communities.c
378380

379-
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_SERVICELINK_GRAPHQL_SRCS) $(TEST_SERVICELINK_GRPC_SRCS) $(TEST_SERVICELINK_KAFKA_SRCS) $(TEST_SERVICELINK_SQS_SRCS) $(TEST_SERVICELINK_SNS_SRCS) $(TEST_SERVICELINK_WS_SRCS) $(TEST_SERVICELINK_SSE_SRCS) $(TEST_SERVICELINK_PUBSUB_SRCS) $(TEST_SERVICELINK_RABBITMQ_SRCS) $(TEST_SERVICELINK_EVENTBRIDGE_SRCS) $(TEST_SERVICELINK_MQTT_SRCS) $(TEST_SERVICELINK_NATS_SRCS) $(TEST_SERVICELINK_REDIS_PUBSUB_SRCS) $(TEST_SERVICELINK_TRPC_SRCS)
381+
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_SERVICELINK_GRAPHQL_SRCS) $(TEST_SERVICELINK_GRPC_SRCS) $(TEST_SERVICELINK_KAFKA_SRCS) $(TEST_SERVICELINK_SQS_SRCS) $(TEST_SERVICELINK_SNS_SRCS) $(TEST_SERVICELINK_WS_SRCS) $(TEST_SERVICELINK_SSE_SRCS) $(TEST_SERVICELINK_PUBSUB_SRCS) $(TEST_SERVICELINK_RABBITMQ_SRCS) $(TEST_SERVICELINK_EVENTBRIDGE_SRCS) $(TEST_SERVICELINK_MQTT_SRCS) $(TEST_SERVICELINK_NATS_SRCS) $(TEST_SERVICELINK_REDIS_PUBSUB_SRCS) $(TEST_SERVICELINK_TRPC_SRCS) $(TEST_COMMUNITIES_SRCS)
380382

381383
# ── Build directories ────────────────────────────────────────────
382384

src/pipeline/pass_communities.c

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* pass_communities.c — Pipeline pass that runs Louvain community detection
3+
* on all service-linking edges and creates Community nodes + MEMBER_OF edges.
4+
*
5+
* Runs after pass_servicelinks, before pass_configlink.
6+
*/
7+
#include "pipeline_internal.h"
8+
#include "servicelink.h"
9+
#include "store/store.h"
10+
#include "foundation/log.h"
11+
#include "foundation/compat.h"
12+
13+
#include <stdlib.h>
14+
#include <stdio.h>
15+
#include <string.h>
16+
17+
/* ── Format int to string for logging ───────────────────────── */
18+
19+
static const char *itoa_cm(int val) {
20+
static CBM_TLS char bufs[4][32];
21+
static CBM_TLS int idx = 0;
22+
int i = idx;
23+
idx = (idx + 1) & 3;
24+
snprintf(bufs[i], sizeof(bufs[i]), "%d", val);
25+
return bufs[i];
26+
}
27+
28+
/* ── Edge types to feed into community detection ────────────── */
29+
30+
/* 3 base edge types + 14 SL_EDGE_* types = 17 total */
31+
static const char *COMMUNITY_EDGE_TYPES[] = {
32+
"CALLS",
33+
"HTTP_CALLS",
34+
"ASYNC_CALLS",
35+
SL_EDGE_GRAPHQL, SL_EDGE_GRPC, SL_EDGE_KAFKA, SL_EDGE_SQS,
36+
SL_EDGE_SNS, SL_EDGE_PUBSUB, SL_EDGE_WS, SL_EDGE_SSE,
37+
SL_EDGE_AMQP, SL_EDGE_MQTT, SL_EDGE_NATS, SL_EDGE_REDIS_PS,
38+
SL_EDGE_TRPC, SL_EDGE_EVBRIDGE
39+
};
40+
#define COMMUNITY_EDGE_TYPE_COUNT 17
41+
42+
/* ── qsort comparator for int64_t dedup ─────────────────────── */
43+
44+
/*
 * qsort comparator for int64_t elements, ascending order.
 * Returns -1, 0 or 1. Uses comparisons rather than subtraction so that
 * values far apart cannot overflow.
 */
static int cmp_i64(const void *a, const void *b) {
    const int64_t lhs = *(const int64_t *)a;
    const int64_t rhs = *(const int64_t *)b;

    if (lhs < rhs) {
        return -1;
    }
    if (lhs > rhs) {
        return 1;
    }
    return 0;
}
49+
50+
/* ── Main pass entry point ──────────────────────────────────── */
51+
52+
/*
 * Pipeline pass: run Louvain community detection over the graph buffer.
 *
 * Collects every edge whose type is listed in COMMUNITY_EDGE_TYPES, derives
 * the set of distinct endpoint node IDs, hands both to cbm_louvain(), then
 * upserts one "Community" node per non-empty community and inserts a
 * "MEMBER_OF" edge from each member node to its community node.
 *
 * ctx: pipeline context; ctx->gbuf and ctx->project_name are read.
 *
 * Returns 0 in all cases: allocation failures and Louvain errors are logged
 * as warnings and treated as non-fatal.
 */
int cbm_pipeline_pass_communities(cbm_pipeline_ctx_t *ctx) {
    /* Every heap buffer starts NULL so the single cleanup path can free
     * them unconditionally (free(NULL) is a no-op). */
    cbm_louvain_edge_t *lv_edges = NULL;
    int64_t *raw_ids = NULL;
    int64_t *nodes = NULL;
    cbm_louvain_result_t *results = NULL;
    int *member_counts = NULL;
    int64_t *community_node_ids = NULL;

    int total_edge_cap = 0;
    int lv_edge_count = 0;
    int raw_id_count = 0;
    int node_count = 0;
    int result_count = 0;
    int max_community = 0;
    int communities_created = 0;
    int edges_created = 0;
    int rc = 0;

    cbm_log_info("pass.communities.start");

    /* Step 1: Size the buffers from the per-type edge counts */
    for (int i = 0; i < COMMUNITY_EDGE_TYPE_COUNT; i++) {
        total_edge_cap += cbm_gbuf_edge_count_by_type(ctx->gbuf,
                                                      COMMUNITY_EDGE_TYPES[i]);
    }

    /* <= 0 rather than == 0: a negative total must never reach calloc,
     * where the cast to size_t would turn it into a huge request. */
    if (total_edge_cap <= 0) {
        cbm_log_info("pass.communities.skip", "reason", "no_edges");
        goto cleanup;
    }

    /* Step 2: Build cbm_louvain_edge_t array */
    lv_edges = calloc((size_t)total_edge_cap, sizeof(*lv_edges));
    if (!lv_edges) {
        cbm_log_warn("pass.communities.alloc_fail", "what", "edges");
        goto cleanup;
    }

    /* Also collect raw node IDs for dedup (max 2 per edge) */
    raw_ids = calloc((size_t)total_edge_cap * 2, sizeof(*raw_ids));
    if (!raw_ids) {
        cbm_log_warn("pass.communities.alloc_fail", "what", "raw_ids");
        goto cleanup;
    }

    for (int i = 0; i < COMMUNITY_EDGE_TYPE_COUNT; i++) {
        const cbm_gbuf_edge_t **edges = NULL;
        int count = 0;
        if (cbm_gbuf_find_edges_by_type(ctx->gbuf, COMMUNITY_EDGE_TYPES[i],
                                        &edges, &count) != 0 || !edges) {
            continue;
        }
        /* Bound by total_edge_cap as well as count: the capacity came from a
         * separate counting call, so never write past what was allocated
         * even if the two calls disagree. */
        for (int j = 0; j < count && lv_edge_count < total_edge_cap; j++) {
            lv_edges[lv_edge_count].src = edges[j]->source_id;
            lv_edges[lv_edge_count].dst = edges[j]->target_id;
            lv_edge_count++;

            raw_ids[raw_id_count++] = edges[j]->source_id;
            raw_ids[raw_id_count++] = edges[j]->target_id;
        }
    }

    if (lv_edge_count == 0) {
        cbm_log_info("pass.communities.skip", "reason", "no_edges_collected");
        goto cleanup;
    }

    /* Step 3: Deduplicate node IDs (sort, then keep the first of each run) */
    qsort(raw_ids, (size_t)raw_id_count, sizeof(*raw_ids), cmp_i64);

    nodes = calloc((size_t)raw_id_count, sizeof(*nodes));
    if (!nodes) {
        cbm_log_warn("pass.communities.alloc_fail", "what", "nodes");
        goto cleanup;
    }

    for (int i = 0; i < raw_id_count; i++) {
        if (node_count == 0 || raw_ids[i] != nodes[node_count - 1]) {
            nodes[node_count++] = raw_ids[i];
        }
    }
    free(raw_ids);
    raw_ids = NULL;

    cbm_log_info("pass.communities.collected",
                 "edges", itoa_cm(lv_edge_count),
                 "nodes", itoa_cm(node_count));

    /* Step 4: Run Louvain */
    rc = cbm_louvain(nodes, node_count, lv_edges, lv_edge_count,
                     &results, &result_count);
    /* Inputs are not needed past this point; release them early. */
    free(lv_edges);
    lv_edges = NULL;
    free(nodes);
    nodes = NULL;

    if (rc != 0) {
        cbm_log_warn("pass.communities.louvain_error", "rc", itoa_cm(rc));
        goto cleanup; /* non-fatal */
    }

    if (result_count <= 0 || !results) {
        cbm_log_info("pass.communities.done", "communities", "0", "members", "0");
        goto cleanup;
    }

    /* Step 5: Group results by community ID — find max community */
    for (int i = 0; i < result_count; i++) {
        if (results[i].community > max_community) {
            max_community = results[i].community;
        }
    }

    /* Count members per community */
    member_counts = calloc((size_t)max_community + 1, sizeof(*member_counts));
    if (!member_counts) {
        cbm_log_warn("pass.communities.alloc_fail", "what", "member_counts");
        goto cleanup;
    }
    for (int i = 0; i < result_count; i++) {
        int c = results[i].community;
        if (c < 0) {
            continue; /* a negative ID would index before the array */
        }
        member_counts[c]++;
    }

    /* Step 5b: Create Community nodes; slot c stays 0 when community c is
     * empty or its node could not be created. */
    community_node_ids = calloc((size_t)max_community + 1,
                                sizeof(*community_node_ids));
    if (!community_node_ids) {
        cbm_log_warn("pass.communities.alloc_fail", "what", "community_node_ids");
        goto cleanup;
    }

    for (int c = 0; c <= max_community; c++) {
        if (member_counts[c] == 0) {
            continue;
        }

        char qn[256];
        snprintf(qn, sizeof(qn), "%s.community.%d", ctx->project_name, c);

        char props[64];
        snprintf(props, sizeof(props), "{\"member_count\":%d}", member_counts[c]);

        char name[64];
        snprintf(name, sizeof(name), "community_%d", c);

        int64_t nid = cbm_gbuf_upsert_node(ctx->gbuf, "Community", name, qn,
                                           "", 0, 0, props);
        if (nid == 0) {
            cbm_log_warn("pass.communities.node_fail", "community", itoa_cm(c));
            continue;
        }
        community_node_ids[c] = nid;
        communities_created++;
    }

    /* Step 6: Create MEMBER_OF edges from each member to its community */
    for (int i = 0; i < result_count; i++) {
        int c = results[i].community;
        if (c < 0 || community_node_ids[c] == 0) {
            continue;
        }

        int64_t eid = cbm_gbuf_insert_edge(ctx->gbuf, results[i].node_id,
                                           community_node_ids[c],
                                           "MEMBER_OF", "{}");
        if (eid != 0) {
            edges_created++;
        }
    }

    cbm_log_info("pass.communities.done",
                 "communities", itoa_cm(communities_created),
                 "members", itoa_cm(edges_created));

cleanup:
    /* Step 7: Cleanup — shared by the success path and every early exit */
    free(lv_edges);
    free(raw_ids);
    free(nodes);
    free(results);
    free(member_counts);
    free(community_node_ids);

    return 0;
}

src/pipeline/pass_servicelinks.c

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
#include "servicelink.h"
88
#include "foundation/log.h"
99
#include "foundation/compat.h"
10+
#include "foundation/yaml.h"
1011
#include <stdio.h>
12+
#include <stdlib.h>
1113
#include <string.h>
1214

1315
/* ── Format int to string for logging ───────────────────────── */
@@ -30,6 +32,93 @@ const char *SL_ALL_EDGE_TYPES[] = {
3032
SL_EDGE_TRPC, SL_EDGE_EVBRIDGE
3133
};
3234

35+
/*
 * Protocol keys for YAML config lookup — indexed same as LINKERS[].
 * Each key is substituted into "service_linker.<key>.enabled" and
 * "service_linker.<key>.min_confidence".
 */
const char *SL_PROTOCOL_KEYS[] = {
    "graphql",
    "grpc",
    "kafka",
    "sqs",
    "sns",
    "pubsub",
    "ws",
    "sse",
    "rabbitmq",
    "mqtt",
    "nats",
    "redis_pubsub",
    "trpc",
    "eventbridge",
};
41+
42+
/* ── Config functions ──────────────────────────────────────────── */
43+
44+
cbm_sl_config_t cbm_sl_default_config(void) {
45+
cbm_sl_config_t cfg;
46+
cfg.enabled = -1; /* use default = true */
47+
for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) {
48+
cfg.protocols[i].enabled = -1;
49+
cfg.protocols[i].min_confidence = -1.0;
50+
}
51+
return cfg;
52+
}
53+
54+
cbm_sl_config_t cbm_sl_load_config(const char *dir) {
55+
cbm_sl_config_t cfg = cbm_sl_default_config();
56+
if (!dir) return cfg;
57+
58+
/* Read .cgrconfig — follow exact pattern from httplink.c:1602 */
59+
char path[1024];
60+
int n = snprintf(path, sizeof(path), "%s/.cgrconfig", dir);
61+
if (n <= 0 || (size_t)n >= sizeof(path)) return cfg;
62+
63+
FILE *f = fopen(path, "r");
64+
if (!f) return cfg;
65+
66+
(void)fseek(f, 0, SEEK_END);
67+
long size = ftell(f);
68+
(void)fseek(f, 0, SEEK_SET);
69+
if (size <= 0 || size > (long)1024 * 1024) { (void)fclose(f); return cfg; }
70+
71+
char *buf = malloc((size_t)size + 1);
72+
if (!buf) { (void)fclose(f); return cfg; }
73+
size_t nread = fread(buf, 1, (size_t)size, f);
74+
(void)fclose(f);
75+
// NOLINTNEXTLINE(clang-analyzer-security.ArrayBound)
76+
buf[nread] = '\0';
77+
78+
cbm_yaml_node_t *root = cbm_yaml_parse(buf, (int)nread);
79+
free(buf);
80+
if (!root) return cfg;
81+
82+
/* Top-level enabled */
83+
if (cbm_yaml_has(root, "service_linker.enabled")) {
84+
cfg.enabled = cbm_yaml_get_bool(root, "service_linker.enabled", true) ? 1 : 0;
85+
}
86+
87+
/* Per-protocol settings */
88+
for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) {
89+
char key[128];
90+
snprintf(key, sizeof(key), "service_linker.%s.enabled", SL_PROTOCOL_KEYS[i]);
91+
if (cbm_yaml_has(root, key)) {
92+
cfg.protocols[i].enabled = cbm_yaml_get_bool(root, key, true) ? 1 : 0;
93+
}
94+
snprintf(key, sizeof(key), "service_linker.%s.min_confidence", SL_PROTOCOL_KEYS[i]);
95+
if (cbm_yaml_has(root, key)) {
96+
cfg.protocols[i].min_confidence = cbm_yaml_get_float(root, key, -1.0);
97+
}
98+
}
99+
100+
cbm_yaml_free(root);
101+
return cfg;
102+
}
103+
104+
bool cbm_sl_protocol_enabled(const cbm_sl_config_t *cfg, int protocol_index) {
105+
if (!cfg) return true;
106+
if (cfg->enabled == 0) return false; /* globally disabled */
107+
if (protocol_index < 0 || protocol_index >= SL_EDGE_TYPE_COUNT) return true;
108+
if (cfg->protocols[protocol_index].enabled == 0) return false;
109+
return true;
110+
}
111+
112+
double cbm_sl_effective_min_confidence(const cbm_sl_config_t *cfg, int protocol_index) {
113+
if (!cfg) return SL_MIN_CONFIDENCE;
114+
if (protocol_index >= 0 && protocol_index < SL_EDGE_TYPE_COUNT) {
115+
if (cfg->protocols[protocol_index].min_confidence >= 0.0) {
116+
return cfg->protocols[protocol_index].min_confidence;
117+
}
118+
}
119+
return SL_MIN_CONFIDENCE;
120+
}
121+
33122
/* ── Cleanup stale edges from previous runs ─────────────────── */
34123

35124
static void cleanup_stale_edges(cbm_pipeline_ctx_t *ctx) {
@@ -70,6 +159,14 @@ static const cbm_sl_linker_entry_t LINKERS[] = {
70159
int cbm_pipeline_pass_servicelinks(cbm_pipeline_ctx_t *ctx) {
71160
cbm_log_info("pass.servicelinks.start", "linkers", itoa_sl(LINKER_COUNT));
72161

162+
/* Step 0: Load config */
163+
cbm_sl_config_t cfg = cbm_sl_load_config(ctx->repo_path);
164+
165+
if (cfg.enabled == 0) {
166+
cbm_log_info("pass.servicelinks.skip", "reason", "disabled");
167+
return 0;
168+
}
169+
73170
/* Step 1: Clean stale edges */
74171
cleanup_stale_edges(ctx);
75172

@@ -78,6 +175,11 @@ int cbm_pipeline_pass_servicelinks(cbm_pipeline_ctx_t *ctx) {
78175
int errors = 0;
79176

80177
for (int i = 0; i < LINKER_COUNT; i++) {
178+
if (!cbm_sl_protocol_enabled(&cfg, i)) {
179+
cbm_log_info("servicelink.skip", "name", LINKERS[i].name,
180+
"reason", "disabled");
181+
continue;
182+
}
81183
cbm_log_info("servicelink.run", "name", LINKERS[i].name);
82184
int rc = LINKERS[i].fn(ctx);
83185
if (rc < 0) {

src/pipeline/pipeline.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,15 @@ int cbm_pipeline_run(cbm_pipeline_t *p) {
860860
itoa_buf((int)elapsed_ms(t)));
861861
}
862862

863+
/* Communities pass (Louvain clustering on service-link edges) */
864+
if (!check_cancel(p)) {
865+
struct timespec t;
866+
cbm_clock_gettime(CLOCK_MONOTONIC, &t);
867+
cbm_pipeline_pass_communities(&ctx);
868+
cbm_log_info("pass.timing", "pass", "communities", "elapsed_ms",
869+
itoa_buf((int)elapsed_ms(t)));
870+
}
871+
863872

864873
cbm_log_info("pipeline.done", "nodes", itoa_buf(cbm_gbuf_node_count(p->gbuf)), "edges",
865874
itoa_buf(cbm_gbuf_edge_count(p->gbuf)), "elapsed_ms",

0 commit comments

Comments
 (0)