Skip to content

Commit 2a32b9c

Browse files
committed
Graphql server improvements cherry picked from 3js branch
1 parent 6ed9f10 commit 2a32b9c

5 files changed

Lines changed: 437 additions & 227 deletions

File tree

spellsource-client/src/unity

Submodule unity updated from 35482fb to bf9a971

spellsource-graphql/src/apollo-server.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,28 @@ type Handler = (req: Request, res: Response, next: NextFunction) => void;
2020
export const setupApolloServer = async (app: Application, httpServer: Server): Promise<ApolloServer> => {
2121
const schema = await createFullSchema();
2222

23+
// Strawberry Shake passes the auth token via `?accessToken=<jwt>` on the WS URL
24+
// (see SpellsourceClient.ConfigureWebSocketClient on the client). Extract it during
25+
// upgrade so subscription operations can forward it to the Java backend.
26+
const tokenFromUrl = (url: string | undefined): string | undefined => {
27+
if (!url) return undefined;
28+
const parsed = new URL(url, "http://localhost");
29+
const token = parsed.searchParams.get("accessToken");
30+
return token && token !== "0" ? token : undefined;
31+
};
32+
2333
// Modern graphql-transport-ws protocol (graphql-ws library)
2434
const graphqlWsServer = new WebSocketServer({ noServer: true });
25-
const serverCleanup = useServer({ schema }, graphqlWsServer);
35+
const serverCleanup = useServer(
36+
{
37+
schema,
38+
// The context returned here is passed as `executionRequest.context` to
39+
// schema executors — including the wsExecutor in spellsource.ts which
40+
// reads `token` to forward auth to the Java backend's connection_init.
41+
context: (ctx) => ({ token: tokenFromUrl(ctx.extra.request.url) }),
42+
},
43+
graphqlWsServer,
44+
);
2645

2746
// Legacy graphql-ws subprotocol (subscriptions-transport-ws library) — for Strawberry Shake clients
2847
const legacyWsServer = new WebSocketServer({ noServer: true });
@@ -31,6 +50,19 @@ export const setupApolloServer = async (app: Application, httpServer: Server): P
3150
schema,
3251
execute: execute as any,
3352
subscribe: subscribe as any,
53+
onConnect: (_params: any, _ws: any, connectionContext: any) => {
54+
// onConnect's return value becomes the operation context for execute/subscribe.
55+
const token = tokenFromUrl(connectionContext?.request?.url);
56+
console.log(`[legacy-ws] client connected, hasToken=${!!token}`);
57+
return { token };
58+
},
59+
onOperation: (_msg: any, params: any) => {
60+
console.log("[legacy-ws] operation started:", JSON.stringify(params.query?.substring(0, 200)));
61+
return params;
62+
},
63+
onDisconnect: () => {
64+
console.log("[legacy-ws] client disconnected");
65+
},
3466
},
3567
legacyWsServer,
3668
);
@@ -44,12 +76,20 @@ export const setupApolloServer = async (app: Application, httpServer: Server): P
4476
? protocolHeader
4577
: protocolHeader?.split(",").map((p) => p.trim());
4678

79+
console.log(`[ws-upgrade] protocols requested: ${JSON.stringify(protocols)}`);
80+
4781
const wss =
4882
protocols?.includes(GRAPHQL_WS) && !protocols.includes(GRAPHQL_TRANSPORT_WS_PROTOCOL)
4983
? legacyWsServer
5084
: graphqlWsServer;
5185

86+
console.log(`[ws-upgrade] routing to: ${wss === legacyWsServer ? "legacy" : "modern"}`);
87+
5288
wss.handleUpgrade(req, socket, head, (ws) => {
89+
// Log raw messages from the client
90+
ws.on("message", (data) => {
91+
console.log(`[ws-message] ${data.toString().substring(0, 500)}`);
92+
});
5393
wss.emit("connection", ws, req);
5494
});
5595
});

spellsource-graphql/src/schema/spellsource.ts

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import { buildHTTPExecutor } from "@graphql-tools/executor-http";
22
import { schemaFromExecutor, wrapSchema } from "@graphql-tools/wrap";
3+
import { Executor, observableToAsyncIterable } from "@graphql-tools/utils";
4+
import { getOperationAST, print } from "graphql";
5+
import { createClient } from "graphql-ws";
6+
import WebSocket from "ws";
37
import { spellsourceHost, spellsourcePort } from "../config";
48
import { AuthRequest } from "../auth";
59

