summary refs log tree commit diff
path: root/gateway/src/listener
diff options
context:
space:
mode:
authorFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 20:18:05 +0200
committerFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 20:18:05 +0200
commitfa31e7f8db61efe085f7d8a317e6a8640ebb3f46 (patch)
tree5d12bfa42b9ed09e67935c1e6a5063babe33eeb8 /gateway/src/listener
parentMerge branch 'master' into gateway (diff)
downloadserver-fa31e7f8db61efe085f7d8a317e6a8640ebb3f46.tar.xz
:sparkles: gateway
Diffstat (limited to 'gateway/src/listener')
-rw-r--r--gateway/src/listener/listener.ts382
1 files changed, 382 insertions, 0 deletions
diff --git a/gateway/src/listener/listener.ts b/gateway/src/listener/listener.ts
new file mode 100644

index 00000000..6a6967d6 --- /dev/null +++ b/gateway/src/listener/listener.ts
@@ -0,0 +1,382 @@ +import { + db, + Event, + MongooseCache, + UserModel, + getPermission, + Permissions, + ChannelModel, + RabbitMQ, + EVENT, +} from "@fosscord/server-util"; +import { OPCODES } from "../util/Constants"; +import { Send } from "../util/Send"; +import WebSocket from "../util/WebSocket"; +import "missing-native-js-functions"; +import { ConsumeMessage } from "amqplib"; + +// TODO: close connection on Invalidated Token +// TODO: check intent +// TODO: Guild Member Update is sent for current-user updates regardless of whether the GUILD_MEMBERS intent is set. +// ? How to resubscribe MongooseCache for new dm channel events? Maybe directly send them to the user_id regardless of the channel_id? -> max overhead of creating 10 events in database for dm user group. Or a new field in event -> recipient_ids? + +// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards +// https://discord.com/developers/docs/topics/gateway#sharding + +export interface DispatchOpts { + eventStream: MongooseCache; + guilds: Array<string>; +} + +function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) { + if (this.shard_count) { + guilds = guilds.filter((x) => (BigInt(x) >> 22n) % this.shard_count! === this.shard_id); + } + + return [ + { + $match: { + $or: [ + { "fullDocument.guild_id": { $in: guilds } }, + { "fullDocument.user_id": this.user_id }, + { "fullDocument.channel_id": { $in: channels } }, + ], + }, + }, + ]; +} + +async function rabbitListen(this: WebSocket, id: string) { + await this.rabbitCh!.assertExchange(id, "fanout", { durable: false }); + const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true }); + + this.rabbitCh!.bindQueue(q.queue, id, ""); + this.rabbitCh!.consume(q.queue, consume.bind(this), { + noAck: false, + }); + this.rabbitCh!.queues[id] = q.queue; +} + +// TODO: use already required guilds/channels of Identify and don't fetch them again +export async function setupListener(this: WebSocket) { + const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); + const channels = await ChannelModel.find( + { $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] }, + { id: true, permission_overwrites: true } + ).exec(); + const dm_channels = channels.filter((x) => !x.guild_id); + const guild_channels = channels.filter((x) => x.guild_id); + + if (RabbitMQ.connection) { + // @ts-ignore + this.rabbitCh = await RabbitMQ.connection.createChannel(); + this.rabbitCh!.queues = {}; + + rabbitListen.call(this, this.user_id); + + for (const channel of dm_channels) { + rabbitListen.call(this, channel.id); + } + for (const guild of user.guilds) { + // contains guild and dm channels + + getPermission(this.user_id, guild) + .then((x) => { + this.permissions[guild] = x; + rabbitListen.call(this, guild); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + rabbitListen.call(this, channel.id); + } + } + }) + .catch((e) => {}); + } + + this.once("close", () => { + this.rabbitCh!.close(); + }); + } else { + const eventStream = new MongooseCache( + db.collection("events"), + getPipeline.call( + this, + user.guilds, + channels.map((x) => x.id) + ), + { + onlyEvents: true, + } + ); + + await eventStream.init(); + eventStream.on("insert", (document: Event) => + dispatch.call(this, document, { eventStream, guilds: user.guilds }) + ); + + this.once("close", () => eventStream.destroy()); + } +} + +// TODO: use rabbitmq to only receive events that are included in intents +function consume(this: WebSocket, opts: ConsumeMessage | null) { + if (!opts) return; + if (!this.rabbitCh) return; + const data = JSON.parse(opts.content.toString()); + const id = data.id as string; + const event = opts.properties.type as EVENT; + const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm + + console.log("rabbitmq event", event); + + // subscription managment + switch (event) { + case "CHANNEL_DELETE": + case "GUILD_DELETE": + this.rabbitCh.cancel(id); + break; + case "CHANNEL_CREATE": + // TODO: check if user has permission to channel + case "GUILD_CREATE": + rabbitListen.call(this, id); + break; + case "CHANNEL_UPDATE": + const queue_id = this.rabbitCh.queues[id]; + // @ts-ignore + const exists = this.rabbitCh.consumers[id]; + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { + if (exists) break; + rabbitListen.call(this, id); + } else { + if (!exists) break; + this.rabbitCh.cancel(queue_id); + this.rabbitCh.unbindQueue(queue_id, id, ""); + } + break; + } + + // permission checking + switch (event) { + case "INVITE_CREATE": + case "INVITE_DELETE": + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + case "PRESENCE_UPDATE": // exception if user is friend + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + this.rabbitCh.ack(opts); +} + +// TODO: cache permission +// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. +// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. +// TODO: only subscribe for events that are in the connection intents +// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) + +export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { + var permission = new Permissions("ADMINISTRATOR"); // default permission for dms + console.log("event", document); + var channel_id = document.channel_id || document.data?.channel_id; + // TODO: clean up + if (document.event === "GUILD_CREATE") { + guilds.push(document.data.id); + eventStream.changeStream(getPipeline.call(this, guilds)); + } else if (document.event === "GUILD_DELETE") { + guilds.remove(document.guild_id!); + eventStream.changeStream(getPipeline.call(this, guilds)); + } else if (document.event === "CHANNEL_DELETE") channel_id = null; + if (document.guild_id && !this.intents.has("GUILDS")) return; + + try { + permission = await getPermission(this.user_id, document.guild_id, channel_id); + } catch (e) { + permission = new Permissions(); + } + + // check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents + switch (document.event) { + case "GUILD_DELETE": + case "GUILD_CREATE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + // gets sent if GUILDS intent is set (already checked in if document.guild_id) + break; + case "GUILD_INTEGRATIONS_UPDATE": + if (!this.intents.has("GUILD_INTEGRATIONS")) return; + break; + case "WEBHOOKS_UPDATE": + if (!this.intents.has("GUILD_WEBHOOKS")) return; + break; + case "GUILD_EMOJI_UPDATE": + if (!this.intents.has("GUILD_EMOJIS")) return; + break; + // only send them, if the user subscribed for this part of the member list, or is a bot + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + if (!this.intents.has("GUILD_MEMBERS")) return; + break; + case "VOICE_STATE_UPDATE": + if (!this.intents.has("GUILD_VOICE_STATES")) return; + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!this.intents.has("GUILD_BANS")) return; + break; + case "INVITE_CREATE": + case "INVITE_DELETE": + if (!this.intents.has("GUILD_INVITES")) return; + case "PRESENCE_UPDATE": + if (!this.intents.has("GUILD_PRESENCES")) return; + break; + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + if (!this.intents.has("GUILD_MESSAGES") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGES") && !document.guild_id) return; + break; + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + if (!this.intents.has("GUILD_MESSAGE_REACTIONS") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGE_REACTIONS") && !document.guild_id) return; + break; + + case "TYPING_START": + if (!this.intents.has("GUILD_MESSAGE_TYPING") && document.guild_id) return; + if (!this.intents.has("DIRECT_MESSAGE_TYPING") && !document.guild_id) return; + break; + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // Any events not defined in an intent are considered "passthrough" and will always be sent to you. + break; + } + + // check permissions + switch (document.event) { + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "INVITE_CREATE": + case "INVITE_DELETE": + if (!permission.has("MANAGE_GUILD")) break; + case "PRESENCE_UPDATE": + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + return Send(this, { + op: OPCODES.Dispatch, + t: document.event, + d: document.data, + s: this.sequence++, + }); +}