RTP rework: added sorter class, updated RTSP input/output/analyser to be compatible with it
This commit is contained in:
parent
f926ceab0d
commit
e3886c8acf
9 changed files with 116 additions and 127 deletions
|
@ -95,7 +95,7 @@ bool AnalyserRTSP::parsePacket(){
|
|||
DETAIL_MED("Received packet for unknown track number on channel %u", chan);
|
||||
}
|
||||
if (trackNo){
|
||||
sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();
|
||||
sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence();
|
||||
}
|
||||
|
||||
if (detail >= 10){
|
||||
|
|
|
@ -3,9 +3,8 @@
|
|||
Mist::InputRTSP *classPointer = 0;
|
||||
Socket::Connection *mainConn = 0;
|
||||
|
||||
void incomingPacket(const DTSC::Packet &pkt){
|
||||
classPointer->incoming(pkt);
|
||||
}
|
||||
void incomingPacket(const DTSC::Packet &pkt){classPointer->incoming(pkt);}
|
||||
void insertRTP(const uint64_t track, const RTP::Packet &p){classPointer->incomingRTP(track, p);}
|
||||
|
||||
/// Function used to send RTP packets over UDP
|
||||
///\param socket A UDP Connection pointer, sent as a void*, to keep portability.
|
||||
|
@ -18,6 +17,8 @@ void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){
|
|||
}
|
||||
|
||||
namespace Mist{
|
||||
void InputRTSP::incomingRTP(const uint64_t track, const RTP::Packet &p){sdpState.handleIncomingRTP(track, p);}
|
||||
|
||||
InputRTSP::InputRTSP(Util::Config *cfg) : Input(cfg){
|
||||
TCPmode = true;
|
||||
sdpState.myMeta = &myMeta;
|
||||
|
@ -296,7 +297,7 @@ namespace Mist{
|
|||
if (!trackNo && (chan % 2) != 1){
|
||||
WARN_MSG("Received packet for unknown track number on channel %u", chan);
|
||||
}
|
||||
if (trackNo){sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();}
|
||||
if (trackNo){sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence();}
|
||||
|
||||
sdpState.handleIncomingRTP(trackNo, pkt);
|
||||
|
||||
|
@ -320,59 +321,7 @@ namespace Mist{
|
|||
// continue;
|
||||
//}
|
||||
tcpCon.addDown(s.data_len);
|
||||
RTP::Packet pack(s.data, s.data_len);
|
||||
if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();}
|
||||
// packet is very early - assume dropped after 30 packets
|
||||
while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){
|
||||
WARN_MSG("Giving up on packet %u", it->second.rtpSeq);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.lostTotal);
|
||||
++(it->second.lostCurrent);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
// send any buffered packets we may have
|
||||
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
}
|
||||
}
|
||||
// send any buffered packets we may have
|
||||
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
}
|
||||
// packet is slightly early - buffer it
|
||||
if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){
|
||||
INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence());
|
||||
it->second.packBuffer[pack.getSequence()] = pack;
|
||||
}
|
||||
// packet is late
|
||||
if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){
|
||||
// negative difference?
|
||||
--(it->second.lostTotal);
|
||||
--(it->second.lostCurrent);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)",
|
||||
(int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())));
|
||||
return false;
|
||||
}
|
||||
// packet is in order
|
||||
if (it->second.rtpSeq == pack.getSequence()){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();}
|
||||
}
|
||||
}
|
||||
if (Util::epoch() / 5 != it->second.rtcpSent){
|
||||
it->second.rtcpSent = Util::epoch() / 5;
|
||||
it->second.pack.sendRTCP_RR(connectedAt, it->second, it->first, myMeta, sendUDP);
|
||||
it->second.sorter.addPacket(s.data, s.data_len);
|
||||
}
|
||||
}
|
||||
return r;
|
||||
|
|
|
@ -14,6 +14,7 @@ namespace Mist{
|
|||
InputRTSP(Util::Config *cfg);
|
||||
bool needsLock(){return false;}
|
||||
void incoming(const DTSC::Packet &pkt);
|
||||
void incomingRTP(const uint64_t track, const RTP::Packet &p);
|
||||
|
||||
protected:
|
||||
// Private Functions
|
||||
|
|
|
@ -14,8 +14,9 @@ namespace Mist{
|
|||
Socket::Connection *mainConn = 0;
|
||||
OutRTSP *classPointer = 0;
|
||||
|
||||
/// Helper function for passing packets into the OutRTSP class
|
||||
/// Helper functions for passing packets into the OutRTSP class
|
||||
void insertPacket(const DTSC::Packet &pkt){classPointer->incomingPacket(pkt);}
|
||||
void insertRTP(const uint64_t track, const RTP::Packet &p){classPointer->incomingRTP(track, p);}
|
||||
|
||||
/// Takes incoming packets and buffers them.
|
||||
void OutRTSP::incomingPacket(const DTSC::Packet &pkt){
|
||||
|
@ -37,6 +38,7 @@ namespace Mist{
|
|||
bufferLivePacket(newPkt);
|
||||
//bufferLivePacket(DTSC::RetimedPacket(pkt.getTime() + packetOffset, pkt));
|
||||
}
|
||||
void OutRTSP::incomingRTP(const uint64_t track, const RTP::Packet &p){sdpState.handleIncomingRTP(track, p);}
|
||||
|
||||
OutRTSP::OutRTSP(Socket::Connection &myConn) : Output(myConn){
|
||||
connectedAt = Util::epoch() + 2208988800ll;
|
||||
|
@ -410,8 +412,8 @@ namespace Mist{
|
|||
uint32_t trackNo = sdpState.getTrackNoForChannel(tcpHead.data()[1]);
|
||||
if (trackNo && isPushing()){
|
||||
RTP::Packet pkt(tcpPacket.data() + 4, len);
|
||||
sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();
|
||||
sdpState.handleIncomingRTP(trackNo, pkt);
|
||||
sdpState.tracks[trackNo].sorter.rtpSeq = pkt.getSequence();
|
||||
incomingRTP(trackNo, pkt);
|
||||
}
|
||||
// attempt to read more packets
|
||||
return handleTCP();
|
||||
|
@ -423,6 +425,7 @@ namespace Mist{
|
|||
for (std::map<uint32_t, SDP::Track>::iterator it = sdpState.tracks.begin();
|
||||
it != sdpState.tracks.end(); ++it){
|
||||
Socket::UDPConnection &s = it->second.data;
|
||||
it->second.sorter.setCallback(it->first, insertRTP);
|
||||
while (s.Receive()){
|
||||
if (s.getDestPort() != it->second.cPortA && checkPort){
|
||||
// wrong sending port, ignore packet
|
||||
|
@ -431,56 +434,10 @@ namespace Mist{
|
|||
lastRecv = Util::epoch(); // prevent disconnect of idle TCP connection when using UDP
|
||||
myConn.addDown(s.data_len);
|
||||
RTP::Packet pack(s.data, s.data_len);
|
||||
if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();}
|
||||
// packet is very early - assume dropped after 30 packets
|
||||
while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){
|
||||
WARN_MSG("Giving up on packet %u", it->second.rtpSeq);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.lostTotal);
|
||||
++(it->second.lostCurrent);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
// send any buffered packets we may have
|
||||
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
}
|
||||
}
|
||||
// send any buffered packets we may have
|
||||
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
}
|
||||
// packet is slightly early - buffer it
|
||||
if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){
|
||||
INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence());
|
||||
it->second.packBuffer[pack.getSequence()] = pack;
|
||||
}
|
||||
// packet is late
|
||||
if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){
|
||||
// negative difference?
|
||||
--(it->second.lostTotal);
|
||||
--(it->second.lostCurrent);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)",
|
||||
(int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())));
|
||||
return;
|
||||
}
|
||||
// packet is in order
|
||||
if (it->second.rtpSeq == pack.getSequence()){
|
||||
sdpState.handleIncomingRTP(it->first, pack);
|
||||
++(it->second.rtpSeq);
|
||||
++(it->second.packTotal);
|
||||
++(it->second.packCurrent);
|
||||
if (!it->second.theirSSRC){
|
||||
it->second.theirSSRC = pack.getSSRC();
|
||||
}
|
||||
if (!it->second.theirSSRC){
|
||||
it->second.theirSSRC = pack.getSSRC();
|
||||
}
|
||||
it->second.sorter.addPacket(pack);
|
||||
}
|
||||
if (Util::epoch() / 5 != it->second.rtcpSent){
|
||||
it->second.rtcpSent = Util::epoch() / 5;
|
||||
|
|
|
@ -17,6 +17,7 @@ namespace Mist{
|
|||
void requestHandler();
|
||||
bool onFinish();
|
||||
void incomingPacket(const DTSC::Packet &pkt);
|
||||
void incomingRTP(const uint64_t track, const RTP::Packet &p);
|
||||
|
||||
private:
|
||||
long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue