From 2a8f2f75d3916a7818c5634e1abca1176f0644d6 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 15 Jun 2023 12:34:25 +0200 Subject: [PATCH] Implemented UDP socket packet send pacing, WebRTC now makes use of this new feature. --- lib/socket.cpp | 49 ++++++++++++++++++++++++++++++++++++ lib/socket.h | 4 +++ lib/timing.cpp | 14 +++++++++++ lib/timing.h | 1 + src/output/output.cpp | 2 +- src/output/output.h | 1 + src/output/output_webrtc.cpp | 22 ++++++++-------- src/output/output_webrtc.h | 2 ++ 8 files changed, 83 insertions(+), 12 deletions(-) diff --git a/lib/socket.cpp b/lib/socket.cpp index a0972f8a..203a6b79 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -1608,6 +1608,7 @@ int Socket::Server::getSocket(){ /// If both fail, prints an DLVL_FAIL debug message. /// \param nonblock Whether the socket should be nonblocking. Socket::UDPConnection::UDPConnection(bool nonblock){ + lastPace = 0; boundPort = 0; family = AF_INET6; sock = socket(AF_INET6, SOCK_DGRAM, 0); @@ -1680,6 +1681,7 @@ void Socket::UDPConnection::checkRecvBuf(){ /// Copies a UDP socket, re-allocating local copies of any needed structures. /// The data/data_size/data_len variables are *not* copied over. Socket::UDPConnection::UDPConnection(const UDPConnection &o){ + lastPace = 0; boundPort = 0; family = AF_INET6; sock = socket(AF_INET6, SOCK_DGRAM, 0); @@ -1892,6 +1894,53 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len){ } } +void Socket::UDPConnection::sendPaced(const char *sdata, size_t len){ + if (!paceQueue.size() && (!lastPace || Util::getMicros(lastPace) > 10000)){ + SendNow(sdata, len); + lastPace = Util::getMicros(); + }else{ + paceQueue.push_back(Util::ResizeablePointer()); + paceQueue.back().assign(sdata, len); + // Try to send a packet, if time allows + //sendPaced(0); + } +} + +/// Spends uSendWindow microseconds either sending paced packets or sleeping, whichever is more appropriate +void Socket::UDPConnection::sendPaced(uint64_t uSendWindow){ + uint64_t currPace = Util::getMicros(); + do{ + uint64_t uTime = Util::getMicros(); + uint64_t sleepTime = uTime - currPace; + if (sleepTime > uSendWindow){ + sleepTime = 0; + }else{ + sleepTime = uSendWindow - sleepTime; + } + uint64_t paceWait = uTime - lastPace; + size_t qSize = paceQueue.size(); + // If the queue is complete, wait out the remainder of the time + if (!qSize){ + Util::usleep(sleepTime); + return; + } + // Otherwise, target clearing the queue in 25ms at most. + uint64_t targetTime = 25000 / qSize; + // If this slows us to below 1 packet per 5ms, go that speed instead. + if (targetTime > 5000){targetTime = 5000;} + // If the wait is over, send now. + if (paceWait >= targetTime){ + SendNow(*paceQueue.begin(), paceQueue.begin()->size()); + paceQueue.pop_front(); + lastPace = uTime; + continue; + } + // Otherwise, wait for the smaller of remaining wait time or remaining send window time. + if (targetTime - paceWait < sleepTime){sleepTime = targetTime - paceWait;} + Util::usleep(sleepTime); + }while(Util::getMicros(currPace) < uSendWindow); +} + std::string Socket::UDPConnection::getBoundAddress(){ std::string boundaddr; uint32_t boundport; diff --git a/lib/socket.h b/lib/socket.h index ddba48ac..8edb852e 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -206,6 +206,8 @@ namespace Socket{ std::string boundAddr, boundMulti; int boundPort; void checkRecvBuf(); + std::deque paceQueue; + uint64_t lastPace; public: Util::ResizeablePointer data; @@ -228,6 +230,8 @@ namespace Socket{ void SendNow(const std::string &data); void SendNow(const char *data); void SendNow(const char *data, size_t len); + void sendPaced(const char * data, size_t len); + void sendPaced(uint64_t uSendWindow); void setSocketFamily(int AF_TYPE); }; }// namespace Socket diff --git a/lib/timing.cpp b/lib/timing.cpp index 3487d393..2834667b 100644 --- a/lib/timing.cpp +++ b/lib/timing.cpp @@ -55,6 +55,20 @@ void Util::sleep(int64_t ms){ nanosleep(&T, 0); } +/// Sleeps for roughly the indicated amount of microseconds. +/// Will not sleep if ms is negative. +/// Will not sleep for longer than 0.1 seconds (100000us). +/// Can be interrupted early by a signal, no guarantee of minimum sleep time. +/// Can be slightly off depending on OS accuracy. +void Util::usleep(int64_t us){ + if (us < 0){return;} + if (us > 100000){us = 100000;} + struct timespec T; + T.tv_sec = 0; + T.tv_nsec = 1000 * us; + nanosleep(&T, 0); +} + uint64_t Util::getNTP(){ struct timespec t; clock_gettime(CLOCK_REALTIME, &t); diff --git a/lib/timing.h b/lib/timing.h index a0eff784..979d338a 100644 --- a/lib/timing.h +++ b/lib/timing.h @@ -8,6 +8,7 @@ namespace Util{ void wait(int64_t ms); ///< Sleeps for the indicated amount of milliseconds or longer. void sleep(int64_t ms); ///< Sleeps for roughly the indicated amount of milliseconds. + void usleep(int64_t us); ///< Sleeps for roughly the indicated amount of microseconds. uint64_t getMS(); ///< Gets the current time in milliseconds. uint64_t bootSecs(); ///< Gets the current system uptime in seconds. uint64_t unixMS(); ///< Gets the current Unix time in milliseconds. diff --git a/src/output/output.cpp b/src/output/output.cpp index 397f31a7..114086e7 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1607,7 +1607,7 @@ namespace Mist{ keepGoing()){ uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead); if (amount > 1000){amount = 1000;} - Util::sleep(amount); + idleTime(amount); //Make sure we stay responsive to requests and stats while waiting if (wantRequest){ requestHandler(); diff --git a/src/output/output.h b/src/output/output.h index f4210a5f..5ce23375 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -127,6 +127,7 @@ namespace Mist{ std::set getSupportedTracks(const std::string &type = "") const; inline virtual bool keepGoing(){return config->is_active && myConn;} + virtual void idleTime(uint64_t ms){Util::sleep(ms);} Comms::Connections statComm; bool isBlocking; ///< If true, indicates that myConn is blocking. diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index 286749a6..5004a35e 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -306,7 +306,7 @@ namespace Mist{ void OutWebRTC::requestHandler(){ if (noSignalling){ - if (!parseData){Util::sleep(500);} + if (!parseData){udp.sendPaced(10000);} //After 10s of no packets, abort if (Util::bootMS() > lastRecv + 10000){ Util::logExitReason("received no data for 10+ seconds"); @@ -1155,7 +1155,7 @@ namespace Mist{ void OutWebRTC::handleWebRTCInputOutputFromThread(){ udp.allocateDestination(); while (keepGoing()){ - if (!handleWebRTCInputOutput()){Util::sleep(20);} + if (!handleWebRTCInputOutput()){Util::sleep(10);} } } @@ -1291,7 +1291,7 @@ namespace Mist{ stun_writer.writeFingerprint(); stun_writer.end(); - udp.SendNow((const char *)stun_writer.getBufferPtr(), stun_writer.getBufferSize()); + udp.sendPaced((const char *)stun_writer.getBufferPtr(), stun_writer.getBufferSize()); myConn.addUp(stun_writer.getBufferSize()); } @@ -1336,7 +1336,7 @@ namespace Mist{ HIGH_MSG("Could not answer NACK for %" PRIu32 " #%" PRIu16 ": packet not buffered", pSSRC, seq); return; } - udp.SendNow(nb.getData(seq), nb.getSize(seq)); + udp.sendPaced(nb.getData(seq), nb.getSize(seq)); myConn.addUp(nb.getSize(seq)); HIGH_MSG("Answered NACK for %" PRIu32 " #%" PRIu16, pSSRC, seq); } @@ -1524,7 +1524,7 @@ namespace Mist{ /* ------------------------------------------------ */ int OutWebRTC::onDTLSHandshakeWantsToWrite(const uint8_t *data, int *nbytes){ - udp.SendNow((const char *)data, (size_t)*nbytes); + udp.sendPaced((const char *)data, (size_t)*nbytes); myConn.addUp(*nbytes); return 0; } @@ -1619,7 +1619,7 @@ namespace Mist{ return; } } - udp.SendNow(rtpOutBuffer, (size_t)protectedSize); + udp.sendPaced(rtpOutBuffer, (size_t)protectedSize); RTP::Packet tmpPkt(rtpOutBuffer, protectedSize); uint32_t pSSRC = tmpPkt.getSSRC(); @@ -1658,7 +1658,7 @@ namespace Mist{ } } - udp.SendNow(rtpOutBuffer, rtcpPacketSize); + udp.sendPaced(rtpOutBuffer, rtcpPacketSize); myConn.addUp(rtcpPacketSize); if (volkswagenMode){ @@ -1681,7 +1681,7 @@ namespace Mist{ // first make sure that we complete the DTLS handshake. if(doDTLS){ while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){ - if (!handleWebRTCInputOutput()){Util::sleep(10);} + if (!handleWebRTCInputOutput()){udp.sendPaced(10000);} if (lastRecv < Util::bootMS() - 10000){ WARN_MSG("Killing idle connection in handshake phase"); onFail("idle connection in handshake phase", false); @@ -1901,7 +1901,7 @@ namespace Mist{ } } - udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes); + udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes); myConn.addUp(buffer_size_in_bytes); if (volkswagenMode){ @@ -1942,7 +1942,7 @@ namespace Mist{ } } - udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes); + udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes); myConn.addUp(buffer_size_in_bytes); if (volkswagenMode){ @@ -1993,7 +1993,7 @@ namespace Mist{ } } - udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes); + udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes); myConn.addUp(buffer_size_in_bytes); if (volkswagenMode){ diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index 477b07ef..02ef7bd9 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -148,6 +148,8 @@ namespace Mist{ virtual void connStats(uint64_t now, Comms::Connections &statComm); inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);} virtual void requestHandler(); + protected: + virtual void idleTime(uint64_t ms){udp.sendPaced(ms*1000);} private: bool noSignalling; uint64_t lastRecv;