diff options
author | Flam3rboy <34555296+Flam3rboy@users.noreply.github.com> | 2021-10-17 00:37:06 +0200 |
---|---|---|
committer | Flam3rboy <34555296+Flam3rboy@users.noreply.github.com> | 2021-10-17 00:37:06 +0200 |
commit | d53a4048d04eec470e08aa7009013a5da2048953 (patch) | |
tree | 1f464987898e04e28db3e981bb12122d19fb02f9 | |
parent | :arrow_up: update mnjsf (diff) | |
download | server-d53a4048d04eec470e08aa7009013a5da2048953.tar.xz |
:zap: improve performance of identify + listener
-rw-r--r-- | gateway/src/listener/listener.ts | 115 | ||||
-rw-r--r-- | gateway/src/opcodes/Identify.ts | 116 |
2 files changed, 152 insertions, 79 deletions
diff --git a/gateway/src/listener/listener.ts b/gateway/src/listener/listener.ts index c5b1a576..79659a1f 100644 --- a/gateway/src/listener/listener.ts +++ b/gateway/src/listener/listener.ts @@ -6,6 +6,9 @@ import { EventOpts, ListenEventOpts, Member, + EVENTEnum, + Relationship, + RelationshipType, } from "@fosscord/util"; import { OPCODES } from "../util/Constants"; import { Send } from "../util/Send"; @@ -21,22 +24,45 @@ import { Recipient } from "@fosscord/util"; // Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards // https://discord.com/developers/docs/topics/gateway#sharding +export function handlePresenceUpdate( + this: WebSocket, + { event, acknowledge, data }: EventOpts +) { + acknowledge?.(); + if (event === EVENTEnum.PresenceUpdate) { + return Send(this, { + op: OPCODES.Dispatch, + t: event, + d: data, + s: this.sequence++, + }); + } +} + // TODO: use already queried guilds/channels of Identify and don't fetch them again export async function setupListener(this: WebSocket) { - const members = await Member.find({ - where: { id: this.user_id }, - relations: ["guild", "guild.channels"], - }); + const [members, recipients, relationships] = await Promise.all([ + Member.find({ + where: { id: this.user_id }, + relations: ["guild", "guild.channels"], + }), + Recipient.find({ + where: { user_id: this.user_id, closed: false }, + relations: ["channel"], + }), + Relationship.find({ + from_id: this.user_id, + type: RelationshipType.friends, + }), + ]); + const guilds = members.map((x) => x.guild); - const recipients = await Recipient.find({ - where: { user_id: this.user_id, closed: false }, - relations: ["channel"], - }); const dm_channels = recipients.map((x) => x.channel); const opts: { acknowledge: boolean; channel?: AMQChannel } = { acknowledge: true, }; + this.listen_options = opts; const consumer = consume.bind(this); if (RabbitMQ.connection) { @@ -47,45 +73,44 @@ export async function setupListener(this: WebSocket) { this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); - for (const channel of dm_channels) { + relationships.forEach(async (relationship) => { + this.events[relationship.to_id] = await listenEvent( + relationship.to_id, + handlePresenceUpdate.bind(this), + opts + ); + }); + + dm_channels.forEach(async (channel) => { this.events[channel.id] = await listenEvent(channel.id, consumer, opts); - } + }); - for (const guild of guilds) { - // contains guild and dm channels + guilds.forEach(async (guild) => { + const permission = await getPermission(this.user_id, guild.id); + this.permissions[guild.id] = permission; + this.events[guild.id] = await listenEvent(guild.id, consumer, opts); - getPermission(this.user_id, guild.id) - .then(async (x) => { - this.permissions[guild.id] = x; - this.listeners; - this.events[guild.id] = await listenEvent( - guild.id, + guild.channels.forEach(async (channel) => { + if ( + permission + .overwriteChannel(channel.permission_overwrites!) + .has("VIEW_CHANNEL") + ) { + this.events[channel.id] = await listenEvent( + channel.id, consumer, opts ); - - for (const channel of guild.channels) { - if ( - x - .overwriteChannel(channel.permission_overwrites!) - .has("VIEW_CHANNEL") - ) { - this.events[channel.id] = await listenEvent( - channel.id, - consumer, - opts - ); - } - } - }) - .catch((e) => - console.log("couldn't get permission for guild " + guild, e) - ); - } + } + }); + }); this.once("close", () => { if (opts.channel) opts.channel.close(); - else Object.values(this.events).forEach((x) => x()); + else { + Object.values(this.events).forEach((x) => x()); + Object.values(this.member_events).forEach((x) => x()); + } }); } @@ -97,10 +122,23 @@ async function consume(this: WebSocket, opts: EventOpts) { const consumer = consume.bind(this); const listenOpts = opts as ListenEventOpts; + opts.acknowledge?.(); // console.log("event", event); // subscription managment switch (event) { + case "GUILD_MEMBER_REMOVE": + this.member_events[data.user.id]?.(); + delete this.member_events[data.user.id]; + case "GUILD_MEMBER_ADD": + if (this.member_events[data.user.id]) break; // already subscribed + this.member_events[data.user.id] = await listenEvent( + data.user.id, + handlePresenceUpdate.bind(this), + this.listen_options + ); + break; + case "RELATIONSHIP_REMOVE": case "CHANNEL_DELETE": case "GUILD_DELETE": delete this.events[id]; @@ -196,5 +234,4 @@ async function consume(this: WebSocket, opts: EventOpts) { d: data, s: this.sequence++, }); - opts.acknowledge?.(); } diff --git a/gateway/src/opcodes/Identify.ts b/gateway/src/opcodes/Identify.ts index 006dc83c..bd7fc894 100644 --- a/gateway/src/opcodes/Identify.ts +++ b/gateway/src/opcodes/Identify.ts @@ -13,6 +13,11 @@ import { PrivateUserProjection, ReadState, Application, + emitEvent, + SessionsReplace, + PrivateSessionProjection, + MemberPrivateProjection, + PresenceUpdateEvent, } from "@fosscord/util"; import { Send } from "../util/Send"; import { CLOSECODES, OPCODES } from "../util/Constants"; @@ -43,11 +48,56 @@ export async function onIdentify(this: WebSocket, data: Payload) { } this.user_id = decoded.id; - const user = await User.findOneOrFail({ - where: { id: this.user_id }, - relations: ["relationships", "relationships.to"], - select: [...PrivateUserProjection, "relationships"], - }); + const session_id = genSessionId(); + this.session_id = session_id; //Set the session of the WebSocket object + + const [user, read_states, members, recipients, session, application] = + await Promise.all([ + User.findOneOrFail({ + where: { id: this.user_id }, + relations: ["relationships", "relationships.to"], + select: [...PrivateUserProjection, "relationships"], + }), + ReadState.find({ user_id: this.user_id }), + Member.find({ + where: { id: this.user_id }, + select: MemberPrivateProjection, + relations: [ + "guild", + "guild.channels", + "guild.emojis", + "guild.emojis.user", + "guild.roles", + "guild.stickers", + "user", + "roles", + ], + }), + Recipient.find({ + where: { user_id: this.user_id, closed: false }, + relations: [ + "channel", + "channel.recipients", + "channel.recipients.user", + ], + // TODO: public user selection + }), + // save the session and delete it when the websocket is closed + new Session({ + user_id: this.user_id, + session_id: session_id, + // TODO: check if status is only one of: online, dnd, offline, idle + status: identify.presence?.status || "online", //does the session always start as online? + client_info: { + //TODO read from identity + client: "desktop", + os: identify.properties?.os, + version: 0, + }, + }).save(), + Application.findOne({ id: this.user_id }), + ]); + if (!user) return this.close(CLOSECODES.Authentication_failed); if (!identify.intents) identify.intents = BigInt("0b11111111111111"); @@ -68,19 +118,6 @@ export async function onIdentify(this: WebSocket, data: Payload) { } var users: PublicUser[] = []; - const members = await Member.find({ - where: { id: this.user_id }, - relations: [ - "guild", - "guild.channels", - "guild.emojis", - "guild.emojis.user", - "guild.roles", - "guild.stickers", - "user", - "roles", - ], - }); const merged_members = members.map((x: Member) => { return [ { @@ -112,11 +149,6 @@ export async function onIdentify(this: WebSocket, data: Payload) { const user_guild_settings_entries = members.map((x) => x.settings); - const recipients = await Recipient.find({ - where: { user_id: this.user_id, closed: false }, - relations: ["channel", "channel.recipients", "channel.recipients.user"], - // TODO: public user selection - }); const channels = recipients.map((x) => { // @ts-ignore x.channel.recipients = x.channel.recipients?.map((x) => x.user); @@ -144,24 +176,28 @@ export async function onIdentify(this: WebSocket, data: Payload) { users.push(public_related_user); } - const session_id = genSessionId(); - this.session_id = session_id; //Set the session of the WebSocket object - const session = new Session({ - user_id: this.user_id, - session_id: session_id, - status: "online", //does the session always start as online? - client_info: { - //TODO read from identity - client: "desktop", - os: "linux", - version: 0, - }, + setImmediate(async () => { + // run in seperate "promise context" because ready payload is not dependent on those events + emitEvent({ + event: "SESSIONS_REPLACE", + user_id: this.user_id, + data: await Session.find({ + where: { user_id: this.user_id }, + select: PrivateSessionProjection, + }), + } as SessionsReplace); + emitEvent({ + event: "PRESENCE_UPDATE", + user_id: this.user_id, + data: { + user: await User.getPublicUser(this.user_id), + activities: session.activities, + client_status: session?.client_info, + status: session.status, + }, + } as PresenceUpdateEvent); }); - //We save the session and we delete it when the websocket is closed - await session.save(); - - const read_states = await ReadState.find({ user_id: this.user_id }); read_states.forEach((s: any) => { s.id = s.channel_id; delete s.user_id; @@ -192,7 +228,7 @@ export async function onIdentify(this: WebSocket, data: Payload) { const d: ReadyEventData = { v: 8, - application: await Application.findOne({ id: this.user_id }), + application, user: privateUser, user_settings: user.settings, // @ts-ignore |