|
| 1 | +import { OpCode } from "../deps.ts"; |
| 2 | +import { decoder } from "../encoding.ts"; |
| 3 | +import { closeSocket, closeSocketConnection } from "./close_socket.ts"; |
| 4 | +import { readFrame } from "./read_frame.ts"; |
| 5 | +import { sendFrame } from "./send_frame.ts"; |
| 6 | + |
| 7 | +import type { PogSocket } from "../socket.ts"; |
| 8 | +import type { WebSocketFrame } from "../deps.ts"; |
| 9 | + |
| 10 | +function getLength(frames: WebSocketFrame[]): number { |
| 11 | + return frames.reduce((length, frame) => length + frame.payload.length, 0); |
| 12 | +} |
| 13 | + |
| 14 | +function getCloseEvent(frame: WebSocketFrame, decode = decoder()): CloseEvent { |
| 15 | + return { |
| 16 | + // [0x12, 0x34] -> 0x1234 |
| 17 | + code: (frame.payload[0] << 8) | frame.payload[1], |
| 18 | + reason: decode(frame.payload.subarray(2, frame.payload.length)), |
| 19 | + type: "close" |
| 20 | + } |
| 21 | +} |
| 22 | + |
| 23 | +export async function* readSocket(socket: PogSocket): AsyncIterableIterator<PogSocketEvent> { |
| 24 | + const decode = decoder(); |
| 25 | + |
| 26 | + /* handle frames. */ |
| 27 | + let frames: WebSocketFrame[] = []; |
| 28 | + while (!socket.isClosed) { |
| 29 | + const frame = await readFrame(socket); |
| 30 | + if (!frame) { |
| 31 | + closeSocketConnection(socket); |
| 32 | + break; |
| 33 | + } |
| 34 | + |
| 35 | + switch (frame.opcode) { |
| 36 | + case OpCode.TextFrame: |
| 37 | + case OpCode.BinaryFrame: |
| 38 | + case OpCode.Continue: |
| 39 | + frames.push(frame); |
| 40 | + |
| 41 | + /* merge all of the received frames. */ |
| 42 | + if (frame.isLastFrame) { |
| 43 | + const message = new Uint8Array(getLength(frames)); |
| 44 | + let pos = 0; |
| 45 | + for (const frame of frames) { |
| 46 | + message.set(frame.payload, pos); |
| 47 | + pos += frame.payload.length; |
| 48 | + } |
| 49 | + |
| 50 | + yield { |
| 51 | + data: frames[0].opcode === OpCode.TextFrame |
| 52 | + ? decode(message) |
| 53 | + : message, |
| 54 | + type: "message" |
| 55 | + } |
| 56 | + |
| 57 | + frames = []; |
| 58 | + } |
| 59 | + |
| 60 | + break; |
| 61 | + case OpCode.Close: { |
| 62 | + const event = getCloseEvent(frame, decode); |
| 63 | + await closeSocket(socket, event.code, event.reason); |
| 64 | + yield event; |
| 65 | + break; |
| 66 | + } |
| 67 | + case OpCode.Ping: |
| 68 | + await sendFrame(socket, OpCode.Pong, frame.payload); |
| 69 | + yield { type: "ping", data: frame.payload } |
| 70 | + break; |
| 71 | + case OpCode.Pong: |
| 72 | + yield { type: "pong", data: frame.payload } |
| 73 | + break; |
| 74 | + } |
| 75 | + } |
| 76 | +} |
| 77 | + |
| 78 | +export type PogSocketEvent = MessageEvent | PingEvent | PongEvent | CloseEvent; |
| 79 | +export type PogSocketEventType = "message" | "ping" | "pong" | "close"; |
| 80 | + |
| 81 | +interface IPogSocketEvent { |
| 82 | + type: PogSocketEventType; |
| 83 | +} |
| 84 | + |
| 85 | +export interface MessageEvent extends IPogSocketEvent { |
| 86 | + type: "message"; |
| 87 | + data: string | Uint8Array; |
| 88 | +} |
| 89 | + |
| 90 | +export interface PingEvent extends IPogSocketEvent { |
| 91 | + type: "ping"; |
| 92 | + data: Uint8Array; |
| 93 | +} |
| 94 | + |
| 95 | +export interface PongEvent extends IPogSocketEvent { |
| 96 | + type: "pong"; |
| 97 | + data: Uint8Array; |
| 98 | +} |
| 99 | + |
| 100 | +export interface CloseEvent extends IPogSocketEvent { |
| 101 | + type: "close"; |
| 102 | + code: number; |
| 103 | + reason?: string; |
| 104 | +} |
| 105 | + |
0 commit comments