Skip to content

Commit 2da7657

Browse files
authored
fix: properly pass maxPgConnections config through connection chain (#589)
# Fix maxPgConnections Configuration This PR fixes an issue where the `maxPgConnections` configuration parameter was being ignored in the edge worker. The parameter is now properly passed through the connection chain with a default value of 4 (previously hardcoded to 10). ## Changes: - Added `maxPgConnections` parameter to all relevant interfaces and function calls - Updated the connection resolution logic to use the configured value or default to 4 - Added test functions to verify both default and custom connection limits - Added unit tests to validate the connection configuration behavior - Added E2E tests to ensure the configuration is properly applied This change ensures that users can now properly control the maximum number of PostgreSQL connections used by the edge worker, which is important for resource management and performance tuning.
1 parent b09543c commit 2da7657

File tree

10 files changed

+182
-7
lines changed

10 files changed

+182
-7
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@pgflow/edge-worker': patch
3+
---
4+
5+
Fix maxPgConnections config being ignored - now properly passed through the connection chain with default of 4

pkgs/edge-worker/deno.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkgs/edge-worker/src/EdgeWorker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ export class EdgeWorker {
143143
const platform = await createAdapter({
144144
sql: config.sql,
145145
connectionString: config.connectionString,
146+
maxPgConnections: config.maxPgConnections,
146147
});
147148
this.platform = platform;
148149

@@ -197,6 +198,7 @@ export class EdgeWorker {
197198
const platform = await createAdapter({
198199
sql: config.sql,
199200
connectionString: config.connectionString,
201+
maxPgConnections: config.maxPgConnections,
200202
});
201203
this.platform = platform;
202204

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
5454
private loggingFactory!: ReturnType<typeof createLoggingFactory>;
5555

5656
constructor(
57-
options?: { sql?: postgres.Sql; connectionString?: string },
57+
options?: { sql?: postgres.Sql; connectionString?: string; maxPgConnections?: number },
5858
deps: SupabasePlatformDeps = getPlatformDeps()
5959
) {
6060
this.deps = deps;

pkgs/edge-worker/src/platform/createAdapter.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { getPlatformDeps } from './deps.js';
77
interface AdapterOptions {
88
sql?: postgres.Sql;
99
connectionString?: string;
10+
maxPgConnections?: number;
1011
}
1112

1213
/**

pkgs/edge-worker/src/platform/resolveConnection.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface ConnectionOptions {
2424
export interface SqlConnectionOptions {
2525
sql?: postgres.Sql;
2626
connectionString?: string;
27+
maxPgConnections?: number;
2728
}
2829

2930
/**
@@ -82,19 +83,21 @@ export function resolveSqlConnection(
8283
return options.sql;
8384
}
8485

86+
const max = options?.maxPgConnections ?? 4;
87+
8588
// 2. config.connectionString
8689
if (options?.connectionString) {
87-
return postgres(options.connectionString, { prepare: false, max: 10 });
90+
return postgres(options.connectionString, { prepare: false, max });
8891
}
8992

9093
// 3. EDGE_WORKER_DB_URL
9194
if (env.EDGE_WORKER_DB_URL) {
92-
return postgres(env.EDGE_WORKER_DB_URL, { prepare: false, max: 10 });
95+
return postgres(env.EDGE_WORKER_DB_URL, { prepare: false, max });
9396
}
9497

9598
// 4. Local Supabase detection + docker URL
9699
if (isLocalSupabaseEnv(env)) {
97-
return postgres(DOCKER_TRANSACTION_POOLER_URL, { prepare: false, max: 10 });
100+
return postgres(DOCKER_TRANSACTION_POOLER_URL, { prepare: false, max });
98101
}
99102

100103
throw new Error(
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { EdgeWorker } from '@pgflow/edge-worker';
2+
3+
// Tests that default maxPgConnections is 4 (not the old hardcoded 10)
4+
// NO maxPgConnections specified - should default to 4
5+
EdgeWorker.start(
6+
async (_payload, { sql }) => {
7+
const queueName = 'conn_max_pg_default';
8+
const actualMax = sql.options.max;
9+
const status = actualMax === 4 ? 'success' : 'error';
10+
const errorMessage = actualMax === 4 ? null : `Expected max=4, got ${actualMax}`;
11+
12+
await sql`
13+
INSERT INTO e2e_test_results (queue_name, status, actual, error_message)
14+
VALUES (${queueName}, ${status}, ${sql.json({ max: actualMax })}, ${errorMessage})
15+
`;
16+
17+
return { max: actualMax };
18+
},
19+
{ queueName: 'conn_max_pg_default' }
20+
);
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { EdgeWorker } from '@pgflow/edge-worker';
2+
3+
// Tests that explicit maxPgConnections: 7 is respected
4+
EdgeWorker.start(
5+
async (_payload, { sql }) => {
6+
const queueName = 'conn_max_pg_override';
7+
const actualMax = sql.options.max;
8+
const status = actualMax === 7 ? 'success' : 'error';
9+
const errorMessage = actualMax === 7 ? null : `Expected max=7, got ${actualMax}`;
10+
11+
await sql`
12+
INSERT INTO e2e_test_results (queue_name, status, actual, error_message)
13+
VALUES (${queueName}, ${status}, ${sql.json({ max: actualMax })}, ${errorMessage})
14+
`;
15+
16+
return { max: actualMax };
17+
},
18+
{
19+
queueName: 'conn_max_pg_override',
20+
maxPgConnections: 7,
21+
}
22+
);

pkgs/edge-worker/tests/e2e/connection-config.test.ts

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
import { withSql } from '../sql.ts';
2-
import { assertGreaterOrEqual } from 'jsr:@std/assert';
2+
import { assertEquals, assertGreaterOrEqual } from 'jsr:@std/assert';
33
import {
44
sendBatch,
55
seqLastValue,
66
startWorker,
7+
waitFor,
78
waitForSeqToIncrementBy,
89
} from './_helpers.ts';
910
import type postgres from 'postgres';
1011

12+
interface ConnTestResult {
13+
queue_name: string;
14+
status: string;
15+
actual: Record<string, unknown> | null;
16+
error_message: string | null;
17+
}
18+
1119
async function setupTest(sql: postgres.Sql, queueName: string) {
1220
await sql`CREATE SEQUENCE IF NOT EXISTS conn_test_seq`;
1321
await sql`ALTER SEQUENCE conn_test_seq RESTART WITH 1`;
@@ -24,6 +32,53 @@ async function setupTest(sql: postgres.Sql, queueName: string) {
2432
`;
2533
}
2634

35+
async function setupMaxPgConnectionsTest(sql: postgres.Sql, queueName: string) {
36+
// Create results table if not exists (generic, reusable for other tests)
37+
await sql`
38+
CREATE TABLE IF NOT EXISTS e2e_test_results (
39+
id SERIAL PRIMARY KEY,
40+
queue_name TEXT NOT NULL,
41+
status TEXT NOT NULL,
42+
actual JSONB,
43+
error_message TEXT,
44+
created_at TIMESTAMPTZ DEFAULT NOW()
45+
)
46+
`;
47+
// Clear previous results for this queue
48+
await sql`DELETE FROM e2e_test_results WHERE queue_name = ${queueName}`;
49+
// Standard queue setup
50+
await sql`
51+
SELECT * FROM pgmq.drop_queue(${queueName})
52+
WHERE EXISTS (
53+
SELECT 1 FROM pgmq.list_queues() WHERE queue_name = ${queueName}
54+
)
55+
`;
56+
await sql`SELECT pgmq.create(${queueName})`;
57+
await sql`
58+
DELETE FROM pgflow.workers
59+
WHERE last_heartbeat_at < NOW() - INTERVAL '6 seconds'
60+
`;
61+
}
62+
63+
async function waitForTestResult(
64+
sql: postgres.Sql,
65+
queueName: string,
66+
timeoutMs = 10000
67+
): Promise<ConnTestResult> {
68+
return await waitFor(
69+
async () => {
70+
const rows = await sql<ConnTestResult[]>`
71+
SELECT queue_name, status, actual, error_message
72+
FROM e2e_test_results
73+
WHERE queue_name = ${queueName}
74+
LIMIT 1
75+
`;
76+
return rows.length > 0 ? rows[0] : false;
77+
},
78+
{ timeoutMs, description: `test result for ${queueName}` }
79+
);
80+
}
81+
2782
Deno.test(
2883
{
2984
name: 'connection config - zero config uses Docker pooler',
@@ -112,3 +167,51 @@ Deno.test(
112167
});
113168
}
114169
);
170+
171+
Deno.test(
172+
{
173+
name: 'connection config - default maxPgConnections is 4',
174+
sanitizeOps: false,
175+
sanitizeResources: false,
176+
},
177+
async () => {
178+
await withSql(async (sql) => {
179+
const queueName = 'conn_max_pg_default';
180+
await setupMaxPgConnectionsTest(sql, queueName);
181+
await startWorker(queueName);
182+
await sendBatch(1, queueName);
183+
184+
const result = await waitForTestResult(sql, queueName);
185+
assertEquals(
186+
result.status,
187+
'success',
188+
`Expected success but got error: ${result.error_message} (actual=${JSON.stringify(result.actual)})`
189+
);
190+
assertEquals(result.actual?.max, 4);
191+
});
192+
}
193+
);
194+
195+
Deno.test(
196+
{
197+
name: 'connection config - maxPgConnections override works',
198+
sanitizeOps: false,
199+
sanitizeResources: false,
200+
},
201+
async () => {
202+
await withSql(async (sql) => {
203+
const queueName = 'conn_max_pg_override';
204+
await setupMaxPgConnectionsTest(sql, queueName);
205+
await startWorker(queueName);
206+
await sendBatch(1, queueName);
207+
208+
const result = await waitForTestResult(sql, queueName);
209+
assertEquals(
210+
result.status,
211+
'success',
212+
`Expected success but got error: ${result.error_message} (actual=${JSON.stringify(result.actual)})`
213+
);
214+
assertEquals(result.actual?.max, 7);
215+
});
216+
}
217+
);

pkgs/edge-worker/tests/unit/platform/connectionPriority.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,21 @@ Deno.test('resolveSqlConnection - sql takes priority over everything', () => {
219219
assertEquals(result, mockSql);
220220
mockSql.end();
221221
});
222+
223+
// ============================================================
224+
// maxPgConnections tests
225+
// ============================================================
226+
227+
Deno.test('resolveSqlConnection - uses default maxPgConnections of 4', () => {
228+
const env = { SUPABASE_URL: LOCAL_SUPABASE_URL };
229+
const result = resolveSqlConnection(env);
230+
assertEquals(result.options.max, 4);
231+
result.end();
232+
});
233+
234+
Deno.test('resolveSqlConnection - respects maxPgConnections override', () => {
235+
const env = { SUPABASE_URL: LOCAL_SUPABASE_URL };
236+
const result = resolveSqlConnection(env, { maxPgConnections: 7 });
237+
assertEquals(result.options.max, 7);
238+
result.end();
239+
});

0 commit comments

Comments
 (0)