RTP cleanup + RTSP analyser + RTP MPEG2/MP2/HEVC support

This commit is contained in:
Thulinma 2017-05-21 13:52:14 +02:00
parent f5553f49f9
commit 54a8803f29
9 changed files with 1651 additions and 1216 deletions

View file

@ -159,6 +159,7 @@ set(libHeaders
${SOURCE_DIR}/lib/rijndael.h ${SOURCE_DIR}/lib/rijndael.h
${SOURCE_DIR}/lib/rtmpchunks.h ${SOURCE_DIR}/lib/rtmpchunks.h
${SOURCE_DIR}/lib/rtp.h ${SOURCE_DIR}/lib/rtp.h
${SOURCE_DIR}/lib/sdp.h
${SOURCE_DIR}/lib/shared_memory.h ${SOURCE_DIR}/lib/shared_memory.h
${SOURCE_DIR}/lib/socket.h ${SOURCE_DIR}/lib/socket.h
${SOURCE_DIR}/lib/stream.h ${SOURCE_DIR}/lib/stream.h
@ -208,6 +209,7 @@ set(libSources
${SOURCE_DIR}/lib/rijndael.cpp ${SOURCE_DIR}/lib/rijndael.cpp
${SOURCE_DIR}/lib/rtmpchunks.cpp ${SOURCE_DIR}/lib/rtmpchunks.cpp
${SOURCE_DIR}/lib/rtp.cpp ${SOURCE_DIR}/lib/rtp.cpp
${SOURCE_DIR}/lib/sdp.cpp
${SOURCE_DIR}/lib/shared_memory.cpp ${SOURCE_DIR}/lib/shared_memory.cpp
${SOURCE_DIR}/lib/socket.cpp ${SOURCE_DIR}/lib/socket.cpp
${SOURCE_DIR}/lib/stream.cpp ${SOURCE_DIR}/lib/stream.cpp
@ -289,6 +291,7 @@ makeAnalyser(MP4 mp4) #LTS
makeAnalyser(H264 h264) #LTS makeAnalyser(H264 h264) #LTS
makeAnalyser(HLS hls) #LTS makeAnalyser(HLS hls) #LTS
makeAnalyser(RIFF riff) #LTS makeAnalyser(RIFF riff) #LTS
makeAnalyser(RTSP rtsp) #LTS
#LTS_START #LTS_START
######################################## ########################################

View file

@ -1,132 +1,227 @@
#include <arpa/inet.h>
#include "rtp.h" #include "rtp.h"
#include "timing.h"
#include "defines.h" #include "defines.h"
#include "encode.h"
#include "timing.h"
#include "bitfields.h"
#include "mpeg.h"
#include <arpa/inet.h>
namespace RTP{
namespace RTP {
double Packet::startRTCP = 0; double Packet::startRTCP = 0;
unsigned int MAX_SEND = 1500-28; unsigned int MAX_SEND = 1500 - 28;
unsigned int Packet::getHsize() const { unsigned int Packet::getHsize() const{return 12 + 4 * getContribCount();}
return 12 + 4 * getContribCount();
}
unsigned int Packet::getPayloadSize() const { unsigned int Packet::getPayloadSize() const{return datalen - getHsize();}
return datalen - getHsize();
}
char * Packet::getPayload() const { char *Packet::getPayload() const{return data + getHsize();}
return data + getHsize();
}
unsigned int Packet::getVersion() const { unsigned int Packet::getVersion() const{return (data[0] >> 6) & 0x3;}
return (data[0] >> 6) & 0x3;
}
unsigned int Packet::getPadding() const { unsigned int Packet::getPadding() const{return (data[0] >> 5) & 0x1;}
return (data[0] >> 5) & 0x1;
}
unsigned int Packet::getExtension() const { unsigned int Packet::getExtension() const{return (data[0] >> 4) & 0x1;}
return (data[0] >> 4) & 0x1;
}
unsigned int Packet::getContribCount() const { unsigned int Packet::getContribCount() const{return (data[0]) & 0xE;}
return (data[0]) & 0xE;
}
unsigned int Packet::getMarker() const { unsigned int Packet::getMarker() const{return (data[1] >> 7) & 0x1;}
return (data[1] >> 7) & 0x1;
}
unsigned int Packet::getPayloadType() const { unsigned int Packet::getPayloadType() const{return (data[1]) & 0x7F;}
return (data[1]) & 0x7F;
}
unsigned int Packet::getSequence() const { unsigned int Packet::getSequence() const{return (((((unsigned int)data[2]) << 8) + data[3]));}
return (((((unsigned int)data[2]) << 8) + data[3]));
}
unsigned int Packet::getTimeStamp() const { unsigned int Packet::getTimeStamp() const{return ntohl(*((unsigned int *)(data + 4)));}
return ntohl(*((unsigned int *)(data + 4)));
}
unsigned int Packet::getSSRC() const { unsigned int Packet::getSSRC() const{return ntohl(*((unsigned int *)(data + 8)));}
return ntohl(*((unsigned int *)(data + 8)));
}
char * Packet::getData() { char *Packet::getData(){return data + 8 + 4 * getContribCount() + getExtension();}
return data + 8 + 4 * getContribCount() + getExtension();
}
void Packet::setTimestamp(unsigned int t) { void Packet::setTimestamp(unsigned int t){*((unsigned int *)(data + 4)) = htonl(t);}
*((unsigned int *)(data + 4)) = htonl(t);
}
void Packet::setSequence(unsigned int seq) { void Packet::setSequence(unsigned int seq){*((short *)(data + 2)) = htons(seq);}
*((short *)(data + 2)) = htons(seq);
}
void Packet::setSSRC(unsigned long ssrc) { void Packet::setSSRC(unsigned long ssrc){*((int *)(data + 8)) = htonl(ssrc);}
*((int *)(data + 8)) = htonl(ssrc);
}
void Packet::increaseSequence() { void Packet::increaseSequence(){*((short *)(data + 2)) = htons(getSequence() + 1);}
*((short *)(data + 2)) = htons(getSequence() + 1);
}
void Packet::sendH264(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel) { void Packet::sendH264(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
/// \todo This function probably belongs in DMS somewhere. /// \todo This function probably belongs in DMS somewhere.
if (payloadlen+getHsize()+2 <= maxDataLen) { if (payloadlen + getHsize() + 2 <= maxDataLen){
data[1] |= 0x80;//setting the RTP marker bit to 1 data[1] |= 0x80; // setting the RTP marker bit to 1
memcpy(data + getHsize(), payload, payloadlen); memcpy(data + getHsize(), payload, payloadlen);
callBack(socket, data, getHsize() + payloadlen, channel); callBack(socket, data, getHsize() + payloadlen, channel);
sentPackets++; sentPackets++;
sentBytes += payloadlen+getHsize(); sentBytes += payloadlen + getHsize();
increaseSequence(); increaseSequence();
} else { }else{
data[1] &= 0x7F;//setting the RTP marker bit to 0 data[1] &= 0x7F; // setting the RTP marker bit to 0
unsigned int sent = 0; unsigned int sent = 0;
unsigned int sending = maxDataLen-getHsize()-2;//packages are of size MAX_SEND, except for the final one unsigned int sending =
maxDataLen - getHsize() - 2; // packages are of size MAX_SEND, except for the final one
char initByte = (payload[0] & 0xE0) | 0x1C; char initByte = (payload[0] & 0xE0) | 0x1C;
char serByte = payload[0] & 0x1F; //ser is now 000 char serByte = payload[0] & 0x1F; // ser is now 000
data[getHsize()] = initByte; data[getHsize()] = initByte;
while (sent < payloadlen) { while (sent < payloadlen){
if (sent == 0) { if (sent == 0){
serByte |= 0x80;//set first bit to 1 serByte |= 0x80; // set first bit to 1
} else { }else{
serByte &= 0x7F;//set first bit to 0 serByte &= 0x7F; // set first bit to 0
} }
if (sent + sending >= payloadlen) { if (sent + sending >= payloadlen){
//last package // last package
serByte |= 0x40; serByte |= 0x40;
sending = payloadlen - sent; sending = payloadlen - sent;
data[1] |= 0x80;//setting the RTP marker bit to 1 data[1] |= 0x80; // setting the RTP marker bit to 1
} }
data[getHsize() + 1] = serByte; data[getHsize() + 1] = serByte;
memcpy(data + getHsize() + 2, payload + 1 + sent, sending); memcpy(data + getHsize() + 2, payload + 1 + sent, sending);
callBack(socket, data, getHsize() + 2 + sending, channel); callBack(socket, data, getHsize() + 2 + sending, channel);
sentPackets++; sentPackets++;
sentBytes += sending+getHsize()+2; sentBytes += sending + getHsize() + 2;
sent += sending; sent += sending;
increaseSequence(); increaseSequence();
} }
} }
} }
void Packet::sendData(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel, std::string codec) { void Packet::sendH265(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
/// \todo This function probably belongs in DMS somewhere. /// \todo This function probably belongs in DMS somewhere.
data[1] |= 0x80;//setting the RTP marker bit to 1 if (payloadlen + getHsize() + 3 <= maxDataLen){
data[1] |= 0x80; // setting the RTP marker bit to 1
memcpy(data + getHsize(), payload, payloadlen);
callBack(socket, data, getHsize() + payloadlen, channel);
sentPackets++;
sentBytes += payloadlen + getHsize();
increaseSequence();
}else{
data[1] &= 0x7F; // setting the RTP marker bit to 0
unsigned int sent = 0;
unsigned int sending =
maxDataLen - getHsize() - 3; // packages are of size MAX_SEND, except for the final one
char initByteA = (payload[0] & 0x81) | 0x62;
char initByteB = payload[1];
char serByte = (payload[0] & 0x7E) >> 1; // SE is now 00
data[getHsize()] = initByteA;
data[getHsize()+1] = initByteB;
while (sent < payloadlen){
if (sent == 0){
serByte |= 0x80; // set first bit to 1
}else{
serByte &= 0x7F; // set first bit to 0
}
if (sent + sending >= payloadlen){
// last package
serByte |= 0x40;
sending = payloadlen - sent;
data[1] |= 0x80; // setting the RTP marker bit to 1
}
data[getHsize() + 2] = serByte;
memcpy(data + getHsize() + 3, payload + 2 + sent, sending);
callBack(socket, data, getHsize() + 3 + sending, channel);
sentPackets++;
sentBytes += sending + getHsize() + 3;
sent += sending;
increaseSequence();
}
}
}
void Packet::sendMPEG2(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel){
/// \todo This function probably belongs in DMS somewhere.
if (payloadlen + getHsize() + 4 <= maxDataLen){
data[1] |= 0x80; // setting the RTP marker bit to 1
Mpeg::MPEG2Info mInfo = Mpeg::parseMPEG2Headers(payload, payloadlen);
MPEGVideoHeader mHead(data+getHsize());
mHead.clear();
mHead.setTempRef(mInfo.tempSeq);
mHead.setPictureType(mInfo.frameType);
if (mInfo.isHeader){
mHead.setSequence();
}
mHead.setBegin();
mHead.setEnd();
memcpy(data + getHsize() + 4, payload, payloadlen);
callBack(socket, data, getHsize() + payloadlen + 4, channel);
sentPackets++;
sentBytes += payloadlen + getHsize() + 4;
increaseSequence();
}else{
data[1] &= 0x7F; // setting the RTP marker bit to 0
unsigned int sent = 0;
unsigned int sending =
maxDataLen - getHsize() - 4; // packages are of size MAX_SEND, except for the final one
Mpeg::MPEG2Info mInfo;
MPEGVideoHeader mHead(data+getHsize());
while (sent < payloadlen){
mHead.clear();
if (sent + sending >= payloadlen){
mHead.setEnd();
sending = payloadlen - sent;
data[1] |= 0x80; // setting the RTP marker bit to 1
}
Mpeg::parseMPEG2Headers(payload, sent+sending, mInfo);
mHead.setTempRef(mInfo.tempSeq);
mHead.setPictureType(mInfo.frameType);
if (sent == 0){
if (mInfo.isHeader){
mHead.setSequence();
}
mHead.setBegin();
}
memcpy(data + getHsize() + 4, payload + sent, sending);
callBack(socket, data, getHsize() + 4 + sending, channel);
sentPackets++;
sentBytes += sending + getHsize() + 4;
sent += sending;
increaseSequence();
}
}
}
void Packet::sendData(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel,
std::string codec){
if (codec == "H264"){
unsigned long sent = 0;
while (sent < payloadlen){
unsigned long nalSize = ntohl(*((unsigned long *)(payload + sent)));
sendH264(socket, callBack, payload + sent + 4, nalSize, channel);
sent += nalSize + 4;
}
return;
}
if (codec == "HEVC"){
unsigned long sent = 0;
while (sent < payloadlen){
unsigned long nalSize = ntohl(*((unsigned long *)(payload + sent)));
sendH265(socket, callBack, payload + sent + 4, nalSize, channel);
sent += nalSize + 4;
}
return;
}
if (codec == "MPEG2"){
sendMPEG2(socket, callBack, payload, payloadlen, channel);
return;
}
/// \todo This function probably belongs in DMS somewhere.
data[1] |= 0x80; // setting the RTP marker bit to 1
long offsetLen = 0; long offsetLen = 0;
if (codec == "AAC"){ if (codec == "AAC"){
*((long *)(data + getHsize())) = htonl(((payloadlen << 3) & 0x0010fff8) | 0x00100000); *((long *)(data + getHsize())) = htonl(((payloadlen << 3) & 0x0010fff8) | 0x00100000);
offsetLen = 4; offsetLen = 4;
}else if (codec == "MP3"){ }else if (codec == "MP3" || codec == "MP2"){
*((long *)(data + getHsize())) = 0;//this is MBZ and Frag_Offset, which is always 0 //See RFC 2250, "MPEG Audio-specific header"
*((long *)(data + getHsize())) = 0; // this is MBZ and Frag_Offset, which are always 0
if (payload[0] != 0xFF){
FAIL_MSG("MP2/MP3 data does not start with header?");
}
offsetLen = 4; offsetLen = 4;
}else if (codec == "AC3"){ }else if (codec == "AC3"){
*((short *)(data + getHsize())) = htons(0x0001) ;//this is 6 bits MBZ, 2 bits FT = 0 = full frames and 8 bits saying we send 1 frame *((short *)(data + getHsize())) = htons(0x0001); // this is 6 bits MBZ, 2 bits FT = 0 = full
// frames and 8 bits saying we send 1 frame
offsetLen = 2; offsetLen = 2;
} }
if (maxDataLen < getHsize() + offsetLen + payloadlen){ if (maxDataLen < getHsize() + offsetLen + payloadlen){
@ -135,10 +230,10 @@ namespace RTP {
return; return;
} }
uint32_t newMaxLen = getHsize() + offsetLen + payloadlen; uint32_t newMaxLen = getHsize() + offsetLen + payloadlen;
char * newData = new char[newMaxLen]; char *newData = new char[newMaxLen];
if (newData){ if (newData){
memcpy(newData, data, maxDataLen); memcpy(newData, data, maxDataLen);
delete [] data; delete[] data;
data = newData; data = newData;
maxDataLen = newMaxLen; maxDataLen = newMaxLen;
} }
@ -146,13 +241,14 @@ namespace RTP {
memcpy(data + getHsize() + offsetLen, payload, payloadlen); memcpy(data + getHsize() + offsetLen, payload, payloadlen);
callBack(socket, data, getHsize() + offsetLen + payloadlen, channel); callBack(socket, data, getHsize() + offsetLen + payloadlen, channel);
sentPackets++; sentPackets++;
sentBytes += payloadlen; sentBytes += payloadlen + offsetLen + getHsize();
increaseSequence(); increaseSequence();
} }
void Packet::sendRTCP(long long &connectedAt, void *socket, unsigned int tid,
void Packet::sendRTCP(long long & connectedAt, void * socket, unsigned int tid , DTSC::Meta & metadata, void callBack(void *, char *, unsigned int, unsigned int)) { DTSC::Meta &metadata,
void * rtcpData = malloc(32); void callBack(void *, char *, unsigned int, unsigned int)){
void *rtcpData = malloc(32);
if (!rtcpData){ if (!rtcpData){
FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up."); FAIL_MSG("Could not allocate 32 bytes. Something is seriously messed up.");
return; return;
@ -160,62 +256,63 @@ namespace RTP {
((int *)rtcpData)[0] = htonl(0x80C80006); ((int *)rtcpData)[0] = htonl(0x80C80006);
((int *)rtcpData)[1] = htonl(getSSRC()); ((int *)rtcpData)[1] = htonl(getSSRC());
// unsigned int tid = packet["trackid"].asInt(); // unsigned int tid = packet["trackid"].asInt();
//timestamp in ms // timestamp in ms
double ntpTime = 2208988800UL + Util::epoch() + (Util::getMS() % 1000) / 1000.0; double ntpTime = 2208988800UL + Util::epoch() + (Util::getMS() % 1000) / 1000.0;
if (startRTCP < 1 && startRTCP > -1) { if (startRTCP < 1 && startRTCP > -1){startRTCP = ntpTime;}
startRTCP = ntpTime;
}
ntpTime -= startRTCP; ntpTime -= startRTCP;
((int *)rtcpData)[2] = htonl(2208988800UL + Util::epoch()); //epoch is in seconds ((int *)rtcpData)[2] = htonl(2208988800UL + Util::epoch()); // epoch is in seconds
((int *)rtcpData)[3] = htonl((Util::getMS() % 1000) * 4294967.295); ((int *)rtcpData)[3] = htonl((Util::getMS() % 1000) * 4294967.295);
if (metadata.tracks[tid].codec == "H264") { if (metadata.tracks[tid].codec == "H264"){
((int *)rtcpData)[4] = htonl((ntpTime - 0) * 90000); //rtpts ((int *)rtcpData)[4] = htonl((ntpTime - 0) * 90000); // rtpts
} else { }else{
((int *)rtcpData)[4] = htonl((ntpTime - 0) * metadata.tracks[tid].rate); //rtpts ((int *)rtcpData)[4] = htonl((ntpTime - 0) * metadata.tracks[tid].rate); // rtpts
} }
//it should be the time packet was sent maybe, after all? // it should be the time packet was sent maybe, after all?
//*((int *)(rtcpData+16) ) = htonl(getTimeStamp());//rtpts //*((int *)(rtcpData+16) ) = htonl(getTimeStamp());//rtpts
((int *)rtcpData)[5] = htonl(sentPackets);//packet ((int *)rtcpData)[5] = htonl(sentPackets); // packet
((int *)rtcpData)[6] = htonl(sentBytes);//octet ((int *)rtcpData)[6] = htonl(sentBytes); // octet
callBack(socket, (char*)rtcpData , 28 , 0); callBack(socket, (char *)rtcpData, 28, 0);
free(rtcpData); free(rtcpData);
} }
Packet::Packet() { Packet::Packet(){
managed = false; managed = false;
data = 0; data = 0;
} }
Packet::Packet(unsigned int payloadType, unsigned int sequence, unsigned int timestamp, unsigned int ssrc, unsigned int csrcCount) { Packet::Packet(unsigned int payloadType, unsigned int sequence, unsigned int timestamp,
unsigned int ssrc, unsigned int csrcCount){
managed = true; managed = true;
data = new char[12 + 4 * csrcCount + 2 + MAX_SEND]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size data = new char[12 + 4 * csrcCount + 2 +
MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size
if (data){ if (data){
maxDataLen = 12 + 4 * csrcCount + 2 + MAX_SEND; maxDataLen = 12 + 4 * csrcCount + 2 + MAX_SEND;
data[0] = ((2) << 6) | ((0 & 1) << 5) | ((0 & 1) << 4) | (csrcCount & 15); //version, padding, extension, csrc count data[0] = ((2) << 6) | ((0 & 1) << 5) | ((0 & 1) << 4) |
data[1] = payloadType & 0x7F; //marker and payload type (csrcCount & 15); // version, padding, extension, csrc count
data[1] = payloadType & 0x7F; // marker and payload type
}else{ }else{
maxDataLen = 0; maxDataLen = 0;
} }
setSequence(sequence - 1); //we automatically increase the sequence each time when p setSequence(sequence - 1); // we automatically increase the sequence each time when p
setTimestamp(timestamp); setTimestamp(timestamp);
setSSRC(ssrc); setSSRC(ssrc);
sentBytes = 0; sentBytes = 0;
sentPackets = 0; sentPackets = 0;
} }
Packet::Packet(const Packet & o) { Packet::Packet(const Packet &o){
managed = true; managed = true;
maxDataLen = 0; maxDataLen = 0;
if (o.data && o.maxDataLen) { if (o.data && o.maxDataLen){
data = new char[o.maxDataLen]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size data = new char[o.maxDataLen]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size
if (data) { if (data){
maxDataLen = o.maxDataLen; maxDataLen = o.maxDataLen;
memcpy(data, o.data, o.maxDataLen); memcpy(data, o.data, o.maxDataLen);
} }
} else { }else{
data = new char[14 + MAX_SEND];//headerSize, 2 for FU-A, MAX_SEND for maximum sent size data = new char[14 + MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size
if (data) { if (data){
maxDataLen = 14 + MAX_SEND; maxDataLen = 14 + MAX_SEND;
memset(data, 0, maxDataLen); memset(data, 0, maxDataLen);
} }
@ -224,23 +321,21 @@ namespace RTP {
sentPackets = o.sentPackets; sentPackets = o.sentPackets;
} }
void Packet::operator=(const Packet & o) { void Packet::operator=(const Packet &o){
managed = true; managed = true;
maxDataLen = 0; maxDataLen = 0;
if (data && managed) { if (data && managed){delete[] data;}
delete[] data;
}
data = 0; data = 0;
if (o.data && o.maxDataLen) { if (o.data && o.maxDataLen){
data = new char[o.maxDataLen]; //headerSize, 2 for FU-A, MAX_SEND for maximum sent size data = new char[o.maxDataLen]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size
if (data) { if (data){
maxDataLen = o.maxDataLen; maxDataLen = o.maxDataLen;
memcpy(data, o.data, o.maxDataLen); memcpy(data, o.data, o.maxDataLen);
} }
} else { }else{
data = new char[14 + MAX_SEND];//headerSize, 2 for FU-A, MAX_SEND for maximum sent size data = new char[14 + MAX_SEND]; // headerSize, 2 for FU-A, MAX_SEND for maximum sent size
if (data) { if (data){
maxDataLen = 14 + MAX_SEND; maxDataLen = 14 + MAX_SEND;
memset(data, 0, maxDataLen); memset(data, 0, maxDataLen);
} }
@ -249,15 +344,68 @@ namespace RTP {
sentPackets = o.sentPackets; sentPackets = o.sentPackets;
} }
Packet::~Packet() { Packet::~Packet(){
if (managed) { if (managed){delete[] data;}
delete [] data;
} }
} Packet::Packet(const char *dat, unsigned int len){
Packet::Packet(const char * dat, unsigned int len) {
managed = false; managed = false;
datalen = len; datalen = len;
data = (char *) dat; data = (char *)dat;
}
MPEGVideoHeader::MPEGVideoHeader(char *d){
data = d;
}
uint16_t MPEGVideoHeader::getTotalLen() const{
uint16_t ret = 4;
if (data[0] & 0x08){
ret += 4;
if (data[4] & 0x40){
ret += data[8];
}
}
return ret;
}
std::string MPEGVideoHeader::toString() const{
std::stringstream ret;
uint32_t firstHead = Bit::btohl(data);
ret << "TR=" << ((firstHead & 0x3FF0000) >> 16);
if (firstHead & 0x4000000){ret << " Ext";}
if (firstHead & 0x2000){ret << " SeqHead";}
if (firstHead & 0x1000){ret << " SliceBegin";}
if (firstHead & 0x800){ret << " SliceEnd";}
ret << " PicType=" << ((firstHead & 0x700) >> 8);
if (firstHead & 0x80){ret << " FBV";}
ret << " BFC=" << ((firstHead & 0x70) >> 4);
if (firstHead & 0x8){ret << " FFV";}
ret << " FFC=" << (firstHead & 0x7);
return ret.str();
}
void MPEGVideoHeader::clear(){
((uint32_t*)data)[0] = 0;
}
void MPEGVideoHeader::setTempRef(uint16_t ref){
data[0] |= (ref >> 8) & 0x03;
data[1] = ref & 0xff;
}
void MPEGVideoHeader::setPictureType(uint8_t pType){
data[2] |= pType & 0x7;
}
void MPEGVideoHeader::setSequence(){
data[2] |= 0x20;
}
void MPEGVideoHeader::setBegin(){
data[2] |= 0x10;
}
void MPEGVideoHeader::setEnd(){
data[2] |= 0x8;
} }
} }

View file

@ -1,39 +1,41 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include <set>
#include <iostream>
#include <iomanip>
#include <cstdio>
#include <stdint.h>
#include <sstream>
#include <deque>
#include <algorithm>
#include "socket.h"
#include "json.h"
#include "dtsc.h" #include "dtsc.h"
#include "json.h"
#include "mp4.h" #include "mp4.h"
#include "mp4_generic.h" #include "mp4_generic.h"
#include "socket.h"
#include <algorithm>
#include <cstdio>
#include <deque>
#include <iomanip>
#include <iostream>
#include <set>
#include <sstream>
#include <stdint.h>
#include <string>
#include <vector>
/// This namespace holds all RTP-parsing and sending related functionality. /// This namespace holds all RTP-parsing and sending related functionality.
namespace RTP { namespace RTP{
extern unsigned int MAX_SEND; extern unsigned int MAX_SEND;
/// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in here. /// This class is used to make RTP packets. Currently, H264, and AAC are supported. RTP
class Packet { /// mechanisms, like increasing sequence numbers and setting timestamps are all taken care of in
/// here.
class Packet{
private: private:
bool managed; bool managed;
char * data; ///<The actual RTP packet that is being sent char *data; ///<The actual RTP packet that is being sent
uint32_t maxDataLen;///< Amount of reserved bytes for the packet(s) uint32_t maxDataLen; ///< Amount of reserved bytes for the packet(s)
unsigned int datalen; ///<Size of rtp packet unsigned int datalen; ///<Size of rtp packet
int sentPackets; int sentPackets;
int sentBytes;//Because ugly is beautiful int sentBytes; // Because ugly is beautiful
public: public:
static double startRTCP; static double startRTCP;
unsigned int getHsize() const; unsigned int getHsize() const;
unsigned int getPayloadSize() const; unsigned int getPayloadSize() const;
char * getPayload() const; char *getPayload() const;
unsigned int getVersion() const; unsigned int getVersion() const;
unsigned int getPadding() const; unsigned int getPadding() const;
unsigned int getExtension() const; unsigned int getExtension() const;
@ -48,18 +50,42 @@ namespace RTP {
void setTimestamp(unsigned int t); void setTimestamp(unsigned int t);
void increaseSequence(); void increaseSequence();
void sendH264(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel); void sendH264(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
void sendData(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel, std::string codec); const char *payload, unsigned int payloadlen, unsigned int channel);
void sendRTCP(long long & connectedAt, void * socket, unsigned int tid, DTSC::Meta & metadata, void callBack(void *, char *, unsigned int, unsigned int)); void sendH265(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel);
void sendMPEG2(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel);
void sendData(void *socket, void callBack(void *, char *, unsigned int, unsigned int),
const char *payload, unsigned int payloadlen, unsigned int channel,
std::string codec);
void sendRTCP(long long &connectedAt, void *socket, unsigned int tid, DTSC::Meta &metadata,
void callBack(void *, char *, unsigned int, unsigned int));
Packet(); Packet();
Packet(unsigned int pt, unsigned int seq, unsigned int ts, unsigned int ssr, unsigned int csrcCount = 0); Packet(unsigned int pt, unsigned int seq, unsigned int ts, unsigned int ssr,
Packet(const Packet & o); unsigned int csrcCount = 0);
void operator=(const Packet & o); Packet(const Packet &o);
void operator=(const Packet &o);
~Packet(); ~Packet();
Packet(const char * dat, unsigned int len); Packet(const char *dat, unsigned int len);
char * getData(); char *getData();
};
class MPEGVideoHeader{
public:
MPEGVideoHeader(char * d);
void clear();
uint16_t getTotalLen() const;
std::string toString() const;
void setTempRef(uint16_t ref);
void setPictureType(uint8_t pType);
void setSequence();
void setBegin();
void setEnd();
private:
char * data;
}; };
} }

901
lib/sdp.cpp Normal file
View file

@ -0,0 +1,901 @@
#include "sdp.h"
#include "adts.h"
#include "defines.h"
#include "encode.h"
#include "h264.h"
#include "h265.h"
#include "util.h"
namespace SDP{
Track::Track(){
rtcpSent = 0;
channel = -1;
firstTime = 0;
packCount = 0;
cPort = 0;
rtpSeq = 0;
fpsTime = 0;
fpsMeta = 0;
fps = 0;
}
/// Extracts a particular parameter from the fmtp string. fmtp member must be set before calling.
std::string Track::getParamString(const std::string &param) const{
if (!fmtp.size()){return "";}
size_t pos = fmtp.find(param);
if (pos == std::string::npos){return "";}
pos += param.size() + 1;
size_t ePos = fmtp.find_first_of(" ;", pos);
return fmtp.substr(pos, ePos - pos);
}
/// Extracts a particular parameter from the fmtp string as an integer. fmtp member must be set
/// before calling.
uint64_t Track::getParamInt(const std::string &param) const{
return atoll(getParamString(param).c_str());
}
/// Gets the SDP contents for sending out a particular given DTSC::Track.
std::string mediaDescription(const DTSC::Track &trk){
std::stringstream mediaDesc;
if (trk.codec == "H264"){
MP4::AVCC avccbox;
avccbox.setPayload(trk.init);
mediaDesc << "m=video 0 RTP/AVP 97\r\n"
"a=rtpmap:97 H264/90000\r\n"
"a=cliprect:0,0,"
<< trk.height << "," << trk.width << "\r\n"
"a=framesize:97 "
<< trk.width << '-' << trk.height
<< "\r\n"
"a=fmtp:97 packetization-mode=1;profile-level-id="
<< std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[1]
<< std::dec << "E0" << std::hex << std::setw(2) << std::setfill('0')
<< (int)trk.init.data()[3] << std::dec << ";"
"sprop-parameter-sets="
<< Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen()))
<< ","
<< Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen()))
<< "\r\n"
"a=framerate:"
<< ((double)trk.fpks) / 1000.0 << "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}else if (trk.codec == "HEVC"){
h265::initData iData(trk.init);
mediaDesc << "m=video 0 RTP/AVP 104\r\n"
"a=rtpmap:104 H265/90000\r\n"
"a=cliprect:0,0,"
<< trk.height << "," << trk.width << "\r\n"
"a=framesize:104 "
<< trk.width << '-' << trk.height << "\r\n"
<< "a=fmtp:104 sprop-vps=";
const std::set<std::string> & vps = iData.getVPS();
if (vps.size()){
for (std::set<std::string>::iterator it = vps.begin(); it != vps.end(); it++){
if (it != vps.begin()){mediaDesc << ",";}
mediaDesc << Encodings::Base64::encode(*it);
}
}
mediaDesc << "; sprop-sps=";
const std::set<std::string> & sps = iData.getSPS();
if (sps.size()){
for (std::set<std::string>::iterator it = sps.begin(); it != sps.end(); it++){
if (it != sps.begin()){mediaDesc << ",";}
mediaDesc << Encodings::Base64::encode(*it);
}
}
mediaDesc << "; sprop-pps=";
const std::set<std::string> & pps = iData.getPPS();
if (pps.size()){
for (std::set<std::string>::iterator it = pps.begin(); it != pps.end(); it++){
if (it != pps.begin()){mediaDesc << ",";}
mediaDesc << Encodings::Base64::encode(*it);
}
}
mediaDesc << "\r\na=framerate:"
<< ((double)trk.fpks) / 1000.0 << "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}else if (trk.codec == "MPEG2"){
mediaDesc << "m=video 0 RTP/AVP 32\r\n"
"a=cliprect:0,0,"
<< trk.height << "," << trk.width << "\r\n"
"a=framesize:32 "
<< trk.width << '-' << trk.height << "\r\n"
<< "a=framerate:" << ((double)trk.fpks) / 1000.0 << "\r\n"
<< "a=control:track" << trk.trackID << "\r\n";
}else if (trk.codec == "AAC"){
mediaDesc << "m=audio 0 RTP/AVP 96"
<< "\r\n"
"a=rtpmap:96 mpeg4-generic/"
<< trk.rate << "/" << trk.channels
<< "\r\n"
"a=fmtp:96 streamtype=5; profile-level-id=15; config=";
for (unsigned int i = 0; i < trk.init.size(); i++){
mediaDesc << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init[i] << std::dec;
}
// these values are described in RFC 3640
mediaDesc << "; mode=AAC-hbr; SizeLength=13; IndexLength=3; IndexDeltaLength=3;\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}else if (trk.codec == "MP3" || trk.codec == "MP2"){
mediaDesc << "m=" << trk.type << " 0 RTP/AVP 14"
<< "\r\n"
"a=rtpmap:14 MPA/90000/"
<< trk.channels << "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}else if (trk.codec == "AC3"){
mediaDesc << "m=audio 0 RTP/AVP 100"
<< "\r\n"
"a=rtpmap:100 AC3/"
<< trk.rate << "/" << trk.channels << "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}else if (trk.codec == "ALAW"){
if (trk.channels == 1 && trk.rate == 8000){
mediaDesc << "m=audio 0 RTP/AVP 8"
<< "\r\n";
}else{
mediaDesc << "m=audio 0 RTP/AVP 101"
<< "\r\n";
mediaDesc << "a=rtpmap:101 PCMA/" << trk.rate << "/" << trk.channels << "\r\n";
}
mediaDesc << "a=control:track" << trk.trackID << "\r\n";
}else if (trk.codec == "PCM"){
if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){
mediaDesc << "m=audio 0 RTP/AVP 10"
<< "\r\n";
}else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){
mediaDesc << "m=audio 0 RTP/AVP 11"
<< "\r\n";
}else{
mediaDesc << "m=audio 0 RTP/AVP 103"
<< "\r\n";
mediaDesc << "a=rtpmap:103 L" << trk.size << "/" << trk.rate << "/" << trk.channels
<< "\r\n";
}
mediaDesc << "a=control:track" << trk.trackID << "\r\n";
}else if (trk.codec == "opus"){
mediaDesc << "m=audio 0 RTP/AVP 102"
<< "\r\n"
"a=rtpmap:102 opus/"
<< trk.rate << "/" << trk.channels << "\r\n"
"a=control:track"
<< trk.trackID << "\r\n";
}
return mediaDesc.str();
}
/// Sets the TCP/UDP connection details from a given transport string.
/// Sets the transportString member to the current transport string on success.
/// \param host The host connecting to us.
/// \source The source identifier.
/// \return True if successful, false otherwise.
bool Track::parseTransport(const std::string &transport, const std::string &host,
const std::string &source, const DTSC::Track &trk){
unsigned int SSrc = rand();
if (trk.codec == "H264"){
pack = RTP::Packet(97, 1, 0, SSrc);
}else if (trk.codec == "HEVC"){
pack = RTP::Packet(104, 1, 0, SSrc);
}else if (trk.codec == "MPEG2"){
pack = RTP::Packet(32, 1, 0, SSrc);
}else if (trk.codec == "AAC"){
pack = RTP::Packet(96, 1, 0, SSrc);
}else if (trk.codec == "AC3"){
pack = RTP::Packet(100, 1, 0, SSrc);
}else if (trk.codec == "MP3" || trk.codec == "MP2"){
pack = RTP::Packet(14, 1, 0, SSrc);
}else if (trk.codec == "ALAW"){
if (trk.channels == 1 && trk.rate == 8000){
pack = RTP::Packet(8, 1, 0, SSrc);
}else{
pack = RTP::Packet(101, 1, 0, SSrc);
}
}else if (trk.codec == "PCM"){
if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){
pack = RTP::Packet(10, 1, 0, SSrc);
}else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){
pack = RTP::Packet(11, 1, 0, SSrc);
}else{
pack = RTP::Packet(103, 1, 0, SSrc);
}
}else if (trk.codec == "opus"){
pack = RTP::Packet(102, 1, 0, SSrc);
}else{
ERROR_MSG("Unsupported codec %s for RTSP on track %u", trk.codec.c_str(), trk.trackID);
return false;
}
if (transport.find("TCP") != std::string::npos){
std::string chanE =
transport.substr(transport.find("interleaved=") + 12,
(transport.size() - transport.rfind('-') - 1)); // extract channel ID
channel = atol(chanE.c_str());
rtcpSent = 0;
transportString = transport;
}else{
channel = -1;
size_t port_loc = transport.rfind("client_port=") + 12;
cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str());
uint32_t portA, portB;
// find available ports locally;
int sendbuff = 4 * 1024 * 1024;
data.SetDestination(host, cPort);
portA = data.bind(0);
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
rtcp.SetDestination(host, cPort + 1);
portB = rtcp.bind(0);
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
std::stringstream tStr;
tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";";
if (source.size()){tStr << "source=" << source << ";";}
tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << SSrc << std::dec;
transportString = tStr.str();
INFO_MSG("Transport string: %s", transportString.c_str());
}
return true;
}
/// Gets the rtpInfo for a given DTSC::Track, source identifier and timestamp (in millis).
std::string Track::rtpInfo(const DTSC::Track &trk, const std::string &source,
uint64_t currentTime){
std::stringstream rInfo;
rInfo << "url=" << source << "/track" << trk.trackID
<< ";"; // get the current url, not localhost
rInfo << "sequence=" << pack.getSequence() << ";rtptime=" << currentTime * getMultiplier(trk);
return rInfo.str();
}
void State::parseSDP(const std::string &sdp){
std::stringstream ss(sdp);
std::string to;
uint64_t trackNo = 0;
bool nope = true; // true if we have no valid track to fill
DTSC::Track *thisTrack = 0;
while (std::getline(ss, to, '\n')){
if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size() - 1, 1);}
// All tracks start with a media line
if (to.substr(0, 2) == "m="){
nope = true;
++trackNo;
thisTrack = &(myMeta->tracks[trackNo]);
std::stringstream words(to.substr(2));
std::string item;
if (getline(words, item, ' ') && (item == "audio" || item == "video")){
thisTrack->type = item;
thisTrack->trackID = trackNo;
}else{
WARN_MSG("Media type not supported: %s", item.c_str());
continue;
}
getline(words, item, ' ');
if (!getline(words, item, ' ') || item != "RTP/AVP"){
WARN_MSG("Media transport not supported: %s", item.c_str());
continue;
}
if (getline(words, item, ' ')){
uint64_t avp_type = JSON::Value(item).asInt();
switch (avp_type){
case 8: // PCM A-law
INFO_MSG("PCM A-law payload type");
nope = false;
thisTrack->codec = "ALAW";
thisTrack->rate = 8000;
thisTrack->channels = 1;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 10: // PCM Stereo, 44.1kHz
INFO_MSG("Linear PCM stereo 44.1kHz payload type");
nope = false;
thisTrack->codec = "PCM";
thisTrack->size = 16;
thisTrack->rate = 44100;
thisTrack->channels = 2;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 11: // PCM Mono, 44.1kHz
INFO_MSG("Linear PCM mono 44.1kHz payload type");
nope = false;
thisTrack->codec = "PCM";
thisTrack->rate = 44100;
thisTrack->size = 16;
thisTrack->channels = 1;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 14: // MPA
INFO_MSG("MPA payload type");
nope = false;
thisTrack->codec = "MP3";
thisTrack->rate = 0;
thisTrack->size = 0;
thisTrack->channels = 0;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 32: // MPV
INFO_MSG("MPV payload type");
nope = false;
thisTrack->codec = "MPEG2";
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
default:
// dynamic type
if (avp_type >= 96 && avp_type <= 127){
INFO_MSG("Dynamic payload type (%llu) detected", avp_type);
nope = false;
continue;
}else{
FAIL_MSG("Payload type %llu not supported!", avp_type);
continue;
}
}
}
continue;
}
if (nope){continue;}// ignore lines if we have no valid track
// RTP mapping
if (to.substr(0, 8) == "a=rtpmap"){
std::string mediaType = to.substr(to.find(' ', 8) + 1);
std::string trCodec = mediaType.substr(0, mediaType.find('/'));
// convert to fullcaps
for (unsigned int i = 0; i < trCodec.size(); ++i){
if (trCodec[i] <= 122 && trCodec[i] >= 97){trCodec[i] -= 32;}
}
if (thisTrack->type == "audio"){
std::string extraInfo = mediaType.substr(mediaType.find('/') + 1);
if (extraInfo.find('/') != std::string::npos){
size_t lastSlash = extraInfo.find('/');
thisTrack->rate = atoll(extraInfo.substr(0, lastSlash).c_str());
thisTrack->channels = atoll(extraInfo.substr(lastSlash + 1).c_str());
}else{
thisTrack->rate = atoll(extraInfo.c_str());
thisTrack->channels = 1;
}
}
if (trCodec == "H264"){
thisTrack->codec = "H264";
thisTrack->rate = 90000;
}
if (trCodec == "H265"){
thisTrack->codec = "HEVC";
thisTrack->rate = 90000;
}
if (trCodec == "OPUS"){
thisTrack->codec = "opus";
thisTrack->init = std::string("OpusHead\001\002\170\000\200\273\000\000\000\000\000", 19);
}
if (trCodec == "PCMA"){thisTrack->codec = "ALAW";}
if (trCodec == "L8"){
thisTrack->codec = "PCM";
thisTrack->size = 8;
}
if (trCodec == "L16"){
thisTrack->codec = "PCM";
thisTrack->size = 16;
}
if (trCodec == "L20"){
thisTrack->codec = "PCM";
thisTrack->size = 20;
}
if (trCodec == "L24"){
thisTrack->codec = "PCM";
thisTrack->size = 24;
}
if (trCodec == "MPEG4-GENERIC"){thisTrack->codec = "AAC";}
if (!thisTrack->codec.size()){
ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str());
}else{
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
}
continue;
}
if (to.substr(0, 10) == "a=control:"){
tracks[trackNo].control = to.substr(10);
continue;
}
if (to.substr(0, 7) == "a=fmtp:"){
tracks[trackNo].fmtp = to.substr(7);
if (thisTrack->codec == "AAC"){
if (tracks[trackNo].getParamString("mode") != "AAC-hbr"){
// a=fmtp:97
// profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;
// config=120856E500
FAIL_MSG("AAC transport mode not supported: %s",
tracks[trackNo].getParamString("mode").c_str());
nope = true;
myMeta->tracks.erase(trackNo);
tracks.erase(trackNo);
continue;
}
thisTrack->init = Encodings::Hex::decode(tracks[trackNo].getParamString("config"));
// myMeta.tracks[trackNo].rate = aac::AudSpecConf::rate(myMeta.tracks[trackNo].init);
}
if (thisTrack->codec == "H264"){
// a=fmtp:96 packetization-mode=1;
// sprop-parameter-sets=Z0LAHtkA2D3m//AUABqxAAADAAEAAAMAMg8WLkg=,aMuDyyA=;
// profile-level-id=42C01E
std::string sprop = tracks[trackNo].getParamString("sprop-parameter-sets");
size_t comma = sprop.find(',');
tracks[trackNo].spsData = Encodings::Base64::decode(sprop.substr(0, comma));
tracks[trackNo].ppsData = Encodings::Base64::decode(sprop.substr(comma + 1));
updateH264Init(trackNo);
}
if (thisTrack->codec == "HEVC"){
tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-vps")));
tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-sps")));
tracks[trackNo].hevcInfo.addUnit(Encodings::Base64::decode(tracks[trackNo].getParamString("sprop-pps")));
updateH265Init(trackNo);
}
continue;
}
// We ignore bandwidth lines
if (to.substr(0, 2) == "b="){continue;}
// we ignore everything before the first media line.
if (!trackNo){continue;}
// at this point, the data is definitely for a track
INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str());
}
}
/// Calculates H265 track metadata from sps and pps data stored in tracks[trackNo]
void State::updateH265Init(uint64_t trackNo){
DTSC::Track &Trk = myMeta->tracks[trackNo];
SDP::Track &RTrk = tracks[trackNo];
if (!RTrk.hevcInfo.haveRequired()){
MEDIUM_MSG("Aborted meta fill for hevc track %lu: no info nal unit", trackNo);
return;
}
Trk.init = RTrk.hevcInfo.generateHVCC();
h265::metaInfo MI = tracks[trackNo].hevcInfo.getMeta();
RTrk.fpsMeta = MI.fps;
Trk.width = MI.width;
Trk.height = MI.height;
Trk.fpks = RTrk.fpsMeta * 1000;
}
/// Calculates H264 track metadata from vps, sps and pps data stored in tracks[trackNo]
void State::updateH264Init(uint64_t trackNo){
DTSC::Track &Trk = myMeta->tracks[trackNo];
SDP::Track &RTrk = tracks[trackNo];
h264::sequenceParameterSet sps(RTrk.spsData.data(), RTrk.spsData.size());
h264::SPSMeta hMeta = sps.getCharacteristics();
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(RTrk.spsData[1]);
avccBox.setCompatibleProfiles(RTrk.spsData[2]);
avccBox.setLevel(RTrk.spsData[3]);
avccBox.setSPSNumber(1);
avccBox.setSPS(RTrk.spsData);
avccBox.setPPSNumber(1);
avccBox.setPPS(RTrk.ppsData);
RTrk.fpsMeta = hMeta.fps;
Trk.width = hMeta.width;
Trk.height = hMeta.height;
Trk.fpks = hMeta.fps * 1000;
Trk.init = std::string(avccBox.payload(), avccBox.payloadSize());
}
uint32_t State::getTrackNoForChannel(uint8_t chan){
for (std::map<uint32_t, Track>::iterator it = tracks.begin(); it != tracks.end(); ++it){
if (chan == it->second.channel){return it->first;}
}
return 0;
}
uint32_t State::parseSetup(HTTP::Parser &H, const std::string &cH, const std::string &src){
static uint32_t trackCounter = 0;
if (H.url == "200"){
++trackCounter;
if (!tracks.count(trackCounter)){return 0;}
if (!tracks[trackCounter].parseTransport(H.GetHeader("Transport"), cH, src,
myMeta->tracks[trackCounter])){
return 0;
}
return trackCounter;
}
if (tracks.size()){
for (std::map<uint32_t, Track>::iterator it = tracks.begin(); it != tracks.end(); ++it){
if (!it->second.control.size()){
it->second.control = "track" + JSON::Value((long long)it->first).asString();
INFO_MSG("Control track: %s", it->second.control.c_str());
}
if (H.url.find(it->second.control) != std::string::npos ||
H.GetVar("pass").find(it->second.control) != std::string::npos){
INFO_MSG("Parsing SETUP against track %lu", it->first);
if (!it->second.parseTransport(H.GetHeader("Transport"), cH, src,
myMeta->tracks[it->first])){
return 0;
}
return it->first;
}
}
}
if (H.url.find("/track") != std::string::npos){
uint32_t trackNo = atoi(H.url.c_str() + H.url.find("/track") + 6);
if (trackNo){
INFO_MSG("Parsing SETUP against track %lu", trackNo);
if (!tracks[trackNo].parseTransport(H.GetHeader("Transport"), cH, src,
myMeta->tracks[trackNo])){
return 0;
}
return trackNo;
}
}
return 0;
}
/// Handles a single H264 packet, checking if others are appended at the end in Annex B format.
/// If so, splits them up and calls h264Packet for each. If not, calls it only once for the whole
/// payload.
void State::h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len){
uint32_t lastStart = 0;
for (uint32_t i = 0; i < len - 4; ++i){
// search for start code
if (buffer[i] == 0 && buffer[i + 1] == 0 && buffer[i + 2] == 0 && buffer[i + 3] == 1){
// if found, handle a packet from the last start code up to this start code
Bit::htobl(buffer + lastStart, (i - lastStart - 1) - 4); // size-prepend
h264Packet(ts, track, buffer + lastStart, (i - lastStart - 1),
h264::isKeyframe(buffer + lastStart + 4, i - lastStart - 5));
lastStart = i;
}
}
// Last packet (might be first, if no start codes found)
Bit::htobl(buffer + lastStart, (len - lastStart) - 4); // size-prepend
h264Packet(ts, track, buffer + lastStart, (len - lastStart),
h264::isKeyframe(buffer + lastStart + 4, len - lastStart - 4));
}
void State::h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey){
MEDIUM_MSG("H264: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x1F);
switch (nalType){
case 7: // SPS
if (tracks[track].spsData.size() != len - 4 ||
memcmp(buffer + 4, tracks[track].spsData.data(), len - 4) != 0){
INFO_MSG("Updated SPS from RTP data");
tracks[track].spsData.assign(buffer + 4, len - 4);
updateH264Init(track);
}
return;
case 8: // PPS
if (tracks[track].ppsData.size() != len - 4 ||
memcmp(buffer + 4, tracks[track].ppsData.data(), len - 4) != 0){
INFO_MSG("Updated PPS from RTP data");
tracks[track].ppsData.assign(buffer + 4, len - 4);
updateH264Init(track);
}
return;
default: // others, continue parsing
break;
}
double fps = tracks[track].fpsMeta;
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < tracks[track].packCount){tracks[track].packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - tracks[track].packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = tracks[track].packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, tracks[track].packCount,
(frameNo - tracks[track].packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
tracks[track].packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey);
tracks[track].packCount++;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
void State::h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey){
MEDIUM_MSG("H265: %llu@%llu, %lub%s", track, ts, len, isKey ? " (key)" : "");
// Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
// Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x7E) >> 1;
switch (nalType){
case 32: // VPS
case 33: // SPS
case 34: // PPS
tracks[track].hevcInfo.addUnit(buffer);
updateH265Init(track);
return;
default: // others, continue parsing
break;
}
double fps = tracks[track].fpsMeta;
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
// Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
while (frameNo < tracks[track].packCount){tracks[track].packCount--;}
// More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo - tracks[track].packCount) > 32){tracks[track].packCount = frameNo;}
// After some experimentation, we found that the time offset is the difference between the
// frame number and the packet counter, times the frame rate in ms
offset = (frameNo - tracks[track].packCount) * (1000.0 / fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = tracks[track].packCount * (1000.0 / fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts,
isKey ? "key" : "i", frameNo, fps, tracks[track].packCount,
(frameNo - tracks[track].packCount), offset);
}else{
// For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey ? "key" : "i",
tracks[track].packCount);
}
// Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey);
tracks[track].packCount++;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
///Returns the multiplier to use to get milliseconds from the RTP payload type for the given track
double getMultiplier(const DTSC::Track & Trk){
if (Trk.type == "video" || Trk.codec == "MP2" || Trk.codec == "MP3"){return 90.0;}
return ((double)Trk.rate / 1000.0);
}
/// Handles RTP packets generically, for both TCP and UDP-based connections.
/// In case of UDP, expects packets to be pre-sorted.
void State::handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt){
DTSC::Track &Trk = myMeta->tracks[track];
if (!tracks[track].firstTime){tracks[track].firstTime = pkt.getTimeStamp() + 1;}
uint64_t millis = (pkt.getTimeStamp() - tracks[track].firstTime + 1) / getMultiplier(Trk);
char *pl = pkt.getPayload();
uint32_t plSize = pkt.getPayloadSize();
INSANE_MSG("Received RTP packet for track %llu, time %llu -> %llu", track, pkt.getTimeStamp(), millis);
if (Trk.codec == "ALAW" || Trk.codec == "opus" || Trk.codec == "PCM"){
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl, plSize, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "AAC"){
// assume AAC packets are single AU units
/// \todo Support other input than single AU units
unsigned int headLen =
(Bit::btohs(pl) >> 3) + 2; // in bits, so /8, plus two for the prepended size
DTSC::Packet nextPack;
uint16_t samples = aac::AudSpecConf::samples(Trk.init);
uint32_t sampleOffset = 0;
uint32_t offset = 0;
uint32_t auSize = 0;
for (uint32_t i = 2; i < headLen; i += 2){
auSize = Bit::btohs(pl + i) >> 3; // only the upper 13 bits
nextPack.genericFill((pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime + 1) /
getMultiplier(Trk),
0, track, pl + headLen + offset,
std::min(auSize, plSize - headLen - offset), 0, false);
offset += auSize;
sampleOffset += samples;
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
}
return;
}
if (Trk.codec == "MP2" || Trk.codec == "MP3"){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "MPEG2"){
if (plSize < 5){
WARN_MSG("Empty packet ignored!");
return;
}
///\TODO Merge packets with same timestamp together
HIGH_MSG("Received MPEG2 packet: %s", RTP::MPEGVideoHeader(pl).toString().c_str());
DTSC::Packet nextPack;
nextPack.genericFill(millis, 0, track, pl + 4, plSize - 4, 0, false);
if (incomingPacketCallback){incomingPacketCallback(nextPack);}
return;
}
if (Trk.codec == "HEVC"){
if (plSize < 2){
WARN_MSG("Empty packet ignored!");
return;
}
uint8_t nalType = (pl[0] & 0x7E) >> 1;
if (nalType == 48){
ERROR_MSG("AP not supported yet");
}else if (nalType == 49){
DONTEVEN_MSG("H265 Fragmentation Unit");
static Util::ResizeablePointer fuaBuffer;
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[2] & 0x80) == 0){
HIGH_MSG("Not start of a new FU - throwing away");
return;
}
if (fuaBuffer.size() &&
((pl[2] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){
WARN_MSG("H265 FU packet incompleted: %lu", fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
fuaBuffer[4] |= 0x80; // set error bit
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
return;
}
unsigned long len = plSize - 3; // ignore the three FU bytes in front
if (!fuaBuffer.size()){len += 6;}// six extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 6, pl + 3, plSize - 3);
// reconstruct first byte
fuaBuffer[4] = ((pl[2] & 0x3F) << 1) | (pl[0] & 0x81);
fuaBuffer[5] = pl[1];
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 3, plSize - 3);
}
fuaBuffer.size() += len;
if (pl[2] & 0x40){// last packet
VERYHIGH_MSG("H265 FU packet type %s (%u) completed: %lu", h265::typeToStr((fuaBuffer[4] & 0x7E) >> 1), (uint8_t)((fuaBuffer[4] & 0x7E) >> 1),
fuaBuffer.size());
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h265::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
fuaBuffer.size() = 0;
}
return;
}else if (nalType == 50){
ERROR_MSG("PACI/TSCI not supported yet");
}else{
DONTEVEN_MSG("%s NAL unit (%u)", h265::typeToStr(nalType), nalType);
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
h265Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer,
plSize + 4, h265::isKeyframe(packBuffer + 4, plSize));
return;
}
return;
}
if (Trk.codec == "H264"){
// Handles common H264 packets types, but not all.
// Generalizes and converts them all to a data format ready for DTSC, then calls h264Packet
// for that data.
// Prints a WARN-level message if packet type is unsupported.
/// \todo Support other H264 packets types?
if (!plSize){
WARN_MSG("Empty packet ignored!");
return;
}
if ((pl[0] & 0x1F) == 0){
WARN_MSG("H264 packet type null ignored");
return;
}
if ((pl[0] & 0x1F) < 24){
DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F));
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(plSize + 4)){return;}
Bit::htobl(packBuffer, plSize); // size-prepend
memcpy(packBuffer + 4, pl, plSize);
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer,
plSize + 4, h264::isKeyframe(packBuffer + 4, plSize));
return;
}
if ((pl[0] & 0x1F) == 24){
DONTEVEN_MSG("H264 STAP-A packet");
unsigned int len = 0;
unsigned int pos = 1;
while (pos + 1 < plSize){
unsigned int pLen = Bit::btohs(pl + pos);
INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos + 2] & 0x1F));
pos += 2 + pLen;
len += 4 + pLen;
}
static Util::ResizeablePointer packBuffer;
if (!packBuffer.allocate(len)){return;}
pos = 1;
len = 0;
bool isKey = false;
while (pos + 1 < plSize){
unsigned int pLen = Bit::btohs(pl + pos);
isKey |= h264::isKeyframe(pl + pos + 2, pLen);
Bit::htobl(packBuffer + len, pLen); // size-prepend
memcpy(packBuffer + len + 4, pl + pos + 2, pLen);
len += 4 + pLen;
pos += 2 + pLen;
}
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len,
isKey);
return;
}
if ((pl[0] & 0x1F) == 28){
DONTEVEN_MSG("H264 FU-A packet");
static Util::ResizeablePointer fuaBuffer;
// No length yet? Check for start bit. Ignore rest.
if (!fuaBuffer.size() && (pl[1] & 0x80) == 0){
HIGH_MSG("Not start of a new FU-A - throwing away");
return;
}
if (fuaBuffer.size() &&
((pl[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){
WARN_MSG("Ending unfinished FU-A");
INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaBuffer.size());
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
// attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track,
fuaBuffer, fuaBuffer.size());
}else{
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
fuaBuffer[4] |= 0x80; // set error bit
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
}
fuaBuffer.size() = 0;
return;
}
unsigned long len = plSize - 2; // ignore the two FU-A bytes in front
if (!fuaBuffer.size()){len += 5;}// five extra bytes for the first packet
if (!fuaBuffer.allocate(fuaBuffer.size() + len)){return;}
if (!fuaBuffer.size()){
memcpy(fuaBuffer + 4, pl + 1, plSize - 1);
// reconstruct first byte
fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pl[0] & 0xE0);
}else{
memcpy(fuaBuffer + fuaBuffer.size(), pl + 2, plSize - 2);
}
fuaBuffer.size() += len;
if (pl[1] & 0x40){// last packet
INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F),
fuaBuffer.size());
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
// attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track,
fuaBuffer, fuaBuffer.size());
}else{
Bit::htobl(fuaBuffer, fuaBuffer.size() - 4); // size-prepend
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer,
fuaBuffer.size(), h264::isKeyframe(fuaBuffer + 4, fuaBuffer.size() - 4));
}
fuaBuffer.size() = 0;
}
return;
}
WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F));
return;
}
}
}

