Skip to content

Commit 90fe6d4

Browse files
authored
Merge pull request #1119 from constructive-io/feat/realtime-preset-integration
feat: integrate RealtimeManager into constructive preset and cache lifecycle
2 parents b7f2ece + db20d49 commit 90fe6d4

16 files changed

Lines changed: 205 additions & 137 deletions

File tree

graphile/graphile-cache/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"dependencies": {
3232
"@pgpmjs/logger": "workspace:^",
3333
"express": "^5.2.1",
34+
"graphile-realtime-subscriptions": "workspace:^",
3435
"grafserv": "1.0.0",
3536
"lru-cache": "^11.2.7",
3637
"pg-cache": "workspace:^",

graphile/graphile-cache/src/create-instance.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
import { createServer } from 'node:http';
2+
import { Logger } from '@pgpmjs/logger';
23
import express from 'express';
34
import { postgraphile } from 'postgraphile';
45
import { grafserv } from 'grafserv/express/v4';
56
import type { GraphileCacheEntry } from './graphile-cache';
67

8+
const log = new Logger('graphile-cache:create');
9+
710
interface GraphileInstanceOptions {
811
preset: any;
912
cacheKey: string;
13+
/**
14+
* When true, a RealtimeManager is created and started alongside the
15+
* PostGraphile instance. The pool is extracted from the preset's
16+
* pgServices (managed by pg-cache) rather than passed separately.
17+
*/
18+
enableRealtime?: boolean;
1019
}
1120

1221
/**
@@ -18,11 +27,17 @@ interface GraphileInstanceOptions {
1827
*
1928
* Callers are responsible for building the `GraphileConfig.Preset` (including
2029
* pgServices, grafserv options, grafast context, etc.) before passing it here.
30+
*
31+
* When `enableRealtime` is true, a RealtimeManager is created that bridges
32+
* cursor-tracked events from `drain_changes()` into the PostGraphile
33+
* instance's PgSubscriber EventEmitter. Both `pgSubscriber` and the pg
34+
* pool are extracted from the resolved preset's pgServices — no separate
35+
* pool parameter is needed.
2136
*/
2237
export const createGraphileInstance = async (
2338
opts: GraphileInstanceOptions
2439
): Promise<GraphileCacheEntry> => {
25-
const { preset, cacheKey } = opts;
40+
const { preset, cacheKey, enableRealtime = false } = opts;
2641

2742
const pgl = postgraphile(preset);
2843
const serv = pgl.createServ(grafserv);
@@ -32,12 +47,47 @@ export const createGraphileInstance = async (
3247
await serv.addTo(handler, httpServer);
3348
await serv.ready();
3449

35-
return {
50+
const entry: GraphileCacheEntry = {
3651
pgl,
3752
serv,
3853
handler,
3954
httpServer,
4055
cacheKey,
4156
createdAt: Date.now(),
4257
};
58+
59+
if (enableRealtime) {
60+
try {
61+
const { RealtimeManager } = await import('graphile-realtime-subscriptions');
62+
63+
// Extract PgSubscriber and pool from the resolved preset's pgServices.
64+
// The pool is the same instance managed by pg-cache (via getPgPool)
65+
// and threaded into the preset by makePgService({ pool, schemas }).
66+
const resolvedPreset = pgl.getResolvedPreset();
67+
const pgService = (resolvedPreset as any).pgServices?.[0];
68+
const pgSubscriber = pgService?.pgSubscriber ?? null;
69+
const pool = pgService?.adaptorSettings?.pool ?? null;
70+
71+
if (!pgSubscriber) {
72+
log.warn(`PostGraphile[${cacheKey}] has no pgSubscriber — RealtimeManager will not be started`);
73+
} else if (!pool) {
74+
log.warn(`PostGraphile[${cacheKey}] has no pool in pgService — RealtimeManager will not be started`);
75+
} else {
76+
const manager = new RealtimeManager({
77+
pgSubscriber,
78+
pool,
79+
nodeId: `graphile-cache:${cacheKey}`,
80+
schema: 'realtime_public',
81+
});
82+
83+
await manager.start();
84+
entry.realtimeManager = manager;
85+
log.info(`RealtimeManager started for PostGraphile[${cacheKey}]`);
86+
}
87+
} catch (err) {
88+
log.error(`Failed to start RealtimeManager for PostGraphile[${cacheKey}]:`, err);
89+
}
90+
}
91+
92+
return entry;
4393
};

graphile/graphile-cache/src/graphile-cache.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ export interface GraphileCacheEntry {
8686
httpServer: HttpServer;
8787
cacheKey: string;
8888
createdAt: number;
89+
/** Optional RealtimeManager for cursor-tracked subscription delivery */
90+
realtimeManager?: { stop(): Promise<void> } | null;
8991
}
9092

9193
// Track disposed entries to prevent double-disposal
@@ -119,6 +121,14 @@ const disposeEntry = async (entry: GraphileCacheEntry, key: string): Promise<voi
119121
entry.httpServer.close(() => resolve());
120122
});
121123
}
124+
// Stop RealtimeManager if present (before releasing PostGraphile)
125+
if (entry.realtimeManager) {
126+
try {
127+
await entry.realtimeManager.stop();
128+
} catch (err) {
129+
log.error(`Error stopping RealtimeManager for PostGraphile[${key}]:`, err);
130+
}
131+
}
122132
// Release PostGraphile instance (this also releases grafserv internally)
123133
if (entry.pgl) {
124134
await entry.pgl.release();

0 commit comments

Comments
 (0)