From 401001f32f1fe47cb1a074ecd213c6e69462e32b Mon Sep 17 00:00:00 2001 From: Filipe Cabaco Date: Tue, 26 May 2026 17:32:57 +0100 Subject: [PATCH] chore: further reduce flakiness of e2e tests --- test/e2e/realtime-check.ts | 74 +++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/test/e2e/realtime-check.ts b/test/e2e/realtime-check.ts index 3f976f173..0bceefd44 100644 --- a/test/e2e/realtime-check.ts +++ b/test/e2e/realtime-check.ts @@ -135,6 +135,7 @@ function patchFetch() { const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); +const randomTopic = () => randomTopic(); const fmtSqlResult = (result: any[]) => { const count = (result as any).count ?? result.length; return result.length > 0 ? `count=${count} rows=${JSON.stringify(result)}` : `count=${count}`; @@ -322,7 +323,7 @@ async function setup(): Promise<{ userId: string; testUser: { email: string; pas id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, value text NOT NULL DEFAULT gen_random_uuid() )`), - runSql("table broadcast_changes", sql`CREATE TABLE IF NOT EXISTS public.broadcast_changes (id text PRIMARY KEY, value text NOT NULL)`), + runSql("table broadcast_changes", sql`CREATE TABLE IF NOT EXISTS public.broadcast_changes (id text PRIMARY KEY, value text NOT NULL, topic text NOT NULL)`), runSql("table wallet", sql`CREATE TABLE IF NOT EXISTS public.wallet (id text PRIMARY KEY, wallet_id text NOT NULL)`), runSql("table replay_check", sql`CREATE TABLE IF NOT EXISTS public.replay_check ( id text PRIMARY KEY, @@ -394,12 +395,13 @@ async function setup(): Promise<{ userId: string; testUser: { email: string; pas CREATE OR REPLACE FUNCTION broadcast_changes_for_table_trigger() RETURNS TRIGGER AS $$ DECLARE topic text; BEGIN - topic = 'topic:test'; + topic = COALESCE(NEW.topic, OLD.topic); PERFORM realtime.broadcast_changes(topic, TG_OP, TG_OP, TG_TABLE_NAME, TG_TABLE_SCHEMA, NEW, OLD, TG_LEVEL); RETURN NULL; END; $$ LANGUAGE plpgsql `); + await runSql("broadcast_changes topic column", sql`ALTER TABLE public.broadcast_changes ADD COLUMN IF NOT EXISTS topic text NOT NULL`); await runSql("trigger broadcast_changes_for_table_public_broadcast_changes_trigger", sql` DO $$ BEGIN @@ -463,7 +465,7 @@ async function runConnectionTest() { await test("first connect latency", async () => { const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { - const channel = supabase.channel("topic:" + crypto.randomUUID()); + const channel = supabase.channel(randomTopic()); const connectMs = await openChannel(channel); return [{ label: "connect", value: connectMs, unit: "ms" }]; } finally { @@ -477,7 +479,7 @@ async function runConnectionTest() { const DELIVERY_SLO = 99; const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const event = "load"; const sendTimes = new Map(); const latencies: number[] = []; @@ -514,7 +516,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password: try { await signInUser(supabase, testUser.email, testUser.password); const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes" }, () => {}); const { systemMs } = await openPostgresChannel(channel); return [{ label: "system", value: systemMs, unit: "ms" }]; @@ -532,7 +534,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password: const latencies: number[] = []; const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes" }, (p) => { const t = sendTimes.get(p.new.id); if (t !== undefined) latencies.push(performance.now() - t); @@ -563,7 +565,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password: const latencies: number[] = []; const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "UPDATE", schema: "public", table: "pg_changes" }, (p) => { const t = sendTimes.get(p.new.id); if (t !== undefined) latencies.push(performance.now() - t); @@ -595,7 +597,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password: const latencies: number[] = []; const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "DELETE", schema: "public", table: "pg_changes" }, (p) => { const t = sendTimes.get(p.old.id); if (t !== undefined) latencies.push(performance.now() - t); @@ -628,7 +630,7 @@ async function runLoadPresenceTests() { const observer = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); const senders: ReturnType[] = []; try { - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const trackTimes = new Map(); const latencies: number[] = []; @@ -676,11 +678,12 @@ async function runLoadBroadcastFromDbTests(testUser: { email: string; password: const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { await signInUser(supabase, testUser.email, testUser.password); + const testTopic = randomTopic(); const sendTimes = new Map(); const latencies: number[] = []; const channel = supabase - .channel("topic:test", { config: { private: true } }) + .channel(testTopic, { config: { private: true } }) .on("broadcast", { event: "INSERT" }, (res) => { const t = sendTimes.get(res.payload.record.id); if (t !== undefined) latencies.push(performance.now() - t); @@ -691,7 +694,7 @@ async function runLoadBroadcastFromDbTests(testUser: { email: string; password: await Promise.all(Array.from({ length: LOAD_MESSAGES }, async () => { const id = crypto.randomUUID(); sendTimes.set(id, performance.now()); - await supabase.from("broadcast_changes").insert({ id, value: crypto.randomUUID() }); + await supabase.from("broadcast_changes").insert({ id, value: crypto.randomUUID(), topic: testTopic }); })); await settle(() => latencies.length, LOAD_MESSAGES, LOAD_SETTLE_MS); @@ -713,7 +716,7 @@ async function runLoadBroadcastTests() { const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { const event = "load"; - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const sendTimes = new Map(); const latencies: number[] = []; @@ -744,7 +747,7 @@ async function runLoadBroadcastTests() { const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { const event = "load"; - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const sendTimes = new Map(); const latencies: number[] = []; @@ -785,7 +788,7 @@ async function runLoadBroadcastReplayTests(testUser: { email: string; password: try { await signInUser(supabase, testUser.email, testUser.password); const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const since = Date.now() - 1000; await Promise.all(Array.from({ length: LOAD_MESSAGES }, (_, i) => @@ -819,7 +822,7 @@ async function runBroadcastTests() { try { let result: any = null; const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const expectedPayload = { message: crypto.randomUUID() }; const channel = supabase @@ -842,7 +845,7 @@ async function runBroadcastTests() { try { let result: any = null; const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const expectedPayload = { message: crypto.randomUUID() }; const channel = supabase @@ -850,6 +853,8 @@ async function runBroadcastTests() { .on("broadcast", { event }, ({ payload }) => (result = payload)); const subscribeMs = await openChannel(channel); + // Small settle window so server-side subscription routing is ready before the HTTP broadcast arrives. + await sleep(100); const res = await fetch(`${PROJECT_URL}/realtime/v1/api/broadcast`, { method: "POST", @@ -874,7 +879,7 @@ async function runPresenceTests(testUser: { email: string; password: string }) { const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { let joinEvent: any = null; - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const message = crypto.randomUUID(); const key = crypto.randomUUID(); @@ -903,7 +908,7 @@ async function runPresenceTests(testUser: { email: string; password: string }) { await signInUser(supabase, testUser.email, testUser.password); let joinEvent: any = null; - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const message = crypto.randomUUID(); const key = crypto.randomUUID(); @@ -932,7 +937,7 @@ async function runAuthorizationTests(testUser: { email: string; password: string await test("user using private channel cannot connect without permissions", async () => { const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const channel = supabase.channel(topic, { config: { private: true } }).subscribe(); const { value: finalState, latencyMs: rejectMs } = await waitFor( @@ -952,7 +957,7 @@ async function runAuthorizationTests(testUser: { email: string; password: string const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { await signInUser(supabase, testUser.email, testUser.password); - const channel = supabase.channel("topic:" + crypto.randomUUID(), { config: { private: true } }); + const channel = supabase.channel(randomTopic(), { config: { private: true } }); const subscribeMs = await openChannel(channel); return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; } finally { @@ -969,16 +974,17 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { await signInUser(supabase, testUser.email, testUser.password); + const testTopic = randomTopic(); const id = crypto.randomUUID(); const value = crypto.randomUUID(); let result: any = null; const channel = supabase - .channel("topic:test", { config: { private: true } }) + .channel(testTopic, { config: { private: true } }) .on("broadcast", { event: "INSERT" }, (res) => (result = res)); const subscribeMs = await openChannel(channel); - await supabase.from("broadcast_changes").insert({ value, id }); + await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic }); const { latencyMs: eventMs } = await waitFor(() => result, "INSERT event"); assert.strictEqual(result.payload.record.id, id); @@ -998,17 +1004,18 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { await signInUser(supabase, testUser.email, testUser.password); + const testTopic = randomTopic(); const id = crypto.randomUUID(); const originalValue = crypto.randomUUID(); const updatedValue = crypto.randomUUID(); let result: any = null; const channel = supabase - .channel("topic:test", { config: { private: true } }) + .channel(testTopic, { config: { private: true } }) .on("broadcast", { event: "UPDATE" }, (res) => (result = res)); const subscribeMs = await openChannel(channel); - await supabase.from("broadcast_changes").insert({ value: originalValue, id }); + await supabase.from("broadcast_changes").insert({ value: originalValue, id, topic: testTopic }); await supabase.from("broadcast_changes").update({ value: updatedValue }).eq("id", id); const { latencyMs: eventMs } = await waitFor(() => result, "UPDATE event"); @@ -1030,16 +1037,17 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); try { await signInUser(supabase, testUser.email, testUser.password); + const testTopic = randomTopic(); const id = crypto.randomUUID(); const value = crypto.randomUUID(); let result: any = null; const channel = supabase - .channel("topic:test", { config: { private: true } }) + .channel(testTopic, { config: { private: true } }) .on("broadcast", { event: "DELETE" }, (res) => (result = res)); const subscribeMs = await openChannel(channel); - await supabase.from("broadcast_changes").insert({ value, id }); + await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic }); await supabase.from("broadcast_changes").delete().eq("id", id); const { latencyMs: eventMs } = await waitFor(() => result, "DELETE event"); @@ -1069,7 +1077,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri const uniqueValue = crypto.randomUUID(); const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=eq.${uniqueValue}` }, (payload) => (result = payload)); @@ -1099,7 +1107,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri const dummyId = await executeInsert(supabase, "dummy"); const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "UPDATE", schema: "public", table: "pg_changes", filter: `id=eq.${mainId}` }, (payload) => (result = payload)); @@ -1132,7 +1140,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri const dummyId = await executeInsert(supabase, "dummy"); const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "DELETE", schema: "public", table: "pg_changes", filter: `id=eq.${mainId}` }, (payload) => (result = payload)); @@ -1165,7 +1173,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri const deleteId = await executeInsert(supabase, "pg_changes"); const channel = supabase - .channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG) + .channel(randomTopic(), BROADCAST_CONFIG) .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=eq.${insertValue}` }, (p) => (insertResult = p)) .on("postgres_changes", { event: "UPDATE", schema: "public", table: "pg_changes", filter: `id=eq.${updateId}` }, (p) => (updateResult = p)) .on("postgres_changes", { event: "DELETE", schema: "public", table: "pg_changes", filter: `id=eq.${deleteId}` }, (p) => (deleteResult = p)); @@ -1207,7 +1215,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri try { await signInUser(supabase, testUser.email, testUser.password); const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const payload = { message: crypto.randomUUID() }; const since = Date.now() - 1000; @@ -1235,7 +1243,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri try { await signInUser(supabase, testUser.email, testUser.password); const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); const since = Date.now() - 1000; await supabase.from("replay_check").insert({ id: crypto.randomUUID(), topic, event, payload: { value: 1 } }); @@ -1262,7 +1270,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri try { await signInUser(supabase, testUser.email, testUser.password); const event = crypto.randomUUID(); - const topic = "topic:" + crypto.randomUUID(); + const topic = randomTopic(); await supabase.from("replay_check").insert({ id: crypto.randomUUID(), topic, event, payload: { value: "old" } });