summary refs log tree commit diff
path: root/gateway/src/events
diff options
context:
space:
mode:
authorFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 20:18:05 +0200
committerFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 20:18:05 +0200
commitfa31e7f8db61efe085f7d8a317e6a8640ebb3f46 (patch)
tree5d12bfa42b9ed09e67935c1e6a5063babe33eeb8 /gateway/src/events
parentMerge branch 'master' into gateway (diff)
downloadserver-fa31e7f8db61efe085f7d8a317e6a8640ebb3f46.tar.xz
:sparkles: gateway
Diffstat (limited to 'gateway/src/events')
-rw-r--r--gateway/src/events/Close.ts6
-rw-r--r--gateway/src/events/Connection.ts62
-rw-r--r--gateway/src/events/Message.ts45
3 files changed, 113 insertions, 0 deletions
diff --git a/gateway/src/events/Close.ts b/gateway/src/events/Close.ts
new file mode 100644

index 00000000..f819b064 --- /dev/null +++ b/gateway/src/events/Close.ts
@@ -0,0 +1,6 @@ +import WebSocket from "ws"; +import { Message } from "./Message"; + +export function Close(this: WebSocket, code: number, reason: string) { + this.off("message", Message); +} diff --git a/gateway/src/events/Connection.ts b/gateway/src/events/Connection.ts new file mode 100644
index 00000000..1ef9fb48 --- /dev/null +++ b/gateway/src/events/Connection.ts
@@ -0,0 +1,62 @@ +import WebSocket, { Server } from "../util/WebSocket"; +import { IncomingMessage } from "http"; +import { Close } from "./Close"; +import { Message } from "./Message"; +import { setHeartbeat } from "../util/setHeartbeat"; +import { Send } from "../util/Send"; +import { CLOSECODES, OPCODES } from "../util/Constants"; +import { createDeflate } from "zlib"; +var erlpack: any; +try { + erlpack = require("erlpack"); +} catch (error) {} + +// TODO: check rate limit +// TODO: specify rate limit in config +// TODO: check msg max size + +export async function Connection(this: Server, socket: WebSocket, request: IncomingMessage) { + try { + socket.on("close", Close); + socket.on("message", Message); + + const { searchParams } = new URL(`http://localhost${request.url}`); + // @ts-ignore + socket.encoding = searchParams.get("encoding") || "json"; + if (!["json", "etf"].includes(socket.encoding)) { + if (socket.encoding === "etf" && erlpack) throw new Error("Erlpack is not installed: 'npm i -D erlpack'"); + return socket.close(CLOSECODES.Decode_error); + } + + // @ts-ignore + socket.version = Number(searchParams.get("version")) || 8; + if (socket.version != 8) return socket.close(CLOSECODES.Invalid_API_version); + + // @ts-ignore + socket.compress = searchParams.get("compress") || ""; + if (socket.compress) { + if (socket.compress !== "zlib-stream") return socket.close(CLOSECODES.Decode_error); + socket.deflate = createDeflate({ chunkSize: 65535 }); + socket.deflate.on("data", (chunk) => socket.send(chunk)); + } + + socket.permissions = {}; + socket.sequence = 0; + + setHeartbeat(socket); + + await Send(socket, { + op: OPCODES.Hello, + d: { + heartbeat_interval: 1000 * 30, + }, + }); + + socket.readyTimeout = setTimeout(() => { + return socket.close(CLOSECODES.Session_timed_out); + }, 1000 * 30); + } catch (error) { + console.error(error); + return socket.close(CLOSECODES.Unknown_error); + } +} diff --git a/gateway/src/events/Message.ts b/gateway/src/events/Message.ts new file mode 100644
index 00000000..2ca82b3c --- /dev/null +++ b/gateway/src/events/Message.ts
@@ -0,0 +1,45 @@ +import WebSocket, { Data } from "../util/WebSocket"; +var erlpack: any; +try { + erlpack = require("erlpack"); +} catch (error) {} +import OPCodeHandlers from "../opcodes"; +import { Payload, CLOSECODES } from "../util/Constants"; +import { instanceOf, Tuple } from "lambert-server"; +import { check } from "../opcodes/instanceOf"; + +const PayloadSchema = { + op: Number, + $d: new Tuple(Object, Number), // or number for heartbeat sequence + $s: Number, + $t: String, +}; + +export async function Message(this: WebSocket, buffer: Data) { + // TODO: compression + var data: Payload; + + if (this.encoding === "etf" && buffer instanceof Buffer) data = erlpack.unpack(buffer); + else if (this.encoding === "json" && typeof buffer === "string") data = JSON.parse(buffer); + else return; + + check.call(this, PayloadSchema, data); + + // @ts-ignore + const OPCodeHandler = OPCodeHandlers[data.op]; + if (!OPCodeHandler) { + console.error("Unknown_opcode: " + data.op); + // TODO: if all opcodes are implemented comment this out: + // this.close(CLOSECODES.Unknown_opcode); + return; + } + + console.log("got: " + OPCodeHandler.name); + + try { + return await OPCodeHandler.call(this, data); + } catch (error) { + console.error(error); + if (!this.CLOSED && this.CLOSING) return this.close(CLOSECODES.Unknown_error); + } +}