From 3b3a00d7bd5b4cdbfa97a71c425f0e2a1ff204c9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 7 Mar 2023 13:25:57 +0100 Subject: [PATCH] WHIP/WISH/WHEP support --- src/output/output_webrtc.cpp | 158 +++++++++++++++++++++++++++++++++-- src/output/output_webrtc.h | 6 +- 2 files changed, 156 insertions(+), 8 deletions(-) diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index 0f5ce503..b8f0169a 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -51,6 +51,7 @@ namespace Mist{ /* ------------------------------------------------ */ OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){ + noSignalling = false; totalPkts = 0; totalLoss = 0; totalRetrans = 0; @@ -303,6 +304,147 @@ namespace Mist{ } } + void OutWebRTC::requestHandler(){ + if (noSignalling){ + if (!parseData){Util::sleep(500);} + //After 10s of no packets, abort + if (Util::bootMS() > lastRecv + 10000){ + Util::logExitReason("received no data for 10+ seconds"); + config->is_active = false; + } + return; + } + HTTPOutput::requestHandler(); + } + + void OutWebRTC::respondHTTP(const HTTP::Parser & req, bool headersOnly){ + // Check for WHIP payload + if (req.method == "OPTIONS"){ + H.setCORSHeaders(); + H.StartResponse("200", "All good", req, myConn); + H.Chunkify(0, 0, myConn); + } + if (req.method == "POST"){ + if (req.GetHeader("Content-Type") == "application/sdp"){ + SDP::Session sdpParser; + const std::string &offerStr = req.body; + if (packetLog.is_open()){ + packetLog << "[" << Util::bootMS() << "]" << offerStr << std::endl << std::endl; + } + if (!sdpParser.parseSDP(offerStr) || !sdpAnswer.parseOffer(offerStr)){ + H.setCORSHeaders(); + H.StartResponse("400", "Could not parse", req, myConn); + H.Chunkify("Failed to parse offer SDP", myConn); + H.Chunkify(0, 0, myConn); + return; + } + + bool ret = false; + if (sdpParser.hasSendOnlyMedia()){ + ret = handleSignalingCommandRemoteOfferForInput(sdpParser); + }else{ + ret = handleSignalingCommandRemoteOfferForOutput(sdpParser); + } + if (ret){ + noSignalling = true; + H.SetHeader("Content-Type", "application/sdp"); + H.SetHeader("Location", streamName + "/" + JSON::Value(getpid()).asString()); + if (config->getString("iceservers").size()){ + std::deque links; + JSON::Value iceConf = JSON::fromString(config->getString("iceservers")); + jsonForEach(iceConf, i){ + if (i->isMember("url") && (*i)["url"].isString()){ + JSON::Value &u = (*i)["url"]; + std::string str = u.asString()+"; rel=\"ice-server\";"; + if (i->isMember("username")){ + str += " username=" + (*i)["username"].toString() + ";"; + } + if (i->isMember("credential")){ + str += " credential=" + (*i)["credential"].toString() + ";"; + } + if (i->isMember("credentialType")){ + str += " credential-type=" + (*i)["credentialType"].toString() + ";"; + } + links.push_back(str); + } + if (i->isMember("urls") && (*i)["urls"].isString()){ + JSON::Value &u = (*i)["urls"]; + std::string str = u.asString()+"; rel=\"ice-server\";"; + if (i->isMember("username")){ + str += " username=" + (*i)["username"].toString() + ";"; + } + if (i->isMember("credential")){ + str += " credential=" + (*i)["credential"].toString() + ";"; + } + if (i->isMember("credentialType")){ + str += " credential-type=" + (*i)["credentialType"].toString() + ";"; + } + links.push_back(str); + } + if (i->isMember("urls") && (*i)["urls"].isArray()){ + jsonForEach((*i)["urls"], j){ + JSON::Value &u = *j; + std::string str = u.asString()+"; rel=\"ice-server\";"; + if (i->isMember("username")){ + str += " username=" + (*i)["username"].toString() + ";"; + } + if (i->isMember("credential")){ + str += " credential=" + (*i)["credential"].toString() + ";"; + } + if (i->isMember("credentialType")){ + str += " credential-type=" + (*i)["credentialType"].toString() + ";"; + } + links.push_back(str); + } + } + } + if (links.size()){ + if (links.size() == 1){ + H.SetHeader("Link", *links.begin()); + }else{ + std::deque::iterator it = links.begin(); + std::string linkHeader = *it; + ++it; + while (it != links.end()){ + linkHeader += "\r\nLink: " + *it; + ++it; + } + H.SetHeader("Link", linkHeader); + } + } + } + H.setCORSHeaders(); + H.StartResponse("201", "Created", req, myConn); + H.Chunkify(sdpAnswer.toString(), myConn); + H.Chunkify(0, 0, myConn); + myConn.close(); + return; + }else{ + H.setCORSHeaders(); + H.StartResponse("403", "Not allowed", req, myConn); + H.Chunkify("Request not allowed", myConn); + H.Chunkify(0, 0, myConn); + return; + } + } + } + + // We don't implement PATCH requests + if (req.method == "PATCH"){ + H.setCORSHeaders(); + H.StartResponse("405", "PATCH not supported", req, myConn); + H.Chunkify("This endpoint only supports WHIP/WHEP/WISH POST requests or WebSocket connections", myConn); + H.Chunkify(0, 0, myConn); + return; + } + + //Generic response handler + H.setCORSHeaders(); + H.StartResponse("405", "Must POST or use websocket", req, myConn); + H.Chunkify("This endpoint only supports WHIP/WHEP/WISH POST requests or WebSocket connections", myConn); + H.Chunkify(0, 0, myConn); + } + // This function is executed when we receive a signaling data. // The signaling data contains commands that are used to start // an input or output stream. @@ -604,11 +746,13 @@ namespace Mist{ } bool OutWebRTC::dropPushTrack(uint32_t trackId, const std::string & dropReason){ - JSON::Value commandResult; - commandResult["type"] = "on_track_drop"; - commandResult["track"] = trackId; - commandResult["mediatype"] = M.getType(trackId); - webSock->sendFrame(commandResult.toString()); + if (!noSignalling){ + JSON::Value commandResult; + commandResult["type"] = "on_track_drop"; + commandResult["track"] = trackId; + commandResult["mediatype"] = M.getType(trackId); + webSock->sendFrame(commandResult.toString()); + } return Output::dropPushTrack(trackId, dropReason); } @@ -940,9 +1084,9 @@ namespace Mist{ sdpAnswer.setDirection("recvonly"); // start our receive thread (handles STUN, DTLS, RTP input) - webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL); rtcpTimeoutInMillis = Util::bootMS() + 2000; rtcpKeyFrameTimeoutInMillis = Util::bootMS() + 2000; + webRTCInputOutputThread = new tthread::thread(webRTCInputOutputThreadFunc, NULL); idleInterval = 1000; @@ -1903,7 +2047,7 @@ namespace Mist{ //Do not reduce under 32 kbps if (videoConstraint < 1024*32){videoConstraint = 1024*32;} - if (videoConstraint != preConstraint){ + if (!noSignalling && videoConstraint != preConstraint){ INFO_MSG("Reduced video bandwidth maximum to %" PRIu32 " because average loss is %.2f", videoConstraint, curr_avg_loss); JSON::Value commandResult; commandResult["type"] = "on_video_bitrate"; diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index 8fa6d58c..a999fce3 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -132,6 +132,8 @@ namespace Mist{ virtual void sendHeader(); virtual void sendNext(); virtual void onWebsocketFrame(); + virtual void respondHTTP(const HTTP::Parser & req, bool headersOnly); + virtual void preHTTP(){} virtual void preWebsocketConnect(); virtual bool dropPushTrack(uint32_t trackId, const std::string & dropReason); void onIdle(); @@ -145,8 +147,10 @@ namespace Mist{ void onRTPPacketizerHasRTPPacket(const char *data, size_t nbytes); void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes); virtual void connStats(uint64_t now, Comms::Connections &statComm); - + inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);} + virtual void requestHandler(); private: + bool noSignalling; uint64_t lastRecv; uint64_t lastPackMs; uint64_t totalPkts;