summary refs log tree commit diff
path: root/util/src/util/Event.ts
diff options
context:
space:
mode:
Diffstat (limited to 'util/src/util/Event.ts')
-rw-r--r--util/src/util/Event.ts96
1 files changed, 96 insertions, 0 deletions
diff --git a/util/src/util/Event.ts b/util/src/util/Event.ts
new file mode 100644

index 00000000..765e5fc7 --- /dev/null +++ b/util/src/util/Event.ts
@@ -0,0 +1,96 @@ +import { Channel } from "amqplib"; +import { RabbitMQ } from "./RabbitMQ"; +import EventEmitter from "events"; +import { EVENT, Event } from "../interfaces"; +const events = new EventEmitter(); + +export async function emitEvent(payload: Omit<Event, "created_at">) { + const id = (payload.channel_id || payload.user_id || payload.guild_id) as string; + if (!id) return console.error("event doesn't contain any id", payload); + + if (RabbitMQ.connection) { + const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission + await RabbitMQ.channel?.assertExchange(id, "fanout", { durable: false }); + + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } else { + events.emit(id, payload); + } +} + +export async function initEvent() { + await RabbitMQ.init(); // does nothing if rabbitmq is not setup + if (RabbitMQ.connection) { + } else { + // use event emitter + } +} + +export interface EventOpts extends Event { + acknowledge?: Function; + channel?: Channel; + cancel: Function; +} + +export interface ListenEventOpts { + channel?: Channel; + acknowledge?: boolean; +} + +export async function listenEvent(event: string, callback: (event: EventOpts) => any, opts?: ListenEventOpts) { + if (RabbitMQ.connection) { + // @ts-ignore + return rabbitListen(opts?.channel || RabbitMQ.channel, event, callback, { acknowledge: opts?.acknowledge }); + } else { + const cancel = () => { + events.removeListener(event, callback); + }; + events.addListener(event, (opts) => callback({ ...opts, cancel })); + + return cancel; + } +} + +async function rabbitListen( + channel: Channel, + id: string, + callback: (event: EventOpts) => any, + opts?: { acknowledge?: boolean } +) { + await channel.assertExchange(id, "fanout", { durable: false }); + const q = await channel.assertQueue("", { exclusive: true, autoDelete: true }); + + const cancel = () => { + channel.cancel(q.queue); + channel.unbindQueue(q.queue, id, ""); + }; + + channel.bindQueue(q.queue, id, ""); + channel.consume( + q.queue, + (opts) => { + if (!opts) return; + + const data = JSON.parse(opts.content.toString()); + const event = opts.properties.type as EVENT; + + callback({ + event, + data, + acknowledge() { + channel.ack(opts); + }, + channel, + cancel, + }); + // rabbitCh.ack(opts); + }, + { + noAck: !opts?.acknowledge, + } + ); + + return cancel; +}