diff --git a/webrtc/src/Server.ts b/webrtc/src/Server.ts
index 7a1070b9..e2fa8634 100644
--- a/webrtc/src/Server.ts
+++ b/webrtc/src/Server.ts
@@ -1,215 +1,56 @@
-import { Server as WebSocketServer } from "ws";
-import { WebSocket, CLOSECODES } from "@fosscord/gateway";
-import { Config, initDatabase } from "@fosscord/util";
-import OPCodeHandlers, { Payload } from "./opcodes";
-import { setHeartbeat } from "./util";
-import * as mediasoup from "mediasoup";
-import { types as MediasoupTypes } from "mediasoup";
-import udp from "dgram";
-import sodium from "libsodium-wrappers";
-import { assert } from "console";
-
-var port = Number(process.env.PORT);
-if (isNaN(port)) port = 3004;
+import { closeDatabase, Config, initDatabase, initEvent } from "@fosscord/util";
+import dotenv from "dotenv";
+import http from "http";
+import ws from "ws";
+import { Connection } from "./events/Connection";
+dotenv.config();
export class Server {
- public ws: WebSocketServer;
- public mediasoupWorkers: MediasoupTypes.Worker[] = [];
- public mediasoupRouters: MediasoupTypes.Router[] = [];
- public mediasoupTransports: MediasoupTypes.WebRtcTransport[] = [];
- public mediasoupProducers: MediasoupTypes.Producer[] = [];
- public mediasoupConsumers: MediasoupTypes.Consumer[] = [];
-
- public decryptKey: Uint8Array;
- public testUdp = udp.createSocket("udp6");
-
- constructor() {
- this.ws = new WebSocketServer({
- port,
- maxPayload: 4096,
- });
- this.ws.on("connection", async (socket: WebSocket) => {
- await setHeartbeat(socket);
-
- socket.on("message", async (message: string) => {
- const payload: Payload = JSON.parse(message);
-
- if (OPCodeHandlers[payload.op])
- try {
- await OPCodeHandlers[payload.op].call(this, socket, payload);
- }
- catch (e) {
- console.error(e);
- socket.close(CLOSECODES.Unknown_error);
- }
- else {
- console.error(`Unimplemented`, payload);
- socket.close(CLOSECODES.Unknown_opcode);
- }
+ public ws: ws.Server;
+ public port: number;
+ public server: http.Server;
+ public production: boolean;
+
+ constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) {
+ this.port = port;
+ this.production = production || false;
+
+ if (server) this.server = server;
+ else {
+ this.server = http.createServer(function (req, res) {
+ res.writeHead(200).end("Online");
});
+ }
- socket.on("close", (code: number, reason: string) => {
- console.log(`client closed ${code} ${reason}`);
- for (var consumer of this.mediasoupConsumers) consumer.close();
- for (var producer of this.mediasoupProducers) producer.close();
- for (var transport of this.mediasoupTransports) transport.close();
-
- this.mediasoupConsumers = [];
- this.mediasoupProducers = [];
- this.mediasoupTransports = [];
- })
+ this.server.on("upgrade", (request, socket, head) => {
+ if (!request.url?.includes("voice")) return;
+ this.ws.handleUpgrade(request, socket, head, (socket) => {
+ // @ts-ignore
+ socket.server = this;
+ this.ws.emit("connection", socket, request);
+ });
});
- this.testUdp.bind(60000);
- this.testUdp.on("message", (msg, rinfo) => {
- //random key from like, the libsodium examples on npm lol
-
- //give me my remote port?
- if (sodium.to_hex(msg) == "0001004600000001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") {
- this.testUdp.send(Buffer.from([rinfo.port, 0]), rinfo.port, rinfo.address);
- console.log(`got magic packet to send remote port? ${rinfo.address}:${rinfo.port}`);
- return;
- }
-
- //Hello
- if (sodium.to_hex(msg) == "0100000000000000") {
- console.log(`[UDP] client helloed`);
- return;
- }
-
- const nonce = Buffer.concat([msg.slice(-4), Buffer.from("\x00".repeat(20))]);
- console.log(`[UDP] nonce for this message: ${nonce.toString("hex")}`);
-
- console.log(`[UDP] message: ${sodium.to_hex(msg)}`);
-
- // let encrypted;
- // if (Buffer.from(msg).indexOf("\x81\xc9") == 0) {
- // encrypted = msg.slice(0x18, -4);
- // }
- // else if (Buffer.from(msg).indexOf("\x90\x78") == 0) {
- // encrypted = msg.slice(0x1C, -4);
- // }
- // else {
- // encrypted = msg.slice(0x18, -4);
- // console.log(`wtf header received: ${encrypted.toString("hex")}`);
- // }
-
- let encrypted = msg;
-
- if (sodium.to_hex(msg).indexOf("80c8000600000001") == 0) {
- //call status
-
- encrypted = encrypted.slice(8, -4);
- assert(encrypted.length == 40);
-
- try {
- const decrypted = sodium.crypto_secretbox_open_easy(encrypted, nonce, Buffer.from(this.decryptKey));
- console.log("[UDP] [ call status ]" + decrypted);
- }
- catch (e) {
- console.error(`[UDP] decrypt failure\n${e}\n${encrypted.toString("base64")}`);
- }
- return;
- }
-
- // try {
- // const decrypted = sodium.crypto_secretbox_open_easy(encrypted, nonce, Buffer.from(this.decryptKey.map(x => String.fromCharCode(x)).join("")));
- // console.log("[UDP] " + decrypted);
- // }
- // catch (e) {
- // console.error(`[UDP] decrypt failure\n${e}\n${msg.toString("base64")}`);
- // }
+ this.ws = new ws.Server({
+ maxPayload: 1024 * 1024 * 100,
+ noServer: true
});
+ this.ws.on("connection", Connection);
+ this.ws.on("error", console.error);
}
- async listen(): Promise<void> {
- // @ts-ignore
+ async start(): Promise<void> {
await initDatabase();
await Config.init();
- await this.createWorkers();
- console.log("[DB] connected");
- console.log(`[WebRTC] online on 0.0.0.0:${port}`);
+ await initEvent();
+ if (!this.server.listening) {
+ this.server.listen(this.port);
+ console.log(`[WebRTC] online on 0.0.0.0:${this.port}`);
+ }
}
- async createWorkers(): Promise<void> {
- const numWorkers = 1;
- for (let i = 0; i < numWorkers; i++) {
- const worker = await mediasoup.createWorker({ logLevel: "debug", logTags: ["dtls", "ice", "info", "message", "bwe"] });
- if (!worker) return;
-
- worker.on("died", () => {
- console.error("mediasoup worker died");
- });
-
- worker.observer.on("newrouter", async (router: MediasoupTypes.Router) => {
- console.log("new router created [id:%s]", router.id);
-
- this.mediasoupRouters.push(router);
-
- router.observer.on("newtransport", async (transport: MediasoupTypes.WebRtcTransport) => {
- console.log("new transport created [id:%s]", transport.id);
-
- await transport.enableTraceEvent();
-
- transport.on('dtlsstatechange', (dtlsstate) => {
- console.log(dtlsstate);
- });
-
- transport.on("sctpstatechange", (sctpstate) => {
- console.log(sctpstate);
- });
-
- router.observer.on("newrtpobserver", (rtpObserver: MediasoupTypes.RtpObserver) => {
- console.log("new RTP observer created [id:%s]", rtpObserver.id);
-
- // rtpObserver.observer.on("")
- });
-
- transport.on("connect", () => {
- console.log("transport connect");
- });
-
- transport.observer.on("newproducer", (producer: MediasoupTypes.Producer) => {
- console.log("new producer created [id:%s]", producer.id);
-
- this.mediasoupProducers.push(producer);
- });
-
- transport.observer.on("newconsumer", (consumer: MediasoupTypes.Consumer) => {
- console.log("new consumer created [id:%s]", consumer.id);
-
- this.mediasoupConsumers.push(consumer);
-
- consumer.on("rtp", (rtpPacket) => {
- console.log(rtpPacket);
- });
- });
-
- transport.observer.on("newdataproducer", (dataProducer) => {
- console.log("new data producer created [id:%s]", dataProducer.id);
- });
-
- transport.on("trace", (trace) => {
- console.log(trace);
- });
-
- this.mediasoupTransports.push(transport);
- });
- });
-
- await worker.createRouter({
- mediaCodecs: [
- {
- kind: "audio",
- mimeType: "audio/opus",
- clockRate: 48000,
- channels: 2,
- preferredPayloadType: 111,
- },
- ],
- });
-
- this.mediasoupWorkers.push(worker);
- }
+ async stop() {
+ closeDatabase();
+ this.server.close();
}
-}
+}
\ No newline at end of file
diff --git a/webrtc/src/events/Close.ts b/webrtc/src/events/Close.ts
new file mode 100644
index 00000000..1c203653
--- /dev/null
+++ b/webrtc/src/events/Close.ts
@@ -0,0 +1,9 @@
+import { WebSocket } from "@fosscord/gateway";
+import { Session } from "@fosscord/util";
+
+export async function onClose(this: WebSocket, code: number, reason: string) {
+ console.log("[WebRTC] closed", code, reason.toString());
+
+ if (this.session_id) await Session.delete({ session_id: this.session_id });
+ this.removeAllListeners();
+}
\ No newline at end of file
diff --git a/webrtc/src/events/Connection.ts b/webrtc/src/events/Connection.ts
new file mode 100644
index 00000000..bf228d64
--- /dev/null
+++ b/webrtc/src/events/Connection.ts
@@ -0,0 +1,60 @@
+import { CLOSECODES, Send, setHeartbeat, WebSocket } from "@fosscord/gateway";
+import { IncomingMessage } from "http";
+import { URL } from "url";
+import WS from "ws";
+import { VoiceOPCodes } from "../util";
+import { onClose } from "./Close";
+import { onMessage } from "./Message";
+var erlpack: any;
+try {
+ erlpack = require("@yukikaze-bot/erlpack");
+} catch (error) {}
+
+// TODO: check rate limit
+// TODO: specify rate limit in config
+// TODO: check msg max size
+
+export async function Connection(this: WS.Server, socket: WebSocket, request: IncomingMessage) {
+ try {
+ socket.on("close", onClose.bind(socket));
+ socket.on("message", onMessage.bind(socket));
+ console.log("[WebRTC] new connection", request.url);
+
+ if (process.env.WS_LOGEVENTS) {
+ [
+ "close",
+ "error",
+ "upgrade",
+ //"message",
+ "open",
+ "ping",
+ "pong",
+ "unexpected-response"
+ ].forEach((x) => {
+ socket.on(x, (y) => console.log("[WebRTC]", x, y));
+ });
+ }
+
+ const { searchParams } = new URL(`http://localhost${request.url}`);
+
+ socket.encoding = "json";
+ socket.version = Number(searchParams.get("v")) || 5;
+ if (socket.version < 3) return socket.close(CLOSECODES.Unknown_error, "invalid version");
+
+ setHeartbeat(socket);
+
+ socket.readyTimeout = setTimeout(() => {
+ return socket.close(CLOSECODES.Session_timed_out);
+ }, 1000 * 30);
+
+ await Send(socket, {
+ op: VoiceOPCodes.HELLO,
+ d: {
+ heartbeat_interval: 1000 * 30
+ }
+ });
+ } catch (error) {
+ console.error("[WebRTC]", error);
+ return socket.close(CLOSECODES.Unknown_error);
+ }
+}
\ No newline at end of file
diff --git a/webrtc/src/events/Message.ts b/webrtc/src/events/Message.ts
new file mode 100644
index 00000000..8f75a815
--- /dev/null
+++ b/webrtc/src/events/Message.ts
@@ -0,0 +1,38 @@
+import { CLOSECODES, Payload, WebSocket } from "@fosscord/gateway";
+import { Tuple } from "lambert-server";
+import OPCodeHandlers from "../opcodes";
+import { VoiceOPCodes } from "../util";
+
+const PayloadSchema = {
+ op: Number,
+ $d: new Tuple(Object, Number), // or number for heartbeat sequence
+ $s: Number,
+ $t: String
+};
+
+export async function onMessage(this: WebSocket, buffer: Buffer) {
+ try {
+ var data: Payload = JSON.parse(buffer.toString());
+ if (data.op !== VoiceOPCodes.IDENTIFY && !this.user_id) return this.close(CLOSECODES.Not_authenticated);
+
+ // @ts-ignore
+ const OPCodeHandler = OPCodeHandlers[data.op];
+ if (!OPCodeHandler) {
+ // @ts-ignore
+ console.error("[WebRTC] Unkown opcode " + VoiceOPCodes[data.op]);
+ // TODO: if all opcodes are implemented comment this out:
+ // this.close(CloseCodes.Unknown_opcode);
+ return;
+ }
+
+ if (![VoiceOPCodes.HEARTBEAT, VoiceOPCodes.SPEAKING].includes(data.op as VoiceOPCodes)) {
+ // @ts-ignore
+ console.log("[WebRTC] Opcode " + VoiceOPCodes[data.op]);
+ }
+
+ return await OPCodeHandler.call(this, data);
+ } catch (error) {
+ console.error("[WebRTC] error", error);
+ // if (!this.CLOSED && this.CLOSING) return this.close(CloseCodes.Unknown_error);
+ }
+}
\ No newline at end of file
diff --git a/webrtc/src/index.ts b/webrtc/src/index.ts
index e69de29b..7cecc9b6 100644
--- a/webrtc/src/index.ts
+++ b/webrtc/src/index.ts
@@ -0,0 +1,2 @@
+export * from "./Server";
+export * from "./util/index";
\ No newline at end of file
diff --git a/webrtc/src/opcodes/BackendVersion.ts b/webrtc/src/opcodes/BackendVersion.ts
new file mode 100644
index 00000000..b4b61c7d
--- /dev/null
+++ b/webrtc/src/opcodes/BackendVersion.ts
@@ -0,0 +1,6 @@
+import { Payload, Send, WebSocket } from "@fosscord/gateway";
+import { VoiceOPCodes } from "../util";
+
+export async function onBackendVersion(this: WebSocket, data: Payload) {
+ await Send(this, { op: VoiceOPCodes.VOICE_BACKEND_VERSION, d: { voice: "0.8.43", rtc_worker: "0.3.26" } });
+}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Connect.ts b/webrtc/src/opcodes/Connect.ts
deleted file mode 100644
index 1f874a44..00000000
--- a/webrtc/src/opcodes/Connect.ts
+++ /dev/null
@@ -1,40 +0,0 @@
-import { WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { Server } from "../Server"
-
-/*
-Sent by client:
-
-{
- "op": 12,
- "d": {
- "audio_ssrc": 0,
- "video_ssrc": 0,
- "rtx_ssrc": 0,
- "streams": [
- {
- "type": "video",
- "rid": "100",
- "ssrc": 0,
- "active": false,
- "quality": 100,
- "rtx_ssrc": 0,
- "max_bitrate": 2500000,
- "max_framerate": 20,
- "max_resolution": {
- "type": "fixed",
- "width": 1280,
- "height": 720
- }
- }
- ]
- }
-}
-*/
-
-export async function onConnect(this: Server, socket: WebSocket, data: Payload) {
- socket.send(JSON.stringify({ //what is op 15?
- op: 15,
- d: { any: 100 }
- }))
-}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Heartbeat.ts b/webrtc/src/opcodes/Heartbeat.ts
index 47f33f76..1b6c5bcd 100644
--- a/webrtc/src/opcodes/Heartbeat.ts
+++ b/webrtc/src/opcodes/Heartbeat.ts
@@ -1,8 +1,9 @@
-import { WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { setHeartbeat } from "../util";
-import { Server } from "../Server"
+import { CLOSECODES, Payload, Send, setHeartbeat, WebSocket } from "@fosscord/gateway";
+import { VoiceOPCodes } from "../util";
-export async function onHeartbeat(this: Server, socket: WebSocket, data: Payload) {
- await setHeartbeat(socket, data.d);
+export async function onHeartbeat(this: WebSocket, data: Payload) {
+ setHeartbeat(this);
+ if (isNaN(data.d)) return this.close(CLOSECODES.Decode_error);
+
+ await Send(this, { op: VoiceOPCodes.HEARTBEAT_ACK, d: data.d });
}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Identify.ts b/webrtc/src/opcodes/Identify.ts
index 210b5041..4ecec949 100644
--- a/webrtc/src/opcodes/Identify.ts
+++ b/webrtc/src/opcodes/Identify.ts
@@ -1,50 +1,50 @@
-import { WebSocket, CLOSECODES } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { VoiceOPCodes, Session, User, Guild } from "@fosscord/util";
-import { Server } from "../Server";
+import { CLOSECODES, Payload, Send, WebSocket } from "@fosscord/gateway";
+import { validateSchema, VoiceIdentifySchema, VoiceState } from "@fosscord/util";
+import { endpoint, getClients, VoiceOPCodes } from "@fosscord/webrtc";
+import SemanticSDP from "semantic-sdp";
+const defaultSDP = require("../../../assets/sdp.json");
-export interface IdentifyPayload extends Payload {
- d: {
- server_id: string, //guild id
- session_id: string, //gateway session
- streams: Array<{
- type: string,
- rid: string, //number
- quality: number,
- }>,
- token: string, //voice_states token
- user_id: string,
- video: boolean,
- };
-}
+export async function onIdentify(this: WebSocket, data: Payload) {
+ clearTimeout(this.readyTimeout);
+ const { server_id, user_id, session_id, token, streams, video } = validateSchema("VoiceIdentifySchema", data.d) as VoiceIdentifySchema;
-export async function onIdentify(this: Server, socket: WebSocket, data: Payload) {
+ const voiceState = await VoiceState.findOne({ guild_id: server_id, user_id, token, session_id });
+ if (!voiceState) return this.close(CLOSECODES.Authentication_failed);
- const session = await Session.findOneOrFail(
- { session_id: data.d.session_id, },
- {
- where: { user_id: data.d.user_id },
- relations: ["user"]
- }
- );
- const user = session.user;
- const guild = await Guild.findOneOrFail({ id: data.d.server_id }, { relations: ["members"] });
+ this.user_id = user_id;
+ this.session_id = session_id;
+ const sdp = SemanticSDP.SDPInfo.expand(defaultSDP);
+ sdp.setDTLS(SemanticSDP.DTLSInfo.expand({ setup: "actpass", hash: "sha-256", fingerprint: endpoint.getDTLSFingerprint() }));
+
+ this.client = {
+ websocket: this,
+ out: {
+ tracks: new Map()
+ },
+ in: {
+ audio_ssrc: 0,
+ video_ssrc: 0,
+ rtx_ssrc: 0
+ },
+ sdp,
+ channel_id: voiceState.channel_id
+ };
- if (!guild.members.find(x => x.id === user.id))
- return socket.close(CLOSECODES.Invalid_intent);
+ const clients = getClients(voiceState.channel_id)!;
+ clients.add(this.client);
- var transport = this.mediasoupTransports[0] || await this.mediasoupRouters[0].createWebRtcTransport({
- listenIps: [{ ip: "10.22.64.146" }],
- enableUdp: true,
+ this.on("close", () => {
+ clients.delete(this.client!);
});
- socket.send(JSON.stringify({
+ await Send(this, {
op: VoiceOPCodes.READY,
d: {
- streams: data.d.streams ? [...data.d.streams.map(x => ({ ...x, rtx_ssrc: Math.floor(Math.random() * 10000), ssrc: Math.floor(Math.random() * 10000), active: true, }))] : undefined,
- ssrc: Math.floor(Math.random() * 10000),
- ip: transport.iceCandidates[0].ip,
- port: transport.iceCandidates[0].port,
+ streams: [
+ // { type: "video", ssrc: this.ssrc + 1, rtx_ssrc: this.ssrc + 2, rid: "100", quality: 100, active: false }
+ ],
+ ssrc: -1,
+ port: endpoint.getLocalPort(),
modes: [
"aead_aes256_gcm_rtpsize",
"aead_aes256_gcm",
@@ -53,11 +53,8 @@ export async function onIdentify(this: Server, socket: WebSocket, data: Payload)
"xsalsa20_poly1305_suffix",
"xsalsa20_poly1305"
],
- experiments: [
- "bwe_conservative_link_estimate",
- "bwe_remote_locus_client",
- "fixed_keyframe_interval"
- ]
- },
- }));
+ ip: "127.0.0.1",
+ experiments: []
+ }
+ });
}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Resume.ts b/webrtc/src/opcodes/Resume.ts
deleted file mode 100644
index 856b550c..00000000
--- a/webrtc/src/opcodes/Resume.ts
+++ /dev/null
@@ -1,24 +0,0 @@
-import { CLOSECODES, WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { Server } from "../Server"
-import { Guild, Session, VoiceOPCodes } from "@fosscord/util";
-
-export async function onResume(this: Server, socket: WebSocket, data: Payload) {
- const session = await Session.findOneOrFail(
- { session_id: data.d.session_id, },
- {
- where: { user_id: data.d.user_id },
- relations: ["user"]
- }
- );
- const user = session.user;
- const guild = await Guild.findOneOrFail({ id: data.d.server_id }, { relations: ["members"] });
-
- if (!guild.members.find(x => x.id === user.id))
- return socket.close(CLOSECODES.Invalid_intent);
-
- socket.send(JSON.stringify({
- op: VoiceOPCodes.RESUMED,
- d: null,
- }))
-}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/SelectProtocol.ts b/webrtc/src/opcodes/SelectProtocol.ts
index 71772454..c660c8e2 100644
--- a/webrtc/src/opcodes/SelectProtocol.ts
+++ b/webrtc/src/opcodes/SelectProtocol.ts
@@ -1,206 +1,46 @@
-import { WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { VoiceOPCodes } from "@fosscord/util";
-import { Server } from "../Server";
-import * as mediasoup from "mediasoup";
-import { RtpCodecCapability } from "mediasoup/node/lib/RtpParameters";
-import * as sdpTransform from 'sdp-transform';
-import sodium from "libsodium-wrappers";
-
-export interface CodecPayload {
- name: string,
- type: "audio" | "video",
- priority: number,
- payload_type: number,
- rtx_payload_type: number | null,
-}
-
-export interface SelectProtocolPayload extends Payload {
- d: {
- codecs: Array<CodecPayload>,
- data: string, // SDP if webrtc
- protocol: string,
- rtc_connection_id: string,
- sdp?: string, // same as data
- };
-}
-
-/*
-
- Sent by client:
-{
- "op": 1,
- "d": {
- "protocol": "webrtc",
- "data": "
- a=extmap-allow-mixed
- a=ice-ufrag:vNxb
- a=ice-pwd:tZvpbVPYEKcnW0gGRPq0OOnh
- a=ice-options:trickle
- a=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level
- a=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
- a=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01
- a=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid
- a=rtpmap:111 opus/48000/2
- a=extmap:14 urn:ietf:params:rtp-hdrext:toffset
- a=extmap:13 urn:3gpp:video-orientation
- a=extmap:5 http://www.webrtc.org/experiments/rtp-hdrext/playout-delay
- a=extmap:6 http://www.webrtc.org/experiments/rtp-hdrext/video-content-type
- a=extmap:7 http://www.webrtc.org/experiments/rtp-hdrext/video-timing
- a=extmap:8 http://www.webrtc.org/experiments/rtp-hdrext/color-space
- a=extmap:10 urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id
- a=extmap:11 urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id
- a=rtpmap:96 VP8/90000
- a=rtpmap:97 rtx/90000
- ",
- "codecs": [
- {
- "name": "opus",
- "type": "audio",
- "priority": 1000,
- "payload_type": 111,
- "rtx_payload_type": null
- },
- {
- "name": "H264",
- "type": "video",
- "priority": 1000,
- "payload_type": 102,
- "rtx_payload_type": 121
- },
- {
- "name": "VP8",
- "type": "video",
- "priority": 2000,
- "payload_type": 96,
- "rtx_payload_type": 97
- },
- {
- "name": "VP9",
- "type": "video",
- "priority": 3000,
- "payload_type": 98,
- "rtx_payload_type": 99
- }
- ],
- "rtc_connection_id": "3faa0b80-b3e2-4bae-b291-273801fbb7ab"
- }
-}
-*/
-
-export async function onSelectProtocol(this: Server, socket: WebSocket, data: Payload) {
- if (data.d.sdp) {
- const rtpCapabilities = this.mediasoupRouters[0].rtpCapabilities;
- const codecs = rtpCapabilities.codecs as RtpCodecCapability[];
-
- const transport = this.mediasoupTransports[0]; //whatever
-
- const res = sdpTransform.parse(data.d.sdp);
-
- const videoCodec = this.mediasoupRouters[0].rtpCapabilities.codecs!.find((x: any) => x.kind === "video");
- const audioCodec = this.mediasoupRouters[0].rtpCapabilities.codecs!.find((x: any) => x.kind === "audio");
-
- const producer = this.mediasoupProducers[0] || await transport.produce({
- kind: "audio",
- rtpParameters: {
- mid: "audio",
- codecs: [{
- clockRate: audioCodec!.clockRate,
- payloadType: audioCodec!.preferredPayloadType as number,
- mimeType: audioCodec!.mimeType,
- channels: audioCodec?.channels,
- }],
- headerExtensions: res.ext?.map(x => ({
- id: x.value,
- uri: x.uri,
- })),
- },
- paused: false,
- });
-
- console.log("can consume: " + this.mediasoupRouters[0].canConsume({ producerId: producer.id, rtpCapabilities: rtpCapabilities }));
-
- const consumer = this.mediasoupConsumers[0] || await transport.consume({
- producerId: producer.id,
- paused: false,
- rtpCapabilities,
- });
-
- transport.connect({
- dtlsParameters: {
- fingerprints: transport.dtlsParameters.fingerprints,
- role: "server",
- }
- });
-
- socket.send(JSON.stringify({
- op: VoiceOPCodes.SESSION_DESCRIPTION,
- d: {
- video_codec: videoCodec?.mimeType?.substring(6) || undefined,
- // mode: "xsalsa20_poly1305_lite",
- media_session_id: transport.id,
- audio_codec: audioCodec?.mimeType.substring(6),
- secret_key: sodium.from_hex("724b092810ec86d7e35c9d067702b31ef90bc43a7b598626749914d6a3e033ed").buffer,
- sdp: `m=audio ${50001} ICE/SDP\n`
- + `a=fingerprint:sha-256 ${transport.dtlsParameters.fingerprints.find(x => x.algorithm === "sha-256")?.value}\n`
- + `c=IN IP4 ${transport.iceCandidates[0].ip}\n`
- + `t=0 0\n`
- + `a=ice-lite\n`
- + `a=rtcp-mux\n`
- + `a=rtcp:${50001}\n`
- + `a=ice-ufrag:${transport.iceParameters.usernameFragment}\n`
- + `a=ice-pwd:${transport.iceParameters.password}\n`
- + `a=fingerprint:sha-256 ${transport.dtlsParameters.fingerprints.find(x => x.algorithm === "sha-256")?.value}\n`
- + `a=candidate:1 1 ${transport.iceCandidates[0].protocol.toUpperCase()} ${transport.iceCandidates[0].priority} ${transport.iceCandidates[0].ip} ${50001} typ ${transport.iceCandidates[0].type}`
- }
- }));
- return;
- }
- else {
- /*
- {
- "video_codec":"H264",
- "sdp":
- "
- m=audio 50010 ICE/SDP
- a=fingerprint:sha-256 4A:79:94:16:44:3F:BD:05:41:5A:C7:20:F3:12:54:70:00:73:5D:33:00:2D:2C:80:9B:39:E1:9F:2D:A7:49:87
- c=IN IP4 109.200.214.158
- a=rtcp:50010
- a=ice-ufrag:+npq
- a=ice-pwd:+jf7jAesMeHHby43FRqWTy
- a=fingerprint:sha-256 4A:79:94:16:44:3F:BD:05:41:5A:C7:20:F3:12:54:70:00:73:5D:33:00:2D:2C:80:9B:39:E1:9F:2D:A7:49:87
- a=candidate:1 1 UDP 4261412862 109.200.214.158 50010 typ host",
- "media_session_id":"59265c94fa13e313492c372c4c8da801
- ",
- "audio_codec":"opus"
- }
- */
-
-
- /*
- {
- "video_codec": "H264",
- "secret_key": [36, 80, 96, 53, 95, 149, 253, 16, 137, 186, 238, 222, 251, 180, 94, 150, 112, 137, 192, 109, 69, 79, 218, 111, 217, 197, 56, 74, 18, 41, 51, 140],
- "mode": "aead_aes256_gcm_rtpsize",
- "media_session_id": "797575a97a87b63e81e2399348b97ad1",
- "audio_codec": "opus"
- };
- */
-
- this.decryptKey = sodium.randombytes_buf(sodium.crypto_secretbox_KEYBYTES);
-
- // this.decryptKey = new Array(sodium.crypto_secretbox_KEYBYTES).fill(null).map((x, i) => i + 1);
- console.log(this.decryptKey);
-
- socket.send(JSON.stringify({
- op: VoiceOPCodes.SESSION_DESCRIPTION,
- d: {
- video_codec: "H264",
- secret_key: [...this.decryptKey.values()],
- mode: "aead_aes256_gcm_rtpsize",
- media_session_id: "blah blah blah",
- audio_codec: "opus",
- }
- }));
- }
+import { Payload, Send, WebSocket } from "@fosscord/gateway";
+import { SelectProtocolSchema, validateSchema } from "@fosscord/util";
+import { endpoint, PublicIP, VoiceOPCodes } from "@fosscord/webrtc";
+import SemanticSDP from "semantic-sdp";
+
+export async function onSelectProtocol(this: WebSocket, payload: Payload) {
+ if (!this.client) return;
+
+ const data = validateSchema("SelectProtocolSchema", payload.d) as SelectProtocolSchema;
+
+ const offer = SemanticSDP.SDPInfo.parse("m=audio\n" + data.sdp!);
+ this.client.sdp!.setICE(offer.getICE());
+ this.client.sdp!.setDTLS(offer.getDTLS());
+
+ const transport = endpoint.createTransport(this.client.sdp!);
+ this.client.transport = transport;
+ transport.setRemoteProperties(this.client.sdp!);
+ transport.setLocalProperties(this.client.sdp!);
+
+ const dtls = transport.getLocalDTLSInfo();
+ const ice = transport.getLocalICEInfo();
+ const port = endpoint.getLocalPort();
+ const fingerprint = dtls.getHash() + " " + dtls.getFingerprint();
+ const candidates = transport.getLocalCandidates();
+ const candidate = candidates[0];
+
+ const answer = `m=audio ${port} ICE/SDP
+a=fingerprint:${fingerprint}
+c=IN IP4 ${PublicIP}
+a=rtcp:${port}
+a=ice-ufrag:${ice.getUfrag()}
+a=ice-pwd:${ice.getPwd()}
+a=fingerprint:${fingerprint}
+a=candidate:1 1 ${candidate.getTransport()} ${candidate.getFoundation()} ${candidate.getAddress()} ${candidate.getPort()} typ host
+`;
+
+ await Send(this, {
+ op: VoiceOPCodes.SELECT_PROTOCOL_ACK,
+ d: {
+ video_codec: "H264",
+ sdp: answer,
+ media_session_id: this.session_id,
+ audio_codec: "opus"
+ }
+ });
}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Speaking.ts b/webrtc/src/opcodes/Speaking.ts
index 861a7c3d..e2227040 100644
--- a/webrtc/src/opcodes/Speaking.ts
+++ b/webrtc/src/opcodes/Speaking.ts
@@ -1,7 +1,22 @@
-import { WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index"
-import { VoiceOPCodes } from "@fosscord/util";
-import { Server } from "../Server"
+import { Payload, Send, WebSocket } from "@fosscord/gateway";
+import { getClients, VoiceOPCodes } from "../util";
-export async function onSpeaking(this: Server, socket: WebSocket, data: Payload) {
+// {"speaking":1,"delay":5,"ssrc":2805246727}
+
+export async function onSpeaking(this: WebSocket, data: Payload) {
+ if (!this.client) return;
+
+ getClients(this.client.channel_id).forEach((client) => {
+ if (client === this.client) return;
+ const ssrc = this.client!.out.tracks.get(client.websocket.user_id);
+
+ Send(client.websocket, {
+ op: VoiceOPCodes.SPEAKING,
+ d: {
+ user_id: client.websocket.user_id,
+ speaking: data.d.speaking,
+ ssrc: ssrc?.audio_ssrc || 0
+ }
+ });
+ });
}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Version.ts b/webrtc/src/opcodes/Version.ts
deleted file mode 100644
index 0ea6eb4d..00000000
--- a/webrtc/src/opcodes/Version.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-import { WebSocket } from "@fosscord/gateway";
-import { Payload } from "./index";
-import { setHeartbeat } from "../util";
-import { Server } from "../Server"
-
-export async function onVersion(this: Server, socket: WebSocket, data: Payload) {
- socket.send(JSON.stringify({
- op: 16,
- d: {
- voice: "0.8.31", //version numbers?
- rtc_worker: "0.3.18",
- }
- }))
-}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/Video.ts b/webrtc/src/opcodes/Video.ts
new file mode 100644
index 00000000..ff20d5a9
--- /dev/null
+++ b/webrtc/src/opcodes/Video.ts
@@ -0,0 +1,118 @@
+import { Payload, Send, WebSocket } from "@fosscord/gateway";
+import { validateSchema, VoiceVideoSchema } from "@fosscord/util";
+import { channels, getClients, VoiceOPCodes } from "@fosscord/webrtc";
+import { IncomingStreamTrack, SSRCs } from "medooze-media-server";
+import SemanticSDP from "semantic-sdp";
+
+export async function onVideo(this: WebSocket, payload: Payload) {
+ if (!this.client) return;
+ const { transport, channel_id } = this.client;
+ if (!transport) return;
+ const d = validateSchema("VoiceVideoSchema", payload.d) as VoiceVideoSchema;
+
+ await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } });
+
+ const id = "stream" + this.user_id;
+
+ var stream = this.client.in.stream!;
+ if (!stream) {
+ stream = this.client.transport!.createIncomingStream(
+ // @ts-ignore
+ SemanticSDP.StreamInfo.expand({
+ id,
+ // @ts-ignore
+ tracks: []
+ })
+ );
+ this.client.in.stream = stream;
+
+ const interval = setInterval(() => {
+ for (const track of stream.getTracks()) {
+ for (const layer of Object.values(track.getStats())) {
+ console.log(track.getId(), layer.total);
+ }
+ }
+ }, 5000);
+
+ stream.on("stopped", () => {
+ console.log("stream stopped");
+ clearInterval(interval);
+ });
+ this.on("close", () => {
+ transport!.stop();
+ });
+ const out = transport.createOutgoingStream(
+ // @ts-ignore
+ SemanticSDP.StreamInfo.expand({
+ id: "out" + this.user_id,
+ // @ts-ignore
+ tracks: []
+ })
+ );
+ this.client.out.stream = out;
+
+ const clients = channels.get(channel_id)!;
+
+ clients.forEach((client) => {
+ if (client.websocket.user_id === this.user_id) return;
+ if (!client.in.stream) return;
+
+ client.in.stream?.getTracks().forEach((track) => {
+ attachTrack.call(this, track, client.websocket.user_id);
+ });
+ });
+ }
+
+ if (d.audio_ssrc) {
+ handleSSRC.call(this, "audio", { media: d.audio_ssrc, rtx: d.audio_ssrc + 1 });
+ }
+ if (d.video_ssrc && d.rtx_ssrc) {
+ handleSSRC.call(this, "video", { media: d.video_ssrc, rtx: d.rtx_ssrc });
+ }
+}
+
+function attachTrack(this: WebSocket, track: IncomingStreamTrack, user_id: string) {
+ if (!this.client) return;
+ const outTrack = this.client.transport!.createOutgoingStreamTrack(track.getMedia());
+ outTrack.attachTo(track);
+ this.client.out.stream!.addTrack(outTrack);
+ var ssrcs = this.client.out.tracks.get(user_id)!;
+ if (!ssrcs) ssrcs = this.client.out.tracks.set(user_id, { audio_ssrc: 0, rtx_ssrc: 0, video_ssrc: 0 }).get(user_id)!;
+
+ if (track.getMedia() === "audio") {
+ ssrcs.audio_ssrc = outTrack.getSSRCs().media!;
+ } else if (track.getMedia() === "video") {
+ ssrcs.video_ssrc = outTrack.getSSRCs().media!;
+ ssrcs.rtx_ssrc = outTrack.getSSRCs().rtx!;
+ }
+
+ Send(this, {
+ op: VoiceOPCodes.VIDEO,
+ d: {
+ user_id: user_id,
+ ...ssrcs
+ } as VoiceVideoSchema
+ });
+}
+
+function handleSSRC(this: WebSocket, type: "audio" | "video", ssrcs: SSRCs) {
+ if (!this.client) return;
+ const stream = this.client.in.stream!;
+ const transport = this.client.transport!;
+
+ const id = type + ssrcs.media;
+ var track = stream.getTrack(id);
+ if (!track) {
+ console.log("createIncomingStreamTrack", id);
+ track = transport.createIncomingStreamTrack(type, { id, ssrcs });
+ stream.addTrack(track);
+
+ const clients = getClients(this.client.channel_id)!;
+ clients.forEach((client) => {
+ if (client.websocket.user_id === this.user_id) return;
+ if (!client.out.stream) return;
+
+ attachTrack.call(this, track, client.websocket.user_id);
+ });
+ }
+}
\ No newline at end of file
diff --git a/webrtc/src/opcodes/index.ts b/webrtc/src/opcodes/index.ts
index 4d4dbc30..8c664cce 100644
--- a/webrtc/src/opcodes/index.ts
+++ b/webrtc/src/opcodes/index.ts
@@ -1,43 +1,19 @@
-import { WebSocket } from "@fosscord/gateway";
-import { VoiceOPCodes } from "@fosscord/util";
-import { Server } from "../Server";
-
-export interface Payload {
- op: number;
- d: any;
- s: number;
- t: string;
-}
-
+import { Payload, WebSocket } from "@fosscord/gateway";
+import { VoiceOPCodes } from "../util";
+import { onBackendVersion } from "./BackendVersion";
+import { onHeartbeat } from "./Heartbeat";
import { onIdentify } from "./Identify";
import { onSelectProtocol } from "./SelectProtocol";
-import { onHeartbeat } from "./Heartbeat";
import { onSpeaking } from "./Speaking";
-import { onResume } from "./Resume";
-import { onConnect } from "./Connect";
-
-import { onVersion } from "./Version";
-
-export type OPCodeHandler = (this: Server, socket: WebSocket, data: Payload) => any;
+import { onVideo } from "./Video";
-const handlers: { [key: number]: OPCodeHandler } = {
- [VoiceOPCodes.IDENTIFY]: onIdentify, //op 0
- [VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol, //op 1
- //op 2 voice_ready
- [VoiceOPCodes.HEARTBEAT]: onHeartbeat, //op 3
- //op 4 session_description
- [VoiceOPCodes.SPEAKING]: onSpeaking, //op 5
- //op 6 heartbeat_ack
- [VoiceOPCodes.RESUME]: onResume, //op 7
- //op 8 hello
- //op 9 resumed
- //op 10?
- //op 11?
- [VoiceOPCodes.CLIENT_CONNECT]: onConnect, //op 12
- //op 13?
- //op 15?
- //op 16? empty data on client send but server sends {"voice":"0.8.24+bugfix.voice.streams.opt.branch-ffcefaff7","rtc_worker":"0.3.14-crypto-collision-copy"}
- [VoiceOPCodes.VERSION]: onVersion,
-};
+export type OPCodeHandler = (this: WebSocket, data: Payload) => any;
-export default handlers;
\ No newline at end of file
+export default {
+ [VoiceOPCodes.HEARTBEAT]: onHeartbeat,
+ [VoiceOPCodes.IDENTIFY]: onIdentify,
+ [VoiceOPCodes.VOICE_BACKEND_VERSION]: onBackendVersion,
+ [VoiceOPCodes.VIDEO]: onVideo,
+ [VoiceOPCodes.SPEAKING]: onSpeaking,
+ [VoiceOPCodes.SELECT_PROTOCOL]: onSelectProtocol
+};
\ No newline at end of file
diff --git a/webrtc/src/start.ts b/webrtc/src/start.ts
index f902ec1b..9a5f38ee 100644
--- a/webrtc/src/start.ts
+++ b/webrtc/src/start.ts
@@ -1,11 +1,13 @@
+process.on("uncaughtException", console.error);
+process.on("unhandledRejection", console.error);
+
import { config } from "dotenv";
+import { Server } from "./Server";
config();
-//testing
-process.env.DATABASE = "../bundle/database.db";
-process.env.DEBUG = "mediasoup*"
-
-import { Server } from "./Server";
+const port = Number(process.env.PORT) || 3004;
-const server = new Server();
-server.listen();
\ No newline at end of file
+const server = new Server({
+ port
+});
+server.start();
\ No newline at end of file
diff --git a/webrtc/src/test.ts b/webrtc/src/test.ts
deleted file mode 100644
index df407b56..00000000
--- a/webrtc/src/test.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import { getSupportedRtpCapabilities } from "mediasoup";
-
-async function test() {
- console.log(getSupportedRtpCapabilities());
-}
-setTimeout(() => {}, 1000000);
-
-test();
diff --git a/webrtc/src/util/Constants.ts b/webrtc/src/util/Constants.ts
new file mode 100644
index 00000000..64d78e22
--- /dev/null
+++ b/webrtc/src/util/Constants.ts
@@ -0,0 +1,26 @@
+export enum VoiceStatus {
+ CONNECTED = 0,
+ CONNECTING = 1,
+ AUTHENTICATING = 2,
+ RECONNECTING = 3,
+ DISCONNECTED = 4
+}
+
+export enum VoiceOPCodes {
+ IDENTIFY = 0,
+ SELECT_PROTOCOL = 1,
+ READY = 2,
+ HEARTBEAT = 3,
+ SELECT_PROTOCOL_ACK = 4,
+ SPEAKING = 5,
+ HEARTBEAT_ACK = 6,
+ RESUME = 7,
+ HELLO = 8,
+ RESUMED = 9,
+ VIDEO = 12,
+ CLIENT_DISCONNECT = 13,
+ SESSION_UPDATE = 14,
+ MEDIA_SINK_WANTS = 15,
+ VOICE_BACKEND_VERSION = 16,
+ CHANNEL_OPTIONS_UPDATE = 17
+}
\ No newline at end of file
diff --git a/webrtc/src/util/Heartbeat.ts b/webrtc/src/util/Heartbeat.ts
deleted file mode 100644
index 8c5e3a7a..00000000
--- a/webrtc/src/util/Heartbeat.ts
+++ /dev/null
@@ -1,23 +0,0 @@
-import { WebSocket, CLOSECODES } from "@fosscord/gateway";
-import { VoiceOPCodes } from "@fosscord/util";
-
-export async function setHeartbeat(socket: WebSocket, nonce?: Number) {
- if (socket.heartbeatTimeout) clearTimeout(socket.heartbeatTimeout);
-
- socket.heartbeatTimeout = setTimeout(() => {
- return socket.close(CLOSECODES.Session_timed_out);
- }, 1000 * 45);
-
- if (!nonce) {
- socket.send(JSON.stringify({
- op: VoiceOPCodes.HELLO,
- d: {
- v: 5,
- heartbeat_interval: 13750,
- }
- }));
- }
- else {
- socket.send(JSON.stringify({ op: VoiceOPCodes.HEARTBEAT_ACK, d: nonce }));
- }
-}
\ No newline at end of file
diff --git a/webrtc/src/util/MediaServer.ts b/webrtc/src/util/MediaServer.ts
new file mode 100644
index 00000000..93230c91
--- /dev/null
+++ b/webrtc/src/util/MediaServer.ts
@@ -0,0 +1,51 @@
+import { WebSocket } from "@fosscord/gateway";
+import MediaServer, { IncomingStream, OutgoingStream, Transport } from "medooze-media-server";
+import SemanticSDP from "semantic-sdp";
+MediaServer.enableLog(true);
+
+export const PublicIP = process.env.PUBLIC_IP || "127.0.0.1";
+
+try {
+ const range = process.env.WEBRTC_PORT_RANGE || "4000";
+ var ports = range.split("-");
+ const min = Number(ports[0]);
+ const max = Number(ports[1]);
+
+ MediaServer.setPortRange(min, max);
+} catch (error) {
+ console.error("Invalid env var: WEBRTC_PORT_RANGE", process.env.WEBRTC_PORT_RANGE, error);
+ process.exit(1);
+}
+
+export const endpoint = MediaServer.createEndpoint(PublicIP);
+
+export const channels = new Map<string, Set<Client>>();
+
+export interface Client {
+ transport?: Transport;
+ websocket: WebSocket;
+ out: {
+ stream?: OutgoingStream;
+ tracks: Map<
+ string,
+ {
+ audio_ssrc: number;
+ video_ssrc: number;
+ rtx_ssrc: number;
+ }
+ >;
+ };
+ in: {
+ stream?: IncomingStream;
+ audio_ssrc: number;
+ video_ssrc: number;
+ rtx_ssrc: number;
+ };
+ sdp: SemanticSDP.SDPInfo;
+ channel_id: string;
+}
+
+export function getClients(channel_id: string) {
+ if (!channels.has(channel_id)) channels.set(channel_id, new Set());
+ return channels.get(channel_id)!;
+}
\ No newline at end of file
diff --git a/webrtc/src/util/index.ts b/webrtc/src/util/index.ts
index e8557452..2e09bc48 100644
--- a/webrtc/src/util/index.ts
+++ b/webrtc/src/util/index.ts
@@ -1 +1,2 @@
-export * from "./Heartbeat"
\ No newline at end of file
+export * from "./Constants";
+export * from "./MediaServer";
\ No newline at end of file
|