summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Server.ts3
-rw-r--r--src/events/Connection.ts1
-rw-r--r--src/events/Message.ts4
-rw-r--r--src/listener/listener.ts193
-rw-r--r--src/opcodes/LazyRequest.ts21
-rw-r--r--src/util/WebSocket.ts5
6 files changed, 190 insertions, 37 deletions
diff --git a/src/Server.ts b/src/Server.ts

index 06cd74de..d4b3271c 100644 --- a/src/Server.ts +++ b/src/Server.ts
@@ -1,7 +1,7 @@ import "missing-native-js-functions"; import dotenv from "dotenv"; dotenv.config(); -import { Config, db } from "@fosscord/server-util"; +import { Config, db, RabbitMQ } from "@fosscord/server-util"; import { Server as WebSocketServer } from "ws"; import { Connection } from "./events/Connection"; import http from "http"; @@ -40,6 +40,7 @@ export class Server { await (db as Promise<Connection>); await this.setupSchema(); await Config.init(); + await RabbitMQ.init(); console.log("[Database] connected"); if (!this.server.listening) { this.server.listen(this.port); diff --git a/src/events/Connection.ts b/src/events/Connection.ts
index 473c992c..1ef9fb48 100644 --- a/src/events/Connection.ts +++ b/src/events/Connection.ts
@@ -40,6 +40,7 @@ export async function Connection(this: Server, socket: WebSocket, request: Incom socket.deflate.on("data", (chunk) => socket.send(chunk)); } + socket.permissions = {}; socket.sequence = 0; setHeartbeat(socket); diff --git a/src/events/Message.ts b/src/events/Message.ts
index 51c5a294..2ca82b3c 100644 --- a/src/events/Message.ts +++ b/src/events/Message.ts
@@ -25,8 +25,6 @@ export async function Message(this: WebSocket, buffer: Data) { check.call(this, PayloadSchema, data); - console.log(data); - // @ts-ignore const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { @@ -36,6 +34,8 @@ export async function Message(this: WebSocket, buffer: Data) { return; } + console.log("got: " + OPCodeHandler.name); + try { return await OPCodeHandler.call(this, data); } catch (error) { diff --git a/src/listener/listener.ts b/src/listener/listener.ts
index ae15c971..692c12b6 100644 --- a/src/listener/listener.ts +++ b/src/listener/listener.ts
@@ -1,8 +1,19 @@ -import { db, Event, MongooseCache, UserModel, getPermission, Permissions, ChannelModel } from "@fosscord/server-util"; +import { + db, + Event, + MongooseCache, + UserModel, + getPermission, + Permissions, + ChannelModel, + RabbitMQ, + EVENT, +} from "@fosscord/server-util"; import { OPCODES } from "../util/Constants"; import { Send } from "../util/Send"; import WebSocket from "../util/WebSocket"; import "missing-native-js-functions"; +import { ConsumeMessage } from "amqplib"; // TODO: close connection on Invalidated Token // TODO: check intent @@ -35,40 +46,176 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) ]; } +// TODO: use already required guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { - const channels = await ChannelModel.find({ recipient_ids: this.user_id }, { id: true }).exec(); - console.log( - "subscribe to channels", - channels.map((x) => x.id) - ); - const user = await UserModel.findOne({ id: this.user_id }).lean().exec(); - var guilds = user!.guilds; - - const eventStream = new MongooseCache( - db.collection("events"), - getPipeline.call( - this, - guilds, - channels.map((x) => x.id) - ), - { - onlyEvents: true, + const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); + const channels = await ChannelModel.find( + { $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] }, + { id: true, permission_overwrites: true } + ).exec(); + const dm_channels = channels.filter((x) => !x.guild_id); + const guild_channels = channels.filter((x) => x.guild_id); + + if (RabbitMQ.connection) { + this.rabbitCh = await RabbitMQ.connection.createChannel(); + this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this))); + + for (const channel of dm_channels) { + this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this))); + } + for (const guild of user.guilds) { + // contains guild and dm channels + + getPermission(this.user_id, guild) + .then((x) => { + this.permissions[guild] = x; + this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this))); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + this.rabbitCh!.assertQueue(channel.id).then(() => + this.rabbitCh!.consume(channel.id, consume.bind(this)) + ); + } + } + }) + .catch((e) => {}); } - ); - await eventStream.init(); - eventStream.on("insert", (document: Event) => dispatch.call(this, document, { eventStream, guilds })); + this.once("close", () => { + this.rabbitCh!.close(); + }); + } else { + const eventStream = new MongooseCache( + db.collection("events"), + getPipeline.call( + this, + user.guilds, + channels.map((x) => x.id) + ), + { + onlyEvents: true, + } + ); - this.once("close", () => eventStream.destroy()); + await eventStream.init(); + eventStream.on("insert", (document: Event) => + dispatch.call(this, document, { eventStream, guilds: user.guilds }) + ); + + this.once("close", () => eventStream.destroy()); + } +} + +// TODO: use rabbitmq to only receive events that are included in intents +function consume(this: WebSocket, opts: ConsumeMessage | null) { + if (!opts) return; + if (!this.rabbitCh) return; + const data = JSON.parse(opts.content.toString()); + const id = data.id as string; + const event = opts.properties.type as EVENT; + const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm + + console.log("rabbitmq event", event); + + // subscription managment + switch (event) { + case "CHANNEL_DELETE": + case "GUILD_DELETE": + this.rabbitCh.cancel(id); + break; + case "CHANNEL_CREATE": + // TODO: check if user has permission to channel + case "GUILD_CREATE": + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + break; + case "CHANNEL_UPDATE": + // @ts-ignore + const exists = this.rabbitCh.consumers[id]; + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { + if (exists) break; + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + } else { + if (!exists) break; + this.rabbitCh.cancel(id); + } + break; + } + + // permission checking + switch (event) { + case "INVITE_CREATE": + case "INVITE_DELETE": + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + case "PRESENCE_UPDATE": // exception if user is friend + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + this.rabbitCh.ack(opts); } // TODO: cache permission +// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. +// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. +// TODO: only subscribe for events that are in the connection intents +// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { var permission = new Permissions("ADMINISTRATOR"); // default permission for dms console.log("event", document); var channel_id = document.channel_id || document.data?.channel_id; - + // TODO: clean up if (document.event === "GUILD_CREATE") { guilds.push(document.data.id); eventStream.changeStream(getPipeline.call(this, guilds)); diff --git a/src/opcodes/LazyRequest.ts b/src/opcodes/LazyRequest.ts
index 8a7bb8c4..b1d553b9 100644 --- a/src/opcodes/LazyRequest.ts +++ b/src/opcodes/LazyRequest.ts
@@ -23,6 +23,7 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const { guild_id, typing, channels, activities } = d as LazyRequest; const permissions = await getPermission(this.user_id, guild_id); + permissions.hasThrow("VIEW_CHANNEL"); // MongoDB query to retrieve all hoisted roles and join them with the members and users collection const roles = toObject( @@ -70,16 +71,16 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const items = []; for (const role of roles) { - items.push({ - group: { - count: role.members.length, - id: role.id === guild_id ? "online" : role.name - } - }); - for (const member of role.members) { - member.roles.remove(guild_id); - items.push({ member }); - } + items.push({ + group: { + count: role.members.length, + id: role.id === guild_id ? "online" : role.name, + }, + }); + for (const member of role.members) { + member.roles.remove(guild_id); + items.push({ member }); + } } return Send(this, { diff --git a/src/util/WebSocket.ts b/src/util/WebSocket.ts
index 347d78cb..11db47e0 100644 --- a/src/util/WebSocket.ts +++ b/src/util/WebSocket.ts
@@ -1,6 +1,7 @@ -import { Intents } from "@fosscord/server-util"; +import { Intents, Permissions } from "@fosscord/server-util"; import WS, { Server, Data } from "ws"; import { Deflate } from "zlib"; +import { Channel } from "amqplib"; interface WebSocket extends WS { version: number; @@ -14,6 +15,8 @@ interface WebSocket extends WS { readyTimeout: NodeJS.Timeout; intents: Intents; sequence: number; + rabbitCh?: Channel; + permissions: Record<string, Permissions>; } export default WebSocket;