66
lib/sdp.h Normal file
View file

@ -0,0 +1,66 @@
#include "dtsc.h"
#include "http_parser.h"
#include "rtp.h"
#include "socket.h"
#include "h265.h"
namespace SDP{
double getMultiplier(const DTSC::Track & Trk);
/// Structure used to keep track of selected tracks.
class Track{
public:
Socket::UDPConnection data;
Socket::UDPConnection rtcp;
RTP::Packet pack;
long long rtcpSent;
uint64_t firstTime;
int channel; /// Channel number, used in TCP sending
uint64_t packCount;
uint16_t rtpSeq;
std::map<uint16_t, RTP::Packet> packBuffer;
uint32_t cPort;
std::string transportString; /// Current transport string.
std::string control;
std::string fmtp; /// fmtp string, used by getParamString / getParamInt
std::string spsData;
std::string ppsData;
h265::initData hevcInfo;
uint64_t fpsTime;
double fpsMeta;
double fps;
Track();
std::string getParamString(const std::string &param) const;
uint64_t getParamInt(const std::string &param) const;
bool parseTransport(const std::string &transport, const std::string &host,
const std::string &source, const DTSC::Track &trk);
std::string rtpInfo(const DTSC::Track &trk, const std::string &source, uint64_t currentTime);
};
class State{
public:
State(){
incomingPacketCallback = 0;
myMeta = 0;
}
DTSC::Meta *myMeta;
void (*incomingPacketCallback)(const DTSC::Packet &pkt);
std::map<uint32_t, Track> tracks; ///< List of selected tracks with SDP-specific session data.
void parseSDP(const std::string &sdp);
void updateH264Init(uint64_t trackNo);
void updateH265Init(uint64_t trackNo);
uint32_t getTrackNoForChannel(uint8_t chan);
uint32_t parseSetup(HTTP::Parser &H, const std::string &host,
const std::string &source);
void handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt);
void h264MultiParse(uint64_t ts, const uint64_t track, char *buffer, const uint32_t len);
void h264Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey);
void h265Packet(uint64_t ts, const uint64_t track, const char *buffer, const uint32_t len,
bool isKey);
};
std::string mediaDescription(const DTSC::Track &trk);
}

