From 3e73508a6a575a3ce6d94332c12430fb3e567338 Mon Sep 17 00:00:00 2001 From: Marco Date: Fri, 22 Jan 2021 18:08:44 +0100 Subject: [PATCH] Added SDP input --- CMakeLists.txt | 1 + lib/rtp.cpp | 98 ++++++------ lib/rtp.h | 3 +- lib/sdp.cpp | 114 ++++++++++++++ lib/sdp.h | 6 + lib/socket.cpp | 1 + src/input/input_sdp.cpp | 324 ++++++++++++++++++++++++++++++++++++++++ src/input/input_sdp.h | 70 +++++++++ 8 files changed, 571 insertions(+), 46 deletions(-) create mode 100644 src/input/input_sdp.cpp create mode 100644 src/input/input_sdp.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8bdb2576..a2569466 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -486,6 +486,7 @@ makeInput(Playlist playlist)#LTS makeInput(Balancer balancer)#LTS makeInput(RTSP rtsp)#LTS makeInput(SRT srt)#LTS +makeInput(SDP sdp) if(SRT_LIB) makeInput(TSSRT tssrt with_srt)#LTS diff --git a/lib/rtp.cpp b/lib/rtp.cpp index d5353cc5..b490050b 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -506,6 +506,7 @@ namespace RTP{ rtpSeq = pSNo - 5; first = false; } + DONTEVEN_MSG("Received packet #%u, current packet is #%u", pSNo, rtpSeq); if (preBuffer){ //If we've buffered the first 5 packets, assume we have the first one known if (packBuffer.size() >= 5){ @@ -583,6 +584,7 @@ namespace RTP{ packCount = 0; lastSeq = 0; vp8BufferHasKeyframe = false; + curPicParameterSetId = 0; } void toDTSC::setProperties(const uint64_t track, const std::string &c, const std::string &t, @@ -601,7 +603,7 @@ namespace RTP{ MP4::AVCC avccbox; avccbox.setPayload(init); spsData.assign(avccbox.getSPS(), avccbox.getSPSLen()); - ppsData.assign(avccbox.getPPS(), avccbox.getPPSLen()); + ppsData[curPicParameterSetId].assign(avccbox.getPPS(), avccbox.getPPSLen()); h264::sequenceParameterSet sps(spsData.data(), spsData.size()); h264::SPSMeta hMeta = sps.getCharacteristics(); fps = hMeta.fps; @@ -669,6 +671,12 @@ namespace RTP{ } } } + // When there are B-frames, the firstTime can be higher than the current time + // causing msTime to become negative and thus overflow + if (firstTime > pTime + 1){ + WARN_MSG("firstTime was higher than current packet time. Readjusting firsTime..."); + firstTime = pTime + 1; + } prevTime = pkt.getTimeStamp(); uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync; char *pl = (char *)pkt.getPayload(); @@ -844,9 +852,13 @@ namespace RTP{ if (fps > 1){ // Assume a steady frame rate, clip the timestamp based on frame number. uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5; - while (frameNo < packCount){packCount--;} + if (frameNo < packCount){ + packCount = frameNo; + } // More than 32 frames behind? We probably skipped something, somewhere... - if ((frameNo - packCount) > 32){packCount = frameNo;} + if ((frameNo - packCount) > 32){ + packCount = frameNo; + } // After some experimentation, we found that the time offset is the difference between the // frame number and the packet counter, times the frame rate in ms offset = (frameNo - packCount) * (1000.0 / fps); @@ -981,9 +993,13 @@ namespace RTP{ if (fps > 1){ // Assume a steady frame rate, clip the timestamp based on frame number. uint64_t frameNo = (currH264Time / (1000.0 / fps)) + 0.5; - while (frameNo < packCount){packCount--;} + if (frameNo < packCount){ + packCount = frameNo; + } // More than 32 frames behind? We probably skipped something, somewhere... - if ((frameNo - packCount) > 32){packCount = frameNo;} + if ((frameNo - packCount) > 32){ + packCount = frameNo; + } // After some experimentation, we found that the time offset is the difference between the // frame number and the packet counter, times the frame rate in ms offset = (frameNo - packCount) * (1000.0 / fps); @@ -1026,58 +1042,50 @@ namespace RTP{ spsData.assign(buffer + 4, len - 4); h264::SPSMeta hMeta = sps.getCharacteristics(); fps = hMeta.fps; - - MP4::AVCC avccBox; - avccBox.setVersion(1); - avccBox.setProfile(spsData[1]); - avccBox.setCompatibleProfiles(spsData[2]); - avccBox.setLevel(spsData[3]); - avccBox.setSPSCount(1); - avccBox.setSPS(spsData); - avccBox.setPPSCount(1); - avccBox.setPPS(ppsData); - std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize()); - if (newInit != init){ - init = newInit; - outInit(trackId, init); - } } return; case 8: // PPS - if (ppsData.size() != len - 4 || memcmp(buffer + 4, ppsData.data(), len - 4) != 0){ - if (!h264::ppsValidate(buffer+4, len-4)){ - WARN_MSG("Ignoring invalid PPS packet! (%" PRIu32 "b)", len-4); - return; - } - HIGH_MSG("Updated PPS from RTP data: %" PRIu32 "b", len-4); - ppsData.assign(buffer + 4, len - 4); - MP4::AVCC avccBox; - avccBox.setVersion(1); - avccBox.setProfile(spsData[1]); - avccBox.setCompatibleProfiles(spsData[2]); - avccBox.setLevel(spsData[3]); - avccBox.setSPSCount(1); - avccBox.setSPS(spsData); - avccBox.setPPSCount(1); - avccBox.setPPS(ppsData); - std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize()); - if (newInit != init){ - init = newInit; - outInit(trackId, init); + // Determine pic_parameter_set_id and check whether the PPS is new or updated + { + h264::ppsUnit PPS(buffer + 4, len - 4); + if (ppsData[PPS.picParameterSetId].size() != len - 4 || memcmp(buffer + 4, ppsData[PPS.picParameterSetId].data(), len - 4) != 0){ + if (!h264::ppsValidate(buffer+4, len-4)){ + WARN_MSG("Ignoring invalid PPS packet! (%" PRIu32 "b)", len-4); + return; + } + HIGH_MSG("Updated PPS with ID %li from RTP data", PPS.picParameterSetId); + ppsData[PPS.picParameterSetId].assign(buffer + 4, len - 4); } } return; case 5:{ - //If this is a keyframe and we have no buffer yet, prepend the SPS/PPS - if (!h264OutBuffer.size()){ + // We have a keyframe: prepend SPS/PPS if the pic_parameter_set_id changed or if this is the first keyframe + h264::codedSliceUnit keyPiece(buffer + 4, len - 4); + if (!h264OutBuffer.size() || keyPiece.picParameterSetId != curPicParameterSetId){ + curPicParameterSetId = keyPiece.picParameterSetId; + // Update meta init data if needed + MP4::AVCC avccBox; + avccBox.setVersion(1); + avccBox.setProfile(spsData[1]); + avccBox.setCompatibleProfiles(spsData[2]); + avccBox.setLevel(spsData[3]); + avccBox.setSPSCount(1); + avccBox.setSPS(spsData); + avccBox.setPPSCount(1); + avccBox.setPPS(ppsData[curPicParameterSetId]); + std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize()); + if (newInit != init){ + init = newInit; + outInit(trackId, init); + } + // Prepend SPS/PPS char sizeBuffer[4]; Bit::htobl(sizeBuffer, spsData.size()); h264OutBuffer.append(sizeBuffer, 4); h264OutBuffer.append(spsData.data(), spsData.size()); - - Bit::htobl(sizeBuffer, ppsData.size()); + Bit::htobl(sizeBuffer, ppsData[curPicParameterSetId].size()); h264OutBuffer.append(sizeBuffer, 4); - h264OutBuffer.append(ppsData.data(), ppsData.size()); + h264OutBuffer.append(ppsData[curPicParameterSetId].data(), ppsData[curPicParameterSetId].size()); } //Note: no return, we still want to buffer the packet itself, below! } diff --git a/lib/rtp.h b/lib/rtp.h index 5af9fa99..dc782936 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -179,7 +179,8 @@ namespace RTP{ void handleH264Single(uint64_t ts, const char *buffer, const uint32_t len, bool isKey); void handleH264Multi(uint64_t ts, char *buffer, const uint32_t len); std::string spsData; ///< SPS for H264 - std::string ppsData; ///< PPS for H264 + uint8_t curPicParameterSetId; + std::map ppsData; ///< PPS for H264 void handleVP8(uint64_t msTime, const char *buffer, const uint32_t len, bool missed, bool hasPadding); Util::ResizeablePointer vp8FrameBuffer; ///< Stores successive VP8 payload data. We always start with the first ///< partition; but we might be missing other partitions when they were diff --git a/lib/sdp.cpp b/lib/sdp.cpp index 091a9e26..d8136c6e 100644 --- a/lib/sdp.cpp +++ b/lib/sdp.cpp @@ -398,6 +398,60 @@ namespace SDP{ return true; } + /// Tries to bind a RTP/RTCP UDP port pair + /// \param portInfo port/#ports as found in SDP file + /// \param hostInfo host address + bool Track::bindUDPPort(std::string portInfo, std::string hostInfo){ + uint32_t portRTP, portRTCP; + + if (portInfo == "" || hostInfo == ""){ + WARN_MSG("Can not setup transport to address %s:%s", hostInfo.c_str(), portInfo.c_str()); + return false; + } + + // Extract port numbers from input string + size_t tempPos; + tempPos = portInfo.find('/'); + if (tempPos != std::string::npos){ + // TODO https://tools.ietf.org/html/rfc4566#section-5.14 + // bind more ports if theres a /, which indicates the amount of port pairs + WARN_MSG("Does not support more than one RTP/RTCP port pair"); + portInfo = portInfo.substr(0, tempPos); + } + std::istringstream ( portInfo ) >> portRTP; + portRTCP = portRTP + 1; + + // During RTSP streams we get the transport info on setup + // in this case the port is set to 0 in the SDP file + if (!portRTP){ + return true; + } + + // Since default is set to IPV6, force to AF_UNSPEC + data.setSocketFamily(AF_UNSPEC); + rtcp.setSocketFamily(AF_UNSPEC); + // Test UDP ports + int sendbuff = 4 * 1024 * 1024; + data.SetDestination(hostInfo, portRTP); + setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + rtcp.SetDestination(hostInfo, portRTCP); + setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + // Bind sockets + portA = data.bind(portRTP, hostInfo); + if (portA != portRTP){ + FAIL_MSG("Server requested RTP port %u, which we couldn't bind", portRTP); + return false; + } + portB = rtcp.bind(portRTCP, hostInfo); + if (portB != portRTCP){ + FAIL_MSG("Server requested RTCP port %u, which we couldn't bind", portRTCP); + return false; + } + + return true; + } + + /// Gets the rtpInfo for a given DTSC::Track, source identifier and timestamp (in millis). std::string Track::rtpInfo(const DTSC::Meta &M, size_t tid, const std::string &source, uint64_t currentTime){ std::stringstream rInfo; @@ -414,8 +468,14 @@ namespace SDP{ void State::parseSDP(const std::string &sdp){ DONTEVEN_MSG("Parsing %zu-byte SDP", sdp.size()); + if (!sdp.size()){ + FAIL_MSG("SDP buffer is empty!"); + return; + } std::stringstream ss(sdp); std::string to; + // (UDP) Host will be set when a c= line is read + std::string host = "127.0.0.1"; size_t tid = INVALID_TRACK_ID; bool nope = true; // true if we have no valid track to fill while (std::getline(ss, to, '\n')){ @@ -423,12 +483,39 @@ namespace SDP{ if (to.empty()){continue;} DONTEVEN_MSG("Parsing SDP line: %s", to.c_str()); + // Extract host IP from c= line + // c= + if (to.substr(0, 2) == "c="){ + // Strip c= + std::stringstream words(to.substr(2)); + std::string item; + size_t tempPos; + + // Strip nettype + getline(words, item, ' '); + // Strip addrtype + getline(words, item, ' '); + // Get connection address + getline(words, item, ' '); + // Strip TTL, which is appended as IP/TTL + tempPos = item.find('/'); + if (tempPos != std::string::npos){ + item = item.substr(0, tempPos); + } + host = item; + } + // All tracks start with a media line + // m= / ... if (to.substr(0, 2) == "m="){ nope = true; tid = myMeta->addTrack(); + + // Strip m= std::stringstream words(to.substr(2)); std::string item; + + // Get media type if (getline(words, item, ' ') && (item == "audio" || item == "video")){ myMeta->setType(tid, item); myMeta->setID(tid, tid); @@ -438,13 +525,22 @@ namespace SDP{ tracks.erase(tid); continue; } + + // Get port info and bind RTP/RTCP UDP pairs getline(words, item, ' '); + if (!tracks[tid].bindUDPPort(item, host) ){ + FAIL_MSG("Failed to bind ports for given port info: %s", item.c_str()); + } + + // Get transport protocol if (!getline(words, item, ' ') || item.substr(0, 7) != "RTP/AVP"){ WARN_MSG("Media transport not supported: %s", item.c_str()); myMeta->removeTrack(tid); tracks.erase(tid); continue; } + + // Get media format description if (getline(words, item, ' ')){ uint64_t avp_type = JSON::Value(item).asInt(); switch (avp_type){ @@ -774,4 +870,22 @@ namespace SDP{ tConv[track].addRTP(pkt); } + /// Re-inits internal variables and removes all tracks from meta + void State::reinitSDP(){ + tConv.clear(); + size_t trackID; + + for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { + trackID = myMeta->getID(it->first); + INFO_MSG("Removing track %zu:%s", it->first, myMeta->getTrackIdentifier(it->first).c_str()); + if (trackID == INVALID_TRACK_ID){ + WARN_MSG("TrackID was invalid"); + } + else{ + myMeta->removeTrack(it->first); + } + } + //myMeta->refresh(); + tracks.clear(); + } }// namespace SDP diff --git a/lib/sdp.h b/lib/sdp.h index ca199442..05b550bb 100644 --- a/lib/sdp.h +++ b/lib/sdp.h @@ -14,6 +14,8 @@ namespace SDP{ public: Track(); std::string generateTransport(uint32_t trackNo, const std::string &dest = "", bool TCPmode = true); + /// Tries to bind a RTP/RTCP UDP port pair + bool bindUDPPort(std::string portInfo, std::string hostInfo); std::string getParamString(const std::string ¶m) const; uint64_t getParamInt(const std::string ¶m) const; bool parseTransport(const std::string &transport, const std::string &host, @@ -55,6 +57,10 @@ namespace SDP{ size_t getTrackNoForChannel(uint8_t chan); size_t parseSetup(HTTP::Parser &H, const std::string &host, const std::string &source); void handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt); + // Sets up the transport from SDP data + bool parseTransport(const std::string &sdpString); + // Re-inits internal variables and removes all tracks from meta + void reinitSDP(); public: DTSC::Meta *myMeta; diff --git a/lib/socket.cpp b/lib/socket.cpp index ade6d75e..a9426e82 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -1701,6 +1701,7 @@ void Socket::UDPConnection::setSocketFamily(int AF_TYPE){\ /// Stores the properties of the receiving end of this UDP socket. /// This will be the receiving end for all SendNow calls. void Socket::UDPConnection::SetDestination(std::string destIp, uint32_t port){ + DONTEVEN_MSG("Setting destination to %s:%u", destIp.c_str(), port); // UDP sockets can switch between IPv4 and IPv6 on demand. // We change IPv4-mapped IPv6 addresses into IPv4 addresses for Windows-sillyness reasons. if (destIp.substr(0, 7) == "::ffff:"){destIp = destIp.substr(7);} diff --git a/src/input/input_sdp.cpp b/src/input/input_sdp.cpp new file mode 100644 index 00000000..694ccaa0 --- /dev/null +++ b/src/input/input_sdp.cpp @@ -0,0 +1,324 @@ +#include "input_sdp.h" + +// Will point to current InputSDP obj after constructor is called +Mist::InputSDP *classPointer = 0; +size_t bytesUp = 0; +// CB used to receive DTSC packets back from RTP sorter +void incomingPacket(const DTSC::Packet &pkt){ + classPointer->incoming(pkt); +} +void insertRTP(const uint64_t track, const RTP::Packet &p){ + classPointer->incomingRTP(track, p); +} + +/// Function used to send RTCP packets over UDP +///\param socket A UDP Connection pointer, sent as a void*, to keep portability. +///\param data The RTP Packet that needs to be sent +///\param len The size of data +///\param channel Not used here, but is kept for compatibility with sendTCP +void sendUDP(void *socket, const char *data, size_t len, uint8_t channel){ + ((Socket::UDPConnection *)socket)->SendNow(data, len); + bytesUp += len; +} + +namespace Mist{ + void InputSDP::incomingRTP(const uint64_t track, const RTP::Packet &p){ + sdpState.handleIncomingRTP(track, p); + } + + InputSDP::InputSDP(Util::Config *cfg) : Input(cfg){ + setPacketOffset = false; + packetOffset = 0; + sdpState.myMeta = &meta; + sdpState.incomingPacketCallback = incomingPacket; + classPointer = this; + standAlone = false; + hasBork = false; + bytesRead = 0; + count = 0; + capa["name"] = "SDP"; + capa["desc"] = "This input allows pulling of RTP packets using a provided SDP file"; + capa["source_match"].append("*.sdp"); + capa["always_match"].append("*.sdp"); + capa["priority"] = 9; + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("MPEG2"); + capa["codecs"][0u][0u].append("VP8"); + capa["codecs"][0u][0u].append("VP9"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("MP3"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("ALAW"); + capa["codecs"][0u][1u].append("ULAW"); + capa["codecs"][0u][1u].append("PCM"); + capa["codecs"][0u][1u].append("opus"); + capa["codecs"][0u][1u].append("MP2"); + + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "DVR buffer time in ms"; + option["value"].append(50000); + config->addOption("bufferTime", option); + capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; + capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in " + "milliseconds. This is the time available to seek around in, " + "and will automatically be extended to fit whole keyframes " + "as well as the minimum duration needed for stable playback."; + capa["optional"]["DVR"]["option"] = "--buffer"; + capa["optional"]["DVR"]["type"] = "uint"; + capa["optional"]["DVR"]["default"] = 50000; + option.null(); + } + + /// Checks whether the input string ends with .sdp + bool InputSDP::checkArguments(){ + const std::string &inpt = config->getString("input"); + if (inpt.substr(inpt.length() - 4) != ".sdp"){ + FAIL_MSG("Expected a SDP file but received: '%s'", inpt.c_str()); + return false; + } + return true; + } + + /// Lets URIreader open the SDP file at the requested given location + bool InputSDP::openStreamSource(){ + const std::string &inpt = config->getString("input"); + reader.open(inpt); + // Will return false if it cant open file or it is EOF + return reader; + } + + /// Gets and parses the SDP file + void InputSDP::parseStreamHeader(){ + if (!reader){ + FAIL_MSG("Connection lost with input. Could not get stream description!"); + return; + } + + reader.readAll(buffer, bytesRead); + HIGH_MSG("Downloaded SDP file (%lu B)", bytesRead); + + // Save old buffer in order to identify changes + oldBuffer = strdup(buffer); + + sdpState.reinitSDP(); + sdpState.parseSDP(buffer); + + INFO_MSG("Stream contains %zu tracks", M.getValidTracks().size()); + + if (reader){ + reader.close(); + } + } + + void InputSDP::closeStreamSource(){ + if (reader){ + reader.close(); + } + return; + } + + /// Compare two c strings char by char + /// \return false if not equals (or different in size), else true + bool InputSDP::compareStrings(char* str1, char* str2){ + size_t strlen1 = strlen(str1); + size_t strlen2 = strlen(str2); + + if (strlen1 != strlen2){ + return false; + } + + for (int k = 0; k < strlen1; k++){ + if(str1[k] != str2[k]){ + return false; + } + } + return true; + } + + // Checks if there are updates available to the SDP file + // and updates the SDP file accordingly + bool InputSDP::updateSDP(){ + // Reset error flag + hasBork = false; + // Reopen the file if necessary + if (!reader){ + const std::string &inpt = config->getString("input"); + reader.open(inpt); + } + // If the file has dissappeared the stream must have stopped + if (!reader){ + WARN_MSG("SDP file no longer available. Cannot update SDP info."); + return false; + } + // Re-read SDP file + reader.readAll(buffer, bytesRead); + // Re-init SPD state iff contents have changed + INFO_MSG("Downloaded SDP file (%lu B)", bytesRead); + if (bytesRead != 0){ + if (!compareStrings(oldBuffer, buffer)){ + INFO_MSG("SDP contents have changed. Reparsing SDP file"); + // Save old buffer in order to identify changes + oldBuffer = strdup(buffer); + + sdpState.reinitSDP(); + sdpState.parseSDP(buffer); + + INFO_MSG("Stream contains %zu tracks", M.getValidTracks().size()); + } + else{ + FAIL_MSG("Unable to parse stream data for current SDP file. Quitting..."); + return false; + } + } + else{ + FAIL_MSG("SDP file no longer available. Quitting..."); + return false; + } + + + // Close the file so that we can reopen it on err + if (reader){ + reader.close(); + } + + // Notify Meta of changes to tracks + meta.refresh(); + + return true; + } + + // Updates stats and quits if parsePacket returns false + void InputSDP::streamMainLoop(){ + Comms::Statistics statComm; + uint64_t startTime = Util::epoch(); + uint64_t lastSecs = 0; + // Get RTP packets from UDP socket and stop if this fails + while (keepAlive() && parsePacket()){ + uint64_t currSecs = Util::bootSecs(); + if (lastSecs != currSecs){ + lastSecs = currSecs; + // Connect to stats for INPUT detection + statComm.reload(); + if (statComm){ + if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ + config->is_active = false; + Util::logExitReason("received shutdown request from controller"); + return; + } + uint64_t now = Util::bootSecs(); + statComm.setNow(now); + statComm.setCRC(getpid()); + statComm.setStream(streamName); + statComm.setConnector("INPUT:" + capa["name"].asStringRef()); + statComm.setDown(bytesRead); + statComm.setUp(bytesUp); + statComm.setTime(now - startTime); + statComm.setLastSecond(0); + statComm.setHost(getConnectedBinHost()); + } + } + // If the error flag is raised or we are lacking data, try to recover + if (count > 5 || hasBork) { + if (!updateSDP()){ + return; + } + } + } + } + + /// \brief Passes incoming RTP packets to sorter + /// \return False if we cannot recover and should quit. Else returns True + bool InputSDP::parsePacket(){ + uint32_t waitTime = 200; + bool receivedPacket = false; + // How often to send RTCP receiver requests in seconds + const uint32_t rtcpInterval = 7; + for (std::map::iterator it = sdpState.tracks.begin(); + it != sdpState.tracks.end(); ++it){ + + // Get RTP socket for selected track + Socket::UDPConnection &s = it->second.data; + it->second.sorter.setCallback(it->first, insertRTP); + + // Get RTP packets + while (s.Receive()){ + count = 0; + receivedPacket = true; + bytesRead += (s.data.size()); + RTP::Packet pack(s.data, s.data.size()); + + // Init local and remote SSRC if it was not set + if (!it->second.theirSSRC){ + it->second.theirSSRC = pack.getSSRC(); + } + if (!currentSSRC[it->first]){ + currentSSRC[it->first] = pack.getSSRC(); + } + // If we still have some packets from the old track in the socket buffer, skip it + if (oldSSRC[it->first] == pack.getSSRC()){ + continue; + } + // Verify if the SSRC has changed: indicating that a new video is being sent + // Either recover, reload or quit at this point + if (currentSSRC[it->first] != pack.getSSRC()){ + WARN_MSG("Sorter for the current track has encountered an error: current SSRC has changed from %u to %u. Trying to recover...", currentSSRC[it->first], pack.getSSRC()); + oldSSRC[it->first] = currentSSRC[it->first]; + hasBork = true; + return true; + } + + // Let sorter handle RTP specifics + it->second.sorter.addPacket(pack); + DONTEVEN_MSG("Added %zu B RTP packet to buffer with start time %u and SSRC %u: %s", bytesRead, pack.getTimeStamp(), pack.getSSRC(), pack.toString().c_str()); + } + // Send RTCP packet back to host + if (Util::bootSecs() > it->second.rtcpSent + rtcpInterval){ + it->second.rtcpSent = Util::bootSecs(); + it->second.pack.sendRTCP_RR(it->second, sendUDP); + } + } + if (!receivedPacket){ + Util::sleep(waitTime); + count++; + } + return true; + } + + // Buffers incoming DTSC packets (from SDP tracks -> RTP sorter) + void InputSDP::incoming(const DTSC::Packet &pkt){ + if (!M.getBootMsOffset()){ + meta.setBootMsOffset(Util::bootMS() - pkt.getTime()); + packetOffset = 0; + setPacketOffset = true; + }else if (!setPacketOffset){ + packetOffset = (Util::bootMS() - pkt.getTime()) - M.getBootMsOffset(); + setPacketOffset = true; + } + static DTSC::Packet newPkt; + char *pktData; + size_t pktDataLen; + pkt.getString("data", pktData, pktDataLen); + size_t idx = M.trackIDToIndex(pkt.getTrackId(), getpid()); + + HIGH_MSG("Buffering new pkt for track %zu->%zu at offset %zu and time %zu", pkt.getTrackId(), idx, packetOffset, pkt.getTime()); + + if (idx == INVALID_TRACK_ID){ + INFO_MSG("Invalid index for track number %zu", pkt.getTrackId()); + }else{ + if (!userSelect.count(idx)){ + WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx); + userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); + } + if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){ + Util::logExitReason("buffer requested shutdown"); + } + } + + bufferLivePacket(pkt.getTime() + packetOffset, pkt.getInt("offset"), idx, pktData, + pktDataLen, 0, pkt.getFlag("keyframe")); + } +}// namespace Mist diff --git a/src/input/input_sdp.h b/src/input/input_sdp.h new file mode 100644 index 00000000..9e375ba0 --- /dev/null +++ b/src/input/input_sdp.h @@ -0,0 +1,70 @@ +#include "input.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Mist{ + class InputSDP : public Input{ + public: + InputSDP(Util::Config *cfg); + + // Buffers incoming DTSC packets (from SDP tracks -> RTP sorter) + void incoming(const DTSC::Packet &pkt); + + void incomingRTP(const uint64_t track, const RTP::Packet &p); + + // Compare two c strings char by char + bool compareStrings(char* str1, char* str2); + + protected: + void streamMainLoop(); + bool checkArguments(); + // Overwrite default functions from input + bool needHeader(){return false;} + bool readHeader(){return true;} + // Force to stream > serve + bool needsLock(){return false;} + // Open connection with input + bool openStreamSource(); + void closeStreamSource(); + + // Gets and parses SDP file + void parseStreamHeader(); + // Passes incoming RTP packets to sorter + bool parsePacket(); + // Checks if there are updates available to the SDP file + // and updates the SDP file accordingly + bool updateSDP(); + + // Used to read SDP file + HTTP::URIReader reader; + // Contains track info + SDP::State sdpState; + // Total bytes downloaded + size_t bytesRead; + // Local buffer to read into + char* buffer; + // Copy of parsed SDP file in order to detect changes + char* oldBuffer; + + bool setPacketOffset; + int64_t packetOffset; + + // Count amount of pulls without a packet + int count; + // Flag to re-init SDP state + bool hasBork; + + // Map SSRC to tracks in order to recognize when video source changes + std::map currentSSRC; + // Map prev SSRC in order to detect old packages to ignore + std::map oldSSRC; + }; +}// namespace Mist + +typedef Mist::InputSDP mistIn;