diff --git a/gateway/src/listener/listener.ts b/gateway/src/listener/listener.ts
index c5b1a576..79659a1f 100644
--- a/gateway/src/listener/listener.ts
+++ b/gateway/src/listener/listener.ts
@@ -6,6 +6,9 @@ import {
EventOpts,
ListenEventOpts,
Member,
+ EVENTEnum,
+ Relationship,
+ RelationshipType,
} from "@fosscord/util";
import { OPCODES } from "../util/Constants";
import { Send } from "../util/Send";
@@ -21,22 +24,45 @@ import { Recipient } from "@fosscord/util";
// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards
// https://discord.com/developers/docs/topics/gateway#sharding
+export function handlePresenceUpdate(
+ this: WebSocket,
+ { event, acknowledge, data }: EventOpts
+) {
+ acknowledge?.();
+ if (event === EVENTEnum.PresenceUpdate) {
+ return Send(this, {
+ op: OPCODES.Dispatch,
+ t: event,
+ d: data,
+ s: this.sequence++,
+ });
+ }
+}
+
// TODO: use already queried guilds/channels of Identify and don't fetch them again
export async function setupListener(this: WebSocket) {
- const members = await Member.find({
- where: { id: this.user_id },
- relations: ["guild", "guild.channels"],
- });
+ const [members, recipients, relationships] = await Promise.all([
+ Member.find({
+ where: { id: this.user_id },
+ relations: ["guild", "guild.channels"],
+ }),
+ Recipient.find({
+ where: { user_id: this.user_id, closed: false },
+ relations: ["channel"],
+ }),
+ Relationship.find({
+ from_id: this.user_id,
+ type: RelationshipType.friends,
+ }),
+ ]);
+
const guilds = members.map((x) => x.guild);
- const recipients = await Recipient.find({
- where: { user_id: this.user_id, closed: false },
- relations: ["channel"],
- });
const dm_channels = recipients.map((x) => x.channel);
const opts: { acknowledge: boolean; channel?: AMQChannel } = {
acknowledge: true,
};
+ this.listen_options = opts;
const consumer = consume.bind(this);
if (RabbitMQ.connection) {
@@ -47,45 +73,44 @@ export async function setupListener(this: WebSocket) {
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
- for (const channel of dm_channels) {
+ relationships.forEach(async (relationship) => {
+ this.events[relationship.to_id] = await listenEvent(
+ relationship.to_id,
+ handlePresenceUpdate.bind(this),
+ opts
+ );
+ });
+
+ dm_channels.forEach(async (channel) => {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
- }
+ });
- for (const guild of guilds) {
- // contains guild and dm channels
+ guilds.forEach(async (guild) => {
+ const permission = await getPermission(this.user_id, guild.id);
+ this.permissions[guild.id] = permission;
+ this.events[guild.id] = await listenEvent(guild.id, consumer, opts);
- getPermission(this.user_id, guild.id)
- .then(async (x) => {
- this.permissions[guild.id] = x;
- this.listeners;
- this.events[guild.id] = await listenEvent(
- guild.id,
+ guild.channels.forEach(async (channel) => {
+ if (
+ permission
+ .overwriteChannel(channel.permission_overwrites!)
+ .has("VIEW_CHANNEL")
+ ) {
+ this.events[channel.id] = await listenEvent(
+ channel.id,
consumer,
opts
);
-
- for (const channel of guild.channels) {
- if (
- x
- .overwriteChannel(channel.permission_overwrites!)
- .has("VIEW_CHANNEL")
- ) {
- this.events[channel.id] = await listenEvent(
- channel.id,
- consumer,
- opts
- );
- }
- }
- })
- .catch((e) =>
- console.log("couldn't get permission for guild " + guild, e)
- );
- }
+ }
+ });
+ });
this.once("close", () => {
if (opts.channel) opts.channel.close();
- else Object.values(this.events).forEach((x) => x());
+ else {
+ Object.values(this.events).forEach((x) => x());
+ Object.values(this.member_events).forEach((x) => x());
+ }
});
}
@@ -97,10 +122,23 @@ async function consume(this: WebSocket, opts: EventOpts) {
const consumer = consume.bind(this);
const listenOpts = opts as ListenEventOpts;
+ opts.acknowledge?.();
// console.log("event", event);
// subscription managment
switch (event) {
+ case "GUILD_MEMBER_REMOVE":
+ this.member_events[data.user.id]?.();
+ delete this.member_events[data.user.id];
+ case "GUILD_MEMBER_ADD":
+ if (this.member_events[data.user.id]) break; // already subscribed
+ this.member_events[data.user.id] = await listenEvent(
+ data.user.id,
+ handlePresenceUpdate.bind(this),
+ this.listen_options
+ );
+ break;
+ case "RELATIONSHIP_REMOVE":
case "CHANNEL_DELETE":
case "GUILD_DELETE":
delete this.events[id];
@@ -196,5 +234,4 @@ async function consume(this: WebSocket, opts: EventOpts) {
d: data,
s: this.sequence++,
});
- opts.acknowledge?.();
}
diff --git a/gateway/src/opcodes/Identify.ts b/gateway/src/opcodes/Identify.ts
index 006dc83c..bd7fc894 100644
--- a/gateway/src/opcodes/Identify.ts
+++ b/gateway/src/opcodes/Identify.ts
@@ -13,6 +13,11 @@ import {
PrivateUserProjection,
ReadState,
Application,
+ emitEvent,
+ SessionsReplace,
+ PrivateSessionProjection,
+ MemberPrivateProjection,
+ PresenceUpdateEvent,
} from "@fosscord/util";
import { Send } from "../util/Send";
import { CLOSECODES, OPCODES } from "../util/Constants";
@@ -43,11 +48,56 @@ export async function onIdentify(this: WebSocket, data: Payload) {
}
this.user_id = decoded.id;
- const user = await User.findOneOrFail({
- where: { id: this.user_id },
- relations: ["relationships", "relationships.to"],
- select: [...PrivateUserProjection, "relationships"],
- });
+ const session_id = genSessionId();
+ this.session_id = session_id; //Set the session of the WebSocket object
+
+ const [user, read_states, members, recipients, session, application] =
+ await Promise.all([
+ User.findOneOrFail({
+ where: { id: this.user_id },
+ relations: ["relationships", "relationships.to"],
+ select: [...PrivateUserProjection, "relationships"],
+ }),
+ ReadState.find({ user_id: this.user_id }),
+ Member.find({
+ where: { id: this.user_id },
+ select: MemberPrivateProjection,
+ relations: [
+ "guild",
+ "guild.channels",
+ "guild.emojis",
+ "guild.emojis.user",
+ "guild.roles",
+ "guild.stickers",
+ "user",
+ "roles",
+ ],
+ }),
+ Recipient.find({
+ where: { user_id: this.user_id, closed: false },
+ relations: [
+ "channel",
+ "channel.recipients",
+ "channel.recipients.user",
+ ],
+ // TODO: public user selection
+ }),
+ // save the session and delete it when the websocket is closed
+ new Session({
+ user_id: this.user_id,
+ session_id: session_id,
+ // TODO: check if status is only one of: online, dnd, offline, idle
+ status: identify.presence?.status || "online", //does the session always start as online?
+ client_info: {
+ //TODO read from identity
+ client: "desktop",
+ os: identify.properties?.os,
+ version: 0,
+ },
+ }).save(),
+ Application.findOne({ id: this.user_id }),
+ ]);
+
if (!user) return this.close(CLOSECODES.Authentication_failed);
if (!identify.intents) identify.intents = BigInt("0b11111111111111");
@@ -68,19 +118,6 @@ export async function onIdentify(this: WebSocket, data: Payload) {
}
var users: PublicUser[] = [];
- const members = await Member.find({
- where: { id: this.user_id },
- relations: [
- "guild",
- "guild.channels",
- "guild.emojis",
- "guild.emojis.user",
- "guild.roles",
- "guild.stickers",
- "user",
- "roles",
- ],
- });
const merged_members = members.map((x: Member) => {
return [
{
@@ -112,11 +149,6 @@ export async function onIdentify(this: WebSocket, data: Payload) {
const user_guild_settings_entries = members.map((x) => x.settings);
- const recipients = await Recipient.find({
- where: { user_id: this.user_id, closed: false },
- relations: ["channel", "channel.recipients", "channel.recipients.user"],
- // TODO: public user selection
- });
const channels = recipients.map((x) => {
// @ts-ignore
x.channel.recipients = x.channel.recipients?.map((x) => x.user);
@@ -144,24 +176,28 @@ export async function onIdentify(this: WebSocket, data: Payload) {
users.push(public_related_user);
}
- const session_id = genSessionId();
- this.session_id = session_id; //Set the session of the WebSocket object
- const session = new Session({
- user_id: this.user_id,
- session_id: session_id,
- status: "online", //does the session always start as online?
- client_info: {
- //TODO read from identity
- client: "desktop",
- os: "linux",
- version: 0,
- },
+ setImmediate(async () => {
+ // run in seperate "promise context" because ready payload is not dependent on those events
+ emitEvent({
+ event: "SESSIONS_REPLACE",
+ user_id: this.user_id,
+ data: await Session.find({
+ where: { user_id: this.user_id },
+ select: PrivateSessionProjection,
+ }),
+ } as SessionsReplace);
+ emitEvent({
+ event: "PRESENCE_UPDATE",
+ user_id: this.user_id,
+ data: {
+ user: await User.getPublicUser(this.user_id),
+ activities: session.activities,
+ client_status: session?.client_info,
+ status: session.status,
+ },
+ } as PresenceUpdateEvent);
});
- //We save the session and we delete it when the websocket is closed
- await session.save();
-
- const read_states = await ReadState.find({ user_id: this.user_id });
read_states.forEach((s: any) => {
s.id = s.channel_id;
delete s.user_id;
@@ -192,7 +228,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
const d: ReadyEventData = {
v: 8,
- application: await Application.findOne({ id: this.user_id }),
+ application,
user: privateUser,
user_settings: user.settings,
// @ts-ignore
|