diff options
Diffstat (limited to 'src/gateway/events')
-rw-r--r-- | src/gateway/events/Close.ts | 46 | ||||
-rw-r--r-- | src/gateway/events/Connection.ts | 94 | ||||
-rw-r--r-- | src/gateway/events/Message.ts | 61 |
3 files changed, 201 insertions, 0 deletions
diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts new file mode 100644 index 00000000..5b7c512c --- /dev/null +++ b/src/gateway/events/Close.ts @@ -0,0 +1,46 @@ +import { WebSocket } from "@fosscord/gateway"; +import { + emitEvent, + PresenceUpdateEvent, + PrivateSessionProjection, + Session, + SessionsReplace, + User, +} from "@fosscord/util"; + +export async function Close(this: WebSocket, code: number, reason: string) { + console.log("[WebSocket] closed", code, reason); + if (this.heartbeatTimeout) clearTimeout(this.heartbeatTimeout); + if (this.readyTimeout) clearTimeout(this.readyTimeout); + this.deflate?.close(); + this.removeAllListeners(); + + if (this.session_id) { + await Session.delete({ session_id: this.session_id }); + const sessions = await Session.find({ + where: { user_id: this.user_id }, + select: PrivateSessionProjection, + }); + await emitEvent({ + event: "SESSIONS_REPLACE", + user_id: this.user_id, + data: sessions, + } as SessionsReplace); + const session = sessions.first() || { + activities: [], + client_info: {}, + status: "offline", + }; + + await emitEvent({ + event: "PRESENCE_UPDATE", + user_id: this.user_id, + data: { + user: await User.getPublicUser(this.user_id), + activities: session.activities, + client_status: session?.client_info, + status: session.status, + }, + } as PresenceUpdateEvent); + } +} diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts new file mode 100644 index 00000000..508b4741 --- /dev/null +++ b/src/gateway/events/Connection.ts @@ -0,0 +1,94 @@ +import WS from "ws"; +import { WebSocket } from "@fosscord/gateway"; +import { Send } from "../util/Send"; +import { CLOSECODES, OPCODES } from "../util/Constants"; +import { setHeartbeat } from "../util/Heartbeat"; +import { IncomingMessage } from "http"; +import { Close } from "./Close"; +import { Message } from "./Message"; +import { createDeflate } from "zlib"; +import { URL } from "url"; +let erlpack: any; +try { + erlpack = require("@yukikaze-bot/erlpack"); +} catch (error) {} + +// TODO: check rate limit +// TODO: specify rate limit in config +// TODO: check msg max size + +export async function Connection( + this: WS.Server, + socket: WebSocket, + request: IncomingMessage +) { + try { + // @ts-ignore + socket.on("close", Close); + // @ts-ignore + socket.on("message", Message); + + if(process.env.WS_LOGEVENTS) + [ + "close", + "error", + "upgrade", + //"message", + "open", + "ping", + "pong", + "unexpected-response" + ].forEach(x=>{ + socket.on(x, y => console.log(x, y)); + }); + + console.log(`[Gateway] Connections: ${this.clients.size}`); + + const { searchParams } = new URL(`http://localhost${request.url}`); + // @ts-ignore + socket.encoding = searchParams.get("encoding") || "json"; + if (!["json", "etf"].includes(socket.encoding)) { + if (socket.encoding === "etf" && erlpack) { + throw new Error( + "Erlpack is not installed: 'npm i @yukikaze-bot/erlpack'" + ); + } + return socket.close(CLOSECODES.Decode_error); + } + + // @ts-ignore + socket.version = Number(searchParams.get("version")) || 8; + if (socket.version != 8) + return socket.close(CLOSECODES.Invalid_API_version); + + // @ts-ignore + socket.compress = searchParams.get("compress") || ""; + if (socket.compress) { + if (socket.compress !== "zlib-stream") + return socket.close(CLOSECODES.Decode_error); + socket.deflate = createDeflate({ chunkSize: 65535 }); + socket.deflate.on("data", (chunk) => socket.send(chunk)); + } + + socket.events = {}; + socket.member_events = {}; + socket.permissions = {}; + socket.sequence = 0; + + setHeartbeat(socket); + + await Send(socket, { + op: OPCODES.Hello, + d: { + heartbeat_interval: 1000 * 30, + }, + }); + + socket.readyTimeout = setTimeout(() => { + return socket.close(CLOSECODES.Session_timed_out); + }, 1000 * 30); + } catch (error) { + console.error(error); + return socket.close(CLOSECODES.Unknown_error); + } +} diff --git a/src/gateway/events/Message.ts b/src/gateway/events/Message.ts new file mode 100644 index 00000000..569f5fc7 --- /dev/null +++ b/src/gateway/events/Message.ts @@ -0,0 +1,61 @@ +import { CLOSECODES } from "../util/Constants"; +import { WebSocket, Payload } from "@fosscord/gateway"; +let erlpack: any; +try { + erlpack = require("@yukikaze-bot/erlpack"); +} catch (error) {} +import OPCodeHandlers from "../opcodes"; +import { check } from "../opcodes/instanceOf"; + +const PayloadSchema = { + op: Number, + $d: Object || Number, // or number for heartbeat sequence + $s: Number, + $t: String, +}; + +export async function Message(this: WebSocket, buffer: Buffer) { + // TODO: compression + let data: Payload; + + if (this.encoding === "etf" && buffer instanceof Buffer) + data = erlpack.unpack(buffer); + else if (this.encoding === "json") + data = JSON.parse(buffer as unknown as string); //TODO: is this even correct?? seems to work for web clients... + else if(/--debug|--inspect/.test(process.execArgv.join(' '))) { + debugger; + return; + } + else { + console.log("Invalid gateway connection! Use a debugger to inspect!"); + return; + } + + if(process.env.WS_VERBOSE) + console.log(`[Websocket] Incomming message: ${JSON.stringify(data)}`); + if(data.op !== 1) + check.call(this, PayloadSchema, data); + else { //custom validation for numbers, because heartbeat + if(data.s || data.t || (typeof data.d !== "number" && data.d)) { + console.log("Invalid heartbeat..."); + this.close(CLOSECODES.Decode_error); + } + } + + // @ts-ignore + const OPCodeHandler = OPCodeHandlers[data.op]; + if (!OPCodeHandler) { + console.error("[Gateway] Unkown opcode " + data.op); + // TODO: if all opcodes are implemented comment this out: + // this.close(CLOSECODES.Unknown_opcode); + return; + } + + try { + return await OPCodeHandler.call(this, data); + } catch (error) { + console.error(error); + if (!this.CLOSED && this.CLOSING) + return this.close(CLOSECODES.Unknown_error); + } +} |