View file

@ -1,197 +1,119 @@
#include <cstdlib> #include "analyser_rtsp.h"
#include <iostream>
#include <fstream>
#include <string>
#include <string.h>
#include <vector>
#include <mist/config.h> AnalyserRTSP *classPointer = 0;
#include <mist/rtp.h>
#include <mist/socket.h>
#include <mist/http_parser.h>
#include <sstream>
namespace RtspRtp{ void incomingPacket(const DTSC::Packet &pkt){
classPointer->incoming(pkt);
}
int analyseRtspRtp(std::string rtspUrl){ void AnalyserRTSP::init(Util::Config &conf){
/*//parse hostname Analyser::init(conf);
std::string hostname = rtspUrl.substr(7); }
hostname = hostname.substr(0,hostname.find('/'));
std::cout << hostname << std::endl;
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender.
Socket::Connection conn(hostname,554,false);//setting rtsp connection
bool optionsSent = false; void AnalyserRTSP::incoming(const DTSC::Packet &pkt){
bool optionsRecvd = false; char *dataPtr;
bool descSent = false; uint32_t dataSize;
bool descRecvd = false; pkt.getString("data", dataPtr, dataSize);
bool setupComplete = false; DETAIL_MED("Received %ub %sfor track %lu (%s) @ %llums", dataSize, pkt.getFlag("keyframe")?"keyframe ":"", pkt.getTrackId(),
bool playSent = false; myMeta.tracks[pkt.getTrackId()].getIdentifier().c_str(), pkt.getTime());
int CSeq = 1; if (detail >= 8){
while(conn.connected()){ for (uint32_t i = 0; i < dataSize; ++i){
if(!optionsSent){ std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)dataPtr[i] << " ";
HTTP_R.protocol="RTSP/1.0"; if (i % 32 == 31){std::cout << std::endl;}
HTTP_R.method = "OPTIONS";
HTTP_R.url = rtspUrl;
HTTP_R.SetHeader("CSeq",CSeq);
CSeq++;
HTTP_R.SetHeader("User-Agent","mistANALyser");
HTTP_R.SendRequest(conn);
optionsSent = true;
}
if (optionsSent&& !optionsRecvd && (conn.Received().size() || conn.spool() )){
if(HTTP_S.Read(conn)){
std::cout << "recv opts" << std::endl;
std::cout << HTTP_S.BuildResponse(HTTP_S.method,HTTP_S.url);
optionsRecvd = true;
} }
std::cout << std::endl;
} }
}
AnalyserRTSP::AnalyserRTSP(Util::Config &conf) : Analyser(conf){
myConn = Socket::Connection(1, 0);
sdpState.myMeta = &myMeta;
sdpState.incomingPacketCallback = incomingPacket;
classPointer = this;
}
bool AnalyserRTSP::isOpen(){
return myConn;
}
if(optionsRecvd && !descSent){ bool AnalyserRTSP::parsePacket(){
HTTP_S.Clean();
HTTP_R.protocol="RTSP/1.0";
HTTP_R.method = "DESCRIBE";
HTTP_R.url = rtspUrl;
HTTP_R.SetHeader("CSeq",CSeq);
CSeq++;
HTTP_R.SetHeader("User-Agent","mistANALyser");
HTTP_R.SendRequest(conn);
descSent = true;
}
std::vector<std::string> trackIds;
if (descSent&&!descRecvd && (conn.Received().size() || conn.spool() )){
if(HTTP_S.Read(conn)){
std::cout << "recv desc2" << std::endl;
std::cout << HTTP_S.BuildResponse(HTTP_S.method,HTTP_S.url);
size_t pos = HTTP_S.body.find("m=");
do{ do{
//finding all track IDs // No new data? Sleep and retry, if connection still open
pos = HTTP_S.body.find("a=control:",pos); if (!myConn.Received().size() || !myConn.Received().available(1)){
if(pos !=std::string::npos){ if (!myConn.spool() && isOpen()){Util::sleep(500);}
trackIds.push_back(HTTP_S.body.substr(pos+10,HTTP_S.body.find("\r\n",pos)-pos-10 ) );//setting track IDs; continue;
pos++;
} }
}while(pos != std::string::npos); if (myConn.Received().copy(1) != "$"){
//we have all the tracks // not a TCP RTP packet, read RTSP commands
if (HTTP.Read(myConn)){
descRecvd = true; if (HTTP.hasHeader("Content-Type") && HTTP.GetHeader("Content-Type") == "application/sdp"){
sdpState.parseSDP(HTTP.body);
HTTP.Clean();
return true;
} }
if (HTTP.hasHeader("Transport")){
uint32_t trackNo = sdpState.parseSetup(HTTP, "", "");
if (trackNo){
DETAIL_MED("Parsed transport for track: %lu", trackNo);
}else{
DETAIL_MED("Could not parse transport string!");
}
HTTP.Clean();
return true;
} }
std::cout << HTTP.BuildRequest() << std::endl;
unsigned int setupsSent = 0; HTTP.Clean();
unsigned int setupsRecvd = 0; return true;
Socket::UDPConnection connectors[trackIds.size()]; }else{
unsigned int setports[trackIds.size()]; if (!myConn.spool() && isOpen()){Util::sleep(500);}
uint32_t bport = 10000;
std::string sessionID = "";
std::stringstream setup;
if(descRecvd && !setupComplete){
//time to setup.
for(std::vector<std::string>::iterator it = trackIds.begin();it!=trackIds.end();it++){
std::cout << "setup " << setupsSent<< std::endl;
while(!connectors[setupsSent].SetConnection( bport,false) ){
bport +=2;//finding an available port
} }
std::cout << "setup" << bport<< std::endl; continue;
setports[setupsSent] = bport;
bport +=2;
if(setupsSent == setupsRecvd){
//send only one setup
HTTP_S.Clean();
HTTP_R.protocol="RTSP/1.0";
HTTP_R.method = "SETUP";
HTTP_R.url = rtspUrl+ '/' + *(it);
setup << "RTP/AVP/UDP;unicast;client_port="<< setports[setupsSent] <<"-" <<setports[setupsSent]+1 ;
HTTP_R.SetHeader("Transport",setup.str() );
std:: cout << setup.str()<<std::endl;
setup.str(std::string());
setup.clear();
HTTP_R.SetHeader("CSeq",CSeq);
CSeq++;
if(sessionID != ""){
HTTP_R.SetHeader("Session",sessionID);
} }
HTTP_R.SetHeader("User-Agent","mistANALyser"); if (!myConn.Received().available(4)){
HTTP_R.SendRequest(conn); if (!myConn.spool() && isOpen()){Util::sleep(500);}
setupsSent ++; continue;
}// a TCP RTP packet, but not complete yet
// We have a TCP packet! Read it...
// Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data
std::string tcpHead = myConn.Received().copy(4);
uint16_t len = ntohs(*(short *)(tcpHead.data() + 2));
if (!myConn.Received().available(len + 4)){
if (!myConn.spool() && isOpen()){Util::sleep(500);}
continue;
}// a TCP RTP packet, but not complete yet
// remove whole packet from buffer, including 4 byte header
std::string tcpPacket = myConn.Received().remove(len + 4);
RTP::Packet pkt(tcpPacket.data() + 4, len);
uint8_t chan = tcpHead.data()[1];
uint32_t trackNo = sdpState.getTrackNoForChannel(chan);
DETAIL_HI("Received %ub RTP packet #%u on channel %u, time %llu", len,
(unsigned int)pkt.getSequence(), chan, pkt.getTimeStamp());
if (!trackNo && (chan % 2) != 1){
DETAIL_MED("Received packet for unknown track number on channel %u", chan);
}
if (trackNo){
sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();
} }
if (detail >= 10){
char *pl = pkt.getPayload();
while(setupsSent == setupsRecvd+1){ uint32_t payLen = pkt.getPayloadSize();
//lets Assume we assume we always receive a response for (uint32_t i = 0; i < payLen; ++i){
if ( (conn.Received().size() || conn.spool() )){ std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)pl[i] << " ";
if(HTTP_S.Read(conn)){ if (i % 32 == 31){std::cout << std::endl;}
std::cout << "recv setup" << std::endl;
std::cout << HTTP_S.BuildResponse(HTTP_S.method,HTTP_S.url);
optionsRecvd = true;
sessionID = HTTP_S.GetHeader("Session");
setupsRecvd++;
} }
} std::cout << std::endl;
}
//set up all parameters, and then after the for loop we have to listen to setups and all. sent if both are equal, and recv if one is sent
}
setupComplete = true;
} }
if(setupComplete && !playSent){ sdpState.handleIncomingRTP(trackNo, pkt);
//time to play
HTTP_S.Clean();
HTTP_R.protocol="RTSP/1.0";
HTTP_R.method = "PLAY";
HTTP_R.url = rtspUrl;
HTTP_R.SetHeader("CSeq",CSeq); return true;
CSeq++;
HTTP_R.SetHeader("User-Agent","mistANALyser");
HTTP_R.SetHeader("Session",sessionID);
HTTP_R.SendRequest(conn);
playSent = true;
std::cout << "sent play" << std::endl;
char buffer[2000];
while(!connectors[0].iread((void*)buffer,2000)) {
std::cout << "buffer";
}
std::cout <<"buffer is not empty" << std::endl;
}
//streams set up
//time to read some packets
if(descRecvd){
conn.close();
}
}
conn.close();*/
return 0;
}
}while (isOpen());
// if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets
} }
int main(int argc, char ** argv){
Util::Config conf = Util::Config(argv[0]);
conf.addOption("url",JSON::fromString("{\"arg\":\"string\",\"short\":\"u\",\"long\":\"url\",\"help\":\"URL To get.\", \"default\":\"rtsp://localhost/s1k\"}"));
conf.parseArgs(argc, argv);
return RtspRtp::analyseRtspRtp(conf.getString("url"));
}

