diff --git a/CMakeLists.txt b/CMakeLists.txt index b4d7375b..40bfc0e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -374,6 +374,7 @@ makeInput(MP4 mp4)#LTS makeInput(TS ts)#LTS makeInput(Folder folder)#LTS makeInput(Balancer balancer)#LTS +makeInput(RTSP rtsp)#LTS ######################################## # MistServer - Outputs # diff --git a/lib/sdp.cpp b/lib/sdp.cpp index 711988ce..06c2e38b 100644 --- a/lib/sdp.cpp +++ b/lib/sdp.cpp @@ -13,7 +13,6 @@ namespace SDP{ channel = -1; firstTime = 0; packCount = 0; - cPort = 0; rtpSeq = 0; lostTotal = 0; lostCurrent = 0; @@ -23,6 +22,8 @@ namespace SDP{ fpsMeta = 0; fps = 0; mySSRC = rand(); + portA = portB = 0; + cPortA = cPortB = 0; } /// Extracts a particular parameter from the fmtp string. fmtp member must be set before calling. @@ -173,6 +174,28 @@ namespace SDP{ return mediaDesc.str(); } + /// Generates a transport string suitable for in a SETUP request. + /// By default generates a TCP mode string. + /// Expects parseTransport to be called with the response from the server. + std::string Track::generateTransport(uint32_t trackNo, const std::string &dest, bool TCPmode){ + if (TCPmode){ + //We simply request interleaved delivery over a trackNo-based identifier. + //No need to set any internal state, parseTransport will handle it all. + std::stringstream tStr; + tStr << "RTP/AVP/TCP;unicast;interleaved=" << ((trackNo - 1) * 2) << "-" << ((trackNo - 1) * 2 + 1); + return tStr.str(); + }else{ + //A little more tricky: we need to find free ports and remember them. + data.SetDestination(dest, 1337); + rtcp.SetDestination(dest, 1337); + portA = data.bind(0); + portB = rtcp.bind(0); + std::stringstream tStr; + tStr << "RTP/AVP/UDP;unicast;client_port=" << portA << "-" << portB; + return tStr.str(); + } + } + /// Sets the TCP/UDP connection details from a given transport string. /// Sets the transportString member to the current transport string on success. /// \param host The host connecting to us. @@ -221,22 +244,55 @@ namespace SDP{ transportString = transport; }else{ channel = -1; + uint32_t sPortA = 0, sPortB = 0; + cPortA = cPortB = 0; + size_t sPort_loc = transport.rfind("server_port=") + 12; + if (sPort_loc != std::string::npos){ + sPortA = atol(transport.substr(sPort_loc, transport.find('-', sPort_loc) - sPort_loc).c_str()); + sPortB = atol(transport.substr(transport.find('-', sPort_loc)+1).c_str()); + } size_t port_loc = transport.rfind("client_port=") + 12; - cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str()); - uint32_t portA, portB; - // find available ports locally; + if (port_loc != std::string::npos){ + cPortA = atol(transport.substr(port_loc, transport.find('-', port_loc) - port_loc).c_str()); + cPortB = atol(transport.substr(transport.find('-', port_loc)+1).c_str()); + } + INFO_MSG("UDP ports: server %d/%d, client %d/%d", sPortA, sPortB, cPortA, cPortB); int sendbuff = 4 * 1024 * 1024; - data.SetDestination(host, cPort); - portA = data.bind(0); - setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - rtcp.SetDestination(host, cPort + 1); - portB = rtcp.bind(0); - setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); - std::stringstream tStr; - tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";"; - if (source.size()){tStr << "source=" << source << ";";} - tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << mySSRC << std::dec; - transportString = tStr.str(); + if (!sPortA || !sPortB){ + //Server mode - find server ports + data.SetDestination(host, cPortA); + setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + rtcp.SetDestination(host, cPortB); + setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + portA = data.bind(0); + portB = rtcp.bind(0); + std::stringstream tStr; + tStr << "RTP/AVP/UDP;unicast;client_port=" << cPortA << '-' << cPortB << ";"; + if (source.size()){tStr << "source=" << source << ";";} + tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << mySSRC << std::dec; + transportString = tStr.str(); + }else{ + //Client mode - check ports and/or obey given ports if possible + data.SetDestination(host, sPortA); + setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + rtcp.SetDestination(host, sPortB); + setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)); + if (portA != cPortA){ + portA = data.bind(cPortA); + if (portA != cPortA){ + FAIL_MSG("Server requested port %d, which we couldn't bind", cPortA); + return false; + } + } + if (portB != cPortB){ + portB = data.bind(cPortB); + if (portB != cPortB){ + FAIL_MSG("Server requested port %d, which we couldn't bind", cPortB); + return false; + } + } + transportString = transport; + } INFO_MSG("Transport string: %s", transportString.c_str()); } return true; @@ -260,6 +316,7 @@ namespace SDP{ DTSC::Track *thisTrack = 0; while (std::getline(ss, to, '\n')){ if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size() - 1, 1);} + if (to.empty()){continue;} // All tracks start with a media line if (to.substr(0, 2) == "m="){ @@ -289,7 +346,6 @@ namespace SDP{ thisTrack->codec = "ALAW"; thisTrack->rate = 8000; thisTrack->channels = 1; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); break; case 10: // PCM Stereo, 44.1kHz INFO_MSG("Linear PCM stereo 44.1kHz payload type"); @@ -298,7 +354,6 @@ namespace SDP{ thisTrack->size = 16; thisTrack->rate = 44100; thisTrack->channels = 2; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); break; case 11: // PCM Mono, 44.1kHz INFO_MSG("Linear PCM mono 44.1kHz payload type"); @@ -307,7 +362,6 @@ namespace SDP{ thisTrack->rate = 44100; thisTrack->size = 16; thisTrack->channels = 1; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); break; case 14: // MPA INFO_MSG("MPA payload type"); @@ -316,18 +370,16 @@ namespace SDP{ thisTrack->rate = 0; thisTrack->size = 0; thisTrack->channels = 0; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); break; case 32: // MPV INFO_MSG("MPV payload type"); nope = false; thisTrack->codec = "MPEG2"; - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); break; default: // dynamic type if (avp_type >= 96 && avp_type <= 127){ - INFO_MSG("Dynamic payload type (%llu) detected", avp_type); + HIGH_MSG("Dynamic payload type (%llu) detected", avp_type); nope = false; continue; }else{ @@ -336,6 +388,7 @@ namespace SDP{ } } } + HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); continue; } if (nope){continue;}// ignore lines if we have no valid track @@ -391,7 +444,7 @@ namespace SDP{ if (!thisTrack->codec.size()){ ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str()); }else{ - INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); + HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str()); } continue; } @@ -399,6 +452,22 @@ namespace SDP{ tracks[trackNo].control = to.substr(10); continue; } + if (to.substr(0, 12) == "a=framerate:"){ + if (!thisTrack->rate){ + thisTrack->rate = atof(to.c_str() + 12)*1000; + } + continue; + } + if (to.substr(0, 12) == "a=framesize:"){ + //Ignored for now. + /// \TODO Maybe implement? + continue; + } + if (to.substr(0, 11) == "a=cliprect:"){ + //Ignored for now. + /// \TODO Maybe implement? + continue; + } if (to.substr(0, 7) == "a=fmtp:"){ tracks[trackNo].fmtp = to.substr(7); if (thisTrack->codec == "AAC"){ @@ -444,6 +513,9 @@ namespace SDP{ // at this point, the data is definitely for a track INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str()); } + for (std::map::iterator it = myMeta->tracks.begin(); it != myMeta->tracks.end(); ++it){ + INFO_MSG("Detected track %s", it->second.getIdentifier().c_str()); + } } /// Calculates H265 track metadata from sps and pps data stored in tracks[trackNo] @@ -573,6 +645,7 @@ namespace SDP{ // Header data? Compare to init, set if needed, and throw away uint8_t nalType = (buffer[4] & 0x1F); + if (nalType == 9 && len < 20){return;}//ignore delimiter-only packets switch (nalType){ case 7: // SPS if (tracks[track].spsData.size() != len - 4 || diff --git a/lib/sdp.h b/lib/sdp.h index 62afb074..049da973 100644 --- a/lib/sdp.h +++ b/lib/sdp.h @@ -22,18 +22,18 @@ namespace SDP{ int32_t lostTotal, lostCurrent; uint32_t packTotal, packCurrent; std::map packBuffer; - uint32_t cPort; std::string transportString; /// Current transport string. std::string control; std::string fmtp; /// fmtp string, used by getParamString / getParamInt std::string spsData; std::string ppsData; - uint32_t mySSRC, theirSSRC; + uint32_t mySSRC, theirSSRC, portA, portB, cPortA, cPortB; h265::initData hevcInfo; uint64_t fpsTime; double fpsMeta; double fps; Track(); + std::string generateTransport(uint32_t trackNo, const std::string &dest = "", bool TCPmode = true); std::string getParamString(const std::string ¶m) const; uint64_t getParamInt(const std::string ¶m) const; bool parseTransport(const std::string &transport, const std::string &host, diff --git a/src/analysers/analyser_rtsp.cpp b/src/analysers/analyser_rtsp.cpp index bd8ac41b..380ad5cd 100644 --- a/src/analysers/analyser_rtsp.cpp +++ b/src/analysers/analyser_rtsp.cpp @@ -113,7 +113,6 @@ bool AnalyserRTSP::parsePacket(){ return true; }while (isOpen()); - - // if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets + return false; } diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp new file mode 100755 index 00000000..4959ab35 --- /dev/null +++ b/src/input/input_rtsp.cpp @@ -0,0 +1,376 @@ +#include "input_rtsp.h" + +Mist::InputRTSP *classPointer = 0; +Socket::Connection *mainConn = 0; + +void incomingPacket(const DTSC::Packet &pkt){ + classPointer->incoming(pkt); +} + +/// Function used to send RTP packets over UDP +///\param socket A UDP Connection pointer, sent as a void*, to keep portability. +///\param data The RTP Packet that needs to be sent +///\param len The size of data +///\param channel Not used here, but is kept for compatibility with sendTCP +void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){ + ((Socket::UDPConnection *)socket)->SendNow(data, len); + if (mainConn){mainConn->addUp(len);} +} + +namespace Mist{ + InputRTSP::InputRTSP(Util::Config *cfg) : Input(cfg){ + TCPmode = true; + sdpState.myMeta = &myMeta; + sdpState.incomingPacketCallback = incomingPacket; + classPointer = this; + standAlone = false; + seenSDP = false; + cSeq = 0; + capa["name"] = "RTSP"; + capa["decs"] = "Allows pulling from live RTSP sources"; + capa["source_match"].append("rtsp://*"); + // These can/may be set to always-on mode + capa["always_match"].append("rtsp://*"); + capa["priority"] = 9ll; + capa["codecs"][0u][0u].append("H264"); + capa["codecs"][0u][0u].append("HEVC"); + capa["codecs"][0u][0u].append("MPEG2"); + capa["codecs"][0u][1u].append("AAC"); + capa["codecs"][0u][1u].append("MP3"); + capa["codecs"][0u][1u].append("AC3"); + capa["codecs"][0u][1u].append("ALAW"); + capa["codecs"][0u][1u].append("ULAW"); + capa["codecs"][0u][1u].append("PCM"); + capa["codecs"][0u][1u].append("opus"); + capa["codecs"][0u][1u].append("MP2"); + + JSON::Value option; + option["arg"] = "integer"; + option["long"] = "buffer"; + option["short"] = "b"; + option["help"] = "DVR buffer time in ms"; + option["value"].append(50000LL); + config->addOption("bufferTime", option); + capa["optional"]["DVR"]["name"] = "Buffer time (ms)"; + capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in " + "milliseconds. This is the time available to seek around in, " + "and will automatically be extended to fit whole keyframes " + "as well as the minimum duration needed for stable playback."; + capa["optional"]["DVR"]["option"] = "--buffer"; + capa["optional"]["DVR"]["type"] = "uint"; + capa["optional"]["DVR"]["default"] = 50000LL; + option.null(); + option["arg"] = "string"; + option["long"] = "transport"; + option["short"] = "t"; + option["help"] = "Transport protocol (TCP (default) or UDP)"; + option["value"].append("TCP"); + config->addOption("transport", option); + capa["optional"]["transport"]["name"] = "Transport protocol"; + capa["optional"]["transport"]["help"] = "Sets the transport protocol to either TCP (default) " + "or UDP. UDP requires ephemeral UDP ports to be open, " + "TCP does not."; + capa["optional"]["transport"]["option"] = "--transport"; + capa["optional"]["transport"]["type"] = "select"; + capa["optional"]["transport"]["select"].append("TCP"); + capa["optional"]["transport"]["select"].append("UDP"); + capa["optional"]["transport"]["default"] = "TCP"; + } + + void InputRTSP::sendCommand(const std::string &cmd, const std::string &cUrl, + const std::string &body, + const std::map &extraHeaders){ + ++cSeq; + sndH.Clean(); + sndH.protocol = "RTSP/1.0"; + sndH.method = cmd; + sndH.url = cUrl; + sndH.body = body; + if ((username.size() || password.size()) && authRequest.size()){ + sndH.auth(username, password, authRequest); + } + sndH.SetHeader("User-Agent", "MistServer " PACKAGE_VERSION); + sndH.SetHeader("CSeq", JSON::Value((long long)cSeq).asString()); + if (session.size()){sndH.SetHeader("Session", session);} + if (extraHeaders.size()){ + for (std::map::const_iterator it = extraHeaders.begin(); + it != extraHeaders.end(); ++it){ + sndH.SetHeader(it->first, it->second); + } + } + sndH.SendRequest(tcpCon); + } + + bool InputRTSP::checkArguments(){ + const std::string &inpt = config->getString("input"); + if (inpt.substr(0, 7) != "rtsp://"){ + FAIL_MSG("Unsupported RTSP URL: '%s'", inpt.c_str()); + return false; + } + const std::string &transport = config->getString("transport"); + if (transport != "TCP" && transport != "UDP" && transport != "tcp" && transport != "udp"){ + FAIL_MSG("Not a supported transport mode: %s", transport.c_str()); + return false; + } + if (transport == "UDP" || transport == "udp"){TCPmode = false;} + url = HTTP::URL(config->getString("input")); + username = url.user; + password = url.pass; + url.user = ""; + url.pass = ""; + return true; + } + + bool InputRTSP::openStreamSource(){ + tcpCon = Socket::Connection(url.host, url.getPort(), false); + mainConn = &tcpCon; + return tcpCon; + } + + void InputRTSP::parseStreamHeader(){ + std::map extraHeaders; + extraHeaders["Accept"] = "application/sdp"; + sendCommand("DESCRIBE", url.getUrl(), "", extraHeaders); + parsePacket(); + if (!seenSDP && authRequest.size() && (username.size() || password.size()) && tcpCon){ + INFO_MSG("Authenticating..."); + sendCommand("DESCRIBE", url.getUrl(), "", extraHeaders); + parsePacket(); + } + if (!tcpCon || !seenSDP){ + FAIL_MSG("Could not get stream description!"); + return; + } + if (sdpState.tracks.size()){ + for (std::map::iterator it = sdpState.tracks.begin(); + it != sdpState.tracks.end(); ++it){ + transportSet = false; + extraHeaders.clear(); + extraHeaders["Transport"] = it->second.generateTransport(it->first, url.host, TCPmode); + sendCommand("SETUP", url.link(it->second.control).getUrl(), "", extraHeaders); + parsePacket(); + if (!tcpCon || !transportSet){ + FAIL_MSG("Could not setup track %s!", myMeta.tracks[it->first].getIdentifier().c_str()); + tcpCon.close(); + return; + } + } + } + INFO_MSG("Setup complete"); + extraHeaders.clear(); + extraHeaders["Range"] = "npt=0.000-"; + sendCommand("PLAY", url.getUrl(), "", extraHeaders); + if (!TCPmode){ + tcpCon.setBlocking(false); + connectedAt = Util::epoch() + 2208988800ll; + } + } + + void InputRTSP::closeStreamSource(){ + sendCommand("TEARDOWN", url.getUrl(), ""); + tcpCon.close(); + } + + std::string InputRTSP::streamMainLoop(){ + uint64_t lastPing = Util::bootSecs(); + while (config->is_active && nProxy.userClient.isAlive() && parsePacket()){ + handleUDP(); + // keep going + nProxy.userClient.keepAlive(); + if (Util::bootSecs() - lastPing > 30){ + sendCommand("GET_PARAMETER", url.getUrl(), ""); + lastPing = Util::bootSecs(); + } + } + if (!tcpCon){return "TCP connection closed";} + if (!config->is_active){return "received deactivate signal";} + if (!nProxy.userClient.isAlive()){return "buffer shutdown";} + return "Unknown"; + } + + bool InputRTSP::parsePacket(){ + uint32_t waitTime = 500; + if (!TCPmode){waitTime = 50;} + do{ + // No new data? Sleep and retry, if connection still open + if (!tcpCon.Received().size() || !tcpCon.Received().available(1)){ + if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){ + nProxy.userClient.keepAlive(); + Util::sleep(waitTime); + if (!TCPmode){return true;} + } + continue; + } + if (tcpCon.Received().copy(1) != "$"){ + // not a TCP RTP packet, read RTSP commands + if (recH.Read(tcpCon)){ + if (recH.hasHeader("WWW-Authenticate")){ + authRequest = recH.GetHeader("WWW-Authenticate"); + } + if (recH.url == "401"){ + INFO_MSG("Requires authentication"); + recH.Clean(); + return true; + } + if (recH.hasHeader("Content-Location")){ + url = HTTP::URL(recH.GetHeader("Content-Location")); + } + if (recH.hasHeader("Content-Base")){url = HTTP::URL(recH.GetHeader("Content-Base"));} + if (recH.hasHeader("Session")){ + session = recH.GetHeader("Session"); + if (session.find(';') != std::string::npos){ + session.erase(session.find(';'), std::string::npos); + } + } + if (recH.hasHeader("Content-Type") && + recH.GetHeader("Content-Type") == "application/sdp"){ + seenSDP = true; + sdpState.parseSDP(recH.body); + recH.Clean(); + return true; + } + if (recH.hasHeader("Transport")){ + uint32_t trackNo = sdpState.parseSetup(recH, url.host, ""); + if (trackNo){ + INFO_MSG("Parsed transport for track: %lu", trackNo); + transportSet = true; + }else{ + INFO_MSG("Could not parse transport string!"); + } + recH.Clean(); + return true; + } + if (recH.url == "200" && recH.hasHeader("RTP-Info")){ + INFO_MSG("Playback starting"); + recH.Clean(); + return true; + } + // Ignore "OK" replies beyond this point + if (recH.url == "200"){ + recH.Clean(); + return true; + } + + // Print anything possibly interesting to cerr + std::cerr << recH.BuildRequest() << std::endl; + recH.Clean(); + return true; + } + if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){ + nProxy.userClient.keepAlive(); + Util::sleep(waitTime); + } + continue; + } + if (!tcpCon.Received().available(4)){ + if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){ + nProxy.userClient.keepAlive(); + Util::sleep(waitTime); + } + continue; + }// a TCP RTP packet, but not complete yet + + // We have a TCP packet! Read it... + // Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data + std::string tcpHead = tcpCon.Received().copy(4); + uint16_t len = ntohs(*(short *)(tcpHead.data() + 2)); + if (!tcpCon.Received().available(len + 4)){ + if (!tcpCon.spool() && tcpCon){Util::sleep(waitTime);} + continue; + }// a TCP RTP packet, but not complete yet + // remove whole packet from buffer, including 4 byte header + std::string tcpPacket = tcpCon.Received().remove(len + 4); + RTP::Packet pkt(tcpPacket.data() + 4, len); + uint8_t chan = tcpHead.data()[1]; + uint32_t trackNo = sdpState.getTrackNoForChannel(chan); + EXTREME_MSG("Received %ub RTP packet #%u on channel %u, time %llu", len, + (unsigned int)pkt.getSequence(), chan, pkt.getTimeStamp()); + if (!trackNo && (chan % 2) != 1){ + WARN_MSG("Received packet for unknown track number on channel %u", chan); + } + if (trackNo){sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();} + + sdpState.handleIncomingRTP(trackNo, pkt); + + return true; + + }while (tcpCon && config->is_active && nProxy.userClient.isAlive()); + return false; + } + + /// Reads and handles RTP packets over UDP, if needed + bool InputRTSP::handleUDP(){ + if (TCPmode){return false;} + bool r = false; + for (std::map::iterator it = sdpState.tracks.begin(); + it != sdpState.tracks.end(); ++it){ + Socket::UDPConnection &s = it->second.data; + while (s.Receive()){ + r = true; + // if (s.getDestPort() != it->second.sPortA){ + // // wrong sending port, ignore packet + // continue; + //} + tcpCon.addDown(s.data_len); + RTP::Packet pack(s.data, s.data_len); + if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();} + // packet is very early - assume dropped after 30 packets + while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){ + WARN_MSG("Giving up on packet %u", it->second.rtpSeq); + ++(it->second.rtpSeq); + ++(it->second.lostTotal); + ++(it->second.lostCurrent); + ++(it->second.packTotal); + ++(it->second.packCurrent); + // send any buffered packets we may have + while (it->second.packBuffer.count(it->second.rtpSeq)){ + sdpState.handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + ++(it->second.packTotal); + ++(it->second.packCurrent); + } + } + // send any buffered packets we may have + while (it->second.packBuffer.count(it->second.rtpSeq)){ + sdpState.handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + ++(it->second.packTotal); + ++(it->second.packCurrent); + } + // packet is slightly early - buffer it + if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){ + INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence()); + it->second.packBuffer[pack.getSequence()] = pack; + } + // packet is late + if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){ + // negative difference? + --(it->second.lostTotal); + --(it->second.lostCurrent); + ++(it->second.packTotal); + ++(it->second.packCurrent); + WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)", + (int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence()))); + return false; + } + // packet is in order + if (it->second.rtpSeq == pack.getSequence()){ + sdpState.handleIncomingRTP(it->first, pack); + ++(it->second.rtpSeq); + ++(it->second.packTotal); + ++(it->second.packCurrent); + if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();} + } + } + if (Util::epoch() / 5 != it->second.rtcpSent){ + it->second.rtcpSent = Util::epoch() / 5; + it->second.pack.sendRTCP_RR(connectedAt, it->second, it->first, myMeta, sendUDP); + } + } + return r; + } + + void InputRTSP::incoming(const DTSC::Packet &pkt){nProxy.bufferLivePacket(pkt, myMeta);} + +}// namespace Mist + diff --git a/src/input/input_rtsp.h b/src/input/input_rtsp.h new file mode 100755 index 00000000..c2d63c0f --- /dev/null +++ b/src/input/input_rtsp.h @@ -0,0 +1,50 @@ +#include "input.h" +#include +#include +#include +#include +#include +#include +#include + +namespace Mist{ + /// This class contains all functions needed to implement TS Input + class InputRTSP : public Input{ + public: + InputRTSP(Util::Config *cfg); + bool needsLock(){return false;} + void incoming(const DTSC::Packet &pkt); + + protected: + // Private Functions + bool checkArguments(); + bool needHeader(){return false;} + bool readHeader(){return true;} + void getNext(bool smart = true){} + bool openStreamSource(); + void closeStreamSource(); + void parseStreamHeader(); + void seek(int seekTime){} + void sendCommand(const std::string &cmd, const std::string &cUrl, const std::string &body, + const std::map &extraHeaders = + std::map()); + bool parsePacket(); + bool handleUDP(); + std::string streamMainLoop(); + Socket::Connection tcpCon; + HTTP::Parser sndH, recH; + HTTP::URL url; + std::string username, password, authRequest; + uint64_t cSeq; + SDP::State sdpState; + bool seenSDP; + bool transportSet; + bool TCPmode; + std::string session; + long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP + /// packets. + }; +}// namespace Mist + +typedef Mist::InputRTSP mistIn; + diff --git a/src/output/output_rtsp.cpp b/src/output/output_rtsp.cpp index d3d8e1a2..3162d902 100644 --- a/src/output/output_rtsp.cpp +++ b/src/output/output_rtsp.cpp @@ -379,7 +379,7 @@ namespace Mist{ it != sdpState.tracks.end(); ++it){ Socket::UDPConnection &s = it->second.data; while (s.Receive()){ - if (s.getDestPort() != it->second.cPort && checkPort){ + if (s.getDestPort() != it->second.cPortA && checkPort){ // wrong sending port, ignore packet continue; }