RTP rewrite using generic RTP::toDTSC class

This commit is contained in:
Thulinma 2018-06-08 22:32:26 +02:00
parent 9bcd365361
commit 7a03d3e96c
4 changed files with 904 additions and 510 deletions

View file

@ -1,10 +1,12 @@
#include "rtp.h"
#include "adts.h"
#include "bitfields.h"
#include "defines.h"
#include "encode.h"
#include "timing.h"
#include "bitfields.h"
#include "h264.h"
#include "mpeg.h"
#include "sdp.h"
#include "timing.h"
#include <arpa/inet.h>
namespace RTP{
@ -13,7 +15,7 @@ namespace RTP{
unsigned int Packet::getHsize() const{return 12 + 4 * getContribCount();}
unsigned int Packet::getPayloadSize() const{return datalen - getHsize();}
unsigned int Packet::getPayloadSize() const{return maxDataLen - getHsize();}
char *Packet::getPayload() const{return data + getHsize();}
@ -31,13 +33,13 @@ namespace RTP{
unsigned int Packet::getSequence() const{return (((((unsigned int)data[2]) << 8) + data[3]));}
unsigned int Packet::getTimeStamp() const{return ntohl(*((unsigned int *)(data + 4)));}
uint32_t Packet::getTimeStamp() const{return Bit::btohl(data + 4);}
unsigned int Packet::getSSRC() const{return ntohl(*((unsigned int *)(data + 8)));}
char *Packet::getData(){return data + 8 + 4 * getContribCount() + getExtension();}
void Packet::setTimestamp(unsigned int t){*((unsigned int *)(data + 4)) = htonl(t);}
void Packet::setTimestamp(uint32_t t){Bit::htobl(data+4, t);}
void Packet::setSequence(unsigned int seq){*((short *)(data + 2)) = htons(seq);}
@ -46,10 +48,17 @@ namespace RTP{
void Packet::increaseSequence(){*((short *)(data + 2)) = htons(getSequence() + 1);}
void Packet::sendH264(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
const char *payload, unsigned int payloadlen, unsigned int channel, bool lastOfAccesUnit){
if ((payload[0] & 0x1F) == 12){return;}
/// \todo This function probably belongs in DMS somewhere.
if (payloadlen + getHsize() + 2 <= maxDataLen){
if (lastOfAccesUnit){
data[1] |= 0x80; // setting the RTP marker bit to 1
}
uint8_t nal_type = (payload[0] & 0x1F);
if (nal_type < 1 || nal_type > 5){
data[1] &= ~0x80; // but not for non-vlc types
}
memcpy(data + getHsize(), payload, payloadlen);
callBack(socket, data, getHsize() + payloadlen, channel);
sentPackets++;
@ -73,8 +82,10 @@ namespace RTP{
// last package
serByte |= 0x40;
sending = payloadlen - sent;
if (lastOfAccesUnit){
data[1] |= 0x80; // setting the RTP marker bit to 1
}
}
data[getHsize() + 1] = serByte;
memcpy(data + getHsize() + 2, payload + 1 + sent, sending);
callBack(socket, data, getHsize() + 2 + sending, channel);
@ -86,6 +97,42 @@ namespace RTP{
}
}
void Packet::sendVP8(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
bool isKeyframe = ((payload[0] & 0x01) == 0) ? true : false;
bool isStartOfPartition = true;
size_t chunkSize = MAX_SEND;
size_t bytesWritten = 0;
uint32_t headerSize = getHsize();
while (payloadlen > 0){
chunkSize = std::min<size_t>(1200, payloadlen);
payloadlen -= chunkSize;
data[1] =
(0 != payloadlen) ? (data[1] & 0x7F) : (data[1] | 0x80); // marker bit, 1 for last chunk.
data[headerSize] = 0x00; // reset
data[headerSize] |=
(isStartOfPartition) ? 0x10 : 0x00; // first chunk is always start of a partition.
data[headerSize] |=
(isKeyframe)
? 0x00
: 0x20; // non-reference frame. 0 = frame is needed, 1 = frame can be disgarded.
memcpy(data + headerSize + 1, payload + bytesWritten, chunkSize);
callBack(socket, data, headerSize + 1 + chunkSize, channel);
increaseSequence();
// INFO_MSG("chunk: %zu, sequence: %u", chunkSize, getSequence());
isStartOfPartition = false;
bytesWritten += chunkSize;
sentBytes += headerSize + 1 + chunkSize;
sentPackets++;
}
// WARN_MSG("KEYFRAME: %c", (isKeyframe) ? 'y' : 'n');
}
void Packet::sendH265(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
/// \todo This function probably belongs in DMS somewhere.
@ -139,9 +186,7 @@ namespace RTP{
mHead.clear();
mHead.setTempRef(mInfo.tempSeq);
mHead.setPictureType(mInfo.frameType);
if (mInfo.isHeader){
mHead.setSequence();
}
if (mInfo.isHeader){mHead.setSequence();}
mHead.setBegin();
mHead.setEnd();
memcpy(data + getHsize() + 4, payload, payloadlen);
@ -167,9 +212,7 @@ namespace RTP{
mHead.setTempRef(mInfo.tempSeq);
mHead.setPictureType(mInfo.frameType);
if (sent == 0){
if (mInfo.isHeader){
mHead.setSequence();
}
if (mInfo.isHeader){mHead.setSequence();}
mHead.setBegin();
}
memcpy(data + getHsize() + 4, payload + sent, sending);
@ -189,11 +232,15 @@ namespace RTP{
unsigned long sent = 0;
while (sent < payloadlen){
unsigned long nalSize = ntohl(*((unsigned long *)(payload + sent)));
sendH264(socket, callBack, payload + sent + 4, nalSize, channel);
sendH264(socket, callBack, payload + sent + 4, nalSize, channel, (sent + nalSize + 4) >= payloadlen ? true : false);
sent += nalSize + 4;
}
return;
}
if (codec == "VP8"){
sendVP8(socket, callBack, payload, payloadlen, channel);
return;
}
if (codec == "HEVC"){
unsigned long sent = 0;
while (sent < payloadlen){
@ -216,9 +263,7 @@ namespace RTP{
}else if (codec == "MP3" || codec == "MP2"){
// See RFC 2250, "MPEG Audio-specific header"
*((long *)(data + getHsize())) = 0; // this is MBZ and Frag_Offset, which are always 0
if (payload[0] != 0xFF){
FAIL_MSG("MP2/MP3 data does not start with header?");
}
if (payload[0] != 0xFF){FAIL_MSG("MP2/MP3 data does not start with header?");}
offsetLen = 4;
}else if (codec == "AC3"){
*((short *)(data + getHsize())) = htons(0x0001); // this is 6 bits MBZ, 2 bits FT = 0 = full
@ -292,9 +337,12 @@ namespace RTP{
Bit::htobs(rtcpData + 2, 7); // 7 4-byte words follow the header
Bit::htobl(rtcpData + 4, sTrk.mySSRC); // set receiver identifier
Bit::htobl(rtcpData + 8, sTrk.theirSSRC); // set source identifier
rtcpData[12] = (sTrk.sorter.lostCurrent * 255) / (sTrk.sorter.lostCurrent + sTrk.sorter.packCurrent); //fraction lost since prev RR
rtcpData[12] =
(sTrk.sorter.lostCurrent * 255) /
(sTrk.sorter.lostCurrent + sTrk.sorter.packCurrent); // fraction lost since prev RR
Bit::htob24(rtcpData + 13, sTrk.sorter.lostTotal); // cumulative packets lost since start
Bit::htobl(rtcpData+16, sTrk.sorter.rtpSeq | (sTrk.sorter.packTotal & 0xFFFF0000ul)); //highest sequence received
Bit::htobl(rtcpData + 16, sTrk.sorter.rtpSeq | (sTrk.sorter.packTotal &
0xFFFF0000ul)); // highest sequence received
Bit::htobl(rtcpData + 20, 0); /// \TODO jitter (diff in timestamp vs packet arrival)
Bit::htobl(rtcpData + 24, 0); /// \TODO last SR (middle 32 bits of last SR or zero)
Bit::htobl(rtcpData + 28, 0); /// \TODO delay since last SR in 2b seconds + 2b fraction
@ -350,6 +398,7 @@ namespace RTP{
}
sentBytes = o.sentBytes;
sentPackets = o.sentPackets;
}
void Packet::operator=(const Packet &o){
@ -380,24 +429,19 @@ namespace RTP{
}
Packet::Packet(const char *dat, unsigned int len){
managed = false;
datalen = len;
maxDataLen = len;
sentBytes = 0;
sentPackets = 0;
data = (char *)dat;
}
MPEGVideoHeader::MPEGVideoHeader(char *d){
data = d;
}
MPEGVideoHeader::MPEGVideoHeader(char *d){data = d;}
uint16_t MPEGVideoHeader::getTotalLen() const{
uint16_t ret = 4;
if (data[0] & 0x08){
ret += 4;
if (data[4] & 0x40){
ret += data[8];
}
if (data[4] & 0x40){ret += data[8];}
}
return ret;
}
@ -418,36 +462,31 @@ namespace RTP{
return ret.str();
}
void MPEGVideoHeader::clear(){
((uint32_t*)data)[0] = 0;
}
void MPEGVideoHeader::clear(){((uint32_t *)data)[0] = 0;}
void MPEGVideoHeader::setTempRef(uint16_t ref){
data[0] |= (ref >> 8) & 0x03;
data[1] = ref & 0xff;
}
void MPEGVideoHeader::setPictureType(uint8_t pType){
data[2] |= pType & 0x7;
}
void MPEGVideoHeader::setPictureType(uint8_t pType){data[2] |= pType & 0x7;}
void MPEGVideoHeader::setSequence(){
data[2] |= 0x20;
}
void MPEGVideoHeader::setBegin(){
data[2] |= 0x10;
}
void MPEGVideoHeader::setEnd(){
data[2] |= 0x8;
}
void MPEGVideoHeader::setSequence(){data[2] |= 0x20;}
void MPEGVideoHeader::setBegin(){data[2] |= 0x10;}
void MPEGVideoHeader::setEnd(){data[2] |= 0x8;}
Sorter::Sorter(){
packTrack = 0;
Sorter::Sorter(uint64_t trackId, void (*cb)(const uint64_t track, const Packet &p)){
packTrack = trackId;
rtpSeq = 0;
lostTotal = 0;
lostCurrent = 0;
packTotal = 0;
packCurrent = 0;
callback = cb;
}
bool Sorter::wantSeq(uint16_t seq) const{
return !rtpSeq || !(seq < rtpSeq || seq > (rtpSeq + 500) || packBuffer.count(seq));
}
void Sorter::setCallback(uint64_t track, void (*cb)(const uint64_t track, const Packet &p)){
@ -456,9 +495,7 @@ namespace RTP{
}
/// Calls addPacket(pack) with a newly constructed RTP::Packet from the given arguments.
void Sorter::addPacket(const char *dat, unsigned int len){
addPacket(RTP::Packet(dat, len));
}
void Sorter::addPacket(const char *dat, unsigned int len){addPacket(RTP::Packet(dat, len));}
/// Takes in new RTP packets for a single track.
/// Automatically sorts them, waiting when packets come in slow or not at all.
@ -475,7 +512,9 @@ namespace RTP{
++packCurrent;
// send any buffered packets we may have
while (packBuffer.count(rtpSeq)){
callback(packTrack, pack);
outPacket(packTrack, packBuffer[rtpSeq]);
packBuffer.erase(rtpSeq);
VERYHIGH_MSG("Sent packet %u, now %llu in buffer", rtpSeq, packBuffer.size());
++rtpSeq;
++packTotal;
++packCurrent;
@ -483,14 +522,16 @@ namespace RTP{
}
// send any buffered packets we may have
while (packBuffer.count(rtpSeq)){
callback(packTrack, pack);
outPacket(packTrack, packBuffer[rtpSeq]);
packBuffer.erase(rtpSeq);
VERYHIGH_MSG("Sent packet %u, now %llu in buffer", rtpSeq, packBuffer.size());
++rtpSeq;
++packTotal;
++packCurrent;
}
// packet is slightly early - buffer it
if ((int16_t)(rtpSeq - (uint16_t)pack.getSequence()) < 0){
INFO_MSG("Buffering early packet #%u->%u", rtpSeq, pack.getSequence());
HIGH_MSG("Buffering early packet #%u->%u", rtpSeq, pack.getSequence());
packBuffer[pack.getSequence()] = pack;
}
// packet is late
@ -506,14 +547,620 @@ namespace RTP{
}
// packet is in order
if (rtpSeq == pack.getSequence()){
callback(packTrack, pack);
outPacket(packTrack, pack);
++rtpSeq;
++packTotal;
++packCurrent;
}
}
toDTSC::toDTSC(){
wrapArounds = 0;
recentWrap = false;
cbPack = 0;
cbInit = 0;
multiplier = 1.0;
trackId = 0;
firstTime = 0;
packCount = 0;
lastSeq = 0;
vp8BufferHasKeyframe = false;
}
void toDTSC::setProperties(const uint64_t track, const std::string &c, const std::string &t,
const std::string &i, const double m){
trackId = track;
codec = c;
type = t;
init = i;
multiplier = m;
if (codec == "HEVC" && init.size()){
hevcInfo = h265::initData(init);
h265::metaInfo MI = hevcInfo.getMeta();
fps = MI.fps;
}
if (codec == "H264" && init.size()){
MP4::AVCC avccbox;
avccbox.setPayload(init);
spsData.assign(avccbox.getSPS(), avccbox.getSPSLen());
ppsData.assign(avccbox.getPPS(), avccbox.getPPSLen());
h264::sequenceParameterSet sps(spsData.data(), spsData.size());
h264::SPSMeta hMeta = sps.getCharacteristics();
fps = hMeta.fps;
}
}
void toDTSC::setProperties(const DTSC::Track &Trk){
double m = (double)Trk.rate / 1000.0;
if (Trk.type == "video" || Trk.codec == "MP2" || Trk.codec == "MP3"){m = 90.0;}
setProperties(Trk.trackID, Trk.codec, Trk.type, Trk.init, m);
}
void toDTSC::setCallbacks(void (*cbP)(const DTSC::Packet &pkt),
void (*cbI)(const uint64_t track, const std::string &initData)){
cbPack = cbP;
cbInit = cbI;
}
/// Adds an RTP packet to the converter, outputting DTSC packets and/or updating init data,
/// as-needed.
void toDTSC::addRTP(const RTP::Packet &pkt){
if (codec.empty()){
MEDIUM_MSG("Unknown codec - ignoring RTP packet.");
return;
}
// First calculate the timestamp of the packet, get the pointer and length to RTP payload.
// This part isn't codec-specific, so we do it before anything else.
int64_t pTime = pkt.getTimeStamp();
if (!firstTime){
firstTime = pTime + 1;
INFO_MSG("RTP timestamp rollover expected in " PRETTY_PRINT_TIME, PRETTY_ARG_TIME((0xFFFFFFFFul - firstTime) / multiplier / 1000));
}else{
if (prevTime > pTime && pTime < 0x40000000lu && prevTime > 0x80000000lu){
++wrapArounds;
recentWrap = true;
}
if (recentWrap){
if (pTime < 0x80000000lu && pTime > 0x40000000lu){recentWrap = false;}
if (pTime > 0x80000000lu){pTime -= 0xFFFFFFFFll;}
}
}
prevTime = pkt.getTimeStamp();
uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0xFFFFFFFFull*wrapArounds) / multiplier;
char *pl = pkt.getPayload();
uint32_t plSize = pkt.getPayloadSize();
bool missed = lastSeq != (pkt.getSequence() - 1);
lastSeq = pkt.getSequence();
INSANE_MSG("Received RTP packet for track %llu, time %llu -> %llu", trackId, pkt.getTimeStamp(), msTime);
// From here on, there is codec-specific parsing. We call handler functions for each codec,
// except for the trivial codecs.
if (codec == "H264"){
return handleH264(msTime, pl, plSize, missed, (pkt.getPadding() == 1) ? true : false);
}
if (codec == "AAC"){return handleAAC(msTime, pl, plSize);}
if (codec == "MP2" || codec == "MP3"){return handleMP2(msTime, pl, plSize);}
if (codec == "HEVC"){return handleHEVC(msTime, pl, plSize, missed);}
if (codec == "MPEG2"){return handleMPEG2(msTime, pl, plSize);}
if (codec == "VP8"){
return handleVP8(msTime, pl, plSize, missed, (pkt.getPadding() == 1) ? true : false);
}
// Trivial codecs just fill a packet with raw data and continue. Easy peasy, lemon squeezy.
if (codec == "ALAW" || codec == "opus" || codec == "PCM" || codec == "ULAW"){
DTSC::Packet nextPack;
nextPack.genericFill(msTime, 0, trackId, pl, plSize, 0, false);
outPacket(nextPack);
return;
}
// If we don't know how to handle this codec in RTP, print an error and ignore the packet.
FAIL_MSG("Unimplemented RTP reader for codec `%s`! Throwing away packet.", codec.c_str());
}
void toDTSC::handleAAC(uint64_t msTime, char *pl, uint32_t plSize){
// assume AAC packets are single AU units
/// \todo Support other input than single AU units
unsigned int headLen =
(Bit::btohs(pl) >> 3) + 2; // in bits, so /8, plus two for the prepended size
DTSC::Packet nextPack;
uint16_t samples = aac::AudSpecConf::samples(init);
uint32_t sampleOffset = 0;
uint32_t offset = 0;
uint32_t auSize = 0;
for (uint32_t i = 2; i < headLen; i += 2){
auSize = Bit::btohs(pl + i) >> 3; // only the upper 13 bits
nextPack.genericFill(msTime + sampleOffset / multiplier, 0, trackId, pl + headLen + offset,
std::min(auSize, plSize - headLen - offset), 0, false);
offset += auSize;
sampleOffset += samples;
outPacket(nextPack);
}
}
void toDTSC::handleMP2(uint64_t msTime, char *pl, uint32_t plSize){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
DTSC::Packet nextPack;
nextPack.genericFill(msTime, 0, trackId, pl + 4, plSize - 4, 0, false);
outPacket(nextPack);
}
void toDTSC::handleMPEG2(uint64_t msTime, char *pl, uint32_t plSize){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
///\TODO Merge packets with same timestamp together
HIGH_MSG("Received MPEG2 packet: %s", RTP::MPEGVideoHeader(pl).toString().c_str());
DTSC::Packet nextPack;
nextPack.genericFill(msTime, 0, trackId, pl + 4, plSize - 4, 0, false);
outPacket(nextPack);
}
void toDTSC::handleHEVC(uint64_t msTime, char *pl, uint32_t plSize, bool missed){
if (plSize < 2){
WARN_MSG("Empty packet ignored!");
return;
}
uint8_t nalType = (pl[0] & 0x7E) >> 1;
if (nalType == 48){
ERROR_MSG("AP not supported yet");
}else if (nalType == 49){
DONTEVEN_MSG("H265 Fragmentation Unit");
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[2] & 0x80) == 0){
HIGH_MSG("Not start of a new FU - throwing away");
return;
}
if (fuaBuffer.size() && ((pl[2] & 0x80) || missed)){
WARN_MSG("H265 FU packet incompleted: %lu", fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
fuaBuffer[4] |= 0x80; // set error bit
handleHEVCSingle(msTime, fuaBuffer, fuaBuffer.size(),
h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
return;
}
unsigned long len = plSize - 3; // ignore the three FU bytes in front
if (!fuaBuffer.size()){len += 6;}// six extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 6, pl + 3, plSize - 3);
// reconstruct first byte
fuaBuffer[4] = ((pl[2] & 0x3F) << 1) | (pl[0] & 0x81);
fuaBuffer[5] = pl[1];
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 3, plSize - 3);
}
fuaBuffer.size() += len;
if (pl[2] & 0x40){// last packet
VERYHIGH_MSG("H265 FU packet type %s (%u) completed: %lu",
h265::typeToStr((fuaBuffer[4] & 0x7E) >> 1),
(uint8_t)((fuaBuffer[4] & 0x7E) >> 1), fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
handleHEVCSingle(msTime, fuaBuffer, fuaBuffer.size(),
h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
}
}else if (nalType == 50){
ERROR_MSG("PACI/TSCI not supported yet");
}else{
DONTEVEN_MSG("%s NAL unit (%u)", h265::typeToStr(nalType), nalType);
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
handleHEVCSingle(msTime, packBuffer, plSize + 4, h265::isKeyframe(packBuffer + 4, plSize));
}
}
void toDTSC::handleHEVCSingle(uint64_t ts, const char *buffer, const uint32_t len, bool isKey){
MEDIUM_MSG("H265: %llu@%llu, %lub%s", trackId, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x7E) >> 1;
switch (nalType){
case 32: // VPS
case 33: // SPS
case 34: // PPS
hevcInfo.addUnit(buffer);
if (hevcInfo.haveRequired()){
std::string newInit = hevcInfo.generateHVCC();
if (newInit != init){
init = newInit;
outInit(trackId, init);
h265::metaInfo MI = hevcInfo.getMeta();
fps = MI.fps;
}
}
return;
default: // others, continue parsing
break;
}
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < packCount){packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - packCount) > 32){packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, packCount, (frameNo - packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, trackId, buffer, len, 0, isKey);
packCount++;
outPacket(nextPack);
}
/// Handles common H264 packets types, but not all.
/// Generalizes and converts them all to a data format ready for DTSC, then calls handleH264Single
/// for that data.
/// Prints a WARN-level message if packet type is unsupported.
/// \todo Support other H264 packets types?
void toDTSC::handleH264(uint64_t msTime, char *pl, uint32_t plSize, bool missed,
bool hasPadding){
if (!plSize){
WARN_MSG("Empty packet ignored!");
return;
}
uint8_t num_padding_bytes = 0;
if (hasPadding){
num_padding_bytes = pl[plSize - 1];
if (num_padding_bytes >= plSize){
WARN_MSG("Only padding data (%u / %u).", num_padding_bytes, plSize);
return;
}
}
if ((pl[0] & 0x1F) == 0){
WARN_MSG("H264 packet type null ignored");
return;
}
if ((pl[0] & 0x1F) < 24){
DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F));
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
handleH264Single(msTime, packBuffer, plSize + 4, h264::isKeyframe(packBuffer + 4, plSize));
return;
}
if ((pl[0] & 0x1F) == 24){
DONTEVEN_MSG("H264 STAP-A packet");
unsigned int pos = 1;
while (pos + 1 < plSize){
unsigned int pLen = Bit::btohs(pl + pos);
INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos + 2] & 0x1F));
if (packBuffer.allocate(4 + pLen)){
Bit::htobl(packBuffer, pLen); // size-prepend
memcpy(packBuffer + 4, pl + pos + 2, pLen);
handleH264Single(msTime, packBuffer, pLen + 4, h264::isKeyframe(pl + pos + 2, pLen));
}
pos += 2 + pLen;
}
return;
}
if ((pl[0] & 0x1F) == 28){
DONTEVEN_MSG("H264 FU-A packet");
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[1] & 0x80) == 0){
HIGH_MSG("Not start of a new FU-A - throwing away");
return;
}
if (fuaBuffer.size() && ((pl[1] & 0x80) || missed)){
WARN_MSG("Ending unfinished FU-A");
INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaBuffer.size());
fuaBuffer.size() = 0;
return;
}
unsigned long len = plSize - 2; // ignore the two FU-A bytes in front
if (!fuaBuffer.size()){len += 5;}// five extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 4, pl + 1, plSize - 1);
// reconstruct first byte
fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pl[0] & 0xE0);
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 2, plSize - 2);
}
fuaBuffer.size() += len;
if (pl[1] & 0x40){// last packet
INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F),
fuaBuffer.size());
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
// attempt to detect multiple H264 packets, even though specs disallow it
handleH264Multi(msTime, fuaBuffer, fuaBuffer.size());
}else{
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
handleH264Single(msTime, fuaBuffer, fuaBuffer.size(),
h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
}
fuaBuffer.size() = 0;
}
return;
}
WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F));
}
void toDTSC::handleH264Single(uint64_t ts, const char *buffer, const uint32_t len, bool isKey){
MEDIUM_MSG("H264: %llu@%llu, %lub%s", trackId, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x1F);
if (nalType == 9 && len < 20){return;}// ignore delimiter-only packets
switch (nalType){
case 6: // SEI
return;
case 7: // SPS
if (spsData.size() != len - 4 || memcmp(buffer + 4, spsData.data(), len - 4) != 0){
HIGH_MSG("Updated SPS from RTP data");
spsData.assign(buffer + 4, len - 4);
h264::sequenceParameterSet sps(spsData.data(), spsData.size());
h264::SPSMeta hMeta = sps.getCharacteristics();
fps = hMeta.fps;
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(spsData[1]);
avccBox.setCompatibleProfiles(spsData[2]);
avccBox.setLevel(spsData[3]);
avccBox.setSPSCount(1);
avccBox.setSPS(spsData);
avccBox.setPPSCount(1);
avccBox.setPPS(ppsData);
std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize());
if (newInit != init){
init = newInit;
outInit(trackId, init);
}
}
return;
case 8: // PPS
if (ppsData.size() != len - 4 || memcmp(buffer + 4, ppsData.data(), len - 4) != 0){
HIGH_MSG("Updated PPS from RTP data");
ppsData.assign(buffer + 4, len - 4);
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(spsData[1]);
avccBox.setCompatibleProfiles(spsData[2]);
avccBox.setLevel(spsData[3]);
avccBox.setSPSCount(1);
avccBox.setSPS(spsData);
avccBox.setPPSCount(1);
avccBox.setPPS(ppsData);
std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize());
if (newInit != init){
init = newInit;
outInit(trackId, init);
}
}
return;
case 5:{
// @todo add check if ppsData and spsData are not empty?
size_t needed_bytes = len + ppsData.size() + spsData.size() + 8;
static Util::ResizeablePointer tmp;
tmp.assign(0, 0);
char sizeBuffer[4];
Bit::htobl(sizeBuffer, spsData.size());
tmp.append(sizeBuffer, 4);
tmp.append(spsData.data(), spsData.size());
Bit::htobl(sizeBuffer, ppsData.size());
tmp.append(sizeBuffer, 4);
tmp.append(ppsData.data(), ppsData.size());
tmp.append(buffer, len);
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < packCount){packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - packCount) > 32){packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, packCount, (frameNo - packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already
// correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, trackId, tmp, tmp.size(), 0, isKey);
packCount++;
outPacket(nextPack);
return;
}
return;
default: // others, continue parsing
break;
}
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < packCount){packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - packCount) > 32){packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, packCount, (frameNo - packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, trackId, buffer, len, 0, isKey);
packCount++;
outPacket(nextPack);
}
/// Handles a single H264 packet, checking if others are appended at the end in Annex B format.
/// If so, splits them up and calls handleH264Single for each. If not, calls it only once for the
/// whole payload.
void toDTSC::handleH264Multi(uint64_t ts, char *buffer, const uint32_t len){
uint32_t lastStart = 0;
for (uint32_t i = 0; i < len - 4; ++i){
// search for start code
if (buffer[i] == 0 && buffer[i + 1] == 0 && buffer[i + 2] == 0 && buffer[i + 3] == 1){
// if found, handle a packet from the last start code up to this start code
Bit::htobl(buffer + lastStart, (i - lastStart - 1) - 4); // size-prepend
handleH264Single(ts, buffer + lastStart, (i - lastStart - 1),
h264::isKeyframe(buffer + lastStart + 4, i - lastStart - 5));
lastStart = i;
}
}
// Last packet (might be first, if no start codes found)
Bit::htobl(buffer + lastStart, (len - lastStart) - 4); // size-prepend
handleH264Single(ts, buffer + lastStart, (len - lastStart),
h264::isKeyframe(buffer + lastStart + 4, len - lastStart - 4));
}
void toDTSC::handleVP8(uint64_t msTime, const char *buffer, const uint32_t len, bool missed,
bool hasPadding){
// 1 byte is required but we assume that there some payload
// data too :P
if (len < 3){
FAIL_MSG("Received a VP8 RTP packet with invalid size.");
return;
}
// it may happen that we receive a packet with only padding
// data. (against the spec I think) Although `drno` from
// Mozilla told me these are probing packets and should be
// ignored.
uint8_t num_padding_bytes = 0;
if (hasPadding){
num_padding_bytes = buffer[len - 1];
if (num_padding_bytes >= len){
WARN_MSG("Only padding data (%u/%u)", num_padding_bytes, len);
return;
}
}
// parse the vp8 payload descriptor, https://tools.ietf.org/html/rfc7741#section-4.2
uint8_t extended_control_bits = (buffer[0] & 0x80) >> 7;
uint8_t start_of_partition = (buffer[0] & 0x10) >> 4;
uint8_t partition_index = (buffer[0] & 0x07);
uint32_t vp8_header_size = 1;
vp8_header_size += extended_control_bits;
if (extended_control_bits == 1){
uint8_t pictureid_present = (buffer[1] & 0x80) >> 7;
uint8_t tl0picidx_present = (buffer[1] & 0x40) >> 6;
uint8_t tid_present = (buffer[1] & 0x20) >> 5;
uint8_t keyidx_present = (buffer[1] & 0x10) >> 4;
uint8_t has_extended_pictureid = 0;
if (pictureid_present == 1){has_extended_pictureid = (buffer[2] & 0x80) > 7;}
vp8_header_size += pictureid_present;
vp8_header_size += tl0picidx_present;
vp8_header_size += ((tid_present == 1 || keyidx_present == 1)) ? 1 : 0;
vp8_header_size += has_extended_pictureid;
}
if (vp8_header_size > len){
FAIL_MSG("The vp8 header size exceeds the RTP packet size. Invalid size.");
return;
}
const char *vp8_payload_buffer = buffer + vp8_header_size;
uint32_t vp8_payload_size = len - vp8_header_size;
bool start_of_frame = (start_of_partition == 1) && (partition_index == 0);
if (hasPadding){
if (num_padding_bytes > vp8_payload_size){
FAIL_MSG("More padding bytes than payload bytes. Invalid.");
return;
}
vp8_payload_size -= num_padding_bytes;
if (vp8_payload_size == 0){
WARN_MSG("No payload data at all, only required VP8 header.");
return;
}
}
// when we have data in our buffer and the current packet is
// for a new frame started or we missed some data
// (e.g. only received the first partition of a frame) we will
// flush a new DTSC packet.
if (vp8FrameBuffer.size()){
//new frame and nothing missed? Send.
if (start_of_frame && !missed){
DTSC::Packet nextPack;
nextPack.genericFill(msTime, 0, trackId, vp8FrameBuffer, vp8FrameBuffer.size(), 0,
vp8BufferHasKeyframe);
packCount++;
outPacket(nextPack);
}
//Wipe the buffer clean if missed packets or we just sent data out.
if (start_of_frame || missed){
vp8FrameBuffer.assign(0, 0);
vp8BufferHasKeyframe = false;
}
}
// copy the data into the buffer. assign() will write the
// buffer from the start, append() appends the data to the
// end of the previous buffer.
if (vp8FrameBuffer.size() == 0){
if (!start_of_frame){
FAIL_MSG("Skipping packet; not start of partition (%u).", partition_index);
return;
}
if (!vp8FrameBuffer.assign(vp8_payload_buffer, vp8_payload_size)){
FAIL_MSG("Failed to assign vp8 buffer data.");
}
}else{
vp8FrameBuffer.append(vp8_payload_buffer, vp8_payload_size);
}
bool is_keyframe = (vp8_payload_buffer[0] & 0x01) == 0;
if (start_of_frame && is_keyframe){vp8BufferHasKeyframe = true;}
}
}// namespace RTP

