RTSP pull input
Includes TCP, UDP and authentication support
This commit is contained in:
parent
b81f980932
commit
f5c1e6b573
7 changed files with 526 additions and 27 deletions
|
@ -374,6 +374,7 @@ makeInput(MP4 mp4)#LTS
|
||||||
makeInput(TS ts)#LTS
|
makeInput(TS ts)#LTS
|
||||||
makeInput(Folder folder)#LTS
|
makeInput(Folder folder)#LTS
|
||||||
makeInput(Balancer balancer)#LTS
|
makeInput(Balancer balancer)#LTS
|
||||||
|
makeInput(RTSP rtsp)#LTS
|
||||||
|
|
||||||
########################################
|
########################################
|
||||||
# MistServer - Outputs #
|
# MistServer - Outputs #
|
||||||
|
|
105
lib/sdp.cpp
105
lib/sdp.cpp
|
@ -13,7 +13,6 @@ namespace SDP{
|
||||||
channel = -1;
|
channel = -1;
|
||||||
firstTime = 0;
|
firstTime = 0;
|
||||||
packCount = 0;
|
packCount = 0;
|
||||||
cPort = 0;
|
|
||||||
rtpSeq = 0;
|
rtpSeq = 0;
|
||||||
lostTotal = 0;
|
lostTotal = 0;
|
||||||
lostCurrent = 0;
|
lostCurrent = 0;
|
||||||
|
@ -23,6 +22,8 @@ namespace SDP{
|
||||||
fpsMeta = 0;
|
fpsMeta = 0;
|
||||||
fps = 0;
|
fps = 0;
|
||||||
mySSRC = rand();
|
mySSRC = rand();
|
||||||
|
portA = portB = 0;
|
||||||
|
cPortA = cPortB = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extracts a particular parameter from the fmtp string. fmtp member must be set before calling.
|
/// Extracts a particular parameter from the fmtp string. fmtp member must be set before calling.
|
||||||
|
@ -173,6 +174,28 @@ namespace SDP{
|
||||||
return mediaDesc.str();
|
return mediaDesc.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generates a transport string suitable for in a SETUP request.
|
||||||
|
/// By default generates a TCP mode string.
|
||||||
|
/// Expects parseTransport to be called with the response from the server.
|
||||||
|
std::string Track::generateTransport(uint32_t trackNo, const std::string &dest, bool TCPmode){
|
||||||
|
if (TCPmode){
|
||||||
|
//We simply request interleaved delivery over a trackNo-based identifier.
|
||||||
|
//No need to set any internal state, parseTransport will handle it all.
|
||||||
|
std::stringstream tStr;
|
||||||
|
tStr << "RTP/AVP/TCP;unicast;interleaved=" << ((trackNo - 1) * 2) << "-" << ((trackNo - 1) * 2 + 1);
|
||||||
|
return tStr.str();
|
||||||
|
}else{
|
||||||
|
//A little more tricky: we need to find free ports and remember them.
|
||||||
|
data.SetDestination(dest, 1337);
|
||||||
|
rtcp.SetDestination(dest, 1337);
|
||||||
|
portA = data.bind(0);
|
||||||
|
portB = rtcp.bind(0);
|
||||||
|
std::stringstream tStr;
|
||||||
|
tStr << "RTP/AVP/UDP;unicast;client_port=" << portA << "-" << portB;
|
||||||
|
return tStr.str();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets the TCP/UDP connection details from a given transport string.
|
/// Sets the TCP/UDP connection details from a given transport string.
|
||||||
/// Sets the transportString member to the current transport string on success.
|
/// Sets the transportString member to the current transport string on success.
|
||||||
/// \param host The host connecting to us.
|
/// \param host The host connecting to us.
|
||||||
|
@ -221,22 +244,55 @@ namespace SDP{
|
||||||
transportString = transport;
|
transportString = transport;
|
||||||
}else{
|
}else{
|
||||||
channel = -1;
|
channel = -1;
|
||||||
|
uint32_t sPortA = 0, sPortB = 0;
|
||||||
|
cPortA = cPortB = 0;
|
||||||
|
size_t sPort_loc = transport.rfind("server_port=") + 12;
|
||||||
|
if (sPort_loc != std::string::npos){
|
||||||
|
sPortA = atol(transport.substr(sPort_loc, transport.find('-', sPort_loc) - sPort_loc).c_str());
|
||||||
|
sPortB = atol(transport.substr(transport.find('-', sPort_loc)+1).c_str());
|
||||||
|
}
|
||||||
size_t port_loc = transport.rfind("client_port=") + 12;
|
size_t port_loc = transport.rfind("client_port=") + 12;
|
||||||
cPort = atol(transport.substr(port_loc, transport.rfind('-') - port_loc).c_str());
|
if (port_loc != std::string::npos){
|
||||||
uint32_t portA, portB;
|
cPortA = atol(transport.substr(port_loc, transport.find('-', port_loc) - port_loc).c_str());
|
||||||
// find available ports locally;
|
cPortB = atol(transport.substr(transport.find('-', port_loc)+1).c_str());
|
||||||
|
}
|
||||||
|
INFO_MSG("UDP ports: server %d/%d, client %d/%d", sPortA, sPortB, cPortA, cPortB);
|
||||||
int sendbuff = 4 * 1024 * 1024;
|
int sendbuff = 4 * 1024 * 1024;
|
||||||
data.SetDestination(host, cPort);
|
if (!sPortA || !sPortB){
|
||||||
portA = data.bind(0);
|
//Server mode - find server ports
|
||||||
|
data.SetDestination(host, cPortA);
|
||||||
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||||
rtcp.SetDestination(host, cPort + 1);
|
rtcp.SetDestination(host, cPortB);
|
||||||
portB = rtcp.bind(0);
|
|
||||||
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||||
|
portA = data.bind(0);
|
||||||
|
portB = rtcp.bind(0);
|
||||||
std::stringstream tStr;
|
std::stringstream tStr;
|
||||||
tStr << "RTP/AVP/UDP;unicast;client_port=" << cPort << '-' << cPort + 1 << ";";
|
tStr << "RTP/AVP/UDP;unicast;client_port=" << cPortA << '-' << cPortB << ";";
|
||||||
if (source.size()){tStr << "source=" << source << ";";}
|
if (source.size()){tStr << "source=" << source << ";";}
|
||||||
tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << mySSRC << std::dec;
|
tStr << "server_port=" << portA << "-" << portB << ";ssrc=" << std::hex << mySSRC << std::dec;
|
||||||
transportString = tStr.str();
|
transportString = tStr.str();
|
||||||
|
}else{
|
||||||
|
//Client mode - check ports and/or obey given ports if possible
|
||||||
|
data.SetDestination(host, sPortA);
|
||||||
|
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||||
|
rtcp.SetDestination(host, sPortB);
|
||||||
|
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||||
|
if (portA != cPortA){
|
||||||
|
portA = data.bind(cPortA);
|
||||||
|
if (portA != cPortA){
|
||||||
|
FAIL_MSG("Server requested port %d, which we couldn't bind", cPortA);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (portB != cPortB){
|
||||||
|
portB = data.bind(cPortB);
|
||||||
|
if (portB != cPortB){
|
||||||
|
FAIL_MSG("Server requested port %d, which we couldn't bind", cPortB);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
transportString = transport;
|
||||||
|
}
|
||||||
INFO_MSG("Transport string: %s", transportString.c_str());
|
INFO_MSG("Transport string: %s", transportString.c_str());
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -260,6 +316,7 @@ namespace SDP{
|
||||||
DTSC::Track *thisTrack = 0;
|
DTSC::Track *thisTrack = 0;
|
||||||
while (std::getline(ss, to, '\n')){
|
while (std::getline(ss, to, '\n')){
|
||||||
if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size() - 1, 1);}
|
if (!to.empty() && *to.rbegin() == '\r'){to.erase(to.size() - 1, 1);}
|
||||||
|
if (to.empty()){continue;}
|
||||||
|
|
||||||
// All tracks start with a media line
|
// All tracks start with a media line
|
||||||
if (to.substr(0, 2) == "m="){
|
if (to.substr(0, 2) == "m="){
|
||||||
|
@ -289,7 +346,6 @@ namespace SDP{
|
||||||
thisTrack->codec = "ALAW";
|
thisTrack->codec = "ALAW";
|
||||||
thisTrack->rate = 8000;
|
thisTrack->rate = 8000;
|
||||||
thisTrack->channels = 1;
|
thisTrack->channels = 1;
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
|
||||||
break;
|
break;
|
||||||
case 10: // PCM Stereo, 44.1kHz
|
case 10: // PCM Stereo, 44.1kHz
|
||||||
INFO_MSG("Linear PCM stereo 44.1kHz payload type");
|
INFO_MSG("Linear PCM stereo 44.1kHz payload type");
|
||||||
|
@ -298,7 +354,6 @@ namespace SDP{
|
||||||
thisTrack->size = 16;
|
thisTrack->size = 16;
|
||||||
thisTrack->rate = 44100;
|
thisTrack->rate = 44100;
|
||||||
thisTrack->channels = 2;
|
thisTrack->channels = 2;
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
|
||||||
break;
|
break;
|
||||||
case 11: // PCM Mono, 44.1kHz
|
case 11: // PCM Mono, 44.1kHz
|
||||||
INFO_MSG("Linear PCM mono 44.1kHz payload type");
|
INFO_MSG("Linear PCM mono 44.1kHz payload type");
|
||||||
|
@ -307,7 +362,6 @@ namespace SDP{
|
||||||
thisTrack->rate = 44100;
|
thisTrack->rate = 44100;
|
||||||
thisTrack->size = 16;
|
thisTrack->size = 16;
|
||||||
thisTrack->channels = 1;
|
thisTrack->channels = 1;
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
|
||||||
break;
|
break;
|
||||||
case 14: // MPA
|
case 14: // MPA
|
||||||
INFO_MSG("MPA payload type");
|
INFO_MSG("MPA payload type");
|
||||||
|
@ -316,18 +370,16 @@ namespace SDP{
|
||||||
thisTrack->rate = 0;
|
thisTrack->rate = 0;
|
||||||
thisTrack->size = 0;
|
thisTrack->size = 0;
|
||||||
thisTrack->channels = 0;
|
thisTrack->channels = 0;
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
|
||||||
break;
|
break;
|
||||||
case 32: // MPV
|
case 32: // MPV
|
||||||
INFO_MSG("MPV payload type");
|
INFO_MSG("MPV payload type");
|
||||||
nope = false;
|
nope = false;
|
||||||
thisTrack->codec = "MPEG2";
|
thisTrack->codec = "MPEG2";
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// dynamic type
|
// dynamic type
|
||||||
if (avp_type >= 96 && avp_type <= 127){
|
if (avp_type >= 96 && avp_type <= 127){
|
||||||
INFO_MSG("Dynamic payload type (%llu) detected", avp_type);
|
HIGH_MSG("Dynamic payload type (%llu) detected", avp_type);
|
||||||
nope = false;
|
nope = false;
|
||||||
continue;
|
continue;
|
||||||
}else{
|
}else{
|
||||||
|
@ -336,6 +388,7 @@ namespace SDP{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (nope){continue;}// ignore lines if we have no valid track
|
if (nope){continue;}// ignore lines if we have no valid track
|
||||||
|
@ -391,7 +444,7 @@ namespace SDP{
|
||||||
if (!thisTrack->codec.size()){
|
if (!thisTrack->codec.size()){
|
||||||
ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str());
|
ERROR_MSG("Unsupported RTP mapping: %s", mediaType.c_str());
|
||||||
}else{
|
}else{
|
||||||
INFO_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
HIGH_MSG("Incoming track %s", thisTrack->getIdentifier().c_str());
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -399,6 +452,22 @@ namespace SDP{
|
||||||
tracks[trackNo].control = to.substr(10);
|
tracks[trackNo].control = to.substr(10);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (to.substr(0, 12) == "a=framerate:"){
|
||||||
|
if (!thisTrack->rate){
|
||||||
|
thisTrack->rate = atof(to.c_str() + 12)*1000;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (to.substr(0, 12) == "a=framesize:"){
|
||||||
|
//Ignored for now.
|
||||||
|
/// \TODO Maybe implement?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (to.substr(0, 11) == "a=cliprect:"){
|
||||||
|
//Ignored for now.
|
||||||
|
/// \TODO Maybe implement?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (to.substr(0, 7) == "a=fmtp:"){
|
if (to.substr(0, 7) == "a=fmtp:"){
|
||||||
tracks[trackNo].fmtp = to.substr(7);
|
tracks[trackNo].fmtp = to.substr(7);
|
||||||
if (thisTrack->codec == "AAC"){
|
if (thisTrack->codec == "AAC"){
|
||||||
|
@ -444,6 +513,9 @@ namespace SDP{
|
||||||
// at this point, the data is definitely for a track
|
// at this point, the data is definitely for a track
|
||||||
INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str());
|
INFO_MSG("Unhandled SDP line for track %llu: %s", trackNo, to.c_str());
|
||||||
}
|
}
|
||||||
|
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta->tracks.begin(); it != myMeta->tracks.end(); ++it){
|
||||||
|
INFO_MSG("Detected track %s", it->second.getIdentifier().c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calculates H265 track metadata from sps and pps data stored in tracks[trackNo]
|
/// Calculates H265 track metadata from sps and pps data stored in tracks[trackNo]
|
||||||
|
@ -573,6 +645,7 @@ namespace SDP{
|
||||||
|
|
||||||
// Header data? Compare to init, set if needed, and throw away
|
// Header data? Compare to init, set if needed, and throw away
|
||||||
uint8_t nalType = (buffer[4] & 0x1F);
|
uint8_t nalType = (buffer[4] & 0x1F);
|
||||||
|
if (nalType == 9 && len < 20){return;}//ignore delimiter-only packets
|
||||||
switch (nalType){
|
switch (nalType){
|
||||||
case 7: // SPS
|
case 7: // SPS
|
||||||
if (tracks[track].spsData.size() != len - 4 ||
|
if (tracks[track].spsData.size() != len - 4 ||
|
||||||
|
|
|
@ -22,18 +22,18 @@ namespace SDP{
|
||||||
int32_t lostTotal, lostCurrent;
|
int32_t lostTotal, lostCurrent;
|
||||||
uint32_t packTotal, packCurrent;
|
uint32_t packTotal, packCurrent;
|
||||||
std::map<uint16_t, RTP::Packet> packBuffer;
|
std::map<uint16_t, RTP::Packet> packBuffer;
|
||||||
uint32_t cPort;
|
|
||||||
std::string transportString; /// Current transport string.
|
std::string transportString; /// Current transport string.
|
||||||
std::string control;
|
std::string control;
|
||||||
std::string fmtp; /// fmtp string, used by getParamString / getParamInt
|
std::string fmtp; /// fmtp string, used by getParamString / getParamInt
|
||||||
std::string spsData;
|
std::string spsData;
|
||||||
std::string ppsData;
|
std::string ppsData;
|
||||||
uint32_t mySSRC, theirSSRC;
|
uint32_t mySSRC, theirSSRC, portA, portB, cPortA, cPortB;
|
||||||
h265::initData hevcInfo;
|
h265::initData hevcInfo;
|
||||||
uint64_t fpsTime;
|
uint64_t fpsTime;
|
||||||
double fpsMeta;
|
double fpsMeta;
|
||||||
double fps;
|
double fps;
|
||||||
Track();
|
Track();
|
||||||
|
std::string generateTransport(uint32_t trackNo, const std::string &dest = "", bool TCPmode = true);
|
||||||
std::string getParamString(const std::string ¶m) const;
|
std::string getParamString(const std::string ¶m) const;
|
||||||
uint64_t getParamInt(const std::string ¶m) const;
|
uint64_t getParamInt(const std::string ¶m) const;
|
||||||
bool parseTransport(const std::string &transport, const std::string &host,
|
bool parseTransport(const std::string &transport, const std::string &host,
|
||||||
|
|
|
@ -113,7 +113,6 @@ bool AnalyserRTSP::parsePacket(){
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
}while (isOpen());
|
}while (isOpen());
|
||||||
|
return false;
|
||||||
// if needed, parse TCP packets, and cancel if it is not safe (yet) to read HTTP/RTSP packets
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
376
src/input/input_rtsp.cpp
Executable file
376
src/input/input_rtsp.cpp
Executable file
|
@ -0,0 +1,376 @@
|
||||||
|
#include "input_rtsp.h"
|
||||||
|
|
||||||
|
Mist::InputRTSP *classPointer = 0;
|
||||||
|
Socket::Connection *mainConn = 0;
|
||||||
|
|
||||||
|
void incomingPacket(const DTSC::Packet &pkt){
|
||||||
|
classPointer->incoming(pkt);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Function used to send RTP packets over UDP
|
||||||
|
///\param socket A UDP Connection pointer, sent as a void*, to keep portability.
|
||||||
|
///\param data The RTP Packet that needs to be sent
|
||||||
|
///\param len The size of data
|
||||||
|
///\param channel Not used here, but is kept for compatibility with sendTCP
|
||||||
|
void sendUDP(void *socket, char *data, unsigned int len, unsigned int channel){
|
||||||
|
((Socket::UDPConnection *)socket)->SendNow(data, len);
|
||||||
|
if (mainConn){mainConn->addUp(len);}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace Mist{
|
||||||
|
InputRTSP::InputRTSP(Util::Config *cfg) : Input(cfg){
|
||||||
|
TCPmode = true;
|
||||||
|
sdpState.myMeta = &myMeta;
|
||||||
|
sdpState.incomingPacketCallback = incomingPacket;
|
||||||
|
classPointer = this;
|
||||||
|
standAlone = false;
|
||||||
|
seenSDP = false;
|
||||||
|
cSeq = 0;
|
||||||
|
capa["name"] = "RTSP";
|
||||||
|
capa["decs"] = "Allows pulling from live RTSP sources";
|
||||||
|
capa["source_match"].append("rtsp://*");
|
||||||
|
// These can/may be set to always-on mode
|
||||||
|
capa["always_match"].append("rtsp://*");
|
||||||
|
capa["priority"] = 9ll;
|
||||||
|
capa["codecs"][0u][0u].append("H264");
|
||||||
|
capa["codecs"][0u][0u].append("HEVC");
|
||||||
|
capa["codecs"][0u][0u].append("MPEG2");
|
||||||
|
capa["codecs"][0u][1u].append("AAC");
|
||||||
|
capa["codecs"][0u][1u].append("MP3");
|
||||||
|
capa["codecs"][0u][1u].append("AC3");
|
||||||
|
capa["codecs"][0u][1u].append("ALAW");
|
||||||
|
capa["codecs"][0u][1u].append("ULAW");
|
||||||
|
capa["codecs"][0u][1u].append("PCM");
|
||||||
|
capa["codecs"][0u][1u].append("opus");
|
||||||
|
capa["codecs"][0u][1u].append("MP2");
|
||||||
|
|
||||||
|
JSON::Value option;
|
||||||
|
option["arg"] = "integer";
|
||||||
|
option["long"] = "buffer";
|
||||||
|
option["short"] = "b";
|
||||||
|
option["help"] = "DVR buffer time in ms";
|
||||||
|
option["value"].append(50000LL);
|
||||||
|
config->addOption("bufferTime", option);
|
||||||
|
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
|
||||||
|
capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in "
|
||||||
|
"milliseconds. This is the time available to seek around in, "
|
||||||
|
"and will automatically be extended to fit whole keyframes "
|
||||||
|
"as well as the minimum duration needed for stable playback.";
|
||||||
|
capa["optional"]["DVR"]["option"] = "--buffer";
|
||||||
|
capa["optional"]["DVR"]["type"] = "uint";
|
||||||
|
capa["optional"]["DVR"]["default"] = 50000LL;
|
||||||
|
option.null();
|
||||||
|
option["arg"] = "string";
|
||||||
|
option["long"] = "transport";
|
||||||
|
option["short"] = "t";
|
||||||
|
option["help"] = "Transport protocol (TCP (default) or UDP)";
|
||||||
|
option["value"].append("TCP");
|
||||||
|
config->addOption("transport", option);
|
||||||
|
capa["optional"]["transport"]["name"] = "Transport protocol";
|
||||||
|
capa["optional"]["transport"]["help"] = "Sets the transport protocol to either TCP (default) "
|
||||||
|
"or UDP. UDP requires ephemeral UDP ports to be open, "
|
||||||
|
"TCP does not.";
|
||||||
|
capa["optional"]["transport"]["option"] = "--transport";
|
||||||
|
capa["optional"]["transport"]["type"] = "select";
|
||||||
|
capa["optional"]["transport"]["select"].append("TCP");
|
||||||
|
capa["optional"]["transport"]["select"].append("UDP");
|
||||||
|
capa["optional"]["transport"]["default"] = "TCP";
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputRTSP::sendCommand(const std::string &cmd, const std::string &cUrl,
|
||||||
|
const std::string &body,
|
||||||
|
const std::map<std::string, std::string> &extraHeaders){
|
||||||
|
++cSeq;
|
||||||
|
sndH.Clean();
|
||||||
|
sndH.protocol = "RTSP/1.0";
|
||||||
|
sndH.method = cmd;
|
||||||
|
sndH.url = cUrl;
|
||||||
|
sndH.body = body;
|
||||||
|
if ((username.size() || password.size()) && authRequest.size()){
|
||||||
|
sndH.auth(username, password, authRequest);
|
||||||
|
}
|
||||||
|
sndH.SetHeader("User-Agent", "MistServer " PACKAGE_VERSION);
|
||||||
|
sndH.SetHeader("CSeq", JSON::Value((long long)cSeq).asString());
|
||||||
|
if (session.size()){sndH.SetHeader("Session", session);}
|
||||||
|
if (extraHeaders.size()){
|
||||||
|
for (std::map<std::string, std::string>::const_iterator it = extraHeaders.begin();
|
||||||
|
it != extraHeaders.end(); ++it){
|
||||||
|
sndH.SetHeader(it->first, it->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sndH.SendRequest(tcpCon);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool InputRTSP::checkArguments(){
|
||||||
|
const std::string &inpt = config->getString("input");
|
||||||
|
if (inpt.substr(0, 7) != "rtsp://"){
|
||||||
|
FAIL_MSG("Unsupported RTSP URL: '%s'", inpt.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const std::string &transport = config->getString("transport");
|
||||||
|
if (transport != "TCP" && transport != "UDP" && transport != "tcp" && transport != "udp"){
|
||||||
|
FAIL_MSG("Not a supported transport mode: %s", transport.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (transport == "UDP" || transport == "udp"){TCPmode = false;}
|
||||||
|
url = HTTP::URL(config->getString("input"));
|
||||||
|
username = url.user;
|
||||||
|
password = url.pass;
|
||||||
|
url.user = "";
|
||||||
|
url.pass = "";
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool InputRTSP::openStreamSource(){
|
||||||
|
tcpCon = Socket::Connection(url.host, url.getPort(), false);
|
||||||
|
mainConn = &tcpCon;
|
||||||
|
return tcpCon;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputRTSP::parseStreamHeader(){
|
||||||
|
std::map<std::string, std::string> extraHeaders;
|
||||||
|
extraHeaders["Accept"] = "application/sdp";
|
||||||
|
sendCommand("DESCRIBE", url.getUrl(), "", extraHeaders);
|
||||||
|
parsePacket();
|
||||||
|
if (!seenSDP && authRequest.size() && (username.size() || password.size()) && tcpCon){
|
||||||
|
INFO_MSG("Authenticating...");
|
||||||
|
sendCommand("DESCRIBE", url.getUrl(), "", extraHeaders);
|
||||||
|
parsePacket();
|
||||||
|
}
|
||||||
|
if (!tcpCon || !seenSDP){
|
||||||
|
FAIL_MSG("Could not get stream description!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (sdpState.tracks.size()){
|
||||||
|
for (std::map<uint32_t, SDP::Track>::iterator it = sdpState.tracks.begin();
|
||||||
|
it != sdpState.tracks.end(); ++it){
|
||||||
|
transportSet = false;
|
||||||
|
extraHeaders.clear();
|
||||||
|
extraHeaders["Transport"] = it->second.generateTransport(it->first, url.host, TCPmode);
|
||||||
|
sendCommand("SETUP", url.link(it->second.control).getUrl(), "", extraHeaders);
|
||||||
|
parsePacket();
|
||||||
|
if (!tcpCon || !transportSet){
|
||||||
|
FAIL_MSG("Could not setup track %s!", myMeta.tracks[it->first].getIdentifier().c_str());
|
||||||
|
tcpCon.close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
INFO_MSG("Setup complete");
|
||||||
|
extraHeaders.clear();
|
||||||
|
extraHeaders["Range"] = "npt=0.000-";
|
||||||
|
sendCommand("PLAY", url.getUrl(), "", extraHeaders);
|
||||||
|
if (!TCPmode){
|
||||||
|
tcpCon.setBlocking(false);
|
||||||
|
connectedAt = Util::epoch() + 2208988800ll;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputRTSP::closeStreamSource(){
|
||||||
|
sendCommand("TEARDOWN", url.getUrl(), "");
|
||||||
|
tcpCon.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string InputRTSP::streamMainLoop(){
|
||||||
|
uint64_t lastPing = Util::bootSecs();
|
||||||
|
while (config->is_active && nProxy.userClient.isAlive() && parsePacket()){
|
||||||
|
handleUDP();
|
||||||
|
// keep going
|
||||||
|
nProxy.userClient.keepAlive();
|
||||||
|
if (Util::bootSecs() - lastPing > 30){
|
||||||
|
sendCommand("GET_PARAMETER", url.getUrl(), "");
|
||||||
|
lastPing = Util::bootSecs();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!tcpCon){return "TCP connection closed";}
|
||||||
|
if (!config->is_active){return "received deactivate signal";}
|
||||||
|
if (!nProxy.userClient.isAlive()){return "buffer shutdown";}
|
||||||
|
return "Unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
bool InputRTSP::parsePacket(){
|
||||||
|
uint32_t waitTime = 500;
|
||||||
|
if (!TCPmode){waitTime = 50;}
|
||||||
|
do{
|
||||||
|
// No new data? Sleep and retry, if connection still open
|
||||||
|
if (!tcpCon.Received().size() || !tcpCon.Received().available(1)){
|
||||||
|
if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){
|
||||||
|
nProxy.userClient.keepAlive();
|
||||||
|
Util::sleep(waitTime);
|
||||||
|
if (!TCPmode){return true;}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (tcpCon.Received().copy(1) != "$"){
|
||||||
|
// not a TCP RTP packet, read RTSP commands
|
||||||
|
if (recH.Read(tcpCon)){
|
||||||
|
if (recH.hasHeader("WWW-Authenticate")){
|
||||||
|
authRequest = recH.GetHeader("WWW-Authenticate");
|
||||||
|
}
|
||||||
|
if (recH.url == "401"){
|
||||||
|
INFO_MSG("Requires authentication");
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (recH.hasHeader("Content-Location")){
|
||||||
|
url = HTTP::URL(recH.GetHeader("Content-Location"));
|
||||||
|
}
|
||||||
|
if (recH.hasHeader("Content-Base")){url = HTTP::URL(recH.GetHeader("Content-Base"));}
|
||||||
|
if (recH.hasHeader("Session")){
|
||||||
|
session = recH.GetHeader("Session");
|
||||||
|
if (session.find(';') != std::string::npos){
|
||||||
|
session.erase(session.find(';'), std::string::npos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (recH.hasHeader("Content-Type") &&
|
||||||
|
recH.GetHeader("Content-Type") == "application/sdp"){
|
||||||
|
seenSDP = true;
|
||||||
|
sdpState.parseSDP(recH.body);
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (recH.hasHeader("Transport")){
|
||||||
|
uint32_t trackNo = sdpState.parseSetup(recH, url.host, "");
|
||||||
|
if (trackNo){
|
||||||
|
INFO_MSG("Parsed transport for track: %lu", trackNo);
|
||||||
|
transportSet = true;
|
||||||
|
}else{
|
||||||
|
INFO_MSG("Could not parse transport string!");
|
||||||
|
}
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (recH.url == "200" && recH.hasHeader("RTP-Info")){
|
||||||
|
INFO_MSG("Playback starting");
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Ignore "OK" replies beyond this point
|
||||||
|
if (recH.url == "200"){
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print anything possibly interesting to cerr
|
||||||
|
std::cerr << recH.BuildRequest() << std::endl;
|
||||||
|
recH.Clean();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){
|
||||||
|
nProxy.userClient.keepAlive();
|
||||||
|
Util::sleep(waitTime);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!tcpCon.Received().available(4)){
|
||||||
|
if (!tcpCon.spool() && tcpCon && config->is_active && nProxy.userClient.isAlive()){
|
||||||
|
nProxy.userClient.keepAlive();
|
||||||
|
Util::sleep(waitTime);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}// a TCP RTP packet, but not complete yet
|
||||||
|
|
||||||
|
// We have a TCP packet! Read it...
|
||||||
|
// Format: 1 byte '$', 1 byte channel, 2 bytes len, len bytes binary data
|
||||||
|
std::string tcpHead = tcpCon.Received().copy(4);
|
||||||
|
uint16_t len = ntohs(*(short *)(tcpHead.data() + 2));
|
||||||
|
if (!tcpCon.Received().available(len + 4)){
|
||||||
|
if (!tcpCon.spool() && tcpCon){Util::sleep(waitTime);}
|
||||||
|
continue;
|
||||||
|
}// a TCP RTP packet, but not complete yet
|
||||||
|
// remove whole packet from buffer, including 4 byte header
|
||||||
|
std::string tcpPacket = tcpCon.Received().remove(len + 4);
|
||||||
|
RTP::Packet pkt(tcpPacket.data() + 4, len);
|
||||||
|
uint8_t chan = tcpHead.data()[1];
|
||||||
|
uint32_t trackNo = sdpState.getTrackNoForChannel(chan);
|
||||||
|
EXTREME_MSG("Received %ub RTP packet #%u on channel %u, time %llu", len,
|
||||||
|
(unsigned int)pkt.getSequence(), chan, pkt.getTimeStamp());
|
||||||
|
if (!trackNo && (chan % 2) != 1){
|
||||||
|
WARN_MSG("Received packet for unknown track number on channel %u", chan);
|
||||||
|
}
|
||||||
|
if (trackNo){sdpState.tracks[trackNo].rtpSeq = pkt.getSequence();}
|
||||||
|
|
||||||
|
sdpState.handleIncomingRTP(trackNo, pkt);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
|
||||||
|
}while (tcpCon && config->is_active && nProxy.userClient.isAlive());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads and handles RTP packets over UDP, if needed
|
||||||
|
bool InputRTSP::handleUDP(){
|
||||||
|
if (TCPmode){return false;}
|
||||||
|
bool r = false;
|
||||||
|
for (std::map<uint32_t, SDP::Track>::iterator it = sdpState.tracks.begin();
|
||||||
|
it != sdpState.tracks.end(); ++it){
|
||||||
|
Socket::UDPConnection &s = it->second.data;
|
||||||
|
while (s.Receive()){
|
||||||
|
r = true;
|
||||||
|
// if (s.getDestPort() != it->second.sPortA){
|
||||||
|
// // wrong sending port, ignore packet
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
tcpCon.addDown(s.data_len);
|
||||||
|
RTP::Packet pack(s.data, s.data_len);
|
||||||
|
if (!it->second.rtpSeq){it->second.rtpSeq = pack.getSequence();}
|
||||||
|
// packet is very early - assume dropped after 30 packets
|
||||||
|
while ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < -30){
|
||||||
|
WARN_MSG("Giving up on packet %u", it->second.rtpSeq);
|
||||||
|
++(it->second.rtpSeq);
|
||||||
|
++(it->second.lostTotal);
|
||||||
|
++(it->second.lostCurrent);
|
||||||
|
++(it->second.packTotal);
|
||||||
|
++(it->second.packCurrent);
|
||||||
|
// send any buffered packets we may have
|
||||||
|
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||||
|
sdpState.handleIncomingRTP(it->first, pack);
|
||||||
|
++(it->second.rtpSeq);
|
||||||
|
++(it->second.packTotal);
|
||||||
|
++(it->second.packCurrent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// send any buffered packets we may have
|
||||||
|
while (it->second.packBuffer.count(it->second.rtpSeq)){
|
||||||
|
sdpState.handleIncomingRTP(it->first, pack);
|
||||||
|
++(it->second.rtpSeq);
|
||||||
|
++(it->second.packTotal);
|
||||||
|
++(it->second.packCurrent);
|
||||||
|
}
|
||||||
|
// packet is slightly early - buffer it
|
||||||
|
if (((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) < 0)){
|
||||||
|
INFO_MSG("Buffering early packet #%u->%u", it->second.rtpSeq, pack.getSequence());
|
||||||
|
it->second.packBuffer[pack.getSequence()] = pack;
|
||||||
|
}
|
||||||
|
// packet is late
|
||||||
|
if ((int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())) > 0){
|
||||||
|
// negative difference?
|
||||||
|
--(it->second.lostTotal);
|
||||||
|
--(it->second.lostCurrent);
|
||||||
|
++(it->second.packTotal);
|
||||||
|
++(it->second.packCurrent);
|
||||||
|
WARN_MSG("Dropped a packet that arrived too late! (%d packets difference)",
|
||||||
|
(int16_t)(((uint16_t)it->second.rtpSeq) - ((uint16_t)pack.getSequence())));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// packet is in order
|
||||||
|
if (it->second.rtpSeq == pack.getSequence()){
|
||||||
|
sdpState.handleIncomingRTP(it->first, pack);
|
||||||
|
++(it->second.rtpSeq);
|
||||||
|
++(it->second.packTotal);
|
||||||
|
++(it->second.packCurrent);
|
||||||
|
if (!it->second.theirSSRC){it->second.theirSSRC = pack.getSSRC();}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (Util::epoch() / 5 != it->second.rtcpSent){
|
||||||
|
it->second.rtcpSent = Util::epoch() / 5;
|
||||||
|
it->second.pack.sendRTCP_RR(connectedAt, it->second, it->first, myMeta, sendUDP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InputRTSP::incoming(const DTSC::Packet &pkt){nProxy.bufferLivePacket(pkt, myMeta);}
|
||||||
|
|
||||||
|
}// namespace Mist
|
||||||
|
|
50
src/input/input_rtsp.h
Executable file
50
src/input/input_rtsp.h
Executable file
|
@ -0,0 +1,50 @@
|
||||||
|
#include "input.h"
|
||||||
|
#include <mist/dtsc.h>
|
||||||
|
#include <mist/http_parser.h>
|
||||||
|
#include <mist/nal.h>
|
||||||
|
#include <mist/rtp.h>
|
||||||
|
#include <mist/sdp.h>
|
||||||
|
#include <set>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace Mist{
|
||||||
|
/// This class contains all functions needed to implement TS Input
|
||||||
|
class InputRTSP : public Input{
|
||||||
|
public:
|
||||||
|
InputRTSP(Util::Config *cfg);
|
||||||
|
bool needsLock(){return false;}
|
||||||
|
void incoming(const DTSC::Packet &pkt);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// Private Functions
|
||||||
|
bool checkArguments();
|
||||||
|
bool needHeader(){return false;}
|
||||||
|
bool readHeader(){return true;}
|
||||||
|
void getNext(bool smart = true){}
|
||||||
|
bool openStreamSource();
|
||||||
|
void closeStreamSource();
|
||||||
|
void parseStreamHeader();
|
||||||
|
void seek(int seekTime){}
|
||||||
|
void sendCommand(const std::string &cmd, const std::string &cUrl, const std::string &body,
|
||||||
|
const std::map<std::string, std::string> &extraHeaders =
|
||||||
|
std::map<std::string, std::string>());
|
||||||
|
bool parsePacket();
|
||||||
|
bool handleUDP();
|
||||||
|
std::string streamMainLoop();
|
||||||
|
Socket::Connection tcpCon;
|
||||||
|
HTTP::Parser sndH, recH;
|
||||||
|
HTTP::URL url;
|
||||||
|
std::string username, password, authRequest;
|
||||||
|
uint64_t cSeq;
|
||||||
|
SDP::State sdpState;
|
||||||
|
bool seenSDP;
|
||||||
|
bool transportSet;
|
||||||
|
bool TCPmode;
|
||||||
|
std::string session;
|
||||||
|
long long connectedAt; ///< The timestamp the connection was made, as reference point for RTCP
|
||||||
|
/// packets.
|
||||||
|
};
|
||||||
|
}// namespace Mist
|
||||||
|
|
||||||
|
typedef Mist::InputRTSP mistIn;
|
||||||
|
|
|
@ -379,7 +379,7 @@ namespace Mist{
|
||||||
it != sdpState.tracks.end(); ++it){
|
it != sdpState.tracks.end(); ++it){
|
||||||
Socket::UDPConnection &s = it->second.data;
|
Socket::UDPConnection &s = it->second.data;
|
||||||
while (s.Receive()){
|
while (s.Receive()){
|
||||||
if (s.getDestPort() != it->second.cPort && checkPort){
|
if (s.getDestPort() != it->second.cPortA && checkPort){
|
||||||
// wrong sending port, ignore packet
|
// wrong sending port, ignore packet
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue