Skip to content

Commit 09b4ec2

Browse files
committed
feat: add cross-service protocol linkers (GraphQL, gRPC, Kafka, SQS, SNS)
Implement 5 of 14 protocol linkers for the servicelink pipeline pass. Each linker discovers producers/consumers in source code via regex scanning and creates typed edges in the graph buffer. - servicelink.h: shared types, helpers, 14 linker declarations - pass_servicelinks.c: orchestrator with dispatch table - servicelink_graphql.c: SDL/resolver/client discovery, 3-tier confidence - servicelink_grpc.c: .proto/server/client discovery, wildcard matching - servicelink_kafka.c: producer/consumer patterns, exact topic matching - servicelink_sqs.c: send/receive patterns, URL/ARN queue name extraction - servicelink_sns.c: publish/subscribe patterns, ARN/Terraform topic extraction - 65 tests across 5 test files (all passing) - 9 remaining stubs for future sessions (WS, SSE, Pub/Sub, etc.) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 95d09cc commit 09b4ec2

15 files changed

Lines changed: 7066 additions & 2 deletions

Makefile.cbm

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,12 @@ PIPELINE_SRCS = \
178178
src/pipeline/pass_infrascan.c \
179179
src/pipeline/httplink.c \
180180
src/pipeline/pass_servicelinks.c \
181-
src/pipeline/servicelink_stubs.c
181+
src/pipeline/servicelink_stubs.c \
182+
src/pipeline/servicelink_graphql.c \
183+
src/pipeline/servicelink_grpc.c \
184+
src/pipeline/servicelink_kafka.c \
185+
src/pipeline/servicelink_sqs.c \
186+
src/pipeline/servicelink_sns.c
182187

183188
# Traces module (new)
184189
TRACES_SRCS = src/traces/traces.c
@@ -284,6 +289,16 @@ TEST_TRACES_SRCS = tests/test_traces.c
284289

285290
TEST_HTTPLINK_SRCS = tests/test_httplink.c
286291

292+
TEST_SERVICELINK_GRAPHQL_SRCS = tests/test_servicelink_graphql.c
293+
294+
TEST_SERVICELINK_GRPC_SRCS = tests/test_servicelink_grpc.c
295+
296+
TEST_SERVICELINK_KAFKA_SRCS = tests/test_servicelink_kafka.c
297+
298+
TEST_SERVICELINK_SQS_SRCS = tests/test_servicelink_sqs.c
299+
300+
TEST_SERVICELINK_SNS_SRCS = tests/test_servicelink_sns.c
301+
287302
TEST_CLI_SRCS = tests/test_cli.c
288303

289304
TEST_MEM_SRCS = tests/test_mem.c
@@ -292,7 +307,7 @@ TEST_UI_SRCS = tests/test_ui.c
292307

293308
TEST_SECURITY_SRCS = tests/test_security.c
294309

295-
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_HTTPLINK_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_INTEGRATION_SRCS)
310+
ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_HTTPLINK_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_SERVICELINK_GRAPHQL_SRCS) $(TEST_SERVICELINK_GRPC_SRCS) $(TEST_SERVICELINK_KAFKA_SRCS) $(TEST_SERVICELINK_SQS_SRCS) $(TEST_SERVICELINK_SNS_SRCS)
296311

297312
# ── Build directories ────────────────────────────────────────────
298313

