Improved RTP NACK handling and dropped packet handling

This commit is contained in:
Thulinma 2020-06-01 23:15:27 +02:00
parent bd9ae56532
commit 70b0f94552
6 changed files with 273 additions and 120 deletions

View file

@ -12,6 +12,8 @@
namespace RTP{
double Packet::startRTCP = 0;
unsigned int MAX_SEND = 1500 - 28;
unsigned int PACKET_REORDER_WAIT = 5;
unsigned int PACKET_DROP_TIMEOUT = 30;
unsigned int Packet::getHsize() const{
unsigned int r = 12 + 4 * getContribCount();
@ -474,15 +476,16 @@ namespace RTP{
Sorter::Sorter(uint64_t trackId, void (*cb)(const uint64_t track, const Packet &p)){
packTrack = trackId;
rtpSeq = 0;
rtpWSeq = 0;
lostTotal = 0;
lostCurrent = 0;
packTotal = 0;
packCurrent = 0;
callback = cb;
}
bool Sorter::wantSeq(uint16_t seq) const{
return !rtpSeq || !(seq < rtpSeq || seq > (rtpSeq + 500) || packBuffer.count(seq));
first = true;
preBuffer = true;
lastBootMS = 0;
lastNTP = 0;
}
void Sorter::setCallback(uint64_t track, void (*cb)(const uint64_t track, const Packet &p)){
@ -497,57 +500,76 @@ namespace RTP{
/// Automatically sorts them, waiting when packets come in slow or not at all.
/// Calls the callback with packets in sorted order, whenever it becomes possible to do so.
void Sorter::addPacket(const Packet &pack){
if (!rtpSeq){rtpSeq = pack.getSequence();}
// packet is very early - assume dropped after 150 packets
while ((int16_t)(rtpSeq - ((uint16_t)pack.getSequence())) < -150){
WARN_MSG("Giving up on packet %u", rtpSeq);
++rtpSeq;
++lostTotal;
++lostCurrent;
++packTotal;
++packCurrent;
// send any buffered packets we may have
while (packBuffer.count(rtpSeq)){
outPacket(packTrack, packBuffer[rtpSeq]);
packBuffer.erase(rtpSeq);
INFO_MSG("Sent packet %u, now %zu in buffer", rtpSeq, packBuffer.size());
uint16_t pSNo = pack.getSequence();
if (first){
rtpWSeq = pSNo;
rtpSeq = pSNo - 5;
first = false;
}
if (preBuffer){
//If we've buffered the first 5 packets, assume we have the first one known
if (packBuffer.size() >= 5){
preBuffer = false;
rtpSeq = packBuffer.begin()->first;
rtpWSeq = rtpSeq;
}
}else{
// packet is very early - assume dropped after PACKET_DROP_TIMEOUT packets
while ((int16_t)(rtpSeq - pSNo) < -(int)PACKET_DROP_TIMEOUT){
WARN_MSG("Giving up on packet %u", rtpSeq);
++rtpSeq;
++lostTotal;
++lostCurrent;
++packTotal;
++packCurrent;
}
}
//Update wanted counter if we passed it (1 of 2)
if ((int16_t)(rtpWSeq - rtpSeq) < 0){rtpWSeq = rtpSeq;}
// packet is somewhat early - ask for packet after PACKET_REORDER_WAIT packets
while ((int16_t)(rtpWSeq - pSNo) < -(int)PACKET_REORDER_WAIT){
//Only wanted if we don't already have it
if (!packBuffer.count(rtpWSeq)){
wantedSeqs.insert(rtpWSeq);
}
++rtpWSeq;
}
// send any buffered packets we may have
uint16_t prertpSeq = rtpSeq;
while (packBuffer.count(rtpSeq)){
outPacket(packTrack, packBuffer[rtpSeq]);
packBuffer.erase(rtpSeq);
INFO_MSG("Sent packet %u, now %zu in buffer", rtpSeq, packBuffer.size());
++rtpSeq;
++packTotal;
++packCurrent;
}
if (prertpSeq != rtpSeq){
INFO_MSG("Sent packets %" PRIu16 "-%" PRIu16 ", now %zu in buffer", prertpSeq, rtpSeq, packBuffer.size());
}
// packet is slightly early - buffer it
if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) < 0){
if ((int16_t)(rtpSeq - pSNo) < 0){
HIGH_MSG("Buffering early packet #%u->%u", rtpSeq, pack.getSequence());
packBuffer[pack.getSequence()] = pack;
}
// packet is late
if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) > 0){
if ((int16_t)(rtpSeq - pSNo) > 0){
// negative difference?
--lostTotal;
--lostCurrent;
++packTotal;
++packCurrent;
WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)",
(int16_t)(rtpSeq - (uint16_t)pack.getSequence()));
return;
//--lostTotal;
//--lostCurrent;
//++packTotal;
//++packCurrent;
//WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", (int16_t)(rtpSeq - pSNo));
//return;
}
// packet is in order
if (rtpSeq == pack.getSequence()){
if (rtpSeq == pSNo){
outPacket(packTrack, pack);
++rtpSeq;
++packTotal;
++packCurrent;
}
//Update wanted counter if we passed it (2 of 2)
if ((int16_t)(rtpWSeq - rtpSeq) < 0){rtpWSeq = rtpSeq;}
}
toDTSC::toDTSC(){

View file

@ -26,6 +26,8 @@ namespace SDP{
namespace RTP{
extern uint32_t MAX_SEND;
extern unsigned int PACKET_REORDER_WAIT;
extern unsigned int PACKET_DROP_TIMEOUT;
/// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP
/// mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in
@ -86,7 +88,6 @@ namespace RTP{
class Sorter{
public:
Sorter(uint64_t trackId = 0, void (*callback)(const uint64_t track, const Packet &p) = 0);
bool wantSeq(uint16_t seq) const;
void addPacket(const char *dat, unsigned int len);
void addPacket(const Packet &pack);
// By default, calls the callback function, if set.
@ -95,9 +96,14 @@ namespace RTP{
}
void setCallback(uint64_t track, void (*callback)(const uint64_t track, const Packet &p));
uint16_t rtpSeq;
uint16_t rtpWSeq;
bool first;
bool preBuffer;
int32_t lostTotal, lostCurrent;
uint32_t packTotal, packCurrent;
std::set<uint16_t> wantedSeqs;
uint32_t lastNTP; ///< Middle 32 bits of last Sender Report NTP timestamp
uint64_t lastBootMS; ///< bootMS time of last Sender Report
private:
uint64_t packTrack;
std::map<uint16_t, Packet> packBuffer;

View file

@ -531,7 +531,7 @@ namespace RTP{
}
void FECPacket::sendRTCP_RR(RTP::FECSorter &sorter, uint32_t mySSRC, uint32_t theirSSRC, void *userData,
void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel)){
void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel), uint32_t jitter){
char *rtcpData = (char *)malloc(32);
if (!rtcpData){
FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up.");
@ -547,9 +547,13 @@ namespace RTP{
Bit::htob24(rtcpData + 13, sorter.lostTotal); // cumulative packets lost since start
Bit::htobl(rtcpData + 16,
sorter.rtpSeq | (sorter.packTotal & 0xFFFF0000ul)); // highest sequence received
Bit::htobl(rtcpData + 20, 0); /// \TODO jitter (diff in timestamp vs packet arrival)
Bit::htobl(rtcpData + 24, 0); /// \TODO last SR (middle 32 bits of last SR or zero)
Bit::htobl(rtcpData + 28, 0); /// \TODO delay since last SR in 2b seconds + 2b fraction
Bit::htobl(rtcpData + 20, jitter); // jitter
Bit::htobl(rtcpData + 24, sorter.lastNTP); // last SR NTP time (middle 32 bits)
if (sorter.lastBootMS){
Bit::htobl(rtcpData + 28, (Util::bootMS() - sorter.lastBootMS) * 65.536); // delay since last SR in 1/65536th of a second
}else{
Bit::htobl(rtcpData + 28, 0); // no delay since last SR yet
}
callBack(userData, rtcpData, 32, 0);
sorter.lostCurrent = 0;
sorter.packCurrent = 0;

View file

@ -86,7 +86,7 @@ namespace RTP{
class FECPacket : public Packet{
public:
void sendRTCP_RR(RTP::FECSorter &sorter, uint32_t mySSRC, uint32_t theirSSRC, void *userData,
void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel));
void callBack(void *userData, const char *payload, size_t nbytes, uint8_t channel), uint32_t jitter = 0);
};
}// namespace RTP