summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 16:47:52 +0200
committerFlam3rboy <34555296+Flam3rboy@users.noreply.github.com>2021-08-12 16:47:52 +0200
commit421cf4c3db9a2810285b123dfd82f116f81d0fa7 (patch)
tree288d33739c2a947522fbb69a320aa4d3c5cbec2b
parent:bug: fix voicestate (diff)
downloadserver-421cf4c3db9a2810285b123dfd82f116f81d0fa7.tar.xz
:sparkles: rabbitmq
-rw-r--r--.vscode/launch.json2
-rw-r--r--package-lock.json216
-rw-r--r--package.json4
-rw-r--r--src/Server.ts3
-rw-r--r--src/events/Connection.ts1
-rw-r--r--src/events/Message.ts4
-rw-r--r--src/listener/listener.ts193
-rw-r--r--src/opcodes/LazyRequest.ts21
-rw-r--r--src/util/WebSocket.ts5
9 files changed, 403 insertions, 46 deletions
diff --git a/.vscode/launch.json b/.vscode/launch.json

index 07fd32ac..29bdde13 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json
@@ -9,7 +9,7 @@ "type": "node", "request": "launch", "name": "Launch Server", - "program": "${workspaceFolder}/dist/index.js", + "program": "${workspaceFolder}/dist/start.js", "preLaunchTask": "tsc: build - tsconfig.json", "outFiles": ["${workspaceFolder}/dist/**/*.js"] } diff --git a/package-lock.json b/package-lock.json
index dac9cbcd..f3b31763 100644 --- a/package-lock.json +++ b/package-lock.json
@@ -9,8 +9,9 @@ "version": "1.0.0", "license": "ISC", "dependencies": { - "@fosscord/server-util": "^1.3.43", + "@fosscord/server-util": "^1.3.51", "ajv": "^8.5.0", + "amqplib": "^0.8.0", "dotenv": "^8.2.0", "jsonwebtoken": "^8.5.1", "lambert-server": "^1.2.8", @@ -22,6 +23,7 @@ "ws": "^7.4.2" }, "devDependencies": { + "@types/amqplib": "^0.8.1", "@types/jsonwebtoken": "^8.5.0", "@types/mongoose-autopopulate": "^0.10.1", "@types/uuid": "^8.3.0", @@ -31,15 +33,16 @@ } }, "node_modules/@fosscord/server-util": { - "version": "1.3.43", - "resolved": "https://registry.npmjs.org/@fosscord/server-util/-/server-util-1.3.43.tgz", - "integrity": "sha512-zHJOi4ZZNWZo/7DsfszHBsQpBVVO2r82gP7JQ55aebRK9Bc4UaYM9684iA1quRiTxUInvbvVcf8/iAx8m55qSA==", + "version": "1.3.51", + "resolved": "https://registry.npmjs.org/@fosscord/server-util/-/server-util-1.3.51.tgz", + "integrity": "sha512-z7gAwj8UFTrbabtyMqb/7mhiTTpk07dKMu2ZaL4yeTPBR2Ppnkd3JqYHcrqIyMfeENgzTFoOVSjtun1aFUFDag==", "dependencies": { "@types/jsonwebtoken": "^8.5.0", "@types/mongoose-autopopulate": "^0.10.1", "@types/mongoose-lean-virtuals": "^0.5.1", "@types/node": "^14.14.25", "ajv": "^8.5.0", + "amqplib": "^0.8.0", "dot-prop": "^6.0.1", "env-paths": "^2.2.1", "jsonwebtoken": "^8.5.1", @@ -50,6 +53,22 @@ "typescript": "^4.1.3" } }, + "node_modules/@types/amqplib": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.1.tgz", + "integrity": "sha512-8dCjF+dHZ8Y6JOoHD1BMnxP0quAncvZq4wA/lS072NjX9vIzVRSMcmfKy2Os8ZQ8VWWp74MD09GMbVbKS6/Fxw==", + "dev": true, + "dependencies": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, + "node_modules/@types/bluebird": { + "version": "3.5.36", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.36.tgz", + "integrity": "sha512-HBNx4lhkxN7bx6P0++W8E289foSu8kO8GCk2unhuVggO+cE7rh9DhZUyPhUxNRG9m+5B5BTKxZQ5ZP92x/mx9Q==", + "dev": true + }, "node_modules/@types/bson": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/@types/bson/-/bson-4.0.3.tgz", @@ -159,6 +178,48 @@ "url": "https://github.com/sponsors/epoberezkin" } }, + "node_modules/amqplib": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.8.0.tgz", + "integrity": "sha512-icU+a4kkq4Y1PS4NNi+YPDMwdlbFcZ1EZTQT2nigW3fvOb6AOgUQ9+Mk4ue0Zu5cBg/XpDzB40oH10ysrk2dmA==", + "dependencies": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.7.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "safe-buffer": "~5.2.1", + "url-parse": "~1.5.1" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/amqplib/node_modules/bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, + "node_modules/amqplib/node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + }, + "node_modules/amqplib/node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/amqplib/node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + }, "node_modules/ansi-styles": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", @@ -230,6 +291,24 @@ "file-uri-to-path": "1.0.0" } }, + "node_modules/bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" + } + }, + "node_modules/bitsyntax/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "node_modules/bl": { "version": "2.2.1", "resolved": "https://registry.npmjs.org/bl/-/bl-2.2.1.tgz", @@ -305,6 +384,11 @@ "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==", "dev": true }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "node_modules/bytes": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", @@ -1640,6 +1724,11 @@ "node": ">=0.6" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "node_modules/range-parser": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz", @@ -1758,6 +1847,11 @@ "node": ">=0.10.0" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, "node_modules/resolve": { "version": "1.20.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.20.0.tgz", @@ -2178,6 +2272,15 @@ "punycode": "^2.1.0" } }, + "node_modules/url-parse": { + "version": "1.5.3", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.3.tgz", + "integrity": "sha512-IIORyIQD9rvj0A4CLWsHkBBJuNqWpFQe224b6j9t/ABmquIS0qDU2pY6kl6AuOrL5OkCXHMCFNe1jBcuAggjvQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -2264,15 +2367,16 @@ }, "dependencies": { "@fosscord/server-util": { - "version": "1.3.43", - "resolved": "https://registry.npmjs.org/@fosscord/server-util/-/server-util-1.3.43.tgz", - "integrity": "sha512-zHJOi4ZZNWZo/7DsfszHBsQpBVVO2r82gP7JQ55aebRK9Bc4UaYM9684iA1quRiTxUInvbvVcf8/iAx8m55qSA==", + "version": "1.3.51", + "resolved": "https://registry.npmjs.org/@fosscord/server-util/-/server-util-1.3.51.tgz", + "integrity": "sha512-z7gAwj8UFTrbabtyMqb/7mhiTTpk07dKMu2ZaL4yeTPBR2Ppnkd3JqYHcrqIyMfeENgzTFoOVSjtun1aFUFDag==", "requires": { "@types/jsonwebtoken": "^8.5.0", "@types/mongoose-autopopulate": "^0.10.1", "@types/mongoose-lean-virtuals": "^0.5.1", "@types/node": "^14.14.25", "ajv": "^8.5.0", + "amqplib": "^0.8.0", "dot-prop": "^6.0.1", "env-paths": "^2.2.1", "jsonwebtoken": "^8.5.1", @@ -2283,6 +2387,22 @@ "typescript": "^4.1.3" } }, + "@types/amqplib": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.8.1.tgz", + "integrity": "sha512-8dCjF+dHZ8Y6JOoHD1BMnxP0quAncvZq4wA/lS072NjX9vIzVRSMcmfKy2Os8ZQ8VWWp74MD09GMbVbKS6/Fxw==", + "dev": true, + "requires": { + "@types/bluebird": "*", + "@types/node": "*" + } + }, + "@types/bluebird": { + "version": "3.5.36", + "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.36.tgz", + "integrity": "sha512-HBNx4lhkxN7bx6P0++W8E289foSu8kO8GCk2unhuVggO+cE7rh9DhZUyPhUxNRG9m+5B5BTKxZQ5ZP92x/mx9Q==", + "dev": true + }, "@types/bson": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/@types/bson/-/bson-4.0.3.tgz", @@ -2385,6 +2505,47 @@ "uri-js": "^4.2.2" } }, + "amqplib": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.8.0.tgz", + "integrity": "sha512-icU+a4kkq4Y1PS4NNi+YPDMwdlbFcZ1EZTQT2nigW3fvOb6AOgUQ9+Mk4ue0Zu5cBg/XpDzB40oH10ysrk2dmA==", + "requires": { + "bitsyntax": "~0.1.0", + "bluebird": "^3.7.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "safe-buffer": "~5.2.1", + "url-parse": "~1.5.1" + }, + "dependencies": { + "bluebird": { + "version": "3.7.2", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.7.2.tgz", + "integrity": "sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==" + }, + "isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + }, + "readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + } + } + }, "ansi-styles": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", @@ -2441,6 +2602,23 @@ "file-uri-to-path": "1.0.0" } }, + "bitsyntax": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.1.0.tgz", + "integrity": "sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==", + "requires": { + "buffer-more-ints": "~1.0.0", + "debug": "~2.6.9", + "safe-buffer": "~5.1.2" + }, + "dependencies": { + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + } + } + }, "bl": { "version": "2.2.1", "resolved": "https://registry.npmjs.org/bl/-/bl-2.2.1.tgz", @@ -2507,6 +2685,11 @@ "integrity": "sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==", "dev": true }, + "buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "bytes": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.0.tgz", @@ -3529,6 +3712,11 @@ "resolved": "https://registry.npmjs.org/qs/-/qs-6.7.0.tgz", "integrity": "sha512-VCdBRNFTX1fyE7Nb6FYoURo/SPe62QCaAyzJvUjwRaIsc+NePBEniHlvxFmmX56+HZphIGtV0XeCirBtpDrTyQ==" }, + "querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, "range-parser": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz", @@ -3625,6 +3813,11 @@ "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==" }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=" + }, "resolve": { "version": "1.20.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.20.0.tgz", @@ -3942,6 +4135,15 @@ "punycode": "^2.1.0" } }, + "url-parse": { + "version": "1.5.3", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.3.tgz", + "integrity": "sha512-IIORyIQD9rvj0A4CLWsHkBBJuNqWpFQe224b6j9t/ABmquIS0qDU2pY6kl6AuOrL5OkCXHMCFNe1jBcuAggjvQ==", + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", diff --git a/package.json b/package.json
index 108b59f9..4a6043d6 100644 --- a/package.json +++ b/package.json
@@ -13,8 +13,9 @@ "author": "Fosscord", "license": "ISC", "dependencies": { - "@fosscord/server-util": "^1.3.43", + "@fosscord/server-util": "^1.3.51", "ajv": "^8.5.0", + "amqplib": "^0.8.0", "dotenv": "^8.2.0", "jsonwebtoken": "^8.5.1", "lambert-server": "^1.2.8", @@ -26,6 +27,7 @@ "ws": "^7.4.2" }, "devDependencies": { + "@types/amqplib": "^0.8.1", "@types/jsonwebtoken": "^8.5.0", "@types/mongoose-autopopulate": "^0.10.1", "@types/uuid": "^8.3.0", diff --git a/src/Server.ts b/src/Server.ts
index 06cd74de..d4b3271c 100644 --- a/src/Server.ts +++ b/src/Server.ts
@@ -1,7 +1,7 @@ import "missing-native-js-functions"; import dotenv from "dotenv"; dotenv.config(); -import { Config, db } from "@fosscord/server-util"; +import { Config, db, RabbitMQ } from "@fosscord/server-util"; import { Server as WebSocketServer } from "ws"; import { Connection } from "./events/Connection"; import http from "http"; @@ -40,6 +40,7 @@ export class Server { await (db as Promise<Connection>); await this.setupSchema(); await Config.init(); + await RabbitMQ.init(); console.log("[Database] connected"); if (!this.server.listening) { this.server.listen(this.port); diff --git a/src/events/Connection.ts b/src/events/Connection.ts
index 473c992c..1ef9fb48 100644 --- a/src/events/Connection.ts +++ b/src/events/Connection.ts
@@ -40,6 +40,7 @@ export async function Connection(this: Server, socket: WebSocket, request: Incom socket.deflate.on("data", (chunk) => socket.send(chunk)); } + socket.permissions = {}; socket.sequence = 0; setHeartbeat(socket); diff --git a/src/events/Message.ts b/src/events/Message.ts
index 51c5a294..2ca82b3c 100644 --- a/src/events/Message.ts +++ b/src/events/Message.ts
@@ -25,8 +25,6 @@ export async function Message(this: WebSocket, buffer: Data) { check.call(this, PayloadSchema, data); - console.log(data); - // @ts-ignore const OPCodeHandler = OPCodeHandlers[data.op]; if (!OPCodeHandler) { @@ -36,6 +34,8 @@ export async function Message(this: WebSocket, buffer: Data) { return; } + console.log("got: " + OPCodeHandler.name); + try { return await OPCodeHandler.call(this, data); } catch (error) { diff --git a/src/listener/listener.ts b/src/listener/listener.ts
index ae15c971..692c12b6 100644 --- a/src/listener/listener.ts +++ b/src/listener/listener.ts
@@ -1,8 +1,19 @@ -import { db, Event, MongooseCache, UserModel, getPermission, Permissions, ChannelModel } from "@fosscord/server-util"; +import { + db, + Event, + MongooseCache, + UserModel, + getPermission, + Permissions, + ChannelModel, + RabbitMQ, + EVENT, +} from "@fosscord/server-util"; import { OPCODES } from "../util/Constants"; import { Send } from "../util/Send"; import WebSocket from "../util/WebSocket"; import "missing-native-js-functions"; +import { ConsumeMessage } from "amqplib"; // TODO: close connection on Invalidated Token // TODO: check intent @@ -35,40 +46,176 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = []) ]; } +// TODO: use already required guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { - const channels = await ChannelModel.find({ recipient_ids: this.user_id }, { id: true }).exec(); - console.log( - "subscribe to channels", - channels.map((x) => x.id) - ); - const user = await UserModel.findOne({ id: this.user_id }).lean().exec(); - var guilds = user!.guilds; - - const eventStream = new MongooseCache( - db.collection("events"), - getPipeline.call( - this, - guilds, - channels.map((x) => x.id) - ), - { - onlyEvents: true, + const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec(); + const channels = await ChannelModel.find( + { $or: [{ recipient_ids: this.user_id }, { guild_id: { $in: user.guilds } }] }, + { id: true, permission_overwrites: true } + ).exec(); + const dm_channels = channels.filter((x) => !x.guild_id); + const guild_channels = channels.filter((x) => x.guild_id); + + if (RabbitMQ.connection) { + this.rabbitCh = await RabbitMQ.connection.createChannel(); + this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this))); + + for (const channel of dm_channels) { + this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this))); + } + for (const guild of user.guilds) { + // contains guild and dm channels + + getPermission(this.user_id, guild) + .then((x) => { + this.permissions[guild] = x; + this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this))); + for (const channel of guild_channels) { + if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) { + this.rabbitCh!.assertQueue(channel.id).then(() => + this.rabbitCh!.consume(channel.id, consume.bind(this)) + ); + } + } + }) + .catch((e) => {}); } - ); - await eventStream.init(); - eventStream.on("insert", (document: Event) => dispatch.call(this, document, { eventStream, guilds })); + this.once("close", () => { + this.rabbitCh!.close(); + }); + } else { + const eventStream = new MongooseCache( + db.collection("events"), + getPipeline.call( + this, + user.guilds, + channels.map((x) => x.id) + ), + { + onlyEvents: true, + } + ); - this.once("close", () => eventStream.destroy()); + await eventStream.init(); + eventStream.on("insert", (document: Event) => + dispatch.call(this, document, { eventStream, guilds: user.guilds }) + ); + + this.once("close", () => eventStream.destroy()); + } +} + +// TODO: use rabbitmq to only receive events that are included in intents +function consume(this: WebSocket, opts: ConsumeMessage | null) { + if (!opts) return; + if (!this.rabbitCh) return; + const data = JSON.parse(opts.content.toString()); + const id = data.id as string; + const event = opts.properties.type as EVENT; + const permission = this.permissions[id] || new Permissions("ADMINISTRATOR"); // default permission for dm + + console.log("rabbitmq event", event); + + // subscription managment + switch (event) { + case "CHANNEL_DELETE": + case "GUILD_DELETE": + this.rabbitCh.cancel(id); + break; + case "CHANNEL_CREATE": + // TODO: check if user has permission to channel + case "GUILD_CREATE": + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + break; + case "CHANNEL_UPDATE": + // @ts-ignore + const exists = this.rabbitCh.consumers[id]; + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { + if (exists) break; + this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this))); + } else { + if (!exists) break; + this.rabbitCh.cancel(id); + } + break; + } + + // permission checking + switch (event) { + case "INVITE_CREATE": + case "INVITE_DELETE": + case "GUILD_INTEGRATIONS_UPDATE": + if (!permission.has("MANAGE_GUILD")) return; + break; + case "WEBHOOKS_UPDATE": + if (!permission.has("MANAGE_WEBHOOKS")) return; + break; + case "GUILD_MEMBER_ADD": + case "GUILD_MEMBER_REMOVE": + case "GUILD_MEMBER_UPDATE": + // only send them, if the user subscribed for this part of the member list, or is a bot + case "PRESENCE_UPDATE": // exception if user is friend + break; + case "GUILD_BAN_ADD": + case "GUILD_BAN_REMOVE": + if (!permission.has("BAN_MEMBERS")) break; + break; + case "VOICE_STATE_UPDATE": + case "MESSAGE_CREATE": + case "MESSAGE_DELETE": + case "MESSAGE_DELETE_BULK": + case "MESSAGE_UPDATE": + case "CHANNEL_PINS_UPDATE": + case "MESSAGE_REACTION_ADD": + case "MESSAGE_REACTION_REMOVE": + case "MESSAGE_REACTION_REMOVE_ALL": + case "MESSAGE_REACTION_REMOVE_EMOJI": + case "TYPING_START": + // only gets send if the user is alowed to view the current channel + if (!permission.has("VIEW_CHANNEL")) return; + break; + case "GUILD_CREATE": + case "GUILD_DELETE": + case "GUILD_UPDATE": + case "GUILD_ROLE_CREATE": + case "GUILD_ROLE_UPDATE": + case "GUILD_ROLE_DELETE": + case "CHANNEL_CREATE": + case "CHANNEL_DELETE": + case "CHANNEL_UPDATE": + case "GUILD_EMOJI_UPDATE": + case "READY": // will be sent by the gateway + case "USER_UPDATE": + case "APPLICATION_COMMAND_CREATE": + case "APPLICATION_COMMAND_DELETE": + case "APPLICATION_COMMAND_UPDATE": + default: + // always gets sent + // Any events not defined in an intent are considered "passthrough" and will always be sent + break; + } + + Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + this.rabbitCh.ack(opts); } // TODO: cache permission +// we shouldn't fetch the permission for every event, as a message send event with many channel members would result in many thousand db queries. +// instead we should calculate all (guild, channel) permissions once and dynamically update if it changes. +// TODO: only subscribe for events that are in the connection intents +// TODO: only subscribe for channel/guilds that the user has access to (and re-subscribe if it changes) export async function dispatch(this: WebSocket, document: Event, { eventStream, guilds }: DispatchOpts) { var permission = new Permissions("ADMINISTRATOR"); // default permission for dms console.log("event", document); var channel_id = document.channel_id || document.data?.channel_id; - + // TODO: clean up if (document.event === "GUILD_CREATE") { guilds.push(document.data.id); eventStream.changeStream(getPipeline.call(this, guilds)); diff --git a/src/opcodes/LazyRequest.ts b/src/opcodes/LazyRequest.ts
index 8a7bb8c4..b1d553b9 100644 --- a/src/opcodes/LazyRequest.ts +++ b/src/opcodes/LazyRequest.ts
@@ -23,6 +23,7 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const { guild_id, typing, channels, activities } = d as LazyRequest; const permissions = await getPermission(this.user_id, guild_id); + permissions.hasThrow("VIEW_CHANNEL"); // MongoDB query to retrieve all hoisted roles and join them with the members and users collection const roles = toObject( @@ -70,16 +71,16 @@ export async function onLazyRequest(this: WebSocket, { d }: Payload) { const items = []; for (const role of roles) { - items.push({ - group: { - count: role.members.length, - id: role.id === guild_id ? "online" : role.name - } - }); - for (const member of role.members) { - member.roles.remove(guild_id); - items.push({ member }); - } + items.push({ + group: { + count: role.members.length, + id: role.id === guild_id ? "online" : role.name, + }, + }); + for (const member of role.members) { + member.roles.remove(guild_id); + items.push({ member }); + } } return Send(this, { diff --git a/src/util/WebSocket.ts b/src/util/WebSocket.ts
index 347d78cb..11db47e0 100644 --- a/src/util/WebSocket.ts +++ b/src/util/WebSocket.ts
@@ -1,6 +1,7 @@ -import { Intents } from "@fosscord/server-util"; +import { Intents, Permissions } from "@fosscord/server-util"; import WS, { Server, Data } from "ws"; import { Deflate } from "zlib"; +import { Channel } from "amqplib"; interface WebSocket extends WS { version: number; @@ -14,6 +15,8 @@ interface WebSocket extends WS { readyTimeout: NodeJS.Timeout; intents: Intents; sequence: number; + rabbitCh?: Channel; + permissions: Record<string, Permissions>; } export default WebSocket;