Skip to content
This repository was archived by the owner on Feb 27, 2023. It is now read-only.

Commit 06ff955

Browse files
authored
Revert "fix: rpc race condition (#702)" (#712)
This reverts commit e9c92ad.
1 parent 069da7b commit 06ff955

29 files changed

Lines changed: 118 additions & 158 deletions

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"@dcl/kernel-interface": "^2.0.0-20210922153939.commit-017905d",
6767
"@dcl/legacy-ecs": "^6.11.8",
6868
"@dcl/protocol": "^1.0.0-3373991894.commit-8aa3a49",
69-
"@dcl/rpc": "^1.1.1-20221115000939.commit-9a51ad0",
69+
"@dcl/rpc": "^1.0.4-20221113192916.commit-3ef5187",
7070
"@dcl/scene-runtime": "^1.0.0-20221102005705.commit-05d463c",
7171
"@dcl/schemas": "^5.21.0",
7272
"@dcl/urn-resolver": "^2.0.2",

packages/entryPoints/inject.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ globalThis.process = {
44
browser: true,
55
env: {},
66
nextTick(fn, ...args) {
7-
queueMicrotask(() => fn(...args))
7+
require('queue-microtask')(() => fn(...args))
88
}
99
}
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
1-
import { createRpcClient, RpcClientPort, Transport } from '@dcl/rpc'
1+
import { createRpcClient, Transport } from '@dcl/rpc'
2+
import future, { IFuture } from 'fp-future'
3+
import { registerCRDTService } from './services/crdtService'
4+
import { RendererProtocol } from './types'
25

3-
export async function createRendererRpcClient(transport: Transport): Promise<RpcClientPort> {
6+
export const rendererProtocol: IFuture<RendererProtocol> = future()
7+
8+
export async function createRendererRpcClient(transport: Transport): Promise<RendererProtocol> {
49
const rpcClient = await createRpcClient(transport)
510
const clientPort = await rpcClient.createPort('renderer-protocol')
611

7-
return clientPort
12+
const crdtService = registerCRDTService(clientPort)
13+
14+
rendererProtocol.resolve({
15+
crdtService
16+
})
17+
18+
return rendererProtocol
819
}

packages/renderer-protocol/transports/webSocketTransportAdapter.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,6 @@ export function webSocketTransportAdapter(url: string, options: CommonRendererOp
9090

9191
const transport: Transport = {
9292
...events,
93-
get isConnected(): boolean {
94-
return (socket && socket.readyState === socket.OPEN) || false
95-
},
9693
sendMessage(message: any) {
9794
send(message)
9895
},

packages/renderer-protocol/transports/webTransport.ts

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ export type WebTransportOptions = {
55
wasmModule: any
66
}
77

8-
export function webTransport(options: WebTransportOptions, unityDclInstance: any) {
8+
export function webTransport(options: WebTransportOptions): Transport {
99
const events = mitt<TransportEvents>()
1010
const ALLOC_SIZE = 8388608
1111
let heapPtr: number
@@ -17,25 +17,15 @@ export function webTransport(options: WebTransportOptions, unityDclInstance: any
1717
}
1818

1919
let isClosed = false
20-
let didConnect = false
2120

22-
unityDclInstance.BinaryMessageFromEngine = function (data: Uint8Array) {
23-
if (!didConnect) {
24-
throw new Error('Received data from unity before connection was established')
25-
}
21+
;(globalThis as any).DCL.BinaryMessageFromEngine = function (data: Uint8Array) {
2622
const copiedData = new Uint8Array(data)
2723
events.emit('message', copiedData)
2824
}
2925

3026
const transport: Transport = {
3127
...events,
32-
get isConnected() {
33-
return didConnect
34-
},
3528
sendMessage(message) {
36-
if (!didConnect) {
37-
throw new Error('Tried to send a message before connection was established')
38-
}
3929
if (!!sendMessageToRenderer && !isClosed) {
4030
options.wasmModule.HEAPU8.set(message, heapPtr)
4131
sendMessageToRenderer(heapPtr, message.length)
@@ -49,12 +39,7 @@ export function webTransport(options: WebTransportOptions, unityDclInstance: any
4939
}
5040
}
5141

52-
events.on('connect', () => {
53-
didConnect = true
54-
})
55-
56-
// connect the transport
57-
events.emit('connect', {})
42+
queueMicrotask(() => events.emit('connect', {}))
5843

5944
return transport
6045
}

packages/shared/apis/host/EngineAPI.ts

Lines changed: 47 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import {
1111

1212
import { PortContext } from './context'
1313
import { EntityAction, EntityActionType } from 'shared/types'
14-
import { registerCRDTService } from 'renderer-protocol/services/crdtService'
14+
15+
import { rendererProtocol } from './../../../renderer-protocol/rpcClient'
1516

1617
function getPayload(payloadType: EAType, payload: Payload): any {
1718
switch (payloadType) {
@@ -59,62 +60,60 @@ function getPayload(payloadType: EAType, payload: Payload): any {
5960
}
6061

6162
export function registerEngineApiServiceServerImplementation(port: RpcServerPort<PortContext>) {
62-
codegen.registerService(port, EngineApiServiceDefinition, async (port, ctx) => {
63-
const crdtService = registerCRDTService(ctx.rendererPort)
64-
65-
return {
66-
async sendBatch(req: ManyEntityAction, ctx) {
67-
const actions: EntityAction[] = []
63+
codegen.registerService(port, EngineApiServiceDefinition, async () => ({
64+
async sendBatch(req: ManyEntityAction, ctx) {
65+
const actions: EntityAction[] = []
6866

69-
for (const action of req.actions) {
70-
const actionType = eaTypeToStr(action.type)
71-
if (actionType && action.payload) {
72-
actions.push({
73-
type: actionType,
74-
tag: action.tag,
75-
payload: getPayload(action.type, action.payload as any)
76-
})
77-
}
67+
for (const action of req.actions) {
68+
const actionType = eaTypeToStr(action.type)
69+
if (actionType && action.payload) {
70+
actions.push({
71+
type: actionType,
72+
tag: action.tag,
73+
payload: getPayload(action.type, action.payload as any)
74+
})
7875
}
76+
}
7977

80-
if (actions.length) {
81-
ctx.sendBatch(actions)
82-
}
78+
if (actions.length) {
79+
ctx.sendBatch(actions)
80+
}
8381

84-
const events: EventData[] = ctx.events
82+
const events: EventData[] = ctx.events
8583

86-
if (events.length) {
87-
ctx.events = []
88-
}
84+
if (events.length) {
85+
ctx.events = []
86+
}
8987

90-
return { events }
91-
},
88+
return { events }
89+
},
9290

93-
async subscribe(req, ctx) {
94-
ctx.subscribedEvents.add(req.eventId)
95-
return {}
96-
},
97-
async unsubscribe(req, ctx) {
98-
ctx.subscribedEvents.delete(req.eventId)
99-
return {}
100-
},
101-
async crdtSendToRenderer(req, ctx) {
102-
return crdtService.sendCrdt({
103-
sceneId: ctx.sceneData.id,
104-
payload: req.data,
105-
sceneNumber: ctx.sceneData.sceneNumber
106-
})
107-
},
91+
async subscribe(req, ctx) {
92+
ctx.subscribedEvents.add(req.eventId)
93+
return {}
94+
},
95+
async unsubscribe(req, ctx) {
96+
ctx.subscribedEvents.delete(req.eventId)
97+
return {}
98+
},
99+
async crdtSendToRenderer(req, ctx) {
100+
const protocol = await rendererProtocol
101+
return protocol.crdtService.sendCrdt({
102+
sceneId: ctx.sceneData.id,
103+
payload: req.data,
104+
sceneNumber: ctx.sceneData.sceneNumber
105+
})
106+
},
108107

109-
async crdtGetMessageFromRenderer(_, ctx) {
110-
const response = await crdtService.pullCrdt({
111-
sceneId: ctx.sceneData.id,
112-
sceneNumber: ctx.sceneData.sceneNumber
113-
})
114-
return { data: [response.payload] }
115-
}
108+
async crdtGetMessageFromRenderer(_, ctx) {
109+
const protocol = await rendererProtocol
110+
const response = await protocol.crdtService.pullCrdt({
111+
sceneId: ctx.sceneData.id,
112+
sceneNumber: ctx.sceneData.sceneNumber
113+
})
114+
return { data: [response.payload] }
116115
}
117-
})
116+
}))
118117
}
119118
function eaTypeToStr(type: EAType): EntityActionType | null {
120119
switch (type) {

packages/shared/apis/host/context.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { ILogger } from './../../logger'
22
import { EntityAction, LoadableScene } from './../../types'
33
import { PermissionItem } from '@dcl/protocol/out-ts/decentraland/kernel/apis/permissions.gen'
44
import { EventData } from '@dcl/protocol/out-ts/decentraland/kernel/apis/engine_api.gen'
5-
import { RpcClientPort } from '@dcl/rpc'
65

76
type WithRequired<T, K extends keyof T> = T & { [P in K]-?: T[P] }
87

@@ -23,7 +22,4 @@ export type PortContext = {
2322
sendSceneEvent<K extends keyof IEvents>(id: K, event: IEvents[K]): void
2423
sendProtoSceneEvent(event: EventData): void
2524
logger: ILogger
26-
27-
// port used for this specific scene in the renderer
28-
rendererPort: RpcClientPort
2925
}

packages/shared/comms/adapters/OfflineAdapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export class OfflineAdapter implements MinimumCommunicationsAdapter {
1010
async getVoiceHandler(): Promise<VoiceHandler> {
1111
return createOpusVoiceHandler()
1212
}
13-
async disconnect(_error?: Error | undefined): Promise<void> {}
14-
send(_data: Uint8Array, _hints: SendHints): void {}
13+
async disconnect(error?: Error | undefined): Promise<void> {}
14+
send(data: Uint8Array, hints: SendHints): void {}
1515
async connect(): Promise<void> {}
1616
}

packages/shared/comms/adapters/SimulatorAdapter.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ export class SimulationRoom implements RoomConnection {
4646
this.tick = setInterval(this.update.bind(this), 60)
4747
this.roomConnection = new Rfc4RoomConnection({
4848
events: mitt<CommsAdapterEvents>(),
49-
send(_data: Uint8Array, _hints: SendHints): void {},
49+
send(data: Uint8Array, hints: SendHints): void {},
5050
async connect(): Promise<void> {},
51-
async disconnect(_error?: Error): Promise<void> {},
51+
async disconnect(error?: Error): Promise<void> {},
5252
async getVoiceHandler() {
5353
throw new Error('not implemented')
5454
}
@@ -190,11 +190,11 @@ export class SimulationRoom implements RoomConnection {
190190
}
191191
}
192192

193-
async disconnect(_error?: Error | undefined): Promise<void> {
193+
async disconnect(error?: Error | undefined): Promise<void> {
194194
clearInterval(this.tick)
195195
}
196196

197-
send(_data: Uint8Array, _hints: SendHints): void {}
197+
send(data: Uint8Array, hints: SendHints): void {}
198198

199199
async connect(): Promise<void> {
200200
await Promise.all(new Array(100).fill(0).map(() => this.spawnPeer()))

0 commit comments

Comments
 (0)