summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--api/src/start.ts2
-rw-r--r--bundle/src/Server.ts3
-rw-r--r--bundle/src/start.ts65
-rw-r--r--util/src/util/Event.ts28
4 files changed, 66 insertions, 32 deletions
diff --git a/api/src/start.ts b/api/src/start.ts
index d799c190..717e1b8f 100644
--- a/api/src/start.ts
+++ b/api/src/start.ts
@@ -7,7 +7,7 @@ config();
 import { FosscordServer } from "./Server";
 import cluster from "cluster";
 import os from "os";
-const cores = Number(process.env.threads) || os.cpus().length;
+const cores = Number(process.env.THREADS) || os.cpus().length;
 
 if (cluster.isMaster && process.env.NODE_ENV == "production") {
 	console.log(`Primary ${process.pid} is running`);
diff --git a/bundle/src/Server.ts b/bundle/src/Server.ts
index d541735f..f29cd649 100644
--- a/bundle/src/Server.ts
+++ b/bundle/src/Server.ts
@@ -12,7 +12,7 @@ import { Config, initDatabase } from "@fosscord/util";
 const app = express();
 const server = http.createServer();
 const port = Number(process.env.PORT) || 3001;
-const production = false;
+const production = true;
 server.on("request", app);
 
 // @ts-ignore
@@ -23,6 +23,7 @@ const cdn = new CDNServer({ server, port, production, app });
 const gateway = new Gateway.Server({ server, port, production });
 
 async function main() {
+	server.listen(port);
 	await initDatabase();
 	await Config.init();
 	// only set endpointPublic, if not already set
diff --git a/bundle/src/start.ts b/bundle/src/start.ts
index 8e7c3129..ee914b8b 100644
--- a/bundle/src/start.ts
+++ b/bundle/src/start.ts
@@ -1,6 +1,6 @@
 // process.env.MONGOMS_DEBUG = "true";
 import "reflect-metadata";
-import cluster from "cluster";
+import cluster, { Worker } from "cluster";
 import os from "os";
 import { red, bold, yellow, cyan } from "nanocolors";
 import { initStats } from "./stats";
@@ -8,20 +8,21 @@ import { config } from "dotenv";
 config();
 import { execSync } from "child_process";
 
-// TODO: add tcp socket event transmission
-const cores = 1 || Number(process.env.threads) || os.cpus().length;
+// TODO: add socket event transmission
+let cores = Number(process.env.THREADS) || os.cpus().length;
 
-function getCommitOrFail() {
-	try {
-		return execSync("git rev-parse HEAD").toString().trim();
-	} catch (e) {
-		return null;
+if (cluster.isMaster) {
+	function getCommitOrFail() {
+		try {
+			return execSync("git rev-parse HEAD").toString().trim();
+		} catch (e) {
+			return null;
+		}
 	}
-}
-const commit = getCommitOrFail();
+	const commit = getCommitOrFail();
 
-console.log(
-	bold(`
+	console.log(
+		bold(`
 ███████╗ ██████╗ ███████╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗
 ██╔════╝██╔═══██╗██╔════╝██╔════╝██╔════╝██╔═══██╗██╔══██╗██╔══██╗
 █████╗  ██║   ██║███████╗███████╗██║     ██║   ██║██████╔╝██║  ██║
@@ -38,32 +39,40 @@ console.log(
 		)}
 
 Current commit: ${
-		commit !== null
-			? `${cyan(commit)} (${yellow(commit.slice(0, 7))})`
-			: "Unknown (Git cannot be found)"
-	}
+			commit !== null
+				? `${cyan(commit)} (${yellow(commit.slice(0, 7))})`
+				: "Unknown (Git cannot be found)"
+		}
 `)
-);
+	);
 
-if (commit == null)
-	console.log(yellow(`Warning: Git is not installed or not in PATH.`));
+	if (commit == null) {
+		console.log(yellow(`Warning: Git is not installed or not in PATH.`));
+	}
 
-if (cluster.isMaster && !process.env.masterStarted) {
-	process.env.masterStarted = "true";
+	initStats();
 
-	(async () => {
-		initStats();
+	console.log(`[Process] starting with ${cores} threads`);
 
-		if (cores === 1) {
-			require("./Server");
-			return;
-		}
+	if (cores === 1) {
+		require("./Server");
+	} else {
+		process.env.EVENT_TRANSMISSION = "process";
 
 		// Fork workers.
 		for (let i = 0; i < cores; i++) {
 			cluster.fork();
+			console.log(`[Process] worker ${i} started.`);
 		}
 
+		cluster.on("message", (sender: Worker, message: any) => {
+			for (const id in cluster.workers) {
+				const worker = cluster.workers[id];
+				if (worker === sender || !worker) continue;
+				worker.send(message);
+			}
+		});
+
 		cluster.on("exit", (worker: any, code: any, signal: any) => {
 			console.log(
 				`[Worker] ${red(
@@ -72,7 +81,7 @@ if (cluster.isMaster && !process.env.masterStarted) {
 			);
 			cluster.fork();
 		});
-	})();
+	}
 } else {
 	require("./Server");
 }
diff --git a/util/src/util/Event.ts b/util/src/util/Event.ts
index 8ed009d5..bb624051 100644
--- a/util/src/util/Event.ts
+++ b/util/src/util/Event.ts
@@ -15,6 +15,8 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
 		// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
 		const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
 		if (!successful) throw new Error("failed to send event");
+	} else if (process.env.EVENT_TRANSMISSION === "process") {
+		process.send?.({ type: "event", event: payload, id } as ProcessEvent);
 	} else {
 		events.emit(id, payload);
 	}
@@ -25,6 +27,7 @@ export async function initEvent() {
 	if (RabbitMQ.connection) {
 	} else {
 		// use event emitter
+		// use process messages
 	}
 }
 
@@ -39,17 +42,38 @@ export interface ListenEventOpts {
 	acknowledge?: boolean;
 }
 
+export interface ProcessEvent {
+	type: "event";
+	event: Event;
+	id: string;
+}
+
 export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) {
 	if (RabbitMQ.connection) {
 		// @ts-ignore
 		return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge });
+	} else if (process.env.EVENT_TRANSMISSION === "process") {
+		const cancel = () => {
+			process.removeListener("message", listener);
+			process.setMaxListeners(process.getMaxListeners() - 1);
+		};
+
+		const listener = (msg: ProcessEvent) => {
+			msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel });
+		};
+
+		process.addListener("message", listener);
+		process.setMaxListeners(process.getMaxListeners() + 1);
+
+		return cancel;
 	} else {
+		const listener = (opts: any) => callback({ ...opts, cancel });
 		const cancel = () => {
-			events.removeListener(event, callback);
+			events.removeListener(event, listener);
 			events.setMaxListeners(events.getMaxListeners() - 1);
 		};
 		events.setMaxListeners(events.getMaxListeners() + 1);
-		events.addListener(event, (opts) => callback({ ...opts, cancel }));
+		events.addListener(event, listener);
 
 		return cancel;
 	}