Operations guide for the SDLC indexing pipeline: dispatching, entity extraction, checkpoint management, and recovery.
Siphon CDC replicates GitLab PostgreSQL tables into ClickHouse datalake tables. The SDLC indexing pipeline transforms datalake rows into graph node and edge tables.
GitLab PostgreSQL
-> Siphon CDC (logical replication)
-> NATS JetStream
-> ClickHouse datalake tables
-> [Dispatcher] publishes indexing requests to NATS
-> [Indexer] extracts, transforms, writes graph tables
The GKG_INDEXER stream that carries the indexing requests is configured as follows:
| Setting | Value |
|---|---|
| Retention | WorkQueue (deleted on ack) |
| Max messages per subject | 1 |
| Discard policy | New (reject duplicates while in-flight) |
| Subject | Purpose |
|---|---|
| sdlc.global.indexing.requested | Trigger global entity indexing (User) |
| sdlc.namespace.indexing.requested.<org>.<ns> | Trigger namespace entity indexing |
Consumer names are derived from the configured consumer_name (default in production: gkg-indexer) and the subscription subject, with dots replaced by hyphens and wildcards spelled out.
| Consumer | Stream | Role |
|---|---|---|
| gkg-indexer-sdlc-global-indexing-requested | GKG_INDEXER | Global entity handlers |
| gkg-indexer-sdlc-namespace-indexing-requested-wildcard-wildcard | GKG_INDEXER | Namespaced entity handlers |
| gkg-indexer-sdlc-namespace-deletion-requested-wildcard | GKG_INDEXER | Namespace deletion handler |
With ephemeral consumers (consumer_name: None, the local dev default), NATS assigns random names that don't survive restarts.
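The naming rule for durable consumers can be sketched in a few lines of Python. This is an illustration only, not the indexer's actual code: the function name `derive_consumer_name` is hypothetical, the deletion subject is inferred from the consumer name in the table, and only the single-token wildcard `*` is handled because that is the only wildcard the documented names show.

```python
def derive_consumer_name(consumer_name: str, subject: str) -> str:
    """Join the base name and the subject with hyphens, spelling out '*'."""
    tokens = []
    for token in subject.split("."):
        # Assumption: '*' is the only wildcard in use; '>' is not covered here.
        tokens.append("wildcard" if token == "*" else token)
    return f"{consumer_name}-{'-'.join(tokens)}"


print(derive_consumer_name("gkg-indexer", "sdlc.namespace.indexing.requested.*.*"))
# prints: gkg-indexer-sdlc-namespace-indexing-requested-wildcard-wildcard
```

A helper like this is handy for building the `<consumer_name>` argument to nats consumer info without copying names by hand.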
Each ontology entity type gets its own EntityHandler instance. Handlers for global entities subscribe to the global subject; handlers for namespaced entities subscribe to the namespace subject.
| Scope | Subject | Default max_attempts | DLQ |
|---|---|---|---|
| Global (e.g. User) | sdlc.global.indexing.requested | 1 | No (re-dispatched next cycle) |
| Namespaced (e.g. Project, MergeRequest) | sdlc.namespace.indexing.requested.*.* | 1 | No (re-dispatched next cycle) |
All SDLC handlers rely on the dispatcher to re-create requests on the next cycle rather than retrying via NATS. This is the eventual consistency model.
The dispatcher runs as gkg-server --mode DispatchIndexing and publishes indexing requests on a schedule.
| Task | What it does |
|---|---|
| GlobalDispatcher | Publishes a single GlobalIndexingRequest with watermark = now |
| NamespaceDispatcher | Queries siphon_knowledge_graph_enabled_namespaces, publishes one request per enabled namespace |
Both run on every scheduler cycle. The NATS max_messages_per_subject: 1 constraint ensures at-most-one in-flight request per subject. Duplicate publishes are silently rejected.
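The interaction between the per-subject limit, the New discard policy, and WorkQueue retention can be pictured with a toy in-memory model. This is not NATS code and does not talk to a server; the class `WorkQueueModel` and its methods are hypothetical names used only to show why a second publish for the same subject is dropped until the first message is acked.

```python
class WorkQueueModel:
    """Toy model: at most one pending message per subject."""

    def __init__(self):
        # Maps subject -> payload of the single pending message.
        self._pending = {}

    def publish(self, subject, payload):
        """Return True if stored, False if rejected as a duplicate."""
        if subject in self._pending:
            return False  # discard "new": the incoming duplicate is dropped
        self._pending[subject] = payload
        return True

    def ack(self, subject):
        """WorkQueue retention: an acked message is deleted from the stream."""
        self._pending.pop(subject, None)


queue = WorkQueueModel()
subject = "sdlc.namespace.indexing.requested.1.42"
print(queue.publish(subject, "cycle-1"))  # first request is accepted
print(queue.publish(subject, "cycle-2"))  # rejected while cycle-1 is in-flight
queue.ack(subject)
print(queue.publish(subject, "cycle-3"))  # accepted again after the ack
```

In this model a stuck, never-acked message blocks all later dispatches for its subject, which is why purging a single subject is the usual remedy.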
The indexer tracks progress in a checkpoint table in the graph database.
| Column | Type | Purpose |
|---|---|---|
| key | String | Position identifier (e.g., global.User, ns.42.Project) |
| watermark | DateTime64 | Upper bound of the extraction window |
| cursor_values | String (JSON) | Keyset pagination cursor for resuming mid-batch |
| _version | DateTime64 | ReplacingMergeTree version |
| _deleted | Bool | Soft delete flag |
| State | Meaning |
|---|---|
| No row | First run. Extracts from epoch. |
| cursor_values is null/empty | Previous run completed. Next run starts a fresh window. |
| cursor_values has values | Interrupted mid-pagination. Resumes from the cursor. |
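The three states in the table above can be expressed as a small classifier, which is useful when scripting checks over query results. This is a sketch under assumptions: the function name `checkpoint_state` and the returned labels are hypothetical, a row is represented as a plain dict, and "empty" is taken to mean None or a blank string.

```python
from typing import Optional


def checkpoint_state(row: Optional[dict]) -> str:
    """Classify a checkpoint row (or its absence) into one of three states."""
    if row is None:
        return "first_run"  # no row: extraction starts from epoch
    cursor = row.get("cursor_values")
    # Assumption: None or a blank string both count as "null/empty".
    if cursor is None or cursor.strip() == "":
        return "completed"  # next run starts a fresh window
    return "interrupted"  # next run resumes from the stored cursor


print(checkpoint_state(None))
print(checkpoint_state({"key": "ns.42.Project", "cursor_values": ""}))
print(checkpoint_state({"key": "ns.42.Project", "cursor_values": "[1001]"}))
```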
List all checkpoints:
SELECT key, watermark, cursor_values
FROM `<gkg-database>`.checkpoint
ORDER BY key;
List the checkpoints for a single namespace (here, namespace 42):
SELECT key, watermark, cursor_values
FROM `<gkg-database>`.checkpoint
WHERE key LIKE 'ns.42.%'
ORDER BY key;
- Delete the namespace's checkpoints so the indexer starts from epoch:
ALTER TABLE `<gkg-database>`.checkpoint DELETE WHERE key LIKE 'ns.<namespace_id>.%';
- Trigger the dispatcher (it will publish a new request on the next cycle):
# Option A: wait for the next scheduled dispatch cycle
# Option B: trigger manually (see "Trigger dispatcher manually" above)
# Option C: publish directly to NATS
nats pub sdlc.namespace.indexing.requested.<org_id>.<namespace_id> \
  '{"organization":<org_id>,"namespace":<namespace_id>,"watermark":"2026-03-24T00:00:00Z"}'
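For Option C, the subject and JSON payload can be assembled programmatically rather than typed by hand. The sketch below only builds the two strings and does not publish anything; the function name `namespace_request` is hypothetical, and the field names are taken from the nats pub example above.

```python
import json


def namespace_request(org_id: int, namespace_id: int, watermark: str):
    """Return the (subject, payload) pair for a namespace indexing request."""
    subject = f"sdlc.namespace.indexing.requested.{org_id}.{namespace_id}"
    payload = json.dumps(
        {"organization": org_id, "namespace": namespace_id, "watermark": watermark},
        separators=(",", ":"),  # compact form, matching the example payload
    )
    return subject, payload


subject, payload = namespace_request(1, 42, "2026-03-24T00:00:00Z")
print(subject)
print(payload)
```

The two printed values can then be passed to nats pub as its subject and body arguments.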
To reindex a single entity type in a namespace, delete only its checkpoint:
ALTER TABLE `<gkg-database>`.checkpoint
DELETE WHERE key = 'ns.<namespace_id>.<EntityType>';
For example, to reindex only Projects in namespace 42:
ALTER TABLE `<gkg-database>`.checkpoint
DELETE WHERE key = 'ns.42.Project';
To reindex global entities, delete the global checkpoints:
ALTER TABLE `<gkg-database>`.checkpoint
DELETE WHERE key LIKE 'global.%';
To reindex everything from scratch:
- Truncate graph tables:
SELECT 'TRUNCATE TABLE `<gkg-database>`.' || name || ';' FROM system.tables WHERE database = '<gkg-database>';
Run the generated statements.
- Clear all checkpoints:
TRUNCATE TABLE `<gkg-database>`.checkpoint;
- Trigger the dispatcher to re-publish all requests.
Manually set a checkpoint watermark to re-extract rows after that timestamp:
INSERT INTO `<gkg-database>`.checkpoint (key, watermark, cursor_values, _version, _deleted)
VALUES ('ns.42.Project', '2026-01-01T00:00:00', '', now64(), false);
On its next run, the handler extracts rows whose _siphon_watermark falls between this checkpoint watermark (the old one) and the watermark of the incoming request (the new one).
nats stream info GKG_INDEXER
nats consumer ls GKG_INDEXER
nats consumer info GKG_INDEXER <consumer_name>
Remove a stuck message for a single namespace:
nats stream purge GKG_INDEXER --subject='sdlc.namespace.indexing.requested.<org>.<ns>'
Purge all pending global and namespace indexing requests:
nats stream purge GKG_INDEXER --subject='sdlc.global.indexing.requested'
nats stream purge GKG_INDEXER --subject='sdlc.namespace.indexing.requested.*.*'
This destroys all in-flight messages. The dispatcher will re-create them on the next cycle.
kubectl -n gkg delete pod gkg-nats-0
kubectl -n gkg delete pvc gkg-nats-js-gkg-nats-0
kubectl -n gkg wait --for=condition=ready pod/gkg-nats-0 --timeout=120s
SDLC handlers use max_attempts: 1 by default (no NATS-level retry). Instead, they rely on eventual consistency:
- Handler fails and term-acks the message
- Message is removed from the stream
- On the next dispatcher cycle, the dispatcher publishes a new request for the same subject
- The handler picks it up and retries from its checkpoint
A transient failure (ClickHouse timeout, network blip) resolves on its own within one dispatch interval.
Within a single handler invocation, the datalake batch extraction query retries up to 3 times with exponential backoff (100ms, 200ms, 400ms) before failing the handler.
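The in-handler retry described above can be illustrated with a generic exponential backoff loop. This is a sketch, not the indexer's implementation: the function name `retry_with_backoff` and its parameters are hypothetical, and it assumes that "up to 3 times" means three retries after the initial attempt, with waits of 100 ms, 200 ms, and 400 ms.

```python
import time


def retry_with_backoff(operation, max_retries=3, base_delay_ms=100, sleep=time.sleep):
    """Run operation; on failure, retry with a delay that doubles each time."""
    delay_ms = base_delay_ms
    for attempt in range(max_retries + 1):  # one initial try plus the retries
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: the handler fails
            sleep(delay_ms / 1000)  # 100 ms, then 200 ms, then 400 ms
            delay_ms *= 2


attempts = []

def flaky_query():
    # Stand-in for a datalake query that times out twice, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("simulated ClickHouse timeout")
    return "rows"

print(retry_with_backoff(flaky_query))
```

Passing `sleep` as a parameter keeps the loop easy to test without real waits. When all retries are exhausted, the exception propagates, which corresponds to the handler failing and the dispatcher re-creating the request on the next cycle.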
If a namespace is not being indexed:
- Verify the namespace is enabled:
SELECT * FROM gitlab_clickhouse_main_production.siphon_knowledge_graph_enabled_namespaces WHERE root_namespace_id = <namespace_id> AND _siphon_deleted = false;
- Check if a message is in-flight (blocking new dispatches):
nats stream info GKG_INDEXER --subjects
- Check the checkpoint for errors or stale cursors:
SELECT key, watermark, cursor_values FROM `<gkg-database>`.checkpoint WHERE key LIKE 'ns.<namespace_id>.%';
If the indexer is consuming CPU but not making progress:
- Check ClickHouse for long-running queries:
SELECT query_id, elapsed, query FROM system.processes WHERE elapsed > 60 ORDER BY elapsed DESC;
- Kill the stuck query if needed:
KILL QUERY WHERE query_id = '<query_id>';
- The handler will fail and the dispatcher will re-create the request.
If cursor_values contains invalid JSON:
ALTER TABLE `<gkg-database>`.checkpoint
DELETE WHERE key = '<position_key>';
The handler will restart extraction from epoch for that entity type.
The TableCleanup scheduled task runs OPTIMIZE TABLE ... FINAL CLEANUP on all graph tables every 24 hours. This compacts ReplacingMergeTree tables and physically removes soft-deleted rows.
To trigger manually:
OPTIMIZE TABLE `<gkg-database>`.gl_project FINAL CLEANUP;