Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 41 additions & 33 deletions test/e2e/realtime-check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ function patchFetch() {


const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
const randomTopic = () => randomTopic();
const fmtSqlResult = (result: any[]) => {
const count = (result as any).count ?? result.length;
return result.length > 0 ? `count=${count} rows=${JSON.stringify(result)}` : `count=${count}`;
Expand Down Expand Up @@ -322,7 +323,7 @@ async function setup(): Promise<{ userId: string; testUser: { email: string; pas
id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
value text NOT NULL DEFAULT gen_random_uuid()
)`),
runSql("table broadcast_changes", sql`CREATE TABLE IF NOT EXISTS public.broadcast_changes (id text PRIMARY KEY, value text NOT NULL)`),
runSql("table broadcast_changes", sql`CREATE TABLE IF NOT EXISTS public.broadcast_changes (id text PRIMARY KEY, value text NOT NULL, topic text NOT NULL)`),
runSql("table wallet", sql`CREATE TABLE IF NOT EXISTS public.wallet (id text PRIMARY KEY, wallet_id text NOT NULL)`),
runSql("table replay_check", sql`CREATE TABLE IF NOT EXISTS public.replay_check (
id text PRIMARY KEY,
Expand Down Expand Up @@ -394,12 +395,13 @@ async function setup(): Promise<{ userId: string; testUser: { email: string; pas
CREATE OR REPLACE FUNCTION broadcast_changes_for_table_trigger() RETURNS TRIGGER AS $$
DECLARE topic text;
BEGIN
topic = 'topic:test';
topic = COALESCE(NEW.topic, OLD.topic);
PERFORM realtime.broadcast_changes(topic, TG_OP, TG_OP, TG_TABLE_NAME, TG_TABLE_SCHEMA, NEW, OLD, TG_LEVEL);
RETURN NULL;
END;
$$ LANGUAGE plpgsql
`);
await runSql("broadcast_changes topic column", sql`ALTER TABLE public.broadcast_changes ADD COLUMN IF NOT EXISTS topic text NOT NULL`);

await runSql("trigger broadcast_changes_for_table_public_broadcast_changes_trigger", sql`
DO $$ BEGIN
Expand Down Expand Up @@ -463,7 +465,7 @@ async function runConnectionTest() {
await test("first connect latency", async () => {
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
const channel = supabase.channel("topic:" + crypto.randomUUID());
const channel = supabase.channel(randomTopic());
const connectMs = await openChannel(channel);
return [{ label: "connect", value: connectMs, unit: "ms" }];
} finally {
Expand All @@ -477,7 +479,7 @@ async function runConnectionTest() {
const DELIVERY_SLO = 99;
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const event = "load";
const sendTimes = new Map<number, number>();
const latencies: number[] = [];
Expand Down Expand Up @@ -514,7 +516,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password:
try {
await signInUser(supabase, testUser.email, testUser.password);
const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes" }, () => {});
const { systemMs } = await openPostgresChannel(channel);
return [{ label: "system", value: systemMs, unit: "ms" }];
Expand All @@ -532,7 +534,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password:
const latencies: number[] = [];

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes" }, (p) => {
const t = sendTimes.get(p.new.id);
if (t !== undefined) latencies.push(performance.now() - t);
Expand Down Expand Up @@ -563,7 +565,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password:
const latencies: number[] = [];

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes", { event: "UPDATE", schema: "public", table: "pg_changes" }, (p) => {
const t = sendTimes.get(p.new.id);
if (t !== undefined) latencies.push(performance.now() - t);
Expand Down Expand Up @@ -595,7 +597,7 @@ async function runLoadPostgresChangesTests(testUser: { email: string; password:
const latencies: number[] = [];

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes", { event: "DELETE", schema: "public", table: "pg_changes" }, (p) => {
const t = sendTimes.get(p.old.id);
if (t !== undefined) latencies.push(performance.now() - t);
Expand Down Expand Up @@ -628,7 +630,7 @@ async function runLoadPresenceTests() {
const observer = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
const senders: ReturnType<typeof createClient>[] = [];
try {
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const trackTimes = new Map<string, number>();
const latencies: number[] = [];

Expand Down Expand Up @@ -676,11 +678,12 @@ async function runLoadBroadcastFromDbTests(testUser: { email: string; password:
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
await signInUser(supabase, testUser.email, testUser.password);
const testTopic = randomTopic();
const sendTimes = new Map<string, number>();
const latencies: number[] = [];

const channel = supabase
.channel("topic:test", { config: { private: true } })
.channel(testTopic, { config: { private: true } })
.on("broadcast", { event: "INSERT" }, (res) => {
const t = sendTimes.get(res.payload.record.id);
if (t !== undefined) latencies.push(performance.now() - t);
Expand All @@ -691,7 +694,7 @@ async function runLoadBroadcastFromDbTests(testUser: { email: string; password:
await Promise.all(Array.from({ length: LOAD_MESSAGES }, async () => {
const id = crypto.randomUUID();
sendTimes.set(id, performance.now());
await supabase.from("broadcast_changes").insert({ id, value: crypto.randomUUID() });
await supabase.from("broadcast_changes").insert({ id, value: crypto.randomUUID(), topic: testTopic });
}));

await settle(() => latencies.length, LOAD_MESSAGES, LOAD_SETTLE_MS);
Expand All @@ -713,7 +716,7 @@ async function runLoadBroadcastTests() {
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
const event = "load";
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const sendTimes = new Map<number, number>();
const latencies: number[] = [];

Expand Down Expand Up @@ -744,7 +747,7 @@ async function runLoadBroadcastTests() {
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
const event = "load";
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const sendTimes = new Map<number, number>();
const latencies: number[] = [];

Expand Down Expand Up @@ -785,7 +788,7 @@ async function runLoadBroadcastReplayTests(testUser: { email: string; password:
try {
await signInUser(supabase, testUser.email, testUser.password);
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();

const since = Date.now() - 1000;
await Promise.all(Array.from({ length: LOAD_MESSAGES }, (_, i) =>
Expand Down Expand Up @@ -819,7 +822,7 @@ async function runBroadcastTests() {
try {
let result: any = null;
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const expectedPayload = { message: crypto.randomUUID() };

const channel = supabase
Expand All @@ -842,14 +845,16 @@ async function runBroadcastTests() {
try {
let result: any = null;
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const expectedPayload = { message: crypto.randomUUID() };

const channel = supabase
.channel(topic, BROADCAST_CONFIG)
.on("broadcast", { event }, ({ payload }) => (result = payload));

const subscribeMs = await openChannel(channel);
// Small settle window so server-side subscription routing is ready before the HTTP broadcast arrives.
await sleep(100);

const res = await fetch(`${PROJECT_URL}/realtime/v1/api/broadcast`, {
method: "POST",
Expand All @@ -874,7 +879,7 @@ async function runPresenceTests(testUser: { email: string; password: string }) {
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
let joinEvent: any = null;
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const message = crypto.randomUUID();
const key = crypto.randomUUID();

Expand Down Expand Up @@ -903,7 +908,7 @@ async function runPresenceTests(testUser: { email: string; password: string }) {
await signInUser(supabase, testUser.email, testUser.password);

let joinEvent: any = null;
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const message = crypto.randomUUID();
const key = crypto.randomUUID();

Expand Down Expand Up @@ -932,7 +937,7 @@ async function runAuthorizationTests(testUser: { email: string; password: string
await test("user using private channel cannot connect without permissions", async () => {
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const channel = supabase.channel(topic, { config: { private: true } }).subscribe();

const { value: finalState, latencyMs: rejectMs } = await waitFor(
Expand All @@ -952,7 +957,7 @@ async function runAuthorizationTests(testUser: { email: string; password: string
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
await signInUser(supabase, testUser.email, testUser.password);
const channel = supabase.channel("topic:" + crypto.randomUUID(), { config: { private: true } });
const channel = supabase.channel(randomTopic(), { config: { private: true } });
const subscribeMs = await openChannel(channel);
return [{ label: "subscribe", value: subscribeMs, unit: "ms" }];
} finally {
Expand All @@ -969,16 +974,17 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
await signInUser(supabase, testUser.email, testUser.password);
const testTopic = randomTopic();
const id = crypto.randomUUID();
const value = crypto.randomUUID();
let result: any = null;

const channel = supabase
.channel("topic:test", { config: { private: true } })
.channel(testTopic, { config: { private: true } })
.on("broadcast", { event: "INSERT" }, (res) => (result = res));

const subscribeMs = await openChannel(channel);
await supabase.from("broadcast_changes").insert({ value, id });
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
const { latencyMs: eventMs } = await waitFor(() => result, "INSERT event");

assert.strictEqual(result.payload.record.id, id);
Expand All @@ -998,17 +1004,18 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
await signInUser(supabase, testUser.email, testUser.password);
const testTopic = randomTopic();
const id = crypto.randomUUID();
const originalValue = crypto.randomUUID();
const updatedValue = crypto.randomUUID();
let result: any = null;

const channel = supabase
.channel("topic:test", { config: { private: true } })
.channel(testTopic, { config: { private: true } })
.on("broadcast", { event: "UPDATE" }, (res) => (result = res));

const subscribeMs = await openChannel(channel);
await supabase.from("broadcast_changes").insert({ value: originalValue, id });
await supabase.from("broadcast_changes").insert({ value: originalValue, id, topic: testTopic });
await supabase.from("broadcast_changes").update({ value: updatedValue }).eq("id", id);
const { latencyMs: eventMs } = await waitFor(() => result, "UPDATE event");

Expand All @@ -1030,16 +1037,17 @@ async function runBroadcastChangesTests(testUser: { email: string; password: str
const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS });
try {
await signInUser(supabase, testUser.email, testUser.password);
const testTopic = randomTopic();
const id = crypto.randomUUID();
const value = crypto.randomUUID();
let result: any = null;

const channel = supabase
.channel("topic:test", { config: { private: true } })
.channel(testTopic, { config: { private: true } })
.on("broadcast", { event: "DELETE" }, (res) => (result = res));

const subscribeMs = await openChannel(channel);
await supabase.from("broadcast_changes").insert({ value, id });
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
await supabase.from("broadcast_changes").delete().eq("id", id);
const { latencyMs: eventMs } = await waitFor(() => result, "DELETE event");

Expand Down Expand Up @@ -1069,7 +1077,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri
const uniqueValue = crypto.randomUUID();

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes",
{ event: "INSERT", schema: "public", table: "pg_changes", filter: `value=eq.${uniqueValue}` },
(payload) => (result = payload));
Expand Down Expand Up @@ -1099,7 +1107,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri
const dummyId = await executeInsert(supabase, "dummy");

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes",
{ event: "UPDATE", schema: "public", table: "pg_changes", filter: `id=eq.${mainId}` },
(payload) => (result = payload));
Expand Down Expand Up @@ -1132,7 +1140,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri
const dummyId = await executeInsert(supabase, "dummy");

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes",
{ event: "DELETE", schema: "public", table: "pg_changes", filter: `id=eq.${mainId}` },
(payload) => (result = payload));
Expand Down Expand Up @@ -1165,7 +1173,7 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri
const deleteId = await executeInsert(supabase, "pg_changes");

const channel = supabase
.channel("topic:" + crypto.randomUUID(), BROADCAST_CONFIG)
.channel(randomTopic(), BROADCAST_CONFIG)
.on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=eq.${insertValue}` }, (p) => (insertResult = p))
.on("postgres_changes", { event: "UPDATE", schema: "public", table: "pg_changes", filter: `id=eq.${updateId}` }, (p) => (updateResult = p))
.on("postgres_changes", { event: "DELETE", schema: "public", table: "pg_changes", filter: `id=eq.${deleteId}` }, (p) => (deleteResult = p));
Expand Down Expand Up @@ -1207,7 +1215,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri
try {
await signInUser(supabase, testUser.email, testUser.password);
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();
const payload = { message: crypto.randomUUID() };

const since = Date.now() - 1000;
Expand Down Expand Up @@ -1235,7 +1243,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri
try {
await signInUser(supabase, testUser.email, testUser.password);
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();

const since = Date.now() - 1000;
await supabase.from("replay_check").insert({ id: crypto.randomUUID(), topic, event, payload: { value: 1 } });
Expand All @@ -1262,7 +1270,7 @@ async function runBroadcastReplayTests(testUser: { email: string; password: stri
try {
await signInUser(supabase, testUser.email, testUser.password);
const event = crypto.randomUUID();
const topic = "topic:" + crypto.randomUUID();
const topic = randomTopic();

await supabase.from("replay_check").insert({ id: crypto.randomUUID(), topic, event, payload: { value: "old" } });

Expand Down
Loading