View file

@ -1,9 +1,12 @@
#pragma once
#include "dtsc.h"
#include "h264.h"
#include "h265.h"
#include "json.h"
#include "mp4.h"
#include "mp4_generic.h"
#include "socket.h"
#include "util.h"
#include <algorithm>
#include <cstdio>
#include <deque>
@ -32,7 +35,6 @@ namespace RTP{
bool managed;
char *data; ///< The actual RTP packet that is being sent
uint32_t maxDataLen; ///< Amount of reserved bytes for the packet(s)
unsigned int datalen; ///<Size of rtp packet
int sentPackets;
int sentBytes; // Because ugly is beautiful
public:
@ -47,14 +49,16 @@ namespace RTP{
unsigned int getMarker() const;
unsigned int getPayloadType() const;
unsigned int getSequence() const;
unsigned int getTimeStamp() const;
uint32_t getTimeStamp() const;
void setSequence(unsigned int seq);
unsigned int getSSRC() const;
void setSSRC(unsigned long ssrc);
void setTimestamp(unsigned int t);
void setTimestamp(uint32_t t);
void increaseSequence();
void sendH264(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel, bool lastOfAccesUnit);
void sendVP8(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel);
void sendH265(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel);
@ -65,7 +69,8 @@ namespace RTP{
std::string codec);
void sendRTCP_SR(long long &connectedAt, void *socket, unsigned int tid, DTSC::Meta &metadata,
void callBack(void *, char *, unsigned int, unsigned int));
void sendRTCP_RR(long long &connectedAt, SDP::Track & sTrk, unsigned int tid, DTSC::Meta &metadata,
void sendRTCP_RR(long long &connectedAt, SDP::Track &sTrk, unsigned int tid,
DTSC::Meta &metadata,
void callBack(void *, char *, unsigned int, unsigned int));
Packet();
@ -76,20 +81,31 @@ namespace RTP{
~Packet();
Packet(const char *dat, unsigned int len);
char *getData();
char *ptr() const { return data; }
};
/// Sorts RTP packets, outputting them through a callback in correct order.
/// Also keeps track of statistics, which it expects to be read/reset externally (for now).
/// Optionally can be inherited from with the outPacket function overridden to not use a callback.
class Sorter{
public:
Sorter();
Sorter(uint64_t trackId = 0, void (*callback)(const uint64_t track, const Packet &p) = 0);
bool wantSeq(uint16_t seq) const;
void addPacket(const char *dat, unsigned int len);
void addPacket(const Packet &pack);
// By default, calls the callback function, if set.
virtual void outPacket(const uint64_t track, const Packet &p){
if (callback){callback(track, p);}
}
void setCallback(uint64_t track, void (*callback)(const uint64_t track, const Packet &p));
uint16_t rtpSeq;
int32_t lostTotal, lostCurrent;
uint32_t packTotal, packCurrent;
private:
uint64_t packTrack;
std::map<uint16_t, Packet> packBuffer;
std::map<uint16_t, Packet> packetHistory;
void (*callback)(const uint64_t track, const Packet &p);
};
@ -104,9 +120,66 @@ namespace RTP{
void setSequence();
void setBegin();
void setEnd();
private:
char *data;
};
/// Converts (sorted) RTP packets into DTSC packets.
/// Outputs DTSC packets through a callback function or overridden virtual function.
/// Updates init data through a callback function or overridden virtual function.
class toDTSC{
public:
toDTSC();
void setProperties(const uint64_t track, const std::string &codec, const std::string &type,
const std::string &init, const double multiplier);
void setProperties(const DTSC::Track &Trk);
void setCallbacks(void (*cbPack)(const DTSC::Packet &pkt),
void (*cbInit)(const uint64_t track, const std::string &initData));
void addRTP(const RTP::Packet &rPkt);
virtual void outPacket(const DTSC::Packet &pkt){
if (cbPack){cbPack(pkt);}
}
virtual void outInit(const uint64_t track, const std::string &initData){
if (cbInit){cbInit(track, initData);}
}
public:
uint64_t trackId;
double multiplier; ///< Multiplier to convert from millis to RTP time
std::string codec; ///< Codec of this track
std::string type; ///< Type of this track
std::string init; ///< Init data of this track
unsigned int lastSeq; ///< Last sequence number seen
uint64_t packCount; ///< Amount of DTSC packets outputted, for H264/HEVC
double fps; ///< Framerate, for H264, HEVC
uint32_t wrapArounds; ///< Counter for RTP timestamp wrapArounds
bool recentWrap; ///< True if a wraparound happened recently.
uint32_t prevTime;
uint64_t firstTime;
void (*cbPack)(const DTSC::Packet &pkt);
void (*cbInit)(const uint64_t track, const std::string &initData);
// Codec-specific handlers
void handleAAC(uint64_t msTime, char *pl, uint32_t plSize);
void handleMP2(uint64_t msTime, char *pl, uint32_t plSize);
void handleMPEG2(uint64_t msTime, char *pl, uint32_t plSize);
void handleHEVC(uint64_t msTime, char *pl, uint32_t plSize, bool missed);
void handleHEVCSingle(uint64_t ts, const char *buffer, const uint32_t len, bool isKey);
h265::initData hevcInfo; ///< For HEVC init parsing
Util::ResizeablePointer fuaBuffer; ///< For H264/HEVC FU-A packets
Util::ResizeablePointer packBuffer; ///< For H264/HEVC regular and STAP packets
void handleH264(uint64_t msTime, char *pl, uint32_t plSize, bool missed, bool hasPadding);
void handleH264Single(uint64_t ts, const char *buffer, const uint32_t len, bool isKey);
void handleH264Multi(uint64_t ts, char *buffer, const uint32_t len);
std::string spsData; ///< SPS for H264
std::string ppsData; ///< PPS for H264
void handleVP8(uint64_t msTime, const char *buffer, const uint32_t len, bool missed,
bool hasPadding);
Util::ResizeablePointer
vp8FrameBuffer; ///< Stores successive VP8 payload data. We always start with the first
///< partition; but we might be missing other partitions when they were
///< lost. (a partition is basically what's called a slice in H264).
bool vp8BufferHasKeyframe;
};
}// namespace RTP

View file

@ -8,6 +8,13 @@
#include "util.h"
namespace SDP{
static State *snglState = 0;
static void snglStateInitCallback(const uint64_t track, const std::string &initData){
snglState->updateInit(track, initData);
}
Track::Track(){
rtcpSent = 0;
channel = -1;
@ -54,15 +61,23 @@ namespace SDP{
"a=fmtp:97 packetization-mode=1;profile-level-id="
<< std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[1]
<< std::dec << "E0" << std::hex << std::setw(2) << std::setfill('0')
<< (int)trk.init.data()[3] << std::dec
<< ";"
"sprop-parameter-sets="
<< Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen()))
<< ","
<< Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen()))
<< "\r\n"
"a=framerate:"
<< ((double)trk.fpks) / 1000.0
<< (int)trk.init.data()[3] << std::dec << ";"
<< "sprop-parameter-sets=";
size_t count = avccbox.getSPSCount();
for (size_t i = 0; i < count; ++i){
mediaDesc << (i ? "," : "")
<< Encodings::Base64::encode(
std::string(avccbox.getSPS(i), avccbox.getSPSLen(i)));
}
mediaDesc << ",";
count = avccbox.getPPSCount();
for (size_t i = 0; i < count; ++i){
mediaDesc << (i ? "," : "")
<< Encodings::Base64::encode(
std::string(avccbox.getPPS(i), avccbox.getPPSLen(i)));
}
mediaDesc << "\r\n"
<< "a=framerate:" << ((double)trk.fpks) / 1000.0
<< "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
@ -335,6 +350,12 @@ namespace SDP{
return rInfo.str();
}
State::State(){
incomingPacketCallback = 0;
myMeta = 0;
snglState = this;
}
void State::parseSDP(const std::string &sdp){
DONTEVEN_MSG("Parsing %llu-byte SDP", sdp.size());
std::stringstream ss(sdp);
@ -430,9 +451,11 @@ namespace SDP{
}
}
}
tConv[trackNo].setProperties(*thisTrack);
HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
continue;
}
if (nope){continue;}// ignore lines if we have no valid track
// RTP mapping
if (to.substr(0, 8) == "a=rtpmap"){
@ -487,6 +510,7 @@ namespace SDP{
if (!thisTrack->codec.size()){
ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str());
}else{
tConv[trackNo].setProperties(*thisTrack);
HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
}
continue;
@ -576,6 +600,7 @@ namespace SDP{
Trk.width = MI.width;
Trk.height = MI.height;
Trk.fpks = RTrk.fpsMeta * 1000;
tConv[trackNo].setProperties(Trk);
}
/// Calculates H264 track metadata from vps, sps and pps data stored in tracks[trackNo]
@ -598,6 +623,7 @@ namespace SDP{
Trk.height = hMeta.height;
Trk.fpks = hMeta.fps * 1000;
Trk.init = std::string(avccBox.payload(), avccBox.payloadSize());
tConv[trackNo].setProperties(Trk);
}
uint32_t State::getTrackNoForChannel(uint8_t chan){
@ -666,136 +692,6 @@ namespace SDP{
return 0;
}
/// Handles a single H264 packet, checking if others are appended at the end in Annex B format.
/// If so, splits them up and calls h264Packet for each. If not, calls it only once for the whole
/// payload.
void State::h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len){
uint32_t lastStart = 0;
for (uint32_t i = 0; i < len - 4; ++i){
// search for start code
if (buffer[i] == 0 && buffer[i + 1] == 0 && buffer[i + 2] == 0 && buffer[i + 3] == 1){
// if found, handle a packet from the last start code up to this start code
Bit::htobl(buffer + lastStart, (i - lastStart - 1) - 4); // size-prepend
h264Packet(ts, track, buffer + lastStart, (i - lastStart - 1),
h264::isKeyframe(buffer + lastStart + 4, i - lastStart - 5));
lastStart = i;
}
}
// Last packet (might be first, if no start codes found)
Bit::htobl(buffer + lastStart, (len - lastStart) - 4); // size-prepend
h264Packet(ts, track, buffer + lastStart, (len - lastStart),
h264::isKeyframe(buffer + lastStart + 4, len - lastStart - 4));
}
void State::h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey){
MEDIUM_MSG("H264: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x1F);
if (nalType == 9 && len < 20){return;}// ignore delimiter-only packets
switch (nalType){
case 6: //SEI
return;
case 7: // SPS
if (tracks[track].spsData.size() != len - 4 ||
memcmp(buffer + 4, tracks[track].spsData.data(), len - 4) != 0){
INFO_MSG("Updated SPS from RTP data");
tracks[track].spsData.assign(buffer + 4, len - 4);
updateH264Init(track);
}
return;
case 8: // PPS
if (tracks[track].ppsData.size() != len - 4 ||
memcmp(buffer + 4, tracks[track].ppsData.data(), len - 4) != 0){
INFO_MSG("Updated PPS from RTP data");
tracks[track].ppsData.assign(buffer + 4, len - 4);
updateH264Init(track);
}
return;
default: // others, continue parsing
break;
}
double fps = tracks[track].fpsMeta;
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < tracks[track].packCount){tracks[track].packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - tracks[track].packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = tracks[track].packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, tracks[track].packCount,
(frameNo - tracks[track].packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
tracks[track].packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey);
tracks[track].packCount++;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
void State::h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey){
MEDIUM_MSG("H265: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x7E) >> 1;
switch (nalType){
case 32: // VPS
case 33: // SPS
case 34: // PPS
tracks[track].hevcInfo.addUnit(buffer);
updateH265Init(track);
return;
default: // others, continue parsing
break;
}
double fps = tracks[track].fpsMeta;
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < tracks[track].packCount){tracks[track].packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - tracks[track].packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = tracks[track].packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, tracks[track].packCount,
(frameNo - tracks[track].packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
tracks[track].packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey);
tracks[track].packCount++;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
/// Returns the multiplier to use to get milliseconds from the RTP payload type for the given
/// track
double getMultiplier(const DTSC::Track &Trk){
@ -803,239 +699,18 @@ namespace SDP{
return ((double)Trk.rate / 1000.0);
}
void State::updateInit(const uint64_t trackNo, const std::string &initData){
if (myMeta->tracks.count(trackNo)){
myMeta->tracks[trackNo].init = initData;
}
}
/// Handles RTP packets generically, for both TCP and UDP-based connections.
/// In case of UDP, expects packets to be pre-sorted.
void State::handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt){
DTSC::Track &Trk = myMeta->tracks[track];
if (!tracks[track].firstTime){tracks[track].firstTime = pkt.getTimeStamp() + 1;}
uint64_t millis = (pkt.getTimeStamp() - tracks[track].firstTime + 1) / getMultiplier(Trk);
char *pl = pkt.getPayload();
uint32_t plSize = pkt.getPayloadSize();
INSANE_MSG("Received RTP packet for track %llu, time %llu -> %llu", track, pkt.getTimeStamp(),
millis);
if (Trk.codec == "ALAW" || Trk.codec == "opus" || Trk.codec == "PCM" || Trk.codec == "ULAW"){
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl, plSize, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "AAC"){
// assume AAC packets are single AU units
/// \todo Support other input than single AU units
unsigned int headLen =
(Bit::btohs(pl) >> 3) + 2; // in bits, so /8, plus two for the prepended size
DTSC::Packet nextPack;
uint16_t samples = aac::AudSpecConf::samples(Trk.init);
uint32_t sampleOffset = 0;
uint32_t offset = 0;
uint32_t auSize = 0;
for (uint32_t i = 2; i < headLen; i += 2){
auSize = Bit::btohs(pl + i) >> 3; // only the upper 13 bits
nextPack.genericFill(
(pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime + 1) / getMultiplier(Trk),
0, track, pl + headLen + offset, std::min(auSize, plSize - headLen - offset), 0, false);
offset += auSize;
sampleOffset += samples;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
return;
}
if (Trk.codec == "MP2" || Trk.codec == "MP3"){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "MPEG2"){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
///\TODO Merge packets with same timestamp together
HIGH_MSG("Received MPEG2 packet: %s", RTP::MPEGVideoHeader(pl).toString().c_str());
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "HEVC"){
if (plSize < 2){
WARN_MSG("Empty packet ignored!");
return;
}
uint8_t nalType = (pl[0] & 0x7E) >> 1;
if (nalType == 48){
ERROR_MSG("AP not supported yet");
}else if (nalType == 49){
DONTEVEN_MSG("H265 Fragmentation Unit");
static Util::ResizeablePointer fuaBuffer;
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[2] & 0x80) == 0){
HIGH_MSG("Not start of a new FU - throwing away");
return;
}
if (fuaBuffer.size() && ((pl[2] & 0x80) || (tracks[track].sorter.rtpSeq != pkt.getSequence()))){
WARN_MSG("H265 FU packet incompleted: %lu", fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
fuaBuffer[4] |= 0x80; // set error bit
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
return;
tConv[track].setCallbacks(incomingPacketCallback, snglStateInitCallback);
tConv[track].addRTP(pkt);
}
unsigned long len = plSize - 3; // ignore the three FU bytes in front
if (!fuaBuffer.size()){len += 6;}// six extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 6, pl + 3, plSize - 3);
// reconstruct first byte
fuaBuffer[4] = ((pl[2] & 0x3F) << 1) | (pl[0] & 0x81);
fuaBuffer[5] = pl[1];
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 3, plSize - 3);
}
fuaBuffer.size() += len;
if (pl[2] & 0x40){// last packet
VERYHIGH_MSG("H265 FU packet type %s (%u) completed: %lu",
h265::typeToStr((fuaBuffer[4] & 0x7E) >> 1),
(uint8_t)((fuaBuffer[4] & 0x7E) >> 1), fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
}
return;
}else if (nalType == 50){
ERROR_MSG("PACI/TSCI not supported yet");
}else{
DONTEVEN_MSG("%s NAL unit (%u)", h265::typeToStr(nalType), nalType);
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer,
plSize + 4, h265::isKeyframe(packBuffer + 4, plSize));
return;
}
return;
}
if (Trk.codec == "H264"){
// Handles common H264 packets types, but not all.
// Generalizes and converts them all to a data format ready for DTSC, then calls h264Packet
// for that data.
// Prints a WARN-level message if packet type is unsupported.
/// \todo Support other H264 packets types?
if (!plSize){
WARN_MSG("Empty packet ignored!");
return;
}
if ((pl[0] & 0x1F) == 0){
WARN_MSG("H264 packet type null ignored");
return;
}
if ((pl[0] & 0x1F) < 24){
DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F));
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer,
plSize + 4, h264::isKeyframe(packBuffer + 4, plSize));
return;
}
if ((pl[0] & 0x1F) == 24){
DONTEVEN_MSG("H264 STAP-A packet");
unsigned int len = 0;
unsigned int pos = 1;
while (pos + 1 < plSize){
unsigned int pLen = Bit::btohs(pl + pos);
INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos + 2] & 0x1F));
pos += 2 + pLen;
len += 4 + pLen;
}
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(len)){return;}
pos = 1;
len = 0;
bool isKey = false;
while (pos + 1 < plSize){
unsigned int pLen = Bit::btohs(pl + pos);
isKey |= h264::isKeyframe(pl + pos + 2, pLen);
Bit::htobl(packBuffer + len, pLen); // size-prepend
memcpy(packBuffer + len + 4, pl + pos + 2, pLen);
len += 4 + pLen;
pos += 2 + pLen;
}
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len,
isKey);
return;
}
if ((pl[0] & 0x1F) == 28){
DONTEVEN_MSG("H264 FU-A packet");
static Util::ResizeablePointer fuaBuffer;
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[1] & 0x80) == 0){
HIGH_MSG("Not start of a new FU-A - throwing away");
return;
}
if (fuaBuffer.size() && ((pl[1] & 0x80) || (tracks[track].sorter.rtpSeq != pkt.getSequence()))){
WARN_MSG("Ending unfinished FU-A");
INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaBuffer.size());
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
// attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track,
fuaBuffer, fuaBuffer.size());
}else{
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
fuaBuffer[4] |= 0x80; // set error bit
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
}
fuaBuffer.size() = 0;
return;
}
unsigned long len = plSize - 2; // ignore the two FU-A bytes in front
if (!fuaBuffer.size()){len += 5;}// five extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 4, pl + 1, plSize - 1);
// reconstruct first byte
fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pl[0] & 0xE0);
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 2, plSize - 2);
}
fuaBuffer.size() += len;
if (pl[1] & 0x40){// last packet
INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F),
fuaBuffer.size());
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
// attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track,
fuaBuffer, fuaBuffer.size());
}else{
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
}
fuaBuffer.size() = 0;
}
return;
}
WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F));
return;
}
}
}// namespace SDP

