diff --git a/bundle/scripts/benchmark/connections.js b/bundle/scripts/benchmark/connections.js
new file mode 100644
index 00000000..efc1bcb6
--- /dev/null
+++ b/bundle/scripts/benchmark/connections.js
@@ -0,0 +1,58 @@
+const cluster = require("cluster");
+const WebSocket = require("ws");
+const endpoint = process.env.GATEWAY || "ws://localhost:3001";
+const connections = Number(process.env.CONNECTIONS) || 50;
+const threads = Number(process.env.THREADS) || require("os").cpus().length || 1;
+const token = process.env.TOKEN;
+
+if (!token) {
+ console.error("TOKEN env var missing");
+ process.exit();
+}
+
+if (cluster.isMaster) {
+ for (let i = 0; i < threads; i++) {
+ cluster.fork();
+ }
+
+ cluster.on("exit", (worker, code, signal) => {
+ console.log(`worker ${worker.process.pid} died`);
+ });
+} else {
+ for (let i = 0; i < connections; i++) {
+ connect();
+ }
+}
+
+function connect() {
+ const client = new WebSocket(endpoint);
+ client.on("message", (data) => {
+ data = JSON.parse(data);
+
+ switch (data.op) {
+ case 10:
+ client.interval = setInterval(() => {
+ client.send(JSON.stringify({ op: 1 }));
+ }, data.d.heartbeat_interval);
+
+ client.send(
+ JSON.stringify({
+ op: 2,
+ d: {
+ token,
+ properties: {},
+ },
+ })
+ );
+
+ break;
+ }
+ });
+ client.once("close", (code, reason) => {
+ clearInterval(client.interval);
+ connect();
+ });
+ client.on("error", (err) => {
+ // console.log(err);
+ });
+}
diff --git a/bundle/scripts/benchmark/index.js b/bundle/scripts/benchmark/index.js
new file mode 100644
index 00000000..37ac5633
--- /dev/null
+++ b/bundle/scripts/benchmark/index.js
@@ -0,0 +1,4 @@
+require("dotenv").config();
+
+require("./connections");
+require("./messages");
diff --git a/bundle/scripts/benchmark/messages.js b/bundle/scripts/benchmark/messages.js
new file mode 100644
index 00000000..70b786d1
--- /dev/null
+++ b/bundle/scripts/benchmark/messages.js
@@ -0,0 +1 @@
+// TODO
diff --git a/bundle/scripts/build.js b/bundle/scripts/build.js
index e8eb24b8..dfbaec15 100644
--- a/bundle/scripts/build.js
+++ b/bundle/scripts/build.js
@@ -36,8 +36,8 @@ console.log(
execSync(
"node \"" +
path.join(__dirname, "..", "node_modules", "typescript", "lib", "tsc.js") +
- "\" -p " +
- path.join(__dirname, ".."),
+ "\" -p \"" +
+ path.join(__dirname, "..") + "\"",
{
cwd: path.join(__dirname, ".."),
shell: true,
diff --git a/bundle/src/stats.ts b/bundle/src/stats.ts
index 18bb85ca..7928de89 100644
--- a/bundle/src/stats.ts
+++ b/bundle/src/stats.ts
@@ -31,5 +31,6 @@ export function initStats() {
process.memoryUsage().rss / 1024 / 1024
)}mb/${memory.totalMemMb.toFixed(0)}mb ${networkUsage}`
);
- }, 1000 * 10);
+ // TODO: node-os-utils might have a memory leak, more investigation needed
+ }, 1000 * 60 * 5);
}
diff --git a/gateway/src/Server.ts b/gateway/src/Server.ts
index cf4f906c..7e1489be 100644
--- a/gateway/src/Server.ts
+++ b/gateway/src/Server.ts
@@ -32,7 +32,6 @@ export class Server {
}
this.server.on("upgrade", (request, socket, head) => {
- console.log("socket requests upgrade", request.url);
// @ts-ignore
this.ws.handleUpgrade(request, socket, head, (socket) => {
this.ws.emit("connection", socket, request);
diff --git a/gateway/src/events/Close.ts b/gateway/src/events/Close.ts
index 1299ad5c..5c1bd292 100644
--- a/gateway/src/events/Close.ts
+++ b/gateway/src/events/Close.ts
@@ -1,10 +1,13 @@
import { WebSocket } from "@fosscord/gateway";
-import { Message } from "./Message";
import { Session } from "@fosscord/util";
export async function Close(this: WebSocket, code: number, reason: string) {
console.log("[WebSocket] closed", code, reason);
if (this.session_id) await Session.delete({ session_id: this.session_id });
- // @ts-ignore
- this.off("message", Message);
+ if (this.heartbeatTimeout) clearTimeout(this.heartbeatTimeout);
+ if (this.readyTimeout) clearTimeout(this.readyTimeout);
+
+ this.deflate?.close();
+
+ this.removeAllListeners();
}
diff --git a/gateway/src/events/Connection.ts b/gateway/src/events/Connection.ts
index 2cf22f7d..9bb034f0 100644
--- a/gateway/src/events/Connection.ts
+++ b/gateway/src/events/Connection.ts
@@ -28,6 +28,7 @@ export async function Connection(
socket.on("close", Close);
// @ts-ignore
socket.on("message", Message);
+ console.log(`[Gateway] Connections: ${this.clients.size}`);
const { searchParams } = new URL(`http://localhost${request.url}`);
// @ts-ignore
diff --git a/gateway/src/events/Message.ts b/gateway/src/events/Message.ts
index af318bfd..acc39bb9 100644
--- a/gateway/src/events/Message.ts
+++ b/gateway/src/events/Message.ts
@@ -37,8 +37,6 @@ export async function Message(this: WebSocket, buffer: WS.Data) {
return;
}
- console.log("[Gateway] Opcode " + OPCODES[data.op]);
-
try {
return await OPCodeHandler.call(this, data);
} catch (error) {
diff --git a/gateway/src/opcodes/Identify.ts b/gateway/src/opcodes/Identify.ts
index 673dde9d..c91ca5dd 100644
--- a/gateway/src/opcodes/Identify.ts
+++ b/gateway/src/opcodes/Identify.ts
@@ -214,8 +214,6 @@ export async function onIdentify(this: WebSocket, data: Payload) {
// application // TODO for applications
};
- console.log("Send ready");
-
// TODO: send real proper data structure
await Send(this, {
op: OPCODES.Dispatch,
diff --git a/gateway/src/util/Send.ts b/gateway/src/util/Send.ts
index 4defa898..196d4205 100644
--- a/gateway/src/util/Send.ts
+++ b/gateway/src/util/Send.ts
@@ -18,6 +18,9 @@ export async function Send(socket: WebSocket, data: Payload) {
}
return new Promise((res, rej) => {
+ if (socket.readyState !== 1) {
+ return rej("socket not open");
+ }
socket.send(buffer, (err: any) => {
if (err) return rej(err);
return res(null);
diff --git a/util/src/util/Event.ts b/util/src/util/Event.ts
index bf9547b1..8ed009d5 100644
--- a/util/src/util/Event.ts
+++ b/util/src/util/Event.ts
@@ -46,7 +46,9 @@ export async function listenEvent(event: string, callback: (event: EventOpts) =>
} else {
const cancel = () => {
events.removeListener(event, callback);
+ events.setMaxListeners(events.getMaxListeners() - 1);
};
+ events.setMaxListeners(events.getMaxListeners() + 1);
events.addListener(event, (opts) => callback({ ...opts, cancel }));
return cancel;
|