src/pipeline/pass_servicelinks.c

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* pass_servicelinks.c — Pipeline pass that orchestrates all cross-service protocol linkers.
3+
*
4+
* Called after pass_httplinks. Runs each protocol linker sequentially.
5+
* Individual linker failures are logged but don't stop execution.
6+
*/
7+
#include "servicelink.h"
8+
#include "foundation/log.h"
9+
#include "foundation/compat.h"
10+
#include <stdio.h>
11+
#include <string.h>
12+
13+
/* ── Format int to string for logging ───────────────────────── */
14+
15+
static const char *itoa_sl(int val) {
16+
static CBM_TLS char bufs[4][32];
17+
static CBM_TLS int idx = 0;
18+
int i = idx;
19+
idx = (idx + 1) & 3;
20+
snprintf(bufs[i], sizeof(bufs[i]), "%d", val);
21+
return bufs[i];
22+
}
23+
24+
/* ── Edge type array (declared extern in servicelink.h) ─────── */
25+
26+
const char *SL_ALL_EDGE_TYPES[] = {
27+
SL_EDGE_GRAPHQL, SL_EDGE_GRPC, SL_EDGE_KAFKA, SL_EDGE_SQS,
28+
SL_EDGE_SNS, SL_EDGE_PUBSUB, SL_EDGE_WS, SL_EDGE_SSE,
29+
SL_EDGE_AMQP, SL_EDGE_MQTT, SL_EDGE_NATS, SL_EDGE_REDIS_PS,
30+
SL_EDGE_TRPC, SL_EDGE_EVBRIDGE
31+
};
32+
33+
/* ── Cleanup stale edges from previous runs ─────────────────── */
34+
35+
static void cleanup_stale_edges(cbm_pipeline_ctx_t *ctx) {
36+
for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) {
37+
cbm_gbuf_delete_edges_by_type(ctx->gbuf, SL_ALL_EDGE_TYPES[i]);
38+
}
39+
}
40+
41+
/* ── Linker dispatch table ──────────────────────────────────── */
42+
43+
typedef int (*cbm_sl_linker_fn)(cbm_pipeline_ctx_t *ctx);
44+
45+
typedef struct {
46+
const char *name;
47+
cbm_sl_linker_fn fn;
48+
} cbm_sl_linker_entry_t;
49+
50+
static const cbm_sl_linker_entry_t LINKERS[] = {
51+
{ "GraphQL", cbm_servicelink_graphql },
52+
{ "gRPC", cbm_servicelink_grpc },
53+
{ "Kafka", cbm_servicelink_kafka },
54+
{ "SQS", cbm_servicelink_sqs },
55+
{ "SNS", cbm_servicelink_sns },
56+
{ "Pub/Sub", cbm_servicelink_pubsub },
57+
{ "WebSocket", cbm_servicelink_ws },
58+
{ "SSE", cbm_servicelink_sse },
59+
{ "RabbitMQ", cbm_servicelink_rabbitmq },
60+
{ "MQTT", cbm_servicelink_mqtt },
61+
{ "NATS", cbm_servicelink_nats },
62+
{ "Redis Pub/Sub", cbm_servicelink_redis_pubsub },
63+
{ "tRPC", cbm_servicelink_trpc },
64+
{ "EventBridge", cbm_servicelink_eventbridge },
65+
};
66+
#define LINKER_COUNT (int)(sizeof(LINKERS) / sizeof(LINKERS[0]))
67+
68+
/* ── Main pass entry point ──────────────────────────────────── */
69+
70+
int cbm_pipeline_pass_servicelinks(cbm_pipeline_ctx_t *ctx) {
71+
cbm_log_info("pass.servicelinks.start", "linkers", itoa_sl(LINKER_COUNT));
72+
73+
/* Step 1: Clean stale edges */
74+
cleanup_stale_edges(ctx);
75+
76+
/* Step 2: Run each linker */
77+
int total_links = 0;
78+
int errors = 0;
79+
80+
for (int i = 0; i < LINKER_COUNT; i++) {
81+
cbm_log_info("servicelink.run", "name", LINKERS[i].name);
82+
int rc = LINKERS[i].fn(ctx);
83+
if (rc < 0) {
84+
cbm_log_warn("servicelink.error", "name", LINKERS[i].name,
85+
"rc", itoa_sl(rc));
86+
errors++;
87+
} else {
88+
total_links += rc;
89+
cbm_log_info("servicelink.done", "name", LINKERS[i].name,
90+
"links", itoa_sl(rc));
91+
}
92+
}
93+
94+
cbm_log_info("pass.servicelinks.done", "total_links", itoa_sl(total_links),
95+
"errors", itoa_sl(errors));
96+
97+
/* Return 0 unless ALL linkers failed */
98+
return (errors == LINKER_COUNT) ? -1 : 0;
99+
}

