diff options
-rw-r--r-- | api/src/start.ts | 2 | ||||
-rw-r--r-- | bundle/src/Server.ts | 3 | ||||
-rw-r--r-- | bundle/src/start.ts | 65 | ||||
-rw-r--r-- | util/src/util/Event.ts | 28 |
4 files changed, 66 insertions, 32 deletions
diff --git a/api/src/start.ts b/api/src/start.ts index d799c190..717e1b8f 100644 --- a/api/src/start.ts +++ b/api/src/start.ts @@ -7,7 +7,7 @@ config(); import { FosscordServer } from "./Server"; import cluster from "cluster"; import os from "os"; -const cores = Number(process.env.threads) || os.cpus().length; +const cores = Number(process.env.THREADS) || os.cpus().length; if (cluster.isMaster && process.env.NODE_ENV == "production") { console.log(`Primary ${process.pid} is running`); diff --git a/bundle/src/Server.ts b/bundle/src/Server.ts index d541735f..f29cd649 100644 --- a/bundle/src/Server.ts +++ b/bundle/src/Server.ts @@ -12,7 +12,7 @@ import { Config, initDatabase } from "@fosscord/util"; const app = express(); const server = http.createServer(); const port = Number(process.env.PORT) || 3001; -const production = false; +const production = true; server.on("request", app); // @ts-ignore @@ -23,6 +23,7 @@ const cdn = new CDNServer({ server, port, production, app }); const gateway = new Gateway.Server({ server, port, production }); async function main() { + server.listen(port); await initDatabase(); await Config.init(); // only set endpointPublic, if not already set diff --git a/bundle/src/start.ts b/bundle/src/start.ts index 8e7c3129..ee914b8b 100644 --- a/bundle/src/start.ts +++ b/bundle/src/start.ts @@ -1,6 +1,6 @@ // process.env.MONGOMS_DEBUG = "true"; import "reflect-metadata"; -import cluster from "cluster"; +import cluster, { Worker } from "cluster"; import os from "os"; import { red, bold, yellow, cyan } from "nanocolors"; import { initStats } from "./stats"; @@ -8,20 +8,21 @@ import { config } from "dotenv"; config(); import { execSync } from "child_process"; -// TODO: add tcp socket event transmission -const cores = 1 || Number(process.env.threads) || os.cpus().length; +// TODO: add socket event transmission +let cores = Number(process.env.THREADS) || os.cpus().length; -function getCommitOrFail() { - try { - return execSync("git rev-parse HEAD").toString().trim(); - } catch (e) { - return null; +if (cluster.isMaster) { + function getCommitOrFail() { + try { + return execSync("git rev-parse HEAD").toString().trim(); + } catch (e) { + return null; + } } -} -const commit = getCommitOrFail(); + const commit = getCommitOrFail(); -console.log( - bold(` + console.log( + bold(` ███████╗ ██████╗ ███████╗███████╗ ██████╗ ██████╗ ██████╗ ██████╗ ██╔════╝██╔═══██╗██╔════╝██╔════╝██╔════╝██╔═══██╗██╔══██╗██╔══██╗ █████╗ ██║ ██║███████╗███████╗██║ ██║ ██║██████╔╝██║ ██║ @@ -38,32 +39,40 @@ console.log( )} Current commit: ${ - commit !== null - ? `${cyan(commit)} (${yellow(commit.slice(0, 7))})` - : "Unknown (Git cannot be found)" - } + commit !== null + ? `${cyan(commit)} (${yellow(commit.slice(0, 7))})` + : "Unknown (Git cannot be found)" + } `) -); + ); -if (commit == null) - console.log(yellow(`Warning: Git is not installed or not in PATH.`)); + if (commit == null) { + console.log(yellow(`Warning: Git is not installed or not in PATH.`)); + } -if (cluster.isMaster && !process.env.masterStarted) { - process.env.masterStarted = "true"; + initStats(); - (async () => { - initStats(); + console.log(`[Process] starting with ${cores} threads`); - if (cores === 1) { - require("./Server"); - return; - } + if (cores === 1) { + require("./Server"); + } else { + process.env.EVENT_TRANSMISSION = "process"; // Fork workers. for (let i = 0; i < cores; i++) { cluster.fork(); + console.log(`[Process] worker ${i} started.`); } + cluster.on("message", (sender: Worker, message: any) => { + for (const id in cluster.workers) { + const worker = cluster.workers[id]; + if (worker === sender || !worker) continue; + worker.send(message); + } + }); + cluster.on("exit", (worker: any, code: any, signal: any) => { console.log( `[Worker] ${red( @@ -72,7 +81,7 @@ if (cluster.isMaster && !process.env.masterStarted) { ); cluster.fork(); }); - })(); + } } else { require("./Server"); } diff --git a/util/src/util/Event.ts b/util/src/util/Event.ts index 8ed009d5..bb624051 100644 --- a/util/src/util/Event.ts +++ b/util/src/util/Event.ts @@ -15,6 +15,8 @@ export async function emitEvent(payload: Omit<Event, "created_at">) { // assertQueue isn't needed, because a queue will automatically created if it doesn't exist const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); if (!successful) throw new Error("failed to send event"); + } else if (process.env.EVENT_TRANSMISSION === "process") { + process.send?.({ type: "event", event: payload, id } as ProcessEvent); } else { events.emit(id, payload); } @@ -25,6 +27,7 @@ export async function initEvent() { if (RabbitMQ.connection) { } else { // use event emitter + // use process messages } } @@ -39,17 +42,38 @@ export interface ListenEventOpts { acknowledge?: boolean; } +export interface ProcessEvent { + type: "event"; + event: Event; + id: string; +} + export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) { if (RabbitMQ.connection) { // @ts-ignore return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge }); + } else if (process.env.EVENT_TRANSMISSION === "process") { + const cancel = () => { + process.removeListener("message", listener); + process.setMaxListeners(process.getMaxListeners() - 1); + }; + + const listener = (msg: ProcessEvent) => { + msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); + }; + + process.addListener("message", listener); + process.setMaxListeners(process.getMaxListeners() + 1); + + return cancel; } else { + const listener = (opts: any) => callback({ ...opts, cancel }); const cancel = () => { - events.removeListener(event, callback); + events.removeListener(event, listener); events.setMaxListeners(events.getMaxListeners() - 1); }; events.setMaxListeners(events.getMaxListeners() + 1); - events.addListener(event, (opts) => callback({ ...opts, cancel })); + events.addListener(event, listener); return cancel; } |