diff --git a/lib/adts.h b/lib/adts.h index a40eba5f..64ac93b2 100644 --- a/lib/adts.h +++ b/lib/adts.h @@ -1,4 +1,5 @@ #include +#include "bitstream.h" namespace aac { class adts { @@ -26,4 +27,59 @@ namespace aac { char * data; unsigned long len; }; + + class AudSpecConf{ + public: + static inline uint32_t rate(const std::string & conf){ + Utils::bitstream bs; + bs.append(conf.data(), conf.size()); + if (bs.get(5) == 31){bs.skip(6);}//skip object type + switch (bs.get(4)){//frequency index + case 0: return 96000; + case 1: return 88200; + case 2: return 64000; + case 3: return 48000; + case 4: return 44100; + case 5: return 32000; + case 6: return 24000; + case 7: return 22050; + case 8: return 16000; + case 9: return 12000; + case 10: return 11025; + case 11: return 8000; + case 12: return 7350; + case 15: return bs.get(24); + default: return 0; + } + } + static inline uint16_t channels(const std::string & conf){ + Utils::bitstream bs; + bs.append(conf.data(), conf.size()); + if (bs.get(5) == 31){bs.skip(6);}//skip object type + if (bs.get(4) == 15){bs.skip(24);}//frequency index + return bs.get(4);//channel configuration + } + static inline uint8_t objtype(const std::string & conf){ + Utils::bitstream bs; + bs.append(conf.data(), conf.size()); + uint8_t ot = bs.get(5); + if (ot == 31){return bs.get(6)+32;} + return ot; + } + static inline uint16_t samples(const std::string & conf){ + Utils::bitstream bs; + bs.append(conf.data(), conf.size()); + if (bs.get(5) == 31){bs.skip(6);}//skip object type + if (bs.get(4) == 15){bs.skip(24);}//frequency index + bs.skip(4);//channel configuration + if (bs.get(1)){ + return 960; + }else{ + return 1024; + } + } + }; + + + } diff --git a/lib/rtp.cpp b/lib/rtp.cpp index c425dd02..8d403850 100644 --- a/lib/rtp.cpp +++ b/lib/rtp.cpp @@ -6,12 +6,20 @@ namespace RTP { double Packet::startRTCP = 0; - unsigned int MAX_SEND = 4*1024; + unsigned int MAX_SEND = 1500-28; unsigned int Packet::getHsize() const { return 12 + 4 * getContribCount(); } + unsigned int Packet::getPayloadSize() const { + return datalen - getHsize(); + } + + char * Packet::getPayload() const { + return data + getHsize(); + } + unsigned int Packet::getVersion() const { return (data[0] >> 6) & 0x3; } @@ -70,17 +78,17 @@ namespace RTP { void Packet::sendH264(void * socket, void callBack(void *, char *, unsigned int, unsigned int), const char * payload, unsigned int payloadlen, unsigned int channel) { /// \todo This function probably belongs in DMS somewhere. - if (payloadlen <= MAX_SEND) { + if (payloadlen+getHsize() <= MAX_SEND) { data[1] |= 0x80;//setting the RTP marker bit to 1 memcpy(data + getHsize(), payload, payloadlen); callBack(socket, data, getHsize() + payloadlen, channel); sentPackets++; - sentBytes += payloadlen; + sentBytes += payloadlen+getHsize(); increaseSequence(); } else { data[1] &= 0x7F;//setting the RTP marker bit to 0 unsigned int sent = 0; - unsigned int sending = MAX_SEND;//packages are of size MAX_SEND, except for the final one + unsigned int sending = MAX_SEND-getHsize()-2;//packages are of size MAX_SEND, except for the final one char initByte = (payload[0] & 0xE0) | 0x1C; char serByte = payload[0] & 0x1F; //ser is now 000 data[getHsize()] = initByte; @@ -90,17 +98,17 @@ namespace RTP { } else { serByte &= 0x7F;//set first bit to 0 } - if (sent + MAX_SEND >= payloadlen) { + if (sent + sending >= payloadlen) { //last package serByte |= 0x40; sending = payloadlen - sent; data[1] |= 0x80;//setting the RTP marker bit to 1 } data[getHsize() + 1] = serByte; - memcpy(data + getHsize() + 2, payload + 1 + sent, sending); //+1 because + memcpy(data + getHsize() + 2, payload + 1 + sent, sending); callBack(socket, data, getHsize() + 2 + sending, channel); sentPackets++; - sentBytes += sending; + sentBytes += sending+getHsize()+2; sent += sending; increaseSequence(); } @@ -128,19 +136,6 @@ namespace RTP { increaseSequence(); } -/// Stores a long long (64 bits) value of val in network order to the pointer p. - inline void Packet::htobll(char * p, long long val) { - p[0] = (val >> 56) & 0xFF; - p[1] = (val >> 48) & 0xFF; - p[2] = (val >> 40) & 0xFF; - p[3] = (val >> 32) & 0xFF; - p[4] = (val >> 24) & 0xFF; - p[5] = (val >> 16) & 0xFF; - p[6] = (val >> 8) & 0xFF; - p[7] = val & 0xFF; - } - - void Packet::sendRTCP(long long & connectedAt, void * socket, unsigned int tid , DTSC::Meta & metadata, void callBack(void *, char *, unsigned int, unsigned int)) { void * rtcpData = malloc(32); diff --git a/lib/rtp.h b/lib/rtp.h index 964446a7..eed8276e 100644 --- a/lib/rtp.h +++ b/lib/rtp.h @@ -28,10 +28,11 @@ namespace RTP { unsigned int datalen; /// #include #include +#include +#include +#include +#include #include "output_rtsp.h" +#include namespace Mist { OutRTSP::OutRTSP(Socket::Connection & myConn) : Output(myConn){ connectedAt = Util::epoch() + 2208988800ll; - seekpoint = 0; pausepoint = 0; setBlocking(false); maxSkipAhead = 0; minSkipAhead = 0; + expectTCP = false; + isPushing = false; } /// Function used to send RTP packets over UDP @@ -60,10 +66,17 @@ namespace Mist { capa["optional"]["maxsend"]["option"] = "--max-packet-size"; capa["optional"]["maxsend"]["short"] = "m"; - cfg->addConnectorOptions(554, capa); + cfg->addConnectorOptions(5554, capa); config = cfg; } + bool OutRTSP::isReadyForPlay(){ + if (isPushing){ + return true; + } + return Output::isReadyForPlay(); + } + void OutRTSP::sendNext(){ char * dataPointer = 0; unsigned int dataLen = 0; @@ -71,11 +84,8 @@ namespace Mist { unsigned int tid = thisPacket.getTrackId(); unsigned int timestamp = thisPacket.getTime(); - //update where we are now. - seekpoint = timestamp; //if we're past the pausing point, seek to it, and pause immediately - if (pausepoint && seekpoint > pausepoint){ - seekpoint = pausepoint; + if (pausepoint && timestamp > pausepoint){ pausepoint = 0; stop(); return; @@ -84,12 +94,12 @@ namespace Mist { void * socket = 0; void (*callBack)(void *, char *, unsigned int, unsigned int) = 0; - if (tracks[tid].UDP){ + if (tracks[tid].channel == -1){//UDP connection socket = &tracks[tid].data; callBack = sendUDP; if (Util::epoch()/5 != tracks[tid].rtcpSent){ tracks[tid].rtcpSent = Util::epoch()/5; - tracks[tid].rtpPacket.sendRTCP(connectedAt, &tracks[tid].rtcp, tid, myMeta, sendUDP); + tracks[tid].pack.sendRTCP(connectedAt, &tracks[tid].rtcp, tid, myMeta, sendUDP); } }else{ socket = &myConn; @@ -97,31 +107,24 @@ namespace Mist { } if(myMeta.tracks[tid].codec == "MP3"){ - tracks[tid].rtpPacket.setTimestamp(timestamp * 90); - tracks[tid].rtpPacket.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel, "MP3"); + tracks[tid].pack.setTimestamp(timestamp * 90); + tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel, "MP3"); return; } if( myMeta.tracks[tid].codec == "AC3" || myMeta.tracks[tid].codec == "AAC"){ - tracks[tid].rtpPacket.setTimestamp(timestamp * ((double) myMeta.tracks[tid].rate / 1000.0)); - tracks[tid].rtpPacket.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel,myMeta.tracks[tid].codec); + tracks[tid].pack.setTimestamp(timestamp * ((double) myMeta.tracks[tid].rate / 1000.0)); + tracks[tid].pack.sendData(socket, callBack, dataPointer, dataLen, tracks[tid].channel,myMeta.tracks[tid].codec); return; } if(myMeta.tracks[tid].codec == "H264"){ long long offset = thisPacket.getInt("offset"); - tracks[tid].rtpPacket.setTimestamp(90 * (timestamp + offset)); - if (tracks[tid].initSent && thisPacket.getFlag("keyframe")) { - MP4::AVCC avccbox; - avccbox.setPayload(myMeta.tracks[tid].init); - tracks[tid].rtpPacket.sendH264(socket, callBack, avccbox.getSPS(), avccbox.getSPSLen(), tracks[tid].channel); - tracks[tid].rtpPacket.sendH264(socket, callBack, avccbox.getPPS(), avccbox.getPPSLen(), tracks[tid].channel); - tracks[tid].initSent = true; - } + tracks[tid].pack.setTimestamp(90 * (timestamp + offset)); unsigned long sent = 0; while (sent < dataLen) { unsigned long nalSize = ntohl(*((unsigned long *)(dataPointer + sent))); - tracks[tid].rtpPacket.sendH264(socket, callBack, dataPointer + sent + 4, nalSize, tracks[tid].channel); + tracks[tid].pack.sendH264(socket, callBack, dataPointer + sent + 4, nalSize, tracks[tid].channel); sent += nalSize + 4; } return; @@ -129,17 +132,34 @@ namespace Mist { } + /// This request handler also checks for UDP packets + void OutRTSP::requestHandler(){ + if (!expectTCP){ + handleUDP(); + } + Output::requestHandler(); + } + void OutRTSP::onRequest(){ RTP::MAX_SEND = config->getInteger("maxsend"); + //if needed, parse TCP packets, and return if it is not safe (yet) to read HTTP/RTSP packets + if (expectTCP && !handleTCP()){return;} while (HTTP_R.Read(myConn)){ HTTP_S.Clean(); HTTP_S.protocol = "RTSP/1.0"; //set the streamname and session + if (!source.size()){ + std::string source = HTTP_R.url.substr(7); + unsigned int loc = std::min(source.find(':'),source.find('/')); + source = source.substr(0,loc); + } size_t found = HTTP_R.url.find('/', 7); - streamName = HTTP_R.url.substr(found + 1, HTTP_R.url.substr(found + 1).find('/')); - Util::sanitizeName(streamName); - if (streamName != ""){ + if (!streamName.size()){ + streamName = HTTP_R.url.substr(found + 1, HTTP_R.url.substr(found + 1).find('/')); + Util::sanitizeName(streamName); + } + if (streamName.size()){ HTTP_S.SetHeader("Session", Secure::md5(HTTP_S.GetHeader("User-Agent") + getConnectedHost()) + "_" + streamName); } @@ -152,272 +172,658 @@ namespace Mist { HTTP_S.SetHeader("Date", dString); //set the sequence number to match the received sequence number - HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq")); + if (HTTP_R.hasHeader("CSeq")){ + HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("CSeq")); + } + if (HTTP_R.hasHeader("Cseq")){ + HTTP_S.SetHeader("CSeq", HTTP_R.GetHeader("Cseq")); + } + INFO_MSG("Handling %s", HTTP_R.method.c_str()); + //handle the request - VERYHIGH_MSG("Received %s:\n%s", HTTP_R.method.c_str(), HTTP_R.BuildRequest().c_str()); - bool handled = false; if (HTTP_R.method == "OPTIONS"){ - HTTP_S.SetHeader("Public", "SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER"); + HTTP_S.SetHeader("Public", "SETUP, TEARDOWN, PLAY, PAUSE, DESCRIBE, GET_PARAMETER, ANNOUNCE, RECORD"); HTTP_S.SendResponse("200", "OK", myConn); - handled = true; + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "GET_PARAMETER"){ HTTP_S.SendResponse("200", "OK", myConn); - handled = true; + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "DESCRIBE"){ - handleDescribe(); - handled = true; + initialize(); + uint64_t firstms = startTime(); + uint64_t lastms = endTime(); + selectedTracks.clear(); + std::stringstream transportString; + transportString << "v=0\r\n" + "o=- " << Util::getMS() << " 1 IN IP4 127.0.0.1\r\n" + "s=" << streamName << "\r\n" + "c=IN IP4 0.0.0.0\r\n" + "i=" << streamName << "\r\n" + "u=" << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName << "\r\n" + "t=0 0\r\n" + "a=tool:MistServer\r\n" + "a=type:broadcast\r\n" + "a=control:*\r\n"; + if (myMeta.live){ + transportString << "a=range:npt=" << ((double)firstms) / 1000.0 << "-\r\n"; + }else{ + transportString << "a=range:npt=" << ((double)firstms) / 1000.0 << "-" << ((double)lastms) / 1000.0 << "\r\n"; + } + + for (std::map::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); ++objIt) { + transportString << tracks[objIt->first].mediaDescription(objIt->second); + } + transportString << "\r\n"; + HIGH_MSG("Reply: %s", transportString.str().c_str()); + HTTP_S.SetHeader("Content-Base", HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName); + HTTP_S.SetHeader("Content-Type", "application/sdp"); + HTTP_S.SetBody(transportString.str()); + HTTP_S.SendResponse("200", "OK", myConn); + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "SETUP"){ - handleSetup(); - handled = true; + size_t trackPos = HTTP_R.url.rfind("/track"); + if (trackPos != std::string::npos){ + unsigned int trId = atol(HTTP_R.url.substr(trackPos + 6).c_str()); + if (myMeta.tracks.count(trId)){ + if (tracks[trId].parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[trId])){ + selectedTracks.insert(trId); + HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); + HTTP_S.SetHeader("Transport", tracks[trId].transportString); + HTTP_S.SetHeader("Cache-Control", "no-cache"); + HTTP_S.SendResponse("200", "OK", myConn); + }else{ + HTTP_S.SendResponse("404", "Track not available", myConn); + } + HTTP_R.Clean(); + continue; + } + } + //might be push setup - check known control points + if (isPushing && tracks.size()){ + bool setupHandled = false; + for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ + if (it->second.control.size() && (HTTP_R.url.find(it->second.control) != std::string::npos || HTTP_R.GetVar("pass").find(it->second.control) != std::string::npos)){ + if (it->second.parseTransport(HTTP_R.GetHeader("Transport"), getConnectedHost(), source, myMeta.tracks[it->first])){ + if (it->second.channel != -1){ + expectTCP = true; + if (expectTCP){handleTCP();} + } + HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); + HTTP_S.SetHeader("Transport", it->second.transportString); + HTTP_S.SetHeader("Cache-Control", "no-cache"); + HTTP_S.SendResponse("200", "OK", myConn); + INFO_MSG("Setup completed for track %llu (%s): %s", it->first, myMeta.tracks[it->first].codec.c_str(), it->second.transportString.c_str()); + }else{ + HTTP_S.SendResponse("404", "Track not known or allowed", myConn); + } + setupHandled = true; + HTTP_R.Clean(); + break; + } + } + if (!setupHandled){ + HTTP_S.SendResponse("404", "Track not known", myConn); + HTTP_R.Clean(); + } + continue; + } + FAIL_MSG("Could not handle setup: pushing=%s, trackSize=%u", isPushing?"true":"false", tracks.size()); } if (HTTP_R.method == "PLAY"){ - handlePlay(); - handled = true; + initialSeek(); + std::string range = HTTP_R.GetHeader("Range"); + if (range != ""){ + range = range.substr(range.find("npt=")+4); + if (!range.empty()) { + range = range.substr(0, range.find('-')); + uint64_t targetPos = 1000*atof(range.c_str()); + if (targetPos){seek(targetPos);} + } + } + std::stringstream rangeStr; + if (myMeta.live){ + rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-"; + }else{ + rangeStr << "npt=" << currentTime()/1000 << "." << std::setw(3) << std::setfill('0') << currentTime()%1000 << "-" << std::setw(1) << endTime()/1000 << "." << std::setw(3) << std::setfill('0') << endTime()%1000; + } + HTTP_S.SetHeader("Range", rangeStr.str()); + std::stringstream infoString; + if (selectedTracks.size()){ + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){ + if (!infoString.str().empty()){infoString << ",";} + infoString << tracks[*it].rtpInfo(myMeta.tracks[*it], source+"/"+streamName, currentTime()); + } + } + HTTP_S.SetHeader("RTP-Info", infoString.str()); + HTTP_S.SendResponse("200", "OK", myConn); + parseData = true; + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "PAUSE"){ - handlePause(); - handled = true; + HTTP_S.SendResponse("200", "OK", myConn); + std::string range = HTTP_R.GetHeader("Range"); + if (!range.empty()){ + range = range.substr(range.find("npt=")+4); + } + if (range.empty()){ + stop(); + }else{ + pausepoint = 1000 * (int) atof(range.c_str()); + if (pausepoint > currentTime()){pausepoint = 0; stop();} + } + HTTP_R.Clean(); + continue; } if (HTTP_R.method == "TEARDOWN"){ myConn.close(); stop(); - handled = true; + HTTP_R.Clean(); + continue; } - /// \todo Handle ANNOUNCE for push? Send out ANNOUNCE with stream length updates? - if (!handled){ - WARN_MSG("Unhandled command %s:\n%s", HTTP_R.method.c_str(), HTTP_R.BuildRequest().c_str()); + if (HTTP_R.method == "ANNOUNCE"){ + std::string smp = streamName.substr(0,(streamName.find_first_of("+ "))); + IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); ///< Contains server configuration and capabilities + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + + DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(smp); + if (streamCfg){ + if (streamCfg.getMember("source").asString().substr(0, 7) != "push://"){ + FAIL_MSG("Push rejected - stream %s not a push-able stream. (%s != push://*)", streamName.c_str(), streamCfg.getMember("source").asString().c_str()); + onFinish(); + }else{ + std::string source = streamCfg.getMember("source").asString().substr(7); + std::string IP = source.substr(0, source.find('@')); + /*LTS-START*/ + std::string password; + if (source.find('@') != std::string::npos){ + password = source.substr(source.find('@')+1); + if (password != ""){ + if (password == HTTP_R.GetVar("pass")){ + INFO_MSG("Password accepted - ignoring IP settings."); + IP = ""; + }else{ + INFO_MSG("Password rejected - checking IP."); + if (IP == ""){ + IP = "deny-all.invalid"; + } + } + } + } + if(Triggers::shouldTrigger("STREAM_PUSH", smp)){ + std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; + if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){ + FAIL_MSG("Push from %s to %s rejected - STREAM_PUSH trigger denied the push", getConnectedHost().c_str(), streamName.c_str()); + onFinish(); + configLock.post(); + configLock.close(); + return; + } + } + /*LTS-END*/ + if (IP != ""){ + if (!myConn.isAddress(IP)){ + FAIL_MSG("Push from %s to %s rejected - source host not whitelisted", getConnectedHost().c_str(), streamName.c_str()); + onFinish(); + } + } + } + }else{ + FAIL_MSG("Push from %s rejected - stream '%s' not configured.", getConnectedHost().c_str(), streamName.c_str()); + onFinish(); + } + configLock.post(); + configLock.close(); + if (!myConn){return;}//do not initialize if rejected + isPushing = true; + initialize(); + INFO_MSG("Pushing to stream %s", streamName.c_str()); + parseSDP(HTTP_R.body); + HTTP_S.SendResponse("200", "OK", myConn); + HTTP_R.Clean(); + continue; } + if (HTTP_R.method == "RECORD"){ + HTTP_S.SendResponse("200", "OK", myConn); + HTTP_R.Clean(); + continue; + } + WARN_MSG("Unhandled command %s:\n%s", HTTP_R.method.c_str(), HTTP_R.BuildRequest().c_str()); HTTP_R.Clean(); } } - void OutRTSP::handleDescribe(){ - //initialize the header, clear out any automatically selected tracks - initialize(); - selectedTracks.clear(); - - //calculate begin/end of stream - unsigned int firstms = myMeta.tracks.begin()->second.firstms; - unsigned int lastms = myMeta.tracks.begin()->second.lastms; - for (std::map::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); objIt ++) { - if (objIt->second.firstms < firstms){ - firstms = objIt->second.firstms; - } - if (objIt->second.lastms > lastms){ - lastms = objIt->second.lastms; - } + /// Disconnects the user + bool OutRTSP::onFinish(){ + if (myConn){ + myConn.close(); } - - HTTP_S.SetHeader("Content-Base", HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) + "/" + streamName); - HTTP_S.SetHeader("Content-Type", "application/sdp"); - std::stringstream transportString; - transportString << "v=0\r\n"//version - "o=- "//owner - << Util::getMS()//id - << " 1 IN IP4 127.0.0.1"//or IPv6 - "\r\ns=" << streamName << "\r\n" - "c=IN IP4 0.0.0.0\r\n" - "i=Mistserver stream " << streamName << "\r\n" - "u=" << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName << "\r\n" - "t=0 0\r\n"//timing - "a=tool:MistServer\r\n"// - "a=type:broadcast\r\n"// - "a=control:*\r\n"// - "a=range:npt=" << ((double)firstms) / 1000.0 << "-" << ((double)lastms) / 1000.0 << "\r\n"; - - //loop over all tracks, add them to the SDP. - /// \todo Make sure this works correctly for multibitrate streams. - for (std::map::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); objIt ++) { - if (objIt->second.codec == "H264") { - MP4::AVCC avccbox; - avccbox.setPayload(objIt->second.init); - transportString << "m=" << objIt->second.type << " 0 RTP/AVP 97\r\n" - "a=rtpmap:97 H264/90000\r\n" - "a=cliprect:0,0," << objIt->second.height << "," << objIt->second.width << "\r\n" - "a=framesize:97 " << objIt->second.width << '-' << objIt->second.height << "\r\n" - "a=fmtp:97 packetization-mode=1;profile-level-id=" - << std::hex << std::setw(2) << std::setfill('0') << (int)objIt->second.init.data()[1] << std::dec << "E0" - << std::hex << std::setw(2) << std::setfill('0') << (int)objIt->second.init.data()[3] << std::dec << ";" - "sprop-parameter-sets=" - << Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen())) - << "," - << Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen())) - << "\r\n" - "a=framerate:" << ((double)objIt->second.fpks)/1000.0 << "\r\n" - "a=control:track" << objIt->second.trackID << "\r\n"; - } else if (objIt->second.codec == "AAC") { - transportString << "m=" << objIt->second.type << " 0 RTP/AVP 96" << "\r\n" - "a=rtpmap:96 mpeg4-generic/" << objIt->second.rate << "/" << objIt->second.channels << "\r\n" - "a=fmtp:96 streamtype=5; profile-level-id=15; config="; - for (unsigned int i = 0; i < objIt->second.init.size(); i++) { - transportString << std::hex << std::setw(2) << std::setfill('0') << (int)objIt->second.init[i] << std::dec; - } - //these values are described in RFC 3640 - transportString << "; mode=AAC-hbr; SizeLength=13; IndexLength=3; IndexDeltaLength=3;\r\n" - "a=control:track" << objIt->second.trackID << "\r\n"; - }else if (objIt->second.codec == "MP3") { - transportString << "m=" << objIt->second.type << " 0 RTP/AVP 14" << "\r\n" - "a=rtpmap:14 MPA/90000/" << objIt->second.channels << "\r\n" - "a=control:track" << objIt->second.trackID << "\r\n"; - }else if ( objIt->second.codec == "AC3") { - transportString << "m=" << objIt->second.type << " 0 RTP/AVP 100" << "\r\n" - "a=rtpmap:100 AC3/" << objIt->second.rate << "/" << objIt->second.channels << "\r\n" - "a=control:track" << objIt->second.trackID << "\r\n"; - } - }//for tracks iterator - transportString << "\r\n"; - HTTP_S.SetBody(transportString.str()); - HTTP_S.SendResponse("200", "OK", myConn); - } - - void OutRTSP::handleSetup(){ - std::stringstream transportString; - unsigned int trId = atol(HTTP_R.url.substr(HTTP_R.url.rfind("/track") + 6).c_str()); - selectedTracks.insert(trId); - unsigned int SSrc = rand(); - if (myMeta.tracks[trId].codec == "H264") { - tracks[trId].rtpPacket = RTP::Packet(97, 1, 0, SSrc); - }else if(myMeta.tracks[trId].codec == "AAC"){ - tracks[trId].rtpPacket = RTP::Packet(96, 1, 0, SSrc); - }else if(myMeta.tracks[trId].codec == "AC3"){ - tracks[trId].rtpPacket = RTP::Packet(100, 1, 0, SSrc); - }else if(myMeta.tracks[trId].codec == "MP3"){ - tracks[trId].rtpPacket = RTP::Packet(14, 1, 0, SSrc); - }else{ - WARN_MSG("Unsupported codec for RTSP on track %u (%s): %s", trId, myMeta.tracks[trId].codec.c_str(), HTTP_R.url.c_str()); - } - - //read client ports - std::string transport = HTTP_R.GetHeader("Transport"); - unsigned long cPort; - if (transport.find("TCP") != std::string::npos) { - /// \todo This needs error checking. - tracks[trId].UDP = false; - std::string chanE = transport.substr(transport.find("interleaved=") + 12, (transport.size() - transport.rfind('-') - 1)); //extract channel ID - tracks[trId].channel = atol(chanE.c_str()); - tracks[trId].rtcpSent = 0; - transportString << transport; - } else { - tracks[trId].UDP = true; - size_t port_loc = transport.rfind("client_port=") + 12; - cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str()); - //find available ports locally; - int sendbuff = 4*1024*1024; - tracks[trId].data.SetDestination(getConnectedHost(), cPort); - tracks[trId].data.bind(2000 + trId * 2); - setsockopt(tracks[trId].data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - tracks[trId].rtcp.SetDestination(getConnectedHost(), cPort + 1); - tracks[trId].rtcp.bind(2000 + trId * 2 + 1); - setsockopt(tracks[trId].rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - std::string source = HTTP_R.url.substr(7); - unsigned int loc = std::min(source.find(':'),source.find('/')); - source = source.substr(0,loc); - transportString << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";source="<< source <<";server_port=" << (2000 + trId * 2) << "-" << (2000 + trId * 2 + 1) << ";ssrc=" << std::hex << SSrc << std::dec; - } - /// \todo We should probably not allocate UDP sockets when using TCP. - HTTP_S.SetHeader("Expires", HTTP_S.GetHeader("Date")); - HTTP_S.SetHeader("Transport", transportString.str()); - HTTP_S.SetHeader("Cache-Control", "no-cache"); - HTTP_S.SendResponse("200", "OK", myConn); + return false; } - void OutRTSP::handlePause(){ - HTTP_S.SendResponse("200", "OK", myConn); - std::string range = HTTP_R.GetHeader("Range"); - if (range.empty()){ - stop(); - return; + /// Attempts to parse TCP RTP packets at the beginning of the header. + /// Returns whether it is safe to attempt to read HTTP/RTSP packets (true) or not (false). + bool OutRTSP::handleTCP(){ + if (!myConn.Received().size() || !myConn.Received().available(1)){return false;}//no data + if (myConn.Received().copy(1) != "$"){return true;}//not a TCP RTP packet + if (!myConn.Received().available(4)){return false;}//a TCP RTP packet, but not complete yet + //We have a TCP packet! Read it... + //Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data + std::string tcpHead = myConn.Received().copy(4); + uint16_t len = ntohs(*(short*)(tcpHead.data()+2)); + if (!myConn.Received().available(len+4)){return false;}//a TCP RTP packet, but not complete yet + //remove whole packet from buffer, including 4 byte header + std::string tcpPacket = myConn.Received().remove(len+4); + for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ + if (tcpHead.data()[1] == it->second.channel){ + RTP::Packet pkt(tcpPacket.data()+4, len); + it->second.rtpSeq = pkt.getSequence(); + handleIncomingRTP(it->first, pkt); + break; + } } - range = range.substr(range.find("npt=")+4); - if (range.empty()) { - stop(); - return; - } - pausepoint = 1000 * (int) atof(range.c_str()); - if (pausepoint > seekpoint){ - seekpoint = pausepoint; - pausepoint = 0; - stop(); + //attempt to read more packets + return handleTCP(); + } + + /// Reads and handles RTP packets over UDP, if needed + void OutRTSP::handleUDP(){ + for (std::map::iterator it = tracks.begin(); it != tracks.end(); ++it){ + Socket::UDPConnection & s = it->second.data; + while (s.Receive()){ + if (s.getDestPort() != it->second.cPort){continue;}//port mismatch = skip + RTP::Packet pack(s.data, s.data_len); + if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} + //packet is very early - assume dropped after 10 packets + while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -10){ + WARN_MSG("Giving up on packet %u", it->second.rtpSeq); + ++(it->second.rtpSeq); + //send any buffered packets we may have + while (it->second.packBuffer.count(it->second.rtpSeq)){ + handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + } + } + //send any buffered packets we may have + while (it->second.packBuffer.count(it->second.rtpSeq)){ + handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + } + //packet is slightly early - buffer it + if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ + INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); + it->second.packBuffer[pack.getSequence()] = pack; + } + //packet is late + if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ + //negative difference? + WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); + return; + } + //packet is in order + if (it->second.rtpSeq == pack.getSequence()){ + handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + } + } } } - - void OutRTSP::handlePlay(){ - /// \todo Add support for queuing multiple play ranges - //calculate first and last possible timestamps - unsigned int firstms = myMeta.tracks.begin()->second.firstms; - unsigned int lastms = myMeta.tracks.begin()->second.lastms; - for (std::map::iterator objIt = myMeta.tracks.begin(); objIt != myMeta.tracks.end(); objIt ++) { - if (objIt->second.firstms < firstms){ - firstms = objIt->second.firstms; - } - if (objIt->second.lastms > lastms){ - lastms = objIt->second.lastms; - } - } - - std::stringstream transportString; - std::string range = HTTP_R.GetHeader("Range"); - if (range != ""){ - VERYHIGH_MSG("Play: %s", range.c_str()); - range = range.substr(range.find("npt=")+4); - if (range.empty()) { - seekpoint = 0; + + ///Helper function to determine if a H264 NAL unit is a keyframe or not + static inline bool isH264Keyframe(char * data, unsigned long len){ + if ((data[0] & 0x1F) == 0x05){return true;} + if ((data[0] & 0x1F) != 0x01){return false;} + Utils::bitstream bs; + for (size_t i = 1; i < 10 && i < len; ++i) { + if (i + 2 < len && (memcmp(data + i, "\000\000\003", 3) == 0)) { //Emulation prevention bytes + bs.append(data + i, 2); + i += 2; } else { - range = range.substr(0, range.find('-')); - seekpoint = 1000 * (int) atof(range.c_str()); + bs.append(data + i, 1); } - //snap seekpoint to closest keyframe - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - it->second.rtcpSent =0; - if (myMeta.tracks[it->first].type == "video") { - unsigned int newPoint = seekpoint; - for (unsigned int iy = 0; iy < myMeta.tracks[it->first].keys.size(); iy++) { - if (myMeta.tracks[it->first].keys[iy].getTime() > seekpoint && iy > 0) { - iy--; - break; - } - newPoint = myMeta.tracks[it->first].keys[iy].getTime(); + } + bs.getExpGolomb();//Discard first_mb_in_slice + uint64_t sliceType = bs.getUExpGolomb(); + if (sliceType == 2 || sliceType == 4 || sliceType == 7 || sliceType == 9){ + return true; + } + return false; + } + + ///Helper function to determine a H264 NAL unit frame number + ///\returns -1 if there is no frame number + ///UNFINISHED. Reads all values, but doesn't return any sensible values. Be warned! + static inline int getH264FrameNum(char * data, unsigned long len, h264::SPSMeta & md){ + char nalType = (data[0] & 0x1F); + if (nalType != 1 && nalType != 2 && nalType != 5){ + return -1; + } + Utils::bitstream bs; + for (size_t i = 1; i < 20 && i < len; ++i) { + if (i + 2 < len && (memcmp(data + i, "\000\000\003", 3) == 0)) { //Emulation prevention bytes + bs.append(data + i, 2); + i += 2; + } else { + bs.append(data + i, 1); + } + } + bs.getUExpGolomb();//first_mb_in_slice + bs.getUExpGolomb();//slice_type + bs.getUExpGolomb();//pps_id + if (md.sep_col_plane){ + bs.get(2);//colour_plane_id + } + uint16_t frame_num = bs.get(md.log2_max_frame_num); + if (!md.mbs_only){ + if (bs.get(1)){bs.get(1);}//field_pic_flag && bottom_field_flag + } + if (nalType == 5){ + bs.getUExpGolomb();//idr_pic_id + } + ///\todo Implement pic_order_cnt_type value 1 + uint16_t order_cnt = 0; + if (md.cnt_type == 0){ + order_cnt = bs.get(md.log2_max_order_cnt); + } + return -1; + } + + + void OutRTSP::h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, const bool isKey){ + DTSC::Packet nextPack; + uint64_t frameNo = (ts / (1000.0/h264meta[track].fps))+0.5; + while (frameNo < tracks[track].packCount){ + tracks[track].packCount--; + tracks[track].offset--; + } + uint32_t offset = (frameNo-tracks[track].packCount) * (1000.0/h264meta[track].fps); + uint64_t newTs = tracks[track].packCount * (1000.0/h264meta[track].fps); + VERYHIGH_MSG("Packing time %llu = frame %llu. Expected %llu -> +%llu/%llu (oz %d)", ts, frameNo, tracks[track].packCount, (frameNo-tracks[track].packCount), offset, tracks[track].offset); + nextPack.genericFill(newTs, offset, track, buffer, len, 0, isKey); + tracks[track].packCount++; + nProxy.streamName = streamName; + continueNegotiate(track); + bufferLivePacket(nextPack); + } + + void OutRTSP::handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt){ + if (!tracks[track].firstTime){ + tracks[track].firstTime = pkt.getTimeStamp(); + if (!tracks[track].firstTime){ + tracks[track].firstTime = 1; + } + } + if (myMeta.tracks[track].codec == "AAC"){ + //assume AAC packets are single AU units + /// \todo Support other input than single AU units + char * pl = pkt.getPayload(); + unsigned int headLen = (Bit::btohs(pl) >> 3) + 2;//in bits, so /8, plus two for the prepended size + DTSC::Packet nextPack; + uint16_t samples = aac::AudSpecConf::samples(myMeta.tracks[track].init); + uint32_t sampleOffset = 0; + uint32_t offset = 0; + uint32_t auSize = 0; + for (uint32_t i = 2; i < headLen; i += 2){ + auSize = Bit::btohs(pl+i) >> 3;//only the upper 13 bits + nextPack.genericFill((pkt.getTimeStamp() + sampleOffset - tracks[track].firstTime) / ((double)myMeta.tracks[track].rate / 1000.0), 0, track, pl+headLen+offset, std::min(auSize, pkt.getPayloadSize() - headLen - offset), 0, false); + offset += auSize; + sampleOffset += samples; + nProxy.streamName = streamName; + continueNegotiate(track); + bufferLivePacket(nextPack); + } + return; + } + if (myMeta.tracks[track].codec == "H264"){ + //assume H264 packets + char * pl = pkt.getPayload(); + if ((pl[0] & 0x1F) == 0){ + WARN_MSG("H264 packet type null ignored"); + return; + } + if ((pl[0] & 0x1F) < 24){ + DONTEVEN_MSG("H264 single packet, type %u", (unsigned int)(pl[0] & 0x1F)); + static char * packBuffer = 0; + static unsigned long packBufferSize = 0; + unsigned long len = pkt.getPayloadSize(); + if (packBufferSize < len+4){ + char * tmp = (char*)realloc(packBuffer, len+4); + if (tmp){ + packBuffer = tmp; + packBufferSize = len+4; + }else{ + free(packBuffer); + packBufferSize = 0; + packBuffer = 0; + FAIL_MSG("Failed to allocate memory for H264 packet"); + return; } - seekpoint = newPoint; - break; } + Bit::htobl(packBuffer, len);//size-prepend + memcpy(packBuffer+4, pl, len); + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime) / 90, track, packBuffer, len+4, isH264Keyframe(packBuffer+4, len)); + return; } - } - seek(seekpoint); - - unsigned int counter = 0; - std::map timeMap; //Keeps track of temporary timestamp data for the upcoming seek. - for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - timeMap[it->first] = myMeta.tracks[it->first].firstms; - for (unsigned int iy = 0; iy < myMeta.tracks[it->first].parts.size(); iy++) { - if (timeMap[it->first] > seekpoint) { - iy--; - break; + if ((pl[0] & 0x1F) == 24){ + DONTEVEN_MSG("H264 STAP-A packet"); + unsigned int len = 0; + unsigned int pos = 1; + while (pos + 1 < pkt.getPayloadSize()){ + unsigned int pLen = Bit::btohs(pl+pos); + INSANE_MSG("Packet of %ub and type %u", pLen, (unsigned int)(pl[pos+2] & 0x1F)); + pos += 2+pLen; + len += 4+pLen; } - timeMap[it->first] += myMeta.tracks[it->first].parts[iy].getDuration();//door parts van keyframes + static char * packBuffer = 0; + static unsigned long packBufferSize = 0; + if (packBufferSize < len){ + char * tmp = (char*)realloc(packBuffer, len); + if (tmp){ + packBuffer = tmp; + packBufferSize = len; + }else{ + free(packBuffer); + packBufferSize = 0; + packBuffer = 0; + FAIL_MSG("Failed to allocate memory for H264 STAP-A packet"); + return; + } + } + pos = 1; + len = 0; + bool isKey = false; + while (pos + 1 < pkt.getPayloadSize()){ + unsigned int pLen = Bit::btohs(pl+pos); + Bit::htobl(packBuffer+len, pLen);//size-prepend + getH264FrameNum(pl+pos+2, pLen, h264meta[track]); + isKey |= isH264Keyframe(pl+pos+2, pLen); + memcpy(packBuffer+len+4, pl+pos+2, pLen); + pos += 2+pLen; + len += 4+pLen; + } + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime) / 90, track, packBuffer, len, isKey); + return; } - if (myMeta.tracks[it->first].codec == "H264") { - timeMap[it->first] = 90 * timeMap[it->first]; - } else if (myMeta.tracks[it->first].codec == "AAC" || myMeta.tracks[it->first].codec == "MP3" || myMeta.tracks[it->first].codec == "AC3") { - timeMap[it->first] = timeMap[it->first] * ((double)myMeta.tracks[it->first].rate / 1000.0); + if ((pl[0] & 0x1F) == 28){ + DONTEVEN_MSG("H264 FU-A packet"); + static char * fuaBuffer = 0; + static unsigned long fuaBufferSize = 0; + static unsigned long fuaCurrLen = 0; + + //No length yet? Check for start bit. Ignore rest. + if (!fuaCurrLen && (pkt.getPayload()[1] & 0x80) == 0){ + HIGH_MSG("Not start of a new FU-A - throwing away"); + return; + } + if (fuaCurrLen && ((pkt.getPayload()[1] & 0x80) || (tracks[track].rtpSeq != pkt.getSequence()))){ + WARN_MSG("Ending unfinished FU-A"); + INSANE_MSG("H264 FU-A packet incompleted: %lu", fuaCurrLen); + Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend + fuaBuffer[4] |= 0x80;//set error bit + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime) / 90, track, fuaBuffer, fuaCurrLen, isH264Keyframe(fuaBuffer+4, fuaCurrLen-4)); + fuaCurrLen = 0; + return; + } + + unsigned long len = pkt.getPayloadSize() - 2;//ignore the two FU-A bytes in front + if (!fuaCurrLen){len += 5;}//five extra bytes for the first packet + + + if (fuaBufferSize < fuaCurrLen + len){ + char * tmp = (char*)realloc(fuaBuffer, fuaCurrLen + len); + if (tmp){ + fuaBuffer = tmp; + fuaBufferSize = fuaCurrLen + len; + }else{ + free(fuaBuffer); + fuaBufferSize = 0; + fuaBuffer = 0; + FAIL_MSG("Failed to allocate memory for H264 FU-A packet"); + return; + } + } + + if (fuaCurrLen == 0){ + memcpy(fuaBuffer+4, pkt.getPayload()+1, pkt.getPayloadSize()-1); + //reconstruct first byte + fuaBuffer[4] = (fuaBuffer[4] & 0x1F) | (pkt.getPayload()[0] & 0xE0); + }else{ + memcpy(fuaBuffer+fuaCurrLen, pkt.getPayload()+2, pkt.getPayloadSize()-2); + } + fuaCurrLen += len; + + if (pkt.getPayload()[1] & 0x40){//last packet + INSANE_MSG("H264 FU-A packet type %u completed: %lu", (unsigned int)(fuaBuffer[4] & 0x1F), fuaCurrLen); + Bit::htobl(fuaBuffer, fuaCurrLen-4);//size-prepend + h264Packet((pkt.getTimeStamp() - tracks[track].firstTime) / 90, track, fuaBuffer, fuaCurrLen, isH264Keyframe(fuaBuffer+4, fuaCurrLen-4)); + fuaCurrLen = 0; + } + return; } - transportString << "url=" << HTTP_R.url.substr(0, HTTP_R.url.rfind('/')) << "/" << streamName << "/track" << it->first << ";"; //get the current url, not localhost - transportString << "sequence=" << tracks[it->first].rtpPacket.getSequence() << ";rtptime=" << timeMap[it->first]; - if (counter < tracks.size()) { - transportString << ","; - } - counter++; + WARN_MSG("H264 packet type %u unsupported", (unsigned int)(pl[0] & 0x1F)); + return; } - std::stringstream rangeStr; - rangeStr << "npt=" << seekpoint/1000 << "." << std::setw(3) << std::setfill('0') << seekpoint %1000 << "-" << std::setw(1) << lastms/1000 << "." << std::setw(3) << std::setfill('0') << lastms%1000; - HTTP_S.SetHeader("Range", rangeStr.str()); - HTTP_S.SetHeader("RTP-Info", transportString.str()); - HTTP_S.SendResponse("200", "OK", myConn); - parseData = true; + } + + void OutRTSP::parseSDP(const std::string & sdp){ + std::stringstream ss(sdp); + std::string to; + uint64_t trackNo = 0; + bool nope = true; //true if we have no valid track to fill + while(std::getline(ss,to,'\n')){ + if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size()-1, 1);} + + // All tracks start with a media line + if (to.substr(0,2) == "m="){ + nope = true; + ++trackNo; + std::stringstream words(to.substr(2)); + std::string item; + if (getline(words, item, ' ') && (item == "audio" || item == "video")){ + myMeta.tracks[trackNo].type = item; + myMeta.tracks[trackNo].trackID = trackNo; + }else{ + WARN_MSG("Media type not supported: %s", item.c_str()); + continue; + } + getline(words, item, ' '); + if (!getline(words, item, ' ') || item != "RTP/AVP"){ + WARN_MSG("Media transport not supported: %s", item.c_str()); + continue; + } + nope = false; + continue; + } + if (nope){continue;}//ignore lines if we have no valid track + // RTP mapping + if (to.substr(0, 8) == "a=rtpmap"){ + std::string mediaType = to.substr(to.find(' ', 8)+1); + std::string trCodec = mediaType.substr(0, mediaType.find('/')); + for(unsigned int i=0;i=97){trCodec[i]-=32;} + } + if (trCodec == "H264"){ + myMeta.tracks[trackNo].codec = "H264"; + } + if (trCodec == "MPEG4-GENERIC"){ + myMeta.tracks[trackNo].codec = "AAC"; + std::string extraInfo = mediaType.substr(mediaType.find('/')+1); + if (extraInfo.find('/') != std::string::npos){ + size_t lastSlash = extraInfo.find('/'); + myMeta.tracks[trackNo].rate = atoll(extraInfo.substr(0, lastSlash).c_str()); + myMeta.tracks[trackNo].channels = atoll(extraInfo.substr(lastSlash+1).c_str()); + }else{ + myMeta.tracks[trackNo].rate = atoll(extraInfo.c_str()); + myMeta.tracks[trackNo].channels = 1; + } + } + INFO_MSG("Incoming track %s", myMeta.tracks[trackNo].getIdentifier().c_str()); + continue; + } + if (to.substr(0, 10) == "a=control:"){ + tracks[trackNo].control = to.substr(10); + continue; + } + if (to.substr(0, 7) == "a=fmtp:"){ + tracks[trackNo].fmtp = to.substr(7); + if (myMeta.tracks[trackNo].codec == "AAC"){ + if (tracks[trackNo].getParamString("mode") != "AAC-hbr"){ + //a=fmtp:97 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3; config=120856E500 + FAIL_MSG("AAC transport mode not supported: %s", tracks[trackNo].getParamString("mode").c_str()); + nope = true; + myMeta.tracks.erase(trackNo); + tracks.erase(trackNo); + continue; + } + myMeta.tracks[trackNo].init = Encodings::Hex::decode(tracks[trackNo].getParamString("config")); + //myMeta.tracks[trackNo].rate = aac::AudSpecConf::rate(myMeta.tracks[trackNo].init); + + } + if (myMeta.tracks[trackNo].codec == "H264"){ + //a=fmtp:96 packetization-mode=1; sprop-parameter-sets=Z0LAHtkA2D3m//AUABqxAAADAAEAAAMAMg8WLkg=,aMuDyyA=; profile-level-id=42C01E + std::string sprop = tracks[trackNo].getParamString("sprop-parameter-sets"); + size_t comma = sprop.find(','); + std::string spsInfo = Encodings::Base64::decode(sprop.substr(0,comma)); + std::string ppsInfo = Encodings::Base64::decode(sprop.substr(comma+1)); + h264::sequenceParameterSet sps(spsInfo.data(), spsInfo.size()); + h264meta[trackNo] = sps.getCharacteristics(); + myMeta.tracks[trackNo].width = h264meta[trackNo].width; + myMeta.tracks[trackNo].height = h264meta[trackNo].height; + myMeta.tracks[trackNo].fpks = h264meta[trackNo].fps * 1000; + MP4::AVCC avccBox; + avccBox.setVersion(1); + avccBox.setProfile(spsInfo[1]); + avccBox.setCompatibleProfiles(spsInfo[2]); + avccBox.setLevel(spsInfo[3]); + avccBox.setSPSNumber(1); + avccBox.setSPS(spsInfo); + avccBox.setPPSNumber(1); + avccBox.setPPS(ppsInfo); + myMeta.tracks[trackNo].init = std::string(avccBox.payload(), avccBox.payloadSize()); + } + continue; + } + // We ignore bandwidth lines + if (to.substr(0,2) == "b="){ + continue; + } + //we ignore everything before the first media line. + if (!trackNo){ + continue; + } + //at this point, the data is definitely for a track + INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str()); + } + } } + diff --git a/src/output/output_rtsp.h b/src/output/output_rtsp.h index 7fc2b5b7..7ec834af 100644 --- a/src/output/output_rtsp.h +++ b/src/output/output_rtsp.h @@ -4,24 +4,138 @@ #include #include #include +#include +#include namespace Mist { ///Structure used to keep track of selected tracks. - class trackmeta { + class RTPTrack { public: - trackmeta(){ - rtcpSent = 0; - channel = 0; - UDP = false; - initSent = false; - } Socket::UDPConnection data; Socket::UDPConnection rtcp; - RTP::Packet rtpPacket;/// The RTP packet instance used for this track. + RTP::Packet pack; long long rtcpSent; + uint64_t firstTime; int channel;/// Channel number, used in TCP sending - bool UDP;/// True if sending over UDP, false otherwise - bool initSent; + uint64_t packCount; + uint16_t rtpSeq; + std::map packBuffer; + uint32_t cPort; + std::string transportString; + std::string control; + std::string fmtp; + int8_t offset; + RTPTrack(){ + rtcpSent = 0; + channel = -1; + firstTime = 0; + packCount = 0; + offset = 0; + cPort = 0; + rtpSeq = 0; + } + std::string getParamString(const std::string & param) const{ + if (!fmtp.size()){return "";} + size_t pos = fmtp.find(param); + if (pos == std::string::npos){return "";} + pos += param.size()+1; + size_t ePos = fmtp.find_first_of(" ;", pos); + return fmtp.substr(pos, ePos-pos); + } + uint64_t getParamInt(const std::string & param) const{ + return atoll(getParamString(param).c_str()); + } + std::string mediaDescription(const DTSC::Track & trk){ + std::stringstream mediaDesc; + if (trk.codec == "H264") { + MP4::AVCC avccbox; + avccbox.setPayload(trk.init); + mediaDesc << "m=video 0 RTP/AVP 97\r\n" + "a=rtpmap:97 H264/90000\r\n" + "a=cliprect:0,0," << trk.height << "," << trk.width << "\r\n" + "a=framesize:97 " << trk.width << '-' << trk.height << "\r\n" + "a=fmtp:97 packetization-mode=1;profile-level-id=" + << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[1] << std::dec << "E0" + << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init.data()[3] << std::dec << ";" + "sprop-parameter-sets=" + << Encodings::Base64::encode(std::string(avccbox.getSPS(), avccbox.getSPSLen())) + << "," + << Encodings::Base64::encode(std::string(avccbox.getPPS(), avccbox.getPPSLen())) + << "\r\n" + "a=framerate:" << ((double)trk.fpks)/1000.0 << "\r\n" + "a=control:track" << trk.trackID << "\r\n"; + } else if (trk.codec == "AAC") { + mediaDesc << "m=audio 0 RTP/AVP 96" << "\r\n" + "a=rtpmap:96 mpeg4-generic/" << trk.rate << "/" << trk.channels << "\r\n" + "a=fmtp:96 streamtype=5; profile-level-id=15; config="; + for (unsigned int i = 0; i < trk.init.size(); i++) { + mediaDesc << std::hex << std::setw(2) << std::setfill('0') << (int)trk.init[i] << std::dec; + } + //these values are described in RFC 3640 + mediaDesc << "; mode=AAC-hbr; SizeLength=13; IndexLength=3; IndexDeltaLength=3;\r\n" + "a=control:track" << trk.trackID << "\r\n"; + }else if (trk.codec == "MP3") { + mediaDesc << "m=" << trk.type << " 0 RTP/AVP 14" << "\r\n" + "a=rtpmap:14 MPA/90000/" << trk.channels << "\r\n" + "a=control:track" << trk.trackID << "\r\n"; + }else if ( trk.codec == "AC3") { + mediaDesc << "m=audio 0 RTP/AVP 100" << "\r\n" + "a=rtpmap:100 AC3/" << trk.rate << "/" << trk.channels << "\r\n" + "a=control:track" << trk.trackID << "\r\n"; + } + return mediaDesc.str(); + } + bool parseTransport(const std::string & transport, const std::string & host, const std::string & source, const DTSC::Track & trk){ + unsigned int SSrc = rand(); + if (trk.codec == "H264") { + pack = RTP::Packet(97, 1, 0, SSrc); + }else if(trk.codec == "AAC"){ + pack = RTP::Packet(96, 1, 0, SSrc); + }else if(trk.codec == "AC3"){ + pack = RTP::Packet(100, 1, 0, SSrc); + }else if(trk.codec == "MP3"){ + pack = RTP::Packet(14, 1, 0, SSrc); + }else{ + ERROR_MSG("Unsupported codec %s for RTSP on track %u", trk.codec.c_str(), trk.trackID); + return false; + } + std::cerr << transport << std::endl; + if (transport.find("TCP") != std::string::npos) { + std::string chanE = transport.substr(transport.find("interleaved=") + 12, (transport.size() - transport.rfind('-') - 1)); //extract channel ID + channel = atol(chanE.c_str()); + rtcpSent = 0; + transportString = transport; + } else { + channel = -1; + size_t port_loc = transport.rfind("client_port=") + 12; + cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str()); + unsigned int rand_offset = ((rand() % 4000) << 1) + 2000; + //find available ports locally; + int sendbuff = 4*1024*1024; + data.SetDestination(host, cPort); + data.bind(rand_offset + trk.trackID * 2); + setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + rtcp.SetDestination(host, cPort + 1); + rtcp.bind(rand_offset + trk.trackID * 2 + 1); + setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + std::stringstream tStr; + tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";source="<< source <<";server_port=" << (rand_offset + trk.trackID * 2) << "-" << (rand_offset + trk.trackID * 2 + 1) << ";ssrc=" << std::hex << SSrc << std::dec; + transportString = tStr.str(); + } + return true; + } + std::string rtpInfo(const DTSC::Track & trk, const std::string & source, uint64_t currentTime){ + unsigned int timeMultiplier = 1; + if (trk.codec == "H264") { + timeMultiplier = 90; + } else if (trk.codec == "AAC" || trk.codec == "MP3" || trk.codec == "AC3") { + timeMultiplier = ((double)trk.rate / 1000.0); + } + std::stringstream rInfo; + rInfo << "url=" << source << "/track" << trk.trackID << ";"; //get the current url, not localhost + rInfo << "sequence=" << pack.getSequence() << ";rtptime=" << currentTime * timeMultiplier; + return rInfo.str(); + } }; class OutRTSP : public Output { @@ -30,17 +144,23 @@ namespace Mist { static void init(Util::Config * cfg); void sendNext(); void onRequest(); + void requestHandler(); + bool isReadyForPlay(); + bool onFinish(); private: - void handleDescribe(); - void handleSetup(); - void handlePlay(); - void handlePause(); - + bool isPushing; + void parseSDP(const std::string & sdp); long long connectedAt;///< The timestamp the connection was made, as reference point for RTCP packets. - std::map tracks;///< List of selected tracks with RTSP-specific session data. - unsigned int seekpoint;///< Current play position + std::map tracks;///< List of selected tracks with RTSP-specific session data. + std::map h264meta;///< Metadata from SPS of H264 tracks, for input handling. unsigned int pausepoint;///< Position to pause at, when reached HTTP::Parser HTTP_R, HTTP_S; + std::string source; + bool expectTCP; + bool handleTCP(); + void handleUDP(); + void handleIncomingRTP(const uint64_t track, const RTP::Packet & pkt); + void h264Packet(uint64_t ts, const uint64_t track, const char * buffer, const uint32_t len, const bool isKey); }; }