diff options
author | Madeline <46743919+MaddyUnderStars@users.noreply.github.com> | 2022-09-25 18:24:21 +1000 |
---|---|---|
committer | Madeline <46743919+MaddyUnderStars@users.noreply.github.com> | 2022-09-25 23:35:18 +1000 |
commit | 0d23eaba09a4878520bf346af4cead90d76829fc (patch) | |
tree | d930eacceff0b407b44abe55f01d8e3c5dfbfa34 /src/gateway/events | |
parent | Allow edited_timestamp to passthrough in handleMessage (diff) | |
download | server-0d23eaba09a4878520bf346af4cead90d76829fc.tar.xz |
Refactor to mono-repo + upgrade packages
Diffstat (limited to 'src/gateway/events')
-rw-r--r-- | src/gateway/events/Close.ts | 47 | ||||
-rw-r--r-- | src/gateway/events/Connection.ts | 84 | ||||
-rw-r--r-- | src/gateway/events/Message.ts | 74 |
3 files changed, 205 insertions, 0 deletions
diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts new file mode 100644 index 00000000..40d9a6f7 --- /dev/null +++ b/src/gateway/events/Close.ts @@ -0,0 +1,47 @@ +import { WebSocket } from "@fosscord/gateway"; +import { + emitEvent, + PresenceUpdateEvent, + PrivateSessionProjection, + Session, + SessionsReplace, + User, +} from "@fosscord/util"; + +export async function Close(this: WebSocket, code: number, reason: string) { + console.log("[WebSocket] closed", code, reason); + if (this.heartbeatTimeout) clearTimeout(this.heartbeatTimeout); + if (this.readyTimeout) clearTimeout(this.readyTimeout); + this.deflate?.close(); + this.inflate?.close(); + this.removeAllListeners(); + + if (this.session_id) { + await Session.delete({ session_id: this.session_id }); + const sessions = await Session.find({ + where: { user_id: this.user_id }, + select: PrivateSessionProjection, + }); + await emitEvent({ + event: "SESSIONS_REPLACE", + user_id: this.user_id, + data: sessions, + } as SessionsReplace); + const session = sessions.first() || { + activities: [], + client_info: {}, + status: "offline", + }; + + await emitEvent({ + event: "PRESENCE_UPDATE", + user_id: this.user_id, + data: { + user: await User.getPublicUser(this.user_id), + activities: session.activities, + client_status: session?.client_info, + status: session.status, + }, + } as PresenceUpdateEvent); + } +} diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts new file mode 100644 index 00000000..bed3cf44 --- /dev/null +++ b/src/gateway/events/Connection.ts @@ -0,0 +1,84 @@ +import WS from "ws"; +import { WebSocket } from "@fosscord/gateway"; +import { Send } from "../util/Send"; +import { CLOSECODES, OPCODES } from "../util/Constants"; +import { setHeartbeat } from "../util/Heartbeat"; +import { IncomingMessage } from "http"; +import { Close } from "./Close"; +import { Message } from "./Message"; +import { Deflate, Inflate } from "fast-zlib"; +import { URL } from "url"; +import { Config } from "@fosscord/util"; +var erlpack: any; +try { + erlpack = require("@yukikaze-bot/erlpack"); +} catch (error) {} + +// TODO: check rate limit +// TODO: specify rate limit in config +// TODO: check msg max size + +export async function Connection( + this: WS.Server, + socket: WebSocket, + request: IncomingMessage +) { + const forwardedFor = Config.get().security.forwadedFor; + const ipAddress = forwardedFor ? request.headers[forwardedFor] as string : request.socket.remoteAddress; + + socket.ipAddress = ipAddress; + + try { + // @ts-ignore + socket.on("close", Close); + // @ts-ignore + socket.on("message", Message); + console.log(`[Gateway] New connection from ${socket.ipAddress}, total ${this.clients.size}`); + + const { searchParams } = new URL(`http://localhost${request.url}`); + // @ts-ignore + socket.encoding = searchParams.get("encoding") || "json"; + if (!["json", "etf"].includes(socket.encoding)) { + if (socket.encoding === "etf" && erlpack) { + throw new Error( + "Erlpack is not installed: 'npm i @yukikaze-bot/erlpack'" + ); + } + return socket.close(CLOSECODES.Decode_error); + } + + socket.version = Number(searchParams.get("version")) || 8; + if (socket.version != 8) + return socket.close(CLOSECODES.Invalid_API_version); + + // @ts-ignore + socket.compress = searchParams.get("compress") || ""; + if (socket.compress) { + if (socket.compress !== "zlib-stream") + return socket.close(CLOSECODES.Decode_error); + socket.deflate = new Deflate(); + socket.inflate = new Inflate(); + } + + socket.events = {}; + socket.member_events = {}; + socket.permissions = {}; + socket.sequence = 0; + + setHeartbeat(socket); + + await Send(socket, { + op: OPCODES.Hello, + d: { + heartbeat_interval: 1000 * 30, + }, + }); + + socket.readyTimeout = setTimeout(() => { + return socket.close(CLOSECODES.Session_timed_out); + }, 1000 * 30); + } catch (error) { + console.error(error); + return socket.close(CLOSECODES.Unknown_error); + } +} diff --git a/src/gateway/events/Message.ts b/src/gateway/events/Message.ts new file mode 100644 index 00000000..db7dbad2 --- /dev/null +++ b/src/gateway/events/Message.ts @@ -0,0 +1,74 @@ +import { CLOSECODES, OPCODES } from "../util/Constants"; +import { WebSocket, Payload } from "@fosscord/gateway"; +var erlpack: any; +try { + erlpack = require("@yukikaze-bot/erlpack"); +} catch (error) { } +import OPCodeHandlers from "../opcodes"; +import { Tuple } from "lambert-server"; +import { check } from "../opcodes/instanceOf"; +import WS from "ws"; +import BigIntJson from "json-bigint"; +import * as Sentry from "@sentry/node"; +const bigIntJson = BigIntJson({ storeAsString: true }); + +const PayloadSchema = { + op: Number, + $d: new Tuple(Object, Number), // or number for heartbeat sequence + $s: Number, + $t: String, +}; + +export async function Message(this: WebSocket, buffer: WS.Data) { + // TODO: compression + var data: Payload; + + if (this.encoding === "etf" && buffer instanceof Buffer) + data = erlpack.unpack(buffer); + else if (this.encoding === "json" && buffer instanceof Buffer) { + if (this.inflate) { + try { + buffer = this.inflate.process(buffer) as any; + } catch { + buffer = buffer.toString() as any; + } + } + data = bigIntJson.parse(buffer as string); + } + else if (typeof buffer == "string") { + data = bigIntJson.parse(buffer as string) + } + else return; + + check.call(this, PayloadSchema, data); + + // @ts-ignore + const OPCodeHandler = OPCodeHandlers[data.op]; + if (!OPCodeHandler) { + console.error("[Gateway] Unkown opcode " + data.op); + // TODO: if all opcodes are implemented comment this out: + // this.close(CLOSECODES.Unknown_opcode); + return; + } + + // const transaction = Sentry.startTransaction({ + // op: OPCODES[data.op], + // name: `GATEWAY ${OPCODES[data.op]}`, + // data: { + // ...data.d, + // token: data?.d?.token ? "[Redacted]" : undefined, + // }, + // }); + + try { + var ret = await OPCodeHandler.call(this, data); + // transaction.finish(); + return ret; + } catch (error) { + Sentry.captureException(error); + // transaction.finish(); + console.error(`Error: Op ${data.op}`, error); + // if (!this.CLOSED && this.CLOSING) + return this.close(CLOSECODES.Unknown_error); + } +} |