View file

@ -1,27 +1,23 @@
#include <mist/config.h> #pragma once
#include "analyser.h" #include "analyser.h"
#include <cstdlib> #include <mist/h264.h>
#include <iomanip>
#include <vector>
#include <sstream>
#include <mist/socket.h>
#include <mist/rtp.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/rtp.h>
#include <mist/sdp.h>
class rtpAnalyser : public analysers class AnalyserRTSP : public Analyser{
{ public:
Socket::Connection conn; AnalyserRTSP(Util::Config &conf);
HTTP::Parser HTTP_R, HTTP_S;//HTTP Receiver en HTTP Sender. static void init(Util::Config &cfg);
int step; bool parsePacket();
std::vector<std::string> tracks; void incoming(const DTSC::Packet &pkt);
std::vector<Socket::UDPConnection> connections; bool isOpen();
unsigned int trackIt;
public: private:
rtpAnalyser(Util::Config config); Socket::Connection myConn;
bool packetReady(); HTTP::Parser HTTP;
void PreProcessing(); DTSC::Meta myMeta;
//int Analyse(); SDP::State sdpState;
int doAnalyse();
void doValidate();
}; };

View file

@ -1,19 +1,26 @@
#include <mist/defines.h> #include "output_rtsp.h"
#include <mist/adts.h>
#include <mist/auth.h> #include <mist/auth.h>
#include <mist/encode.h>
#include <mist/stream.h>
#include <mist/bitfields.h> #include <mist/bitfields.h>
#include <mist/bitstream.h> #include <mist/bitstream.h>
#include <mist/adts.h> #include <mist/defines.h>
#include <mist/encode.h>
#include <mist/stream.h>
#include <mist/triggers.h> #include <mist/triggers.h>
#include "output_rtsp.h"
#include <sys/stat.h> #include <sys/stat.h>
namespace Mist { namespace Mist{
Socket::Connection * mainConn = 0; Socket::Connection *mainConn = 0;
OutRTSP *classPointer = 0;
OutRTSP::OutRTSP(Socket::Connection & myConn) : Output(myConn){ /// Helper function for passing packets into the OutRTSP class
void insertPacket(const DTSC::Packet &pkt){classPointer->incomingPacket(pkt);}
/// Takes incoming packets and buffers them.
void OutRTSP::incomingPacket(const DTSC::Packet &pkt){bufferLivePacket(pkt);}
OutRTSP::OutRTSP(Socket::Connection &myConn) : Output(myConn){
connectedAt = Util::epoch() + 2208988800ll; connectedAt = Util::epoch() + 2208988800ll;
pausepoint = 0; pausepoint = 0;
setBlocking(false); setBlocking(false);
@ -22,6 +29,9 @@ namespace Mist {
expectTCP = false; expectTCP = false;
lastTimeSync = 0; lastTimeSync = 0;
mainConn = &myConn; mainConn = &myConn;
classPointer = this;
sdpState.incomingPacketCallback = insertPacket;
sdpState.myMeta = &myMeta;
} }
/// Function used to send RTP packets over UDP /// Function used to send RTP packets over UDP
@ -29,39 +39,42 @@ namespace Mist {
///\param data The RTP Packet that needs to be sent ///\param data The RTP Packet that needs to be sent
///\param len The size of data ///\param len The size of data
///\param channel Not used here, but is kept for compatibility with sendTCP ///\param channel Not used here, but is kept for compatibility with sendTCP
void sendUDP(void * socket, char * data, unsigned int len, unsigned int channel) { void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){
((Socket::UDPConnection *) socket)->SendNow(data, len); ((Socket::UDPConnection *)socket)->SendNow(data, len);
if (mainConn){mainConn->addUp(len);} if (mainConn){mainConn->addUp(len);}
} }
/// Function used to send RTP packets over TCP /// Function used to send RTP packets over TCP
///\param socket A TCP Connection pointer, sent as a void*, to keep portability. ///\param socket A TCP Connection pointer, sent as a void*, to keep portability.
///\param data The RTP Packet that needs to be sent ///\param data The RTP Packet that needs to be sent
///\param len The size of data ///\param len The size of data
///\param channel Used to distinguish different data streams when sending RTP over TCP ///\param channel Used to distinguish different data streams when sending RTP over TCP
void sendTCP(void * socket, char * data, unsigned int len, unsigned int channel) { void sendTCP(void *socket, char *data, unsigned int len, unsigned int channel){
//1 byte '$', 1 byte channel, 2 bytes length // 1 byte '$', 1 byte channel, 2 bytes length
char buf[] = "$$$$"; char buf[] = "$$$$";
buf[1] = channel; buf[1] = channel;
((short *) buf)[1] = htons(len); ((short *)buf)[1] = htons(len);
((Socket::Connection *) socket)->SendNow(buf, 4); ((Socket::Connection *)socket)->SendNow(buf, 4);
((Socket::Connection *) socket)->SendNow(data, len); ((Socket::Connection *)socket)->SendNow(data, len);
} }
void OutRTSP::init(Util::Config * cfg){ void OutRTSP::init(Util::Config *cfg){
Output::init(cfg); Output::init(cfg);
capa["name"] = "RTSP"; capa["name"] = "RTSP";
capa["desc"] = "Provides Real Time Streaming Protocol output, supporting both UDP and TCP transports."; capa["desc"] =
"Provides Real Time Streaming Protocol output, supporting both UDP and TCP transports.";
capa["deps"] = ""; capa["deps"] = "";
capa["url_rel"] = "/$"; capa["url_rel"] = "/$";
capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("MPEG2");
capa["codecs"][0u][1u].append("AAC"); capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3"); capa["codecs"][0u][1u].append("MP3");
capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("ALAW"); capa["codecs"][0u][1u].append("ALAW");
capa["codecs"][0u][1u].append("PCM"); capa["codecs"][0u][1u].append("PCM");
capa["codecs"][0u][1u].append("opus"); capa["codecs"][0u][1u].append("opus");
capa["codecs"][0u][1u].append("MP2");
capa["methods"][0u]["handler"] = "rtsp"; capa["methods"][0u]["handler"] = "rtsp";
capa["methods"][0u]["type"] = "rtsp"; capa["methods"][0u]["type"] = "rtsp";
@ -79,92 +92,81 @@ namespace Mist {
} }
void OutRTSP::sendNext(){ void OutRTSP::sendNext(){
char * dataPointer = 0; char *dataPointer = 0;
unsigned int dataLen = 0; unsigned int dataLen = 0;
thisPacket.getString("data", dataPointer, dataLen); thisPacket.getString("data", dataPointer, dataLen);
uint32_t tid = thisPacket.getTrackId(); uint32_t tid = thisPacket.getTrackId();
uint64_t timestamp = thisPacket.getTime(); uint64_t timestamp = thisPacket.getTime();
//if we're past the pausing point, seek to it, and pause immediately // if we're past the pausing point, seek to it, and pause immediately
if (pausepoint && timestamp > pausepoint){ if (pausepoint && timestamp > pausepoint){
pausepoint = 0; pausepoint = 0;
stop(); stop();
return; return;
} }
if (myMeta.live && lastTimeSync + 666 < timestamp){ if (myMeta.live && lastTimeSync + 666 < timestamp){
lastTimeSync = timestamp; lastTimeSync = timestamp;
updateMeta(); updateMeta();
DTSC::Track & mainTrk = myMeta.tracks[getMainSelectedTrack()]; DTSC::Track &mainTrk = myMeta.tracks[getMainSelectedTrack()];
// The extra 2000ms here is for the metadata sync delay. // The extra 2000ms here is for the metadata sync delay.
// It can be removed once we get rid of that. // It can be removed once we get rid of that.
if (timestamp + 2000 + needsLookAhead < mainTrk.keys.rbegin()->getTime() && mainTrk.lastms - mainTrk.keys.rbegin()->getTime() > needsLookAhead){ if (timestamp + 2000 + needsLookAhead < mainTrk.keys.rbegin()->getTime() &&
INFO_MSG("Skipping forward %llums (%llu ms LA)", mainTrk.keys.rbegin()->getTime() - thisPacket.getTime(), needsLookAhead); mainTrk.lastms - mainTrk.keys.rbegin()->getTime() > needsLookAhead){
INFO_MSG("Skipping forward %llums (%llu ms LA)",
mainTrk.keys.rbegin()->getTime() - thisPacket.getTime(), needsLookAhead);
seek(mainTrk.keys.rbegin()->getTime()); seek(mainTrk.keys.rbegin()->getTime());
return; return;
} }
} }
void * socket = 0; void *socket = 0;
void (*callBack)(void *, char *, unsigned int, unsigned int) = 0; void (*callBack)(void *, char *, unsigned int, unsigned int) = 0;
if (tracks[tid].channel == -1){//UDP connection if (sdpState.tracks[tid].channel == -1){// UDP connection
socket = &tracks[tid].data; socket = &sdpState.tracks[tid].data;
callBack = sendUDP; callBack = sendUDP;
if (Util::epoch()/5 != tracks[tid].rtcpSent){ if (Util::epoch() / 5 != sdpState.tracks[tid].rtcpSent){
tracks[tid].rtcpSent = Util::epoch()/5; sdpState.tracks[tid].rtcpSent = Util::epoch() / 5;
tracks[tid].pack.sendRTCP(connectedAt, &tracks[tid].rtcp, tid, myMeta, sendUDP); sdpState.tracks[tid].pack.sendRTCP(connectedAt, &sdpState.tracks[tid].rtcp, tid, myMeta,
sendUDP);
} }
}else{ }else{
socket = &myConn; socket = &myConn;
callBack = sendTCP; callBack = sendTCP;
} }
if(myMeta.tracks[tid].codec == "H264"){ uint64_t offset = thisPacket.getInt("offset");
long long offset = thisPacket.getInt("offset"); sdpState.tracks[tid].pack.setTimestamp((timestamp+offset) * SDP::getMultiplier(myMeta.tracks[tid]));
tracks[tid].pack.setTimestamp(90 * (timestamp + offset)); sdpState.tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen,
unsigned long sent = 0; sdpState.tracks[tid].channel, myMeta.tracks[tid].codec);
while (sent < dataLen) {
unsigned long nalSize = ntohl(*((unsigned long *)(dataPointer + sent)));
tracks[tid].pack.sendH264(socket, callBack, dataPointer + sent + 4, nalSize, tracks[tid].channel);
sent += nalSize + 4;
}
return;
}
//Default packager
tracks[tid].pack.setTimestamp(timestamp * ((double) myMeta.tracks[tid].rate / 1000.0));
tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel,myMeta.tracks[tid].codec);
} }
/// This request handler also checks for UDP packets /// This request handler also checks for UDP packets
void OutRTSP::requestHandler(){ void OutRTSP::requestHandler(){
if (!expectTCP){ if (!expectTCP){handleUDP();}
handleUDP();
}
Output::requestHandler(); Output::requestHandler();
} }
void OutRTSP::onRequest(){ void OutRTSP::onRequest(){
RTP::MAX_SEND = config->getInteger("maxsend"); RTP::MAX_SEND = config->getInteger("maxsend");
//if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets // if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets
while ((!expectTCP || handleTCP()) && HTTP_R.Read(myConn)){ while ((!expectTCP || handleTCP()) && HTTP_R.Read(myConn)){
//cancel broken URLs // cancel broken URLs
if (HTTP_R.url.size() < 8){ if (HTTP_R.url.size() < 8){
WARN_MSG("Invalid data found in RTSP input around ~%llub - disconnecting!", myConn.dataDown()); WARN_MSG("Invalid data found in RTSP input around ~%llub - disconnecting!",
myConn.dataDown());
myConn.close(); myConn.close();
break; break;
} }
HTTP_S.Clean(); HTTP_S.Clean();
HTTP_S.protocol = "RTSP/1.0"; HTTP_S.protocol = "RTSP/1.0";
//set the streamname and session // set the streamname and session
if (!source.size()){ if (!source.size()){
std::string source = HTTP_R.url.substr(7); std::string source = HTTP_R.url.substr(7);
unsigned int loc = std::min(source.find(':'),source.find('/')); unsigned int loc = std::min(source.find(':'), source.find('/'));
source = source.substr(0,loc); source = source.substr(0, loc);
} }
size_t found = HTTP_R.url.find('/', 7); size_t found = HTTP_R.url.find('/', 7);
if (!streamName.size()){ if (!streamName.size()){
@ -172,30 +174,29 @@ namespace Mist {
Util::sanitizeName(streamName); Util::sanitizeName(streamName);
} }
if (streamName.size()){ if (streamName.size()){
HTTP_S.SetHeader("Session", Secure::md5(HTTP_S.GetHeader("User-Agent") + getConnectedHost()) + "_" + streamName); HTTP_S.SetHeader("Session",
Secure::md5(HTTP_S.GetHeader("User-Agent") + getConnectedHost()) + "_" +
streamName);
} }
//set the date // set the date
time_t timer; time_t timer;
time(&timer); time(&timer);
struct tm * timeNow = gmtime(&timer); struct tm *timeNow = gmtime(&timer);
char dString[42]; char dString[42];
strftime(dString, 42, "%a, %d %h %Y, %X GMT", timeNow); strftime(dString, 42, "%a, %d %h %Y, %X GMT", timeNow);
HTTP_S.SetHeader("Date", dString); HTTP_S.SetHeader("Date", dString);
//set the sequence number to match the received sequence number // set the sequence number to match the received sequence number
if (HTTP_R.hasHeader("CSeq")){ if (HTTP_R.hasHeader("CSeq")){HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq"));}
HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq")); if (HTTP_R.hasHeader("Cseq")){HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("Cseq"));}
}
if (HTTP_R.hasHeader("Cseq")){
HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("Cseq"));
}
INFO_MSG("Handling %s", HTTP_R.method.c_str()); INFO_MSG("Handling %s", HTTP_R.method.c_str());
//handle the request // handle the request
if (HTTP_R.method == "OPTIONS"){ if (HTTP_R.method == "OPTIONS"){
HTTP_S.SetHeader("Public", "SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER, ANNOUNCE, RECORD"); HTTP_S.SetHeader("Public",
"SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER, ANNOUNCE, RECORD");
HTTP_S.SendResponse("200", "OK", myConn); HTTP_S.SendResponse("200", "OK", myConn);
HTTP_R.Clean(); HTTP_R.Clean();
continue; continue;
@ -210,11 +211,16 @@ namespace Mist {
selectedTracks.clear(); selectedTracks.clear();
std::stringstream transportString; std::stringstream transportString;
transportString << "v=0\r\n" transportString << "v=0\r\n"
"o=- " << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n" "o=- "
"s=" << streamName << "\r\n" << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n"
"s="
<< streamName << "\r\n"
"c=IN IP4 0.0.0.0\r\n" "c=IN IP4 0.0.0.0\r\n"
"i=" << streamName << "\r\n" "i="
"u=" << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName << "\r\n" << streamName << "\r\n"
"u="
<< HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName
<< "\r\n"
"t=0 0\r\n" "t=0 0\r\n"
"a=tool:MistServer\r\n" "a=tool:MistServer\r\n"
"a=type:broadcast\r\n" "a=type:broadcast\r\n"
@ -222,15 +228,18 @@ namespace Mist {
if (myMeta.live){ if (myMeta.live){
transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-\r\n"; transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-\r\n";
}else{ }else{
transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-" << ((double)endTime()) / 1000.0 << "\r\n"; transportString << "a=range:npt=" << ((double)startTime()) / 1000.0 << "-"
<< ((double)endTime()) / 1000.0 << "\r\n";
} }
for (std::map<unsigned int, DTSC::Track>::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); ++objIt) { for (std::map<unsigned int, DTSC::Track>::iterator objIt = myMeta.tracks.begin();
transportString << tracks[objIt->first].mediaDescription(objIt->second); objIt != myMeta.tracks.end(); ++objIt){
transportString << SDP::mediaDescription(objIt->second);
} }
transportString << "\r\n"; transportString << "\r\n";
HIGH_MSG("Reply: %s", transportString.str().c_str()); HIGH_MSG("Reply: %s", transportString.str().c_str());
HTTP_S.SetHeader("Content-Base", HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName); HTTP_S.SetHeader("Content-Base",
HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName);
HTTP_S.SetHeader("Content-Type", "application/sdp"); HTTP_S.SetHeader("Content-Type", "application/sdp");
HTTP_S.SetBody(transportString.str()); HTTP_S.SetBody(transportString.str());
HTTP_S.SendResponse("200", "OK", myConn); HTTP_S.SendResponse("200", "OK", myConn);
@ -238,76 +247,52 @@ namespace Mist {
continue; continue;
} }
if (HTTP_R.method == "SETUP"){ if (HTTP_R.method == "SETUP"){
size_t trackPos = HTTP_R.url.rfind("/track"); uint32_t trackNo = sdpState.parseSetup(HTTP_R, getConnectedHost(), source);
if (trackPos != std::string::npos){
unsigned int trId = atol(HTTP_R.url.substr(trackPos + 6).c_str());
if (myMeta.tracks.count(trId)){
if (tracks[trId].parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[trId])){
selectedTracks.insert(trId);
HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date"));
HTTP_S.SetHeader("Transport", tracks[trId].transportString);
HTTP_S.SetHeader("Cache-Control", "no-cache"); HTTP_S.SetHeader("Cache-Control", "no-cache");
if (trackNo){
selectedTracks.insert(trackNo);
SDP::Track &sdpTrack = sdpState.tracks[trackNo];
if (sdpTrack.channel != -1){expectTCP = true;}
HTTP_S.SetHeader("Transport", sdpTrack.transportString);
HTTP_S.SendResponse("200", "OK", myConn); HTTP_S.SendResponse("200", "OK", myConn);
}else{ INFO_MSG("Setup completed for track %lu (%s): %s", trackNo,
HTTP_S.SendResponse("404", "Track not available", myConn); myMeta.tracks[trackNo].codec.c_str(), sdpTrack.transportString.c_str());
}
HTTP_R.Clean();
continue;
}
}
//might be push setup - check known control points
if (pushing && tracks.size()){
bool setupHandled = false;
for (std::map<int, RTPTrack>::iterator it = tracks.begin(); it != tracks.end(); ++it){
if (it->second.control.size() && (HTTP_R.url.find(it->second.control) != std::string::npos || HTTP_R.GetVar("pass").find(it->second.control) != std::string::npos)){
if (it->second.parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[it->first])){
if (it->second.channel != -1){
expectTCP = true;
}
HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date"));
HTTP_S.SetHeader("Transport", it->second.transportString);
HTTP_S.SetHeader("Cache-Control", "no-cache");
HTTP_S.SendResponse("200", "OK", myConn);
INFO_MSG("Setup completed for track %llu (%s): %s", it->first, myMeta.tracks[it->first].codec.c_str(), it->second.transportString.c_str());
}else{ }else{
HTTP_S.SendResponse("404", "Track not known or allowed", myConn); HTTP_S.SendResponse("404", "Track not known or allowed", myConn);
FAIL_MSG("Could not handle setup for %s", HTTP_R.url.c_str());
} }
setupHandled = true;
HTTP_R.Clean(); HTTP_R.Clean();
break;
}
}
if (!setupHandled){
HTTP_S.SendResponse("404", "Track not known", myConn);
HTTP_R.Clean();
}
continue; continue;
} }
FAIL_MSG("Could not handle setup: pushing=%s, trackSize=%u", pushing?"true":"false", tracks.size());
}
if (HTTP_R.method == "PLAY"){ if (HTTP_R.method == "PLAY"){
initialSeek(); initialSeek();
std::string range = HTTP_R.GetHeader("Range"); std::string range = HTTP_R.GetHeader("Range");
if (range != ""){ if (range != ""){
range = range.substr(range.find("npt=")+4); range = range.substr(range.find("npt=") + 4);
if (!range.empty()) { if (!range.empty()){
range = range.substr(0, range.find('-')); range = range.substr(0, range.find('-'));
uint64_t targetPos = 1000*atof(range.c_str()); uint64_t targetPos = 1000 * atof(range.c_str());
if (targetPos || myMeta.vod){seek(targetPos);} if (targetPos || myMeta.vod){seek(targetPos);}
} }
} }
std::stringstream rangeStr; std::stringstream rangeStr;
if (myMeta.live){ if (myMeta.live){
rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-"; rangeStr << "npt=" << currentTime() / 1000 << "." << std::setw(3) << std::setfill('0')
<< currentTime() % 1000 << "-";
}else{ }else{
rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-" << std::setw(1) << endTime()/1000 << "." << std::setw(3) << std::setfill('0') << endTime()%1000; rangeStr << "npt=" << currentTime() / 1000 << "." << std::setw(3) << std::setfill('0')
<< currentTime() % 1000 << "-" << std::setw(1) << endTime() / 1000 << "."
<< std::setw(3) << std::setfill('0') << endTime() % 1000;
} }
HTTP_S.SetHeader("Range", rangeStr.str()); HTTP_S.SetHeader("Range", rangeStr.str());
std::stringstream infoString; std::stringstream infoString;
if (selectedTracks.size()){ if (selectedTracks.size()){
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ for (std::set<unsigned long>::iterator it = selectedTracks.begin();
it != selectedTracks.end(); ++it){
if (!infoString.str().empty()){infoString << ",";} if (!infoString.str().empty()){infoString << ",";}
infoString << tracks[*it].rtpInfo(myMeta.tracks[*it], source+"/"+streamName, currentTime()); infoString << sdpState.tracks[*it].rtpInfo(myMeta.tracks[*it],
source + "/" + streamName, currentTime());
} }
} }
HTTP_S.SetHeader("RTP-Info", infoString.str()); HTTP_S.SetHeader("RTP-Info", infoString.str());
@ -319,14 +304,15 @@ namespace Mist {
if (HTTP_R.method == "PAUSE"){ if (HTTP_R.method == "PAUSE"){
HTTP_S.SendResponse("200", "OK", myConn); HTTP_S.SendResponse("200", "OK", myConn);
std::string range = HTTP_R.GetHeader("Range"); std::string range = HTTP_R.GetHeader("Range");
if (!range.empty()){ if (!range.empty()){range = range.substr(range.find("npt=") + 4);}
range = range.substr(range.find("npt=")+4);
}
if (range.empty()){ if (range.empty()){
stop(); stop();
}else{ }else{
pausepoint = 1000 * (int) atof(range.c_str()); pausepoint = 1000 * (int)atof(range.c_str());
if (pausepoint > currentTime()){pausepoint = 0; stop();} if (pausepoint > currentTime()){
pausepoint = 0;
stop();
}
} }
HTTP_R.Clean(); HTTP_R.Clean();
continue; continue;
@ -343,7 +329,7 @@ namespace Mist {
return; return;
} }
INFO_MSG("Pushing to stream %s", streamName.c_str()); INFO_MSG("Pushing to stream %s", streamName.c_str());
parseSDP(HTTP_R.body); sdpState.parseSDP(HTTP_R.body);
HTTP_S.SendResponse("200", "OK", myConn); HTTP_S.SendResponse("200", "OK", myConn);
HTTP_R.Clean(); HTTP_R.Clean();
continue; continue;
@ -360,519 +346,84 @@ namespace Mist {
/// Disconnects the user /// Disconnects the user
bool OutRTSP::onFinish(){ bool OutRTSP::onFinish(){
if (myConn){ if (myConn){myConn.close();}
myConn.close();
}
return false; return false;
} }
/// Attempts to parse TCP RTP packets at the beginning of the header. /// Attempts to parse TCP RTP packets at the beginning of the header.
/// Returns whether it is safe to attempt to read HTTP/RTSP packets (true) or not (false). /// Returns whether it is safe to attempt to read HTTP/RTSP packets (true) or not (false).
bool OutRTSP::handleTCP(){ bool OutRTSP::handleTCP(){
if (!myConn.Received().size() || !myConn.Received().available(1)){return false;}//no data if (!myConn.Received().size() || !myConn.Received().available(1)){return false;}// no data
if (myConn.Received().copy(1) != "$"){return true;}//not a TCP RTP packet if (myConn.Received().copy(1) != "$"){return true;}// not a TCP RTP packet
if (!myConn.Received().available(4)){return false;}//a TCP RTP packet, but not complete yet if (!myConn.Received().available(4)){return false;}// a TCP RTP packet, but not complete yet
//We have a TCP packet! Read it... // We have a TCP packet! Read it...
//Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data // Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data
std::string tcpHead = myConn.Received().copy(4); std::string tcpHead = myConn.Received().copy(4);
uint16_t len = ntohs(*(short*)(tcpHead.data()+2)); uint16_t len = ntohs(*(short *)(tcpHead.data() + 2));
if (!myConn.Received().available(len+4)){return false;}//a TCP RTP packet, but not complete yet if (!myConn.Received().available(len + 4)){
//remove whole packet from buffer, including 4 byte header return false;
std::string tcpPacket = myConn.Received().remove(len+4); }// a TCP RTP packet, but not complete yet
for (std::map<int, RTPTrack>::iterator it = tracks.begin(); it != tracks.end(); ++it){ // remove whole packet from buffer, including 4 byte header
if (tcpHead.data()[1] == it->second.channel){ std::string tcpPacket = myConn.Received().remove(len + 4);
RTP::Packet pkt(tcpPacket.data()+4, len); uint32_t trackNo = sdpState.getTrackNoForChannel(tcpHead.data()[1]);
it->second.rtpSeq = pkt.getSequence(); if (trackNo && isPushing()){
handleIncomingRTP(it->first, pkt); RTP::Packet pkt(tcpPacket.data() + 4, len);
break; sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();
sdpState.handleIncomingRTP(trackNo, pkt);
} }
} // attempt to read more packets
//attempt to read more packets
return handleTCP(); return handleTCP();
} }
/// Reads and handles RTP packets over UDP, if needed /// Reads and handles RTP packets over UDP, if needed
void OutRTSP::handleUDP(){ void OutRTSP::handleUDP(){
for (std::map<int, RTPTrack>::iterator it = tracks.begin(); it != tracks.end(); ++it){ if (!isPushing()){return;}
Socket::UDPConnection & s = it->second.data; for (std::map<uint32_t, SDP::Track>::iterator it = sdpState.tracks.begin();
it != sdpState.tracks.end(); ++it){
Socket::UDPConnection &s = it->second.data;
while (s.Receive()){ while (s.Receive()){
if (s.getDestPort() != it->second.cPort){ if (s.getDestPort() != it->second.cPort){
//wrong sending port, ignore packet // wrong sending port, ignore packet
continue; continue;
} }
lastRecv = Util::epoch();//prevent disconnect of idle TCP connection when using UDP lastRecv = Util::epoch(); // prevent disconnect of idle TCP connection when using UDP
myConn.addDown(s.data_len); myConn.addDown(s.data_len);
RTP::Packet pack(s.data, s.data_len); RTP::Packet pack(s.data, s.data_len);
if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();}
//packet is very early - assume dropped after 10 packets // packet is very early - assume dropped after 10 packets
while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -10){ while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -10){
WARN_MSG("Giving up on packet %u", it->second.rtpSeq); WARN_MSG("Giving up on packet %u", it->second.rtpSeq);
++(it->second.rtpSeq); ++(it->second.rtpSeq);
//send any buffered packets we may have // send any buffered packets we may have
while (it->second.packBuffer.count(it->second.rtpSeq)){ while (it->second.packBuffer.count(it->second.rtpSeq)){
handleIncomingRTP(it->first, pack); sdpState.handleIncomingRTP(it->first, pack);
++(it->second.rtpSeq); ++(it->second.rtpSeq);
} }
} }
//send any buffered packets we may have // send any buffered packets we may have
while (it->second.packBuffer.count(it->second.rtpSeq)){ while (it->second.packBuffer.count(it->second.rtpSeq)){
handleIncomingRTP(it->first, pack); sdpState.handleIncomingRTP(it->first, pack);
++(it->second.rtpSeq); ++(it->second.rtpSeq);
} }
//packet is slightly early - buffer it // packet is slightly early - buffer it
if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){
INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence());
it->second.packBuffer[pack.getSequence()] = pack; it->second.packBuffer[pack.getSequence()] = pack;
} }
//packet is late // packet is late
if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){
//negative difference? // negative difference?
WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)",
(int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())));
return; return;
} }
//packet is in order // packet is in order
if (it->second.rtpSeq == pack.getSequence()){ if (it->second.rtpSeq == pack.getSequence()){
handleIncomingRTP(it->first, pack); sdpState.handleIncomingRTP(it->first, pack);
++(it->second.rtpSeq); ++(it->second.rtpSeq);
} }
} }
} }
} }
/// Handles a single H264 packet, checking if others are appended at the end in Annex B format.
/// If so, splits them up and calls h264Packet for each. If not, calls it only once for the whole payload.
void OutRTSP::h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len){
uint32_t lastStart = 0;
for (uint32_t i = 0; i < len-4; ++i){
//search for start code
if (buffer[i] == 0 && buffer[i+1] == 0 && buffer[i+2] == 0 && buffer[i+3] == 1){
//if found, handle a packet from the last start code up to this start code
Bit::htobl(buffer+lastStart, (i-lastStart-1)-4);//size-prepend
h264Packet(ts, track, buffer+lastStart, (i-lastStart-1), h264::isKeyframe(buffer+lastStart+4, i-lastStart-5));
lastStart = i;
}
}
//Last packet (might be first, if no start codes found)
Bit::htobl(buffer+lastStart, (len-lastStart)-4);//size-prepend
h264Packet(ts, track, buffer+lastStart, (len-lastStart), h264::isKeyframe(buffer+lastStart+4, len-lastStart-4));
}
void OutRTSP::h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey){
//Ignore zero-length packets (e.g. only contained init data and nothing else)
if (!len){return;}
//Header data? Compare to init, set if needed, and throw away
uint8_t nalType = (buffer[4] & 0x1F);
switch (nalType){
case 7: //SPS
if (tracks[track].spsData.size() != len-4 || memcmp(buffer+4, tracks[track].spsData.data(), len-4) != 0){
INFO_MSG("Updated SPS from RTP data");
tracks[track].spsData.assign(buffer+4, len-4);
updateH264Init(track);
}
return;
case 8: //PPS
if (tracks[track].ppsData.size() != len-4 || memcmp(buffer+4, tracks[track].ppsData.data(), len-4) != 0){
INFO_MSG("Updated PPS from RTP data");
tracks[track].ppsData.assign(buffer+4, len-4);
updateH264Init(track);
}
return;
default://others, continue parsing
break;
}
double fps = tracks[track].fpsMeta;
uint32_t offset = 0;
uint64_t newTs = ts;
if (fps > 1){
//Assume a steady frame rate, clip the timestamp based on frame number.
uint64_t frameNo = (ts / (1000.0/fps))+0.5;
while (frameNo < tracks[track].packCount){
tracks[track].packCount--;
}
//More than 32 frames behind? We probably skipped something, somewhere...
if ((frameNo-tracks[track].packCount) > 32){
tracks[track].packCount = frameNo;
}
//After some experimentation, we found that the time offset is the difference between the frame number and the packet counter, times the frame rate in ms
offset = (frameNo-tracks[track].packCount) * (1000.0/fps);
//... and the timestamp is the packet counter times the frame rate in ms.
newTs = tracks[track].packCount * (1000.0/fps);
VERYHIGH_MSG("Packing time %llu = %sframe %llu (%.2f FPS). Expected %llu -> +%llu/%lu", ts, isKey?"key":"i", frameNo, fps, tracks[track].packCount, (frameNo-tracks[track].packCount), offset);
}else{
//For non-steady frame rate, assume no offsets are used and the timestamp is already correct
VERYHIGH_MSG("Packing time %llu = %sframe %llu (variable rate)", ts, isKey?"key":"i", tracks[track].packCount);
}
//Fill the new DTSC packet, buffer it.
DTSC::Packet nextPack;
nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey);
tracks[track].packCount++;
bufferLivePacket(nextPack);
}
/// Handles RTP packets generically, for both TCP and UDP-based connections.
/// In case of UDP, expects packets to be pre-sorted.
void OutRTSP::handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt){
if (!tracks[track].firstTime){
tracks[track].firstTime = pkt.getTimeStamp() + 1;
}
if (myMeta.tracks[track].codec == "ALAW" || myMeta.tracks[track].codec == "opus" || myMeta.tracks[track].codec == "MP3" || myMeta.tracks[track].codec == "PCM"){
char * pl = pkt.getPayload();
DTSC::Packet nextPack;
nextPack.genericFill((pkt.getTimeStamp() - tracks[track].firstTime + 1) / ((double)myMeta.tracks[track].rate / 1000.0), 0, track, pl, pkt.getPayloadSize(), 0, false);
bufferLivePacket(nextPack);
return;
}
if (myMeta.tracks[track].codec == "AAC"){
//assume AAC packets are single AU units
/// \todo Support other input than single AU units
char * pl = pkt.getPayload();
unsigned int headLen = (Bit::btohs(pl) >> 3) + 2;//in bits, so /8, plus two for the prepended size
DTSC::Packet nextPack;
uint16_t samples = aac::AudSpecConf::samples(myMeta.tracks[track].init);
uint32_t sampleOffset = 0;
uint32_t offset = 0;
uint32_t auSize = 0;
for (uint32_t i = 2; i < headLen; i += 2){
auSize = Bit::btohs(pl+i) >> 3;//only the upper 13 bits
nextPack.genericFill((pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime + 1) / ((double)myMeta.tracks[track].rate / 1000.0), 0, track, pl+headLen+offset, std::min(auSize, pkt.getPayloadSize() - headLen - offset), 0, false);
offset += auSize;
sampleOffset += samples;
bufferLivePacket(nextPack);
}
return;
}
if (myMeta.tracks[track].codec == "H264"){
//Handles common H264 packets types, but not all.
//Generalizes and converts them all to a data format ready for DTSC, then calls h264Packet for that data.
//Prints a WARN-level message if packet type is unsupported.
/// \todo Support other H264 packets types?
char * pl = pkt.getPayload();
if (!pkt.getPayloadSize()){
WARN_MSG("Empty packet ignored!");
return;
}
if ((pl[0] & 0x1F) == 0){
WARN_MSG("H264 packet type null ignored");
return;
}
if ((pl[0] & 0x1F) < 24){
DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F));
static char * packBuffer = 0;
static unsigned long packBufferSize = 0;
unsigned long len = pkt.getPayloadSize();
if (packBufferSize < len+4){
char * tmp = (char*)realloc(packBuffer, len+4);
if (tmp){
packBuffer = tmp;
packBufferSize = len+4;
}else{
free(packBuffer);
packBufferSize = 0;
packBuffer = 0;
FAIL_MSG("Failed to allocate memory for H264 packet");
return;
}
}
Bit::htobl(packBuffer, len);//size-prepend
memcpy(packBuffer+4, pl, len);
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len+4, h264::isKeyframe(packBuffer+4, len));
return;
}
if ((pl[0] & 0x1F) == 24){
DONTEVEN_MSG("H264 STAP-A packet");
unsigned int len = 0;
unsigned int pos = 1;
while (pos + 1 < pkt.getPayloadSize()){
unsigned int pLen = Bit::btohs(pl+pos);
INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos+2] & 0x1F));
pos += 2+pLen;
len += 4+pLen;
}
static char * packBuffer = 0;
static unsigned long packBufferSize = 0;
if (packBufferSize < len){
char * tmp = (char*)realloc(packBuffer, len);
if (tmp){
packBuffer = tmp;
packBufferSize = len;
}else{
free(packBuffer);
packBufferSize = 0;
packBuffer = 0;
FAIL_MSG("Failed to allocate memory for H264 STAP-A packet");
return;
}
}
pos = 1;
len = 0;
bool isKey = false;
while (pos + 1 < pkt.getPayloadSize()){
unsigned int pLen = Bit::btohs(pl+pos);
isKey |= h264::isKeyframe(pl+pos+2, pLen);
Bit::htobl(packBuffer+len, pLen);//size-prepend
memcpy(packBuffer+len+4, pl+pos+2, pLen);
len += 4+pLen;
pos += 2+pLen;
}
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, packBuffer, len, isKey);
return;
}
if ((pl[0] & 0x1F) == 28){
DONTEVEN_MSG("H264 FU-A packet");
static char * fuaBuffer = 0;
static unsigned long fuaBufferSize = 0;
static unsigned long fuaCurrLen = 0;
//No length yet? Check for start bit. Ignore rest.
if (!fuaCurrLen && (pkt.getPayload()[1] & 0x80) == 0){
HIGH_MSG("Not start of a new FU-A - throwing away");
return;
}
if (fuaCurrLen && ((pkt.getPayload()[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){
WARN_MSG("Ending unfinished FU-A");
INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaCurrLen);
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
//attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen);
}else{
Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend
fuaBuffer[4] |= 0x80;//set error bit
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen, h264::isKeyframe(fuaBuffer+4, fuaCurrLen-4));
}
fuaCurrLen = 0;
return;
}
unsigned long len = pkt.getPayloadSize() - 2;//ignore the two FU-A bytes in front
if (!fuaCurrLen){len += 5;}//five extra bytes for the first packet
if (fuaBufferSize < fuaCurrLen + len){
char * tmp = (char*)realloc(fuaBuffer, fuaCurrLen + len);
if (tmp){
fuaBuffer = tmp;
fuaBufferSize = fuaCurrLen + len;
}else{
free(fuaBuffer);
fuaBufferSize = 0;
fuaBuffer = 0;
FAIL_MSG("Failed to allocate memory for H264 FU-A packet");
return;
}
}
if (fuaCurrLen == 0){
memcpy(fuaBuffer+4, pkt.getPayload()+1, pkt.getPayloadSize()-1);
//reconstruct first byte
fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pkt.getPayload()[0] & 0xE0);
}else{
memcpy(fuaBuffer+fuaCurrLen, pkt.getPayload()+2, pkt.getPayloadSize()-2);
}
fuaCurrLen += len;
if (pkt.getPayload()[1] & 0x40){//last packet
INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F), fuaCurrLen);
uint8_t nalType = (fuaBuffer[4] & 0x1F);
if (nalType == 7 || nalType == 8){
//attempt to detect multiple H264 packets, even though specs disallow it
h264MultiParse((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen);
}else{
Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend
h264Packet((pkt.getTimeStamp() - tracks[track].firstTime + 1) / 90, track, fuaBuffer, fuaCurrLen, h264::isKeyframe(fuaBuffer+4, fuaCurrLen-4));
}
fuaCurrLen = 0;
}
return;
}
WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F));
return;
}
}
/// Calculates H264 track metadata from sps and pps data stored in tracks[trackNo]
void OutRTSP::updateH264Init(uint64_t trackNo){
DTSC::Track & Trk = myMeta.tracks[trackNo];
RTPTrack & RTrk = tracks[trackNo];
h264::sequenceParameterSet sps(RTrk.spsData.data(), RTrk.spsData.size());
h264::SPSMeta hMeta = sps.getCharacteristics();
MP4::AVCC avccBox;
avccBox.setVersion(1);
avccBox.setProfile(RTrk.spsData[1]);
avccBox.setCompatibleProfiles(RTrk.spsData[2]);
avccBox.setLevel(RTrk.spsData[3]);
avccBox.setSPSNumber(1);
avccBox.setSPS(RTrk.spsData);
avccBox.setPPSNumber(1);
avccBox.setPPS(RTrk.ppsData);
RTrk.fpsMeta = hMeta.fps;
Trk.width = hMeta.width;
Trk.height = hMeta.height;
Trk.fpks = hMeta.fps * 1000;
Trk.init = std::string(avccBox.payload(), avccBox.payloadSize());
}
void OutRTSP::parseSDP(const std::string & sdp){
std::stringstream ss(sdp);
std::string to;
uint64_t trackNo = 0;
bool nope = true; //true if we have no valid track to fill
DTSC::Track * thisTrack = 0;
while(std::getline(ss,to,'\n')){
if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size()-1, 1);}
// All tracks start with a media line
if (to.substr(0,2) == "m="){
nope = true;
++trackNo;
thisTrack = &(myMeta.tracks[trackNo]);
std::stringstream words(to.substr(2));
std::string item;
if (getline(words, item, ' ') && (item == "audio" || item == "video")){
thisTrack->type = item;
thisTrack->trackID = trackNo;
}else{
WARN_MSG("Media type not supported: %s", item.c_str());
continue;
}
getline(words, item, ' ');
if (!getline(words, item, ' ') || item != "RTP/AVP"){
WARN_MSG("Media transport not supported: %s", item.c_str());
continue;
}
if (getline(words, item, ' ')){
uint64_t avp_type = JSON::Value(item).asInt();
switch (avp_type){
case 8: //PCM A-law
INFO_MSG("PCM A-law payload type");
nope = false;
thisTrack->codec = "ALAW";
thisTrack->rate = 8000;
thisTrack->channels = 1;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 10: //PCM Stereo, 44.1kHz
INFO_MSG("Linear PCM stereo 44.1kHz payload type");
nope = false;
thisTrack->codec = "PCM";
thisTrack->size = 16;
thisTrack->rate = 44100;
thisTrack->channels = 2;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
case 11: //PCM Mono, 44.1kHz
INFO_MSG("Linear PCM mono 44.1kHz payload type");
nope = false;
thisTrack->codec = "PCM";
thisTrack->rate = 44100;
thisTrack->size = 16;
thisTrack->channels = 1;
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
break;
default:
//dynamic type
if (avp_type >= 96 && avp_type <= 127){
INFO_MSG("Dynamic payload type (%llu) detected", avp_type);
nope = false;
continue;
}else{
FAIL_MSG("Payload type %llu not supported!", avp_type);
continue;
}
}
}
continue;
}
if (nope){continue;}//ignore lines if we have no valid track
// RTP mapping
if (to.substr(0, 8) == "a=rtpmap"){
std::string mediaType = to.substr(to.find(' ', 8)+1);
std::string trCodec = mediaType.substr(0, mediaType.find('/'));
//convert to fullcaps
for(unsigned int i=0;i<trCodec.size();++i){
if(trCodec[i]<=122 && trCodec[i]>=97){trCodec[i]-=32;}
}
if (thisTrack->type == "audio"){
std::string extraInfo = mediaType.substr(mediaType.find('/')+1);
if (extraInfo.find('/') != std::string::npos){
size_t lastSlash = extraInfo.find('/');
thisTrack->rate = atoll(extraInfo.substr(0, lastSlash).c_str());
thisTrack->channels = atoll(extraInfo.substr(lastSlash+1).c_str());
}else{
thisTrack->rate = atoll(extraInfo.c_str());
thisTrack->channels = 1;
}
}
if (trCodec == "H264"){thisTrack->codec = "H264";}
if (trCodec == "OPUS"){
thisTrack->codec = "opus";
thisTrack->init = std::string("OpusHead\001\002\170\000\200\273\000\000\000\000\000", 19);
}
if (trCodec == "PCMA"){thisTrack->codec = "ALAW";}
if (trCodec == "L8"){
thisTrack->codec = "PCM";
thisTrack->size = 8;
}
if (trCodec == "L16"){
thisTrack->codec = "PCM";
thisTrack->size = 16;
}
if (trCodec == "L20"){
thisTrack->codec = "PCM";
thisTrack->size = 20;
}
if (trCodec == "L24"){
thisTrack->codec = "PCM";
thisTrack->size = 24;
}
if (trCodec == "MPEG4-GENERIC"){thisTrack->codec = "AAC";}
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
continue;
}
if (to.substr(0, 10) == "a=control:"){
tracks[trackNo].control = to.substr(10);
continue;
}
if (to.substr(0, 7) == "a=fmtp:"){
tracks[trackNo].fmtp = to.substr(7);
if (thisTrack->codec == "AAC"){
if (tracks[trackNo].getParamString("mode") != "AAC-hbr"){
//a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=120856E500
FAIL_MSG("AAC transport mode not supported: %s", tracks[trackNo].getParamString("mode").c_str());
nope = true;
myMeta.tracks.erase(trackNo);
tracks.erase(trackNo);
continue;
}
thisTrack->init = Encodings::Hex::decode(tracks[trackNo].getParamString("config"));
//myMeta.tracks[trackNo].rate = aac::AudSpecConf::rate(myMeta.tracks[trackNo].init);
}
if (thisTrack->codec == "H264"){
//a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z0LAHtkA2D3m//AUABqxAAADAAEAAAMAMg8WLkg=,aMuDyyA=; profile-level-id=42C01E
std::string sprop = tracks[trackNo].getParamString("sprop-parameter-sets");
size_t comma = sprop.find(',');
tracks[trackNo].spsData = Encodings::Base64::decode(sprop.substr(0,comma));
tracks[trackNo].ppsData = Encodings::Base64::decode(sprop.substr(comma+1));
updateH264Init(trackNo);
}
continue;
}
// We ignore bandwidth lines
if (to.substr(0,2) == "b="){
continue;
}
//we ignore everything before the first media line.
if (!trackNo){
continue;
}
//at this point, the data is definitely for a track
INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str());
}
}
} }

