Skip to content

Commit 7f1ce0d

Browse files
committed
Upgrade the deployment s2 client to 0.22.5 as well and fix some typecheck issues
1 parent be35f81 commit 7f1ce0d

File tree

10 files changed

+64
-73
lines changed

10 files changed

+64
-73
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ const S2EnvSchema = z.preprocess(
3939
S2_ENABLED: z.literal("1"),
4040
S2_ACCESS_TOKEN: z.string(),
4141
S2_DEPLOYMENT_LOGS_BASIN_NAME: z.string(),
42+
S2_DEPLOYMENT_STREAMS_LOCAL: z.string().default("0"),
4243
}),
4344
z.object({
4445
S2_ENABLED: z.literal("0"),

apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -217,7 +217,7 @@ export class DeploymentPresenter {
217217
let eventStream = undefined;
218218
if (
219219
env.S2_ENABLED === "1" &&
220-
(buildServerMetadata || gitMetadata?.source === "trigger_github_app")
220+
(buildServerMetadata || gitMetadata?.source === "trigger_github_app" || env.S2_DEPLOYMENT_STREAMS_LOCAL === "1")
221221
) {
222222
const [error, accessToken] = await tryCatch(this.getS2AccessToken(project.externalRef));
223223

@@ -290,9 +290,9 @@ export class DeploymentPresenter {
290290
return cachedToken;
291291
}
292292

293-
const { access_token: accessToken } = await s2.accessTokens.issue({
293+
const { accessToken } = await s2.accessTokens.issue({
294294
id: `${projectRef}-${new Date().getTime()}`,
295-
expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour
295+
expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour
296296
scope: {
297297
ops: ["read"],
298298
basins: {

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx

Lines changed: 5 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -198,17 +198,14 @@ export default function Page() {
198198

199199
const readSession = await stream.readSession(
200200
{
201-
seq_num: 0,
202-
wait: 60,
203-
as: "bytes",
201+
start: { from: { seqNum: 0 }, clamp: true },
202+
stop: { waitSecs: 60 },
204203
},
205204
{ signal: abortController.signal }
206205
);
207206

208-
const decoder = new TextDecoder();
209-
210207
for await (const record of readSession) {
211-
const decoded = decoder.decode(record.body);
208+
const decoded = record.body;
212209
const result = DeploymentEventFromString.safeParse(decoded);
213210

214211
if (!result.success) {
@@ -217,8 +214,8 @@ export default function Page() {
217214
const headers: Record<string, string> = {};
218215

219216
if (record.headers) {
220-
for (const [nameBytes, valueBytes] of record.headers) {
221-
headers[decoder.decode(nameBytes)] = decoder.decode(valueBytes);
217+
for (const [name, value] of record.headers) {
218+
headers[name] = value;
222219
}
223220
}
224221
const level = (headers["level"]?.toLowerCase() as LogEntry["level"]) ?? "info";

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 21 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -101,29 +101,27 @@ const { action, loader } = createActionApiRoute(
101101
run.realtimeStreamsVersion
102102
);
103103

104-
if (realtimeStream.readRecords) {
105-
const records = await realtimeStream.readRecords(
106-
run.friendlyId,
107-
`$trigger.input:${body.streamId}`,
108-
body.lastSeqNum
109-
);
110-
111-
if (records.length > 0) {
112-
const record = records[0]!;
113-
114-
// Record data is the raw user payload — no wrapper to unwrap
115-
await engine.completeWaitpoint({
116-
id: result.waitpoint.id,
117-
output: {
118-
value: record.data,
119-
type: "application/json",
120-
isError: false,
121-
},
122-
});
123-
124-
// Clean up the Redis cache since we completed it ourselves
125-
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
126-
}
104+
const records = await realtimeStream.readRecords(
105+
run.friendlyId,
106+
`$trigger.input:${body.streamId}`,
107+
body.lastSeqNum
108+
);
109+
110+
if (records.length > 0) {
111+
const record = records[0]!;
112+
113+
// Record data is the raw user payload — no wrapper to unwrap
114+
await engine.completeWaitpoint({
115+
id: result.waitpoint.id,
116+
output: {
117+
value: record.data,
118+
type: "application/json",
119+
isError: false,
120+
},
121+
});
122+
123+
// Clean up the Redis cache since we completed it ourselves
124+
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
127125
}
128126
} catch {
129127
// Non-fatal: if the S2 check fails, the waitpoint is still PENDING.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -466,4 +466,8 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
466466
});
467467
}
468468
}
469+
470+
async readRecords(): Promise<never> {
471+
throw new Error("readRecords is not implemented for Redis realtime streams");
472+
}
469473
}

apps/webapp/app/services/realtime/types.ts

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -23,16 +23,11 @@ export interface StreamIngestor {
2323

2424
getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise<number>;
2525

26-
/**
27-
* Read records from a stream starting after a given sequence number.
28-
* Returns immediately with whatever records exist (non-blocking).
29-
* Not all backends support this — returns undefined if unsupported.
30-
*/
31-
readRecords?(
26+
readRecords(
3227
runId: string,
3328
streamId: string,
3429
afterSeqNum?: number
35-
): Promise<StreamRecord[] | undefined>;
30+
): Promise<StreamRecord[]>;
3631
}
3732

3833
export type StreamResponseOptions = {

apps/webapp/app/v3/services/deployment.server.ts

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ import { env } from "~/env.server";
1313
import { createRemoteImageBuild } from "../remoteImageBuilder.server";
1414
import { FINAL_DEPLOYMENT_STATUSES } from "./failDeployment.server";
1515
import { enqueueBuild, generateRegistryCredentials } from "~/services/platform.v3.server";
16-
import { AppendRecord, S2 } from "@s2-dev/streamstore";
16+
import { AppendInput, AppendRecord, S2 } from "@s2-dev/streamstore";
1717
import { createRedisClient } from "~/redis.server";
1818

1919
const S2_TOKEN_KEY_PREFIX = "s2-token:read:deployment-event-stream:project:";
@@ -368,7 +368,11 @@ export class DeploymentService extends BaseService {
368368
);
369369

370370
return fromPromise(
371-
stream.append(events.map((event) => AppendRecord.string({ body: JSON.stringify(event) }))),
371+
stream.append(
372+
AppendInput.create(
373+
events.map((event) => AppendRecord.string({ body: JSON.stringify(event) }))
374+
)
375+
),
372376
(error) => ({
373377
type: "failed_to_append_to_event_log" as const,
374378
cause: error,
@@ -396,9 +400,9 @@ export class DeploymentService extends BaseService {
396400
type: "failed_to_create_event_stream" as const,
397401
cause: error,
398402
})
399-
).map(({ name }) => ({
403+
).map(() => ({
400404
basin: basin.name,
401-
stream: name,
405+
stream: `projects/${project.externalRef}/deployments/${deployment.shortCode}`,
402406
}));
403407
}
404408

@@ -426,7 +430,7 @@ export class DeploymentService extends BaseService {
426430
fromPromise(
427431
s2.accessTokens.issue({
428432
id: `${project.externalRef}-${new Date().getTime()}`,
429-
expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour
433+
expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour
430434
scope: {
431435
ops: ["read"],
432436
basins: {
@@ -441,7 +445,7 @@ export class DeploymentService extends BaseService {
441445
type: "other" as const,
442446
cause: error,
443447
})
444-
).map(({ access_token }) => access_token);
448+
).map(({ accessToken }) => accessToken);
445449

446450
const cacheToken = (token: string) =>
447451
fromPromise(

packages/cli-v3/package.json

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@
9292
"@opentelemetry/resources": "2.0.1",
9393
"@opentelemetry/sdk-trace-node": "2.0.1",
9494
"@opentelemetry/semantic-conventions": "1.36.0",
95-
"@s2-dev/streamstore": "^0.17.6",
95+
"@s2-dev/streamstore": "^0.22.5",
9696
"@trigger.dev/build": "workspace:4.4.1",
9797
"@trigger.dev/core": "workspace:4.4.1",
9898
"@trigger.dev/schema-to-json": "workspace:4.4.1",

packages/cli-v3/src/commands/deploy.ts

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1155,9 +1155,8 @@ async function handleNativeBuildServerDeploy({
11551155
const [readSessionError, readSession] = await tryCatch(
11561156
stream.readSession(
11571157
{
1158-
seq_num: 0,
1159-
wait: 60 * 20, // 20 minutes
1160-
as: "bytes",
1158+
start: { from: { seqNum: 0 }, clamp: true },
1159+
stop: { waitSecs: 60 * 20 }, // 20 minutes
11611160
},
11621161
{ signal: abortController.signal }
11631162
)
@@ -1176,12 +1175,11 @@ async function handleNativeBuildServerDeploy({
11761175
return process.exit(0);
11771176
}
11781177

1179-
const decoder = new TextDecoder();
11801178
let finalDeploymentEvent: DeploymentFinalizedEvent["data"] | undefined;
11811179
let queuedSpinnerStopped = false;
11821180

11831181
for await (const record of readSession) {
1184-
const decoded = decoder.decode(record.body);
1182+
const decoded = record.body;
11851183
const result = DeploymentEventFromString.safeParse(decoded);
11861184
if (!result.success) {
11871185
logger.debug("Failed to parse deployment event, skipping", {
@@ -1195,7 +1193,7 @@ async function handleNativeBuildServerDeploy({
11951193

11961194
switch (event.type) {
11971195
case "log": {
1198-
if (record.seq_num === 0) {
1196+
if (record.seqNum === 0) {
11991197
$queuedSpinner.stop("Build started");
12001198
console.log("│");
12011199
queuedSpinnerStopped = true;

pnpm-lock.yaml

Lines changed: 13 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)