Skip to content
This repository was archived by the owner on Feb 27, 2023. It is now read-only.

Commit e9c92ad

Browse files
authored
fix: rpc race condition (#702)
* test race condition * linter * final touches and linter * fix build
1 parent 4c2efba commit e9c92ad

29 files changed

Lines changed: 158 additions & 118 deletions

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"@dcl/kernel-interface": "^2.0.0-20210922153939.commit-017905d",
6767
"@dcl/legacy-ecs": "^6.11.8",
6868
"@dcl/protocol": "^1.0.0-3373991894.commit-8aa3a49",
69-
"@dcl/rpc": "^1.0.4-20221113192916.commit-3ef5187",
69+
"@dcl/rpc": "^1.1.1-20221115000939.commit-9a51ad0",
7070
"@dcl/scene-runtime": "^1.0.0-20221102005705.commit-05d463c",
7171
"@dcl/schemas": "^5.21.0",
7272
"@dcl/urn-resolver": "^2.0.2",

packages/entryPoints/inject.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ globalThis.process = {
44
browser: true,
55
env: {},
66
nextTick(fn, ...args) {
7-
require('queue-microtask')(() => fn(...args))
7+
queueMicrotask(() => fn(...args))
88
}
99
}
Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,8 @@
1-
import { createRpcClient, Transport } from '@dcl/rpc'
2-
import future, { IFuture } from 'fp-future'
3-
import { registerCRDTService } from './services/crdtService'
4-
import { RendererProtocol } from './types'
1+
import { createRpcClient, RpcClientPort, Transport } from '@dcl/rpc'
52

6-
export const rendererProtocol: IFuture<RendererProtocol> = future()
7-
8-
export async function createRendererRpcClient(transport: Transport): Promise<RendererProtocol> {
3+
export async function createRendererRpcClient(transport: Transport): Promise<RpcClientPort> {
94
const rpcClient = await createRpcClient(transport)
105
const clientPort = await rpcClient.createPort('renderer-protocol')
116

12-
const crdtService = registerCRDTService(clientPort)
13-
14-
rendererProtocol.resolve({
15-
crdtService
16-
})
17-
18-
return rendererProtocol
7+
return clientPort
198
}

packages/renderer-protocol/transports/webSocketTransportAdapter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ export function webSocketTransportAdapter(url: string, options: CommonRendererOp
9090

9191
const transport: Transport = {
9292
...events,
93+
get isConnected(): boolean {
94+
return (socket && socket.readyState === socket.OPEN) || false
95+
},
9396
sendMessage(message: any) {
9497
send(message)
9598
},

packages/renderer-protocol/transports/webTransport.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export type WebTransportOptions = {
55
wasmModule: any
66
}
77

8-
export function webTransport(options: WebTransportOptions): Transport {
8+
export function webTransport(options: WebTransportOptions, unityDclInstance: any) {
99
const events = mitt<TransportEvents>()
1010
const ALLOC_SIZE = 8388608
1111
let heapPtr: number
@@ -17,15 +17,25 @@ export function webTransport(options: WebTransportOptions): Transport {
1717
}
1818

1919
let isClosed = false
20+
let didConnect = false
2021

21-
;(globalThis as any).DCL.BinaryMessageFromEngine = function (data: Uint8Array) {
22+
unityDclInstance.BinaryMessageFromEngine = function (data: Uint8Array) {
23+
if (!didConnect) {
24+
throw new Error('Received data from unity before connection was established')
25+
}
2226
const copiedData = new Uint8Array(data)
2327
events.emit('message', copiedData)
2428
}
2529

2630
const transport: Transport = {
2731
...events,
32+
get isConnected() {
33+
return didConnect
34+
},
2835
sendMessage(message) {
36+
if (!didConnect) {
37+
throw new Error('Tried to send a message before connection was established')
38+
}
2939
if (!!sendMessageToRenderer && !isClosed) {
3040
options.wasmModule.HEAPU8.set(message, heapPtr)
3141
sendMessageToRenderer(heapPtr, message.length)
@@ -39,7 +49,12 @@ export function webTransport(options: WebTransportOptions): Transport {
3949
}
4050
}
4151

42-
queueMicrotask(() => events.emit('connect', {}))
52+
events.on('connect', () => {
53+
didConnect = true
54+
})
55+
56+
// connect the transport
57+
events.emit('connect', {})
4358

4459
return transport
4560
}

packages/shared/apis/host/EngineAPI.ts

Lines changed: 48 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import {
1111

1212
import { PortContext } from './context'
1313
import { EntityAction, EntityActionType } from 'shared/types'
14-
15-
import { rendererProtocol } from './../../../renderer-protocol/rpcClient'
14+
import { registerCRDTService } from 'renderer-protocol/services/crdtService'
1615

1716
function getPayload(payloadType: EAType, payload: Payload): any {
1817
switch (payloadType) {
@@ -60,60 +59,62 @@ function getPayload(payloadType: EAType, payload: Payload): any {
6059
}
6160

6261
export function registerEngineApiServiceServerImplementation(port: RpcServerPort<PortContext>) {
63-
codegen.registerService(port, EngineApiServiceDefinition, async () => ({
64-
async sendBatch(req: ManyEntityAction, ctx) {
65-
const actions: EntityAction[] = []
62+
codegen.registerService(port, EngineApiServiceDefinition, async (port, ctx) => {
63+
const crdtService = registerCRDTService(ctx.rendererPort)
64+
65+
return {
66+
async sendBatch(req: ManyEntityAction, ctx) {
67+
const actions: EntityAction[] = []
6668

67-
for (const action of req.actions) {
68-
const actionType = eaTypeToStr(action.type)
69-
if (actionType && action.payload) {
70-
actions.push({
71-
type: actionType,
72-
tag: action.tag,
73-
payload: getPayload(action.type, action.payload as any)
74-
})
69+
for (const action of req.actions) {
70+
const actionType = eaTypeToStr(action.type)
71+
if (actionType && action.payload) {
72+
actions.push({
73+
type: actionType,
74+
tag: action.tag,
75+
payload: getPayload(action.type, action.payload as any)
76+
})
77+
}
7578
}
76-
}
7779

78-
if (actions.length) {
79-
ctx.sendBatch(actions)
80-
}
80+
if (actions.length) {
81+
ctx.sendBatch(actions)
82+
}
8183

82-
const events: EventData[] = ctx.events
84+
const events: EventData[] = ctx.events
8385

84-
if (events.length) {
85-
ctx.events = []
86-
}
86+
if (events.length) {
87+
ctx.events = []
88+
}
8789

88-
return { events }
89-
},
90+
return { events }
91+
},
9092

91-
async subscribe(req, ctx) {
92-
ctx.subscribedEvents.add(req.eventId)
93-
return {}
94-
},
95-
async unsubscribe(req, ctx) {
96-
ctx.subscribedEvents.delete(req.eventId)
97-
return {}
98-
},
99-
async crdtSendToRenderer(req, ctx) {
100-
const protocol = await rendererProtocol
101-
return protocol.crdtService.sendCrdt({
102-
sceneId: ctx.sceneData.id,
103-
payload: req.data,
104-
sceneNumber: ctx.sceneData.sceneNumber
105-
})
106-
},
93+
async subscribe(req, ctx) {
94+
ctx.subscribedEvents.add(req.eventId)
95+
return {}
96+
},
97+
async unsubscribe(req, ctx) {
98+
ctx.subscribedEvents.delete(req.eventId)
99+
return {}
100+
},
101+
async crdtSendToRenderer(req, ctx) {
102+
return crdtService.sendCrdt({
103+
sceneId: ctx.sceneData.id,
104+
payload: req.data,
105+
sceneNumber: ctx.sceneData.sceneNumber
106+
})
107+
},
107108

108-
async crdtGetMessageFromRenderer(_, ctx) {
109-
const protocol = await rendererProtocol
110-
const response = await protocol.crdtService.pullCrdt({
111-
sceneId: ctx.sceneData.id,
112-
sceneNumber: ctx.sceneData.sceneNumber
113-
})
114-
return { data: [response.payload] }
109+
async crdtGetMessageFromRenderer(_, ctx) {
110+
const response = await crdtService.pullCrdt({
111+
sceneId: ctx.sceneData.id,
112+
sceneNumber: ctx.sceneData.sceneNumber
113+
})
114+
return { data: [response.payload] }
115+
}
115116
}
116-
}))
117+
})
117118
}
118119
function eaTypeToStr(type: EAType): EntityActionType | null {
119120
switch (type) {

packages/shared/apis/host/context.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { ILogger } from './../../logger'
22
import { EntityAction, LoadableScene } from './../../types'
33
import { PermissionItem } from '@dcl/protocol/out-ts/decentraland/kernel/apis/permissions.gen'
44
import { EventData } from '@dcl/protocol/out-ts/decentraland/kernel/apis/engine_api.gen'
5+
import { RpcClientPort } from '@dcl/rpc'
56

67
type WithRequired<T, K extends keyof T> = T & { [P in K]-?: T[P] }
78

@@ -22,4 +23,7 @@ export type PortContext = {
2223
sendSceneEvent<K extends keyof IEvents>(id: K, event: IEvents[K]): void
2324
sendProtoSceneEvent(event: EventData): void
2425
logger: ILogger
26+
27+
// port used for this specific scene in the renderer
28+
rendererPort: RpcClientPort
2529
}

packages/shared/comms/adapters/OfflineAdapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export class OfflineAdapter implements MinimumCommunicationsAdapter {
1010
async getVoiceHandler(): Promise<VoiceHandler> {
1111
return createOpusVoiceHandler()
1212
}
13-
async disconnect(error?: Error | undefined): Promise<void> {}
14-
send(data: Uint8Array, hints: SendHints): void {}
13+
async disconnect(_error?: Error | undefined): Promise<void> {}
14+
send(_data: Uint8Array, _hints: SendHints): void {}
1515
async connect(): Promise<void> {}
1616
}

packages/shared/comms/adapters/SimulatorAdapter.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ export class SimulationRoom implements RoomConnection {
4646
this.tick = setInterval(this.update.bind(this), 60)
4747
this.roomConnection = new Rfc4RoomConnection({
4848
events: mitt<CommsAdapterEvents>(),
49-
send(data: Uint8Array, hints: SendHints): void {},
49+
send(_data: Uint8Array, _hints: SendHints): void {},
5050
async connect(): Promise<void> {},
51-
async disconnect(error?: Error): Promise<void> {},
51+
async disconnect(_error?: Error): Promise<void> {},
5252
async getVoiceHandler() {
5353
throw new Error('not implemented')
5454
}
@@ -190,11 +190,11 @@ export class SimulationRoom implements RoomConnection {
190190
}
191191
}
192192

193-
async disconnect(error?: Error | undefined): Promise<void> {
193+
async disconnect(_error?: Error | undefined): Promise<void> {
194194
clearInterval(this.tick)
195195
}
196196

197-
send(data: Uint8Array, hints: SendHints): void {}
197+
send(_data: Uint8Array, _hints: SendHints): void {}
198198

199199
async connect(): Promise<void> {
200200
await Promise.all(new Array(100).fill(0).map(() => this.spawnPeer()))

0 commit comments

Comments
 (0)