View file

@ -1,214 +1,36 @@
#pragma once #pragma once
#include "output.h" #include "output.h"
#include <mist/socket.h>
#include <mist/rtp.h>
#include <mist/http_parser.h>
#include <mist/encode.h>
#include <mist/h264.h> #include <mist/h264.h>
#include <mist/http_parser.h>
#include <mist/rtp.h>
#include <mist/sdp.h>
#include <mist/socket.h>
namespace Mist { namespace Mist{
///Structure used to keep track of selected tracks. class OutRTSP : public Output{
class RTPTrack {
public: public:
Socket::UDPConnection data; OutRTSP(Socket::Connection &myConn);
Socket::UDPConnection rtcp; static void init(Util::Config *cfg);
RTP::Packet pack;
long long rtcpSent;
uint64_t firstTime;
int channel;/// Channel number, used in TCP sending
uint64_t packCount;
uint16_t rtpSeq;
std::map<uint16_t, RTP::Packet> packBuffer;
uint32_t cPort;
std::string transportString;
std::string control;
std::string fmtp;
std::string spsData;
std::string ppsData;
uint64_t fpsTime;
double fpsMeta;
double fps;
RTPTrack(){
rtcpSent = 0;
channel = -1;
firstTime = 0;
packCount = 0;
cPort = 0;
rtpSeq = 0;
fpsTime = 0;
fpsMeta = 0;
fps = 0;
}
std::string getParamString(const std::string & param) const{
if (!fmtp.size()){return "";}
size_t pos = fmtp.find(param);
if (pos == std::string::npos){return "";}
pos += param.size()+1;
size_t ePos = fmtp.find_first_of(" ;", pos);
return fmtp.substr(pos, ePos-pos);
}
uint64_t getParamInt(const std::string & param) const{
return atoll(getParamString(param).c_str());
}
std::string mediaDescription(const DTSC::Track & trk){
std::stringstream mediaDesc;
if (trk.codec == "H264") {
MP4::AVCC avccbox;
avccbox.setPayload(trk.init);
mediaDesc << "m=video 0 RTP/AVP 97\r\n"
"a=rtpmap:97 H264/90000\r\n"
"a=cliprect:0,0," << trk.height << "," << trk.width << "\r\n"
"a=framesize:97 " << trk.width << '-' << trk.height << "\r\n"
"a=fmtp:97 packetization-mode=1;profile-level-id="
<< std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[1] << std::dec << "E0"
<< std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[3] << std::dec << ";"
"sprop-parameter-sets="
<< Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen()))
<< ","
<< Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen()))
<< "\r\n"
"a=framerate:" << ((double)trk.fpks)/1000.0 << "\r\n"
"a=control:track" << trk.trackID << "\r\n";
} else if (trk.codec == "AAC") {
mediaDesc << "m=audio 0 RTP/AVP 96" << "\r\n"
"a=rtpmap:96 mpeg4-generic/" << trk.rate << "/" << trk.channels << "\r\n"
"a=fmtp:96 streamtype=5; profile-level-id=15; config=";
for (unsigned int i = 0; i < trk.init.size(); i++) {
mediaDesc << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init[i] << std::dec;
}
//these values are described in RFC 3640
mediaDesc << "; mode=AAC-hbr; SizeLength=13; IndexLength=3; IndexDeltaLength=3;\r\n"
"a=control:track" << trk.trackID << "\r\n";
}else if (trk.codec == "MP3") {
mediaDesc << "m=" << trk.type << " 0 RTP/AVP 14" << "\r\n"
"a=rtpmap:14 MPA/90000/" << trk.channels << "\r\n"
"a=control:track" << trk.trackID << "\r\n";
}else if ( trk.codec == "AC3") {
mediaDesc << "m=audio 0 RTP/AVP 100" << "\r\n"
"a=rtpmap:100 AC3/" << trk.rate << "/" << trk.channels << "\r\n"
"a=control:track" << trk.trackID << "\r\n";
}else if ( trk.codec == "ALAW") {
if (trk.channels == 1 && trk.rate == 8000){
mediaDesc << "m=audio 0 RTP/AVP 8" << "\r\n";
}else{
mediaDesc << "m=audio 0 RTP/AVP 101" << "\r\n";
mediaDesc << "a=rtpmap:101 PCMA/" << trk.rate << "/" << trk.channels << "\r\n";
}
mediaDesc << "a=control:track" << trk.trackID << "\r\n";
}else if ( trk.codec == "PCM") {
if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){
mediaDesc << "m=audio 0 RTP/AVP 10" << "\r\n";
} else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){
mediaDesc << "m=audio 0 RTP/AVP 11" << "\r\n";
}else{
mediaDesc << "m=audio 0 RTP/AVP 103" << "\r\n";
mediaDesc << "a=rtpmap:103 L" << trk.size << "/" << trk.rate << "/" << trk.channels << "\r\n";
}
mediaDesc << "a=control:track" << trk.trackID << "\r\n";
}else if ( trk.codec == "opus") {
mediaDesc << "m=audio 0 RTP/AVP 102" << "\r\n"
"a=rtpmap:102 opus/" << trk.rate << "/" << trk.channels << "\r\n"
"a=control:track" << trk.trackID << "\r\n";
}
return mediaDesc.str();
}
bool parseTransport(const std::string & transport, const std::string & host, const std::string & source, const DTSC::Track & trk){
unsigned int SSrc = rand();
if (trk.codec == "H264") {
pack = RTP::Packet(97, 1, 0, SSrc);
}else if(trk.codec == "AAC"){
pack = RTP::Packet(96, 1, 0, SSrc);
}else if(trk.codec == "AC3"){
pack = RTP::Packet(100, 1, 0, SSrc);
}else if(trk.codec == "MP3"){
pack = RTP::Packet(14, 1, 0, SSrc);
}else if(trk.codec == "ALAW"){
if (trk.channels == 1 && trk.rate == 8000){
pack = RTP::Packet(8, 1, 0, SSrc);
}else{
pack = RTP::Packet(101, 1, 0, SSrc);
}
}else if ( trk.codec == "PCM") {
if (trk.size == 16 && trk.channels == 2 && trk.rate == 44100){
pack = RTP::Packet(10, 1, 0, SSrc);
} else if (trk.size == 16 && trk.channels == 1 && trk.rate == 44100){
pack = RTP::Packet(11, 1, 0, SSrc);
}else{
pack = RTP::Packet(103, 1, 0, SSrc);
}
}else if(trk.codec == "opus"){
pack = RTP::Packet(102, 1, 0, SSrc);
}else{
ERROR_MSG("Unsupported codec %s for RTSP on track %u", trk.codec.c_str(), trk.trackID);
return false;
}
if (transport.find("TCP") != std::string::npos) {
std::string chanE = transport.substr(transport.find("interleaved=") + 12, (transport.size() - transport.rfind('-') - 1)); //extract channel ID
channel = atol(chanE.c_str());
rtcpSent = 0;
transportString = transport;
} else {
channel = -1;
size_t port_loc = transport.rfind("client_port=") + 12;
cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str());
uint32_t portA, portB;
//find available ports locally;
int sendbuff = 4*1024*1024;
data.SetDestination(host, cPort);
portA = data.bind(0);
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
rtcp.SetDestination(host, cPort + 1);
portB = rtcp.bind(0);
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
std::stringstream tStr;
tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";";
if (source.size()){
tStr << "source=" << source << ";";
}
tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << SSrc << std::dec;
transportString = tStr.str();
INFO_MSG("Transport string: %s", transportString.c_str());
}
return true;
}
std::string rtpInfo(const DTSC::Track & trk, const std::string & source, uint64_t currentTime){
unsigned int timeMultiplier = 1;
timeMultiplier = ((double)trk.rate / 1000.0);
if (trk.codec == "H264") {
timeMultiplier = 90;
}
std::stringstream rInfo;
rInfo << "url=" << source << "/track" << trk.trackID << ";"; //get the current url, not localhost
rInfo << "sequence=" << pack.getSequence() << ";rtptime=" << currentTime * timeMultiplier;
return rInfo.str();
}
};
class OutRTSP : public Output {
public:
OutRTSP(Socket::Connection & myConn);
static void init(Util::Config * cfg);
void sendNext(); void sendNext();
void onRequest(); void onRequest();
void requestHandler(); void requestHandler();
bool onFinish(); bool onFinish();
void incomingPacket(const DTSC::Packet &pkt);
private: private:
void parseSDP(const std::string & sdp); long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP
long long connectedAt;///< The timestamp the connection was made, as reference point for RTCP packets. ///packets.
std::map<int, RTPTrack> tracks;///< List of selected tracks with RTSP-specific session data. unsigned int pausepoint; ///< Position to pause at, when reached
unsigned int pausepoint;///< Position to pause at, when reached SDP::State sdpState;
HTTP::Parser HTTP_R, HTTP_S; HTTP::Parser HTTP_R, HTTP_S;
std::string source; std::string source;
uint64_t lastTimeSync; uint64_t lastTimeSync;
bool expectTCP; bool expectTCP;
bool handleTCP(); bool handleTCP();
void handleUDP(); void handleUDP();
void handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt);
void h264MultiParse(uint64_t ts, const uint64_t track, char * buffer, const uint32_t len);
void h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, bool isKey);
void updateH264Init(uint64_t trackNo);
}; };
} }
typedef Mist::OutRTSP mistOut; typedef Mist::OutRTSP mistOut;