diff --git a/src/listener/listener.ts b/src/listener/listener.ts
index 692c12b6..6a6967d6 100644
--- a/src/listener/listener.ts
+++ b/src/listener/listener.ts
@@ -46,6 +46,17 @@ function getPipeline(this: WebSocket, guilds: string[], channels: string[] = [])
];
}
+async function rabbitListen(this: WebSocket, id: string) {
+ await this.rabbitCh!.assertExchange(id, "fanout", { durable: false });
+ const q = await this.rabbitCh!.assertQueue("", { exclusive: true, autoDelete: true });
+
+ this.rabbitCh!.bindQueue(q.queue, id, "");
+ this.rabbitCh!.consume(q.queue, consume.bind(this), {
+ noAck: false,
+ });
+ this.rabbitCh!.queues[id] = q.queue;
+}
+
// TODO: use already required guilds/channels of Identify and don't fetch them again
export async function setupListener(this: WebSocket) {
const user = await UserModel.findOne({ id: this.user_id }, { guilds: true }).exec();
@@ -57,11 +68,14 @@ export async function setupListener(this: WebSocket) {
const guild_channels = channels.filter((x) => x.guild_id);
if (RabbitMQ.connection) {
+ // @ts-ignore
this.rabbitCh = await RabbitMQ.connection.createChannel();
- this.rabbitCh!.assertQueue(this.user_id).then(() => this.rabbitCh!.consume(this.user_id, consume.bind(this)));
+ this.rabbitCh!.queues = {};
+
+ rabbitListen.call(this, this.user_id);
for (const channel of dm_channels) {
- this.rabbitCh!.assertQueue(channel.id).then(() => this.rabbitCh!.consume(channel.id, consume.bind(this)));
+ rabbitListen.call(this, channel.id);
}
for (const guild of user.guilds) {
// contains guild and dm channels
@@ -69,12 +83,10 @@ export async function setupListener(this: WebSocket) {
getPermission(this.user_id, guild)
.then((x) => {
this.permissions[guild] = x;
- this.rabbitCh!.assertQueue(guild).then(() => this.rabbitCh!.consume(guild, consume.bind(this)));
+ rabbitListen.call(this, guild);
for (const channel of guild_channels) {
if (x.overwriteChannel(channel.permission_overwrites).has("VIEW_CHANNEL")) {
- this.rabbitCh!.assertQueue(channel.id).then(() =>
- this.rabbitCh!.consume(channel.id, consume.bind(this))
- );
+ rabbitListen.call(this, channel.id);
}
}
})
@@ -126,17 +138,19 @@ function consume(this: WebSocket, opts: ConsumeMessage | null) {
case "CHANNEL_CREATE":
// TODO: check if user has permission to channel
case "GUILD_CREATE":
- this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
+ rabbitListen.call(this, id);
break;
case "CHANNEL_UPDATE":
+ const queue_id = this.rabbitCh.queues[id];
// @ts-ignore
const exists = this.rabbitCh.consumers[id];
if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
if (exists) break;
- this.rabbitCh!.assertQueue(id).then(() => this.rabbitCh!.consume(id, consume.bind(this)));
+ rabbitListen.call(this, id);
} else {
if (!exists) break;
- this.rabbitCh.cancel(id);
+ this.rabbitCh.cancel(queue_id);
+ this.rabbitCh.unbindQueue(queue_id, id, "");
}
break;
}
diff --git a/src/util/WebSocket.ts b/src/util/WebSocket.ts
index 11db47e0..1bd0ff2f 100644
--- a/src/util/WebSocket.ts
+++ b/src/util/WebSocket.ts
@@ -15,7 +15,7 @@ interface WebSocket extends WS {
readyTimeout: NodeJS.Timeout;
intents: Intents;
sequence: number;
- rabbitCh?: Channel;
+ rabbitCh?: Channel & { queues: Record<string, string> };
permissions: Record<string, Permissions>;
}
|