From dd2382e858f5460afb289633aa331270d8d08186 Mon Sep 17 00:00:00 2001 From: Marco Date: Thu, 1 Jul 2021 13:15:40 +0200 Subject: [PATCH] Added SDP output --- CMakeLists.txt | 1 + lib/sdp.cpp | 91 ++++++++++---- lib/sdp.h | 4 +- lib/socket.cpp | 6 + lib/socket.h | 1 + src/output/output_rtsp.cpp | 93 ++++++--------- src/output/output_rtsp.h | 1 + src/output/output_sdp.cpp | 236 +++++++++++++++++++++++++++++++++++++ src/output/output_sdp.h | 34 ++++++ 9 files changed, 383 insertions(+), 84 deletions(-) create mode 100644 src/output/output_sdp.cpp create mode 100644 src/output/output_sdp.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e6cc96a6..8bdb2576 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -562,6 +562,7 @@ makeOutput(CMAF cmaf http)#LTS makeOutput(EBML ebml) makeOutput(RTSP rtsp)#LTS makeOutput(WAV wav)#LTS +makeOutput(SDP sdp http) add_executable(MistProcFFMPEG ${BINARY_DIR}/mist/.headers diff --git a/lib/sdp.cpp b/lib/sdp.cpp index a6ae716b..091a9e26 100644 --- a/lib/sdp.cpp +++ b/lib/sdp.cpp @@ -7,6 +7,7 @@ #include "sdp.h" #include "url.h" #include "util.h" +#include //Dynamic types we hardcode: //96 = AAC @@ -58,7 +59,8 @@ namespace SDP{ } /// Gets the SDP contents for sending out a particular given DTSC::Track. - std::string mediaDescription(const DTSC::Meta *meta, size_t tid){ + /// \param port UDP port we are sending data to. Defaults to 0 + std::string mediaDescription(const DTSC::Meta *meta, size_t tid, uint64_t port){ const DTSC::Meta &M = *meta; std::stringstream mediaDesc; @@ -68,7 +70,7 @@ namespace SDP{ if (codec == "H264"){ MP4::AVCC avccbox; avccbox.setPayload(init); - mediaDesc << "m=video 0 RTP/AVP 97\r\n" + mediaDesc << "m=video " << port << " RTP/AVP 97\r\n" "a=rtpmap:97 H264/90000\r\n" "a=cliprect:0,0," << M.getHeight(tid) << "," << M.getWidth(tid) << "\r\na=framesize:97 " @@ -94,7 +96,7 @@ namespace SDP{ << ((double)M.getFpks(tid)) / 1000.0 << "\r\na=control:track" << tid << "\r\n"; }else if (codec == "HEVC"){ h265::initData iData(init); - mediaDesc << "m=video 0 RTP/AVP 104\r\n" + mediaDesc << "m=video " << port << " RTP/AVP 104\r\n" "a=rtpmap:104 H265/90000\r\n" "a=cliprect:0,0," << M.getHeight(tid) << "," << M.getWidth(tid) << "\r\na=framesize:104 " @@ -126,7 +128,7 @@ namespace SDP{ mediaDesc << "\r\na=framerate:" << ((double)M.getFpks(tid)) / 1000.0 << "\r\na=control:track" << tid << "\r\n"; }else if (codec == "VP8"){ - mediaDesc << "m=video 0 RTP/AVP 98\r\n" + mediaDesc << "m=video " << port << " RTP/AVP 98\r\n" "a=rtpmap:98 VP8/90000\r\n" "a=cliprect:0,0," << M.getHeight(tid) << "," << M.getWidth(tid) << "\r\na=framesize:98 " @@ -134,7 +136,7 @@ namespace SDP{ << "a=framerate:" << ((double)M.getFpks(tid)) / 1000.0 << "\r\n" << "a=control:track" << tid << "\r\n"; }else if (codec == "VP9"){ - mediaDesc << "m=video 0 RTP/AVP 99\r\n" + mediaDesc << "m=video " << port << " RTP/AVP 99\r\n" "a=rtpmap:99 VP8/90000\r\n" "a=cliprect:0,0," << M.getHeight(tid) << "," << M.getWidth(tid) << "\r\na=framesize:99 " @@ -142,13 +144,13 @@ namespace SDP{ << "a=framerate:" << ((double)M.getFpks(tid)) / 1000.0 << "\r\n" << "a=control:track" << tid << "\r\n"; }else if (codec == "MPEG2"){ - mediaDesc << "m=video 0 RTP/AVP 32\r\n" + mediaDesc << "m=video " << port << " RTP/AVP 32\r\n" "a=cliprect:0,0," << M.getHeight(tid) << "," << M.getWidth(tid) << "\r\na=framesize:32 " << M.getWidth(tid) << '-' << M.getHeight(tid) << "\r\na=framerate:" << ((double)M.getFpks(tid)) / 1000.0 << "\r\na=control:track" << tid << "\r\n"; }else if (codec == "AAC"){ - mediaDesc << "m=audio 0 RTP/AVP 96" + mediaDesc << "m=audio " << port << " RTP/AVP 96" << "\r\n" "a=rtpmap:96 mpeg4-generic/" << M.getRate(tid) << "/" << M.getChannels(tid) @@ -162,53 +164,53 @@ namespace SDP{ "a=control:track" << tid << "\r\n"; }else if (codec == "MP3" || codec == "MP2"){ - mediaDesc << "m=" << M.getType(tid) << " 0 RTP/AVP 14" + mediaDesc << "m=" << M.getType(tid) << " " << port << " RTP/AVP 14" << "\r\n" "a=rtpmap:14 MPA/90000/" << M.getChannels(tid) << "\r\n" << "a=control:track" << tid << "\r\n"; }else if (codec == "AC3"){ - mediaDesc << "m=audio 0 RTP/AVP 100" + mediaDesc << "m=audio " << port << " RTP/AVP 100" << "\r\n" "a=rtpmap:100 AC3/" << M.getRate(tid) << "/" << M.getChannels(tid) << "\r\n" << "a=control:track" << tid << "\r\n"; }else if (codec == "ALAW"){ if (M.getChannels(tid) == 1 && M.getRate(tid) == 8000){ - mediaDesc << "m=audio 0 RTP/AVP 8" + mediaDesc << "m=audio " << port << " RTP/AVP 8" << "\r\n"; }else{ - mediaDesc << "m=audio 0 RTP/AVP 101" + mediaDesc << "m=audio " << port << " RTP/AVP 101" << "\r\n"; mediaDesc << "a=rtpmap:101 PCMA/" << M.getRate(tid) << "/" << M.getChannels(tid) << "\r\n"; } mediaDesc << "a=control:track" << tid << "\r\n"; }else if (codec == "ULAW"){ if (M.getChannels(tid) == 1 && M.getRate(tid) == 8000){ - mediaDesc << "m=audio 0 RTP/AVP 0" + mediaDesc << "m=audio " << port << " RTP/AVP 0" << "\r\n"; }else{ - mediaDesc << "m=audio 0 RTP/AVP 105" + mediaDesc << "m=audio " << port << " RTP/AVP 105" << "\r\n"; mediaDesc << "a=rtpmap:105 PCMU/" << M.getRate(tid) << "/" << M.getChannels(tid) << "\r\n"; } mediaDesc << "a=control:track" << tid << "\r\n"; }else if (codec == "PCM"){ if (M.getSize(tid) == 16 && M.getChannels(tid) == 2 && M.getRate(tid) == 44100){ - mediaDesc << "m=audio 0 RTP/AVP 10" + mediaDesc << "m=audio " << port << " RTP/AVP 10" << "\r\n"; }else if (M.getSize(tid) == 16 && M.getChannels(tid) == 1 && M.getRate(tid) == 44100){ - mediaDesc << "m=audio 0 RTP/AVP 11" + mediaDesc << "m=audio " << port << " RTP/AVP 11" << "\r\n"; }else{ - mediaDesc << "m=audio 0 RTP/AVP 103" + mediaDesc << "m=audio " << port << " RTP/AVP 103" << "\r\n"; mediaDesc << "a=rtpmap:103 L" << M.getSize(tid) << "/" << M.getRate(tid) << "/" << M.getChannels(tid) << "\r\n"; } mediaDesc << "a=control:track" << tid << "\r\n"; }else if (codec == "opus"){ - mediaDesc << "m=audio 0 RTP/AVP 102" + mediaDesc << "m=audio " << port << " RTP/AVP 102" << "\r\n" "a=rtpmap:102 opus/" << M.getRate(tid) << "/" << M.getChannels(tid) << "\r\n" @@ -243,13 +245,43 @@ namespace SDP{ } } - /// Sets the TCP/UDP connection details from a given transport string. - /// Sets the transportString member to the current transport string on success. - /// \param host The host connecting to us. - /// \source The source identifier. - /// \return True if successful, false otherwise. - bool Track::parseTransport(const std::string &transport, const std::string &host, - const std::string &source, const DTSC::Meta *M, size_t tid){ + /// Prepares data and RTP control UDP sockets for multicast delivery + /// \param dest: IP address to where we want to push RTP packets to + /// \param port: port which will receive DATA. port+1 will receive RTCP + /// \return True if we have bound the correct ports, False if we need to abort + bool Track::prepareSockets(const std::string dest, uint32_t port){ + HIGH_MSG("Preparing sockets for pushing towards '%s' port '%" PRIu32 "'", dest.c_str(), port); + int sendbuff = 4 * 1024 * 1024; + setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + data.setSocketFamily(AF_UNSPEC); + rtcp.setSocketFamily(AF_UNSPEC); + data.SetDestination(dest, 1337); + rtcp.SetDestination(dest, 1337); + + // If no explicit port was set, we will bind to anything that works for us + // We only need to bind the RTCP UDP port, since we are not receiving data + if (!port){ + int retries = 0; + while (!portB && retries < 10){ + portB = rtcp.bind(0); + } + portA = portB-1; + }else{ + portA = port; + portB = rtcp.bind(portA + 1); + } + data.SetDestination(dest, portA); + rtcp.SetDestination(dest, portB); + + if(portB != portA + 1 || !portA || !portB){ + return false; + } + return true; + } + + /// \brief Determines the codec of the current track + bool Track::setPackCodec(const DTSC::Meta *M, size_t tid){ std::string codec = M->getCodec(tid); if (codec == "H264"){ pack = RTP::Packet(97, 1, 0, mySSRC); @@ -293,6 +325,17 @@ namespace SDP{ ERROR_MSG("Unsupported codec %s for RTSP on track %zu", codec.c_str(), tid); return false; } + return true; + } + + /// Sets the TCP/UDP connection details from a given transport string. + /// Sets the transportString member to the current transport string on success. + /// \param host The host connecting to us. + /// \source The source identifier. + /// \return True if successful, false otherwise. + bool Track::parseTransport(const std::string &transport, const std::string &host, + const std::string &source, const DTSC::Meta *M, size_t tid){ + if (!setPackCodec(M, tid)){return false;} if (transport.find("TCP") != std::string::npos){ std::string chanE = transport.substr(transport.find("interleaved=") + 12, (transport.size() - transport.rfind('-') - 1)); // extract channel ID diff --git a/lib/sdp.h b/lib/sdp.h index 4560aeca..ca199442 100644 --- a/lib/sdp.h +++ b/lib/sdp.h @@ -19,6 +19,8 @@ namespace SDP{ bool parseTransport(const std::string &transport, const std::string &host, const std::string &source, const DTSC::Meta *M, size_t tid); std::string rtpInfo(const DTSC::Meta &M, size_t tid, const std::string &source, uint64_t currentTime); + bool prepareSockets(const std::string dest, uint32_t port); + bool setPackCodec(const DTSC::Meta *M, size_t tid); public: Socket::UDPConnection data; @@ -60,5 +62,5 @@ namespace SDP{ std::map tracks; ///< List of selected tracks with SDP-specific session data. }; - std::string mediaDescription(const DTSC::Meta *M, size_t tid); + std::string mediaDescription(const DTSC::Meta *M, size_t tid, uint64_t port = 0); }// namespace SDP diff --git a/lib/socket.cpp b/lib/socket.cpp index 868c0ae9..ade6d75e 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -1692,6 +1692,12 @@ Socket::UDPConnection::~UDPConnection(){ } } +// Sets socket family type (to IPV4 or IPV6) (AF_INET=2, AF_INET6=10) +void Socket::UDPConnection::setSocketFamily(int AF_TYPE){\ + INFO_MSG("Switching UDP socket from %s to %s", addrFam(family), addrFam(AF_TYPE)); + family = AF_TYPE; +} + /// Stores the properties of the receiving end of this UDP socket. /// This will be the receiving end for all SendNow calls. void Socket::UDPConnection::SetDestination(std::string destIp, uint32_t port){ diff --git a/lib/socket.h b/lib/socket.h index e4af9825..a85fbf39 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -223,5 +223,6 @@ namespace Socket{ void SendNow(const std::string &data); void SendNow(const char *data); void SendNow(const char *data, size_t len); + void setSocketFamily(int AF_TYPE); }; }// namespace Socket diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp index c9d233e1..87de58ac 100644 --- a/src/output/output_rtsp.cpp +++ b/src/output/output_rtsp.cpp @@ -172,41 +172,14 @@ namespace Mist{ if (reqUrl.size() && lastAnnounce + 5 < Util::bootSecs()){ INFO_MSG("Sending announce"); lastAnnounce = Util::bootSecs(); - std::stringstream transportString; - transportString.precision(3); - transportString << std::fixed - << "v=0\r\n" - "o=- " - << Util::getMS() - << " 1 IN IP4 127.0.0.1\r\n" - "s=" - << streamName - << "\r\n" - "c=IN IP4 0.0.0.0\r\n" - "i=" - << streamName - << "\r\n" - "u=" - << reqUrl - << "\r\n" - "t=0 0\r\n" - "a=tool:" APPIDENT "\r\n" - "a=type:broadcast\r\n" - "a=control:*\r\n" - << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" - << ((double)endTime()) / 1000.0 << "\r\n"; - - std::set validTracks = M.getValidTracks(); - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - transportString << SDP::mediaDescription(&M, *it); - } + std::string transportString = generateSDP(reqUrl); HTTP_S.Clean(); HTTP_S.SetHeader("Content-Type", "application/sdp"); HTTP_S.SetHeader("Content-Base", reqUrl); HTTP_S.method = "ANNOUNCE"; HTTP_S.url = reqUrl; HTTP_S.protocol = "RTSP/1.0"; - HTTP_S.SendRequest(myConn, transportString.str()); + HTTP_S.SendRequest(myConn, transportString); HTTP_R.Clean(); } } @@ -320,38 +293,11 @@ namespace Mist{ reqUrl = HTTP::URL(HTTP_R.url).link(streamName).getProxyUrl(); initialize(); userSelect.clear(); - std::stringstream transportString; - transportString.precision(3); - transportString << std::fixed - << "v=0\r\n" - "o=- " - << Util::getMS() - << " 1 IN IP4 127.0.0.1\r\n" - "s=" - << streamName - << "\r\n" - "c=IN IP4 0.0.0.0\r\n" - "i=" - << streamName - << "\r\n" - "u=" - << reqUrl - << "\r\n" - "t=0 0\r\n" - "a=tool:" APPIDENT "\r\n" - "a=type:broadcast\r\n" - "a=control:*\r\n" - << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" - << ((double)endTime()) / 1000.0 << "\r\n"; - - std::set validTracks = Util::wouldSelect(M, targetParams, capa, UA); - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ - transportString << SDP::mediaDescription(&M, *it); - } - HIGH_MSG("Reply: %s", transportString.str().c_str()); + std::string transportString = generateSDP(reqUrl); + HIGH_MSG("Reply: %s", transportString.c_str()); HTTP_S.SetHeader("Content-Base", reqUrl); HTTP_S.SetHeader("Content-Type", "application/sdp"); - HTTP_S.SetBody(transportString.str()); + HTTP_S.SetBody(transportString); HTTP_S.SendResponse("200", "OK", myConn); HTTP_R.Clean(); continue; @@ -479,6 +425,35 @@ namespace Mist{ return false; } + /// \param reqUrl: URI to additional session information + std::string OutRTSP::generateSDP(std::string reqUrl){ + std::stringstream transportString; + transportString.precision(3); + // Add session description & time description + transportString << std::fixed << + "v=0\r\n" + "o=- " << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n" + "s=" << streamName << "\r\n" + "c=IN IP4 0.0.0.0\r\n" + "i=" << streamName << "\r\n" + "u=" << reqUrl << "\r\n" + "t=0 0\r\n" + "a=tool:" APPIDENT "\r\n" + "a=type:broadcast\r\n" + "a=control:*\r\n"; + if (M.getLive()){ + transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-\r\n"; + }else{ + transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" << ((double)endTime()) / 1000.0 << "\r\n"; + } + // Add Media description + std::set validTracks = M.getValidTracks(); + for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); ++it){ + transportString << SDP::mediaDescription(&M, *it); + } + return transportString.str(); + } + /// Attempts to parse TCP RTP packets at the beginning of the header. /// Returns whether it is safe to attempt to read HTTP/RTSP packets (true) or not (false). bool OutRTSP::handleTCP(){ diff --git a/src/output/output_rtsp.h b/src/output/output_rtsp.h index 67944736..98bb0a86 100644 --- a/src/output/output_rtsp.h +++ b/src/output/output_rtsp.h @@ -30,6 +30,7 @@ namespace Mist{ int64_t packetOffset; bool expectTCP; bool checkPort; + std::string generateSDP(std::string reqUrl); bool handleTCP(); void handleUDP(); }; diff --git a/src/output/output_sdp.cpp b/src/output/output_sdp.cpp new file mode 100644 index 00000000..6baf9067 --- /dev/null +++ b/src/output/output_sdp.cpp @@ -0,0 +1,236 @@ +#include "output_sdp.h" +#include "lib/defines.h" +#include "mist/defines.h" +#include "src/output/output.h" +#include +#include + +namespace Mist{ + Socket::Connection *mainConn = 0; + + OutSDP::OutSDP(Socket::Connection &conn) : HTTPOutput(conn){ + mainConn = &conn; + exitOnNoRTCP = false; + } + + /// Function used to send RTP packets over UDP + ///\param socket A UDP Connection pointer, sent as a void*, to keep portability. + ///\param data The RTP Packet that needs to be sent + ///\param len The size of data + ///\param channel Not used here, but is kept for compatibility with sendTCP + void sendUDP(void *socket, const char *data, size_t len, uint8_t){ + ((Socket::UDPConnection *)socket)->SendNow(data, len); + if (mainConn){mainConn->addUp(len);} + } + + /// \brief Initializes the SDP state + /// \param port: Each track will have a data and RTCP port. + /// These are consecutive and start at startPort + /// \param targetIP: (Multicast supported) IP address we are pushing the stream towards + void OutSDP::initTracks(uint32_t & port, std::string targetIP){ + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ + SDP::Track newTrk; + newTrk.setPackCodec(&M, it->first); + if (!newTrk.prepareSockets(targetIP, port)){ + FAIL_MSG("Could not bind required UDP sockets. Aborting push..."); + parseData = false; + wantRequest = true; + return; + } + // If a starting port was set, increment by 2 for the next track + if (port){ + port+=2; + } + sdpState.tracks.insert(std::pair(it->first, newTrk)); + } + } + + void OutSDP::sendHeader(){ + if (isRecording()){ + std::string targetIP; + if (targetParams["targetIP"].size()){ + targetIP = targetParams["targetIP"]; + } else { + targetIP = "234.234.234.234"; + } + // If not set, default to random ports we can bind + uint32_t port = 0; + if (targetParams["startPort"].size()){ + port = atoll(targetParams["startPort"].c_str()); + } + // Add meta tracks to sdpState + initTracks(port, targetIP); + myConn.SendNow(generateSDP(targetIP, streamName)); + INFO_MSG("Pushing %zu tracks towards target address '%s'.", sdpState.tracks.size(), targetIP.c_str()); + } + sentHeader = true; + } + + /// \brief Generates an SDP file which clients can use to connect to the stream + /// initTracks should be called before this function is called, to make sure we have the right ports + /// \param targetAddress: the (multicast supported) target, where we will push the stream towards + /// \param streamName: the name of the stream we are serving + std::string OutSDP::generateSDP(std::string targetAddress, std::string streamName){ + prevRTCP = Util::bootSecs(); + std::stringstream sdpAsString; + sdpAsString.precision(3); + // Add session description & time description + sdpAsString << std::fixed << + "v=0\r\n" + "o=- " << Util::getMS() << " 1 IN IP4 " << targetAddress << "\r\n" + "s=" << streamName << "\r\n" + "i=" << streamName << "\r\n" + "c=IN IP4 " << targetAddress << "\r\n" + "t=0 0\r\n" + "a=tool:" APPIDENT "\r\n" + "a=recvonly\r\n" + "a=type:broadcast\r\n"; + // Add Media description + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); ++it){ + DONTEVEN_MSG("Track '%zu' should be pushing data towards UDP port '%" PRIu32 "'", it->first, sdpState.tracks[it->first].portA); + sdpAsString << SDP::mediaDescription(&M, it->first, sdpState.tracks.at(it->first).portA); + } + return sdpAsString.str(); + } + + void OutSDP::init(Util::Config *cfg){ + HTTPOutput::init(cfg); + capa["name"] = "SDP"; + capa["friendly"] = "RTP streams using SDP"; + capa["desc"] = "Make streams available as a RTP stream, using the SDP"; + capa["url_rel"] = "/$.sdp"; + capa["url_match"] = "/$.sdp"; + capa["codecs"][0u][0u].append("+H264"); + capa["codecs"][0u][1u].append("+HEVC"); + capa["codecs"][0u][2u].append("+MPEG2"); + capa["codecs"][0u][3u].append("+VP8"); + capa["codecs"][0u][4u].append("+VP9"); + capa["codecs"][0u][5u].append("+AAC"); + capa["codecs"][0u][6u].append("+MP3"); + capa["codecs"][0u][7u].append("+AC3"); + capa["codecs"][0u][8u].append("+ALAW"); + capa["codecs"][0u][9u].append("+ULAW"); + capa["codecs"][0u][10u].append("+PCM"); + capa["codecs"][0u][11u].append("+opus"); + capa["codecs"][0u][12u].append("+MP2"); + + capa["methods"][0u]["handler"] = "http"; + capa["methods"][0u]["type"] = "html5/application/sdp"; + capa["methods"][0u]["url_rel"] = "/$.sdp"; + capa["methods"][0u]["priority"] = 11; + + capa["push_urls"].append("/*.sdp"); + + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = ""; + opt["arg_num"] = 1; + opt["help"] = "Target filename to store SDP file as, or - for stdout."; + cfg->addOption("target", opt); + + } + + OutSDP::~OutSDP(){ + // Remove written SDP file, if it was set as the output target + if (isRecording()){ + HTTP::URL target(config->getString("target")); + if(target.getExt() == "sdp"){ + INFO_MSG("Removing .SDP file located at '%s'", target.getFilePath().c_str()); + std::remove(target.getFilePath().c_str()); + } + } + } + + void OutSDP::sendNext(){ + char *dataPointer = 0; + size_t dataLen = 0; + thisPacket.getString("data", dataPointer, dataLen); + uint64_t timestamp = thisPacket.getTime(); + + void *socket = 0; + void (*callBack)(void *, const char *, size_t, uint8_t) = 0; + + // Get data socket and send RTCP + if (sdpState.tracks[thisIdx].channel == -1){ + socket = &sdpState.tracks[thisIdx].data; + callBack = sendUDP; + if (Util::bootSecs() != sdpState.tracks[thisIdx].rtcpSent){ + sdpState.tracks[thisIdx].pack.setTimestamp(timestamp * SDP::getMultiplier(&M, thisIdx)); + sdpState.tracks[thisIdx].rtcpSent = Util::bootSecs(); + sdpState.tracks[thisIdx].pack.sendRTCP_SR(&sdpState.tracks[thisIdx].rtcp, sendUDP); + } + }else{ + FAIL_MSG("RTP SDP output does not support TCP. No data will be sent to the target address"); + return; + } + uint64_t offset = thisPacket.getInt("offset"); + sdpState.tracks[thisIdx].pack.setTimestamp((timestamp + offset) * SDP::getMultiplier(&M, thisIdx)); + sdpState.tracks[thisIdx].pack.sendData(socket, callBack, dataPointer, dataLen, + sdpState.tracks[thisIdx].channel, meta.getCodec(thisIdx)); + + // Update last RTCP received variable + if (exitOnNoRTCP){ + checkForRTCP(thisIdx); + // Exit if no RTCP packets received for 30 seconds + if (Util::bootSecs() - prevRTCP > 30){ + parseData = false; + wantRequest = true; + WARN_MSG("Shutting down because we have not received RTCP receiver reports for 30 seconds."); + } + } + } + + /// Reads RTCP packets sent back by viewers + void OutSDP::checkForRTCP(uint64_t thisIdx){ + Socket::UDPConnection &socket = sdpState.tracks[thisIdx].rtcp; + while (socket.Receive()){ + prevRTCP = Util::bootSecs(); + INFO_MSG("received RTCP packet '%u'", prevRTCP); + if (myConn){ + myConn.addDown(sdpState.tracks[thisIdx].rtcp.data.size()); + }; + RTP::Packet pack(sdpState.tracks[thisIdx].rtcp.data, sdpState.tracks[thisIdx].rtcp.data.size()); + } + } + + void OutSDP::onHTTP(){ + std::string targetIP; + uint32_t port; + + // Init latest RTCP packet time on current time + prevRTCP = Util::bootSecs(); + if (myConn.getHost().size()){ + targetIP = myConn.getHost(); + } else { + targetIP = "234.234.234.234"; + } + // If not set, default to random ports we can bind + if (targetParams["startPort"].size()){ + port = atoll(targetParams["startPort"].c_str()); + } else { + port = 0; + } + // When we start an RTP stream via a HTTP request, close it when we no longer receive RTCP packets + exitOnNoRTCP = true; + + // Add meta tracks to sdpState + initTracks(port, targetIP); + INFO_MSG("Pushing %zu tracks towards target address '%s')", sdpState.tracks.size(), targetIP.c_str()); + + // Send SDP file back + H.setCORSHeaders(); + H.SetBody(generateSDP(targetIP, streamName)); + H.SendResponse("200", "OK", myConn); + H.CleanPreserveHeaders(); + // No more requests needed. Keep alive until we are no longer receiving RTCP packets + parseData = true; + wantRequest = false; + } + + /// Disconnects the user + bool OutSDP::onFinish(){ + INFO_MSG("Shutting down..."); + if (myConn){myConn.close();} + return false; + } +}// namespace Mist diff --git a/src/output/output_sdp.h b/src/output/output_sdp.h new file mode 100644 index 00000000..2d4b3368 --- /dev/null +++ b/src/output/output_sdp.h @@ -0,0 +1,34 @@ +#include "output_http.h" +#include +#include +#include +#include +#include +#include + +namespace Mist{ + class OutSDP : public HTTPOutput{ + public: + OutSDP(Socket::Connection &conn); + ~OutSDP(); + static void init(Util::Config *cfg); + void onHTTP(); + void sendNext(); + void sendHeader(); + bool onFinish(); + + private: + void initTracks(uint32_t & port, std::string targetIP); + void checkForRTCP(uint64_t thisIdx); + std::string generateSDP(std::string targetAddress, std::string streamName); + SDP::State sdpState; + uint32_t prevRTCP; + bool exitOnNoRTCP; + bool isFileTarget(){ + INFO_MSG("Checking file target! %s", isRecording()?"yes":"no"); + return isRecording(); + } + }; +}// namespace Mist + +typedef Mist::OutSDP mistOut;