diff --git a/lib/rtp.cpp b/lib/rtp.cpp index 72853752..878a14d6 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -286,21 +286,21 @@ namespace RTP{ FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up."); return; } - if (!(sTrk.lostCurrent + sTrk.packCurrent)){sTrk.packCurrent++;} + if (!(sTrk.sorter.lostCurrent + sTrk.sorter.packCurrent)){sTrk.sorter.packCurrent++;} rtcpData[0] = 0x81;//version 2, no padding, one receiver report rtcpData[1] = 201;//receiver report Bit::htobs(rtcpData+2, 7);//7 4-byte words follow the header Bit::htobl(rtcpData+4, sTrk.mySSRC);//set receiver identifier Bit::htobl(rtcpData+8, sTrk.theirSSRC);//set source identifier - rtcpData[12] = (sTrk.lostCurrent * 255) / (sTrk.lostCurrent + sTrk.packCurrent); //fraction lost since prev RR - Bit::htob24(rtcpData+13, sTrk.lostTotal); //cumulative packets lost since start - Bit::htobl(rtcpData+16, sTrk.rtpSeq | (sTrk.packTotal & 0xFFFF0000ul)); //highest sequence received + rtcpData[12] = (sTrk.sorter.lostCurrent * 255) / (sTrk.sorter.lostCurrent + sTrk.sorter.packCurrent); //fraction lost since prev RR + Bit::htob24(rtcpData+13, sTrk.sorter.lostTotal); //cumulative packets lost since start + Bit::htobl(rtcpData+16, sTrk.sorter.rtpSeq | (sTrk.sorter.packTotal & 0xFFFF0000ul)); //highest sequence received Bit::htobl(rtcpData+20, 0); /// \TODO jitter (diff in timestamp vs packet arrival) Bit::htobl(rtcpData+24, 0); /// \TODO last SR (middle 32 bits of last SR or zero) Bit::htobl(rtcpData+28, 0); /// \TODO delay since last SR in 2b seconds + 2b fraction callBack(&(sTrk.rtcp), (char *)rtcpData, 32, 0); - sTrk.lostCurrent = 0; - sTrk.packCurrent = 0; + sTrk.sorter.lostCurrent = 0; + sTrk.sorter.packCurrent = 0; free(rtcpData); } @@ -441,5 +441,79 @@ namespace RTP{ data[2] |= 0x8; } + Sorter::Sorter(){ + packTrack = 0; + rtpSeq = 0; + lostTotal = 0; + lostCurrent = 0; + packTotal = 0; + packCurrent = 0; + } + + void Sorter::setCallback(uint64_t track, void (*cb)(const uint64_t track, const Packet &p)){ + callback = cb; + packTrack = track; + } + + /// Calls addPacket(pack) with a newly constructed RTP::Packet from the given arguments. + void Sorter::addPacket(const char *dat, unsigned int len){ + addPacket(RTP::Packet(dat, len)); + } + + /// Takes in new RTP packets for a single track. + /// Automatically sorts them, waiting when packets come in slow or not at all. + /// Calls the callback with packets in sorted order, whenever it becomes possible to do so. + void Sorter::addPacket(const Packet & pack){ + if (!rtpSeq){rtpSeq = pack.getSequence();} + // packet is very early - assume dropped after 30 packets + while ((int16_t)(rtpSeq - ((uint16_t)pack.getSequence())) < -30){ + WARN_MSG("Giving up on packet %u", rtpSeq); + ++rtpSeq; + ++lostTotal; + ++lostCurrent; + ++packTotal; + ++packCurrent; + // send any buffered packets we may have + while (packBuffer.count(rtpSeq)){ + callback(packTrack, pack); + ++rtpSeq; + ++packTotal; + ++packCurrent; + } + } + // send any buffered packets we may have + while (packBuffer.count(rtpSeq)){ + callback(packTrack, pack); + ++rtpSeq; + ++packTotal; + ++packCurrent; + } + // packet is slightly early - buffer it + if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) < 0){ + INFO_MSG("Buffering early packet #%u->%u", rtpSeq, pack.getSequence()); + packBuffer[pack.getSequence()] = pack; + } + // packet is late + if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) > 0){ + // negative difference? + --lostTotal; + --lostCurrent; + ++packTotal; + ++packCurrent; + WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", + (int16_t)(rtpSeq - (uint16_t)pack.getSequence())); + return; + } + // packet is in order + if (rtpSeq == pack.getSequence()){ + callback(packTrack, pack); + ++rtpSeq; + ++packTotal; + ++packCurrent; + } + } + + + } diff --git a/lib/rtp.h b/lib/rtp.h index 2d0468cf..2bfc1f1a 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -78,6 +78,21 @@ namespace RTP{ char *getData(); }; + class Sorter{ + public: + Sorter(); + void addPacket(const char *dat, unsigned int len); + void addPacket(const Packet & pack); + void setCallback(uint64_t track, void (*callback)(const uint64_t track, const Packet &p)); + uint16_t rtpSeq; + int32_t lostTotal, lostCurrent; + uint32_t packTotal, packCurrent; + private: + uint64_t packTrack; + std::map packBuffer; + void (*callback)(const uint64_t track, const Packet &p); + }; + class MPEGVideoHeader{ public: MPEGVideoHeader(char * d); diff --git a/lib/sdp.cpp b/lib/sdp.cpp index 2ca1b91b..be10c77b 100644 --- a/lib/sdp.cpp +++ b/lib/sdp.cpp @@ -13,11 +13,6 @@ namespace SDP{ channel = -1; firstTime = 0; packCount = 0; - rtpSeq = 0; - lostTotal = 0; - lostCurrent = 0; - packTotal = 0; - packCurrent = 0; fpsTime = 0; fpsMeta = 0; fps = 0; @@ -882,7 +877,7 @@ namespace SDP{ HIGH_MSG("Not start of a new FU - throwing away"); return; } - if (fuaBuffer.size() && ((pl[2] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ + if (fuaBuffer.size() && ((pl[2] & 0x80) || (tracks[track].sorter.rtpSeq != pkt.getSequence()))){ WARN_MSG("H265 FU packet incompleted: %lu", fuaBuffer.size()); Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend fuaBuffer[4] |= 0x80; // set error bit @@ -989,7 +984,7 @@ namespace SDP{ HIGH_MSG("Not start of a new FU-A - throwing away"); return; } - if (fuaBuffer.size() && ((pl[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ + if (fuaBuffer.size() && ((pl[1] & 0x80) || (tracks[track].sorter.rtpSeq != pkt.getSequence()))){ WARN_MSG("Ending unfinished FU-A"); INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaBuffer.size()); uint8_t nalType = (fuaBuffer[4] & 0x1F); diff --git a/lib/sdp.h b/lib/sdp.h index 049da973..a41b2c3f 100644 --- a/lib/sdp.h +++ b/lib/sdp.h @@ -14,14 +14,11 @@ namespace SDP{ Socket::UDPConnection data; Socket::UDPConnection rtcp; RTP::Packet pack; + RTP::Sorter sorter; long long rtcpSent; uint64_t firstTime; int channel; /// Channel number, used in TCP sending uint64_t packCount; - uint16_t rtpSeq; - int32_t lostTotal, lostCurrent; - uint32_t packTotal, packCurrent; - std::map packBuffer; std::string transportString; /// Current transport string. std::string control; std::string fmtp; /// fmtp string, used by getParamString / getParamInt diff --git a/src/analysers/analyser_rtsp.cpp b/src/analysers/analyser_rtsp.cpp index 380ad5cd..05716fb2 100644 --- a/src/analysers/analyser_rtsp.cpp +++ b/src/analysers/analyser_rtsp.cpp @@ -95,7 +95,7 @@ bool AnalyserRTSP::parsePacket(){ DETAIL_MED("Received packet for unknown track number on channel %u", chan); } if (trackNo){ - sdpState.tracks[trackNo].rtpSeq = pkt.getSequence(); + sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence(); } if (detail >= 10){ diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index 2f807157..7beb9761 100755 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -3,9 +3,8 @@ Mist::InputRTSP *classPointer = 0; Socket::Connection *mainConn = 0; -void incomingPacket(const DTSC::Packet &pkt){ - classPointer->incoming(pkt); -} +void incomingPacket(const DTSC::Packet &pkt){classPointer->incoming(pkt);} +void insertRTP(const uint64_t track, const RTP::Packet &p){classPointer->incomingRTP(track, p);} /// Function used to send RTP packets over UDP ///\param socket A UDP Connection pointer, sent as a void*, to keep portability. @@ -18,6 +17,8 @@ void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){ } namespace Mist{ + void InputRTSP::incomingRTP(const uint64_t track, const RTP::Packet &p){sdpState.handleIncomingRTP(track, p);} + InputRTSP::InputRTSP(Util::Config *cfg) : Input(cfg){ TCPmode = true; sdpState.myMeta = &myMeta; @@ -296,7 +297,7 @@ namespace Mist{ if (!trackNo && (chan % 2) != 1){ WARN_MSG("Received packet for unknown track number on channel %u", chan); } - if (trackNo){sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();} + if (trackNo){sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence();} sdpState.handleIncomingRTP(trackNo, pkt); @@ -320,59 +321,7 @@ namespace Mist{ // continue; //} tcpCon.addDown(s.data_len); - RTP::Packet pack(s.data, s.data_len); - if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} - // packet is very early - assume dropped after 30 packets - while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){ - WARN_MSG("Giving up on packet %u", it->second.rtpSeq); - ++(it->second.rtpSeq); - ++(it->second.lostTotal); - ++(it->second.lostCurrent); - ++(it->second.packTotal); - ++(it->second.packCurrent); - // send any buffered packets we may have - while (it->second.packBuffer.count(it->second.rtpSeq)){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - } - } - // send any buffered packets we may have - while (it->second.packBuffer.count(it->second.rtpSeq)){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - } - // packet is slightly early - buffer it - if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ - INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); - it->second.packBuffer[pack.getSequence()] = pack; - } - // packet is late - if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ - // negative difference? - --(it->second.lostTotal); - --(it->second.lostCurrent); - ++(it->second.packTotal); - ++(it->second.packCurrent); - WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", - (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); - return false; - } - // packet is in order - if (it->second.rtpSeq == pack.getSequence()){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();} - } - } - if (Util::epoch() / 5 != it->second.rtcpSent){ - it->second.rtcpSent = Util::epoch() / 5; - it->second.pack.sendRTCP_RR(connectedAt, it->second, it->first, myMeta, sendUDP); + it->second.sorter.addPacket(s.data, s.data_len); } } return r; diff --git a/src/input/input_rtsp.h b/src/input/input_rtsp.h index 6ce16ecf..be792ed2 100755 --- a/src/input/input_rtsp.h +++ b/src/input/input_rtsp.h @@ -14,6 +14,7 @@ namespace Mist{ InputRTSP(Util::Config *cfg); bool needsLock(){return false;} void incoming(const DTSC::Packet &pkt); + void incomingRTP(const uint64_t track, const RTP::Packet &p); protected: // Private Functions diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp index 15af80ab..d942d9b1 100644 --- a/src/output/output_rtsp.cpp +++ b/src/output/output_rtsp.cpp @@ -14,8 +14,9 @@ namespace Mist{ Socket::Connection *mainConn = 0; OutRTSP *classPointer = 0; - /// Helper function for passing packets into the OutRTSP class + /// Helper functions for passing packets into the OutRTSP class void insertPacket(const DTSC::Packet &pkt){classPointer->incomingPacket(pkt);} + void insertRTP(const uint64_t track, const RTP::Packet &p){classPointer->incomingRTP(track, p);} /// Takes incoming packets and buffers them. void OutRTSP::incomingPacket(const DTSC::Packet &pkt){ @@ -37,6 +38,7 @@ namespace Mist{ bufferLivePacket(newPkt); //bufferLivePacket(DTSC::RetimedPacket(pkt.getTime() + packetOffset, pkt)); } + void OutRTSP::incomingRTP(const uint64_t track, const RTP::Packet &p){sdpState.handleIncomingRTP(track, p);} OutRTSP::OutRTSP(Socket::Connection &myConn) : Output(myConn){ connectedAt = Util::epoch() + 2208988800ll; @@ -410,8 +412,8 @@ namespace Mist{ uint32_t trackNo = sdpState.getTrackNoForChannel(tcpHead.data()[1]); if (trackNo && isPushing()){ RTP::Packet pkt(tcpPacket.data() + 4, len); - sdpState.tracks[trackNo].rtpSeq = pkt.getSequence(); - sdpState.handleIncomingRTP(trackNo, pkt); + sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence(); + incomingRTP(trackNo, pkt); } // attempt to read more packets return handleTCP(); @@ -423,6 +425,7 @@ namespace Mist{ for (std::map::iterator it = sdpState.tracks.begin(); it != sdpState.tracks.end(); ++it){ Socket::UDPConnection &s = it->second.data; + it->second.sorter.setCallback(it->first, insertRTP); while (s.Receive()){ if (s.getDestPort() != it->second.cPortA && checkPort){ // wrong sending port, ignore packet @@ -431,56 +434,10 @@ namespace Mist{ lastRecv = Util::epoch(); // prevent disconnect of idle TCP connection when using UDP myConn.addDown(s.data_len); RTP::Packet pack(s.data, s.data_len); - if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} - // packet is very early - assume dropped after 30 packets - while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){ - WARN_MSG("Giving up on packet %u", it->second.rtpSeq); - ++(it->second.rtpSeq); - ++(it->second.lostTotal); - ++(it->second.lostCurrent); - ++(it->second.packTotal); - ++(it->second.packCurrent); - // send any buffered packets we may have - while (it->second.packBuffer.count(it->second.rtpSeq)){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - } - } - // send any buffered packets we may have - while (it->second.packBuffer.count(it->second.rtpSeq)){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - } - // packet is slightly early - buffer it - if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ - INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); - it->second.packBuffer[pack.getSequence()] = pack; - } - // packet is late - if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ - // negative difference? - --(it->second.lostTotal); - --(it->second.lostCurrent); - ++(it->second.packTotal); - ++(it->second.packCurrent); - WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", - (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); - return; - } - // packet is in order - if (it->second.rtpSeq == pack.getSequence()){ - sdpState.handleIncomingRTP(it->first, pack); - ++(it->second.rtpSeq); - ++(it->second.packTotal); - ++(it->second.packCurrent); - if (!it->second.theirSSRC){ - it->second.theirSSRC = pack.getSSRC(); - } + if (!it->second.theirSSRC){ + it->second.theirSSRC = pack.getSSRC(); } + it->second.sorter.addPacket(pack); } if (Util::epoch() / 5 != it->second.rtcpSent){ it->second.rtcpSent = Util::epoch() / 5; diff --git a/src/output/output_rtsp.h b/src/output/output_rtsp.h index 375126ff..8802a849 100644 --- a/src/output/output_rtsp.h +++ b/src/output/output_rtsp.h @@ -17,6 +17,7 @@ namespace Mist{ void requestHandler(); bool onFinish(); void incomingPacket(const DTSC::Packet &pkt); + void incomingRTP(const uint64_t track, const RTP::Packet &p); private: long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP