diff options
Diffstat (limited to 'gateway/src')
27 files changed, 1358 insertions, 0 deletions
diff --git a/gateway/src/Server.ts b/gateway/src/Server.ts new file mode 100644 index 00000000..d4b3271c --- /dev/null +++ b/gateway/src/Server.ts @@ -0,0 +1,55 @@ +import "missing-native-js-functions"; +import dotenv from "dotenv"; +dotenv.config(); +import { Config, db, RabbitMQ } from "@fosscord/server-util"; +import { Server as WebSocketServer } from "ws"; +import { Connection } from "./events/Connection"; +import http from "http"; + +export class Server { + public ws: WebSocketServer; + public port: number; + public server: http.Server; + public production: boolean; + + constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) { + this.port = port; + this.production = production || false; + + if (server) this.server = server; + else + this.server = http.createServer(function (req, res) { + res.writeHead(200).end("Online"); + }); + + this.ws = new WebSocketServer({ + maxPayload: 4096, + server: this.server, + }); + this.ws.on("connection", Connection); + this.ws.on("error", console.error); + } + + async setupSchema() { + // TODO: adjust expireAfterSeconds -> lower + await Promise.all([db.collection("events").createIndex({ created_at: 1 }, { expireAfterSeconds: 60 })]); + } + + async start(): Promise<void> { + // @ts-ignore + await (db as Promise<Connection>); + await this.setupSchema(); + await Config.init(); + await RabbitMQ.init(); + console.log("[Database] connected"); + if (!this.server.listening) { + this.server.listen(this.port); + console.log(`[Gateway] online on 0.0.0.0:${this.port}`); + } + } + + async stop() { + await db.close(); + this.server.close(); + } +} diff --git a/gateway/src/events/Close.ts b/gateway/src/events/Close.ts new file mode 100644 index 00000000..f819b064 --- /dev/null +++ b/gateway/src/events/Close.ts @@ -0,0 +1,6 @@ +import WebSocket from "ws"; +import { Message } from "./Message"; + +export function Close(this: WebSocket, code: number, reason: string) { + this.off("message", Message); +} diff --git a/gateway/src/events/Connection.ts b/gateway/src/events/Connection.ts new file mode 100644 index 00000000..1ef9fb48 --- /dev/null +++ b/gateway/src/events/Connection.ts @@ -0,0 +1,62 @@ +import WebSocket, { Server } from "../util/WebSocket"; +import { IncomingMessage } from "http"; +import { Close } from "./Close"; +import { Message } from "./Message"; +import { setHeartbeat } from "../util/setHeartbeat"; +import { Send } from "../util/Send"; +import { CLOSECODES, OPCODES } from "../util/Constants"; +import { createDeflate } from "zlib"; +var erlpack: any; +try { + erlpack = require("erlpack"); +} catch (error) {} + +// TODO: check rate limit +// TODO: specify rate limit in config +// TODO: check msg max size + +export async function Connection(this: Server, socket: WebSocket, request: IncomingMessage) { + try { + socket.on("close", Close); + socket.on("message", Message); + + const { searchParams } = new URL(`http://localhost${request.url}`); + // @ts-ignore + socket.encoding = searchParams.get("encoding") || "json"; + if (!["json", "etf"].includes(socket.encoding)) { + if (socket.encoding === "etf" && erlpack) throw new Error("Erlpack is not installed: 'npm i -D erlpack'"); + return socket.close(CLOSECODES.Decode_error); + } + + // @ts-ignore + socket.version = Number(searchParams.get("version")) || 8; + if (socket.version != 8) return socket.close(CLOSECODES.Invalid_API_version); + + // @ts-ignore + socket.compress = searchParams.get("compress") || ""; + if (socket.compress) { + if (socket.compress !== "zlib-stream") return socket.close(CLOSECODES.Decode_error); + socket.deflate = createDeflate({ chunkSize: 65535 }); + socket.deflate.on("data", (chunk) => socket.send(chunk)); + } + + socket.permissions = {}; + socket.sequence = 0; + + setHeartbeat(socket); + + await Send(socket, { + op: OPCODES.Hello, + d: { + heartbeat_interval: 1000 * 30, + }, + }); + + socket.readyTimeout = setTimeout(() => { + return socket.close(CLOSECODES.Session_timed_out); + }, 1000 * 30); + } catch (error) { + console.error(error); + return socket.close(CLOSECODES.Unknown_error); + } +} diff --git a/gateway/src/events/Message.ts b/gateway/src/events/Message.ts new file mode 100644 index 00000000..2ca82b3c --- /dev/null +++ b/gateway/src/events/Message.ts @@ -0,0 +1,45 @@ +import WebSocket, { Data } from "../util/WebSocket"; +var erlpack: any; +try { + erlpack = require("erlpack"); +} catch (error) {} +import OPCodeHandlers from "../opcodes"; +import { Payload, CLOSECODES } from "../util/Constants"; +import { instanceOf, Tuple } from "lambert-server"; +import { check } from "../opcodes/instanceOf"; + +const PayloadSchema = { + op: Number, + $d: new Tuple(Object, Number), // or number for heartbeat sequence + $s: Number, + $t: String, +}; + +export async function Message(this: WebSocket, buffer: Data) { + // TODO: compression + var data: Payload; + + if (this.encoding === "etf" && buffer instanceof Buffer) data = erlpack.unpack(buffer); + else if (this.encoding === "json" && typeof buffer === "string") data = JSON.parse(buffer); + else return; + + check.call(this, PayloadSchema, data); + + // @ts-ignore + const OPCodeHandler = OPCodeHandlers[data.op]; + if (!OPCodeHandler) { + console.error("Unknown_opcode: " + data.op); + // TODO: if all opcodes are implemented comment this out: + // this.close(CLOSECODES.Unknown_opcode); + return; + } + + console.log("got: " + OPCodeHandler.name); + + try { + return await OPCodeHandler.call(this, data); + } catch (error) { + console.error(error); + if (!this.CLOSED && this.CLOSING) return this.close(CLOSECODES.Unknown_error); + } +} diff --git a/gateway/src/index.ts b/gateway/src/index.ts new file mode 100644 index 00000000..7513bd2f --- /dev/null +++ b/gateway/src/index.ts @@ -0,0 +1 @@ +export * from "./Server"; diff --git a/gateway/src/listener/listener.ts b/gateway/src/listener/listener.ts new file mode 100644 index 00000000..6a6967d6 --- /dev/null +++ b/gateway/src/listener/listener.ts @@ -0,0 +1,382 @@ +import { + db, + Event, + MongooseCache, + UserModel, + getPermission, + Permissions, + ChannelModel, + RabbitMQ, + EVENT, +} from "@fosscord/server-util"; +import { OPCODES } from "../util/Constants"; +import { Send } from "../util/Send"; +import WebSocket from "../util/WebSocket"; +import "missing-native-js-functions"; +import { ConsumeMessage } from "amqplib"; + +// TODO: close connection on Invalidated Token +// TODO: check intent +// TODO: Guild Member Update is sent for current-user updates regardless of whether the GUILD_MEMBERS intent is set. +// ? How to resubscribe MongooseCache for new dm channel events? Maybe directly send them to the user_id regardless of the channel_id? -> max overhead of creating 10 events in database for dm user group. Or a new field in event -> recipient_ids? + +// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards +// https://discord.com/developers/docs/topics/gateway#sharding + +export interface DispatchOpts { + eventStream: MongooseCache; + guilds: Array<string>; +} + +function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) { + if (this.shard_count) { + guilds = guilds.filter((x) => (BigInt(x) >> 22n) % this.shard_count! === this.shard_id); + } + + return [ + { + $match: { + $or: [ + { "fullDocument.guild_id": { $in: guilds } }, + { "fullDocument.user_id": this.user_id }, + { "fullDocument.channel_id": { $in: channels } }, + ], + }, + }, + ]; +} + +async function rabbitListen(this: WebSocket, id: string) { + await this.rabbitCh!.assertExchange(id, "fanout", { durable: false }); + const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true }); + + this.rabbitCh!.bindQueue(q.queue, id, ""); + this.rabbitCh!.consume(q.queue, consume.bind(this), { + noAck: false, + }); + this.rabbitCh!.queues[id] = q.queue; +} + +// TODO: use already required guilds/channels of Identify and don't fetch them again +export async function setupListener(this: WebSocket) { + const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); + const channels = await ChannelModel.find( + { $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] }, + { id: true, permission_overwrites: true } + ).exec(); + const dm_channels = channels.filter((x) => !x.guild_id); + const guild_channels = channels.filter((x) => x.guild_id); + + if (RabbitMQ.connection) { + // @ts-ignore + this.rabbitCh = await RabbitMQ.connection.createChannel(); + this.rabbitCh!.queues = {}; + + rabbitListen.call(this, this.user_id); + + for (const channel of dm_channels) { + rabbitListen.call(this, channel.id); + } + for (const guild of user.guilds) { + // contains guild and dm channels + + getPermission(this.user_id, guild) + .then((x) => { + this.permissions[guild] = x; + rabbitListen.call(this, guild); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + rabbitListen.call(this, channel.id); + } + } + }) + .catch((e) => {}); + } + + this.once("close", () => { + this.rabbitCh!.close(); + }); + } else { + const eventStream = new MongooseCache( + db.collection("events"), + getPipeline.call( + this, + user.guilds, + channels.map((x) => x.id) + ), + { + onlyEvents: true, + } + ); + + await eventStream.init(); + eventStream.on("insert", (document: Event) => + dispatch.call(this, document, { eventStream, guilds: user.guilds }) + ); + + this.once("close", () => eventStream.destroy()); + } +} + +// TODO: use rabbitmq to only receive events that are included in intents +function consume(this: WebSocket, opts: ConsumeMessage | null) { + if (!opts) return; + if (!this.rabbitCh) return; + const data = JSON.parse(opts.content.toString()); + const id = data.id as string; + const event = opts.properties.type as EVENT; + const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm + + console.log("rabbitmq event", event); + + // subscription managment + switch (event) { + case "CHANNEL_DELETE": + case "GUILD_DELETE": + this.rabbitCh.cancel(id); + break; + case "CHANNEL_CREATE": + // TODO: check if user has permission to channel + case "GUILD_CREATE": + rabbitListen.call(this, id); + break; + case "CHANNEL_UPDATE": + const queue_id = this.rabbitCh.queues[id]; + // @ts-ignore + const exists = this.rabbitCh.consumers[id]; + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { + if (exists) break; + rabbitListen.call(this, id); + } else { + if (!exists) break; + this.rabbitCh.cancel(queue_id); + this.rabbitCh.unbindQueue(queue_id, id, ""); + } + break; + } + + // permission checking + switch (event) { + case "INVITE_CREATE": + case "INVITE_DELETE": + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + case "PRESENCE_UPDATE": // exception if user is friend + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + this.rabbitCh.ack(opts); +} + +// TODO: cache permission +// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. +// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. +// TODO: only subscribe for events that are in the connection intents +// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) + +export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { + var permission = new Permissions("ADMINISTRATOR"); // default permission for dms + console.log("event", document); + var channel_id = document.channel_id || document.data?.channel_id; + // TODO: clean up + if (document.event === "GUILD_CREATE") { + guilds.push(document.data.id); + eventStream.changeStream(getPipeline.call(this, guilds)); + } else if (document.event === "GUILD_DELETE") { + guilds.remove(document.guild_id!); + eventStream.changeStream(getPipeline.call(this, guilds)); + } else if (document.event === "CHANNEL_DELETE") channel_id = null; + if (document.guild_id && !this.intents.has("GUILDS")) return; + + try { + permission = await getPermission(this.user_id, document.guild_id, channel_id); + } catch (e) { + permission = new Permissions(); + } + + // check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents + switch (document.event) { + case "GUILD_DELETE": + case "GUILD_CREATE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + // gets sent if GUILDS intent is set (already checked in if document.guild_id) + break; + case "GUILD_INTEGRATIONS_UPDATE": + if (!this.intents.has("GUILD_INTEGRATIONS")) return; + break; + case "WEBHOOKS_UPDATE": + if (!this.intents.has("GUILD_WEBHOOKS")) return; + break; + case "GUILD_EMOJI_UPDATE": + if (!this.intents.has("GUILD_EMOJIS")) return; + break; + // only send them, if the user subscribed for this part of the member list, or is a bot + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + if (!this.intents.has("GUILD_MEMBERS")) return; + break; + case "VOICE_STATE_UPDATE": + if (!this.intents.has("GUILD_VOICE_STATES")) return; + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!this.intents.has("GUILD_BANS")) return; + break; + case "INVITE_CREATE": + case "INVITE_DELETE": + if (!this.intents.has("GUILD_INVITES")) return; + case "PRESENCE_UPDATE": + if (!this.intents.has("GUILD_PRESENCES")) return; + break; + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + if (!this.intents.has("GUILD_MESSAGES") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGES") && !document.guild_id) return; + break; + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + if (!this.intents.has("GUILD_MESSAGE_REACTIONS") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGE_REACTIONS") && !document.guild_id) return; + break; + + case "TYPING_START": + if (!this.intents.has("GUILD_MESSAGE_TYPING") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGE_TYPING") && !document.guild_id) return; + break; + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // Any events not defined in an intent are considered "passthrough" and will always be sent to you. + break; + } + + // check permissions + switch (document.event) { + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "INVITE_CREATE": + case "INVITE_DELETE": + if (!permission.has("MANAGE_GUILD")) break; + case "PRESENCE_UPDATE": + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + return Send(this, { + op: OPCODES.Dispatch, + t: document.event, + d: document.data, + s: this.sequence++, + }); +} diff --git a/gateway/src/opcodes/Heartbeat.ts b/gateway/src/opcodes/Heartbeat.ts new file mode 100644 index 00000000..015257b9 --- /dev/null +++ b/gateway/src/opcodes/Heartbeat.ts @@ -0,0 +1,12 @@ +import { CLOSECODES, Payload } from "../util/Constants"; +import { Send } from "../util/Send"; +import { setHeartbeat } from "../util/setHeartbeat"; +import WebSocket from "../util/WebSocket"; + +export async function onHeartbeat(this: WebSocket, data: Payload) { + // TODO: validate payload + + setHeartbeat(this); + + await Send(this, { op: 11 }); +} diff --git a/gateway/src/opcodes/Identify.ts b/gateway/src/opcodes/Identify.ts new file mode 100644 index 00000000..43368367 --- /dev/null +++ b/gateway/src/opcodes/Identify.ts @@ -0,0 +1,174 @@ +import { CLOSECODES, Payload, OPCODES } from "../util/Constants"; +import WebSocket from "../util/WebSocket"; +import { + ChannelModel, + checkToken, + GuildModel, + Intents, + MemberDocument, + MemberModel, + ReadyEventData, + UserModel, + toObject, + EVENTEnum, + Config, +} from "@fosscord/server-util"; +import { setupListener } from "../listener/listener"; +import { IdentifySchema } from "../schema/Identify"; +import { Send } from "../util/Send"; +// import experiments from "./experiments.json"; +const experiments: any = []; +import { check } from "./instanceOf"; + +// TODO: bot sharding +// TODO: check priviliged intents +// TODO: check if already identified + +export async function onIdentify(this: WebSocket, data: Payload) { + clearTimeout(this.readyTimeout); + check.call(this, IdentifySchema, data.d); + + const identify: IdentifySchema = data.d; + + try { + const { jwtSecret } = Config.get().security; + var { decoded } = await checkToken(identify.token, jwtSecret); // will throw an error if invalid + } catch (error) { + console.error("invalid token", error); + return this.close(CLOSECODES.Authentication_failed); + } + this.user_id = decoded.id; + if (!identify.intents) identify.intents = 0b11111111111111n; + this.intents = new Intents(identify.intents); + if (identify.shard) { + this.shard_id = identify.shard[0]; + this.shard_count = identify.shard[1]; + if ( + !this.shard_count || + !this.shard_id || + this.shard_id >= this.shard_count || + this.shard_id < 0 || + this.shard_count <= 0 + ) { + return this.close(CLOSECODES.Invalid_shard); + } + } + + const members = toObject(await MemberModel.find({ id: this.user_id }).exec()); + const merged_members = members.map((x: any) => { + const y = { ...x, user_id: x.id }; + delete y.settings; + delete y.id; + return [y]; + }) as MemberDocument[][]; + const user_guild_settings_entries = members.map((x) => x.settings); + + const channels = await ChannelModel.find({ recipient_ids: this.user_id }).exec(); + const user = await UserModel.findOne({ id: this.user_id }).exec(); + if (!user) return this.close(CLOSECODES.Authentication_failed); + + const public_user = { + username: user.username, + discriminator: user.discriminator, + id: user.id, + public_flags: user.public_flags, + avatar: user.avatar, + bot: user.bot, + }; + + const guilds = await GuildModel.find({ id: { $in: user.guilds } }) + .populate({ path: "joined_at", match: { id: this.user_id } }) + .exec(); + + const privateUser = { + avatar: user.avatar, + mobile: user.mobile, + desktop: user.desktop, + discriminator: user.discriminator, + email: user.email, + flags: user.flags, + id: user.id, + mfa_enabled: user.mfa_enabled, + nsfw_allowed: user.nsfw_allowed, + phone: user.phone, + premium: user.premium, + premium_type: user.premium_type, + public_flags: user.public_flags, + username: user.username, + verified: user.verified, + bot: user.bot, + accent_color: user.accent_color || 0, + banner: user.banner, + }; + + const d: ReadyEventData = { + v: 8, + user: privateUser, + user_settings: user.user_settings, + // @ts-ignore + guilds: toObject(guilds).map((x) => { + // @ts-ignore + x.guild_hashes = { + channels: { omitted: false, hash: "y4PV2fZ0gmo" }, + metadata: { omitted: false, hash: "bs1/ckvud3Y" }, + roles: { omitted: false, hash: "SxA+c5CaYpo" }, + version: 1, + }; + return x; + }), + guild_experiments: [], // TODO + geo_ordered_rtc_regions: [], // TODO + relationships: user.user_data.relationships, + read_state: { + // TODO + entries: [], + partial: false, + version: 304128, + }, + user_guild_settings: { + entries: user_guild_settings_entries, + partial: false, // TODO partial + version: 642, + }, + // @ts-ignore + private_channels: toObject(channels).map((x: ChannelDocument) => { + x.recipient_ids = x.recipients.map((y: any) => y.id); + delete x.recipients; + return x; + }), + session_id: "", // TODO + analytics_token: "", // TODO + connected_accounts: [], // TODO + consents: { + personalization: { + consented: false, // TODO + }, + }, + country_code: user.user_settings.locale, + friend_suggestion_count: 0, // TODO + // @ts-ignore + experiments: experiments, // TODO + guild_join_requests: [], // TODO what is this? + users: [ + public_user, + ...toObject(channels) + .map((x: any) => x.recipients) + .flat(), + ].unique(), // TODO + merged_members: merged_members, + // shard // TODO: only for bots sharding + // application // TODO for applications + }; + + console.log("Send ready"); + + // TODO: send real proper data structure + await Send(this, { + op: OPCODES.Dispatch, + t: EVENTEnum.Ready, + s: this.sequence++, + d, + }); + + await setupListener.call(this); +} diff --git a/gateway/src/opcodes/LazyRequest.ts b/gateway/src/opcodes/LazyRequest.ts new file mode 100644 index 00000000..b1d553b9 --- /dev/null +++ b/gateway/src/opcodes/LazyRequest.ts @@ -0,0 +1,105 @@ +// @ts-nocheck WIP +import { + db, + getPermission, + MemberModel, + MongooseCache, + PublicUserProjection, + RoleModel, + toObject, +} from "@fosscord/server-util"; +import { LazyRequest } from "../schema/LazyRequest"; +import { OPCODES, Payload } from "../util/Constants"; +import { Send } from "../util/Send"; +import WebSocket from "../util/WebSocket"; +import { check } from "./instanceOf"; + +// TODO: check permission and only show roles/members that have access to this channel +// TODO: config: if want to list all members (even those who are offline) sorted by role, or just those who are online + +export async function onLazyRequest(this: WebSocket, { d }: Payload) { + // TODO: check data + check.call(this, LazyRequest, d); + const { guild_id, typing, channels, activities } = d as LazyRequest; + + const permissions = await getPermission(this.user_id, guild_id); + permissions.hasThrow("VIEW_CHANNEL"); + + // MongoDB query to retrieve all hoisted roles and join them with the members and users collection + const roles = toObject( + await db + .collection("roles") + .aggregate([ + { + $match: { + guild_id, + // hoist: true // TODO: also match @everyone role + }, + }, + { $sort: { position: 1 } }, + { + $lookup: { + from: "members", + let: { id: "$id" }, + pipeline: [ + { $match: { $expr: { $in: ["$$id", "$roles"] } } }, + { $limit: 100 }, + { + $lookup: { + from: "users", + let: { user_id: "$id" }, + pipeline: [ + { $match: { $expr: { $eq: ["$id", "$$user_id"] } } }, + { $project: PublicUserProjection }, + ], + as: "user", + }, + }, + { + $unwind: "$user", + }, + ], + as: "members", + }, + }, + ]) + .toArray() + ); + + const groups = roles.map((x) => ({ id: x.id === guild_id ? "online" : x.id, count: x.members.length })); + const member_count = roles.reduce((a, b) => b.members.length + a, 0); + const items = []; + + for (const role of roles) { + items.push({ + group: { + count: role.members.length, + id: role.id === guild_id ? "online" : role.name, + }, + }); + for (const member of role.members) { + member.roles.remove(guild_id); + items.push({ member }); + } + } + + return Send(this, { + op: OPCODES.Dispatch, + s: this.sequence++, + t: "GUILD_MEMBER_LIST_UPDATE", + d: { + ops: [ + { + range: [0, 99], + op: "SYNC", + items, + }, + ], + online_count: member_count, // TODO count online count + member_count, + id: "everyone", + guild_id, + groups, + }, + }); +} diff --git a/gateway/src/opcodes/PresenceUpdate.ts b/gateway/src/opcodes/PresenceUpdate.ts new file mode 100644 index 00000000..3760f8a3 --- /dev/null +++ b/gateway/src/opcodes/PresenceUpdate.ts @@ -0,0 +1,6 @@ +import { CLOSECODES, Payload } from "../util/Constants"; +import WebSocket from "../util/WebSocket"; + +export function onPresenceUpdate(this: WebSocket, data: Payload) { + // return this.close(CLOSECODES.Unknown_error); +} diff --git a/gateway/src/opcodes/RequestGuildMembers.ts b/gateway/src/opcodes/RequestGuildMembers.ts new file mode 100644 index 00000000..2701d978 --- /dev/null +++ b/gateway/src/opcodes/RequestGuildMembers.ts @@ -0,0 +1,7 @@ +import { CLOSECODES, Payload } from "../util/Constants"; + +import WebSocket from "../util/WebSocket"; + +export function onRequestGuildMembers(this: WebSocket, data: Payload) { + // return this.close(CLOSECODES.Unknown_error); +} diff --git a/gateway/src/opcodes/Resume.ts b/gateway/src/opcodes/Resume.ts new file mode 100644 index 00000000..4efde9b0 --- /dev/null +++ b/gateway/src/opcodes/Resume.ts @@ -0,0 +1,14 @@ +import { CLOSECODES, Payload } from "../util/Constants"; +import { Send } from "../util/Send"; + +import WebSocket from "../util/WebSocket"; + +export async function onResume(this: WebSocket, data: Payload) { + console.log("Got Resume -> cancel not implemented"); + await Send(this, { + op: 9, + d: false, + }); + + // return this.close(CLOSECODES.Invalid_session); +} diff --git a/gateway/src/opcodes/VoiceStateUpdate.ts b/gateway/src/opcodes/VoiceStateUpdate.ts new file mode 100644 index 00000000..0d51513d --- /dev/null +++ b/gateway/src/opcodes/VoiceStateUpdate.ts @@ -0,0 +1,26 @@ +import { VoiceStateUpdateSchema } from "../schema/VoiceStateUpdate.ts"; +import { CLOSECODES, Payload } from "../util/Constants"; +import { Send } from "../util/Send"; + +import WebSocket from "../util/WebSocket"; +import { check } from "./instanceOf"; +// TODO: implementation +// TODO: check if a voice server is setup +// TODO: save voice servers in database and retrieve them +// Notice: Bot users respect the voice channel's user limit, if set. When the voice channel is full, you will not receive the Voice State Update or Voice Server Update events in response to your own Voice State Update. Having MANAGE_CHANNELS permission bypasses this limit and allows you to join regardless of the channel being full or not. + +export async function onVoiceStateUpdate(this: WebSocket, data: Payload) { + check.call(this, VoiceStateUpdateSchema, data.d); + const body = data.d as VoiceStateUpdateSchema; + + await Send(this, { + op: 0, + s: this.sequence++, + t: "VOICE_SERVER_UPDATE", + d: { + token: ``, + guild_id: body.guild_id, + endpoint: `localhost:3004`, + }, + }); +} diff --git a/gateway/src/opcodes/experiments.json b/gateway/src/opcodes/experiments.json new file mode 100644 index 00000000..0370b5da --- /dev/null +++ b/gateway/src/opcodes/experiments.json @@ -0,0 +1,76 @@ +[ + [4047587481, 0, 0, -1, 0], + [1509401575, 0, 1, -1, 0], + [1865079242, 0, 1, -1, 0], + [1962538549, 1, 0, -1, 0], + [3816091942, 3, 2, -1, 0], + [4130837190, 0, 10, -1, 0], + [1861568052, 0, 1, -1, 0], + [2290910058, 6, 2, -1, 0], + [1578940118, 1, 1, -1, 0], + [1571676964, 0, 1, -1, 2], + [3640172371, 0, 2, -1, 2], + [1658164312, 2, 1, -1, 0], + [98883956, 1, 1, -1, 0], + [3114091169, 0, 1, -1, 0], + [2570684145, 4, 1, -1, 2], + [4007615411, 0, 1, -1, 0], + [3665310159, 2, 1, -1, 1], + [852550504, 3, 1, -1, 0], + [2333572067, 0, 1, -1, 0], + [935994771, 1, 1, -1, 0], + [1127795596, 1, 1, -1, 0], + [4168223991, 0, 1, -1, 0], + [18585280, 0, 1, -1, 1], + [327482016, 0, 1, -1, 2], + [3458098201, 7, 1, -1, 0], + [478613943, 2, 1, -1, 1], + [2792197902, 0, 1, -1, 2], + [284670956, 0, 1, -1, 0], + [2099185390, 0, 1, -1, 0], + [1202202685, 0, 1, -1, 0], + [2122174751, 0, 1, -1, 0], + [3633864632, 0, 1, -1, 0], + [3103053065, 0, 1, -1, 0], + [820624960, 0, 1, -1, 0], + [1134479292, 0, 1, -1, 0], + [2511257455, 3, 1, -1, 3], + [2599708267, 0, 1, -1, 0], + [613180822, 1, 1, -1, 0], + [2885186814, 0, 1, -1, 0], + [221503477, 0, 1, -1, 0], + [1054317075, 0, 1, -1, 3], + [683872522, 0, 1, -1, 1], + [1739278764, 0, 2, -1, 0], + [2855249023, 0, 1, -1, 0], + [3721841948, 0, 1, -1, 0], + [1285203515, 0, 1, -1, 0], + [1365487849, 6, 1, -1, 0], + [955229746, 0, 1, -1, 0], + [3128009767, 0, 10, -1, 0], + [441885003, 0, 1, -1, 0], + [3433971238, 0, 1, -1, 2], + [1038765354, 3, 1, -1, 0], + [1174347196, 0, 1, -1, 0], + [3649806352, 1, 1, -1, 0], + [2973729510, 2, 1, -1, 0], + [2571931329, 1, 6, -1, 0], + [3884442008, 0, 1, -1, 0], + [978673395, 1, 1, -1, 0], + [4050927174, 0, 1, -1, 0], + [1260103069, 0, 1, -1, 0], + [4168894280, 0, 1, -1, 0], + [4045587091, 0, 1, -1, 0], + [2003494159, 1, 1, -1, 0], + [51193042, 0, 1, -1, 0], + [2634540382, 3, 1, -1, 0], + [886364171, 0, 1, -1, 0], + [3898604944, 0, 1, -1, 0], + [3388129398, 0, 1, -1, 0], + [3964382884, 2, 1, -1, 1], + [3305874255, 0, 1, -1, 0], + [156590431, 0, 1, -1, 0], + [3106485751, 0, 0, -1, 0], + [3035674767, 0, 1, -1, 0], + [851697110, 0, 1, -1, 0] +] diff --git a/gateway/src/opcodes/index.ts b/gateway/src/opcodes/index.ts new file mode 100644 index 00000000..fa57f568 --- /dev/null +++ b/gateway/src/opcodes/index.ts @@ -0,0 +1,25 @@ +import { Payload } from "../util/Constants"; +import WebSocket from "../util/WebSocket"; +import { onHeartbeat } from "./Heartbeat"; +import { onIdentify } from "./Identify"; +import { onLazyRequest } from "./LazyRequest"; +import { onPresenceUpdate } from "./PresenceUpdate"; +import { onRequestGuildMembers } from "./RequestGuildMembers"; +import { onResume } from "./Resume"; +import { onVoiceStateUpdate } from "./VoiceStateUpdate"; + +export type OPCodeHandler = (this: WebSocket, data: Payload) => any; + +export default { + 1: onHeartbeat, + 2: onIdentify, + 3: onPresenceUpdate, + 4: onVoiceStateUpdate, + // 5: Voice Server Ping + 6: onResume, + // 7: Reconnect: You should attempt to reconnect and resume immediately. + 8: onRequestGuildMembers, + // 9: Invalid Session + // 10: Hello + 14: onLazyRequest, +}; diff --git a/gateway/src/opcodes/instanceOf.ts b/gateway/src/opcodes/instanceOf.ts new file mode 100644 index 00000000..c4ee5ee6 --- /dev/null +++ b/gateway/src/opcodes/instanceOf.ts @@ -0,0 +1,18 @@ +import { instanceOf } from "lambert-server"; +import { CLOSECODES } from "../util/Constants"; +import WebSocket from "../util/WebSocket"; + +export function check(this: WebSocket, schema: any, data: any) { + try { + const error = instanceOf(schema, data, { path: "body" }); + if (error !== true) { + throw error; + } + return true; + } catch (error) { + console.error(error); + // invalid payload + this.close(CLOSECODES.Decode_error); + throw error; + } +} diff --git a/gateway/src/schema/Activity.ts b/gateway/src/schema/Activity.ts new file mode 100644 index 00000000..62cf7ad6 --- /dev/null +++ b/gateway/src/schema/Activity.ts @@ -0,0 +1,49 @@ +import { ActivityBodySchema } from "@fosscord/server-util"; +import { EmojiSchema } from "./Emoji"; + +export const ActivitySchema = { + afk: Boolean, + status: String, + $activities: [ActivityBodySchema], + $since: Number, // unix time (in milliseconds) of when the client went idle, or null if the client is not idle +}; + +export interface ActivitySchema { + afk: boolean; + status: string; + activities?: [ + { + name: string; // the activity's name + type: number; // activity type // TODO: check if its between range 0-5 + url?: string; // stream url, is validated when type is 1 + created_at?: number; // unix timestamp of when the activity was added to the user's session + timestamps?: { + // unix timestamps for start and/or end of the game + start: number; + end: number; + }; + application_id?: string; // application id for the game + details?: string; + state?: string; + emoji?: EmojiSchema; + party?: { + id?: string; + size?: [number]; // used to show the party's current and maximum size // TODO: array length 2 + }; + assets?: { + large_image?: string; // the id for a large asset of the activity, usually a snowflake + large_text?: string; // text displayed when hovering over the large image of the activity + small_image?: string; // the id for a small asset of the activity, usually a snowflake + small_text?: string; // text displayed when hovering over the small image of the activity + }; + secrets?: { + join?: string; // the secret for joining a party + spectate?: string; // the secret for spectating a game + match?: string; // the secret for a specific instanced match + }; + instance?: boolean; + flags: bigint; // activity flags OR d together, describes what the payload includes + } + ]; + since?: number; // unix time (in milliseconds) of when the client went idle, or null if the client is not idle +} diff --git a/gateway/src/schema/Emoji.ts b/gateway/src/schema/Emoji.ts new file mode 100644 index 00000000..413b8359 --- /dev/null +++ b/gateway/src/schema/Emoji.ts @@ -0,0 +1,11 @@ +export const EmojiSchema = { + name: String, // the name of the emoji + $id: String, // the id of the emoji + animated: Boolean, // whether this emoji is animated +}; + +export interface EmojiSchema { + name: string; + id?: string; + animated: Boolean; +} diff --git a/gateway/src/schema/Identify.ts b/gateway/src/schema/Identify.ts new file mode 100644 index 00000000..646c5f05 --- /dev/null +++ b/gateway/src/schema/Identify.ts @@ -0,0 +1,83 @@ +import { ActivitySchema } from "./Activity"; + +export const IdentifySchema = { + token: String, + $intents: BigInt, // discord uses a Integer for bitfields we use bigints tho. | instanceOf will automatically convert the Number to a BigInt + $properties: { + // bruh discord really uses $ in the property key for bots, so we need to double prefix it, because instanceOf treats $ (prefix) as a optional key + $os: String, + $os_arch: String, + $browser: String, + $device: String, + $$os: String, + $$browser: String, + $$device: String, + $browser_user_agent: String, + $browser_version: String, + $os_version: String, + $referrer: String, + $$referrer: String, + $referring_domain: String, + $$referring_domain: String, + $referrer_current: String, + $referring_domain_current: String, + $release_channel: String, + $client_build_number: Number, + $client_event_source: String, + $client_version: String, + $system_locale: String, + }, + $presence: ActivitySchema, + $compress: Boolean, + $large_threshold: Number, + $shard: [BigInt, BigInt], + $guild_subscriptions: Boolean, + $capabilities: Number, + $client_state: { + $guild_hashes: Object, + $highest_last_message_id: String, + $read_state_version: Number, + $user_guild_settings_version: Number, + }, + $v: Number, +}; + +export interface IdentifySchema { + token: string; + properties: { + // bruh discord really uses $ in the property key, so we need to double prefix it, because instanceOf treats $ (prefix) as a optional key + os?: string; + os_atch?: string; + browser?: string; + device?: string; + $os?: string; + $browser?: string; + $device?: string; + browser_user_agent?: string; + browser_version?: string; + os_version?: string; + referrer?: string; + referring_domain?: string; + referrer_current?: string; + referring_domain_current?: string; + release_channel?: "stable" | "dev" | "ptb" | "canary"; + client_build_number?: number; + client_event_source?: any; + client_version?: string; + system_locale?: string; + }; + intents?: bigint; // discord uses a Integer for bitfields we use bigints tho. | instanceOf will automatically convert the Number to a BigInt + presence?: ActivitySchema; + compress?: boolean; + large_threshold?: number; + shard?: [bigint, bigint]; + guild_subscriptions?: boolean; + capabilities?: number; + client_state?: { + guild_hashes?: any; + highest_last_message_id?: string; + read_state_version?: number; + user_guild_settings_version?: number; + }; + v?: number; +} diff --git a/gateway/src/schema/LazyRequest.ts b/gateway/src/schema/LazyRequest.ts new file mode 100644 index 00000000..7c828ac6 --- /dev/null +++ b/gateway/src/schema/LazyRequest.ts @@ -0,0 +1,19 @@ +export interface LazyRequest { + guild_id: string; + channels?: Record<string, [number, number]>; + activities?: boolean; + threads?: boolean; + typing?: true; + members?: any[]; + thread_member_lists?: any[]; +} + +export const LazyRequest = { + guild_id: String, + $activities: Boolean, + $channels: Object, + $typing: Boolean, + $threads: Boolean, + $members: [] as any[], + $thread_member_lists: [] as any[], +}; diff --git a/gateway/src/schema/VoiceStateUpdate.ts.ts b/gateway/src/schema/VoiceStateUpdate.ts.ts new file mode 100644 index 00000000..4345c2f6 --- /dev/null +++ b/gateway/src/schema/VoiceStateUpdate.ts.ts @@ -0,0 +1,15 @@ +export const VoiceStateUpdateSchema = { + $guild_id: String, + channel_id: String, + self_mute: Boolean, + self_deaf: Boolean, + self_video: Boolean, +}; + +export interface VoiceStateUpdateSchema { + guild_id?: string; + channel_id: string; + self_mute: boolean; + self_deaf: boolean; + self_video: boolean; +} diff --git a/gateway/src/start.ts b/gateway/src/start.ts new file mode 100644 index 00000000..09a54751 --- /dev/null +++ b/gateway/src/start.ts @@ -0,0 +1,14 @@ +process.on("uncaughtException", console.error); +process.on("unhandledRejection", console.error); + +import { Server } from "./Server"; +import { config } from "dotenv"; +config(); + +var port = Number(process.env.PORT); +if (isNaN(port)) port = 3002; + +const server = new Server({ + port, +}); +server.start(); diff --git a/gateway/src/util/Config.ts b/gateway/src/util/Config.ts new file mode 100644 index 00000000..9ceb8cd5 --- /dev/null +++ b/gateway/src/util/Config.ts @@ -0,0 +1,41 @@ +// @ts-nocheck +import { Config } from "@fosscord/server-util"; +import { getConfigPathForFile } from "@fosscord/server-util/dist/util/Config"; +import Ajv, { JSONSchemaType } from "ajv"; + +export interface DefaultOptions { + endpoint?: string; + security: { + jwtSecret: string; + }; +} + +const schema: JSONSchemaType<DefaultOptions> = { + type: "object", + properties: { + endpoint: { + type: "string", + nullable: true, + }, + security: { + type: "object", + properties: { + jwtSecret: { + type: "string", + }, + }, + required: ["jwtSecret"], + }, + }, + required: ["security"], +}; + +const ajv = new Ajv(); +const validator = ajv.compile(schema); + +const configPath = getConfigPathForFile("fosscord", "gateway", ".json"); +export const gatewayConfig = new Config<DefaultOptions>({ + path: configPath, + schemaValidator: validator, + schema: schema, +}); diff --git a/gateway/src/util/Constants.ts b/gateway/src/util/Constants.ts new file mode 100644 index 00000000..692f9028 --- /dev/null +++ b/gateway/src/util/Constants.ts @@ -0,0 +1,50 @@ +export enum OPCODES { + Dispatch = 0, + Heartbeat = 1, + Identify = 2, + Presence_Update = 3, + Voice_State_Update = 4, + Voice_Server_Ping = 5, // ? What is opcode 5? + Resume = 6, + Reconnect = 7, + Request_Guild_Members = 8, + Invalid_Session = 9, + Hello = 10, + Heartbeat_ACK = 11, + Guild_Sync = 12, + DM_Update = 13, + Lazy_Request = 14, + Lobby_Connect = 15, + Lobby_Disconnect = 16, + Lobby_Voice_States_Update = 17, + Stream_Create = 18, + Stream_Delete = 19, + Stream_Watch = 20, + Stream_Ping = 21, + Stream_Set_Paused = 22, + Request_Application_Commands = 24, +} +export enum CLOSECODES { + Unknown_error = 4000, + Unknown_opcode, + Decode_error, + Not_authenticated, + Authentication_failed, + Already_authenticated, + Invalid_session, + Invalid_seq, + Rate_limited, + Session_timed_out, + Invalid_shard, + Sharding_required, + Invalid_API_version, + Invalid_intent, + Disallowed_intent, +} + +export interface Payload { + op: OPCODES; + d?: any; + s?: number; + t?: string; +} diff --git a/gateway/src/util/Send.ts b/gateway/src/util/Send.ts new file mode 100644 index 00000000..be25ac4f --- /dev/null +++ b/gateway/src/util/Send.ts @@ -0,0 +1,28 @@ +var erlpack: any; +try { + erlpack = require("erlpack"); +} catch (error) {} +import { Payload } from "../util/Constants"; + +import WebSocket from "./WebSocket"; + +export async function Send(socket: WebSocket, data: Payload) { + let buffer: Buffer | string; + if (socket.encoding === "etf") buffer = erlpack.pack(data); + // TODO: encode circular object + else if (socket.encoding === "json") buffer = JSON.stringify(data); + else return; + // TODO: compression + if (socket.deflate) { + socket.deflate.write(buffer); + socket.deflate.flush(); + return; + } + + return new Promise((res, rej) => { + socket.send(buffer, (err) => { + if (err) return rej(err); + return res(null); + }); + }); +} diff --git a/gateway/src/util/WebSocket.ts b/gateway/src/util/WebSocket.ts new file mode 100644 index 00000000..1bd0ff2f --- /dev/null +++ b/gateway/src/util/WebSocket.ts @@ -0,0 +1,23 @@ +import { Intents, Permissions } from "@fosscord/server-util"; +import WS, { Server, Data } from "ws"; +import { Deflate } from "zlib"; +import { Channel } from "amqplib"; + +interface WebSocket extends WS { + version: number; + user_id: string; + encoding: "etf" | "json"; + compress?: "zlib-stream"; + shard_count?: bigint; + shard_id?: bigint; + deflate?: Deflate; + heartbeatTimeout: NodeJS.Timeout; + readyTimeout: NodeJS.Timeout; + intents: Intents; + sequence: number; + rabbitCh?: Channel & { queues: Record<string, string> }; + permissions: Record<string, Permissions>; +} + +export default WebSocket; +export { Server, Data }; diff --git a/gateway/src/util/setHeartbeat.ts b/gateway/src/util/setHeartbeat.ts new file mode 100644 index 00000000..9f88b481 --- /dev/null +++ b/gateway/src/util/setHeartbeat.ts @@ -0,0 +1,11 @@ +import { CLOSECODES } from "./Constants"; +import WebSocket from "./WebSocket"; + +// TODO: make heartbeat timeout configurable +export function setHeartbeat(socket: WebSocket) { + if (socket.heartbeatTimeout) clearTimeout(socket.heartbeatTimeout); + + socket.heartbeatTimeout = setTimeout(() => { + return socket.close(CLOSECODES.Session_timed_out); + }, 1000 * 45); +} |