diff --git a/CMakeLists.txt b/CMakeLists.txt index d0a5a69f..d24ac2a7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -159,6 +159,7 @@ set(libHeaders ${SOURCE_DIR}/lib/rijndael.h ${SOURCE_DIR}/lib/rtmpchunks.h ${SOURCE_DIR}/lib/rtp.h + ${SOURCE_DIR}/lib/sdp.h ${SOURCE_DIR}/lib/shared_memory.h ${SOURCE_DIR}/lib/socket.h ${SOURCE_DIR}/lib/stream.h @@ -208,6 +209,7 @@ set(libSources ${SOURCE_DIR}/lib/rijndael.cpp ${SOURCE_DIR}/lib/rtmpchunks.cpp ${SOURCE_DIR}/lib/rtp.cpp + ${SOURCE_DIR}/lib/sdp.cpp ${SOURCE_DIR}/lib/shared_memory.cpp ${SOURCE_DIR}/lib/socket.cpp ${SOURCE_DIR}/lib/stream.cpp @@ -289,6 +291,7 @@ makeAnalyser(MP4 mp4) #LTS makeAnalyser(H264 h264) #LTS makeAnalyser(HLS hls) #LTS makeAnalyser(RIFF riff) #LTS +makeAnalyser(RTSP rtsp) #LTS #LTS_START ######################################## diff --git a/lib/rtp.cpp b/lib/rtp.cpp index 821e602d..26c735ed 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -1,132 +1,227 @@ -#include #include "rtp.h" -#include "timing.h" #include "defines.h" +#include "encode.h" +#include "timing.h" +#include "bitfields.h" +#include "mpeg.h" +#include - -namespace RTP { +namespace RTP{ double Packet::startRTCP = 0; - unsigned int MAX_SEND = 1500-28; + unsigned int MAX_SEND = 1500 - 28; - unsigned int Packet::getHsize() const { - return 12 + 4 * getContribCount(); - } + unsigned int Packet::getHsize() const{return 12 + 4 * getContribCount();} - unsigned int Packet::getPayloadSize() const { - return datalen - getHsize(); - } + unsigned int Packet::getPayloadSize() const{return datalen - getHsize();} - char * Packet::getPayload() const { - return data + getHsize(); - } + char *Packet::getPayload() const{return data + getHsize();} - unsigned int Packet::getVersion() const { - return (data[0] >> 6) & 0x3; - } + unsigned int Packet::getVersion() const{return (data[0] >> 6) & 0x3;} - unsigned int Packet::getPadding() const { - return (data[0] >> 5) & 0x1; - } + unsigned int Packet::getPadding() const{return (data[0] >> 5) & 0x1;} - unsigned int Packet::getExtension() const { - return (data[0] >> 4) & 0x1; - } + unsigned int Packet::getExtension() const{return (data[0] >> 4) & 0x1;} - unsigned int Packet::getContribCount() const { - return (data[0]) & 0xE; - } + unsigned int Packet::getContribCount() const{return (data[0]) & 0xE;} - unsigned int Packet::getMarker() const { - return (data[1] >> 7) & 0x1; - } + unsigned int Packet::getMarker() const{return (data[1] >> 7) & 0x1;} - unsigned int Packet::getPayloadType() const { - return (data[1]) & 0x7F; - } + unsigned int Packet::getPayloadType() const{return (data[1]) & 0x7F;} - unsigned int Packet::getSequence() const { - return (((((unsigned int)data[2]) << 8) + data[3])); - } + unsigned int Packet::getSequence() const{return (((((unsigned int)data[2]) << 8) + data[3]));} - unsigned int Packet::getTimeStamp() const { - return ntohl(*((unsigned int *)(data + 4))); - } + unsigned int Packet::getTimeStamp() const{return ntohl(*((unsigned int *)(data + 4)));} - unsigned int Packet::getSSRC() const { - return ntohl(*((unsigned int *)(data + 8))); - } + unsigned int Packet::getSSRC() const{return ntohl(*((unsigned int *)(data + 8)));} - char * Packet::getData() { - return data + 8 + 4 * getContribCount() + getExtension(); - } + char *Packet::getData(){return data + 8 + 4 * getContribCount() + getExtension();} - void Packet::setTimestamp(unsigned int t) { - *((unsigned int *)(data + 4)) = htonl(t); - } + void Packet::setTimestamp(unsigned int t){*((unsigned int *)(data + 4)) = htonl(t);} - void Packet::setSequence(unsigned int seq) { - *((short *)(data + 2)) = htons(seq); - } + void Packet::setSequence(unsigned int seq){*((short *)(data + 2)) = htons(seq);} - void Packet::setSSRC(unsigned long ssrc) { - *((int *)(data + 8)) = htonl(ssrc); - } + void Packet::setSSRC(unsigned long ssrc){*((int *)(data + 8)) = htonl(ssrc);} - void Packet::increaseSequence() { - *((short *)(data + 2)) = htons(getSequence() + 1); - } + void Packet::increaseSequence(){*((short *)(data + 2)) = htons(getSequence() + 1);} - void Packet::sendH264(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel) { + void Packet::sendH264(void *socket, void callBack(void *, char *, unsigned int, unsigned int), + const char *payload, unsigned int payloadlen, unsigned int channel){ /// \todo This function probably belongs in DMS somewhere. - if (payloadlen+getHsize()+2 <= maxDataLen) { - data[1] |= 0x80;//setting the RTP marker bit to 1 + if (payloadlen + getHsize() + 2 <= maxDataLen){ + data[1] |= 0x80; // setting the RTP marker bit to 1 memcpy(data + getHsize(), payload, payloadlen); callBack(socket, data, getHsize() + payloadlen, channel); sentPackets++; - sentBytes += payloadlen+getHsize(); + sentBytes += payloadlen + getHsize(); increaseSequence(); - } else { - data[1] &= 0x7F;//setting the RTP marker bit to 0 + }else{ + data[1] &= 0x7F; // setting the RTP marker bit to 0 unsigned int sent = 0; - unsigned int sending = maxDataLen-getHsize()-2;//packages are of size MAX_SEND, except for the final one + unsigned int sending = + maxDataLen - getHsize() - 2; // packages are of size MAX_SEND, except for the final one char initByte = (payload[0] & 0xE0) | 0x1C; - char serByte = payload[0] & 0x1F; //ser is now 000 + char serByte = payload[0] & 0x1F; // ser is now 000 data[getHsize()] = initByte; - while (sent < payloadlen) { - if (sent == 0) { - serByte |= 0x80;//set first bit to 1 - } else { - serByte &= 0x7F;//set first bit to 0 + while (sent < payloadlen){ + if (sent == 0){ + serByte |= 0x80; // set first bit to 1 + }else{ + serByte &= 0x7F; // set first bit to 0 } - if (sent + sending >= payloadlen) { - //last package + if (sent + sending >= payloadlen){ + // last package serByte |= 0x40; sending = payloadlen - sent; - data[1] |= 0x80;//setting the RTP marker bit to 1 + data[1] |= 0x80; // setting the RTP marker bit to 1 } data[getHsize() + 1] = serByte; memcpy(data + getHsize() + 2, payload + 1 + sent, sending); callBack(socket, data, getHsize() + 2 + sending, channel); sentPackets++; - sentBytes += sending+getHsize()+2; + sentBytes += sending + getHsize() + 2; sent += sending; increaseSequence(); } } } - void Packet::sendData(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel, std::string codec) { + void Packet::sendH265(void *socket, void callBack(void *, char *, unsigned int, unsigned int), + const char *payload, unsigned int payloadlen, unsigned int channel){ /// \todo This function probably belongs in DMS somewhere. - data[1] |= 0x80;//setting the RTP marker bit to 1 + if (payloadlen + getHsize() + 3 <= maxDataLen){ + data[1] |= 0x80; // setting the RTP marker bit to 1 + memcpy(data + getHsize(), payload, payloadlen); + callBack(socket, data, getHsize() + payloadlen, channel); + sentPackets++; + sentBytes += payloadlen + getHsize(); + increaseSequence(); + }else{ + data[1] &= 0x7F; // setting the RTP marker bit to 0 + unsigned int sent = 0; + unsigned int sending = + maxDataLen - getHsize() - 3; // packages are of size MAX_SEND, except for the final one + char initByteA = (payload[0] & 0x81) | 0x62; + char initByteB = payload[1]; + char serByte = (payload[0] & 0x7E) >> 1; // SE is now 00 + data[getHsize()] = initByteA; + data[getHsize()+1] = initByteB; + while (sent < payloadlen){ + if (sent == 0){ + serByte |= 0x80; // set first bit to 1 + }else{ + serByte &= 0x7F; // set first bit to 0 + } + if (sent + sending >= payloadlen){ + // last package + serByte |= 0x40; + sending = payloadlen - sent; + data[1] |= 0x80; // setting the RTP marker bit to 1 + } + data[getHsize() + 2] = serByte; + memcpy(data + getHsize() + 3, payload + 2 + sent, sending); + callBack(socket, data, getHsize() + 3 + sending, channel); + sentPackets++; + sentBytes += sending + getHsize() + 3; + sent += sending; + increaseSequence(); + } + } + } + + void Packet::sendMPEG2(void *socket, void callBack(void *, char *, unsigned int, unsigned int), + const char *payload, unsigned int payloadlen, unsigned int channel){ + /// \todo This function probably belongs in DMS somewhere. + if (payloadlen + getHsize() + 4 <= maxDataLen){ + data[1] |= 0x80; // setting the RTP marker bit to 1 + Mpeg::MPEG2Info mInfo = Mpeg::parseMPEG2Headers(payload, payloadlen); + MPEGVideoHeader mHead(data+getHsize()); + mHead.clear(); + mHead.setTempRef(mInfo.tempSeq); + mHead.setPictureType(mInfo.frameType); + if (mInfo.isHeader){ + mHead.setSequence(); + } + mHead.setBegin(); + mHead.setEnd(); + memcpy(data + getHsize() + 4, payload, payloadlen); + callBack(socket, data, getHsize() + payloadlen + 4, channel); + sentPackets++; + sentBytes += payloadlen + getHsize() + 4; + increaseSequence(); + }else{ + data[1] &= 0x7F; // setting the RTP marker bit to 0 + unsigned int sent = 0; + unsigned int sending = + maxDataLen - getHsize() - 4; // packages are of size MAX_SEND, except for the final one + Mpeg::MPEG2Info mInfo; + MPEGVideoHeader mHead(data+getHsize()); + while (sent < payloadlen){ + mHead.clear(); + if (sent + sending >= payloadlen){ + mHead.setEnd(); + sending = payloadlen - sent; + data[1] |= 0x80; // setting the RTP marker bit to 1 + } + Mpeg::parseMPEG2Headers(payload, sent+sending, mInfo); + mHead.setTempRef(mInfo.tempSeq); + mHead.setPictureType(mInfo.frameType); + if (sent == 0){ + if (mInfo.isHeader){ + mHead.setSequence(); + } + mHead.setBegin(); + } + memcpy(data + getHsize() + 4, payload + sent, sending); + callBack(socket, data, getHsize() + 4 + sending, channel); + sentPackets++; + sentBytes += sending + getHsize() + 4; + sent += sending; + increaseSequence(); + } + } + } + + void Packet::sendData(void *socket, void callBack(void *, char *, unsigned int, unsigned int), + const char *payload, unsigned int payloadlen, unsigned int channel, + std::string codec){ + if (codec == "H264"){ + unsigned long sent = 0; + while (sent < payloadlen){ + unsigned long nalSize = ntohl(*((unsigned long *)(payload + sent))); + sendH264(socket, callBack, payload + sent + 4, nalSize, channel); + sent += nalSize + 4; + } + return; + } + if (codec == "HEVC"){ + unsigned long sent = 0; + while (sent < payloadlen){ + unsigned long nalSize = ntohl(*((unsigned long *)(payload + sent))); + sendH265(socket, callBack, payload + sent + 4, nalSize, channel); + sent += nalSize + 4; + } + return; + } + if (codec == "MPEG2"){ + sendMPEG2(socket, callBack, payload, payloadlen, channel); + return; + } + /// \todo This function probably belongs in DMS somewhere. + data[1] |= 0x80; // setting the RTP marker bit to 1 long offsetLen = 0; if (codec == "AAC"){ *((long *)(data + getHsize())) = htonl(((payloadlen << 3) & 0x0010fff8) | 0x00100000); offsetLen = 4; - }else if (codec == "MP3"){ - *((long *)(data + getHsize())) = 0;//this is MBZ and Frag_Offset, which is always 0 + }else if (codec == "MP3" || codec == "MP2"){ + //See RFC 2250, "MPEG Audio-specific header" + *((long *)(data + getHsize())) = 0; // this is MBZ and Frag_Offset, which are always 0 + if (payload[0] != 0xFF){ + FAIL_MSG("MP2/MP3 data does not start with header?"); + } offsetLen = 4; }else if (codec == "AC3"){ - *((short *)(data + getHsize())) = htons(0x0001) ;//this is 6 bits MBZ, 2 bits FT = 0 = full frames and 8 bits saying we send 1 frame + *((short *)(data + getHsize())) = htons(0x0001); // this is 6 bits MBZ, 2 bits FT = 0 = full + // frames and 8 bits saying we send 1 frame offsetLen = 2; } if (maxDataLen < getHsize() + offsetLen + payloadlen){ @@ -135,10 +230,10 @@ namespace RTP { return; } uint32_t newMaxLen = getHsize() + offsetLen + payloadlen; - char * newData = new char[newMaxLen]; + char *newData = new char[newMaxLen]; if (newData){ memcpy(newData, data, maxDataLen); - delete [] data; + delete[] data; data = newData; maxDataLen = newMaxLen; } @@ -146,13 +241,14 @@ namespace RTP { memcpy(data + getHsize() + offsetLen, payload, payloadlen); callBack(socket, data, getHsize() + offsetLen + payloadlen, channel); sentPackets++; - sentBytes += payloadlen; + sentBytes += payloadlen + offsetLen + getHsize(); increaseSequence(); } - - void Packet::sendRTCP(long long & connectedAt, void * socket, unsigned int tid , DTSC::Meta & metadata, void callBack(void *, char *, unsigned int, unsigned int)) { - void * rtcpData = malloc(32); + void Packet::sendRTCP(long long &connectedAt, void *socket, unsigned int tid, + DTSC::Meta &metadata, + void callBack(void *, char *, unsigned int, unsigned int)){ + void *rtcpData = malloc(32); if (!rtcpData){ FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up."); return; @@ -160,62 +256,63 @@ namespace RTP { ((int *)rtcpData)[0] = htonl(0x80C80006); ((int *)rtcpData)[1] = htonl(getSSRC()); // unsigned int tid = packet["trackid"].asInt(); - //timestamp in ms + // timestamp in ms double ntpTime = 2208988800UL + Util::epoch() + (Util::getMS() % 1000) / 1000.0; - if (startRTCP < 1 && startRTCP > -1) { - startRTCP = ntpTime; - } + if (startRTCP < 1 && startRTCP > -1){startRTCP = ntpTime;} ntpTime -= startRTCP; - - ((int *)rtcpData)[2] = htonl(2208988800UL + Util::epoch()); //epoch is in seconds - ((int *)rtcpData)[3] = htonl((Util::getMS() % 1000) * 4294967.295); - if (metadata.tracks[tid].codec == "H264") { - ((int *)rtcpData)[4] = htonl((ntpTime - 0) * 90000); //rtpts - } else { - ((int *)rtcpData)[4] = htonl((ntpTime - 0) * metadata.tracks[tid].rate); //rtpts + + ((int *)rtcpData)[2] = htonl(2208988800UL + Util::epoch()); // epoch is in seconds + ((int *)rtcpData)[3] = htonl((Util::getMS() % 1000) * 4294967.295); + if (metadata.tracks[tid].codec == "H264"){ + ((int *)rtcpData)[4] = htonl((ntpTime - 0) * 90000); // rtpts + }else{ + ((int *)rtcpData)[4] = htonl((ntpTime - 0) * metadata.tracks[tid].rate); // rtpts } - //it should be the time packet was sent maybe, after all? + // it should be the time packet was sent maybe, after all? //*((int *)(rtcpData+16) ) = htonl(getTimeStamp());//rtpts - ((int *)rtcpData)[5] = htonl(sentPackets);//packet - ((int *)rtcpData)[6] = htonl(sentBytes);//octet - callBack(socket, (char*)rtcpData , 28 , 0); + ((int *)rtcpData)[5] = htonl(sentPackets); // packet + ((int *)rtcpData)[6] = htonl(sentBytes); // octet + callBack(socket, (char *)rtcpData, 28, 0); free(rtcpData); } - Packet::Packet() { + Packet::Packet(){ managed = false; data = 0; } - Packet::Packet(unsigned int payloadType, unsigned int sequence, unsigned int timestamp, unsigned int ssrc, unsigned int csrcCount) { + Packet::Packet(unsigned int payloadType, unsigned int sequence, unsigned int timestamp, + unsigned int ssrc, unsigned int csrcCount){ managed = true; - data = new char[12 + 4 * csrcCount + 2 + MAX_SEND]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size + data = new char[12 + 4 * csrcCount + 2 + + MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size if (data){ maxDataLen = 12 + 4 * csrcCount + 2 + MAX_SEND; - data[0] = ((2) << 6) | ((0 & 1) << 5) | ((0 & 1) << 4) | (csrcCount & 15); //version, padding, extension, csrc count - data[1] = payloadType & 0x7F; //marker and payload type + data[0] = ((2) << 6) | ((0 & 1) << 5) | ((0 & 1) << 4) | + (csrcCount & 15); // version, padding, extension, csrc count + data[1] = payloadType & 0x7F; // marker and payload type }else{ maxDataLen = 0; } - setSequence(sequence - 1); //we automatically increase the sequence each time when p + setSequence(sequence - 1); // we automatically increase the sequence each time when p setTimestamp(timestamp); setSSRC(ssrc); sentBytes = 0; sentPackets = 0; } - Packet::Packet(const Packet & o) { + Packet::Packet(const Packet &o){ managed = true; maxDataLen = 0; - if (o.data && o.maxDataLen) { - data = new char[o.maxDataLen]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size - if (data) { + if (o.data && o.maxDataLen){ + data = new char[o.maxDataLen]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size + if (data){ maxDataLen = o.maxDataLen; memcpy(data, o.data, o.maxDataLen); } - } else { - data = new char[14 + MAX_SEND];//headerSize, 2 for FU-A, MAX_SEND for maximum sent size - if (data) { + }else{ + data = new char[14 + MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size + if (data){ maxDataLen = 14 + MAX_SEND; memset(data, 0, maxDataLen); } @@ -224,23 +321,21 @@ namespace RTP { sentPackets = o.sentPackets; } - void Packet::operator=(const Packet & o) { + void Packet::operator=(const Packet &o){ managed = true; maxDataLen = 0; - if (data && managed) { - delete[] data; - } + if (data && managed){delete[] data;} data = 0; - if (o.data && o.maxDataLen) { - data = new char[o.maxDataLen]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size - if (data) { + if (o.data && o.maxDataLen){ + data = new char[o.maxDataLen]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size + if (data){ maxDataLen = o.maxDataLen; memcpy(data, o.data, o.maxDataLen); } - } else { - data = new char[14 + MAX_SEND];//headerSize, 2 for FU-A, MAX_SEND for maximum sent size - if (data) { + }else{ + data = new char[14 + MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size + if (data){ maxDataLen = 14 + MAX_SEND; memset(data, 0, maxDataLen); } @@ -249,15 +344,68 @@ namespace RTP { sentPackets = o.sentPackets; } - Packet::~Packet() { - if (managed) { - delete [] data; - } + Packet::~Packet(){ + if (managed){delete[] data;} } - Packet::Packet(const char * dat, unsigned int len) { + Packet::Packet(const char *dat, unsigned int len){ managed = false; datalen = len; - data = (char *) dat; + data = (char *)dat; + } + + MPEGVideoHeader::MPEGVideoHeader(char *d){ + data = d; + } + + uint16_t MPEGVideoHeader::getTotalLen() const{ + uint16_t ret = 4; + if (data[0] & 0x08){ + ret += 4; + if (data[4] & 0x40){ + ret += data[8]; + } + } + return ret; + } + + std::string MPEGVideoHeader::toString() const{ + std::stringstream ret; + uint32_t firstHead = Bit::btohl(data); + ret << "TR=" << ((firstHead & 0x3FF0000) >> 16); + if (firstHead & 0x4000000){ret << " Ext";} + if (firstHead & 0x2000){ret << " SeqHead";} + if (firstHead & 0x1000){ret << " SliceBegin";} + if (firstHead & 0x800){ret << " SliceEnd";} + ret << " PicType=" << ((firstHead & 0x700) >> 8); + if (firstHead & 0x80){ret << " FBV";} + ret << " BFC=" << ((firstHead & 0x70) >> 4); + if (firstHead & 0x8){ret << " FFV";} + ret << " FFC=" << (firstHead & 0x7); + return ret.str(); + } + + void MPEGVideoHeader::clear(){ + ((uint32_t*)data)[0] = 0; + } + + void MPEGVideoHeader::setTempRef(uint16_t ref){ + data[0] |= (ref >> 8) & 0x03; + data[1] = ref & 0xff; + } + + void MPEGVideoHeader::setPictureType(uint8_t pType){ + data[2] |= pType & 0x7; + } + + void MPEGVideoHeader::setSequence(){ + data[2] |= 0x20; + } + void MPEGVideoHeader::setBegin(){ + data[2] |= 0x10; + } + void MPEGVideoHeader::setEnd(){ + data[2] |= 0x8; } } + diff --git a/lib/rtp.h b/lib/rtp.h index 495a4540..1849e6cf 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -1,65 +1,91 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "socket.h" -#include "json.h" #include "dtsc.h" +#include "json.h" #include "mp4.h" #include "mp4_generic.h" +#include "socket.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /// This namespace holds all RTP-parsing and sending related functionality. -namespace RTP { +namespace RTP{ extern unsigned int MAX_SEND; - /// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in here. - class Packet { - private: - bool managed; - char * data; ///type = item; + thisTrack->trackID = trackNo; + }else{ + WARN_MSG("Media type not supported: %s", item.c_str()); + continue; + } + getline(words, item, ' '); + if (!getline(words, item, ' ') || item != "RTP/AVP"){ + WARN_MSG("Media transport not supported: %s", item.c_str()); + continue; + } + if (getline(words, item, ' ')){ + uint64_t avp_type = JSON::Value(item).asInt(); + switch (avp_type){ + case 8: // PCM A-law + INFO_MSG("PCM A-law payload type"); + nope = false; + thisTrack->codec = "ALAW"; + thisTrack->rate = 8000; + thisTrack->channels = 1; + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + break; + case 10: // PCM Stereo, 44.1kHz + INFO_MSG("Linear PCM stereo 44.1kHz payload type"); + nope = false; + thisTrack->codec = "PCM"; + thisTrack->size = 16; + thisTrack->rate = 44100; + thisTrack->channels = 2; + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + break; + case 11: // PCM Mono, 44.1kHz + INFO_MSG("Linear PCM mono 44.1kHz payload type"); + nope = false; + thisTrack->codec = "PCM"; + thisTrack->rate = 44100; + thisTrack->size = 16; + thisTrack->channels = 1; + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + break; + case 14: // MPA + INFO_MSG("MPA payload type"); + nope = false; + thisTrack->codec = "MP3"; + thisTrack->rate = 0; + thisTrack->size = 0; + thisTrack->channels = 0; + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + break; + case 32: // MPV + INFO_MSG("MPV payload type"); + nope = false; + thisTrack->codec = "MPEG2"; + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + break; + default: + // dynamic type + if (avp_type >= 96 && avp_type <= 127){ + INFO_MSG("Dynamic payload type (%llu) detected", avp_type); + nope = false; + continue; + }else{ + FAIL_MSG("Payload type %llu not supported!", avp_type); + continue; + } + } + } + continue; + } + if (nope){continue;}// ignore lines if we have no valid track + // RTP mapping + if (to.substr(0, 8) == "a=rtpmap"){ + std::string mediaType = to.substr(to.find(' ', 8) + 1); + std::string trCodec = mediaType.substr(0, mediaType.find('/')); + // convert to fullcaps + for (unsigned int i = 0; i < trCodec.size(); ++i){ + if (trCodec[i] <= 122 && trCodec[i] >= 97){trCodec[i] -= 32;} + } + if (thisTrack->type == "audio"){ + std::string extraInfo = mediaType.substr(mediaType.find('/') + 1); + if (extraInfo.find('/') != std::string::npos){ + size_t lastSlash = extraInfo.find('/'); + thisTrack->rate = atoll(extraInfo.substr(0, lastSlash).c_str()); + thisTrack->channels = atoll(extraInfo.substr(lastSlash + 1).c_str()); + }else{ + thisTrack->rate = atoll(extraInfo.c_str()); + thisTrack->channels = 1; + } + } + if (trCodec == "H264"){ + thisTrack->codec = "H264"; + thisTrack->rate = 90000; + } + if (trCodec == "H265"){ + thisTrack->codec = "HEVC"; + thisTrack->rate = 90000; + } + if (trCodec == "OPUS"){ + thisTrack->codec = "opus"; + thisTrack->init = std::string("OpusHead\001\002\170\000\200\273\000\000\000\000\000", 19); + } + if (trCodec == "PCMA"){thisTrack->codec = "ALAW";} + if (trCodec == "L8"){ + thisTrack->codec = "PCM"; + thisTrack->size = 8; + } + if (trCodec == "L16"){ + thisTrack->codec = "PCM"; + thisTrack->size = 16; + } + if (trCodec == "L20"){ + thisTrack->codec = "PCM"; + thisTrack->size = 20; + } + if (trCodec == "L24"){ + thisTrack->codec = "PCM"; + thisTrack->size = 24; + } + if (trCodec == "MPEG4-GENERIC"){thisTrack->codec = "AAC";} + if (!thisTrack->codec.size()){ + ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str()); + }else{ + INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + } + continue; + } + if (to.substr(0, 10) == "a=control:"){ + tracks[trackNo].control = to.substr(10); + continue; + } + if (to.substr(0, 7) == "a=fmtp:"){ + tracks[trackNo].fmtp = to.substr(7); + if (thisTrack->codec == "AAC"){ + if (tracks[trackNo].getParamString("mode") != "AAC-hbr"){ + // a=fmtp:97 + // profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; + // config=120856E500 + FAIL_MSG("AAC transport mode not supported: %s", + tracks[trackNo].getParamString("mode").c_str()); + nope = true; + myMeta->tracks.erase(trackNo); + tracks.erase(trackNo); + continue; + } + thisTrack->init = Encodings::Hex::decode(tracks[trackNo].getParamString("config")); + // myMeta.tracks[trackNo].rate = aac::AudSpecConf::rate(myMeta.tracks[trackNo].init); + } + if (thisTrack->codec == "H264"){ + // a=fmtp:96 packetization-mode=1; + // sprop-parameter-sets=Z0LAHtkA2D3m//AUABqxAAADAAEAAAMAMg8WLkg=,aMuDyyA=; + // profile-level-id=42C01E + std::string sprop = tracks[trackNo].getParamString("sprop-parameter-sets"); + size_t comma = sprop.find(','); + tracks[trackNo].spsData = Encodings::Base64::decode(sprop.substr(0, comma)); + tracks[trackNo].ppsData = Encodings::Base64::decode(sprop.substr(comma + 1)); + updateH264Init(trackNo); + } + if (thisTrack->codec == "HEVC"){ + tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-vps"))); + tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-sps"))); + tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-pps"))); + updateH265Init(trackNo); + } + continue; + } + // We ignore bandwidth lines + if (to.substr(0, 2) == "b="){continue;} + // we ignore everything before the first media line. + if (!trackNo){continue;} + // at this point, the data is definitely for a track + INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str()); + } + } + + /// Calculates H265 track metadata from sps and pps data stored in tracks[trackNo] + void State::updateH265Init(uint64_t trackNo){ + DTSC::Track &Trk = myMeta->tracks[trackNo]; + SDP::Track &RTrk = tracks[trackNo]; + if (!RTrk.hevcInfo.haveRequired()){ + MEDIUM_MSG("Aborted meta fill for hevc track %lu: no info nal unit", trackNo); + return; + } + Trk.init = RTrk.hevcInfo.generateHVCC(); + + h265::metaInfo MI = tracks[trackNo].hevcInfo.getMeta(); + + RTrk.fpsMeta = MI.fps; + Trk.width = MI.width; + Trk.height = MI.height; + Trk.fpks = RTrk.fpsMeta * 1000; + } + + /// Calculates H264 track metadata from vps, sps and pps data stored in tracks[trackNo] + void State::updateH264Init(uint64_t trackNo){ + DTSC::Track &Trk = myMeta->tracks[trackNo]; + SDP::Track &RTrk = tracks[trackNo]; + h264::sequenceParameterSet sps(RTrk.spsData.data(), RTrk.spsData.size()); + h264::SPSMeta hMeta = sps.getCharacteristics(); + MP4::AVCC avccBox; + avccBox.setVersion(1); + avccBox.setProfile(RTrk.spsData[1]); + avccBox.setCompatibleProfiles(RTrk.spsData[2]); + avccBox.setLevel(RTrk.spsData[3]); + avccBox.setSPSNumber(1); + avccBox.setSPS(RTrk.spsData); + avccBox.setPPSNumber(1); + avccBox.setPPS(RTrk.ppsData); + RTrk.fpsMeta = hMeta.fps; + Trk.width = hMeta.width; + Trk.height = hMeta.height; + Trk.fpks = hMeta.fps * 1000; + Trk.init = std::string(avccBox.payload(), avccBox.payloadSize()); + } + + uint32_t State::getTrackNoForChannel(uint8_t chan){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ + if (chan == it->second.channel){return it->first;} + } + return 0; + } + + uint32_t State::parseSetup(HTTP::Parser &H, const std::string &cH, const std::string &src){ + static uint32_t trackCounter = 0; + if (H.url == "200"){ + ++trackCounter; + if (!tracks.count(trackCounter)){return 0;} + if (!tracks[trackCounter].parseTransport(H.GetHeader("Transport"), cH, src, + myMeta->tracks[trackCounter])){ + return 0; + } + return trackCounter; + } + + if (tracks.size()){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ + if (!it->second.control.size()){ + it->second.control = "track" + JSON::Value((long long)it->first).asString(); + INFO_MSG("Control track: %s", it->second.control.c_str()); + } + if (H.url.find(it->second.control) != std::string::npos || + H.GetVar("pass").find(it->second.control) != std::string::npos){ + INFO_MSG("Parsing SETUP against track %lu", it->first); + if (!it->second.parseTransport(H.GetHeader("Transport"), cH, src, + myMeta->tracks[it->first])){ + return 0; + } + return it->first; + } + } + } + if (H.url.find("/track") != std::string::npos){ + uint32_t trackNo = atoi(H.url.c_str() + H.url.find("/track") + 6); + if (trackNo){ + INFO_MSG("Parsing SETUP against track %lu", trackNo); + if (!tracks[trackNo].parseTransport(H.GetHeader("Transport"), cH, src, + myMeta->tracks[trackNo])){ + return 0; + } + return trackNo; + } + } + return 0; + } + + /// Handles a single H264 packet, checking if others are appended at the end in Annex B format. + /// If so, splits them up and calls h264Packet for each. If not, calls it only once for the whole + /// payload. + void State::h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len){ + uint32_t lastStart = 0; + for (uint32_t i = 0; i < len - 4; ++i){ + // search for start code + if (buffer[i] == 0 && buffer[i + 1] == 0 && buffer[i + 2] == 0 && buffer[i + 3] == 1){ + // if found, handle a packet from the last start code up to this start code + Bit::htobl(buffer + lastStart, (i - lastStart - 1) - 4); // size-prepend + h264Packet(ts, track, buffer + lastStart, (i - lastStart - 1), + h264::isKeyframe(buffer + lastStart + 4, i - lastStart - 5)); + lastStart = i; + } + } + // Last packet (might be first, if no start codes found) + Bit::htobl(buffer + lastStart, (len - lastStart) - 4); // size-prepend + h264Packet(ts, track, buffer + lastStart, (len - lastStart), + h264::isKeyframe(buffer + lastStart + 4, len - lastStart - 4)); + } + + void State::h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len, + bool isKey){ + MEDIUM_MSG("H264: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : ""); + // Ignore zero-length packets (e.g. only contained init data and nothing else) + if (!len){return;} + + // Header data? Compare to init, set if needed, and throw away + uint8_t nalType = (buffer[4] & 0x1F); + switch (nalType){ + case 7: // SPS + if (tracks[track].spsData.size() != len - 4 || + memcmp(buffer + 4, tracks[track].spsData.data(), len - 4) != 0){ + INFO_MSG("Updated SPS from RTP data"); + tracks[track].spsData.assign(buffer + 4, len - 4); + updateH264Init(track); + } + return; + case 8: // PPS + if (tracks[track].ppsData.size() != len - 4 || + memcmp(buffer + 4, tracks[track].ppsData.data(), len - 4) != 0){ + INFO_MSG("Updated PPS from RTP data"); + tracks[track].ppsData.assign(buffer + 4, len - 4); + updateH264Init(track); + } + return; + default: // others, continue parsing + break; + } + + double fps = tracks[track].fpsMeta; + uint32_t offset = 0; + uint64_t newTs = ts; + if (fps > 1){ + // Assume a steady frame rate, clip the timestamp based on frame number. + uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5; + while (frameNo < tracks[track].packCount){tracks[track].packCount--;} + // More than 32 frames behind? We probably skipped something, somewhere... + if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;} + // After some experimentation, we found that the time offset is the difference between the + // frame number and the packet counter, times the frame rate in ms + offset = (frameNo - tracks[track].packCount) * (1000.0 / fps); + //... and the timestamp is the packet counter times the frame rate in ms. + newTs = tracks[track].packCount * (1000.0 / fps); + VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts, + isKey ? "key" : "i", frameNo, fps, tracks[track].packCount, + (frameNo - tracks[track].packCount), offset); + }else{ + // For non-steady frame rate, assume no offsets are used and the timestamp is already correct + VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i", + tracks[track].packCount); + } + // Fill the new DTSC packet, buffer it. + DTSC::Packet nextPack; + nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey); + tracks[track].packCount++; + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + } + + void State::h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len, + bool isKey){ + MEDIUM_MSG("H265: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : ""); + // Ignore zero-length packets (e.g. only contained init data and nothing else) + if (!len){return;} + + // Header data? Compare to init, set if needed, and throw away + uint8_t nalType = (buffer[4] & 0x7E) >> 1; + switch (nalType){ + case 32: // VPS + case 33: // SPS + case 34: // PPS + tracks[track].hevcInfo.addUnit(buffer); + updateH265Init(track); + return; + default: // others, continue parsing + break; + } + + double fps = tracks[track].fpsMeta; + uint32_t offset = 0; + uint64_t newTs = ts; + if (fps > 1){ + // Assume a steady frame rate, clip the timestamp based on frame number. + uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5; + while (frameNo < tracks[track].packCount){tracks[track].packCount--;} + // More than 32 frames behind? We probably skipped something, somewhere... + if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;} + // After some experimentation, we found that the time offset is the difference between the + // frame number and the packet counter, times the frame rate in ms + offset = (frameNo - tracks[track].packCount) * (1000.0 / fps); + //... and the timestamp is the packet counter times the frame rate in ms. + newTs = tracks[track].packCount * (1000.0 / fps); + VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts, + isKey ? "key" : "i", frameNo, fps, tracks[track].packCount, + (frameNo - tracks[track].packCount), offset); + }else{ + // For non-steady frame rate, assume no offsets are used and the timestamp is already correct + VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i", + tracks[track].packCount); + } + // Fill the new DTSC packet, buffer it. + DTSC::Packet nextPack; + nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey); + tracks[track].packCount++; + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + } + + ///Returns the multiplier to use to get milliseconds from the RTP payload type for the given track + double getMultiplier(const DTSC::Track & Trk){ + if (Trk.type == "video" || Trk.codec == "MP2" || Trk.codec == "MP3"){return 90.0;} + return ((double)Trk.rate / 1000.0); + } + + /// Handles RTP packets generically, for both TCP and UDP-based connections. + /// In case of UDP, expects packets to be pre-sorted. + void State::handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt){ + DTSC::Track &Trk = myMeta->tracks[track]; + if (!tracks[track].firstTime){tracks[track].firstTime = pkt.getTimeStamp() + 1;} + uint64_t millis = (pkt.getTimeStamp() - tracks[track].firstTime + 1) / getMultiplier(Trk); + char *pl = pkt.getPayload(); + uint32_t plSize = pkt.getPayloadSize(); + INSANE_MSG("Received RTP packet for track %llu, time %llu -> %llu", track, pkt.getTimeStamp(), millis); + if (Trk.codec == "ALAW" || Trk.codec == "opus" || Trk.codec == "PCM"){ + DTSC::Packet nextPack; + nextPack.genericFill(millis, 0, track, pl, plSize, 0, false); + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + return; + } + if (Trk.codec == "AAC"){ + // assume AAC packets are single AU units + /// \todo Support other input than single AU units + unsigned int headLen = + (Bit::btohs(pl) >> 3) + 2; // in bits, so /8, plus two for the prepended size + DTSC::Packet nextPack; + uint16_t samples = aac::AudSpecConf::samples(Trk.init); + uint32_t sampleOffset = 0; + uint32_t offset = 0; + uint32_t auSize = 0; + for (uint32_t i = 2; i < headLen; i += 2){ + auSize = Bit::btohs(pl + i) >> 3; // only the upper 13 bits + nextPack.genericFill((pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime + 1) / + getMultiplier(Trk), + 0, track, pl + headLen + offset, + std::min(auSize, plSize - headLen - offset), 0, false); + offset += auSize; + sampleOffset += samples; + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + } + return; + } + if (Trk.codec == "MP2" || Trk.codec == "MP3"){ + if (plSize < 5){ + WARN_MSG("Empty packet ignored!"); + return; + } + DTSC::Packet nextPack; + nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false); + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + return; + } + if (Trk.codec == "MPEG2"){ + if (plSize < 5){ + WARN_MSG("Empty packet ignored!"); + return; + } + ///\TODO Merge packets with same timestamp together + HIGH_MSG("Received MPEG2 packet: %s", RTP::MPEGVideoHeader(pl).toString().c_str()); + DTSC::Packet nextPack; + nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false); + if (incomingPacketCallback){incomingPacketCallback(nextPack);} + return; + } + if (Trk.codec == "HEVC"){ + if (plSize < 2){ + WARN_MSG("Empty packet ignored!"); + return; + } + uint8_t nalType = (pl[0] & 0x7E) >> 1; + if (nalType == 48){ + ERROR_MSG("AP not supported yet"); + }else if (nalType == 49){ + DONTEVEN_MSG("H265 Fragmentation Unit"); + static Util::ResizeablePointer fuaBuffer; + + // No length yet? Check for start bit. Ignore rest. + if (!fuaBuffer.size() && (pl[2] & 0x80) == 0){ + HIGH_MSG("Not start of a new FU - throwing away"); + return; + } + if (fuaBuffer.size() && + ((pl[2] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ + WARN_MSG("H265 FU packet incompleted: %lu", fuaBuffer.size()); + Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend + fuaBuffer[4] |= 0x80; // set error bit + h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, + fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4)); + fuaBuffer.size() = 0; + return; + } + + unsigned long len = plSize - 3; // ignore the three FU bytes in front + if (!fuaBuffer.size()){len += 6;}// six extra bytes for the first packet + if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;} + if (!fuaBuffer.size()){ + memcpy(fuaBuffer + 6, pl + 3, plSize - 3); + // reconstruct first byte + fuaBuffer[4] = ((pl[2] & 0x3F) << 1) | (pl[0] & 0x81); + fuaBuffer[5] = pl[1]; + }else{ + memcpy(fuaBuffer + fuaBuffer.size(), pl + 3, plSize - 3); + } + fuaBuffer.size() += len; + + if (pl[2] & 0x40){// last packet + VERYHIGH_MSG("H265 FU packet type %s (%u) completed: %lu", h265::typeToStr((fuaBuffer[4] & 0x7E) >> 1), (uint8_t)((fuaBuffer[4] & 0x7E) >> 1), + fuaBuffer.size()); + Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend + h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, + fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4)); + fuaBuffer.size() = 0; + } + return; + }else if (nalType == 50){ + ERROR_MSG("PACI/TSCI not supported yet"); + }else{ + DONTEVEN_MSG("%s NAL unit (%u)", h265::typeToStr(nalType), nalType); + static Util::ResizeablePointer packBuffer; + if (!packBuffer.allocate(plSize + 4)){return;} + Bit::htobl(packBuffer, plSize); // size-prepend + memcpy(packBuffer + 4, pl, plSize); + h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, + plSize + 4, h265::isKeyframe(packBuffer + 4, plSize)); + return; + } + return; + } + if (Trk.codec == "H264"){ + // Handles common H264 packets types, but not all. + // Generalizes and converts them all to a data format ready for DTSC, then calls h264Packet + // for that data. + // Prints a WARN-level message if packet type is unsupported. + /// \todo Support other H264 packets types? + if (!plSize){ + WARN_MSG("Empty packet ignored!"); + return; + } + if ((pl[0] & 0x1F) == 0){ + WARN_MSG("H264 packet type null ignored"); + return; + } + if ((pl[0] & 0x1F) < 24){ + DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F)); + static Util::ResizeablePointer packBuffer; + if (!packBuffer.allocate(plSize + 4)){return;} + Bit::htobl(packBuffer, plSize); // size-prepend + memcpy(packBuffer + 4, pl, plSize); + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, + plSize + 4, h264::isKeyframe(packBuffer + 4, plSize)); + return; + } + if ((pl[0] & 0x1F) == 24){ + DONTEVEN_MSG("H264 STAP-A packet"); + unsigned int len = 0; + unsigned int pos = 1; + while (pos + 1 < plSize){ + unsigned int pLen = Bit::btohs(pl + pos); + INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos + 2] & 0x1F)); + pos += 2 + pLen; + len += 4 + pLen; + } + static Util::ResizeablePointer packBuffer; + if (!packBuffer.allocate(len)){return;} + pos = 1; + len = 0; + bool isKey = false; + while (pos + 1 < plSize){ + unsigned int pLen = Bit::btohs(pl + pos); + isKey |= h264::isKeyframe(pl + pos + 2, pLen); + Bit::htobl(packBuffer + len, pLen); // size-prepend + memcpy(packBuffer + len + 4, pl + pos + 2, pLen); + len += 4 + pLen; + pos += 2 + pLen; + } + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len, + isKey); + return; + } + if ((pl[0] & 0x1F) == 28){ + DONTEVEN_MSG("H264 FU-A packet"); + static Util::ResizeablePointer fuaBuffer; + + // No length yet? Check for start bit. Ignore rest. + if (!fuaBuffer.size() && (pl[1] & 0x80) == 0){ + HIGH_MSG("Not start of a new FU-A - throwing away"); + return; + } + if (fuaBuffer.size() && + ((pl[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ + WARN_MSG("Ending unfinished FU-A"); + INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaBuffer.size()); + uint8_t nalType = (fuaBuffer[4] & 0x1F); + if (nalType == 7 || nalType == 8){ + // attempt to detect multiple H264 packets, even though specs disallow it + h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, + fuaBuffer, fuaBuffer.size()); + }else{ + Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend + fuaBuffer[4] |= 0x80; // set error bit + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, + fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4)); + } + fuaBuffer.size() = 0; + return; + } + + unsigned long len = plSize - 2; // ignore the two FU-A bytes in front + if (!fuaBuffer.size()){len += 5;}// five extra bytes for the first packet + if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;} + if (!fuaBuffer.size()){ + memcpy(fuaBuffer + 4, pl + 1, plSize - 1); + // reconstruct first byte + fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pl[0] & 0xE0); + }else{ + memcpy(fuaBuffer + fuaBuffer.size(), pl + 2, plSize - 2); + } + fuaBuffer.size() += len; + + if (pl[1] & 0x40){// last packet + INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F), + fuaBuffer.size()); + uint8_t nalType = (fuaBuffer[4] & 0x1F); + if (nalType == 7 || nalType == 8){ + // attempt to detect multiple H264 packets, even though specs disallow it + h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, + fuaBuffer, fuaBuffer.size()); + }else{ + Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, + fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4)); + } + fuaBuffer.size() = 0; + } + return; + } + WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F)); + return; + } + } +} + diff --git a/lib/sdp.h b/lib/sdp.h new file mode 100644 index 00000000..8a4dcf5f --- /dev/null +++ b/lib/sdp.h @@ -0,0 +1,66 @@ +#include "dtsc.h" +#include "http_parser.h" +#include "rtp.h" +#include "socket.h" +#include "h265.h" + +namespace SDP{ + + double getMultiplier(const DTSC::Track & Trk); + + /// Structure used to keep track of selected tracks. + class Track{ + public: + Socket::UDPConnection data; + Socket::UDPConnection rtcp; + RTP::Packet pack; + long long rtcpSent; + uint64_t firstTime; + int channel; /// Channel number, used in TCP sending + uint64_t packCount; + uint16_t rtpSeq; + std::map packBuffer; + uint32_t cPort; + std::string transportString; /// Current transport string. + std::string control; + std::string fmtp; /// fmtp string, used by getParamString / getParamInt + std::string spsData; + std::string ppsData; + h265::initData hevcInfo; + uint64_t fpsTime; + double fpsMeta; + double fps; + Track(); + std::string getParamString(const std::string ¶m) const; + uint64_t getParamInt(const std::string ¶m) const; + bool parseTransport(const std::string &transport, const std::string &host, + const std::string &source, const DTSC::Track &trk); + std::string rtpInfo(const DTSC::Track &trk, const std::string &source, uint64_t currentTime); + }; + + class State{ + public: + State(){ + incomingPacketCallback = 0; + myMeta = 0; + } + DTSC::Meta *myMeta; + void (*incomingPacketCallback)(const DTSC::Packet &pkt); + std::map tracks; ///< List of selected tracks with SDP-specific session data. + void parseSDP(const std::string &sdp); + void updateH264Init(uint64_t trackNo); + void updateH265Init(uint64_t trackNo); + uint32_t getTrackNoForChannel(uint8_t chan); + uint32_t parseSetup(HTTP::Parser &H, const std::string &host, + const std::string &source); + void handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt); + void h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len); + void h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len, + bool isKey); + void h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len, + bool isKey); + }; + + std::string mediaDescription(const DTSC::Track &trk); +} + diff --git a/src/analysers/analyser_rtsp.cpp b/src/analysers/analyser_rtsp.cpp index 0f8b1767..bd8ac41b 100644 --- a/src/analysers/analyser_rtsp.cpp +++ b/src/analysers/analyser_rtsp.cpp @@ -1,197 +1,119 @@ -#include -#include -#include -#include -#include -#include +#include "analyser_rtsp.h" -#include -#include -#include -#include -#include +AnalyserRTSP *classPointer = 0; -namespace RtspRtp{ +void incomingPacket(const DTSC::Packet &pkt){ + classPointer->incoming(pkt); +} - int analyseRtspRtp(std::string rtspUrl){ - /*//parse hostname - std::string hostname = rtspUrl.substr(7); - hostname = hostname.substr(0,hostname.find('/')); - std::cout << hostname << std::endl; - HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. - Socket::Connection conn(hostname,554,false);//setting rtsp connection - - bool optionsSent = false; - bool optionsRecvd = false; - bool descSent = false; - bool descRecvd = false; - bool setupComplete = false; - bool playSent = false; - int CSeq = 1; - while(conn.connected()){ - if(!optionsSent){ - HTTP_R.protocol="RTSP/1.0"; - HTTP_R.method = "OPTIONS"; - HTTP_R.url = rtspUrl; - HTTP_R.SetHeader("CSeq",CSeq); - CSeq++; - HTTP_R.SetHeader("User-Agent","mistANALyser"); - HTTP_R.SendRequest(conn); - optionsSent = true; - } - - if (optionsSent&& !optionsRecvd && (conn.Received().size() || conn.spool() )){ - if(HTTP_S.Read(conn)){ - std::cout << "recv opts" << std::endl; - - std::cout << HTTP_S.BuildResponse(HTTP_S.method,HTTP_S.url); - optionsRecvd = true; - } - } - +void AnalyserRTSP::init(Util::Config &conf){ + Analyser::init(conf); +} - - if(optionsRecvd && !descSent){ - HTTP_S.Clean(); - HTTP_R.protocol="RTSP/1.0"; - HTTP_R.method = "DESCRIBE"; - HTTP_R.url = rtspUrl; - HTTP_R.SetHeader("CSeq",CSeq); - CSeq++; - HTTP_R.SetHeader("User-Agent","mistANALyser"); - HTTP_R.SendRequest(conn); - descSent = true; - - } - - std::vector trackIds; - - if (descSent&&!descRecvd && (conn.Received().size() || conn.spool() )){ - - if(HTTP_S.Read(conn)){ - std::cout << "recv desc2" << std::endl; - std::cout << HTTP_S.BuildResponse(HTTP_S.method,HTTP_S.url); - size_t pos = HTTP_S.body.find("m="); - do{ - //finding all track IDs - pos = HTTP_S.body.find("a=control:",pos); - if(pos !=std::string::npos){ - trackIds.push_back(HTTP_S.body.substr(pos+10,HTTP_S.body.find("\r\n",pos)-pos-10 ) );//setting track IDs; - pos++; - } - }while(pos != std::string::npos); - //we have all the tracks - - descRecvd = true; - } - } - - - unsigned int setupsSent = 0; - unsigned int setupsRecvd = 0; - Socket::UDPConnection connectors[trackIds.size()]; - unsigned int setports[trackIds.size()]; - uint32_t bport = 10000; - std::string sessionID = ""; - - std::stringstream setup; - - if(descRecvd && !setupComplete){ - //time to setup. - for(std::vector::iterator it = trackIds.begin();it!=trackIds.end();it++){ - std::cout << "setup " << setupsSent<< std::endl; - while(!connectors[setupsSent].SetConnection( bport,false) ){ - bport +=2;//finding an available port - } - std::cout << "setup" << bport<< std::endl; - setports[setupsSent] = bport; - bport +=2; - if(setupsSent == setupsRecvd){ - //send only one setup - HTTP_S.Clean(); - HTTP_R.protocol="RTSP/1.0"; - HTTP_R.method = "SETUP"; - HTTP_R.url = rtspUrl+ '/' + *(it); - setup << "RTP/AVP/UDP;unicast;client_port="<< setports[setupsSent] <<"-" <= 8){ + for (uint32_t i = 0; i < dataSize; ++i){ + std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)dataPtr[i] << " "; + if (i % 32 == 31){std::cout << std::endl;} } - conn.close();*/ - return 0; + std::cout << std::endl; } - - } - - -int main(int argc, char ** argv){ - Util::Config conf = Util::Config(argv[0]); - conf.addOption("url",JSON::fromString("{\"arg\":\"string\",\"short\":\"u\",\"long\":\"url\",\"help\":\"URL To get.\", \"default\":\"rtsp://localhost/s1k\"}")); - conf.parseArgs(argc, argv); - return RtspRtp::analyseRtspRtp(conf.getString("url")); +AnalyserRTSP::AnalyserRTSP(Util::Config &conf) : Analyser(conf){ + myConn = Socket::Connection(1, 0); + sdpState.myMeta = &myMeta; + sdpState.incomingPacketCallback = incomingPacket; + classPointer = this; } + +bool AnalyserRTSP::isOpen(){ + return myConn; +} + +bool AnalyserRTSP::parsePacket(){ + do{ + // No new data? Sleep and retry, if connection still open + if (!myConn.Received().size() || !myConn.Received().available(1)){ + if (!myConn.spool() && isOpen()){Util::sleep(500);} + continue; + } + if (myConn.Received().copy(1) != "$"){ + // not a TCP RTP packet, read RTSP commands + if (HTTP.Read(myConn)){ + if (HTTP.hasHeader("Content-Type") && HTTP.GetHeader("Content-Type") == "application/sdp"){ + sdpState.parseSDP(HTTP.body); + HTTP.Clean(); + return true; + } + if (HTTP.hasHeader("Transport")){ + uint32_t trackNo = sdpState.parseSetup(HTTP, "", ""); + if (trackNo){ + DETAIL_MED("Parsed transport for track: %lu", trackNo); + }else{ + DETAIL_MED("Could not parse transport string!"); + } + HTTP.Clean(); + return true; + } + + std::cout << HTTP.BuildRequest() << std::endl; + + HTTP.Clean(); + return true; + }else{ + if (!myConn.spool() && isOpen()){Util::sleep(500);} + } + continue; + } + if (!myConn.Received().available(4)){ + if (!myConn.spool() && isOpen()){Util::sleep(500);} + continue; + }// a TCP RTP packet, but not complete yet + + // We have a TCP packet! Read it... + // Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data + std::string tcpHead = myConn.Received().copy(4); + uint16_t len = ntohs(*(short *)(tcpHead.data() + 2)); + if (!myConn.Received().available(len + 4)){ + if (!myConn.spool() && isOpen()){Util::sleep(500);} + continue; + }// a TCP RTP packet, but not complete yet + // remove whole packet from buffer, including 4 byte header + std::string tcpPacket = myConn.Received().remove(len + 4); + RTP::Packet pkt(tcpPacket.data() + 4, len); + uint8_t chan = tcpHead.data()[1]; + uint32_t trackNo = sdpState.getTrackNoForChannel(chan); + DETAIL_HI("Received %ub RTP packet #%u on channel %u, time %llu", len, + (unsigned int)pkt.getSequence(), chan, pkt.getTimeStamp()); + if (!trackNo && (chan % 2) != 1){ + DETAIL_MED("Received packet for unknown track number on channel %u", chan); + } + if (trackNo){ + sdpState.tracks[trackNo].rtpSeq = pkt.getSequence(); + } + + if (detail >= 10){ + char *pl = pkt.getPayload(); + uint32_t payLen = pkt.getPayloadSize(); + for (uint32_t i = 0; i < payLen; ++i){ + std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)pl[i] << " "; + if (i % 32 == 31){std::cout << std::endl;} + } + std::cout << std::endl; + } + + sdpState.handleIncomingRTP(trackNo, pkt); + + return true; + + }while (isOpen()); + + // if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets +} + diff --git a/src/analysers/analyser_rtsp.h b/src/analysers/analyser_rtsp.h index 8d3ff74d..4194d7fe 100644 --- a/src/analysers/analyser_rtsp.h +++ b/src/analysers/analyser_rtsp.h @@ -1,27 +1,23 @@ -#include +#pragma once + #include "analyser.h" -#include -#include -#include -#include -#include -#include +#include #include +#include +#include -class rtpAnalyser : public analysers -{ - Socket::Connection conn; - HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. - int step; - std::vector tracks; - std::vector connections; - unsigned int trackIt; +class AnalyserRTSP : public Analyser{ +public: + AnalyserRTSP(Util::Config &conf); + static void init(Util::Config &cfg); + bool parsePacket(); + void incoming(const DTSC::Packet &pkt); + bool isOpen(); - public: - rtpAnalyser(Util::Config config); - bool packetReady(); - void PreProcessing(); - //int Analyse(); - int doAnalyse(); - void doValidate(); +private: + Socket::Connection myConn; + HTTP::Parser HTTP; + DTSC::Meta myMeta; + SDP::State sdpState; }; + diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp index be1297ea..667dade4 100644 --- a/src/output/output_rtsp.cpp +++ b/src/output/output_rtsp.cpp @@ -1,19 +1,26 @@ -#include +#include "output_rtsp.h" +#include #include -#include -#include #include #include -#include +#include +#include +#include #include -#include "output_rtsp.h" #include -namespace Mist { +namespace Mist{ - Socket::Connection * mainConn = 0; + Socket::Connection *mainConn = 0; + OutRTSP *classPointer = 0; - OutRTSP::OutRTSP(Socket::Connection & myConn) : Output(myConn){ + /// Helper function for passing packets into the OutRTSP class + void insertPacket(const DTSC::Packet &pkt){classPointer->incomingPacket(pkt);} + + /// Takes incoming packets and buffers them. + void OutRTSP::incomingPacket(const DTSC::Packet &pkt){bufferLivePacket(pkt);} + + OutRTSP::OutRTSP(Socket::Connection &myConn) : Output(myConn){ connectedAt = Util::epoch() + 2208988800ll; pausepoint = 0; setBlocking(false); @@ -22,51 +29,57 @@ namespace Mist { expectTCP = false; lastTimeSync = 0; mainConn = &myConn; + classPointer = this; + sdpState.incomingPacketCallback = insertPacket; + sdpState.myMeta = &myMeta; } - + /// Function used to send RTP packets over UDP ///\param socket A UDP Connection pointer, sent as a void*, to keep portability. ///\param data The RTP Packet that needs to be sent ///\param len The size of data ///\param channel Not used here, but is kept for compatibility with sendTCP - void sendUDP(void * socket, char * data, unsigned int len, unsigned int channel) { - ((Socket::UDPConnection *) socket)->SendNow(data, len); + void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){ + ((Socket::UDPConnection *)socket)->SendNow(data, len); if (mainConn){mainConn->addUp(len);} } - /// Function used to send RTP packets over TCP ///\param socket A TCP Connection pointer, sent as a void*, to keep portability. ///\param data The RTP Packet that needs to be sent ///\param len The size of data ///\param channel Used to distinguish different data streams when sending RTP over TCP - void sendTCP(void * socket, char * data, unsigned int len, unsigned int channel) { - //1 byte '$', 1 byte channel, 2 bytes length + void sendTCP(void *socket, char *data, unsigned int len, unsigned int channel){ + // 1 byte '$', 1 byte channel, 2 bytes length char buf[] = "$$$$"; buf[1] = channel; - ((short *) buf)[1] = htons(len); - ((Socket::Connection *) socket)->SendNow(buf, 4); - ((Socket::Connection *) socket)->SendNow(data, len); + ((short *)buf)[1] = htons(len); + ((Socket::Connection *)socket)->SendNow(buf, 4); + ((Socket::Connection *)socket)->SendNow(data, len); } - void OutRTSP::init(Util::Config * cfg){ + void OutRTSP::init(Util::Config *cfg){ Output::init(cfg); capa["name"] = "RTSP"; - capa["desc"] = "Provides Real Time Streaming Protocol output, supporting both UDP and TCP transports."; + capa["desc"] = + "Provides Real Time Streaming Protocol output, supporting both UDP and TCP transports."; capa["deps"] = ""; capa["url_rel"] = "/$"; capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("MPEG2"); capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("ALAW"); capa["codecs"][0u][1u].append("PCM"); capa["codecs"][0u][1u].append("opus"); - + capa["codecs"][0u][1u].append("MP2"); + capa["methods"][0u]["handler"] = "rtsp"; capa["methods"][0u]["type"] = "rtsp"; capa["methods"][0u]["priority"] = 2ll; - + capa["optional"]["maxsend"]["name"] = "Max RTP packet size"; capa["optional"]["maxsend"]["help"] = "Maximum size of RTP packets in bytes"; capa["optional"]["maxsend"]["default"] = (long long)RTP::MAX_SEND; @@ -79,92 +92,81 @@ namespace Mist { } void OutRTSP::sendNext(){ - char * dataPointer = 0; + char *dataPointer = 0; unsigned int dataLen = 0; thisPacket.getString("data", dataPointer, dataLen); uint32_t tid = thisPacket.getTrackId(); uint64_t timestamp = thisPacket.getTime(); - - //if we're past the pausing point, seek to it, and pause immediately + + // if we're past the pausing point, seek to it, and pause immediately if (pausepoint && timestamp > pausepoint){ pausepoint = 0; stop(); return; } - if (myMeta.live && lastTimeSync + 666 < timestamp){ lastTimeSync = timestamp; updateMeta(); - DTSC::Track & mainTrk = myMeta.tracks[getMainSelectedTrack()]; + DTSC::Track &mainTrk = myMeta.tracks[getMainSelectedTrack()]; // The extra 2000ms here is for the metadata sync delay. // It can be removed once we get rid of that. - if (timestamp + 2000 + needsLookAhead < mainTrk.keys.rbegin()->getTime() && mainTrk.lastms - mainTrk.keys.rbegin()->getTime() > needsLookAhead){ - INFO_MSG("Skipping forward %llums (%llu ms LA)", mainTrk.keys.rbegin()->getTime() - thisPacket.getTime(), needsLookAhead); + if (timestamp + 2000 + needsLookAhead < mainTrk.keys.rbegin()->getTime() && + mainTrk.lastms - mainTrk.keys.rbegin()->getTime() > needsLookAhead){ + INFO_MSG("Skipping forward %llums (%llu ms LA)", + mainTrk.keys.rbegin()->getTime() - thisPacket.getTime(), needsLookAhead); seek(mainTrk.keys.rbegin()->getTime()); return; } } - - void * socket = 0; + + void *socket = 0; void (*callBack)(void *, char *, unsigned int, unsigned int) = 0; - - if (tracks[tid].channel == -1){//UDP connection - socket = &tracks[tid].data; + + if (sdpState.tracks[tid].channel == -1){// UDP connection + socket = &sdpState.tracks[tid].data; callBack = sendUDP; - if (Util::epoch()/5 != tracks[tid].rtcpSent){ - tracks[tid].rtcpSent = Util::epoch()/5; - tracks[tid].pack.sendRTCP(connectedAt, &tracks[tid].rtcp, tid, myMeta, sendUDP); + if (Util::epoch() / 5 != sdpState.tracks[tid].rtcpSent){ + sdpState.tracks[tid].rtcpSent = Util::epoch() / 5; + sdpState.tracks[tid].pack.sendRTCP(connectedAt, &sdpState.tracks[tid].rtcp, tid, myMeta, + sendUDP); } }else{ socket = &myConn; callBack = sendTCP; } - - if(myMeta.tracks[tid].codec == "H264"){ - long long offset = thisPacket.getInt("offset"); - tracks[tid].pack.setTimestamp(90 * (timestamp + offset)); - unsigned long sent = 0; - while (sent < dataLen) { - unsigned long nalSize = ntohl(*((unsigned long *)(dataPointer + sent))); - tracks[tid].pack.sendH264(socket, callBack, dataPointer + sent + 4, nalSize, tracks[tid].channel); - sent += nalSize + 4; - } - return; - } - //Default packager - tracks[tid].pack.setTimestamp(timestamp * ((double) myMeta.tracks[tid].rate / 1000.0)); - tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel,myMeta.tracks[tid].codec); - + uint64_t offset = thisPacket.getInt("offset"); + sdpState.tracks[tid].pack.setTimestamp((timestamp+offset) * SDP::getMultiplier(myMeta.tracks[tid])); + sdpState.tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen, + sdpState.tracks[tid].channel, myMeta.tracks[tid].codec); } /// This request handler also checks for UDP packets void OutRTSP::requestHandler(){ - if (!expectTCP){ - handleUDP(); - } + if (!expectTCP){handleUDP();} Output::requestHandler(); } void OutRTSP::onRequest(){ RTP::MAX_SEND = config->getInteger("maxsend"); - //if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets + // if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets while ((!expectTCP || handleTCP()) && HTTP_R.Read(myConn)){ - //cancel broken URLs + // cancel broken URLs if (HTTP_R.url.size() < 8){ - WARN_MSG("Invalid data found in RTSP input around ~%llub - disconnecting!", myConn.dataDown()); + WARN_MSG("Invalid data found in RTSP input around ~%llub - disconnecting!", + myConn.dataDown()); myConn.close(); break; } HTTP_S.Clean(); HTTP_S.protocol = "RTSP/1.0"; - - //set the streamname and session + + // set the streamname and session if (!source.size()){ std::string source = HTTP_R.url.substr(7); - unsigned int loc = std::min(source.find(':'),source.find('/')); - source = source.substr(0,loc); + unsigned int loc = std::min(source.find(':'), source.find('/')); + source = source.substr(0, loc); } size_t found = HTTP_R.url.find('/', 7); if (!streamName.size()){ @@ -172,30 +174,29 @@ namespace Mist { Util::sanitizeName(streamName); } if (streamName.size()){ - HTTP_S.SetHeader("Session", Secure::md5(HTTP_S.GetHeader("User-Agent") + getConnectedHost()) + "_" + streamName); + HTTP_S.SetHeader("Session", + Secure::md5(HTTP_S.GetHeader("User-Agent") + getConnectedHost()) + "_" + + streamName); } - - //set the date + + // set the date time_t timer; time(&timer); - struct tm * timeNow = gmtime(&timer); + struct tm *timeNow = gmtime(&timer); char dString[42]; strftime(dString, 42, "%a, %d %h %Y, %X GMT", timeNow); HTTP_S.SetHeader("Date", dString); - - //set the sequence number to match the received sequence number - if (HTTP_R.hasHeader("CSeq")){ - HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq")); - } - if (HTTP_R.hasHeader("Cseq")){ - HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("Cseq")); - } - + + // set the sequence number to match the received sequence number + if (HTTP_R.hasHeader("CSeq")){HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq"));} + if (HTTP_R.hasHeader("Cseq")){HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("Cseq"));} + INFO_MSG("Handling %s", HTTP_R.method.c_str()); - //handle the request + // handle the request if (HTTP_R.method == "OPTIONS"){ - HTTP_S.SetHeader("Public", "SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER, ANNOUNCE, RECORD"); + HTTP_S.SetHeader("Public", + "SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER, ANNOUNCE, RECORD"); HTTP_S.SendResponse("200", "OK", myConn); HTTP_R.Clean(); continue; @@ -210,27 +211,35 @@ namespace Mist { selectedTracks.clear(); std::stringstream transportString; transportString << "v=0\r\n" - "o=- " << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n" - "s=" << streamName << "\r\n" - "c=IN IP4 0.0.0.0\r\n" - "i=" << streamName << "\r\n" - "u=" << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName << "\r\n" - "t=0 0\r\n" - "a=tool:MistServer\r\n" - "a=type:broadcast\r\n" - "a=control:*\r\n"; + "o=- " + << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n" + "s=" + << streamName << "\r\n" + "c=IN IP4 0.0.0.0\r\n" + "i=" + << streamName << "\r\n" + "u=" + << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName + << "\r\n" + "t=0 0\r\n" + "a=tool:MistServer\r\n" + "a=type:broadcast\r\n" + "a=control:*\r\n"; if (myMeta.live){ transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-\r\n"; }else{ - transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" << ((double)endTime()) / 1000.0 << "\r\n"; + transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" + << ((double)endTime()) / 1000.0 << "\r\n"; } - - for (std::map::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); ++objIt) { - transportString << tracks[objIt->first].mediaDescription(objIt->second); + + for (std::map::iterator objIt = myMeta.tracks.begin(); + objIt != myMeta.tracks.end(); ++objIt){ + transportString << SDP::mediaDescription(objIt->second); } transportString << "\r\n"; HIGH_MSG("Reply: %s", transportString.str().c_str()); - HTTP_S.SetHeader("Content-Base", HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName); + HTTP_S.SetHeader("Content-Base", + HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName); HTTP_S.SetHeader("Content-Type", "application/sdp"); HTTP_S.SetBody(transportString.str()); HTTP_S.SendResponse("200", "OK", myConn); @@ -238,95 +247,72 @@ namespace Mist { continue; } if (HTTP_R.method == "SETUP"){ - size_t trackPos = HTTP_R.url.rfind("/track"); - if (trackPos != std::string::npos){ - unsigned int trId = atol(HTTP_R.url.substr(trackPos + 6).c_str()); - if (myMeta.tracks.count(trId)){ - if (tracks[trId].parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[trId])){ - selectedTracks.insert(trId); - HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); - HTTP_S.SetHeader("Transport", tracks[trId].transportString); - HTTP_S.SetHeader("Cache-Control", "no-cache"); - HTTP_S.SendResponse("200", "OK", myConn); - }else{ - HTTP_S.SendResponse("404", "Track not available", myConn); - } - HTTP_R.Clean(); - continue; - } + uint32_t trackNo = sdpState.parseSetup(HTTP_R, getConnectedHost(), source); + HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); + HTTP_S.SetHeader("Cache-Control", "no-cache"); + if (trackNo){ + selectedTracks.insert(trackNo); + SDP::Track &sdpTrack = sdpState.tracks[trackNo]; + if (sdpTrack.channel != -1){expectTCP = true;} + HTTP_S.SetHeader("Transport", sdpTrack.transportString); + HTTP_S.SendResponse("200", "OK", myConn); + INFO_MSG("Setup completed for track %lu (%s): %s", trackNo, + myMeta.tracks[trackNo].codec.c_str(), sdpTrack.transportString.c_str()); + }else{ + HTTP_S.SendResponse("404", "Track not known or allowed", myConn); + FAIL_MSG("Could not handle setup for %s", HTTP_R.url.c_str()); } - //might be push setup - check known control points - if (pushing && tracks.size()){ - bool setupHandled = false; - for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ - if (it->second.control.size() && (HTTP_R.url.find(it->second.control) != std::string::npos || HTTP_R.GetVar("pass").find(it->second.control) != std::string::npos)){ - if (it->second.parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[it->first])){ - if (it->second.channel != -1){ - expectTCP = true; - } - HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); - HTTP_S.SetHeader("Transport", it->second.transportString); - HTTP_S.SetHeader("Cache-Control", "no-cache"); - HTTP_S.SendResponse("200", "OK", myConn); - INFO_MSG("Setup completed for track %llu (%s): %s", it->first, myMeta.tracks[it->first].codec.c_str(), it->second.transportString.c_str()); - }else{ - HTTP_S.SendResponse("404", "Track not known or allowed", myConn); - } - setupHandled = true; - HTTP_R.Clean(); - break; - } - } - if (!setupHandled){ - HTTP_S.SendResponse("404", "Track not known", myConn); - HTTP_R.Clean(); - } - continue; - } - FAIL_MSG("Could not handle setup: pushing=%s, trackSize=%u", pushing?"true":"false", tracks.size()); + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "PLAY"){ initialSeek(); std::string range = HTTP_R.GetHeader("Range"); if (range != ""){ - range = range.substr(range.find("npt=")+4); - if (!range.empty()) { + range = range.substr(range.find("npt=") + 4); + if (!range.empty()){ range = range.substr(0, range.find('-')); - uint64_t targetPos = 1000*atof(range.c_str()); + uint64_t targetPos = 1000 * atof(range.c_str()); if (targetPos || myMeta.vod){seek(targetPos);} } } std::stringstream rangeStr; if (myMeta.live){ - rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-"; + rangeStr << "npt=" << currentTime() / 1000 << "." << std::setw(3) << std::setfill('0') + << currentTime() % 1000 << "-"; }else{ - rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-" << std::setw(1) << endTime()/1000 << "." << std::setw(3) << std::setfill('0') << endTime()%1000; + rangeStr << "npt=" << currentTime() / 1000 << "." << std::setw(3) << std::setfill('0') + << currentTime() % 1000 << "-" << std::setw(1) << endTime() / 1000 << "." + << std::setw(3) << std::setfill('0') << endTime() % 1000; } HTTP_S.SetHeader("Range", rangeStr.str()); std::stringstream infoString; if (selectedTracks.size()){ - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ + for (std::set::iterator it = selectedTracks.begin(); + it != selectedTracks.end(); ++it){ if (!infoString.str().empty()){infoString << ",";} - infoString << tracks[*it].rtpInfo(myMeta.tracks[*it], source+"/"+streamName, currentTime()); + infoString << sdpState.tracks[*it].rtpInfo(myMeta.tracks[*it], + source + "/" + streamName, currentTime()); } } HTTP_S.SetHeader("RTP-Info", infoString.str()); HTTP_S.SendResponse("200", "OK", myConn); - parseData = true; + parseData = true; HTTP_R.Clean(); continue; } if (HTTP_R.method == "PAUSE"){ HTTP_S.SendResponse("200", "OK", myConn); std::string range = HTTP_R.GetHeader("Range"); - if (!range.empty()){ - range = range.substr(range.find("npt=")+4); - } + if (!range.empty()){range = range.substr(range.find("npt=") + 4);} if (range.empty()){ stop(); }else{ - pausepoint = 1000 * (int) atof(range.c_str()); - if (pausepoint > currentTime()){pausepoint = 0; stop();} + pausepoint = 1000 * (int)atof(range.c_str()); + if (pausepoint > currentTime()){ + pausepoint = 0; + stop(); + } } HTTP_R.Clean(); continue; @@ -343,7 +329,7 @@ namespace Mist { return; } INFO_MSG("Pushing to stream %s", streamName.c_str()); - parseSDP(HTTP_R.body); + sdpState.parseSDP(HTTP_R.body); HTTP_S.SendResponse("200", "OK", myConn); HTTP_R.Clean(); continue; @@ -360,519 +346,84 @@ namespace Mist { /// Disconnects the user bool OutRTSP::onFinish(){ - if (myConn){ - myConn.close(); - } + if (myConn){myConn.close();} return false; } /// Attempts to parse TCP RTP packets at the beginning of the header. /// Returns whether it is safe to attempt to read HTTP/RTSP packets (true) or not (false). bool OutRTSP::handleTCP(){ - if (!myConn.Received().size() || !myConn.Received().available(1)){return false;}//no data - if (myConn.Received().copy(1) != "$"){return true;}//not a TCP RTP packet - if (!myConn.Received().available(4)){return false;}//a TCP RTP packet, but not complete yet - //We have a TCP packet! Read it... - //Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data + if (!myConn.Received().size() || !myConn.Received().available(1)){return false;}// no data + if (myConn.Received().copy(1) != "$"){return true;}// not a TCP RTP packet + if (!myConn.Received().available(4)){return false;}// a TCP RTP packet, but not complete yet + // We have a TCP packet! Read it... + // Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data std::string tcpHead = myConn.Received().copy(4); - uint16_t len = ntohs(*(short*)(tcpHead.data()+2)); - if (!myConn.Received().available(len+4)){return false;}//a TCP RTP packet, but not complete yet - //remove whole packet from buffer, including 4 byte header - std::string tcpPacket = myConn.Received().remove(len+4); - for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ - if (tcpHead.data()[1] == it->second.channel){ - RTP::Packet pkt(tcpPacket.data()+4, len); - it->second.rtpSeq = pkt.getSequence(); - handleIncomingRTP(it->first, pkt); - break; - } + uint16_t len = ntohs(*(short *)(tcpHead.data() + 2)); + if (!myConn.Received().available(len + 4)){ + return false; + }// a TCP RTP packet, but not complete yet + // remove whole packet from buffer, including 4 byte header + std::string tcpPacket = myConn.Received().remove(len + 4); + uint32_t trackNo = sdpState.getTrackNoForChannel(tcpHead.data()[1]); + if (trackNo && isPushing()){ + RTP::Packet pkt(tcpPacket.data() + 4, len); + sdpState.tracks[trackNo].rtpSeq = pkt.getSequence(); + sdpState.handleIncomingRTP(trackNo, pkt); } - //attempt to read more packets + // attempt to read more packets return handleTCP(); } /// Reads and handles RTP packets over UDP, if needed void OutRTSP::handleUDP(){ - for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ - Socket::UDPConnection & s = it->second.data; + if (!isPushing()){return;} + for (std::map::iterator it = sdpState.tracks.begin(); + it != sdpState.tracks.end(); ++it){ + Socket::UDPConnection &s = it->second.data; while (s.Receive()){ if (s.getDestPort() != it->second.cPort){ - //wrong sending port, ignore packet + // wrong sending port, ignore packet continue; } - lastRecv = Util::epoch();//prevent disconnect of idle TCP connection when using UDP + lastRecv = Util::epoch(); // prevent disconnect of idle TCP connection when using UDP myConn.addDown(s.data_len); RTP::Packet pack(s.data, s.data_len); if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} - //packet is very early - assume dropped after 10 packets + // packet is very early - assume dropped after 10 packets while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -10){ WARN_MSG("Giving up on packet %u", it->second.rtpSeq); ++(it->second.rtpSeq); - //send any buffered packets we may have + // send any buffered packets we may have while (it->second.packBuffer.count(it->second.rtpSeq)){ - handleIncomingRTP(it->first, pack); + sdpState.handleIncomingRTP(it->first, pack); ++(it->second.rtpSeq); } } - //send any buffered packets we may have + // send any buffered packets we may have while (it->second.packBuffer.count(it->second.rtpSeq)){ - handleIncomingRTP(it->first, pack); + sdpState.handleIncomingRTP(it->first, pack); ++(it->second.rtpSeq); } - //packet is slightly early - buffer it + // packet is slightly early - buffer it if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); it->second.packBuffer[pack.getSequence()] = pack; } - //packet is late + // packet is late if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ - //negative difference? - WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); + // negative difference? + WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", + (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); return; } - //packet is in order + // packet is in order if (it->second.rtpSeq == pack.getSequence()){ - handleIncomingRTP(it->first, pack); + sdpState.handleIncomingRTP(it->first, pack); ++(it->second.rtpSeq); } } } } - - /// Handles a single H264 packet, checking if others are appended at the end in Annex B format. - /// If so, splits them up and calls h264Packet for each. If not, calls it only once for the whole payload. - void OutRTSP::h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len){ - uint32_t lastStart = 0; - for (uint32_t i = 0; i < len-4; ++i){ - //search for start code - if (buffer[i] == 0 && buffer[i+1] == 0 && buffer[i+2] == 0 && buffer[i+3] == 1){ - //if found, handle a packet from the last start code up to this start code - Bit::htobl(buffer+lastStart, (i-lastStart-1)-4);//size-prepend - h264Packet(ts, track, buffer+lastStart, (i-lastStart-1), h264::isKeyframe(buffer+lastStart+4, i-lastStart-5)); - lastStart = i; - } - } - //Last packet (might be first, if no start codes found) - Bit::htobl(buffer+lastStart, (len-lastStart)-4);//size-prepend - h264Packet(ts, track, buffer+lastStart, (len-lastStart), h264::isKeyframe(buffer+lastStart+4, len-lastStart-4)); - } - - void OutRTSP::h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey){ - //Ignore zero-length packets (e.g. only contained init data and nothing else) - if (!len){return;} - - //Header data? Compare to init, set if needed, and throw away - uint8_t nalType = (buffer[4] & 0x1F); - switch (nalType){ - case 7: //SPS - if (tracks[track].spsData.size() != len-4 || memcmp(buffer+4, tracks[track].spsData.data(), len-4) != 0){ - INFO_MSG("Updated SPS from RTP data"); - tracks[track].spsData.assign(buffer+4, len-4); - updateH264Init(track); - } - return; - case 8: //PPS - if (tracks[track].ppsData.size() != len-4 || memcmp(buffer+4, tracks[track].ppsData.data(), len-4) != 0){ - INFO_MSG("Updated PPS from RTP data"); - tracks[track].ppsData.assign(buffer+4, len-4); - updateH264Init(track); - } - return; - default://others, continue parsing - break; - } - - double fps = tracks[track].fpsMeta; - uint32_t offset = 0; - uint64_t newTs = ts; - if (fps > 1){ - //Assume a steady frame rate, clip the timestamp based on frame number. - uint64_t frameNo = (ts / (1000.0/fps))+0.5; - while (frameNo < tracks[track].packCount){ - tracks[track].packCount--; - } - //More than 32 frames behind? We probably skipped something, somewhere... - if ((frameNo-tracks[track].packCount) > 32){ - tracks[track].packCount = frameNo; - } - //After some experimentation, we found that the time offset is the difference between the frame number and the packet counter, times the frame rate in ms - offset = (frameNo-tracks[track].packCount) * (1000.0/fps); - //... and the timestamp is the packet counter times the frame rate in ms. - newTs = tracks[track].packCount * (1000.0/fps); - VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts, isKey?"key":"i", frameNo, fps, tracks[track].packCount, (frameNo-tracks[track].packCount), offset); - }else{ - //For non-steady frame rate, assume no offsets are used and the timestamp is already correct - VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey?"key":"i", tracks[track].packCount); - } - //Fill the new DTSC packet, buffer it. - DTSC::Packet nextPack; - nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey); - tracks[track].packCount++; - bufferLivePacket(nextPack); - } - - /// Handles RTP packets generically, for both TCP and UDP-based connections. - /// In case of UDP, expects packets to be pre-sorted. - void OutRTSP::handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt){ - if (!tracks[track].firstTime){ - tracks[track].firstTime = pkt.getTimeStamp() + 1; - } - if (myMeta.tracks[track].codec == "ALAW" || myMeta.tracks[track].codec == "opus" || myMeta.tracks[track].codec == "MP3" || myMeta.tracks[track].codec == "PCM"){ - char * pl = pkt.getPayload(); - DTSC::Packet nextPack; - nextPack.genericFill((pkt.getTimeStamp() - tracks[track].firstTime + 1) / ((double)myMeta.tracks[track].rate / 1000.0), 0, track, pl, pkt.getPayloadSize(), 0, false); - bufferLivePacket(nextPack); - return; - } - if (myMeta.tracks[track].codec == "AAC"){ - //assume AAC packets are single AU units - /// \todo Support other input than single AU units - char * pl = pkt.getPayload(); - unsigned int headLen = (Bit::btohs(pl) >> 3) + 2;//in bits, so /8, plus two for the prepended size - DTSC::Packet nextPack; - uint16_t samples = aac::AudSpecConf::samples(myMeta.tracks[track].init); - uint32_t sampleOffset = 0; - uint32_t offset = 0; - uint32_t auSize = 0; - for (uint32_t i = 2; i < headLen; i += 2){ - auSize = Bit::btohs(pl+i) >> 3;//only the upper 13 bits - nextPack.genericFill((pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime + 1) / ((double)myMeta.tracks[track].rate / 1000.0), 0, track, pl+headLen+offset, std::min(auSize, pkt.getPayloadSize() - headLen - offset), 0, false); - offset += auSize; - sampleOffset += samples; - bufferLivePacket(nextPack); - } - return; - } - if (myMeta.tracks[track].codec == "H264"){ - //Handles common H264 packets types, but not all. - //Generalizes and converts them all to a data format ready for DTSC, then calls h264Packet for that data. - //Prints a WARN-level message if packet type is unsupported. - /// \todo Support other H264 packets types? - char * pl = pkt.getPayload(); - if (!pkt.getPayloadSize()){ - WARN_MSG("Empty packet ignored!"); - return; - } - if ((pl[0] & 0x1F) == 0){ - WARN_MSG("H264 packet type null ignored"); - return; - } - if ((pl[0] & 0x1F) < 24){ - DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F)); - static char * packBuffer = 0; - static unsigned long packBufferSize = 0; - unsigned long len = pkt.getPayloadSize(); - if (packBufferSize < len+4){ - char * tmp = (char*)realloc(packBuffer, len+4); - if (tmp){ - packBuffer = tmp; - packBufferSize = len+4; - }else{ - free(packBuffer); - packBufferSize = 0; - packBuffer = 0; - FAIL_MSG("Failed to allocate memory for H264 packet"); - return; - } - } - Bit::htobl(packBuffer, len);//size-prepend - memcpy(packBuffer+4, pl, len); - h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len+4, h264::isKeyframe(packBuffer+4, len)); - return; - } - if ((pl[0] & 0x1F) == 24){ - DONTEVEN_MSG("H264 STAP-A packet"); - unsigned int len = 0; - unsigned int pos = 1; - while (pos + 1 < pkt.getPayloadSize()){ - unsigned int pLen = Bit::btohs(pl+pos); - INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos+2] & 0x1F)); - pos += 2+pLen; - len += 4+pLen; - } - static char * packBuffer = 0; - static unsigned long packBufferSize = 0; - if (packBufferSize < len){ - char * tmp = (char*)realloc(packBuffer, len); - if (tmp){ - packBuffer = tmp; - packBufferSize = len; - }else{ - free(packBuffer); - packBufferSize = 0; - packBuffer = 0; - FAIL_MSG("Failed to allocate memory for H264 STAP-A packet"); - return; - } - } - pos = 1; - len = 0; - bool isKey = false; - while (pos + 1 < pkt.getPayloadSize()){ - unsigned int pLen = Bit::btohs(pl+pos); - isKey |= h264::isKeyframe(pl+pos+2, pLen); - Bit::htobl(packBuffer+len, pLen);//size-prepend - memcpy(packBuffer+len+4, pl+pos+2, pLen); - len += 4+pLen; - pos += 2+pLen; - } - h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len, isKey); - return; - } - if ((pl[0] & 0x1F) == 28){ - DONTEVEN_MSG("H264 FU-A packet"); - static char * fuaBuffer = 0; - static unsigned long fuaBufferSize = 0; - static unsigned long fuaCurrLen = 0; - - //No length yet? Check for start bit. Ignore rest. - if (!fuaCurrLen && (pkt.getPayload()[1] & 0x80) == 0){ - HIGH_MSG("Not start of a new FU-A - throwing away"); - return; - } - if (fuaCurrLen && ((pkt.getPayload()[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ - WARN_MSG("Ending unfinished FU-A"); - INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaCurrLen); - uint8_t nalType = (fuaBuffer[4] & 0x1F); - if (nalType == 7 || nalType == 8){ - //attempt to detect multiple H264 packets, even though specs disallow it - h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen); - }else{ - Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend - fuaBuffer[4] |= 0x80;//set error bit - h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen, h264::isKeyframe(fuaBuffer+4, fuaCurrLen-4)); - } - fuaCurrLen = 0; - return; - } - - unsigned long len = pkt.getPayloadSize() - 2;//ignore the two FU-A bytes in front - if (!fuaCurrLen){len += 5;}//five extra bytes for the first packet - - - if (fuaBufferSize < fuaCurrLen + len){ - char * tmp = (char*)realloc(fuaBuffer, fuaCurrLen + len); - if (tmp){ - fuaBuffer = tmp; - fuaBufferSize = fuaCurrLen + len; - }else{ - free(fuaBuffer); - fuaBufferSize = 0; - fuaBuffer = 0; - FAIL_MSG("Failed to allocate memory for H264 FU-A packet"); - return; - } - } - - if (fuaCurrLen == 0){ - memcpy(fuaBuffer+4, pkt.getPayload()+1, pkt.getPayloadSize()-1); - //reconstruct first byte - fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pkt.getPayload()[0] & 0xE0); - }else{ - memcpy(fuaBuffer+fuaCurrLen, pkt.getPayload()+2, pkt.getPayloadSize()-2); - } - fuaCurrLen += len; - - if (pkt.getPayload()[1] & 0x40){//last packet - INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F), fuaCurrLen); - uint8_t nalType = (fuaBuffer[4] & 0x1F); - if (nalType == 7 || nalType == 8){ - //attempt to detect multiple H264 packets, even though specs disallow it - h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen); - }else{ - Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend - h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen, h264::isKeyframe(fuaBuffer+4, fuaCurrLen-4)); - } - fuaCurrLen = 0; - } - return; - } - WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F)); - return; - } - } - - /// Calculates H264 track metadata from sps and pps data stored in tracks[trackNo] - void OutRTSP::updateH264Init(uint64_t trackNo){ - DTSC::Track & Trk = myMeta.tracks[trackNo]; - RTPTrack & RTrk = tracks[trackNo]; - h264::sequenceParameterSet sps(RTrk.spsData.data(), RTrk.spsData.size()); - h264::SPSMeta hMeta = sps.getCharacteristics(); - MP4::AVCC avccBox; - avccBox.setVersion(1); - avccBox.setProfile(RTrk.spsData[1]); - avccBox.setCompatibleProfiles(RTrk.spsData[2]); - avccBox.setLevel(RTrk.spsData[3]); - avccBox.setSPSNumber(1); - avccBox.setSPS(RTrk.spsData); - avccBox.setPPSNumber(1); - avccBox.setPPS(RTrk.ppsData); - RTrk.fpsMeta = hMeta.fps; - Trk.width = hMeta.width; - Trk.height = hMeta.height; - Trk.fpks = hMeta.fps * 1000; - Trk.init = std::string(avccBox.payload(), avccBox.payloadSize()); - } - - void OutRTSP::parseSDP(const std::string & sdp){ - std::stringstream ss(sdp); - std::string to; - uint64_t trackNo = 0; - bool nope = true; //true if we have no valid track to fill - DTSC::Track * thisTrack = 0; - while(std::getline(ss,to,'\n')){ - if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size()-1, 1);} - - // All tracks start with a media line - if (to.substr(0,2) == "m="){ - nope = true; - ++trackNo; - thisTrack = &(myMeta.tracks[trackNo]); - std::stringstream words(to.substr(2)); - std::string item; - if (getline(words, item, ' ') && (item == "audio" || item == "video")){ - thisTrack->type = item; - thisTrack->trackID = trackNo; - }else{ - WARN_MSG("Media type not supported: %s", item.c_str()); - continue; - } - getline(words, item, ' '); - if (!getline(words, item, ' ') || item != "RTP/AVP"){ - WARN_MSG("Media transport not supported: %s", item.c_str()); - continue; - } - if (getline(words, item, ' ')){ - uint64_t avp_type = JSON::Value(item).asInt(); - switch (avp_type){ - case 8: //PCM A-law - INFO_MSG("PCM A-law payload type"); - nope = false; - thisTrack->codec = "ALAW"; - thisTrack->rate = 8000; - thisTrack->channels = 1; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); - break; - case 10: //PCM Stereo, 44.1kHz - INFO_MSG("Linear PCM stereo 44.1kHz payload type"); - nope = false; - thisTrack->codec = "PCM"; - thisTrack->size = 16; - thisTrack->rate = 44100; - thisTrack->channels = 2; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); - break; - case 11: //PCM Mono, 44.1kHz - INFO_MSG("Linear PCM mono 44.1kHz payload type"); - nope = false; - thisTrack->codec = "PCM"; - thisTrack->rate = 44100; - thisTrack->size = 16; - thisTrack->channels = 1; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); - break; - default: - //dynamic type - if (avp_type >= 96 && avp_type <= 127){ - INFO_MSG("Dynamic payload type (%llu) detected", avp_type); - nope = false; - continue; - }else{ - FAIL_MSG("Payload type %llu not supported!", avp_type); - continue; - } - } - } - continue; - } - if (nope){continue;}//ignore lines if we have no valid track - // RTP mapping - if (to.substr(0, 8) == "a=rtpmap"){ - std::string mediaType = to.substr(to.find(' ', 8)+1); - std::string trCodec = mediaType.substr(0, mediaType.find('/')); - //convert to fullcaps - for(unsigned int i=0;i=97){trCodec[i]-=32;} - } - if (thisTrack->type == "audio"){ - std::string extraInfo = mediaType.substr(mediaType.find('/')+1); - if (extraInfo.find('/') != std::string::npos){ - size_t lastSlash = extraInfo.find('/'); - thisTrack->rate = atoll(extraInfo.substr(0, lastSlash).c_str()); - thisTrack->channels = atoll(extraInfo.substr(lastSlash+1).c_str()); - }else{ - thisTrack->rate = atoll(extraInfo.c_str()); - thisTrack->channels = 1; - } - } - if (trCodec == "H264"){thisTrack->codec = "H264";} - if (trCodec == "OPUS"){ - thisTrack->codec = "opus"; - thisTrack->init = std::string("OpusHead\001\002\170\000\200\273\000\000\000\000\000", 19); - } - if (trCodec == "PCMA"){thisTrack->codec = "ALAW";} - if (trCodec == "L8"){ - thisTrack->codec = "PCM"; - thisTrack->size = 8; - } - if (trCodec == "L16"){ - thisTrack->codec = "PCM"; - thisTrack->size = 16; - } - if (trCodec == "L20"){ - thisTrack->codec = "PCM"; - thisTrack->size = 20; - } - if (trCodec == "L24"){ - thisTrack->codec = "PCM"; - thisTrack->size = 24; - } - if (trCodec == "MPEG4-GENERIC"){thisTrack->codec = "AAC";} - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); - continue; - } - if (to.substr(0, 10) == "a=control:"){ - tracks[trackNo].control = to.substr(10); - continue; - } - if (to.substr(0, 7) == "a=fmtp:"){ - tracks[trackNo].fmtp = to.substr(7); - if (thisTrack->codec == "AAC"){ - if (tracks[trackNo].getParamString("mode") != "AAC-hbr"){ - //a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=120856E500 - FAIL_MSG("AAC transport mode not supported: %s", tracks[trackNo].getParamString("mode").c_str()); - nope = true; - myMeta.tracks.erase(trackNo); - tracks.erase(trackNo); - continue; - } - thisTrack->init = Encodings::Hex::decode(tracks[trackNo].getParamString("config")); - //myMeta.tracks[trackNo].rate = aac::AudSpecConf::rate(myMeta.tracks[trackNo].init); - - } - if (thisTrack->codec == "H264"){ - //a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z0LAHtkA2D3m//AUABqxAAADAAEAAAMAMg8WLkg=,aMuDyyA=; profile-level-id=42C01E - std::string sprop = tracks[trackNo].getParamString("sprop-parameter-sets"); - size_t comma = sprop.find(','); - tracks[trackNo].spsData = Encodings::Base64::decode(sprop.substr(0,comma)); - tracks[trackNo].ppsData = Encodings::Base64::decode(sprop.substr(comma+1)); - updateH264Init(trackNo); - } - continue; - } - // We ignore bandwidth lines - if (to.substr(0,2) == "b="){ - continue; - } - //we ignore everything before the first media line. - if (!trackNo){ - continue; - } - //at this point, the data is definitely for a track - INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str()); - } - - } - } diff --git a/src/output/output_rtsp.h b/src/output/output_rtsp.h index dca61b0d..4a76bfe0 100644 --- a/src/output/output_rtsp.h +++ b/src/output/output_rtsp.h @@ -1,214 +1,36 @@ #pragma once #include "output.h" -#include -#include -#include -#include #include +#include +#include +#include +#include -namespace Mist { - ///Structure used to keep track of selected tracks. - class RTPTrack { - public: - Socket::UDPConnection data; - Socket::UDPConnection rtcp; - RTP::Packet pack; - long long rtcpSent; - uint64_t firstTime; - int channel;/// Channel number, used in TCP sending - uint64_t packCount; - uint16_t rtpSeq; - std::map packBuffer; - uint32_t cPort; - std::string transportString; - std::string control; - std::string fmtp; - std::string spsData; - std::string ppsData; - uint64_t fpsTime; - double fpsMeta; - double fps; - RTPTrack(){ - rtcpSent = 0; - channel = -1; - firstTime = 0; - packCount = 0; - cPort = 0; - rtpSeq = 0; - fpsTime = 0; - fpsMeta = 0; - fps = 0; - } - std::string getParamString(const std::string & param) const{ - if (!fmtp.size()){return "";} - size_t pos = fmtp.find(param); - if (pos == std::string::npos){return "";} - pos += param.size()+1; - size_t ePos = fmtp.find_first_of(" ;", pos); - return fmtp.substr(pos, ePos-pos); - } - uint64_t getParamInt(const std::string & param) const{ - return atoll(getParamString(param).c_str()); - } - std::string mediaDescription(const DTSC::Track & trk){ - std::stringstream mediaDesc; - if (trk.codec == "H264") { - MP4::AVCC avccbox; - avccbox.setPayload(trk.init); - mediaDesc << "m=video 0 RTP/AVP 97\r\n" - "a=rtpmap:97 H264/90000\r\n" - "a=cliprect:0,0," << trk.height << "," << trk.width << "\r\n" - "a=framesize:97 " << trk.width << '-' << trk.height << "\r\n" - "a=fmtp:97 packetization-mode=1;profile-level-id=" - << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[1] << std::dec << "E0" - << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[3] << std::dec << ";" - "sprop-parameter-sets=" - << Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen())) - << "," - << Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen())) - << "\r\n" - "a=framerate:" << ((double)trk.fpks)/1000.0 << "\r\n" - "a=control:track" << trk.trackID << "\r\n"; - } else if (trk.codec == "AAC") { - mediaDesc << "m=audio 0 RTP/AVP 96" << "\r\n" - "a=rtpmap:96 mpeg4-generic/" << trk.rate << "/" << trk.channels << "\r\n" - "a=fmtp:96 streamtype=5; profile-level-id=15; config="; - for (unsigned int i = 0; i < trk.init.size(); i++) { - mediaDesc << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init[i] << std::dec; - } - //these values are described in RFC 3640 - mediaDesc << "; mode=AAC-hbr; SizeLength=13; IndexLength=3; IndexDeltaLength=3;\r\n" - "a=control:track" << trk.trackID << "\r\n"; - }else if (trk.codec == "MP3") { - mediaDesc << "m=" << trk.type << " 0 RTP/AVP 14" << "\r\n" - "a=rtpmap:14 MPA/90000/" << trk.channels << "\r\n" - "a=control:track" << trk.trackID << "\r\n"; - }else if ( trk.codec == "AC3") { - mediaDesc << "m=audio 0 RTP/AVP 100" << "\r\n" - "a=rtpmap:100 AC3/" << trk.rate << "/" << trk.channels << "\r\n" - "a=control:track" << trk.trackID << "\r\n"; - }else if ( trk.codec == "ALAW") { - if (trk.channels == 1 && trk.rate == 8000){ - mediaDesc << "m=audio 0 RTP/AVP 8" << "\r\n"; - }else{ - mediaDesc << "m=audio 0 RTP/AVP 101" << "\r\n"; - mediaDesc << "a=rtpmap:101 PCMA/" << trk.rate << "/" << trk.channels << "\r\n"; - } - mediaDesc << "a=control:track" << trk.trackID << "\r\n"; - }else if ( trk.codec == "PCM") { - if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){ - mediaDesc << "m=audio 0 RTP/AVP 10" << "\r\n"; - } else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){ - mediaDesc << "m=audio 0 RTP/AVP 11" << "\r\n"; - }else{ - mediaDesc << "m=audio 0 RTP/AVP 103" << "\r\n"; - mediaDesc << "a=rtpmap:103 L" << trk.size << "/" << trk.rate << "/" << trk.channels << "\r\n"; - } - mediaDesc << "a=control:track" << trk.trackID << "\r\n"; - }else if ( trk.codec == "opus") { - mediaDesc << "m=audio 0 RTP/AVP 102" << "\r\n" - "a=rtpmap:102 opus/" << trk.rate << "/" << trk.channels << "\r\n" - "a=control:track" << trk.trackID << "\r\n"; - } - return mediaDesc.str(); - } - bool parseTransport(const std::string & transport, const std::string & host, const std::string & source, const DTSC::Track & trk){ - unsigned int SSrc = rand(); - if (trk.codec == "H264") { - pack = RTP::Packet(97, 1, 0, SSrc); - }else if(trk.codec == "AAC"){ - pack = RTP::Packet(96, 1, 0, SSrc); - }else if(trk.codec == "AC3"){ - pack = RTP::Packet(100, 1, 0, SSrc); - }else if(trk.codec == "MP3"){ - pack = RTP::Packet(14, 1, 0, SSrc); - }else if(trk.codec == "ALAW"){ - if (trk.channels == 1 && trk.rate == 8000){ - pack = RTP::Packet(8, 1, 0, SSrc); - }else{ - pack = RTP::Packet(101, 1, 0, SSrc); - } - }else if ( trk.codec == "PCM") { - if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){ - pack = RTP::Packet(10, 1, 0, SSrc); - } else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){ - pack = RTP::Packet(11, 1, 0, SSrc); - }else{ - pack = RTP::Packet(103, 1, 0, SSrc); - } - }else if(trk.codec == "opus"){ - pack = RTP::Packet(102, 1, 0, SSrc); - }else{ - ERROR_MSG("Unsupported codec %s for RTSP on track %u", trk.codec.c_str(), trk.trackID); - return false; - } - if (transport.find("TCP") != std::string::npos) { - std::string chanE = transport.substr(transport.find("interleaved=") + 12, (transport.size() - transport.rfind('-') - 1)); //extract channel ID - channel = atol(chanE.c_str()); - rtcpSent = 0; - transportString = transport; - } else { - channel = -1; - size_t port_loc = transport.rfind("client_port=") + 12; - cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str()); - uint32_t portA, portB; - //find available ports locally; - int sendbuff = 4*1024*1024; - data.SetDestination(host, cPort); - portA = data.bind(0); - setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - rtcp.SetDestination(host, cPort + 1); - portB = rtcp.bind(0); - setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - std::stringstream tStr; - tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";"; - if (source.size()){ - tStr << "source=" << source << ";"; - } - tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << SSrc << std::dec; - transportString = tStr.str(); - INFO_MSG("Transport string: %s", transportString.c_str()); - } - return true; - } - std::string rtpInfo(const DTSC::Track & trk, const std::string & source, uint64_t currentTime){ - unsigned int timeMultiplier = 1; - timeMultiplier = ((double)trk.rate / 1000.0); - if (trk.codec == "H264") { - timeMultiplier = 90; - } - std::stringstream rInfo; - rInfo << "url=" << source << "/track" << trk.trackID << ";"; //get the current url, not localhost - rInfo << "sequence=" << pack.getSequence() << ";rtptime=" << currentTime * timeMultiplier; - return rInfo.str(); - } - }; - - class OutRTSP : public Output { - public: - OutRTSP(Socket::Connection & myConn); - static void init(Util::Config * cfg); - void sendNext(); - void onRequest(); - void requestHandler(); - bool onFinish(); - private: - void parseSDP(const std::string & sdp); - long long connectedAt;///< The timestamp the connection was made, as reference point for RTCP packets. - std::map tracks;///< List of selected tracks with RTSP-specific session data. - unsigned int pausepoint;///< Position to pause at, when reached - HTTP::Parser HTTP_R, HTTP_S; - std::string source; - uint64_t lastTimeSync; - bool expectTCP; - bool handleTCP(); - void handleUDP(); - void handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt); - void h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len); - void h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey); - void updateH264Init(uint64_t trackNo); +namespace Mist{ + class OutRTSP : public Output{ + public: + OutRTSP(Socket::Connection &myConn); + static void init(Util::Config *cfg); + void sendNext(); + void onRequest(); + void requestHandler(); + bool onFinish(); + void incomingPacket(const DTSC::Packet &pkt); + + private: + long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP + ///packets. + unsigned int pausepoint; ///< Position to pause at, when reached + SDP::State sdpState; + HTTP::Parser HTTP_R, HTTP_S; + std::string source; + uint64_t lastTimeSync; + bool expectTCP; + bool handleTCP(); + void handleUDP(); }; } typedef Mist::OutRTSP mistOut; +