summary refs log tree commit diff
path: root/src/WebRTCSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/WebRTCSession.cpp')
-rw-r--r--src/WebRTCSession.cpp795
1 files changed, 408 insertions, 387 deletions
diff --git a/src/WebRTCSession.cpp b/src/WebRTCSession.cpp

index f3fd1bdc..32b67123 100644 --- a/src/WebRTCSession.cpp +++ b/src/WebRTCSession.cpp
@@ -1,9 +1,10 @@ #include <cctype> -#include "WebRTCSession.h" #include "Logging.h" +#include "WebRTCSession.h" -extern "C" { +extern "C" +{ #include "gst/gst.h" #include "gst/sdp/sdp.h" @@ -13,478 +14,498 @@ extern "C" { Q_DECLARE_METATYPE(WebRTCSession::State) -namespace { -bool isoffering_; -std::string localsdp_; -std::vector<mtx::events::msg::CallCandidates::Candidate> localcandidates_; - -gboolean newBusMessage(GstBus *bus G_GNUC_UNUSED, GstMessage *msg, gpointer user_data); -GstWebRTCSessionDescription* parseSDP(const std::string &sdp, GstWebRTCSDPType type); -void generateOffer(GstElement *webrtc); -void setLocalDescription(GstPromise *promise, gpointer webrtc); -void addLocalICECandidate(GstElement *webrtc G_GNUC_UNUSED, guint mlineIndex, gchar *candidate, gpointer G_GNUC_UNUSED); -gboolean onICEGatheringCompletion(gpointer timerid); -void iceConnectionStateChanged(GstElement *webrtcbin, GParamSpec *pspec G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED); -void createAnswer(GstPromise *promise, gpointer webrtc); -void addDecodeBin(GstElement *webrtc G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe); -void linkNewPad(GstElement *decodebin G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe); -std::string::const_iterator findName(const std::string &sdp, const std::string &name); -int getPayloadType(const std::string &sdp, const std::string &name); -} - -WebRTCSession::WebRTCSession() : QObject() +WebRTCSession::WebRTCSession() + : QObject() { - qRegisterMetaType<WebRTCSession::State>(); - connect(this, &WebRTCSession::stateChanged, this, &WebRTCSession::setState); + qRegisterMetaType<WebRTCSession::State>(); + connect(this, &WebRTCSession::stateChanged, this, &WebRTCSession::setState); } bool WebRTCSession::init(std::string *errorMessage) { - if (initialised_) - return true; + if (initialised_) + return true; - GError *error = nullptr; - if (!gst_init_check(nullptr, nullptr, &error)) { - std::string strError = std::string("WebRTC: failed to initialise GStreamer: "); - if (error) { - strError += error->message; - g_error_free(error); - } - nhlog::ui()->error(strError); - if (errorMessage) - *errorMessage = strError; - return false; - } + GError *error = nullptr; + if (!gst_init_check(nullptr, nullptr, &error)) { + std::string strError = std::string("WebRTC: failed to initialise GStreamer: "); + if (error) { + strError += error->message; + g_error_free(error); + } + nhlog::ui()->error(strError); + if (errorMessage) + *errorMessage = strError; + return false; + } - gchar *version = gst_version_string(); - std::string gstVersion(version); - g_free(version); - nhlog::ui()->info("WebRTC: initialised " + gstVersion); + gchar *version = gst_version_string(); + std::string gstVersion(version); + g_free(version); + nhlog::ui()->info("WebRTC: initialised " + gstVersion); - // GStreamer Plugins: - // Base: audioconvert, audioresample, opus, playback, volume - // Good: autodetect, rtpmanager - // Bad: dtls, srtp, webrtc - // libnice [GLib]: nice - initialised_ = true; - std::string strError = gstVersion + ": Missing plugins: "; - const gchar *needed[] = {"audioconvert", "audioresample", "autodetect", "dtls", "nice", - "opus", "playback", "rtpmanager", "srtp", "volume", "webrtc", nullptr}; - GstRegistry *registry = gst_registry_get(); - for (guint i = 0; i < g_strv_length((gchar**)needed); i++) { - GstPlugin *plugin = gst_registry_find_plugin(registry, needed[i]); - if (!plugin) { - strError += std::string(needed[i]) + " "; - initialised_ = false; - continue; - } - gst_object_unref(plugin); - } + // GStreamer Plugins: + // Base: audioconvert, audioresample, opus, playback, volume + // Good: autodetect, rtpmanager + // Bad: dtls, srtp, webrtc + // libnice [GLib]: nice + initialised_ = true; + std::string strError = gstVersion + ": Missing plugins: "; + const gchar *needed[] = {"audioconvert", + "audioresample", + "autodetect", + "dtls", + "nice", + "opus", + "playback", + "rtpmanager", + "srtp", + "volume", + "webrtc", + nullptr}; + GstRegistry *registry = gst_registry_get(); + for (guint i = 0; i < g_strv_length((gchar **)needed); i++) { + GstPlugin *plugin = gst_registry_find_plugin(registry, needed[i]); + if (!plugin) { + strError += std::string(needed[i]) + " "; + initialised_ = false; + continue; + } + gst_object_unref(plugin); + } - if (!initialised_) { - nhlog::ui()->error(strError); - if (errorMessage) - *errorMessage = strError; - } - return initialised_; + if (!initialised_) { + nhlog::ui()->error(strError); + if (errorMessage) + *errorMessage = strError; + } + return initialised_; } -bool -WebRTCSession::createOffer() +namespace { + +bool isoffering_; +std::string localsdp_; +std::vector<mtx::events::msg::CallCandidates::Candidate> localcandidates_; + +gboolean +newBusMessage(GstBus *bus G_GNUC_UNUSED, GstMessage *msg, gpointer user_data) { - isoffering_ = true; - localsdp_.clear(); - localcandidates_.clear(); - return startPipeline(111); // a dynamic opus payload type + WebRTCSession *session = static_cast<WebRTCSession *>(user_data); + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_EOS: + nhlog::ui()->error("WebRTC: end of stream"); + session->end(); + break; + case GST_MESSAGE_ERROR: + GError *error; + gchar *debug; + gst_message_parse_error(msg, &error, &debug); + nhlog::ui()->error( + "WebRTC: error from element {}: {}", GST_OBJECT_NAME(msg->src), error->message); + g_clear_error(&error); + g_free(debug); + session->end(); + break; + default: + break; + } + return TRUE; } -bool -WebRTCSession::acceptOffer(const std::string &sdp) +GstWebRTCSessionDescription * +parseSDP(const std::string &sdp, GstWebRTCSDPType type) { - nhlog::ui()->debug("WebRTC: received offer:\n{}", sdp); - if (state_ != State::DISCONNECTED) - return false; + GstSDPMessage *msg; + gst_sdp_message_new(&msg); + if (gst_sdp_message_parse_buffer((guint8 *)sdp.c_str(), sdp.size(), msg) == GST_SDP_OK) { + return gst_webrtc_session_description_new(type, msg); + } else { + nhlog::ui()->error("WebRTC: failed to parse remote session description"); + gst_object_unref(msg); + return nullptr; + } +} - isoffering_ = false; - localsdp_.clear(); - localcandidates_.clear(); +void +setLocalDescription(GstPromise *promise, gpointer webrtc) +{ + const GstStructure *reply = gst_promise_get_reply(promise); + gboolean isAnswer = gst_structure_id_has_field(reply, g_quark_from_string("answer")); + GstWebRTCSessionDescription *gstsdp = nullptr; + gst_structure_get(reply, + isAnswer ? "answer" : "offer", + GST_TYPE_WEBRTC_SESSION_DESCRIPTION, + &gstsdp, + nullptr); + gst_promise_unref(promise); + g_signal_emit_by_name(webrtc, "set-local-description", gstsdp, nullptr); - int opusPayloadType = getPayloadType(sdp, "opus"); - if (opusPayloadType == -1) - return false; + gchar *sdp = gst_sdp_message_as_text(gstsdp->sdp); + localsdp_ = std::string(sdp); + g_free(sdp); + gst_webrtc_session_description_free(gstsdp); - GstWebRTCSessionDescription *offer = parseSDP(sdp, GST_WEBRTC_SDP_TYPE_OFFER); - if (!offer) - return false; + nhlog::ui()->debug( + "WebRTC: local description set ({}):\n{}", isAnswer ? "answer" : "offer", localsdp_); +} - if (!startPipeline(opusPayloadType)) { - gst_webrtc_session_description_free(offer); - return false; - } +void +createOffer(GstElement *webrtc) +{ + // create-offer first, then set-local-description + GstPromise *promise = + gst_promise_new_with_change_func(setLocalDescription, webrtc, nullptr); + g_signal_emit_by_name(webrtc, "create-offer", nullptr, promise); +} - // set-remote-description first, then create-answer - GstPromise *promise = gst_promise_new_with_change_func(createAnswer, webrtc_, nullptr); - g_signal_emit_by_name(webrtc_, "set-remote-description", offer, promise); - gst_webrtc_session_description_free(offer); - return true; +void +createAnswer(GstPromise *promise, gpointer webrtc) +{ + // create-answer first, then set-local-description + gst_promise_unref(promise); + promise = gst_promise_new_with_change_func(setLocalDescription, webrtc, nullptr); + g_signal_emit_by_name(webrtc, "create-answer", nullptr, promise); } -bool -WebRTCSession::startPipeline(int opusPayloadType) +gboolean +onICEGatheringCompletion(gpointer timerid) { - if (state_ != State::DISCONNECTED) - return false; + *(guint *)(timerid) = 0; + if (isoffering_) { + emit WebRTCSession::instance().offerCreated(localsdp_, localcandidates_); + emit WebRTCSession::instance().stateChanged(WebRTCSession::State::OFFERSENT); + } else { + emit WebRTCSession::instance().answerCreated(localsdp_, localcandidates_); + emit WebRTCSession::instance().stateChanged(WebRTCSession::State::ANSWERSENT); + } + return FALSE; +} - emit stateChanged(State::INITIATING); +void +addLocalICECandidate(GstElement *webrtc G_GNUC_UNUSED, + guint mlineIndex, + gchar *candidate, + gpointer G_GNUC_UNUSED) +{ + nhlog::ui()->debug("WebRTC: local candidate: (m-line:{}):{}", mlineIndex, candidate); - if (!createPipeline(opusPayloadType)) - return false; + if (WebRTCSession::instance().state() >= WebRTCSession::State::OFFERSENT) { + emit WebRTCSession::instance().newICECandidate( + {"audio", (uint16_t)mlineIndex, candidate}); + return; + } - webrtc_ = gst_bin_get_by_name(GST_BIN(pipe_), "webrtcbin"); + localcandidates_.push_back({"audio", (uint16_t)mlineIndex, candidate}); - if (!stunServer_.empty()) { - nhlog::ui()->info("WebRTC: setting STUN server: {}", stunServer_); - g_object_set(webrtc_, "stun-server", stunServer_.c_str(), nullptr); - } + // GStreamer v1.16: webrtcbin's notify::ice-gathering-state triggers + // GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE too early. Fixed in v1.18. Use a 100ms timeout in + // the meantime + static guint timerid = 0; + if (timerid) + g_source_remove(timerid); - for (const auto &uri : turnServers_) { - nhlog::ui()->info("WebRTC: setting TURN server: {}", uri); - gboolean udata; - g_signal_emit_by_name(webrtc_, "add-turn-server", uri.c_str(), (gpointer)(&udata)); - } - if (turnServers_.empty()) - nhlog::ui()->warn("WebRTC: no TURN server provided"); + timerid = g_timeout_add(100, onICEGatheringCompletion, &timerid); +} - // generate the offer when the pipeline goes to PLAYING - if (isoffering_) - g_signal_connect(webrtc_, "on-negotiation-needed", G_CALLBACK(generateOffer), nullptr); +void +iceConnectionStateChanged(GstElement *webrtc, + GParamSpec *pspec G_GNUC_UNUSED, + gpointer user_data G_GNUC_UNUSED) +{ + GstWebRTCICEConnectionState newState; + g_object_get(webrtc, "ice-connection-state", &newState, nullptr); + switch (newState) { + case GST_WEBRTC_ICE_CONNECTION_STATE_CHECKING: + nhlog::ui()->debug("WebRTC: GstWebRTCICEConnectionState -> Checking"); + emit WebRTCSession::instance().stateChanged(WebRTCSession::State::CONNECTING); + break; + case GST_WEBRTC_ICE_CONNECTION_STATE_FAILED: + nhlog::ui()->error("WebRTC: GstWebRTCICEConnectionState -> Failed"); + emit WebRTCSession::instance().stateChanged(WebRTCSession::State::ICEFAILED); + break; + default: + break; + } +} - // on-ice-candidate is emitted when a local ICE candidate has been gathered - g_signal_connect(webrtc_, "on-ice-candidate", G_CALLBACK(addLocalICECandidate), nullptr); +void +linkNewPad(GstElement *decodebin G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe) +{ + GstCaps *caps = gst_pad_get_current_caps(newpad); + if (!caps) + return; - // capture ICE failure - g_signal_connect(webrtc_, "notify::ice-connection-state", - G_CALLBACK(iceConnectionStateChanged), nullptr); + const gchar *name = gst_structure_get_name(gst_caps_get_structure(caps, 0)); + gst_caps_unref(caps); - // incoming streams trigger pad-added - gst_element_set_state(pipe_, GST_STATE_READY); - g_signal_connect(webrtc_, "pad-added", G_CALLBACK(addDecodeBin), pipe_); + GstPad *queuepad = nullptr; + if (g_str_has_prefix(name, "audio")) { + nhlog::ui()->debug("WebRTC: received incoming audio stream"); + GstElement *queue = gst_element_factory_make("queue", nullptr); + GstElement *convert = gst_element_factory_make("audioconvert", nullptr); + GstElement *resample = gst_element_factory_make("audioresample", nullptr); + GstElement *sink = gst_element_factory_make("autoaudiosink", nullptr); + gst_bin_add_many(GST_BIN(pipe), queue, convert, resample, sink, nullptr); + gst_element_sync_state_with_parent(queue); + gst_element_sync_state_with_parent(convert); + gst_element_sync_state_with_parent(resample); + gst_element_sync_state_with_parent(sink); + gst_element_link_many(queue, convert, resample, sink, nullptr); + queuepad = gst_element_get_static_pad(queue, "sink"); + } - // webrtcbin lifetime is the same as that of the pipeline - gst_object_unref(webrtc_); + if (queuepad) { + if (GST_PAD_LINK_FAILED(gst_pad_link(newpad, queuepad))) + nhlog::ui()->error("WebRTC: unable to link new pad"); + else { + emit WebRTCSession::instance().stateChanged( + WebRTCSession::State::CONNECTED); + } + gst_object_unref(queuepad); + } +} - // start the pipeline - GstStateChangeReturn ret = gst_element_set_state(pipe_, GST_STATE_PLAYING); - if (ret == GST_STATE_CHANGE_FAILURE) { - nhlog::ui()->error("WebRTC: unable to start pipeline"); - end(); - return false; - } +void +addDecodeBin(GstElement *webrtc G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe) +{ + if (GST_PAD_DIRECTION(newpad) != GST_PAD_SRC) + return; - GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipe_)); - gst_bus_add_watch(bus, newBusMessage, this); - gst_object_unref(bus); - emit stateChanged(State::INITIATED); - return true; + nhlog::ui()->debug("WebRTC: received incoming stream"); + GstElement *decodebin = gst_element_factory_make("decodebin", nullptr); + g_signal_connect(decodebin, "pad-added", G_CALLBACK(linkNewPad), pipe); + gst_bin_add(GST_BIN(pipe), decodebin); + gst_element_sync_state_with_parent(decodebin); + GstPad *sinkpad = gst_element_get_static_pad(decodebin, "sink"); + if (GST_PAD_LINK_FAILED(gst_pad_link(newpad, sinkpad))) + nhlog::ui()->error("WebRTC: unable to link new pad"); + gst_object_unref(sinkpad); } -#define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload=" - -bool -WebRTCSession::createPipeline(int opusPayloadType) +std::string::const_iterator +findName(const std::string &sdp, const std::string &name) { - std::string pipeline("webrtcbin bundle-policy=max-bundle name=webrtcbin " - "autoaudiosrc ! volume name=srclevel ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! " - "queue ! " RTP_CAPS_OPUS + std::to_string(opusPayloadType) + " ! webrtcbin."); - - webrtc_ = nullptr; - GError *error = nullptr; - pipe_ = gst_parse_launch(pipeline.c_str(), &error); - if (error) { - nhlog::ui()->error("WebRTC: failed to parse pipeline: {}", error->message); - g_error_free(error); - end(); - return false; - } - return true; + return std::search( + sdp.cbegin(), + sdp.cend(), + name.cbegin(), + name.cend(), + [](unsigned char c1, unsigned char c2) { return std::tolower(c1) == std::tolower(c2); }); } -bool -WebRTCSession::acceptAnswer(const std::string &sdp) +int +getPayloadType(const std::string &sdp, const std::string &name) { - nhlog::ui()->debug("WebRTC: received answer:\n{}", sdp); - if (state_ != State::OFFERSENT) - return false; + // eg a=rtpmap:111 opus/48000/2 + auto e = findName(sdp, name); + if (e == sdp.cend()) { + nhlog::ui()->error("WebRTC: remote offer - " + name + " attribute missing"); + return -1; + } - GstWebRTCSessionDescription *answer = parseSDP(sdp, GST_WEBRTC_SDP_TYPE_ANSWER); - if (!answer) { - end(); - return false; - } + if (auto s = sdp.rfind(':', e - sdp.cbegin()); s == std::string::npos) { + nhlog::ui()->error("WebRTC: remote offer - unable to determine " + name + + " payload type"); + return -1; + } else { + ++s; + try { + return std::stoi(std::string(sdp, s, e - sdp.cbegin() - s)); + } catch (...) { + nhlog::ui()->error("WebRTC: remote offer - unable to determine " + name + + " payload type"); + } + } + return -1; +} - g_signal_emit_by_name(webrtc_, "set-remote-description", answer, nullptr); - gst_webrtc_session_description_free(answer); - return true; } -void -WebRTCSession::acceptICECandidates(const std::vector<mtx::events::msg::CallCandidates::Candidate> &candidates) +bool +WebRTCSession::createOffer() { - if (state_ >= State::INITIATED) { - for (const auto &c : candidates) { - nhlog::ui()->debug("WebRTC: remote candidate: (m-line:{}):{}", c.sdpMLineIndex, c.candidate); - g_signal_emit_by_name(webrtc_, "add-ice-candidate", c.sdpMLineIndex, c.candidate.c_str()); - } - } + isoffering_ = true; + localsdp_.clear(); + localcandidates_.clear(); + return startPipeline(111); // a dynamic opus payload type } bool -WebRTCSession::toggleMuteAudioSrc(bool &isMuted) +WebRTCSession::acceptOffer(const std::string &sdp) { - if (state_ < State::INITIATED) - return false; + nhlog::ui()->debug("WebRTC: received offer:\n{}", sdp); + if (state_ != State::DISCONNECTED) + return false; - GstElement *srclevel = gst_bin_get_by_name(GST_BIN(pipe_), "srclevel"); - if (!srclevel) - return false; + isoffering_ = false; + localsdp_.clear(); + localcandidates_.clear(); - gboolean muted; - g_object_get(srclevel, "mute", &muted, nullptr); - g_object_set(srclevel, "mute", !muted, nullptr); - gst_object_unref(srclevel); - isMuted = !muted; - return true; -} + int opusPayloadType = getPayloadType(sdp, "opus"); + if (opusPayloadType == -1) + return false; -void -WebRTCSession::end() -{ - nhlog::ui()->debug("WebRTC: ending session"); - if (pipe_) { - gst_element_set_state(pipe_, GST_STATE_NULL); - gst_object_unref(pipe_); - pipe_ = nullptr; - } - webrtc_ = nullptr; - if (state_ != State::DISCONNECTED) - emit stateChanged(State::DISCONNECTED); -} + GstWebRTCSessionDescription *offer = parseSDP(sdp, GST_WEBRTC_SDP_TYPE_OFFER); + if (!offer) + return false; -namespace { + if (!startPipeline(opusPayloadType)) { + gst_webrtc_session_description_free(offer); + return false; + } -std::string::const_iterator findName(const std::string &sdp, const std::string &name) -{ - return std::search(sdp.cbegin(), sdp.cend(), name.cbegin(), name.cend(), - [](unsigned char c1, unsigned char c2) {return std::tolower(c1) == std::tolower(c2);}); + // set-remote-description first, then create-answer + GstPromise *promise = gst_promise_new_with_change_func(createAnswer, webrtc_, nullptr); + g_signal_emit_by_name(webrtc_, "set-remote-description", offer, promise); + gst_webrtc_session_description_free(offer); + return true; } -int getPayloadType(const std::string &sdp, const std::string &name) +bool +WebRTCSession::acceptAnswer(const std::string &sdp) { - // eg a=rtpmap:111 opus/48000/2 - auto e = findName(sdp, name); - if (e == sdp.cend()) { - nhlog::ui()->error("WebRTC: remote offer - " + name + " attribute missing"); - return -1; - } - - if (auto s = sdp.rfind(':', e - sdp.cbegin()); s == std::string::npos) { - nhlog::ui()->error("WebRTC: remote offer - unable to determine " + name + " payload type"); - return -1; - } - else { - ++s; - try { - return std::stoi(std::string(sdp, s, e - sdp.cbegin() - s)); - } - catch(...) { - nhlog::ui()->error("WebRTC: remote offer - unable to determine " + name + " payload type"); - } - } - return -1; -} + nhlog::ui()->debug("WebRTC: received answer:\n{}", sdp); + if (state_ != State::OFFERSENT) + return false; -gboolean -newBusMessage(GstBus *bus G_GNUC_UNUSED, GstMessage *msg, gpointer user_data) -{ - WebRTCSession *session = (WebRTCSession*)user_data; - switch (GST_MESSAGE_TYPE(msg)) { - case GST_MESSAGE_EOS: - nhlog::ui()->error("WebRTC: end of stream"); - session->end(); - break; - case GST_MESSAGE_ERROR: - GError *error; - gchar *debug; - gst_message_parse_error(msg, &error, &debug); - nhlog::ui()->error("WebRTC: error from element {}: {}", GST_OBJECT_NAME(msg->src), error->message); - g_clear_error(&error); - g_free(debug); - session->end(); - break; - default: - break; - } - return TRUE; -} + GstWebRTCSessionDescription *answer = parseSDP(sdp, GST_WEBRTC_SDP_TYPE_ANSWER); + if (!answer) { + end(); + return false; + } -GstWebRTCSessionDescription* -parseSDP(const std::string &sdp, GstWebRTCSDPType type) -{ - GstSDPMessage *msg; - gst_sdp_message_new(&msg); - if (gst_sdp_message_parse_buffer((guint8*)sdp.c_str(), sdp.size(), msg) == GST_SDP_OK) { - return gst_webrtc_session_description_new(type, msg); - } - else { - nhlog::ui()->error("WebRTC: failed to parse remote session description"); - gst_object_unref(msg); - return nullptr; - } + g_signal_emit_by_name(webrtc_, "set-remote-description", answer, nullptr); + gst_webrtc_session_description_free(answer); + return true; } void -generateOffer(GstElement *webrtc) +WebRTCSession::acceptICECandidates( + const std::vector<mtx::events::msg::CallCandidates::Candidate> &candidates) { - // create-offer first, then set-local-description - GstPromise *promise = gst_promise_new_with_change_func(setLocalDescription, webrtc, nullptr); - g_signal_emit_by_name(webrtc, "create-offer", nullptr, promise); + if (state_ >= State::INITIATED) { + for (const auto &c : candidates) { + nhlog::ui()->debug( + "WebRTC: remote candidate: (m-line:{}):{}", c.sdpMLineIndex, c.candidate); + g_signal_emit_by_name( + webrtc_, "add-ice-candidate", c.sdpMLineIndex, c.candidate.c_str()); + } + } } -void -setLocalDescription(GstPromise *promise, gpointer webrtc) +bool +WebRTCSession::startPipeline(int opusPayloadType) { - const GstStructure *reply = gst_promise_get_reply(promise); - gboolean isAnswer = gst_structure_id_has_field(reply, g_quark_from_string("answer")); - GstWebRTCSessionDescription *gstsdp = nullptr; - gst_structure_get(reply, isAnswer ? "answer" : "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &gstsdp, nullptr); - gst_promise_unref(promise); - g_signal_emit_by_name(webrtc, "set-local-description", gstsdp, nullptr); + if (state_ != State::DISCONNECTED) + return false; - gchar *sdp = gst_sdp_message_as_text(gstsdp->sdp); - localsdp_ = std::string(sdp); - g_free(sdp); - gst_webrtc_session_description_free(gstsdp); + emit stateChanged(State::INITIATING); - nhlog::ui()->debug("WebRTC: local description set ({}):\n{}", isAnswer ? "answer" : "offer", localsdp_); -} + if (!createPipeline(opusPayloadType)) + return false; -void -addLocalICECandidate(GstElement *webrtc G_GNUC_UNUSED, guint mlineIndex, gchar *candidate, gpointer G_GNUC_UNUSED) -{ - nhlog::ui()->debug("WebRTC: local candidate: (m-line:{}):{}", mlineIndex, candidate); + webrtc_ = gst_bin_get_by_name(GST_BIN(pipe_), "webrtcbin"); - if (WebRTCSession::instance().state() >= WebRTCSession::State::OFFERSENT) { - emit WebRTCSession::instance().newICECandidate({"audio", (uint16_t)mlineIndex, candidate}); - return; - } + if (!stunServer_.empty()) { + nhlog::ui()->info("WebRTC: setting STUN server: {}", stunServer_); + g_object_set(webrtc_, "stun-server", stunServer_.c_str(), nullptr); + } - localcandidates_.push_back({"audio", (uint16_t)mlineIndex, candidate}); + for (const auto &uri : turnServers_) { + nhlog::ui()->info("WebRTC: setting TURN server: {}", uri); + gboolean udata; + g_signal_emit_by_name(webrtc_, "add-turn-server", uri.c_str(), (gpointer)(&udata)); + } + if (turnServers_.empty()) + nhlog::ui()->warn("WebRTC: no TURN server provided"); - // GStreamer v1.16: webrtcbin's notify::ice-gathering-state triggers GST_WEBRTC_ICE_GATHERING_STATE_COMPLETE too early - // fixed in v1.18 - // use a 100ms timeout in the meantime - static guint timerid = 0; - if (timerid) - g_source_remove(timerid); + // generate the offer when the pipeline goes to PLAYING + if (isoffering_) + g_signal_connect( + webrtc_, "on-negotiation-needed", G_CALLBACK(::createOffer), nullptr); - timerid = g_timeout_add(100, onICEGatheringCompletion, &timerid); -} + // on-ice-candidate is emitted when a local ICE candidate has been gathered + g_signal_connect(webrtc_, "on-ice-candidate", G_CALLBACK(addLocalICECandidate), nullptr); -gboolean -onICEGatheringCompletion(gpointer timerid) -{ - *(guint*)(timerid) = 0; - if (isoffering_) { - emit WebRTCSession::instance().offerCreated(localsdp_, localcandidates_); - emit WebRTCSession::instance().stateChanged(WebRTCSession::State::OFFERSENT); - } - else { - emit WebRTCSession::instance().answerCreated(localsdp_, localcandidates_); - emit WebRTCSession::instance().stateChanged(WebRTCSession::State::ANSWERSENT); - } - return FALSE; -} + // capture ICE failure + g_signal_connect( + webrtc_, "notify::ice-connection-state", G_CALLBACK(iceConnectionStateChanged), nullptr); -void -iceConnectionStateChanged(GstElement *webrtc, GParamSpec *pspec G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED) -{ - GstWebRTCICEConnectionState newState; - g_object_get(webrtc, "ice-connection-state", &newState, nullptr); - switch (newState) { - case GST_WEBRTC_ICE_CONNECTION_STATE_CHECKING: - nhlog::ui()->debug("WebRTC: GstWebRTCICEConnectionState -> Checking"); - emit WebRTCSession::instance().stateChanged(WebRTCSession::State::CONNECTING); - break; - case GST_WEBRTC_ICE_CONNECTION_STATE_FAILED: - nhlog::ui()->error("WebRTC: GstWebRTCICEConnectionState -> Failed"); - emit WebRTCSession::instance().stateChanged(WebRTCSession::State::ICEFAILED); - break; - default: - break; - } -} + // incoming streams trigger pad-added + gst_element_set_state(pipe_, GST_STATE_READY); + g_signal_connect(webrtc_, "pad-added", G_CALLBACK(addDecodeBin), pipe_); -void -createAnswer(GstPromise *promise, gpointer webrtc) -{ - // create-answer first, then set-local-description - gst_promise_unref(promise); - promise = gst_promise_new_with_change_func(setLocalDescription, webrtc, nullptr); - g_signal_emit_by_name(webrtc, "create-answer", nullptr, promise); + // webrtcbin lifetime is the same as that of the pipeline + gst_object_unref(webrtc_); + + // start the pipeline + GstStateChangeReturn ret = gst_element_set_state(pipe_, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) { + nhlog::ui()->error("WebRTC: unable to start pipeline"); + end(); + return false; + } + + GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipe_)); + gst_bus_add_watch(bus, newBusMessage, this); + gst_object_unref(bus); + emit stateChanged(State::INITIATED); + return true; } -void -addDecodeBin(GstElement *webrtc G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe) +#define RTP_CAPS_OPUS "application/x-rtp,media=audio,encoding-name=OPUS,payload=" + +bool +WebRTCSession::createPipeline(int opusPayloadType) { - if (GST_PAD_DIRECTION(newpad) != GST_PAD_SRC) - return; + std::string pipeline("webrtcbin bundle-policy=max-bundle name=webrtcbin " + "autoaudiosrc ! volume name=srclevel ! audioconvert ! " + "audioresample ! queue ! opusenc ! rtpopuspay ! " + "queue ! " RTP_CAPS_OPUS + + std::to_string(opusPayloadType) + " ! webrtcbin."); - nhlog::ui()->debug("WebRTC: received incoming stream"); - GstElement *decodebin = gst_element_factory_make("decodebin", nullptr); - g_signal_connect(decodebin, "pad-added", G_CALLBACK(linkNewPad), pipe); - gst_bin_add(GST_BIN(pipe), decodebin); - gst_element_sync_state_with_parent(decodebin); - GstPad *sinkpad = gst_element_get_static_pad(decodebin, "sink"); - if (GST_PAD_LINK_FAILED(gst_pad_link(newpad, sinkpad))) - nhlog::ui()->error("WebRTC: unable to link new pad"); - gst_object_unref(sinkpad); + webrtc_ = nullptr; + GError *error = nullptr; + pipe_ = gst_parse_launch(pipeline.c_str(), &error); + if (error) { + nhlog::ui()->error("WebRTC: failed to parse pipeline: {}", error->message); + g_error_free(error); + end(); + return false; + } + return true; } -void -linkNewPad(GstElement *decodebin G_GNUC_UNUSED, GstPad *newpad, GstElement *pipe) +bool +WebRTCSession::toggleMuteAudioSrc(bool &isMuted) { - GstCaps *caps = gst_pad_get_current_caps(newpad); - if (!caps) - return; - - const gchar *name = gst_structure_get_name(gst_caps_get_structure(caps, 0)); - gst_caps_unref(caps); + if (state_ < State::INITIATED) + return false; - GstPad *queuepad = nullptr; - if (g_str_has_prefix(name, "audio")) { - nhlog::ui()->debug("WebRTC: received incoming audio stream"); - GstElement *queue = gst_element_factory_make("queue", nullptr); - GstElement *convert = gst_element_factory_make("audioconvert", nullptr); - GstElement *resample = gst_element_factory_make("audioresample", nullptr); - GstElement *sink = gst_element_factory_make("autoaudiosink", nullptr); - gst_bin_add_many(GST_BIN(pipe), queue, convert, resample, sink, nullptr); - gst_element_sync_state_with_parent(queue); - gst_element_sync_state_with_parent(convert); - gst_element_sync_state_with_parent(resample); - gst_element_sync_state_with_parent(sink); - gst_element_link_many(queue, convert, resample, sink, nullptr); - queuepad = gst_element_get_static_pad(queue, "sink"); - } + GstElement *srclevel = gst_bin_get_by_name(GST_BIN(pipe_), "srclevel"); + if (!srclevel) + return false; - if (queuepad) { - if (GST_PAD_LINK_FAILED(gst_pad_link(newpad, queuepad))) - nhlog::ui()->error("WebRTC: unable to link new pad"); - else { - emit WebRTCSession::instance().stateChanged(WebRTCSession::State::CONNECTED); - } - gst_object_unref(queuepad); - } + gboolean muted; + g_object_get(srclevel, "mute", &muted, nullptr); + g_object_set(srclevel, "mute", !muted, nullptr); + gst_object_unref(srclevel); + isMuted = !muted; + return true; } +void +WebRTCSession::end() +{ + nhlog::ui()->debug("WebRTC: ending session"); + if (pipe_) { + gst_element_set_state(pipe_, GST_STATE_NULL); + gst_object_unref(pipe_); + pipe_ = nullptr; + } + webrtc_ = nullptr; + if (state_ != State::DISCONNECTED) + emit stateChanged(State::DISCONNECTED); }