Skip to content

Commit 4c2d2a0

Browse files
committed
self-hosted supabase guidde
1 parent c5f45ae commit 4c2d2a0

20 files changed

Lines changed: 652 additions & 126 deletions

pkgs/edge-worker/src/core/BatchProcessor.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@ export class BatchProcessor<TMessage extends IMessage> {
1818
}
1919

2020
async processBatch() {
21+
const availableSlots = this.executionController.availableSlots;
22+
if (availableSlots <= 0) {
23+
await this.executionController.waitForSlot();
24+
return;
25+
}
26+
2127
this.logger.polling();
22-
const messageRecords = await this.poller.poll();
28+
const messageRecords = await this.poller.poll(availableSlots);
2329

2430
if (this.signal.aborted) {
2531
this.logger.info('Discarding messageRecords because worker is stopping');
@@ -28,10 +34,12 @@ export class BatchProcessor<TMessage extends IMessage> {
2834

2935
this.logger.taskCount(messageRecords.length);
3036

31-
const startPromises = messageRecords.map((message) =>
32-
this.executionController.start(message)
33-
);
34-
await Promise.all(startPromises);
37+
for (const message of messageRecords) {
38+
void this.executionController.start(message).catch(() => {
39+
// ExecutionController already logs task failures; swallow here so
40+
// refilling the next slot does not produce unhandled rejections.
41+
});
42+
}
3543
}
3644

3745
async awaitCompletion() {

pkgs/edge-worker/src/core/ExecutionController.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ export class ExecutionController<TMessage extends IMessage> {
1111
private promiseQueue: PromiseQueue;
1212
private signal: AbortSignal;
1313
private createExecutor: (record: TMessage, signal: AbortSignal) => IExecutor;
14+
private readonly maxConcurrent: number;
15+
private slotWaiters = new Set<() => void>();
1416

1517
constructor(
1618
executorFactory: (record: TMessage, signal: AbortSignal) => IExecutor,
@@ -20,23 +22,59 @@ export class ExecutionController<TMessage extends IMessage> {
2022
) {
2123
this.signal = abortSignal;
2224
this.createExecutor = executorFactory;
25+
this.maxConcurrent = config.maxConcurrent;
2326
this.promiseQueue = newQueue(config.maxConcurrent);
2427
this.logger = logger;
2528
}
2629

27-
async start(record: TMessage) {
30+
get availableSlots(): number {
31+
return Math.max(0, this.maxConcurrent - this.promiseQueue.size());
32+
}
33+
34+
start(record: TMessage) {
2835
const executor = this.createExecutor(record, this.signal);
2936

3037
this.logger.debug(`Scheduling execution of task ${executor.msgId}`);
3138

32-
return await this.promiseQueue.add(async () => {
39+
return this.promiseQueue.add(async () => {
3340
try {
3441
this.logger.debug(`Executing task ${executor.msgId}...`);
3542
await executor.execute();
3643
this.logger.debug(`Execution successful for ${executor.msgId}`);
3744
} catch (error) {
3845
this.logger.error(`Execution failed for ${executor.msgId}`, error);
3946
throw error;
47+
} finally {
48+
this.notifySlotWaiters();
49+
}
50+
});
51+
}
52+
53+
async waitForSlot(): Promise<void> {
54+
if (this.signal.aborted || this.availableSlots > 0) {
55+
return;
56+
}
57+
58+
await new Promise<void>((resolve) => {
59+
const onAbort = () => {
60+
cleanup();
61+
resolve();
62+
};
63+
const onSlotFreed = () => {
64+
cleanup();
65+
resolve();
66+
};
67+
const cleanup = () => {
68+
this.slotWaiters.delete(onSlotFreed);
69+
this.signal.removeEventListener('abort', onAbort);
70+
};
71+
72+
this.slotWaiters.add(onSlotFreed);
73+
this.signal.addEventListener('abort', onAbort, { once: true });
74+
75+
if (this.signal.aborted || this.availableSlots > 0) {
76+
cleanup();
77+
resolve();
4078
}
4179
});
4280
}
@@ -50,4 +88,12 @@ export class ExecutionController<TMessage extends IMessage> {
5088
);
5189
await this.promiseQueue.done();
5290
}
91+
92+
private notifySlotWaiters() {
93+
const waiters = [...this.slotWaiters];
94+
this.slotWaiters.clear();
95+
for (const waiter of waiters) {
96+
waiter();
97+
}
98+
}
5399
}

pkgs/edge-worker/src/core/Worker.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export class Worker {
66
private lifecycle: ILifecycle;
77
private logger: Logger;
88
private abortController = new AbortController();
9+
private readonly requestShutdown?: () => void;
910

1011
private batchProcessor: IBatchProcessor;
1112
private sql: postgres.Sql;
@@ -16,12 +17,14 @@ export class Worker {
1617
batchProcessor: IBatchProcessor,
1718
lifecycle: ILifecycle,
1819
sql: postgres.Sql,
19-
logger: Logger
20+
logger: Logger,
21+
requestShutdown?: () => void
2022
) {
2123
this.sql = sql;
2224
this.lifecycle = lifecycle;
2325
this.batchProcessor = batchProcessor;
2426
this.logger = logger;
27+
this.requestShutdown = requestShutdown;
2528
}
2629

2730
startOnlyOnce(workerBootstrap: WorkerBootstrap) {
@@ -69,12 +72,13 @@ export class Worker {
6972
return;
7073
}
7174

72-
this.lifecycle.transitionToStopping();
75+
this.lifecycle.transitionToStopping();
7376

74-
try {
75-
// Signal deprecation (which includes "Stopped accepting new messages")
76-
this.logDeprecation();
77-
this.abortController.abort();
77+
try {
78+
// Signal deprecation (which includes "Stopped accepting new messages")
79+
this.logDeprecation();
80+
this.requestShutdown?.();
81+
this.abortController.abort();
7882

7983
try {
8084
this.logger.debug('-> Waiting for main loop to complete');

pkgs/edge-worker/src/core/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export type { Json } from '@pgflow/core';
77
export type Supplier<T> = () => T;
88

99
export interface IPoller<IMessage> {
10-
poll(): Promise<IMessage[]>;
10+
poll(limit?: number): Promise<IMessage[]>;
1111
}
1212

1313
export interface IExecutor {

pkgs/edge-worker/src/flow/StepTaskPoller.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,24 @@ export class StepTaskPoller<TFlow extends AnyFlow>
3636
this.logger = logger;
3737
}
3838

39-
async poll(): Promise<StepTaskWithMessage<TFlow>[]> {
39+
async poll(limit?: number): Promise<StepTaskWithMessage<TFlow>[]> {
4040
if (this.isAborted()) {
4141
this.logger.debug('Polling aborted, returning empty array');
4242
return [];
4343
}
4444

4545
const workerId = this.getWorkerId();
46+
const batchSize = limit ?? this.config.batchSize;
4647
this.logger.debug(
47-
`Two-phase polling for flow tasks with batch size ${this.config.batchSize}, maxPollSeconds: ${this.config.maxPollSeconds}, pollIntervalMs: ${this.config.pollIntervalMs}`
48+
`Two-phase polling for flow tasks with batch size ${batchSize}, maxPollSeconds: ${this.config.maxPollSeconds}, pollIntervalMs: ${this.config.pollIntervalMs}`
4849
);
4950

5051
try {
5152
// Phase 1: Read messages from queue
5253
const messages = await this.adapter.readMessages(
5354
this.config.queueName,
5455
this.config.visibilityTimeout ?? 2,
55-
this.config.batchSize,
56+
batchSize,
5657
this.config.maxPollSeconds,
5758
this.config.pollIntervalMs
5859
);

pkgs/edge-worker/src/flow/createFlowWorker.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,5 +204,11 @@ export function createFlowWorker<
204204
);
205205

206206
// Return Worker
207-
return new Worker(batchProcessor, lifecycle, sql, createLogger('Worker'));
207+
return new Worker(
208+
batchProcessor,
209+
lifecycle,
210+
sql,
211+
createLogger('Worker'),
212+
() => platformAdapter.requestShutdown()
213+
);
208214
}

pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,17 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
104104
}
105105

106106
async stopWorker(): Promise<void> {
107-
// Trigger shutdown signal
108-
this.abortController.abort();
109-
110-
// Cleanup resources
111-
await this._platformResources.sql.end();
107+
this.requestShutdown();
112108

113109
if (this.worker) {
114110
await this.worker.stop();
115111
}
112+
113+
await this._platformResources.sql.end();
114+
}
115+
116+
requestShutdown(): void {
117+
this.abortController.abort();
116118
}
117119

118120
createLogger(module: string): Logger {

pkgs/edge-worker/src/platform/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ export interface PlatformAdapter<TResources extends Record<string, unknown> = Re
7777
*/
7878
stopWorker(): Promise<void>;
7979

80+
/**
81+
* Trigger the shared shutdown signal used by pollers, executors, and contexts.
82+
*/
83+
requestShutdown(): void;
84+
8085
/**
8186
* Get the connection string for the database
8287
* Returns undefined if sql was provided directly via config

pkgs/edge-worker/src/queue/ReadWithPollPoller.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@ export class ReadWithPollPoller<TPayload extends Json> {
2222
this.logger = logger;
2323
}
2424

25-
async poll(): Promise<PgmqMessageRecord<TPayload>[]> {
25+
async poll(limit?: number): Promise<PgmqMessageRecord<TPayload>[]> {
2626
if (this.isAborted()) {
2727
this.logger.debug('Polling aborted, returning empty array');
2828
return [];
2929
}
3030

31-
this.logger.debug(`Polling queue '${this.queue.queueName}' with batch size ${this.config.batchSize}`);
31+
const batchSize = limit ?? this.config.batchSize;
32+
33+
this.logger.debug(`Polling queue '${this.queue.queueName}' with batch size ${batchSize}`);
3234
const messages = await this.queue.readWithPoll(
33-
this.config.batchSize,
35+
batchSize,
3436
this.config.visibilityTimeout,
3537
this.config.maxPollSeconds,
3638
this.config.pollIntervalMs

pkgs/edge-worker/src/queue/createQueueWorker.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,5 +218,11 @@ export function createQueueWorker<TPayload extends Json, TResources extends Reco
218218
createLogger('BatchProcessor')
219219
);
220220

221-
return new Worker(batchProcessor, lifecycle, sql, createLogger('Worker'));
221+
return new Worker(
222+
batchProcessor,
223+
lifecycle,
224+
sql,
225+
createLogger('Worker'),
226+
() => platformAdapter.requestShutdown()
227+
);
222228
}

0 commit comments

Comments
 (0)