diff options
Diffstat (limited to 'gateway/src/listener/listener.ts')
-rw-r--r-- | gateway/src/listener/listener.ts | 309 |
1 files changed, 51 insertions, 258 deletions
diff --git a/gateway/src/listener/listener.ts b/gateway/src/listener/listener.ts index 6a6967d6..708bfe5c 100644 --- a/gateway/src/listener/listener.ts +++ b/gateway/src/listener/listener.ts @@ -1,63 +1,31 @@ import { db, Event, - MongooseCache, UserModel, getPermission, Permissions, ChannelModel, RabbitMQ, EVENT, -} from "@fosscord/server-util"; + listenEvent, + EventOpts, + ListenEventOpts, +} from "@fosscord/util"; import { OPCODES } from "../util/Constants"; import { Send } from "../util/Send"; import WebSocket from "../util/WebSocket"; import "missing-native-js-functions"; import { ConsumeMessage } from "amqplib"; +import { Channel } from "amqplib"; // TODO: close connection on Invalidated Token // TODO: check intent // TODO: Guild Member Update is sent for current-user updates regardless of whether the GUILD_MEMBERS intent is set. -// ? How to resubscribe MongooseCache for new dm channel events? Maybe directly send them to the user_id regardless of the channel_id? -> max overhead of creating 10 events in database for dm user group. Or a new field in event -> recipient_ids? // Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards // https://discord.com/developers/docs/topics/gateway#sharding -export interface DispatchOpts { - eventStream: MongooseCache; - guilds: Array<string>; -} - -function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) { - if (this.shard_count) { - guilds = guilds.filter((x) => (BigInt(x) >> 22n) % this.shard_count! === this.shard_id); - } - - return [ - { - $match: { - $or: [ - { "fullDocument.guild_id": { $in: guilds } }, - { "fullDocument.user_id": this.user_id }, - { "fullDocument.channel_id": { $in: channels } }, - ], - }, - }, - ]; -} - -async function rabbitListen(this: WebSocket, id: string) { - await this.rabbitCh!.assertExchange(id, "fanout", { durable: false }); - const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true }); - - this.rabbitCh!.bindQueue(q.queue, id, ""); - this.rabbitCh!.consume(q.queue, consume.bind(this), { - noAck: false, - }); - this.rabbitCh!.queues[id] = q.queue; -} - -// TODO: use already required guilds/channels of Identify and don't fetch them again +// TODO: use already queried guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); const channels = await ChannelModel.find( @@ -67,90 +35,77 @@ export async function setupListener(this: WebSocket) { const dm_channels = channels.filter((x) => !x.guild_id); const guild_channels = channels.filter((x) => x.guild_id); + const opts: { acknowledge: boolean; channel?: Channel } = { acknowledge: true }; + const consumer = consume.bind(this); + if (RabbitMQ.connection) { + opts.channel = await RabbitMQ.connection.createChannel(); // @ts-ignore - this.rabbitCh = await RabbitMQ.connection.createChannel(); - this.rabbitCh!.queues = {}; + opts.channel.queues = {}; + } - rabbitListen.call(this, this.user_id); + this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); - for (const channel of dm_channels) { - rabbitListen.call(this, channel.id); - } - for (const guild of user.guilds) { - // contains guild and dm channels + for (const channel of dm_channels) { + this.events[channel.id] = await listenEvent(channel.id, consumer, opts); + } - getPermission(this.user_id, guild) - .then((x) => { - this.permissions[guild] = x; - rabbitListen.call(this, guild); - for (const channel of guild_channels) { - if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { - rabbitListen.call(this, channel.id); - } + for (const guild of user.guilds) { + // contains guild and dm channels + + getPermission(this.user_id, guild) + .then(async (x) => { + this.permissions[guild] = x; + this.listeners; + this.events[guild] = await listenEvent(guild, consumer, opts); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + this.events[channel.id] = await listenEvent(channel.id, consumer, opts); } - }) - .catch((e) => {}); - } - - this.once("close", () => { - this.rabbitCh!.close(); - }); - } else { - const eventStream = new MongooseCache( - db.collection("events"), - getPipeline.call( - this, - user.guilds, - channels.map((x) => x.id) - ), - { - onlyEvents: true, - } - ); - - await eventStream.init(); - eventStream.on("insert", (document: Event) => - dispatch.call(this, document, { eventStream, guilds: user.guilds }) - ); - - this.once("close", () => eventStream.destroy()); + } + }) + .catch((e) => {}); } + + this.once("close", () => { + if (opts.channel) opts.channel.close(); + else Object.values(this.events).forEach((x) => x()); + }); } -// TODO: use rabbitmq to only receive events that are included in intents -function consume(this: WebSocket, opts: ConsumeMessage | null) { - if (!opts) return; - if (!this.rabbitCh) return; - const data = JSON.parse(opts.content.toString()); +// TODO: only subscribe for events that are in the connection intents +function consume(this: WebSocket, opts: EventOpts) { + const { data, event } = opts; const id = data.id as string; - const event = opts.properties.type as EVENT; const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm - console.log("rabbitmq event", event); + const consumer = consume.bind(this); + const listenOpts = opts as ListenEventOpts; + console.log("event", event); // subscription managment switch (event) { case "CHANNEL_DELETE": case "GUILD_DELETE": - this.rabbitCh.cancel(id); + delete this.events[id]; + opts.cancel(); break; case "CHANNEL_CREATE": + if (!permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) return; // TODO: check if user has permission to channel case "GUILD_CREATE": - rabbitListen.call(this, id); + listenEvent(id, consumer, listenOpts); break; case "CHANNEL_UPDATE": - const queue_id = this.rabbitCh.queues[id]; + const exists = this.events[id]; // @ts-ignore - const exists = this.rabbitCh.consumers[id]; if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { if (exists) break; - rabbitListen.call(this, id); + listenEvent(id, consumer, listenOpts); } else { - if (!exists) break; - this.rabbitCh.cancel(queue_id); - this.rabbitCh.unbindQueue(queue_id, id, ""); + if (!exists) return; // return -> do not send channel update events for hidden channels + opts.cancel(id); + delete this.events[id]; } break; } @@ -216,167 +171,5 @@ function consume(this: WebSocket, opts: ConsumeMessage | null) { d: data, s: this.sequence++, }); - this.rabbitCh.ack(opts); -} - -// TODO: cache permission -// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. -// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. -// TODO: only subscribe for events that are in the connection intents -// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) - -export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { - var permission = new Permissions("ADMINISTRATOR"); // default permission for dms - console.log("event", document); - var channel_id = document.channel_id || document.data?.channel_id; - // TODO: clean up - if (document.event === "GUILD_CREATE") { - guilds.push(document.data.id); - eventStream.changeStream(getPipeline.call(this, guilds)); - } else if (document.event === "GUILD_DELETE") { - guilds.remove(document.guild_id!); - eventStream.changeStream(getPipeline.call(this, guilds)); - } else if (document.event === "CHANNEL_DELETE") channel_id = null; - if (document.guild_id && !this.intents.has("GUILDS")) return; - - try { - permission = await getPermission(this.user_id, document.guild_id, channel_id); - } catch (e) { - permission = new Permissions(); - } - - // check intents: https://discord.com/developers/docs/topics/gateway#gateway-intents - switch (document.event) { - case "GUILD_DELETE": - case "GUILD_CREATE": - case "GUILD_UPDATE": - case "GUILD_ROLE_CREATE": - case "GUILD_ROLE_UPDATE": - case "GUILD_ROLE_DELETE": - case "CHANNEL_CREATE": - case "CHANNEL_DELETE": - case "CHANNEL_UPDATE": - // gets sent if GUILDS intent is set (already checked in if document.guild_id) - break; - case "GUILD_INTEGRATIONS_UPDATE": - if (!this.intents.has("GUILD_INTEGRATIONS")) return; - break; - case "WEBHOOKS_UPDATE": - if (!this.intents.has("GUILD_WEBHOOKS")) return; - break; - case "GUILD_EMOJI_UPDATE": - if (!this.intents.has("GUILD_EMOJIS")) return; - break; - // only send them, if the user subscribed for this part of the member list, or is a bot - case "GUILD_MEMBER_ADD": - case "GUILD_MEMBER_REMOVE": - case "GUILD_MEMBER_UPDATE": - if (!this.intents.has("GUILD_MEMBERS")) return; - break; - case "VOICE_STATE_UPDATE": - if (!this.intents.has("GUILD_VOICE_STATES")) return; - break; - case "GUILD_BAN_ADD": - case "GUILD_BAN_REMOVE": - if (!this.intents.has("GUILD_BANS")) return; - break; - case "INVITE_CREATE": - case "INVITE_DELETE": - if (!this.intents.has("GUILD_INVITES")) return; - case "PRESENCE_UPDATE": - if (!this.intents.has("GUILD_PRESENCES")) return; - break; - case "MESSAGE_CREATE": - case "MESSAGE_DELETE": - case "MESSAGE_DELETE_BULK": - case "MESSAGE_UPDATE": - case "CHANNEL_PINS_UPDATE": - if (!this.intents.has("GUILD_MESSAGES") && document.guild_id) return; - if (!this.intents.has("DIRECT_MESSAGES") && !document.guild_id) return; - break; - case "MESSAGE_REACTION_ADD": - case "MESSAGE_REACTION_REMOVE": - case "MESSAGE_REACTION_REMOVE_ALL": - case "MESSAGE_REACTION_REMOVE_EMOJI": - if (!this.intents.has("GUILD_MESSAGE_REACTIONS") && document.guild_id) return; - if (!this.intents.has("DIRECT_MESSAGE_REACTIONS") && !document.guild_id) return; - break; - - case "TYPING_START": - if (!this.intents.has("GUILD_MESSAGE_TYPING") && document.guild_id) return; - if (!this.intents.has("DIRECT_MESSAGE_TYPING") && !document.guild_id) return; - break; - case "READY": // will be sent by the gateway - case "USER_UPDATE": - case "APPLICATION_COMMAND_CREATE": - case "APPLICATION_COMMAND_DELETE": - case "APPLICATION_COMMAND_UPDATE": - default: - // Any events not defined in an intent are considered "passthrough" and will always be sent to you. - break; - } - - // check permissions - switch (document.event) { - case "GUILD_INTEGRATIONS_UPDATE": - if (!permission.has("MANAGE_GUILD")) return; - break; - case "WEBHOOKS_UPDATE": - if (!permission.has("MANAGE_WEBHOOKS")) return; - break; - case "GUILD_MEMBER_ADD": - case "GUILD_MEMBER_REMOVE": - case "GUILD_MEMBER_UPDATE": - // only send them, if the user subscribed for this part of the member list, or is a bot - break; - case "GUILD_BAN_ADD": - case "GUILD_BAN_REMOVE": - if (!permission.has("BAN_MEMBERS")) break; - break; - case "INVITE_CREATE": - case "INVITE_DELETE": - if (!permission.has("MANAGE_GUILD")) break; - case "PRESENCE_UPDATE": - break; - case "VOICE_STATE_UPDATE": - case "MESSAGE_CREATE": - case "MESSAGE_DELETE": - case "MESSAGE_DELETE_BULK": - case "MESSAGE_UPDATE": - case "CHANNEL_PINS_UPDATE": - case "MESSAGE_REACTION_ADD": - case "MESSAGE_REACTION_REMOVE": - case "MESSAGE_REACTION_REMOVE_ALL": - case "MESSAGE_REACTION_REMOVE_EMOJI": - case "TYPING_START": - // only gets send if the user is alowed to view the current channel - if (!permission.has("VIEW_CHANNEL")) return; - break; - case "GUILD_CREATE": - case "GUILD_DELETE": - case "GUILD_UPDATE": - case "GUILD_ROLE_CREATE": - case "GUILD_ROLE_UPDATE": - case "GUILD_ROLE_DELETE": - case "CHANNEL_CREATE": - case "CHANNEL_DELETE": - case "CHANNEL_UPDATE": - case "GUILD_EMOJI_UPDATE": - case "READY": // will be sent by the gateway - case "USER_UPDATE": - case "APPLICATION_COMMAND_CREATE": - case "APPLICATION_COMMAND_DELETE": - case "APPLICATION_COMMAND_UPDATE": - default: - // always gets sent - // Any events not defined in an intent are considered "passthrough" and will always be sent - break; - } - - return Send(this, { - op: OPCODES.Dispatch, - t: document.event, - d: document.data, - s: this.sequence++, - }); + opts.acknowledge?.(); } |