Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,49 @@ mod moved_tests {
event_task.await.expect("event task should complete");
}

#[tokio::test]
async fn transport_close_during_preflight_never_emits_connection_closed() {
let ctx = ActorContext::new_with_kv(
"actor-preflight-transport-close",
"actor",
Vec::new(),
"local",
Kv::new_in_memory(),
);
ctx.configure_connection_runtime(crate::actor::config::ActorConfig::default());
let (events_tx, mut events_rx) = mpsc::unbounded_channel();
ctx.configure_actor_events(Some(events_tx));
let closed_conn_id = Arc::new(Mutex::new(None::<String>));

let event_closed_conn_id = closed_conn_id.clone();
let event_task = tokio::spawn(async move {
match events_rx.recv().await.expect("preflight event") {
ActorEvent::ConnectionPreflight { conn, reply, .. } => {
conn.disconnect(Some("transport closed"))
.await
.expect("pending connection transport close should succeed");
reply.send(Err(anyhow::anyhow!("reject after transport close")));
}
other => panic!("unexpected event: {other:?}"),
}

if let Ok(Some(ActorEvent::ConnectionClosed { conn })) =
tokio::time::timeout(Duration::from_millis(20), events_rx.recv()).await
{
*event_closed_conn_id.lock() = Some(conn.id().to_owned());
}
});

let error = ctx
.connect_with_state(vec![1], false, None, None, async { Ok(vec![2]) })
.await
.expect_err("connection should fail");

assert!(format!("{error:#}").contains("reject after transport close"));
assert_eq!(*closed_conn_id.lock(), None);
event_task.await.expect("event task should complete");
}

#[test]
fn persisted_connection_uses_ts_v4_fixed_id_wire_format() {
let persisted = PersistedConnection {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { actor } from "rivetkit";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/conn-preflight-visibility.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:

type ConnState = {
label: string;
Expand All @@ -8,6 +8,8 @@
label?: string;
beforeDelayMs?: number;
createDelayMs?: number;
rejectBefore?: boolean;
rejectCreate?: boolean;
};

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
Expand Down Expand Up @@ -38,13 +40,19 @@
if (params?.beforeDelayMs) {
await sleep(params.beforeDelayMs);
}
if (params?.rejectBefore) {
throw new Error("rejected before connect");
}
},
createConnState: async (c, params: ConnParams): Promise<ConnState> => {
c.state.createStarted += 1;
c.state.createVisibleLabels.push(visibleLabels(c));
if (params?.createDelayMs) {
await sleep(params.createDelayMs);
}
if (params?.rejectCreate) {
throw new Error("rejected create conn state");
}
return { label: params?.label ?? "anonymous" };
},
onConnect: (c, conn) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// @ts-nocheck

import { expect, test } from "vitest";
import { describeDriverMatrix } from "./shared-matrix";
import { setupDriverTest } from "./shared-utils";

describeDriverMatrix(
"Connection Preflight Disconnect",
(driverTestConfig) => {
test("should not call onDisconnect when preflight fails", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);
const handle = client.connPreflightVisibilityActor.getOrCreate([
"failed-preflight-disconnect",
crypto.randomUUID(),
]);
const primary = handle.connect({ label: "primary" });
await primary.snapshot();

const rejectedBefore = handle.connect({
label: "rejected-before",
rejectBefore: true,
});
await expect(rejectedBefore.snapshot()).rejects.toThrow();

const rejectedCreate = handle.connect({
label: "rejected-create",
rejectCreate: true,
});
await expect(rejectedCreate.snapshot()).rejects.toThrow();

const snapshot = await primary.snapshot();
expect(snapshot.disconnectSnapshots).toEqual([]);
expect(snapshot.visibleLabels).toEqual(["primary"]);

await primary.dispose();
});
},
{
encodings: ["bare"],
runtimes: ["wasm"],
sqliteBackends: ["remote"],
},
);
Loading