View file

@ -1,8 +1,9 @@
#include "dtsc.h"
#include "h265.h"
#include "http_parser.h"
#include "rtp.h"
#include "socket.h"
#include "h265.h"
#include <vector>
namespace SDP{
@ -10,6 +11,16 @@ namespace SDP{
/// Structure used to keep track of selected tracks.
class Track{
public:
Track();
std::string generateTransport(uint32_t trackNo, const std::string &dest = "",
bool TCPmode = true);
std::string getParamString(const std::string &param) const;
uint64_t getParamInt(const std::string &param) const;
bool parseTransport(const std::string &transport, const std::string &host,
const std::string &source, const DTSC::Track &trk);
std::string rtpInfo(const DTSC::Track &trk, const std::string &source, uint64_t currentTime);
public:
Socket::UDPConnection data;
Socket::UDPConnection rtcp;
@ -29,38 +40,26 @@ namespace SDP{
uint64_t fpsTime;
double fpsMeta;
double fps;
Track();
std::string generateTransport(uint32_t trackNo, const std::string &dest = "", bool TCPmode = true);
std::string getParamString(const std::string &param) const;
uint64_t getParamInt(const std::string &param) const;
bool parseTransport(const std::string &transport, const std::string &host,
const std::string &source, const DTSC::Track &trk);
std::string rtpInfo(const DTSC::Track &trk, const std::string &source, uint64_t currentTime);
};
class State{
public:
State(){
incomingPacketCallback = 0;
myMeta = 0;
}
DTSC::Meta *myMeta;
State();
void (*incomingPacketCallback)(const DTSC::Packet &pkt);
std::map<uint32_t, Track> tracks; ///< List of selected tracks with SDP-specific session data.
void parseSDP(const std::string &sdp);
void parseSDPEx(const std::string &sdp);
void updateH264Init(uint64_t trackNo);
void updateH265Init(uint64_t trackNo);
void updateInit(const uint64_t trackNo, const std::string &initData);
uint32_t getTrackNoForChannel(uint8_t chan);
uint32_t parseSetup(HTTP::Parser &H, const std::string &host,
const std::string &source);
uint32_t parseSetup(HTTP::Parser &H, const std::string &host, const std::string &source);
void handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt);
void h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len);
void h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey);
void h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey);
public:
DTSC::Meta *myMeta;
std::map<uint32_t, RTP::toDTSC> tConv; ///<Converters to DTSC
std::map<uint32_t, Track> tracks; ///< List of selected tracks with SDP-specific session data.
};
std::string mediaDescription(const DTSC::Track &trk);
}
}// namespace SDP