From 19a55828a3590a3e4d7cec434381875620b50320 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 25 Jun 2020 01:12:13 +0200 Subject: [PATCH] WebRTC fixes/improvements: - Added public host setting - Implemented Sender Report based track time syncing - Added 10 second timeout for output connections (no timeout for input connections) - Timing fixes --- lib/rtp.cpp | 22 ++++++++++++- lib/rtp.h | 3 ++ src/output/output_webrtc.cpp | 61 +++++++++++++++++++++++++++++++++--- src/output/output_webrtc.h | 4 +++ 4 files changed, 84 insertions(+), 6 deletions(-) diff --git a/lib/rtp.cpp b/lib/rtp.cpp index 5d2072d0..7ffb69b9 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -616,6 +616,7 @@ namespace RTP{ if (M.getCodec(tid) == "opus"){ m = 48.0; } + bootMsOffset = M.getBootMsOffset(); setProperties(M.getID(tid), M.getCodec(tid), M.getType(tid), M.getInit(tid), m); } @@ -625,6 +626,24 @@ namespace RTP{ cbInit = cbI; } + /// Improves A/V sync by providing an NTP time source + /// msDiff is the amount of millis our current NTP time is ahead of the sync moment NTP time + /// May be negative, if we're behind instead of ahead. + void toDTSC::timeSync(uint32_t rtpTime, int64_t msDiff){ + if (!firstTime){return;} + uint64_t rtp64Time = rtpTime; + if (recentWrap){ + if (rtpTime > 0x80000000lu){rtp64Time -= 0x100000000ll;} + } + uint64_t msTime = (rtp64Time - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync; + int32_t rtpDiff = (bootMsOffset + msTime) - (Util::bootMS() - msDiff); + if (rtpDiff > 25 || rtpDiff < -25){ + INFO_MSG("RTP difference (%s %s): %" PRId32 "ms, syncing...", type.c_str(), codec.c_str(), rtpDiff); + milliSync -= rtpDiff; + } + + } + /// Adds an RTP packet to the converter, outputting DTSC packets and/or updating init data, /// as-needed. void toDTSC::addRTP(const RTP::Packet &pkt){ @@ -636,6 +655,7 @@ namespace RTP{ // This part isn't codec-specific, so we do it before anything else. int64_t pTime = pkt.getTimeStamp(); if (!firstTime){ + milliSync = Util::bootMS() - bootMsOffset; firstTime = pTime + 1; INFO_MSG("RTP timestamp rollover expected in " PRETTY_PRINT_TIME, PRETTY_ARG_TIME((0xFFFFFFFFul - firstTime) / multiplier / 1000)); @@ -651,7 +671,7 @@ namespace RTP{ } } prevTime = pkt.getTimeStamp(); - uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier; + uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync; char *pl = (char *)pkt.getPayload(); uint32_t plSize = pkt.getPayloadSize(); bool missed = lastSeq != (pkt.getSequence() - 1); diff --git a/lib/rtp.h b/lib/rtp.h index 9adf5eb6..9b2725bb 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -139,6 +139,7 @@ namespace RTP{ void setCallbacks(void (*cbPack)(const DTSC::Packet &pkt), void (*cbInit)(const uint64_t track, const std::string &initData)); void addRTP(const RTP::Packet &rPkt); + void timeSync(uint32_t rtpTime, int64_t msDiff); virtual void outPacket(const DTSC::Packet &pkt){ if (cbPack){cbPack(pkt);} } @@ -148,6 +149,7 @@ namespace RTP{ public: uint64_t trackId; + uint64_t bootMsOffset; double multiplier; ///< Multiplier to convert from millis to RTP time std::string codec; ///< Codec of this track std::string type; ///< Type of this track @@ -159,6 +161,7 @@ namespace RTP{ bool recentWrap; ///< True if a wraparound happened recently. uint32_t prevTime; uint64_t firstTime; + int32_t milliSync; void (*cbPack)(const DTSC::Packet &pkt); void (*cbInit)(const uint64_t track, const std::string &initData); // Codec-specific handlers diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index 6b7a1fc6..117dd8b5 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -47,6 +47,7 @@ namespace Mist{ /* ------------------------------------------------ */ OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){ + lastRecv = Util::bootMS(); stats_jitter = 0; stats_nacknum = 0; stats_lossnum = 0; @@ -76,6 +77,7 @@ namespace Mist{ didReceiveKeyFrame = false; doDTLS = true; volkswagenMode = false; + syncedNTPClock = false; if (cert.init("NL", "webrtc", "webrtc") != 0){ onFail("Failed to create the certificate.", true); @@ -147,7 +149,7 @@ namespace Mist{ capa["optional"]["preferredaudiocodec"]["option"] = "--webrtc-audio-codecs"; capa["optional"]["preferredaudiocodec"]["short"] = "A"; - capa["optional"]["bindhost"]["name"] = "UDP bind address"; + capa["optional"]["bindhost"]["name"] = "UDP bind address (internal)"; capa["optional"]["bindhost"]["help"] = "Interface address or hostname to bind SRTP UDP socket " "to. Defaults to originating interface address."; capa["optional"]["bindhost"]["default"] = ""; @@ -155,6 +157,13 @@ namespace Mist{ capa["optional"]["bindhost"]["option"] = "--bindhost"; capa["optional"]["bindhost"]["short"] = "B"; + capa["optional"]["pubhost"]["name"] = "UDP bind address (public)"; + capa["optional"]["pubhost"]["help"] = "Interface address or hostname for clients to connect to. Defaults to internal address."; + capa["optional"]["pubhost"]["default"] = ""; + capa["optional"]["pubhost"]["type"] = "str"; + capa["optional"]["pubhost"]["option"] = "--pubhost"; + capa["optional"]["pubhost"]["short"] = "H"; + capa["optional"]["mergesessions"]["name"] = "merge sessions"; capa["optional"]["mergesessions"]["help"] = "if enabled, merges together all views from a single user into a single combined session. " @@ -230,6 +239,7 @@ namespace Mist{ // The signaling data contains commands that are used to start // an input or output stream. void OutWebRTC::onWebsocketFrame(){ + lastRecv = Util::bootMS(); if (webSock->frameType != 1){ HIGH_MSG("Ignoring non-text websocket frame"); return; @@ -730,6 +740,9 @@ namespace Mist{ // This function is called to handle an offer from a peer that wants to push data towards us. bool OutWebRTC::handleSignalingCommandRemoteOfferForInput(SDP::Session &sdpSession){ + + if (!meta.getBootMsOffset()){meta.setBootMsOffset(Util::bootMS());} + if (webRTCInputOutputThread != NULL){ FAIL_MSG("It seems that we're already have a webrtc i/o thread running."); return false; @@ -884,6 +897,9 @@ namespace Mist{ } Util::Procs::socketList.insert(udp.getSock()); + if (config && config->hasOption("pubhost") && config->getString("pubhost").size()){ + bindAddr = config->getString("pubhost"); + } sdpAnswer.setCandidate(bindAddr, udpPort); return true; } @@ -978,6 +994,7 @@ namespace Mist{ usernameLocal.c_str()); return; } + lastRecv = Util::bootMS(); std::string remoteIP = ""; uint32_t remotePort = 0; @@ -1010,6 +1027,7 @@ namespace Mist{ FAIL_MSG("Failed to parse a DTLS packet."); return; } + lastRecv = Util::bootMS(); if (!dtlsHandshake.hasKeyingMaterial()){ if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "DTLS: No keying material (yet)" << std::endl;} @@ -1077,6 +1095,7 @@ namespace Mist{ FAIL_MSG("Failed to unprotect a RTP packet."); return; } + lastRecv = Util::bootMS(); RTP::Packet unprotPack(udp.data, len); DONTEVEN_MSG("%s", unprotPack.toString().c_str()); @@ -1108,6 +1127,7 @@ namespace Mist{ FAIL_MSG("Failed to unprotect RTCP."); return; } + lastRecv = Util::bootMS(); uint8_t fmt = udp.data[0] & 0x1F; if (pt == 77 || pt == 65){ //77/65 = nack @@ -1154,10 +1174,23 @@ namespace Mist{ for (it = webrtcTracks.begin(); it != webrtcTracks.end(); ++it){ if (it->second.SSRC == SSRC){ it->second.sorter.lastBootMS = Util::bootMS(); - it->second.sorter.lastNTP = Bit::btohl(udp.data+10);; + it->second.sorter.lastNTP = Bit::btohl(udp.data+10); + uint64_t ntpTime = Bit::btohll(udp.data + 8); + uint32_t rtpTime = Bit::btohl(udp.data + 16); uint32_t packets = Bit::btohl(udp.data + 20); uint32_t bytes = Bit::btohl(udp.data + 24); - HIGH_MSG("Received sender report for track %s (%" PRIu32 " pkts, %" PRIu32 "b)", it->second.rtpToDTSC.codec.c_str(), packets, bytes); + HIGH_MSG("Received sender report for track %s (%" PRIu32 " pkts, %" PRIu32 "b) time: %" PRIu32 " RTP = %" PRIu64 " NTP", it->second.rtpToDTSC.codec.c_str(), packets, bytes, rtpTime, ntpTime); + if (rtpTime && ntpTime){ + //msDiff is the amount of millis our current NTP time is ahead of the sync moment NTP time + //May be negative, if we're behind instead of ahead. + uint64_t ntpDiff = Util::getNTP()-ntpTime; + int64_t msDiff = (ntpDiff>>32) * 1000 + (ntpDiff & 0xFFFFFFFFul) / 4294967.295; + if (!syncedNTPClock){ + syncedNTPClock = true; + ntpClockDifference = -msDiff; + } + it->second.rtpToDTSC.timeSync(rtpTime, msDiff+ntpClockDifference); + } break; } } @@ -1338,12 +1371,23 @@ namespace Mist{ if(doDTLS){ while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){ if (!handleWebRTCInputOutput()){Util::sleep(10);} + if (lastRecv < Util::bootMS() - 10000){ + WARN_MSG("Killing idle connection in handshake phase"); + onFail("idle connection in handshake phase", false); + return; + } } } sentHeader = true; } void OutWebRTC::sendNext(){ + if (lastRecv < Util::bootMS() - 10000){ + WARN_MSG("Killing idle connection"); + onFail("idle connection", false); + return; + } + // Handle nice move-over to new track ID if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){ if (!thisPacket.getFlag("keyframe")){ @@ -1401,7 +1445,14 @@ namespace Mist{ } WebRTCTrack &rtcTrack = *trackPointer; - rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * SDP::getMultiplier(&M, thisIdx)); + double mult = SDP::getMultiplier(&M, thisIdx); + // This checks if we have a whole integer multiplier, and if so, + // ensures only integer math is used to prevent rounding errors + if (mult == (uint64_t)mult){ + rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * (uint64_t)mult); + }else{ + rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * mult); + } bool isKeyFrame = thisPacket.getFlag("keyframe"); didReceiveKeyFrame = isKeyFrame; @@ -1427,7 +1478,7 @@ namespace Mist{ rtcTrack.rtpPacketizer.sendData(&udp, onRTPPacketizerHasDataCallback, dataPointer, dataLen, rtcTrack.payloadType, M.getCodec(thisIdx)); - if (!lastSR.count(thisIdx) || lastSR[thisIdx] < Util::bootMS() + 250){ + if (!lastSR.count(thisIdx) || lastSR[thisIdx]+500 < Util::bootMS()){ lastSR[thisIdx] = Util::bootMS(); rtcTrack.rtpPacketizer.sendRTCP_SR((void *)&udp, onRTPPacketizerHasRTCPDataCallback); } diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index ef98d0b1..687da87e 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -145,6 +145,7 @@ namespace Mist{ void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes); private: + uint64_t lastRecv; uint64_t lastPackMs; std::ofstream jitterLog; std::ofstream packetLog; @@ -234,6 +235,9 @@ namespace Mist{ ///< future. std::map outBuffers; std::map lastSR; + + int64_t ntpClockDifference; + bool syncedNTPClock; }; }// namespace Mist