Implemented UDP socket packet send pacing, WebRTC now makes use of this new feature.
This commit is contained in:
parent
a1232d56af
commit
2a8f2f75d3
8 changed files with 83 additions and 12 deletions
|
@ -1608,6 +1608,7 @@ int Socket::Server::getSocket(){
|
||||||
/// If both fail, prints an DLVL_FAIL debug message.
|
/// If both fail, prints an DLVL_FAIL debug message.
|
||||||
/// \param nonblock Whether the socket should be nonblocking.
|
/// \param nonblock Whether the socket should be nonblocking.
|
||||||
Socket::UDPConnection::UDPConnection(bool nonblock){
|
Socket::UDPConnection::UDPConnection(bool nonblock){
|
||||||
|
lastPace = 0;
|
||||||
boundPort = 0;
|
boundPort = 0;
|
||||||
family = AF_INET6;
|
family = AF_INET6;
|
||||||
sock = socket(AF_INET6, SOCK_DGRAM, 0);
|
sock = socket(AF_INET6, SOCK_DGRAM, 0);
|
||||||
|
@ -1680,6 +1681,7 @@ void Socket::UDPConnection::checkRecvBuf(){
|
||||||
/// Copies a UDP socket, re-allocating local copies of any needed structures.
|
/// Copies a UDP socket, re-allocating local copies of any needed structures.
|
||||||
/// The data/data_size/data_len variables are *not* copied over.
|
/// The data/data_size/data_len variables are *not* copied over.
|
||||||
Socket::UDPConnection::UDPConnection(const UDPConnection &o){
|
Socket::UDPConnection::UDPConnection(const UDPConnection &o){
|
||||||
|
lastPace = 0;
|
||||||
boundPort = 0;
|
boundPort = 0;
|
||||||
family = AF_INET6;
|
family = AF_INET6;
|
||||||
sock = socket(AF_INET6, SOCK_DGRAM, 0);
|
sock = socket(AF_INET6, SOCK_DGRAM, 0);
|
||||||
|
@ -1892,6 +1894,53 @@ void Socket::UDPConnection::SendNow(const char *sdata, size_t len){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Socket::UDPConnection::sendPaced(const char *sdata, size_t len){
|
||||||
|
if (!paceQueue.size() && (!lastPace || Util::getMicros(lastPace) > 10000)){
|
||||||
|
SendNow(sdata, len);
|
||||||
|
lastPace = Util::getMicros();
|
||||||
|
}else{
|
||||||
|
paceQueue.push_back(Util::ResizeablePointer());
|
||||||
|
paceQueue.back().assign(sdata, len);
|
||||||
|
// Try to send a packet, if time allows
|
||||||
|
//sendPaced(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spends uSendWindow microseconds either sending paced packets or sleeping, whichever is more appropriate
|
||||||
|
void Socket::UDPConnection::sendPaced(uint64_t uSendWindow){
|
||||||
|
uint64_t currPace = Util::getMicros();
|
||||||
|
do{
|
||||||
|
uint64_t uTime = Util::getMicros();
|
||||||
|
uint64_t sleepTime = uTime - currPace;
|
||||||
|
if (sleepTime > uSendWindow){
|
||||||
|
sleepTime = 0;
|
||||||
|
}else{
|
||||||
|
sleepTime = uSendWindow - sleepTime;
|
||||||
|
}
|
||||||
|
uint64_t paceWait = uTime - lastPace;
|
||||||
|
size_t qSize = paceQueue.size();
|
||||||
|
// If the queue is complete, wait out the remainder of the time
|
||||||
|
if (!qSize){
|
||||||
|
Util::usleep(sleepTime);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Otherwise, target clearing the queue in 25ms at most.
|
||||||
|
uint64_t targetTime = 25000 / qSize;
|
||||||
|
// If this slows us to below 1 packet per 5ms, go that speed instead.
|
||||||
|
if (targetTime > 5000){targetTime = 5000;}
|
||||||
|
// If the wait is over, send now.
|
||||||
|
if (paceWait >= targetTime){
|
||||||
|
SendNow(*paceQueue.begin(), paceQueue.begin()->size());
|
||||||
|
paceQueue.pop_front();
|
||||||
|
lastPace = uTime;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Otherwise, wait for the smaller of remaining wait time or remaining send window time.
|
||||||
|
if (targetTime - paceWait < sleepTime){sleepTime = targetTime - paceWait;}
|
||||||
|
Util::usleep(sleepTime);
|
||||||
|
}while(Util::getMicros(currPace) < uSendWindow);
|
||||||
|
}
|
||||||
|
|
||||||
std::string Socket::UDPConnection::getBoundAddress(){
|
std::string Socket::UDPConnection::getBoundAddress(){
|
||||||
std::string boundaddr;
|
std::string boundaddr;
|
||||||
uint32_t boundport;
|
uint32_t boundport;
|
||||||
|
|
|
@ -206,6 +206,8 @@ namespace Socket{
|
||||||
std::string boundAddr, boundMulti;
|
std::string boundAddr, boundMulti;
|
||||||
int boundPort;
|
int boundPort;
|
||||||
void checkRecvBuf();
|
void checkRecvBuf();
|
||||||
|
std::deque<Util::ResizeablePointer> paceQueue;
|
||||||
|
uint64_t lastPace;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Util::ResizeablePointer data;
|
Util::ResizeablePointer data;
|
||||||
|
@ -228,6 +230,8 @@ namespace Socket{
|
||||||
void SendNow(const std::string &data);
|
void SendNow(const std::string &data);
|
||||||
void SendNow(const char *data);
|
void SendNow(const char *data);
|
||||||
void SendNow(const char *data, size_t len);
|
void SendNow(const char *data, size_t len);
|
||||||
|
void sendPaced(const char * data, size_t len);
|
||||||
|
void sendPaced(uint64_t uSendWindow);
|
||||||
void setSocketFamily(int AF_TYPE);
|
void setSocketFamily(int AF_TYPE);
|
||||||
};
|
};
|
||||||
}// namespace Socket
|
}// namespace Socket
|
||||||
|
|
|
@ -55,6 +55,20 @@ void Util::sleep(int64_t ms){
|
||||||
nanosleep(&T, 0);
|
nanosleep(&T, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sleeps for roughly the indicated amount of microseconds.
|
||||||
|
/// Will not sleep if ms is negative.
|
||||||
|
/// Will not sleep for longer than 0.1 seconds (100000us).
|
||||||
|
/// Can be interrupted early by a signal, no guarantee of minimum sleep time.
|
||||||
|
/// Can be slightly off depending on OS accuracy.
|
||||||
|
void Util::usleep(int64_t us){
|
||||||
|
if (us < 0){return;}
|
||||||
|
if (us > 100000){us = 100000;}
|
||||||
|
struct timespec T;
|
||||||
|
T.tv_sec = 0;
|
||||||
|
T.tv_nsec = 1000 * us;
|
||||||
|
nanosleep(&T, 0);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t Util::getNTP(){
|
uint64_t Util::getNTP(){
|
||||||
struct timespec t;
|
struct timespec t;
|
||||||
clock_gettime(CLOCK_REALTIME, &t);
|
clock_gettime(CLOCK_REALTIME, &t);
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
namespace Util{
|
namespace Util{
|
||||||
void wait(int64_t ms); ///< Sleeps for the indicated amount of milliseconds or longer.
|
void wait(int64_t ms); ///< Sleeps for the indicated amount of milliseconds or longer.
|
||||||
void sleep(int64_t ms); ///< Sleeps for roughly the indicated amount of milliseconds.
|
void sleep(int64_t ms); ///< Sleeps for roughly the indicated amount of milliseconds.
|
||||||
|
void usleep(int64_t us); ///< Sleeps for roughly the indicated amount of microseconds.
|
||||||
uint64_t getMS(); ///< Gets the current time in milliseconds.
|
uint64_t getMS(); ///< Gets the current time in milliseconds.
|
||||||
uint64_t bootSecs(); ///< Gets the current system uptime in seconds.
|
uint64_t bootSecs(); ///< Gets the current system uptime in seconds.
|
||||||
uint64_t unixMS(); ///< Gets the current Unix time in milliseconds.
|
uint64_t unixMS(); ///< Gets the current Unix time in milliseconds.
|
||||||
|
|
|
@ -1607,7 +1607,7 @@ namespace Mist{
|
||||||
keepGoing()){
|
keepGoing()){
|
||||||
uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead);
|
uint64_t amount = thisPacket.getTime() - (((Util::bootMS() - firstTime) * 1000) / realTime + maxSkipAhead);
|
||||||
if (amount > 1000){amount = 1000;}
|
if (amount > 1000){amount = 1000;}
|
||||||
Util::sleep(amount);
|
idleTime(amount);
|
||||||
//Make sure we stay responsive to requests and stats while waiting
|
//Make sure we stay responsive to requests and stats while waiting
|
||||||
if (wantRequest){
|
if (wantRequest){
|
||||||
requestHandler();
|
requestHandler();
|
||||||
|
|
|
@ -127,6 +127,7 @@ namespace Mist{
|
||||||
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
|
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
|
||||||
|
|
||||||
inline virtual bool keepGoing(){return config->is_active && myConn;}
|
inline virtual bool keepGoing(){return config->is_active && myConn;}
|
||||||
|
virtual void idleTime(uint64_t ms){Util::sleep(ms);}
|
||||||
|
|
||||||
Comms::Connections statComm;
|
Comms::Connections statComm;
|
||||||
bool isBlocking; ///< If true, indicates that myConn is blocking.
|
bool isBlocking; ///< If true, indicates that myConn is blocking.
|
||||||
|
|
|
@ -306,7 +306,7 @@ namespace Mist{
|
||||||
|
|
||||||
void OutWebRTC::requestHandler(){
|
void OutWebRTC::requestHandler(){
|
||||||
if (noSignalling){
|
if (noSignalling){
|
||||||
if (!parseData){Util::sleep(500);}
|
if (!parseData){udp.sendPaced(10000);}
|
||||||
//After 10s of no packets, abort
|
//After 10s of no packets, abort
|
||||||
if (Util::bootMS() > lastRecv + 10000){
|
if (Util::bootMS() > lastRecv + 10000){
|
||||||
Util::logExitReason("received no data for 10+ seconds");
|
Util::logExitReason("received no data for 10+ seconds");
|
||||||
|
@ -1155,7 +1155,7 @@ namespace Mist{
|
||||||
void OutWebRTC::handleWebRTCInputOutputFromThread(){
|
void OutWebRTC::handleWebRTCInputOutputFromThread(){
|
||||||
udp.allocateDestination();
|
udp.allocateDestination();
|
||||||
while (keepGoing()){
|
while (keepGoing()){
|
||||||
if (!handleWebRTCInputOutput()){Util::sleep(20);}
|
if (!handleWebRTCInputOutput()){Util::sleep(10);}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1291,7 +1291,7 @@ namespace Mist{
|
||||||
stun_writer.writeFingerprint();
|
stun_writer.writeFingerprint();
|
||||||
stun_writer.end();
|
stun_writer.end();
|
||||||
|
|
||||||
udp.SendNow((const char *)stun_writer.getBufferPtr(), stun_writer.getBufferSize());
|
udp.sendPaced((const char *)stun_writer.getBufferPtr(), stun_writer.getBufferSize());
|
||||||
myConn.addUp(stun_writer.getBufferSize());
|
myConn.addUp(stun_writer.getBufferSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1336,7 +1336,7 @@ namespace Mist{
|
||||||
HIGH_MSG("Could not answer NACK for %" PRIu32 " #%" PRIu16 ": packet not buffered", pSSRC, seq);
|
HIGH_MSG("Could not answer NACK for %" PRIu32 " #%" PRIu16 ": packet not buffered", pSSRC, seq);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
udp.SendNow(nb.getData(seq), nb.getSize(seq));
|
udp.sendPaced(nb.getData(seq), nb.getSize(seq));
|
||||||
myConn.addUp(nb.getSize(seq));
|
myConn.addUp(nb.getSize(seq));
|
||||||
HIGH_MSG("Answered NACK for %" PRIu32 " #%" PRIu16, pSSRC, seq);
|
HIGH_MSG("Answered NACK for %" PRIu32 " #%" PRIu16, pSSRC, seq);
|
||||||
}
|
}
|
||||||
|
@ -1524,7 +1524,7 @@ namespace Mist{
|
||||||
/* ------------------------------------------------ */
|
/* ------------------------------------------------ */
|
||||||
|
|
||||||
int OutWebRTC::onDTLSHandshakeWantsToWrite(const uint8_t *data, int *nbytes){
|
int OutWebRTC::onDTLSHandshakeWantsToWrite(const uint8_t *data, int *nbytes){
|
||||||
udp.SendNow((const char *)data, (size_t)*nbytes);
|
udp.sendPaced((const char *)data, (size_t)*nbytes);
|
||||||
myConn.addUp(*nbytes);
|
myConn.addUp(*nbytes);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1619,7 +1619,7 @@ namespace Mist{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
udp.SendNow(rtpOutBuffer, (size_t)protectedSize);
|
udp.sendPaced(rtpOutBuffer, (size_t)protectedSize);
|
||||||
|
|
||||||
RTP::Packet tmpPkt(rtpOutBuffer, protectedSize);
|
RTP::Packet tmpPkt(rtpOutBuffer, protectedSize);
|
||||||
uint32_t pSSRC = tmpPkt.getSSRC();
|
uint32_t pSSRC = tmpPkt.getSSRC();
|
||||||
|
@ -1658,7 +1658,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udp.SendNow(rtpOutBuffer, rtcpPacketSize);
|
udp.sendPaced(rtpOutBuffer, rtcpPacketSize);
|
||||||
myConn.addUp(rtcpPacketSize);
|
myConn.addUp(rtcpPacketSize);
|
||||||
|
|
||||||
if (volkswagenMode){
|
if (volkswagenMode){
|
||||||
|
@ -1681,7 +1681,7 @@ namespace Mist{
|
||||||
// first make sure that we complete the DTLS handshake.
|
// first make sure that we complete the DTLS handshake.
|
||||||
if(doDTLS){
|
if(doDTLS){
|
||||||
while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){
|
while (keepGoing() && !dtlsHandshake.hasKeyingMaterial()){
|
||||||
if (!handleWebRTCInputOutput()){Util::sleep(10);}
|
if (!handleWebRTCInputOutput()){udp.sendPaced(10000);}
|
||||||
if (lastRecv < Util::bootMS() - 10000){
|
if (lastRecv < Util::bootMS() - 10000){
|
||||||
WARN_MSG("Killing idle connection in handshake phase");
|
WARN_MSG("Killing idle connection in handshake phase");
|
||||||
onFail("idle connection in handshake phase", false);
|
onFail("idle connection in handshake phase", false);
|
||||||
|
@ -1901,7 +1901,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes);
|
udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes);
|
||||||
myConn.addUp(buffer_size_in_bytes);
|
myConn.addUp(buffer_size_in_bytes);
|
||||||
|
|
||||||
if (volkswagenMode){
|
if (volkswagenMode){
|
||||||
|
@ -1942,7 +1942,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes);
|
udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes);
|
||||||
myConn.addUp(buffer_size_in_bytes);
|
myConn.addUp(buffer_size_in_bytes);
|
||||||
|
|
||||||
if (volkswagenMode){
|
if (volkswagenMode){
|
||||||
|
@ -1993,7 +1993,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udp.SendNow((const char *)&buffer[0], buffer_size_in_bytes);
|
udp.sendPaced((const char *)&buffer[0], buffer_size_in_bytes);
|
||||||
myConn.addUp(buffer_size_in_bytes);
|
myConn.addUp(buffer_size_in_bytes);
|
||||||
|
|
||||||
if (volkswagenMode){
|
if (volkswagenMode){
|
||||||
|
|
|
@ -148,6 +148,8 @@ namespace Mist{
|
||||||
virtual void connStats(uint64_t now, Comms::Connections &statComm);
|
virtual void connStats(uint64_t now, Comms::Connections &statComm);
|
||||||
inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);}
|
inline virtual bool keepGoing(){return config->is_active && (noSignalling || myConn);}
|
||||||
virtual void requestHandler();
|
virtual void requestHandler();
|
||||||
|
protected:
|
||||||
|
virtual void idleTime(uint64_t ms){udp.sendPaced(ms*1000);}
|
||||||
private:
|
private:
|
||||||
bool noSignalling;
|
bool noSignalling;
|
||||||
uint64_t lastRecv;
|
uint64_t lastRecv;
|
||||||
|
|
Loading…
Add table
Reference in a new issue