6-
const executor = buildHTTPExecutor({
10+
// HTTP executor — used for queries and mutations.
11+
const httpExecutor = buildHTTPExecutor({
712
endpoint: `http://${spellsourceHost}:${spellsourcePort}/graphql`,
813
fetch: (url, init, context, info) => {
914
const req = context as AuthRequest;
@@ -18,8 +23,65 @@ const executor = buildHTTPExecutor({
1823
},
1924
});
2025

26+
// WebSocket executor — used for subscriptions. Forwards subscription operations
27+
// to the Java backend's graphql-transport-ws endpoint at /graphql, opening a fresh
28+
// graphql-ws connection per subscription so the auth token can be passed via
29+
// connection_init payload (which the backend's GraphQLWSHandler reads).
30+
const wsExecutor: Executor = (executionRequest) => {
31+
const req = executionRequest.context as AuthRequest | undefined;
32+
const token = req?.token;
33+
34+
const client = createClient({
35+
url: `ws://${spellsourceHost}:${spellsourcePort}/graphql`,
36+
webSocketImpl: WebSocket,
37+
connectionParams: token ? { Authorization: `Bearer ${token}` } : {},
38+
lazy: false,
39+
});
40+
41+
return observableToAsyncIterable({
42+
subscribe: (observer) => {
43+
const unsubscribe = client.subscribe(
44+
{
45+
query: print(executionRequest.document),
46+
variables: executionRequest.variables as Record<string, unknown>,
47+
operationName: executionRequest.operationName,
48+
extensions: executionRequest.extensions as Record<string, unknown>,
49+
},
50+
{
51+
next: (data) => observer.next?.(data as never),
52+
error: (err) => {
53+
if (!observer.error) return;
54+
if (err instanceof Error) {
55+
observer.error(err);
56+
} else if (Array.isArray(err)) {
57+
observer.error(new Error(err.map((e) => (e as { message: string }).message).join(", ")));
58+
} else {
59+
observer.error(new Error(`Subscription error: ${JSON.stringify(err)}`));
60+
}
61+
},
62+
complete: () => observer.complete?.(),
63+
},
64+
);
65+
return {
66+
unsubscribe: () => {
67+
unsubscribe();
68+
void client.dispose();
69+
},
70+
};
71+
},
72+
});
73+
};
74+
75+
const executor: Executor = (executionRequest) => {
76+
const operationAST = getOperationAST(executionRequest.document, executionRequest.operationName);
77+
if (operationAST?.operation === "subscription") {
78+
return wsExecutor(executionRequest);
79+
}
80+
return httpExecutor(executionRequest);
81+
};
82+
2183
export const createSpellsourceSchema = async () =>
2284
wrapSchema({
23-
schema: await schemaFromExecutor(executor),
85+
schema: await schemaFromExecutor(httpExecutor),
2486
executor,
2587
});

spellsource-server/src/main/java/com/hiddenswitch/framework/impl/GraphQLSubscriptionResolverImpl.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package com.hiddenswitch.framework.impl;
22

33
import com.hiddenswitch.framework.Accounts;
4+
import com.hiddenswitch.framework.Matchmaking;
45
import com.hiddenswitch.framework.graphql.*;
56
import graphql.kickstart.tools.GraphQLSubscriptionResolver;
6-
import io.vertx.core.Future;
7+
import io.vertx.core.Vertx;
8+
import io.vertx.core.eventbus.MessageConsumer;
79
import org.apache.commons.lang3.NotImplementedException;
810
import org.reactivestreams.Publisher;
11+
import org.reactivestreams.Subscriber;
12+
import org.reactivestreams.Subscription;
913

1014
import javax.naming.AuthenticationException;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1116

1217
/**
1318
* GraphQL subscription resolver.
@@ -37,9 +42,19 @@ public Publisher<ServerGameMessage> gameMessages() throws Exception {
3742
return GraphQLGameBridge.gameMessagesPublisher(userId);
3843
}
3944

45+
/**
46+
* Emits a {@link MatchFound} when the matchmaking system creates a game for the
47+
* authenticated user. Listens on the event bus address
48+
* {@code matchmaking:enqueue:{userId}} which is published to by
49+
* {@link Matchmaking#notifyGameReady(String, String)}.
50+
*/
4051
@Override
4152
public Publisher<MatchFound> matchFound() throws Exception {
42-
throw new NotImplementedException();
53+
var userId = Accounts.userId();
54+
if (userId == null) {
55+
throw new AuthenticationException("must be authenticated");
56+
}
57+
return new MatchFoundPublisher(userId);
4358
}
4459

4560
@Override
@@ -56,4 +71,66 @@ public Publisher<Invite> inviteUpdated() throws Exception {
5671
public Publisher<EditableCard> editableCardUpdated() throws Exception {
5772
throw new NotImplementedException();
5873
}
74+
75+
/**
76+
* A Publisher that listens on the event bus for matchmaking notifications and emits
77+
* {@link MatchFound} when the user is matched into a game.
78+
* <p>
79+
* The event bus message body is the gameId string. The publisher replies to the
80+
* event bus message to acknowledge receipt (the matchmaker waits for this reply
81+
* before proceeding).
82+
*/
83+
private static class MatchFoundPublisher implements Publisher<MatchFound> {
84+
private final String userId;
85+
86+
MatchFoundPublisher(String userId) {
87+
this.userId = userId;
88+
}
89+
90+
@Override
91+
public void subscribe(Subscriber<? super MatchFound> subscriber) {
92+
var cancelled = new AtomicBoolean(false);
93+
var address = Matchmaking.MATCHMAKING_ENQUEUE + userId;
94+
var vertx = Vertx.currentContext().owner();
95+
var eventBus = vertx.eventBus();
96+
97+
MessageConsumer<String> consumer = eventBus.consumer(address);
98+
99+
subscriber.onSubscribe(new Subscription() {
100+
@Override
101+
public void request(long n) {
102+
// backpressure not needed; match events are rare
103+
}
104+
105+
@Override
106+
public void cancel() {
107+
cancelled.set(true);
108+
consumer.unregister();
109+
}
110+
});
111+
112+
consumer.handler(message -> {
113+
if (cancelled.get()) return;
114+
115+
var gameId = message.body();
116+
// Reply to acknowledge receipt so the matchmaker knows the player was notified
117+
message.reply("ok");
118+
119+
var matchFound = MatchFound.builder()
120+
.setGameId(gameId)
121+
.setUrl("")
122+
.setPlayerKey("")
123+
.setPlayerSecret("")
124+
.build();
125+
126+
subscriber.onNext(matchFound);
127+
});
128+
129+
consumer.exceptionHandler(err -> {
130+
if (!cancelled.get()) {
131+
subscriber.onError(err);
132+
}
133+
});
134+
}
135+
}
59136
}

0 commit comments

Comments
 (0)