Skip to content

Commit 8af4472

Browse files
committed
wip: fix whatever is going on with envoys
1 parent 7a19ce1 commit 8af4472

6 files changed

Lines changed: 61 additions & 24 deletions

File tree

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,23 @@ pub async fn handle_init(
132132
bypass_cache: false,
133133
}),
134134
// TODO: Move to op
135-
udb.run(|tx| {
135+
udb.run(|root_tx| {
136136
let init = init.clone();
137137
async move {
138-
let tx = tx.with_subspace(pegboard::keys::subspace());
138+
// Write protocol version to runner config so the actor create op
139+
// knows to use the v2 workflow for this pool.
140+
{
141+
let ns_tx = root_tx.with_subspace(namespace::keys::subspace());
142+
ns_tx.write(
143+
&pegboard::keys::runner_config::ProtocolVersionKey::new(
144+
namespace_id,
145+
pool_name.clone(),
146+
),
147+
protocol_version,
148+
)?;
149+
}
150+
151+
let tx = root_tx.with_subspace(pegboard::keys::subspace());
139152

140153
let create_ts_key =
141154
pegboard::keys::envoy::CreateTsKey::new(namespace_id, envoy_key.clone());

pnpm-lock.yaml

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rivetkit-asyncapi/asyncapi.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"asyncapi": "3.0.0",
33
"info": {
44
"title": "RivetKit WebSocket Protocol",
5-
"version": "2.2.0",
5+
"version": "2.2.1",
66
"description": "WebSocket protocol for bidirectional communication between RivetKit clients and actors"
77
},
88
"channels": {

rivetkit-typescript/packages/rivetkit/src/manager/router.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ export function buildManagerRouter(
563563
try {
564564
// Forward the request using the inline client driver
565565
const response = await managerDriver.sendRequest(
566-
actorId,
566+
{ directId: actorId },
567567
new Request(`http://actor/${pathWithQuery}`, {
568568
method: c.req.method,
569569
headers: c.req.raw.headers,
@@ -616,7 +616,7 @@ export function buildManagerRouter(
616616
try {
617617
// Send a special request to the actor to force disconnect the connection
618618
const response = await managerDriver.sendRequest(
619-
actorId,
619+
{ directId: actorId },
620620
new Request(
621621
`http://actor/.test/force-disconnect?conn=${connId}`,
622622
{

rivetkit-typescript/packages/rivetkit/tests/driver-engine.test.ts

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,30 @@ const driverTestConfig = {
5252
);
5353
}
5454

55+
// Create runner config for the namespace so the engine
56+
// knows how to schedule actors for this pool.
57+
const rcResponse = await fetch(
58+
`${namespaceEndpoint}/runner-configs/${runnerName}?namespace=${namespace}`,
59+
{
60+
method: "PUT",
61+
headers: {
62+
"Content-Type": "application/json",
63+
Authorization: "Bearer dev",
64+
},
65+
body: JSON.stringify({
66+
datacenters: {
67+
default: { normal: {} },
68+
},
69+
}),
70+
},
71+
);
72+
if (!rcResponse.ok) {
73+
const errorBody = await rcResponse.text().catch(() => "");
74+
throw new Error(
75+
`Create runner config failed: ${rcResponse.status} ${rcResponse.statusText} ${errorBody}`,
76+
);
77+
}
78+
5579
// Create driver config.
5680
const driverConfig = createEngineDriver();
5781

@@ -83,43 +107,43 @@ const driverTestConfig = {
83107
inlineClient,
84108
);
85109

86-
// Wait for runner registration so tests do not race actor creation
87-
// against asynchronous runner connect.
88-
const runnersUrl = new URL(
89-
`${endpoint.replace(/\/$/, "")}/runners`,
110+
// Wait for envoy registration so tests do not race actor creation
111+
// against asynchronous envoy connect.
112+
const envoysUrl = new URL(
113+
`${endpoint.replace(/\/$/, "")}/envoys`,
90114
);
91-
runnersUrl.searchParams.set("namespace", namespace);
92-
runnersUrl.searchParams.set("name", runnerName);
115+
envoysUrl.searchParams.set("namespace", namespace);
116+
envoysUrl.searchParams.set("name", runnerName);
93117
let probeError: unknown;
94118
for (let attempt = 0; attempt < 120; attempt++) {
95119
try {
96-
const runnerResponse = await fetch(runnersUrl, {
120+
const envoyResponse = await fetch(envoysUrl, {
97121
method: "GET",
98122
headers: {
99123
Authorization: `Bearer ${token}`,
100124
},
101125
});
102-
if (!runnerResponse.ok) {
103-
const errorBody = await runnerResponse
126+
if (!envoyResponse.ok) {
127+
const errorBody = await envoyResponse
104128
.text()
105129
.catch(() => "");
106130
probeError = new Error(
107-
`List runners failed: ${runnerResponse.status} ${runnerResponse.statusText} ${errorBody}`,
131+
`List envoys failed: ${envoyResponse.status} ${envoyResponse.statusText} ${errorBody}`,
108132
);
109133
} else {
110134
const responseJson =
111-
(await runnerResponse.json()) as {
112-
runners?: Array<{ name?: string }>;
135+
(await envoyResponse.json()) as {
136+
envoys?: Array<{ pool_name?: string }>;
113137
};
114-
const hasRunner = !!responseJson.runners?.some(
115-
(runner) => runner.name === runnerName,
138+
const hasEnvoy = !!responseJson.envoys?.some(
139+
(envoy) => envoy.pool_name === runnerName,
116140
);
117-
if (hasRunner) {
141+
if (hasEnvoy) {
118142
probeError = undefined;
119143
break;
120144
}
121145
probeError = new Error(
122-
`Runner ${runnerName} not registered yet`,
146+
`Envoy ${runnerName} not registered yet`,
123147
);
124148
}
125149
} catch (err) {
Binary file not shown.

0 commit comments

Comments
 (0)