diff --git a/lib/rtp.cpp b/lib/rtp.cpp index 5e347bce..de1b6fe5 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -12,6 +12,8 @@ namespace RTP{ double Packet::startRTCP = 0; unsigned int MAX_SEND = 1500 - 28; + unsigned int PACKET_REORDER_WAIT = 5; + unsigned int PACKET_DROP_TIMEOUT = 30; unsigned int Packet::getHsize() const{ unsigned int r = 12 + 4 * getContribCount(); @@ -474,15 +476,16 @@ namespace RTP{ Sorter::Sorter(uint64_t trackId, void (*cb)(const uint64_t track, const Packet &p)){ packTrack = trackId; rtpSeq = 0; + rtpWSeq = 0; lostTotal = 0; lostCurrent = 0; packTotal = 0; packCurrent = 0; callback = cb; - } - - bool Sorter::wantSeq(uint16_t seq) const{ - return !rtpSeq || !(seq < rtpSeq || seq > (rtpSeq + 500) || packBuffer.count(seq)); + first = true; + preBuffer = true; + lastBootMS = 0; + lastNTP = 0; } void Sorter::setCallback(uint64_t track, void (*cb)(const uint64_t track, const Packet &p)){ @@ -497,57 +500,76 @@ namespace RTP{ /// Automatically sorts them, waiting when packets come in slow or not at all. /// Calls the callback with packets in sorted order, whenever it becomes possible to do so. void Sorter::addPacket(const Packet &pack){ - if (!rtpSeq){rtpSeq = pack.getSequence();} - // packet is very early - assume dropped after 150 packets - while ((int16_t)(rtpSeq - ((uint16_t)pack.getSequence())) < -150){ - WARN_MSG("Giving up on packet %u", rtpSeq); - ++rtpSeq; - ++lostTotal; - ++lostCurrent; - ++packTotal; - ++packCurrent; - // send any buffered packets we may have - while (packBuffer.count(rtpSeq)){ - outPacket(packTrack, packBuffer[rtpSeq]); - packBuffer.erase(rtpSeq); - INFO_MSG("Sent packet %u, now %zu in buffer", rtpSeq, packBuffer.size()); + uint16_t pSNo = pack.getSequence(); + if (first){ + rtpWSeq = pSNo; + rtpSeq = pSNo - 5; + first = false; + } + if (preBuffer){ + //If we've buffered the first 5 packets, assume we have the first one known + if (packBuffer.size() >= 5){ + preBuffer = false; + rtpSeq = packBuffer.begin()->first; + rtpWSeq = rtpSeq; + } + }else{ + // packet is very early - assume dropped after PACKET_DROP_TIMEOUT packets + while ((int16_t)(rtpSeq - pSNo) < -(int)PACKET_DROP_TIMEOUT){ + WARN_MSG("Giving up on packet %u", rtpSeq); ++rtpSeq; + ++lostTotal; + ++lostCurrent; ++packTotal; ++packCurrent; } } + //Update wanted counter if we passed it (1 of 2) + if ((int16_t)(rtpWSeq - rtpSeq) < 0){rtpWSeq = rtpSeq;} + // packet is somewhat early - ask for packet after PACKET_REORDER_WAIT packets + while ((int16_t)(rtpWSeq - pSNo) < -(int)PACKET_REORDER_WAIT){ + //Only wanted if we don't already have it + if (!packBuffer.count(rtpWSeq)){ + wantedSeqs.insert(rtpWSeq); + } + ++rtpWSeq; + } // send any buffered packets we may have + uint16_t prertpSeq = rtpSeq; while (packBuffer.count(rtpSeq)){ outPacket(packTrack, packBuffer[rtpSeq]); packBuffer.erase(rtpSeq); - INFO_MSG("Sent packet %u, now %zu in buffer", rtpSeq, packBuffer.size()); ++rtpSeq; ++packTotal; ++packCurrent; } + if (prertpSeq != rtpSeq){ + INFO_MSG("Sent packets %" PRIu16 "-%" PRIu16 ", now %zu in buffer", prertpSeq, rtpSeq, packBuffer.size()); + } // packet is slightly early - buffer it - if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) < 0){ + if ((int16_t)(rtpSeq - pSNo) < 0){ HIGH_MSG("Buffering early packet #%u->%u", rtpSeq, pack.getSequence()); packBuffer[pack.getSequence()] = pack; } // packet is late - if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) > 0){ + if ((int16_t)(rtpSeq - pSNo) > 0){ // negative difference? - --lostTotal; - --lostCurrent; - ++packTotal; - ++packCurrent; - WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", - (int16_t)(rtpSeq - (uint16_t)pack.getSequence())); - return; + //--lostTotal; + //--lostCurrent; + //++packTotal; + //++packCurrent; + //WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", (int16_t)(rtpSeq - pSNo)); + //return; } // packet is in order - if (rtpSeq == pack.getSequence()){ + if (rtpSeq == pSNo){ outPacket(packTrack, pack); ++rtpSeq; ++packTotal; ++packCurrent; } + //Update wanted counter if we passed it (2 of 2) + if ((int16_t)(rtpWSeq - rtpSeq) < 0){rtpWSeq = rtpSeq;} } toDTSC::toDTSC(){ diff --git a/lib/rtp.h b/lib/rtp.h index 04b120cf..9adf5eb6 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -26,6 +26,8 @@ namespace SDP{ namespace RTP{ extern uint32_t MAX_SEND; + extern unsigned int PACKET_REORDER_WAIT; + extern unsigned int PACKET_DROP_TIMEOUT; /// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP /// mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in @@ -86,7 +88,6 @@ namespace RTP{ class Sorter{ public: Sorter(uint64_t trackId = 0, void (*callback)(const uint64_t track, const Packet &p) = 0); - bool wantSeq(uint16_t seq) const; void addPacket(const char *dat, unsigned int len); void addPacket(const Packet &pack); // By default, calls the callback function, if set. @@ -95,9 +96,14 @@ namespace RTP{ } void setCallback(uint64_t track, void (*callback)(const uint64_t track, const Packet &p)); uint16_t rtpSeq; + uint16_t rtpWSeq; + bool first; + bool preBuffer; int32_t lostTotal, lostCurrent; uint32_t packTotal, packCurrent; - + std::set wantedSeqs; + uint32_t lastNTP; ///< Middle 32 bits of last Sender Report NTP timestamp + uint64_t lastBootMS; ///< bootMS time of last Sender Report private: uint64_t packTrack; std::map packBuffer; diff --git a/lib/rtp_fec.cpp b/lib/rtp_fec.cpp index a78ad617..c5da87e3 100644 --- a/lib/rtp_fec.cpp +++ b/lib/rtp_fec.cpp @@ -531,7 +531,7 @@ namespace RTP{ } void FECPacket::sendRTCP_RR(RTP::FECSorter &sorter, uint32_t mySSRC, uint32_t theirSSRC, void *userData, - void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel)){ + void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel), uint32_t jitter){ char *rtcpData = (char *)malloc(32); if (!rtcpData){ FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up."); @@ -547,9 +547,13 @@ namespace RTP{ Bit::htob24(rtcpData + 13, sorter.lostTotal); // cumulative packets lost since start Bit::htobl(rtcpData + 16, sorter.rtpSeq | (sorter.packTotal & 0xFFFF0000ul)); // highest sequence received - Bit::htobl(rtcpData + 20, 0); /// \TODO jitter (diff in timestamp vs packet arrival) - Bit::htobl(rtcpData + 24, 0); /// \TODO last SR (middle 32 bits of last SR or zero) - Bit::htobl(rtcpData + 28, 0); /// \TODO delay since last SR in 2b seconds + 2b fraction + Bit::htobl(rtcpData + 20, jitter); // jitter + Bit::htobl(rtcpData + 24, sorter.lastNTP); // last SR NTP time (middle 32 bits) + if (sorter.lastBootMS){ + Bit::htobl(rtcpData + 28, (Util::bootMS() - sorter.lastBootMS) * 65.536); // delay since last SR in 1/65536th of a second + }else{ + Bit::htobl(rtcpData + 28, 0); // no delay since last SR yet + } callBack(userData, rtcpData, 32, 0); sorter.lostCurrent = 0; sorter.packCurrent = 0; diff --git a/lib/rtp_fec.h b/lib/rtp_fec.h index a73630a8..263cbeeb 100644 --- a/lib/rtp_fec.h +++ b/lib/rtp_fec.h @@ -86,7 +86,7 @@ namespace RTP{ class FECPacket : public Packet{ public: void sendRTCP_RR(RTP::FECSorter &sorter, uint32_t mySSRC, uint32_t theirSSRC, void *userData, - void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel)); + void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel), uint32_t jitter = 0); }; }// namespace RTP diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index efd94515..e19e7c2a 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -25,13 +25,32 @@ namespace Mist{ /* ------------------------------------------------ */ - WebRTCTrack::WebRTCTrack() - : payloadType(0), SSRC(0), ULPFECPayloadType(0), REDPayloadType(0), RTXPayloadType(0), - prevReceivedSequenceNumber(0){} + WebRTCTrack::WebRTCTrack(){ + payloadType = 0; + SSRC = 0; + ULPFECPayloadType = 0; + REDPayloadType = 0; + RTXPayloadType = 0; + lastTransit = 0; + jitter = 0; + } + + void WebRTCTrack::gotPacket(uint32_t ts){ + uint32_t arrival = Util::bootMS() * rtpToDTSC.multiplier; + int transit = arrival - ts; + int d = transit - lastTransit; + lastTransit = transit; + if (d < 0) d = -d; + jitter += (1. / 16.) * ((double)d - jitter); + } /* ------------------------------------------------ */ OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){ + stats_jitter = 0; + stats_nacknum = 0; + stats_lossnum = 0; + stats_lossperc = 0; lastPackMs = 0; vidTrack = INVALID_TRACK_ID; prevVidTrack = INVALID_TRACK_ID; @@ -52,6 +71,7 @@ namespace Mist{ rtcpKeyFrameDelayInMillis = 2000; rtcpKeyFrameTimeoutInMillis = 0; videoBitrate = 6 * 1000 * 1000; + videoConstraint = videoBitrate; RTP::MAX_SEND = 1350 - 28; didReceiveKeyFrame = false; doDTLS = true; @@ -114,7 +134,7 @@ namespace Mist{ "Comma separated list of video codecs you want to support in preferred order. e.g. " "H264,VP8"; capa["optional"]["preferredvideocodec"]["default"] = "H264,VP9,VP8"; - capa["optional"]["preferredvideocodec"]["type"] = "string"; + capa["optional"]["preferredvideocodec"]["type"] = "str"; capa["optional"]["preferredvideocodec"]["option"] = "--webrtc-video-codecs"; capa["optional"]["preferredvideocodec"]["short"] = "V"; @@ -123,7 +143,7 @@ namespace Mist{ "Comma separated list of audio codecs you want to support in preferred order. e.g. " "opus,ALAW,ULAW"; capa["optional"]["preferredaudiocodec"]["default"] = "opus,ALAW,ULAW"; - capa["optional"]["preferredaudiocodec"]["type"] = "string"; + capa["optional"]["preferredaudiocodec"]["type"] = "str"; capa["optional"]["preferredaudiocodec"]["option"] = "--webrtc-audio-codecs"; capa["optional"]["preferredaudiocodec"]["short"] = "A"; @@ -131,7 +151,7 @@ namespace Mist{ capa["optional"]["bindhost"]["help"] = "Interface address or hostname to bind SRTP UDP socket " "to. Defaults to originating interface address."; capa["optional"]["bindhost"]["default"] = ""; - capa["optional"]["bindhost"]["type"] = "string"; + capa["optional"]["bindhost"]["type"] = "str"; capa["optional"]["bindhost"]["option"] = "--bindhost"; capa["optional"]["bindhost"]["short"] = "B"; @@ -161,12 +181,49 @@ namespace Mist{ capa["optional"]["packetlog"]["short"] = "P"; capa["optional"]["packetlog"]["default"] = 0; + capa["optional"]["nacktimeout"]["name"] = "RTP NACK timeout"; + capa["optional"]["nacktimeout"]["help"] = "Amount of packets any track will wait for a packet to arrive before NACKing it"; + capa["optional"]["nacktimeout"]["option"] = "--nacktimeout"; + capa["optional"]["nacktimeout"]["short"] = "x"; + capa["optional"]["nacktimeout"]["type"] = "uint"; + capa["optional"]["nacktimeout"]["default"] = 5; + + capa["optional"]["losttimeout"]["name"] = "RTP lost timeout"; + capa["optional"]["losttimeout"]["help"] = "Amount of packets any track will wait for a packet to arrive before considering it lost"; + capa["optional"]["losttimeout"]["option"] = "--losttimeout"; + capa["optional"]["losttimeout"]["short"] = "l"; + capa["optional"]["losttimeout"]["type"] = "uint"; + capa["optional"]["losttimeout"]["default"] = 30; + + capa["optional"]["nacktimeoutmobile"]["name"] = "RTP NACK timeout (mobile)"; + capa["optional"]["nacktimeoutmobile"]["help"] = "Amount of packets any track will wait for a packet to arrive before NACKing it, on mobile connections"; + capa["optional"]["nacktimeoutmobile"]["option"] = "--nacktimeoutmobile"; + capa["optional"]["nacktimeoutmobile"]["short"] = "X"; + capa["optional"]["nacktimeoutmobile"]["type"] = "uint"; + capa["optional"]["nacktimeoutmobile"]["default"] = 15; + + capa["optional"]["losttimeoutmobile"]["name"] = "RTP lost timeout (mobile)"; + capa["optional"]["losttimeoutmobile"]["help"] = "Amount of packets any track will wait for a packet to arrive before considering it lost, on mobile connections"; + capa["optional"]["losttimeoutmobile"]["option"] = "--losttimeoutmobile"; + capa["optional"]["losttimeoutmobile"]["short"] = "L"; + capa["optional"]["losttimeoutmobile"]["type"] = "uint"; + capa["optional"]["losttimeoutmobile"]["default"] = 90; + config->addOptionsFromCapabilities(capa); } void OutWebRTC::preWebsocketConnect(){ HTTP::URL tmpUrl("http://" + H.GetHeader("Host")); externalAddr = tmpUrl.host; + if (UA.find("Mobi") != std::string::npos){ + RTP::PACKET_REORDER_WAIT = config->getInteger("nacktimeoutmobile"); + RTP::PACKET_DROP_TIMEOUT = config->getInteger("losttimeoutmobile"); + INFO_MSG("Using mobile RTP configuration: NACK at %u, drop at %u", RTP::PACKET_REORDER_WAIT, RTP::PACKET_DROP_TIMEOUT); + }else{ + RTP::PACKET_REORDER_WAIT = config->getInteger("nacktimeout"); + RTP::PACKET_DROP_TIMEOUT = config->getInteger("losttimeout"); + INFO_MSG("Using regular RTP configuration: NACK at %u, drop at %u", RTP::PACKET_REORDER_WAIT, RTP::PACKET_DROP_TIMEOUT); + } } // This function is executed when we receive a signaling data. @@ -259,10 +316,29 @@ namespace Mist{ "Failed to handle the video bitrate change request."); return; } + videoConstraint = videoBitrate; + if (videoConstraint < 1024){videoConstraint = 1024;} JSON::Value commandResult; commandResult["type"] = "on_video_bitrate"; commandResult["result"] = true; commandResult["video_bitrate"] = videoBitrate; + commandResult["video_bitrate_constraint"] = videoConstraint; + webSock->sendFrame(commandResult.toString()); + return; + } + + if (command["type"] == "rtp_props"){ + if (command.isMember("nack")){ + RTP::PACKET_REORDER_WAIT = command["nack"].asInt(); + } + if (command.isMember("drop")){ + RTP::PACKET_DROP_TIMEOUT = command["drop"].asInt(); + } + JSON::Value commandResult; + commandResult["type"] = "on_rtp_props"; + commandResult["result"] = true; + commandResult["nack"] = RTP::PACKET_REORDER_WAIT; + commandResult["drop"] = RTP::PACKET_DROP_TIMEOUT; webSock->sendFrame(commandResult.toString()); return; } @@ -544,6 +620,18 @@ namespace Mist{ commandResult["tracks"].append(it->first); } webSock->sendFrame(commandResult.toString()); + }else if (isPushing()){ + JSON::Value commandResult; + commandResult["type"] = "on_media_receive"; + commandResult["millis"] = endTime(); + for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ + commandResult["tracks"].append(M.getCodec(it->first)); + } + commandResult["stats"]["nack_num"] = stats_nacknum; + commandResult["stats"]["loss_num"] = stats_lossnum; + commandResult["stats"]["jitter_ms"] = stats_jitter; + commandResult["stats"]["loss_perc"] = stats_lossperc; + webSock->sendFrame(commandResult.toString()); } } @@ -744,6 +832,8 @@ namespace Mist{ rtcpTimeoutInMillis = Util::bootMS() + 2000; rtcpKeyFrameTimeoutInMillis = Util::bootMS() + 2000; + idleInterval = 1000; + return true; } @@ -980,12 +1070,6 @@ namespace Mist{ // Find the WebRTCTrack corresponding to the packet we received WebRTCTrack &rtcTrack = webrtcTracks[idx]; - // Do not parse packets we don't care about - if (!rtcTrack.sorter.wantSeq(currSeqNum)){ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Sequence #" << currSeqNum << " not interesting, ignored" << std::endl;} - return; - } - // Decrypt the SRTP to RTP int len = (int)udp.data_len; if (srtpReader.unprotectRtp((uint8_t *)udp.data, &len) != 0){ @@ -996,18 +1080,7 @@ namespace Mist{ RTP::Packet unprotPack(udp.data, len); DONTEVEN_MSG("%s", unprotPack.toString().c_str()); - // Here follows a very rudimentary algo for requesting lost - // packets; I guess after some experimentation a better - // algorithm should be used; this is used to trigger NACKs. - if (rtcTrack.prevReceivedSequenceNumber != 0 && (rtcTrack.prevReceivedSequenceNumber + 1) != currSeqNum){ - while (rtcTrack.prevReceivedSequenceNumber < currSeqNum){ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Sending NACK for sequence #" << rtcTrack.prevReceivedSequenceNumber << std::endl;} - sendRTCPFeedbackNACK(rtcTrack, rtcTrack.prevReceivedSequenceNumber); - rtcTrack.prevReceivedSequenceNumber++; - } - } - - rtcTrack.prevReceivedSequenceNumber = currSeqNum; + rtcTrack.gotPacket(unprotPack.getTimeStamp()); if (rtp_pkt.getPayloadType() == rtcTrack.REDPayloadType || rtp_pkt.getPayloadType() == rtcTrack.ULPFECPayloadType){ if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "RED packet " << rtp_pkt.getPayloadType() << " #" << currSeqNum << std::endl;} @@ -1017,59 +1090,84 @@ namespace Mist{ if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Basic packet " << rtp_pkt.getPayloadType() << " #" << currSeqNum << std::endl;} rtcTrack.sorter.addPacket(unprotPack); } - }else if ((pt >= 64) && (pt < 96)){ - if (pt == 77 || pt == 78 || pt == 65){ - int len = udp.data_len; - if (srtpReader.unprotectRtcp((uint8_t *)udp.data, &len) != 0){ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "RTCP decrypt failure" << std::endl;} - FAIL_MSG("Failed to unprotect RTCP."); - return; - } - uint8_t fmt = udp.data[0] & 0x1F; - if (pt == 77 || pt == 65){ - if (fmt == 1){ - uint32_t pSSRC = Bit::btohl(udp.data + 8); - uint16_t seq = Bit::btohs(udp.data + 12); - uint16_t bitmask = Bit::btohs(udp.data + 14); - ackNACK(pSSRC, seq); - size_t missed = 1; - if (bitmask & 1){ackNACK(pSSRC, seq + 1); missed++;} - if (bitmask & 2){ackNACK(pSSRC, seq + 2); missed++;} - if (bitmask & 4){ackNACK(pSSRC, seq + 3); missed++;} - if (bitmask & 8){ackNACK(pSSRC, seq + 4); missed++;} - if (bitmask & 16){ackNACK(pSSRC, seq + 5); missed++;} - if (bitmask & 32){ackNACK(pSSRC, seq + 6); missed++;} - if (bitmask & 64){ackNACK(pSSRC, seq + 7); missed++;} - if (bitmask & 128){ackNACK(pSSRC, seq + 8); missed++;} - if (bitmask & 256){ackNACK(pSSRC, seq + 9); missed++;} - if (bitmask & 512){ackNACK(pSSRC, seq + 10); missed++;} - if (bitmask & 1024){ackNACK(pSSRC, seq + 11); missed++;} - if (bitmask & 2048){ackNACK(pSSRC, seq + 12); missed++;} - if (bitmask & 4096){ackNACK(pSSRC, seq + 13); missed++;} - if (bitmask & 8192){ackNACK(pSSRC, seq + 14); missed++;} - if (bitmask & 16384){ackNACK(pSSRC, seq + 15); missed++;} - if (bitmask & 32768){ackNACK(pSSRC, seq + 16); missed++;} - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "NACK: " << missed << " missed packet(s)" << std::endl;} - }else{ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Feedback: Unimplemented (type " << fmt << ")" << std::endl;} - INFO_MSG("Received unimplemented RTP feedback message (%d)", fmt); - } - } - if (pt == 78){ - if (fmt == 1){ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "PLI: Picture Loss Indication ( = keyframe request = ignored)" << std::endl;} - DONTEVEN_MSG("Received picture loss indication"); - }else{ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Feedback: Unimplemented (payload specific type " << fmt << ")" << std::endl;} - INFO_MSG("Received unimplemented payload-specific feedback message (%d)", fmt); - } - } + //Send NACKs for packets that we still need + while (rtcTrack.sorter.wantedSeqs.size()){ + uint16_t sNum = *(rtcTrack.sorter.wantedSeqs.begin()); + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Sending NACK for sequence #" << sNum << std::endl;} + stats_nacknum++; + sendRTCPFeedbackNACK(rtcTrack, sNum); + rtcTrack.sorter.wantedSeqs.erase(sNum); } }else{ - if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Unknown payload type: " << pt << std::endl;} - FAIL_MSG("Unknown payload type: %u", pt); + //Decrypt feedback packet + int len = udp.data_len; + if (srtpReader.unprotectRtcp((uint8_t *)udp.data, &len) != 0){ + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "RTCP decrypt failure" << std::endl;} + FAIL_MSG("Failed to unprotect RTCP."); + return; + } + uint8_t fmt = udp.data[0] & 0x1F; + if (pt == 77 || pt == 65){ + //77/65 = nack + if (fmt == 1){ + uint32_t pSSRC = Bit::btohl(udp.data + 8); + uint16_t seq = Bit::btohs(udp.data + 12); + uint16_t bitmask = Bit::btohs(udp.data + 14); + ackNACK(pSSRC, seq); + size_t missed = 1; + if (bitmask & 1){ackNACK(pSSRC, seq + 1); missed++;} + if (bitmask & 2){ackNACK(pSSRC, seq + 2); missed++;} + if (bitmask & 4){ackNACK(pSSRC, seq + 3); missed++;} + if (bitmask & 8){ackNACK(pSSRC, seq + 4); missed++;} + if (bitmask & 16){ackNACK(pSSRC, seq + 5); missed++;} + if (bitmask & 32){ackNACK(pSSRC, seq + 6); missed++;} + if (bitmask & 64){ackNACK(pSSRC, seq + 7); missed++;} + if (bitmask & 128){ackNACK(pSSRC, seq + 8); missed++;} + if (bitmask & 256){ackNACK(pSSRC, seq + 9); missed++;} + if (bitmask & 512){ackNACK(pSSRC, seq + 10); missed++;} + if (bitmask & 1024){ackNACK(pSSRC, seq + 11); missed++;} + if (bitmask & 2048){ackNACK(pSSRC, seq + 12); missed++;} + if (bitmask & 4096){ackNACK(pSSRC, seq + 13); missed++;} + if (bitmask & 8192){ackNACK(pSSRC, seq + 14); missed++;} + if (bitmask & 16384){ackNACK(pSSRC, seq + 15); missed++;} + if (bitmask & 32768){ackNACK(pSSRC, seq + 16); missed++;} + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "NACK: " << missed << " missed packet(s)" << std::endl;} + }else{ + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Feedback: Unimplemented (type " << fmt << ")" << std::endl;} + INFO_MSG("Received unimplemented RTP feedback message (%d)", fmt); + } + }else if (pt == 78){ + //78 = PLI + if (fmt == 1){ + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "PLI: Picture Loss Indication ( = keyframe request = ignored)" << std::endl;} + DONTEVEN_MSG("Received picture loss indication"); + }else{ + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Feedback: Unimplemented (payload specific type " << fmt << ")" << std::endl;} + INFO_MSG("Received unimplemented payload-specific feedback message (%d)", fmt); + } + }else if (pt == 72){ + //72 = sender report + uint32_t SSRC = Bit::btohl(udp.data + 4); + std::map::iterator it; + for (it = webrtcTracks.begin(); it != webrtcTracks.end(); ++it){ + if (it->second.SSRC == SSRC){ + it->second.sorter.lastBootMS = Util::bootMS(); + it->second.sorter.lastNTP = Bit::btohl(udp.data+10);; + uint32_t packets = Bit::btohl(udp.data + 20); + uint32_t bytes = Bit::btohl(udp.data + 24); + HIGH_MSG("Received sender report for track %s (%" PRIu32 " pkts, %" PRIu32 "b)", it->second.rtpToDTSC.codec.c_str(), packets, bytes); + break; + } + } + }else if (pt == 73){ + //73 = receiver report + // \TODO Implement, maybe? + }else{ + if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "Unknown payload type: " << pt << std::endl;} + WARN_MSG("Unknown RTP feedback payload type: %u", pt); + } } } @@ -1386,15 +1484,9 @@ namespace Mist{ } void OutWebRTC::sendRTCPFeedbackREMB(const WebRTCTrack &rtcTrack){ - - if (videoBitrate == 0){ - FAIL_MSG("videoBitrate is 0, which is invalid. Resetting to our default value."); - videoBitrate = 6 * 1000 * 1000; - } - // create the `BR Exp` and `BR Mantissa parts. uint32_t br_exponent = 0; - uint32_t br_mantissa = videoBitrate; + uint32_t br_mantissa = videoConstraint; while (br_mantissa > 0x3FFFF){ br_mantissa >>= 1; ++br_exponent; @@ -1501,7 +1593,7 @@ namespace Mist{ // sequence numbers are lost it makes sense to implement this // too. void OutWebRTC::sendRTCPFeedbackNACK(const WebRTCTrack &rtcTrack, uint16_t lostSequenceNumber){ - HIGH_MSG("Requesting missing sequence number %u", lostSequenceNumber); + INFO_MSG("Requesting missing sequence number %u", lostSequenceNumber); std::vector buffer; buffer.push_back(0x80 | 0x01); // V=2 (0x80) | FMT=1 (0x01) @@ -1547,8 +1639,30 @@ namespace Mist{ } void OutWebRTC::sendRTCPFeedbackRR(WebRTCTrack &rtcTrack){ + stats_lossperc = (double)(rtcTrack.sorter.lostCurrent * 100.) / (double)(rtcTrack.sorter.lostCurrent + rtcTrack.sorter.packCurrent); + stats_jitter = rtcTrack.jitter/rtcTrack.rtpToDTSC.multiplier; + stats_lossnum = rtcTrack.sorter.lostTotal; + //If we have > 5% loss, constrain video by 10% + if (stats_lossperc > 5){ + videoConstraint *= 0.9; + if (videoConstraint < 1024){videoConstraint = 1024;} + JSON::Value commandResult; + commandResult["type"] = "on_video_bitrate"; + commandResult["result"] = true; + commandResult["video_bitrate"] = videoBitrate; + commandResult["video_bitrate_constraint"] = videoConstraint; + webSock->sendFrame(commandResult.toString()); + } + if (stats_lossperc > 1 || stats_jitter > 20){ + INFO_MSG("Receiver Report (%s): %.2f%% loss, %" PRIu32 " total lost, %.2f ms jitter", rtcTrack.rtpToDTSC.codec.c_str(), stats_lossperc, rtcTrack.sorter.lostTotal, stats_jitter); + }else{ + HIGH_MSG("Receiver Report (%s): %.2f%% loss, %" PRIu32 " total lost, %.2f ms jitter", rtcTrack.rtpToDTSC.codec.c_str(), stats_lossperc, rtcTrack.sorter.lostTotal, stats_jitter); + } - ((RTP::FECPacket *)&(rtcTrack.rtpPacketizer))->sendRTCP_RR(rtcTrack.sorter, SSRC, rtcTrack.SSRC, (void *)&udp, onRTPPacketizerHasRTCPDataCallback); + if (packetLog.is_open()){ + packetLog << "[" << Util::bootMS() << "] Receiver Report (" << rtcTrack.rtpToDTSC.codec << "): " << stats_lossperc << " percent loss, " << rtcTrack.sorter.lostTotal << " total lost, " << stats_jitter << " ms jitter" << std::endl; + } + ((RTP::FECPacket *)&(rtcTrack.rtpPacketizer))->sendRTCP_RR(rtcTrack.sorter, SSRC, rtcTrack.SSRC, (void *)&udp, onRTPPacketizerHasRTCPDataCallback, (uint32_t)rtcTrack.jitter); } void OutWebRTC::sendSPSPPS(size_t dtscIdx, WebRTCTrack &rtcTrack){ diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index ab2f8026..b982bcf8 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -115,8 +115,9 @@ namespace Mist{ ///< stream. uint8_t RTXPayloadType; ///< The retransmission payload type when we use RTX (retransmission ///< with separate SSRC/payload type) - uint16_t prevReceivedSequenceNumber; ///< The previously received sequence number. This is used - ///< to NACK packets when we loose one. + void gotPacket(uint32_t ts); + uint32_t lastTransit; + double jitter; }; /* ------------------------------------------------ */ @@ -201,6 +202,7 @@ namespace Mist{ ///< to the other peer. This gets protected. uint32_t videoBitrate; ///< The bitrate to use for incoming video streams. Can be configured via ///< the signaling channel. Defaults to 6mbit. + uint32_t videoConstraint; size_t audTrack, vidTrack, prevVidTrack; double target_rate; ///< Target playback speed rate (1.0 = normal, 0 = auto) @@ -214,6 +216,11 @@ namespace Mist{ bool doDTLS; bool volkswagenMode; + double stats_jitter; + uint64_t stats_nacknum; + uint64_t stats_lossnum; + double stats_lossperc; + #if defined(WEBRTC_PCAP) PCAPWriter pcapOut; ///< Used during development to write unprotected packets that can be ///< inspected in e.g. wireshark.