summary refs log tree commit diff
path: root/src/util/Database.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/Database.ts')
-rw-r--r--src/util/Database.ts59
1 files changed, 40 insertions, 19 deletions
diff --git a/src/util/Database.ts b/src/util/Database.ts

index e5323ed6..8c6847a8 100644 --- a/src/util/Database.ts +++ b/src/util/Database.ts
@@ -2,11 +2,10 @@ import "./MongoBigInt"; import mongoose, { Collection, Connection, LeanDocument } from "mongoose"; import { ChangeStream, ChangeEvent, Long } from "mongodb"; import EventEmitter from "events"; -import { Document } from "mongoose"; const uri = process.env.MONGO_URL || "mongodb://localhost:27017/fosscord?readPreference=secondaryPreferred"; +import { URL } from "url"; -// TODO: auto throw error if findOne doesn't find anything -console.log(`[DB] connect: ${uri}`); +const url = new URL(uri.replace("mongodb://", "http://")); const connection = mongoose.createConnection(uri, { autoIndex: true, @@ -14,6 +13,7 @@ const connection = mongoose.createConnection(uri, { useUnifiedTopology: true, useFindAndModify: false, }); +console.log(`[Database] connect: mongodb://${url.username}@${url.host}${url.pathname}${url.search}`); export default <Connection>connection; @@ -47,29 +47,38 @@ export interface MongooseCache { export class MongooseCache extends EventEmitter { public stream: ChangeStream; public data: any; + public initalizing?: Promise<void>; constructor( public collection: Collection, public pipeline: Array<Record<string, unknown>>, public opts: { onlyEvents: boolean; + array?: boolean; } ) { super(); + if (this.opts.array == null) this.opts.array = true; } - init = async () => { - // @ts-ignore - this.stream = this.collection.watch(this.pipeline, { fullDocument: "updateLookup" }); + init = () => { + if (this.initalizing) return this.initalizing; + this.initalizing = new Promise(async (resolve, reject) => { + // @ts-ignore + this.stream = this.collection.watch(this.pipeline, { fullDocument: "updateLookup" }); - this.stream.on("change", this.change); - this.stream.on("close", this.destroy); - this.stream.on("error", console.error); + this.stream.on("change", this.change); + this.stream.on("close", this.destroy); + this.stream.on("error", console.error); - if (!this.opts.onlyEvents) { - const arr = await this.collection.aggregate(this.pipeline).toArray(); - this.data = arr.length ? arr[0] : arr; - } + if (!this.opts.onlyEvents) { + const arr = await this.collection.aggregate(this.pipeline).toArray(); + if (this.opts.array) this.data = arr || []; + else this.data = arr?.[0]; + } + resolve(); + }); + return this.initalizing; }; changeStream = (pipeline: any) => { @@ -91,23 +100,34 @@ export class MongooseCache extends EventEmitter { change = (doc: ChangeEvent) => { try { - // @ts-ignore - if (doc.fullDocument) { - // @ts-ignore - if (!this.opts.onlyEvents) this.data = doc.fullDocument; - } - switch (doc.operationType) { case "dropDatabase": return this.destroy(); case "drop": return this.destroy(); case "delete": + if (!this.opts.onlyEvents) { + if (this.opts.array) { + this.data = this.data.filter((x: any) => doc.documentKey?._id?.equals(x._id)); + } else this.data = null; + } return this.emit("delete", doc.documentKey._id.toHexString()); case "insert": + if (!this.opts.onlyEvents) { + if (this.opts.array) this.data.push(doc.fullDocument); + else this.data = doc.fullDocument; + } return this.emit("insert", doc.fullDocument); case "update": case "replace": + if (!this.opts.onlyEvents) { + if (this.opts.array) { + const i = this.data.findIndex((x: any) => doc.fullDocument?._id?.equals(x._id)); + if (i == -1) this.data.push(doc.fullDocument); + else this.data[i] = doc.fullDocument; + } else this.data = doc.fullDocument; + } + return this.emit("change", doc.fullDocument); case "invalidate": return this.destroy(); @@ -120,6 +140,7 @@ export class MongooseCache extends EventEmitter { }; destroy = () => { + this.data = null; this.stream?.off("change", this.change); this.emit("close");