WebRTC fixes/improvements:

- Added public host setting
- Implemented Sender Report based  track time syncing
- Added 10 second timeout for output connections (no timeout for input connections)
- Timing fixes
This commit is contained in:
Thulinma 2020-06-25 01:12:13 +02:00
parent cff43da016
commit 19a55828a3
4 changed files with 84 additions and 6 deletions

View file

@ -616,6 +616,7 @@ namespace RTP{
if (M.getCodec(tid) == "opus"){ if (M.getCodec(tid) == "opus"){
m = 48.0; m = 48.0;
} }
bootMsOffset = M.getBootMsOffset();
setProperties(M.getID(tid), M.getCodec(tid), M.getType(tid), M.getInit(tid), m); setProperties(M.getID(tid), M.getCodec(tid), M.getType(tid), M.getInit(tid), m);
} }
@ -625,6 +626,24 @@ namespace RTP{
cbInit = cbI; cbInit = cbI;
} }
/// Improves A/V sync by providing an NTP time source
/// msDiff is the amount of millis our current NTP time is ahead of the sync moment NTP time
/// May be negative, if we're behind instead of ahead.
void toDTSC::timeSync(uint32_t rtpTime, int64_t msDiff){
if (!firstTime){return;}
uint64_t rtp64Time = rtpTime;
if (recentWrap){
if (rtpTime > 0x80000000lu){rtp64Time -= 0x100000000ll;}
}
uint64_t msTime = (rtp64Time - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync;
int32_t rtpDiff = (bootMsOffset + msTime) - (Util::bootMS() - msDiff);
if (rtpDiff > 25 || rtpDiff < -25){
INFO_MSG("RTP difference (%s %s): %" PRId32 "ms, syncing...", type.c_str(), codec.c_str(), rtpDiff);
milliSync -= rtpDiff;
}
}
/// Adds an RTP packet to the converter, outputting DTSC packets and/or updating init data, /// Adds an RTP packet to the converter, outputting DTSC packets and/or updating init data,
/// as-needed. /// as-needed.
void toDTSC::addRTP(const RTP::Packet &pkt){ void toDTSC::addRTP(const RTP::Packet &pkt){
@ -636,6 +655,7 @@ namespace RTP{
// This part isn't codec-specific, so we do it before anything else. // This part isn't codec-specific, so we do it before anything else.
int64_t pTime = pkt.getTimeStamp(); int64_t pTime = pkt.getTimeStamp();
if (!firstTime){ if (!firstTime){
milliSync = Util::bootMS() - bootMsOffset;
firstTime = pTime + 1; firstTime = pTime + 1;
INFO_MSG("RTP timestamp rollover expected in " PRETTY_PRINT_TIME, INFO_MSG("RTP timestamp rollover expected in " PRETTY_PRINT_TIME,
PRETTY_ARG_TIME((0xFFFFFFFFul - firstTime) / multiplier / 1000)); PRETTY_ARG_TIME((0xFFFFFFFFul - firstTime) / multiplier / 1000));
@ -651,7 +671,7 @@ namespace RTP{
} }
} }
prevTime = pkt.getTimeStamp(); prevTime = pkt.getTimeStamp();
uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier; uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync;
char *pl = (char *)pkt.getPayload(); char *pl = (char *)pkt.getPayload();
uint32_t plSize = pkt.getPayloadSize(); uint32_t plSize = pkt.getPayloadSize();
bool missed = lastSeq != (pkt.getSequence() - 1); bool missed = lastSeq != (pkt.getSequence() - 1);

View file

@ -139,6 +139,7 @@ namespace RTP{
void setCallbacks(void (*cbPack)(const DTSC::Packet &pkt), void setCallbacks(void (*cbPack)(const DTSC::Packet &pkt),
void (*cbInit)(const uint64_t track, const std::string &initData)); void (*cbInit)(const uint64_t track, const std::string &initData));
void addRTP(const RTP::Packet &rPkt); void addRTP(const RTP::Packet &rPkt);
void timeSync(uint32_t rtpTime, int64_t msDiff);
virtual void outPacket(const DTSC::Packet &pkt){ virtual void outPacket(const DTSC::Packet &pkt){
if (cbPack){cbPack(pkt);} if (cbPack){cbPack(pkt);}
} }
@ -148,6 +149,7 @@ namespace RTP{
public: public:
uint64_t trackId; uint64_t trackId;
uint64_t bootMsOffset;
double multiplier; ///< Multiplier to convert from millis to RTP time double multiplier; ///< Multiplier to convert from millis to RTP time
std::string codec; ///< Codec of this track std::string codec; ///< Codec of this track
std::string type; ///< Type of this track std::string type; ///< Type of this track
@ -159,6 +161,7 @@ namespace RTP{
bool recentWrap; ///< True if a wraparound happened recently. bool recentWrap; ///< True if a wraparound happened recently.
uint32_t prevTime; uint32_t prevTime;
uint64_t firstTime; uint64_t firstTime;
int32_t milliSync;
void (*cbPack)(const DTSC::Packet &pkt); void (*cbPack)(const DTSC::Packet &pkt);
void (*cbInit)(const uint64_t track, const std::string &initData); void (*cbInit)(const uint64_t track, const std::string &initData);
// Codec-specific handlers // Codec-specific handlers

View file

@ -47,6 +47,7 @@ namespace Mist{
/* ------------------------------------------------ */ /* ------------------------------------------------ */
OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){ OutWebRTC::OutWebRTC(Socket::Connection &myConn) : HTTPOutput(myConn){
lastRecv = Util::bootMS();
stats_jitter = 0; stats_jitter = 0;
stats_nacknum = 0; stats_nacknum = 0;
stats_lossnum = 0; stats_lossnum = 0;
@ -76,6 +77,7 @@ namespace Mist{
didReceiveKeyFrame = false; didReceiveKeyFrame = false;
doDTLS = true; doDTLS = true;
volkswagenMode = false; volkswagenMode = false;
syncedNTPClock = false;
if (cert.init("NL", "webrtc", "webrtc") != 0){ if (cert.init("NL", "webrtc", "webrtc") != 0){
onFail("Failed to create the certificate.", true); onFail("Failed to create the certificate.", true);
@ -147,7 +149,7 @@ namespace Mist{
capa["optional"]["preferredaudiocodec"]["option"] = "--webrtc-audio-codecs"; capa["optional"]["preferredaudiocodec"]["option"] = "--webrtc-audio-codecs";
capa["optional"]["preferredaudiocodec"]["short"] = "A"; capa["optional"]["preferredaudiocodec"]["short"] = "A";
capa["optional"]["bindhost"]["name"] = "UDP bind address"; capa["optional"]["bindhost"]["name"] = "UDP bind address (internal)";
capa["optional"]["bindhost"]["help"] = "Interface address or hostname to bind SRTP UDP socket " capa["optional"]["bindhost"]["help"] = "Interface address or hostname to bind SRTP UDP socket "
"to. Defaults to originating interface address."; "to. Defaults to originating interface address.";
capa["optional"]["bindhost"]["default"] = ""; capa["optional"]["bindhost"]["default"] = "";
@ -155,6 +157,13 @@ namespace Mist{
capa["optional"]["bindhost"]["option"] = "--bindhost"; capa["optional"]["bindhost"]["option"] = "--bindhost";
capa["optional"]["bindhost"]["short"] = "B"; capa["optional"]["bindhost"]["short"] = "B";
capa["optional"]["pubhost"]["name"] = "UDP bind address (public)";
capa["optional"]["pubhost"]["help"] = "Interface address or hostname for clients to connect to. Defaults to internal address.";
capa["optional"]["pubhost"]["default"] = "";
capa["optional"]["pubhost"]["type"] = "str";
capa["optional"]["pubhost"]["option"] = "--pubhost";
capa["optional"]["pubhost"]["short"] = "H";
capa["optional"]["mergesessions"]["name"] = "merge sessions"; capa["optional"]["mergesessions"]["name"] = "merge sessions";
capa["optional"]["mergesessions"]["help"] = capa["optional"]["mergesessions"]["help"] =
"if enabled, merges together all views from a single user into a single combined session. " "if enabled, merges together all views from a single user into a single combined session. "
@ -230,6 +239,7 @@ namespace Mist{
// The signaling data contains commands that are used to start // The signaling data contains commands that are used to start
// an input or output stream. // an input or output stream.
void OutWebRTC::onWebsocketFrame(){ void OutWebRTC::onWebsocketFrame(){
lastRecv = Util::bootMS();
if (webSock->frameType != 1){ if (webSock->frameType != 1){
HIGH_MSG("Ignoring non-text websocket frame"); HIGH_MSG("Ignoring non-text websocket frame");
return; return;
@ -730,6 +740,9 @@ namespace Mist{
// This function is called to handle an offer from a peer that wants to push data towards us. // This function is called to handle an offer from a peer that wants to push data towards us.
bool OutWebRTC::handleSignalingCommandRemoteOfferForInput(SDP::Session &sdpSession){ bool OutWebRTC::handleSignalingCommandRemoteOfferForInput(SDP::Session &sdpSession){
if (!meta.getBootMsOffset()){meta.setBootMsOffset(Util::bootMS());}
if (webRTCInputOutputThread != NULL){ if (webRTCInputOutputThread != NULL){
FAIL_MSG("It seems that we're already have a webrtc i/o thread running."); FAIL_MSG("It seems that we're already have a webrtc i/o thread running.");
return false; return false;
@ -884,6 +897,9 @@ namespace Mist{
} }
Util::Procs::socketList.insert(udp.getSock()); Util::Procs::socketList.insert(udp.getSock());
if (config && config->hasOption("pubhost") && config->getString("pubhost").size()){
bindAddr = config->getString("pubhost");
}
sdpAnswer.setCandidate(bindAddr, udpPort); sdpAnswer.setCandidate(bindAddr, udpPort);
return true; return true;
} }
@ -978,6 +994,7 @@ namespace Mist{
usernameLocal.c_str()); usernameLocal.c_str());
return; return;
} }
lastRecv = Util::bootMS();
std::string remoteIP = ""; std::string remoteIP = "";
uint32_t remotePort = 0; uint32_t remotePort = 0;
@ -1010,6 +1027,7 @@ namespace Mist{
FAIL_MSG("Failed to parse a DTLS packet."); FAIL_MSG("Failed to parse a DTLS packet.");
return; return;
} }
lastRecv = Util::bootMS();
if (!dtlsHandshake.hasKeyingMaterial()){ if (!dtlsHandshake.hasKeyingMaterial()){
if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "DTLS: No keying material (yet)" << std::endl;} if (packetLog.is_open()){packetLog << "[" << Util::bootMS() << "]" << "DTLS: No keying material (yet)" << std::endl;}
@ -1077,6 +1095,7 @@ namespace Mist{
FAIL_MSG("Failed to unprotect a RTP packet."); FAIL_MSG("Failed to unprotect a RTP packet.");
return; return;
} }
lastRecv = Util::bootMS();
RTP::Packet unprotPack(udp.data, len); RTP::Packet unprotPack(udp.data, len);
DONTEVEN_MSG("%s", unprotPack.toString().c_str()); DONTEVEN_MSG("%s", unprotPack.toString().c_str());
@ -1108,6 +1127,7 @@ namespace Mist{
FAIL_MSG("Failed to unprotect RTCP."); FAIL_MSG("Failed to unprotect RTCP.");
return; return;
} }
lastRecv = Util::bootMS();
uint8_t fmt = udp.data[0] & 0x1F; uint8_t fmt = udp.data[0] & 0x1F;
if (pt == 77 || pt == 65){ if (pt == 77 || pt == 65){
//77/65 = nack //77/65 = nack
@ -1154,10 +1174,23 @@ namespace Mist{
for (it = webrtcTracks.begin(); it != webrtcTracks.end(); ++it){ for (it = webrtcTracks.begin(); it != webrtcTracks.end(); ++it){
if (it->second.SSRC == SSRC){ if (it->second.SSRC == SSRC){
it->second.sorter.lastBootMS = Util::bootMS(); it->second.sorter.lastBootMS = Util::bootMS();
it->second.sorter.lastNTP = Bit::btohl(udp.data+10);; it->second.sorter.lastNTP = Bit::btohl(udp.data+10);
uint64_t ntpTime = Bit::btohll(udp.data + 8);
uint32_t rtpTime = Bit::btohl(udp.data + 16);
uint32_t packets = Bit::btohl(udp.data + 20); uint32_t packets = Bit::btohl(udp.data + 20);
uint32_t bytes = Bit::btohl(udp.data + 24); uint32_t bytes = Bit::btohl(udp.data + 24);
HIGH_MSG("Received sender report for track %s (%" PRIu32 " pkts, %" PRIu32 "b)", it->second.rtpToDTSC.codec.c_str(), packets, bytes); HIGH_MSG("Received sender report for track %s (%" PRIu32 " pkts, %" PRIu32 "b) time: %" PRIu32 " RTP = %" PRIu64 " NTP", it->second.rtpToDTSC.codec.c_str(), packets, bytes, rtpTime, ntpTime);
if (rtpTime && ntpTime){
//msDiff is the amount of millis our current NTP time is ahead of the sync moment NTP time
//May be negative, if we're behind instead of ahead.
uint64_t ntpDiff = Util::getNTP()-ntpTime;
int64_t msDiff = (ntpDiff>>32) * 1000 + (ntpDiff & 0xFFFFFFFFul) / 4294967.295;
if (!syncedNTPClock){
syncedNTPClock = true;
ntpClockDifference = -msDiff;
}
it->second.rtpToDTSC.timeSync(rtpTime, msDiff+ntpClockDifference);
}
break; break;
} }
} }
@ -1338,12 +1371,23 @@ namespace Mist{
if(doDTLS){ if(doDTLS){
while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){ while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){
if (!handleWebRTCInputOutput()){Util::sleep(10);} if (!handleWebRTCInputOutput()){Util::sleep(10);}
if (lastRecv < Util::bootMS() - 10000){
WARN_MSG("Killing idle connection in handshake phase");
onFail("idle connection in handshake phase", false);
return;
}
} }
} }
sentHeader = true; sentHeader = true;
} }
void OutWebRTC::sendNext(){ void OutWebRTC::sendNext(){
if (lastRecv < Util::bootMS() - 10000){
WARN_MSG("Killing idle connection");
onFail("idle connection", false);
return;
}
// Handle nice move-over to new track ID // Handle nice move-over to new track ID
if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){ if (prevVidTrack != INVALID_TRACK_ID && thisIdx != prevVidTrack && M.getType(thisIdx) == "video"){
if (!thisPacket.getFlag("keyframe")){ if (!thisPacket.getFlag("keyframe")){
@ -1401,7 +1445,14 @@ namespace Mist{
} }
WebRTCTrack &rtcTrack = *trackPointer; WebRTCTrack &rtcTrack = *trackPointer;
rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * SDP::getMultiplier(&M, thisIdx)); double mult = SDP::getMultiplier(&M, thisIdx);
// This checks if we have a whole integer multiplier, and if so,
// ensures only integer math is used to prevent rounding errors
if (mult == (uint64_t)mult){
rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * (uint64_t)mult);
}else{
rtcTrack.rtpPacketizer.setTimestamp(thisPacket.getTime() * mult);
}
bool isKeyFrame = thisPacket.getFlag("keyframe"); bool isKeyFrame = thisPacket.getFlag("keyframe");
didReceiveKeyFrame = isKeyFrame; didReceiveKeyFrame = isKeyFrame;
@ -1427,7 +1478,7 @@ namespace Mist{
rtcTrack.rtpPacketizer.sendData(&udp, onRTPPacketizerHasDataCallback, dataPointer, dataLen, rtcTrack.rtpPacketizer.sendData(&udp, onRTPPacketizerHasDataCallback, dataPointer, dataLen,
rtcTrack.payloadType, M.getCodec(thisIdx)); rtcTrack.payloadType, M.getCodec(thisIdx));
if (!lastSR.count(thisIdx) || lastSR[thisIdx] < Util::bootMS() + 250){ if (!lastSR.count(thisIdx) || lastSR[thisIdx]+500 < Util::bootMS()){
lastSR[thisIdx] = Util::bootMS(); lastSR[thisIdx] = Util::bootMS();
rtcTrack.rtpPacketizer.sendRTCP_SR((void *)&udp, onRTPPacketizerHasRTCPDataCallback); rtcTrack.rtpPacketizer.sendRTCP_SR((void *)&udp, onRTPPacketizerHasRTCPDataCallback);
} }

View file

@ -145,6 +145,7 @@ namespace Mist{
void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes); void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes);
private: private:
uint64_t lastRecv;
uint64_t lastPackMs; uint64_t lastPackMs;
std::ofstream jitterLog; std::ofstream jitterLog;
std::ofstream packetLog; std::ofstream packetLog;
@ -234,6 +235,9 @@ namespace Mist{
///< future. ///< future.
std::map<uint32_t, nackBuffer> outBuffers; std::map<uint32_t, nackBuffer> outBuffers;
std::map<size_t, uint64_t> lastSR; std::map<size_t, uint64_t> lastSR;
int64_t ntpClockDifference;
bool syncedNTPClock;
}; };
}// namespace Mist }// namespace Mist