src/pipeline/servicelink.h

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* servicelink.h — Shared types and declarations for cross-service protocol linking.
3+
*
4+
* Each protocol linker discovers producers/consumers in source code and creates
5+
* typed edges (GRAPHQL_CALLS, KAFKA_CALLS, etc.) in the graph buffer.
6+
*/
7+
#ifndef CBM_SERVICELINK_H
8+
#define CBM_SERVICELINK_H
9+
10+
#include "pipeline_internal.h"
11+
#include "httplink.h" /* reuse cbm_confidence_band, cbm_read_source_lines_disk, etc. */
12+
#include "foundation/compat_regex.h" /* portable regex: cbm_regex_t, cbm_regcomp, etc. */
13+
#include "foundation/log.h" /* cbm_log_info, cbm_log_warn, cbm_log_error */
14+
15+
#include <stdbool.h>
16+
#include <stdint.h>
17+
#include <string.h>
18+
#include <stdio.h>
19+
20+
/* ── Buffer limits ──────────────────────────────────────────── */
21+
#define SL_MAX_PRODUCERS 8192
22+
#define SL_MAX_CONSUMERS 8192
23+
#define SL_MAX_PER_NODE 64 /* max discoveries per single function node */
24+
#define SL_MIN_CONFIDENCE 0.25 /* minimum confidence to create an edge */
25+
26+
/* ── Edge type constants ────────────────────────────────────── */
27+
#define SL_EDGE_GRAPHQL "GRAPHQL_CALLS"
28+
#define SL_EDGE_GRPC "GRPC_CALLS"
29+
#define SL_EDGE_KAFKA "KAFKA_CALLS"
30+
#define SL_EDGE_SQS "SQS_CALLS"
31+
#define SL_EDGE_SNS "SNS_CALLS"
32+
#define SL_EDGE_PUBSUB "PUBSUB_CALLS"
33+
#define SL_EDGE_WS "WS_CALLS"
34+
#define SL_EDGE_SSE "SSE_CALLS"
35+
#define SL_EDGE_AMQP "AMQP_CALLS"
36+
#define SL_EDGE_MQTT "MQTT_CALLS"
37+
#define SL_EDGE_NATS "NATS_CALLS"
38+
#define SL_EDGE_REDIS_PS "REDIS_PUBSUB_CALLS"
39+
#define SL_EDGE_TRPC "TRPC_CALLS"
40+
#define SL_EDGE_EVBRIDGE "EVENTBRIDGE_CALLS"
41+
42+
/* ── All edge types for cleanup (defined in pass_servicelinks.c) ── */
43+
extern const char *SL_ALL_EDGE_TYPES[];
44+
#define SL_EDGE_TYPE_COUNT 14
45+
46+
/* ── Generic producer/consumer structs ──────────────────────── */
47+
48+
typedef struct {
49+
char identifier[256]; /* topic, subject, channel, operation, procedure */
50+
char source_qn[512]; /* qualified name of producing function */
51+
int64_t source_id; /* gbuf node ID */
52+
char file_path[256]; /* file where discovered */
53+
char extra[256]; /* protocol-specific: method, exchange, qos, etc. */
54+
} cbm_sl_producer_t;
55+
56+
typedef struct {
57+
char identifier[256]; /* topic, subject, channel, operation, procedure */
58+
char handler_qn[512]; /* qualified name of consuming function */
59+
int64_t handler_id; /* gbuf node ID */
60+
char file_path[256]; /* file where discovered */
61+
char extra[256]; /* protocol-specific metadata */
62+
} cbm_sl_consumer_t;
63+
64+
/* ── Linker result ──────────────────────────────────────────── */
65+
66+
typedef struct {
67+
const char *name; /* protocol name for logging */
68+
int links_created;
69+
int producers_found;
70+
int consumers_found;
71+
} cbm_sl_result_t;
72+
73+
/* ── Helper: read source for a node ─────────────────────────── */
74+
75+
static inline char *sl_read_node_source(const cbm_pipeline_ctx_t *ctx,
76+
const cbm_gbuf_node_t *node) {
77+
return cbm_read_source_lines_disk(ctx->repo_path, node->file_path,
78+
node->start_line, node->end_line);
79+
}
80+
81+
/* ── Helper: get file extension ─────────────────────────────── */
82+
83+
static inline const char *sl_file_ext(const char *path) {
84+
const char *dot = strrchr(path, '.');
85+
return dot ? dot : "";
86+
}
87+
88+
/* ── Helper: insert edge with standard props ────────────────── */
89+
90+
static inline int64_t sl_insert_edge(cbm_pipeline_ctx_t *ctx,
91+
int64_t src_id, int64_t tgt_id, const char *edge_type,
92+
const char *identifier, double confidence, const char *extra_json)
93+
{
94+
char props[512];
95+
if (extra_json && extra_json[0]) {
96+
snprintf(props, sizeof(props),
97+
"{\"identifier\":\"%s\",\"confidence\":%.3f,\"confidence_band\":\"%s\",%s}",
98+
identifier, confidence, cbm_confidence_band(confidence), extra_json);
99+
} else {
100+
snprintf(props, sizeof(props),
101+
"{\"identifier\":\"%s\",\"confidence\":%.3f,\"confidence_band\":\"%s\"}",
102+
identifier, confidence, cbm_confidence_band(confidence));
103+
}
104+
return cbm_gbuf_insert_edge(ctx->gbuf, src_id, tgt_id, edge_type, props);
105+
}
106+
107+
/* ── Per-protocol linker entry points ───────────────────────── */
108+
109+
int cbm_servicelink_graphql(cbm_pipeline_ctx_t *ctx);
110+
int cbm_servicelink_grpc(cbm_pipeline_ctx_t *ctx);
111+
int cbm_servicelink_kafka(cbm_pipeline_ctx_t *ctx);
112+
int cbm_servicelink_sqs(cbm_pipeline_ctx_t *ctx);
113+
int cbm_servicelink_sns(cbm_pipeline_ctx_t *ctx);
114+
int cbm_servicelink_pubsub(cbm_pipeline_ctx_t *ctx);
115+
int cbm_servicelink_ws(cbm_pipeline_ctx_t *ctx);
116+
int cbm_servicelink_sse(cbm_pipeline_ctx_t *ctx);
117+
int cbm_servicelink_rabbitmq(cbm_pipeline_ctx_t *ctx);
118+
int cbm_servicelink_mqtt(cbm_pipeline_ctx_t *ctx);
119+
int cbm_servicelink_nats(cbm_pipeline_ctx_t *ctx);
120+
int cbm_servicelink_redis_pubsub(cbm_pipeline_ctx_t *ctx);
121+
int cbm_servicelink_trpc(cbm_pipeline_ctx_t *ctx);
122+
int cbm_servicelink_eventbridge(cbm_pipeline_ctx_t *ctx);
123+
124+
#endif /* CBM_SERVICELINK_H */

0 commit comments

Comments
 (0)