Added SDP input
This commit is contained in:
parent
dd2382e858
commit
3e73508a6a
8 changed files with 571 additions and 46 deletions
|
@ -486,6 +486,7 @@ makeInput(Playlist playlist)#LTS
|
|||
makeInput(Balancer balancer)#LTS
|
||||
makeInput(RTSP rtsp)#LTS
|
||||
makeInput(SRT srt)#LTS
|
||||
makeInput(SDP sdp)
|
||||
|
||||
if(SRT_LIB)
|
||||
makeInput(TSSRT tssrt with_srt)#LTS
|
||||
|
|
98
lib/rtp.cpp
98
lib/rtp.cpp
|
@ -506,6 +506,7 @@ namespace RTP{
|
|||
rtpSeq = pSNo - 5;
|
||||
first = false;
|
||||
}
|
||||
DONTEVEN_MSG("Received packet #%u, current packet is #%u", pSNo, rtpSeq);
|
||||
if (preBuffer){
|
||||
//If we've buffered the first 5 packets, assume we have the first one known
|
||||
if (packBuffer.size() >= 5){
|
||||
|
@ -583,6 +584,7 @@ namespace RTP{
|
|||
packCount = 0;
|
||||
lastSeq = 0;
|
||||
vp8BufferHasKeyframe = false;
|
||||
curPicParameterSetId = 0;
|
||||
}
|
||||
|
||||
void toDTSC::setProperties(const uint64_t track, const std::string &c, const std::string &t,
|
||||
|
@ -601,7 +603,7 @@ namespace RTP{
|
|||
MP4::AVCC avccbox;
|
||||
avccbox.setPayload(init);
|
||||
spsData.assign(avccbox.getSPS(), avccbox.getSPSLen());
|
||||
ppsData.assign(avccbox.getPPS(), avccbox.getPPSLen());
|
||||
ppsData[curPicParameterSetId].assign(avccbox.getPPS(), avccbox.getPPSLen());
|
||||
h264::sequenceParameterSet sps(spsData.data(), spsData.size());
|
||||
h264::SPSMeta hMeta = sps.getCharacteristics();
|
||||
fps = hMeta.fps;
|
||||
|
@ -669,6 +671,12 @@ namespace RTP{
|
|||
}
|
||||
}
|
||||
}
|
||||
// When there are B-frames, the firstTime can be higher than the current time
|
||||
// causing msTime to become negative and thus overflow
|
||||
if (firstTime > pTime + 1){
|
||||
WARN_MSG("firstTime was higher than current packet time. Readjusting firsTime...");
|
||||
firstTime = pTime + 1;
|
||||
}
|
||||
prevTime = pkt.getTimeStamp();
|
||||
uint64_t msTime = ((uint64_t)pTime - firstTime + 1 + 0x100000000ull * wrapArounds) / multiplier + milliSync;
|
||||
char *pl = (char *)pkt.getPayload();
|
||||
|
@ -844,9 +852,13 @@ namespace RTP{
|
|||
if (fps > 1){
|
||||
// Assume a steady frame rate, clip the timestamp based on frame number.
|
||||
uint64_t frameNo = (ts / (1000.0 / fps)) + 0.5;
|
||||
while (frameNo < packCount){packCount--;}
|
||||
if (frameNo < packCount){
|
||||
packCount = frameNo;
|
||||
}
|
||||
// More than 32 frames behind? We probably skipped something, somewhere...
|
||||
if ((frameNo - packCount) > 32){packCount = frameNo;}
|
||||
if ((frameNo - packCount) > 32){
|
||||
packCount = frameNo;
|
||||
}
|
||||
// After some experimentation, we found that the time offset is the difference between the
|
||||
// frame number and the packet counter, times the frame rate in ms
|
||||
offset = (frameNo - packCount) * (1000.0 / fps);
|
||||
|
@ -981,9 +993,13 @@ namespace RTP{
|
|||
if (fps > 1){
|
||||
// Assume a steady frame rate, clip the timestamp based on frame number.
|
||||
uint64_t frameNo = (currH264Time / (1000.0 / fps)) + 0.5;
|
||||
while (frameNo < packCount){packCount--;}
|
||||
if (frameNo < packCount){
|
||||
packCount = frameNo;
|
||||
}
|
||||
// More than 32 frames behind? We probably skipped something, somewhere...
|
||||
if ((frameNo - packCount) > 32){packCount = frameNo;}
|
||||
if ((frameNo - packCount) > 32){
|
||||
packCount = frameNo;
|
||||
}
|
||||
// After some experimentation, we found that the time offset is the difference between the
|
||||
// frame number and the packet counter, times the frame rate in ms
|
||||
offset = (frameNo - packCount) * (1000.0 / fps);
|
||||
|
@ -1026,58 +1042,50 @@ namespace RTP{
|
|||
spsData.assign(buffer + 4, len - 4);
|
||||
h264::SPSMeta hMeta = sps.getCharacteristics();
|
||||
fps = hMeta.fps;
|
||||
|
||||
MP4::AVCC avccBox;
|
||||
avccBox.setVersion(1);
|
||||
avccBox.setProfile(spsData[1]);
|
||||
avccBox.setCompatibleProfiles(spsData[2]);
|
||||
avccBox.setLevel(spsData[3]);
|
||||
avccBox.setSPSCount(1);
|
||||
avccBox.setSPS(spsData);
|
||||
avccBox.setPPSCount(1);
|
||||
avccBox.setPPS(ppsData);
|
||||
std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize());
|
||||
if (newInit != init){
|
||||
init = newInit;
|
||||
outInit(trackId, init);
|
||||
}
|
||||
}
|
||||
return;
|
||||
case 8: // PPS
|
||||
if (ppsData.size() != len - 4 || memcmp(buffer + 4, ppsData.data(), len - 4) != 0){
|
||||
if (!h264::ppsValidate(buffer+4, len-4)){
|
||||
WARN_MSG("Ignoring invalid PPS packet! (%" PRIu32 "b)", len-4);
|
||||
return;
|
||||
}
|
||||
HIGH_MSG("Updated PPS from RTP data: %" PRIu32 "b", len-4);
|
||||
ppsData.assign(buffer + 4, len - 4);
|
||||
MP4::AVCC avccBox;
|
||||
avccBox.setVersion(1);
|
||||
avccBox.setProfile(spsData[1]);
|
||||
avccBox.setCompatibleProfiles(spsData[2]);
|
||||
avccBox.setLevel(spsData[3]);
|
||||
avccBox.setSPSCount(1);
|
||||
avccBox.setSPS(spsData);
|
||||
avccBox.setPPSCount(1);
|
||||
avccBox.setPPS(ppsData);
|
||||
std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize());
|
||||
if (newInit != init){
|
||||
init = newInit;
|
||||
outInit(trackId, init);
|
||||
// Determine pic_parameter_set_id and check whether the PPS is new or updated
|
||||
{
|
||||
h264::ppsUnit PPS(buffer + 4, len - 4);
|
||||
if (ppsData[PPS.picParameterSetId].size() != len - 4 || memcmp(buffer + 4, ppsData[PPS.picParameterSetId].data(), len - 4) != 0){
|
||||
if (!h264::ppsValidate(buffer+4, len-4)){
|
||||
WARN_MSG("Ignoring invalid PPS packet! (%" PRIu32 "b)", len-4);
|
||||
return;
|
||||
}
|
||||
HIGH_MSG("Updated PPS with ID %li from RTP data", PPS.picParameterSetId);
|
||||
ppsData[PPS.picParameterSetId].assign(buffer + 4, len - 4);
|
||||
}
|
||||
}
|
||||
return;
|
||||
case 5:{
|
||||
//If this is a keyframe and we have no buffer yet, prepend the SPS/PPS
|
||||
if (!h264OutBuffer.size()){
|
||||
// We have a keyframe: prepend SPS/PPS if the pic_parameter_set_id changed or if this is the first keyframe
|
||||
h264::codedSliceUnit keyPiece(buffer + 4, len - 4);
|
||||
if (!h264OutBuffer.size() || keyPiece.picParameterSetId != curPicParameterSetId){
|
||||
curPicParameterSetId = keyPiece.picParameterSetId;
|
||||
// Update meta init data if needed
|
||||
MP4::AVCC avccBox;
|
||||
avccBox.setVersion(1);
|
||||
avccBox.setProfile(spsData[1]);
|
||||
avccBox.setCompatibleProfiles(spsData[2]);
|
||||
avccBox.setLevel(spsData[3]);
|
||||
avccBox.setSPSCount(1);
|
||||
avccBox.setSPS(spsData);
|
||||
avccBox.setPPSCount(1);
|
||||
avccBox.setPPS(ppsData[curPicParameterSetId]);
|
||||
std::string newInit = std::string(avccBox.payload(), avccBox.payloadSize());
|
||||
if (newInit != init){
|
||||
init = newInit;
|
||||
outInit(trackId, init);
|
||||
}
|
||||
// Prepend SPS/PPS
|
||||
char sizeBuffer[4];
|
||||
Bit::htobl(sizeBuffer, spsData.size());
|
||||
h264OutBuffer.append(sizeBuffer, 4);
|
||||
h264OutBuffer.append(spsData.data(), spsData.size());
|
||||
|
||||
Bit::htobl(sizeBuffer, ppsData.size());
|
||||
Bit::htobl(sizeBuffer, ppsData[curPicParameterSetId].size());
|
||||
h264OutBuffer.append(sizeBuffer, 4);
|
||||
h264OutBuffer.append(ppsData.data(), ppsData.size());
|
||||
h264OutBuffer.append(ppsData[curPicParameterSetId].data(), ppsData[curPicParameterSetId].size());
|
||||
}
|
||||
//Note: no return, we still want to buffer the packet itself, below!
|
||||
}
|
||||
|
|
|
@ -179,7 +179,8 @@ namespace RTP{
|
|||
void handleH264Single(uint64_t ts, const char *buffer, const uint32_t len, bool isKey);
|
||||
void handleH264Multi(uint64_t ts, char *buffer, const uint32_t len);
|
||||
std::string spsData; ///< SPS for H264
|
||||
std::string ppsData; ///< PPS for H264
|
||||
uint8_t curPicParameterSetId;
|
||||
std::map<uint8_t,std::string> ppsData; ///< PPS for H264
|
||||
void handleVP8(uint64_t msTime, const char *buffer, const uint32_t len, bool missed, bool hasPadding);
|
||||
Util::ResizeablePointer vp8FrameBuffer; ///< Stores successive VP8 payload data. We always start with the first
|
||||
///< partition; but we might be missing other partitions when they were
|
||||
|
|
114
lib/sdp.cpp
114
lib/sdp.cpp
|
@ -398,6 +398,60 @@ namespace SDP{
|
|||
return true;
|
||||
}
|
||||
|
||||
/// Tries to bind a RTP/RTCP UDP port pair
|
||||
/// \param portInfo port/#ports as found in SDP file
|
||||
/// \param hostInfo host address
|
||||
bool Track::bindUDPPort(std::string portInfo, std::string hostInfo){
|
||||
uint32_t portRTP, portRTCP;
|
||||
|
||||
if (portInfo == "" || hostInfo == ""){
|
||||
WARN_MSG("Can not setup transport to address %s:%s", hostInfo.c_str(), portInfo.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
// Extract port numbers from input string
|
||||
size_t tempPos;
|
||||
tempPos = portInfo.find('/');
|
||||
if (tempPos != std::string::npos){
|
||||
// TODO https://tools.ietf.org/html/rfc4566#section-5.14
|
||||
// bind more ports if theres a /, which indicates the amount of port pairs
|
||||
WARN_MSG("Does not support more than one RTP/RTCP port pair");
|
||||
portInfo = portInfo.substr(0, tempPos);
|
||||
}
|
||||
std::istringstream ( portInfo ) >> portRTP;
|
||||
portRTCP = portRTP + 1;
|
||||
|
||||
// During RTSP streams we get the transport info on setup
|
||||
// in this case the port is set to 0 in the SDP file
|
||||
if (!portRTP){
|
||||
return true;
|
||||
}
|
||||
|
||||
// Since default is set to IPV6, force to AF_UNSPEC
|
||||
data.setSocketFamily(AF_UNSPEC);
|
||||
rtcp.setSocketFamily(AF_UNSPEC);
|
||||
// Test UDP ports
|
||||
int sendbuff = 4 * 1024 * 1024;
|
||||
data.SetDestination(hostInfo, portRTP);
|
||||
setsockopt(data.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||
rtcp.SetDestination(hostInfo, portRTCP);
|
||||
setsockopt(rtcp.getSock(), SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff));
|
||||
// Bind sockets
|
||||
portA = data.bind(portRTP, hostInfo);
|
||||
if (portA != portRTP){
|
||||
FAIL_MSG("Server requested RTP port %u, which we couldn't bind", portRTP);
|
||||
return false;
|
||||
}
|
||||
portB = rtcp.bind(portRTCP, hostInfo);
|
||||
if (portB != portRTCP){
|
||||
FAIL_MSG("Server requested RTCP port %u, which we couldn't bind", portRTCP);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/// Gets the rtpInfo for a given DTSC::Track, source identifier and timestamp (in millis).
|
||||
std::string Track::rtpInfo(const DTSC::Meta &M, size_t tid, const std::string &source, uint64_t currentTime){
|
||||
std::stringstream rInfo;
|
||||
|
@ -414,8 +468,14 @@ namespace SDP{
|
|||
|
||||
void State::parseSDP(const std::string &sdp){
|
||||
DONTEVEN_MSG("Parsing %zu-byte SDP", sdp.size());
|
||||
if (!sdp.size()){
|
||||
FAIL_MSG("SDP buffer is empty!");
|
||||
return;
|
||||
}
|
||||
std::stringstream ss(sdp);
|
||||
std::string to;
|
||||
// (UDP) Host will be set when a c= line is read
|
||||
std::string host = "127.0.0.1";
|
||||
size_t tid = INVALID_TRACK_ID;
|
||||
bool nope = true; // true if we have no valid track to fill
|
||||
while (std::getline(ss, to, '\n')){
|
||||
|
@ -423,12 +483,39 @@ namespace SDP{
|
|||
if (to.empty()){continue;}
|
||||
DONTEVEN_MSG("Parsing SDP line: %s", to.c_str());
|
||||
|
||||
// Extract host IP from c= line
|
||||
// c=<nettype> <addrtype> <connection-address>
|
||||
if (to.substr(0, 2) == "c="){
|
||||
// Strip c=
|
||||
std::stringstream words(to.substr(2));
|
||||
std::string item;
|
||||
size_t tempPos;
|
||||
|
||||
// Strip nettype
|
||||
getline(words, item, ' ');
|
||||
// Strip addrtype
|
||||
getline(words, item, ' ');
|
||||
// Get connection address
|
||||
getline(words, item, ' ');
|
||||
// Strip TTL, which is appended as IP/TTL
|
||||
tempPos = item.find('/');
|
||||
if (tempPos != std::string::npos){
|
||||
item = item.substr(0, tempPos);
|
||||
}
|
||||
host = item;
|
||||
}
|
||||
|
||||
// All tracks start with a media line
|
||||
// m=<media> <port>/<number of ports> <proto> <fmt> ...
|
||||
if (to.substr(0, 2) == "m="){
|
||||
nope = true;
|
||||
tid = myMeta->addTrack();
|
||||
|
||||
// Strip m=
|
||||
std::stringstream words(to.substr(2));
|
||||
std::string item;
|
||||
|
||||
// Get media type
|
||||
if (getline(words, item, ' ') && (item == "audio" || item == "video")){
|
||||
myMeta->setType(tid, item);
|
||||
myMeta->setID(tid, tid);
|
||||
|
@ -438,13 +525,22 @@ namespace SDP{
|
|||
tracks.erase(tid);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get port info and bind RTP/RTCP UDP pairs
|
||||
getline(words, item, ' ');
|
||||
if (!tracks[tid].bindUDPPort(item, host) ){
|
||||
FAIL_MSG("Failed to bind ports for given port info: %s", item.c_str());
|
||||
}
|
||||
|
||||
// Get transport protocol
|
||||
if (!getline(words, item, ' ') || item.substr(0, 7) != "RTP/AVP"){
|
||||
WARN_MSG("Media transport not supported: %s", item.c_str());
|
||||
myMeta->removeTrack(tid);
|
||||
tracks.erase(tid);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Get media format description
|
||||
if (getline(words, item, ' ')){
|
||||
uint64_t avp_type = JSON::Value(item).asInt();
|
||||
switch (avp_type){
|
||||
|
@ -774,4 +870,22 @@ namespace SDP{
|
|||
tConv[track].addRTP(pkt);
|
||||
}
|
||||
|
||||
/// Re-inits internal variables and removes all tracks from meta
|
||||
void State::reinitSDP(){
|
||||
tConv.clear();
|
||||
size_t trackID;
|
||||
|
||||
for (std::map<long unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
|
||||
trackID = myMeta->getID(it->first);
|
||||
INFO_MSG("Removing track %zu:%s", it->first, myMeta->getTrackIdentifier(it->first).c_str());
|
||||
if (trackID == INVALID_TRACK_ID){
|
||||
WARN_MSG("TrackID was invalid");
|
||||
}
|
||||
else{
|
||||
myMeta->removeTrack(it->first);
|
||||
}
|
||||
}
|
||||
//myMeta->refresh();
|
||||
tracks.clear();
|
||||
}
|
||||
}// namespace SDP
|
||||
|
|
|
@ -14,6 +14,8 @@ namespace SDP{
|
|||
public:
|
||||
Track();
|
||||
std::string generateTransport(uint32_t trackNo, const std::string &dest = "", bool TCPmode = true);
|
||||
/// Tries to bind a RTP/RTCP UDP port pair
|
||||
bool bindUDPPort(std::string portInfo, std::string hostInfo);
|
||||
std::string getParamString(const std::string ¶m) const;
|
||||
uint64_t getParamInt(const std::string ¶m) const;
|
||||
bool parseTransport(const std::string &transport, const std::string &host,
|
||||
|
@ -55,6 +57,10 @@ namespace SDP{
|
|||
size_t getTrackNoForChannel(uint8_t chan);
|
||||
size_t parseSetup(HTTP::Parser &H, const std::string &host, const std::string &source);
|
||||
void handleIncomingRTP(const uint64_t track, const RTP::Packet &pkt);
|
||||
// Sets up the transport from SDP data
|
||||
bool parseTransport(const std::string &sdpString);
|
||||
// Re-inits internal variables and removes all tracks from meta
|
||||
void reinitSDP();
|
||||
|
||||
public:
|
||||
DTSC::Meta *myMeta;
|
||||
|
|
|
@ -1701,6 +1701,7 @@ void Socket::UDPConnection::setSocketFamily(int AF_TYPE){\
|
|||
/// Stores the properties of the receiving end of this UDP socket.
|
||||
/// This will be the receiving end for all SendNow calls.
|
||||
void Socket::UDPConnection::SetDestination(std::string destIp, uint32_t port){
|
||||
DONTEVEN_MSG("Setting destination to %s:%u", destIp.c_str(), port);
|
||||
// UDP sockets can switch between IPv4 and IPv6 on demand.
|
||||
// We change IPv4-mapped IPv6 addresses into IPv4 addresses for Windows-sillyness reasons.
|
||||
if (destIp.substr(0, 7) == "::ffff:"){destIp = destIp.substr(7);}
|
||||
|
|
324
src/input/input_sdp.cpp
Normal file
324
src/input/input_sdp.cpp
Normal file
|
@ -0,0 +1,324 @@
|
|||
#include "input_sdp.h"
|
||||
|
||||
// Will point to current InputSDP obj after constructor is called
|
||||
Mist::InputSDP *classPointer = 0;
|
||||
size_t bytesUp = 0;
|
||||
// CB used to receive DTSC packets back from RTP sorter
|
||||
void incomingPacket(const DTSC::Packet &pkt){
|
||||
classPointer->incoming(pkt);
|
||||
}
|
||||
void insertRTP(const uint64_t track, const RTP::Packet &p){
|
||||
classPointer->incomingRTP(track, p);
|
||||
}
|
||||
|
||||
/// Function used to send RTCP packets over UDP
|
||||
///\param socket A UDP Connection pointer, sent as a void*, to keep portability.
|
||||
///\param data The RTP Packet that needs to be sent
|
||||
///\param len The size of data
|
||||
///\param channel Not used here, but is kept for compatibility with sendTCP
|
||||
void sendUDP(void *socket, const char *data, size_t len, uint8_t channel){
|
||||
((Socket::UDPConnection *)socket)->SendNow(data, len);
|
||||
bytesUp += len;
|
||||
}
|
||||
|
||||
namespace Mist{
|
||||
void InputSDP::incomingRTP(const uint64_t track, const RTP::Packet &p){
|
||||
sdpState.handleIncomingRTP(track, p);
|
||||
}
|
||||
|
||||
InputSDP::InputSDP(Util::Config *cfg) : Input(cfg){
|
||||
setPacketOffset = false;
|
||||
packetOffset = 0;
|
||||
sdpState.myMeta = &meta;
|
||||
sdpState.incomingPacketCallback = incomingPacket;
|
||||
classPointer = this;
|
||||
standAlone = false;
|
||||
hasBork = false;
|
||||
bytesRead = 0;
|
||||
count = 0;
|
||||
capa["name"] = "SDP";
|
||||
capa["desc"] = "This input allows pulling of RTP packets using a provided SDP file";
|
||||
capa["source_match"].append("*.sdp");
|
||||
capa["always_match"].append("*.sdp");
|
||||
capa["priority"] = 9;
|
||||
capa["codecs"][0u][0u].append("H264");
|
||||
capa["codecs"][0u][0u].append("HEVC");
|
||||
capa["codecs"][0u][0u].append("MPEG2");
|
||||
capa["codecs"][0u][0u].append("VP8");
|
||||
capa["codecs"][0u][0u].append("VP9");
|
||||
capa["codecs"][0u][1u].append("AAC");
|
||||
capa["codecs"][0u][1u].append("MP3");
|
||||
capa["codecs"][0u][1u].append("AC3");
|
||||
capa["codecs"][0u][1u].append("ALAW");
|
||||
capa["codecs"][0u][1u].append("ULAW");
|
||||
capa["codecs"][0u][1u].append("PCM");
|
||||
capa["codecs"][0u][1u].append("opus");
|
||||
capa["codecs"][0u][1u].append("MP2");
|
||||
|
||||
JSON::Value option;
|
||||
option["arg"] = "integer";
|
||||
option["long"] = "buffer";
|
||||
option["short"] = "b";
|
||||
option["help"] = "DVR buffer time in ms";
|
||||
option["value"].append(50000);
|
||||
config->addOption("bufferTime", option);
|
||||
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
|
||||
capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in "
|
||||
"milliseconds. This is the time available to seek around in, "
|
||||
"and will automatically be extended to fit whole keyframes "
|
||||
"as well as the minimum duration needed for stable playback.";
|
||||
capa["optional"]["DVR"]["option"] = "--buffer";
|
||||
capa["optional"]["DVR"]["type"] = "uint";
|
||||
capa["optional"]["DVR"]["default"] = 50000;
|
||||
option.null();
|
||||
}
|
||||
|
||||
/// Checks whether the input string ends with .sdp
|
||||
bool InputSDP::checkArguments(){
|
||||
const std::string &inpt = config->getString("input");
|
||||
if (inpt.substr(inpt.length() - 4) != ".sdp"){
|
||||
FAIL_MSG("Expected a SDP file but received: '%s'", inpt.c_str());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Lets URIreader open the SDP file at the requested given location
|
||||
bool InputSDP::openStreamSource(){
|
||||
const std::string &inpt = config->getString("input");
|
||||
reader.open(inpt);
|
||||
// Will return false if it cant open file or it is EOF
|
||||
return reader;
|
||||
}
|
||||
|
||||
/// Gets and parses the SDP file
|
||||
void InputSDP::parseStreamHeader(){
|
||||
if (!reader){
|
||||
FAIL_MSG("Connection lost with input. Could not get stream description!");
|
||||
return;
|
||||
}
|
||||
|
||||
reader.readAll(buffer, bytesRead);
|
||||
HIGH_MSG("Downloaded SDP file (%lu B)", bytesRead);
|
||||
|
||||
// Save old buffer in order to identify changes
|
||||
oldBuffer = strdup(buffer);
|
||||
|
||||
sdpState.reinitSDP();
|
||||
sdpState.parseSDP(buffer);
|
||||
|
||||
INFO_MSG("Stream contains %zu tracks", M.getValidTracks().size());
|
||||
|
||||
if (reader){
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
|
||||
void InputSDP::closeStreamSource(){
|
||||
if (reader){
|
||||
reader.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/// Compare two c strings char by char
|
||||
/// \return false if not equals (or different in size), else true
|
||||
bool InputSDP::compareStrings(char* str1, char* str2){
|
||||
size_t strlen1 = strlen(str1);
|
||||
size_t strlen2 = strlen(str2);
|
||||
|
||||
if (strlen1 != strlen2){
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int k = 0; k < strlen1; k++){
|
||||
if(str1[k] != str2[k]){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Checks if there are updates available to the SDP file
|
||||
// and updates the SDP file accordingly
|
||||
bool InputSDP::updateSDP(){
|
||||
// Reset error flag
|
||||
hasBork = false;
|
||||
// Reopen the file if necessary
|
||||
if (!reader){
|
||||
const std::string &inpt = config->getString("input");
|
||||
reader.open(inpt);
|
||||
}
|
||||
// If the file has dissappeared the stream must have stopped
|
||||
if (!reader){
|
||||
WARN_MSG("SDP file no longer available. Cannot update SDP info.");
|
||||
return false;
|
||||
}
|
||||
// Re-read SDP file
|
||||
reader.readAll(buffer, bytesRead);
|
||||
// Re-init SPD state iff contents have changed
|
||||
INFO_MSG("Downloaded SDP file (%lu B)", bytesRead);
|
||||
if (bytesRead != 0){
|
||||
if (!compareStrings(oldBuffer, buffer)){
|
||||
INFO_MSG("SDP contents have changed. Reparsing SDP file");
|
||||
// Save old buffer in order to identify changes
|
||||
oldBuffer = strdup(buffer);
|
||||
|
||||
sdpState.reinitSDP();
|
||||
sdpState.parseSDP(buffer);
|
||||
|
||||
INFO_MSG("Stream contains %zu tracks", M.getValidTracks().size());
|
||||
}
|
||||
else{
|
||||
FAIL_MSG("Unable to parse stream data for current SDP file. Quitting...");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else{
|
||||
FAIL_MSG("SDP file no longer available. Quitting...");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// Close the file so that we can reopen it on err
|
||||
if (reader){
|
||||
reader.close();
|
||||
}
|
||||
|
||||
// Notify Meta of changes to tracks
|
||||
meta.refresh();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Updates stats and quits if parsePacket returns false
|
||||
void InputSDP::streamMainLoop(){
|
||||
Comms::Statistics statComm;
|
||||
uint64_t startTime = Util::epoch();
|
||||
uint64_t lastSecs = 0;
|
||||
// Get RTP packets from UDP socket and stop if this fails
|
||||
while (keepAlive() && parsePacket()){
|
||||
uint64_t currSecs = Util::bootSecs();
|
||||
if (lastSecs != currSecs){
|
||||
lastSecs = currSecs;
|
||||
// Connect to stats for INPUT detection
|
||||
statComm.reload();
|
||||
if (statComm){
|
||||
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
|
||||
config->is_active = false;
|
||||
Util::logExitReason("received shutdown request from controller");
|
||||
return;
|
||||
}
|
||||
uint64_t now = Util::bootSecs();
|
||||
statComm.setNow(now);
|
||||
statComm.setCRC(getpid());
|
||||
statComm.setStream(streamName);
|
||||
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
|
||||
statComm.setDown(bytesRead);
|
||||
statComm.setUp(bytesUp);
|
||||
statComm.setTime(now - startTime);
|
||||
statComm.setLastSecond(0);
|
||||
statComm.setHost(getConnectedBinHost());
|
||||
}
|
||||
}
|
||||
// If the error flag is raised or we are lacking data, try to recover
|
||||
if (count > 5 || hasBork) {
|
||||
if (!updateSDP()){
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Passes incoming RTP packets to sorter
|
||||
/// \return False if we cannot recover and should quit. Else returns True
|
||||
bool InputSDP::parsePacket(){
|
||||
uint32_t waitTime = 200;
|
||||
bool receivedPacket = false;
|
||||
// How often to send RTCP receiver requests in seconds
|
||||
const uint32_t rtcpInterval = 7;
|
||||
for (std::map<size_t, SDP::Track>::iterator it = sdpState.tracks.begin();
|
||||
it != sdpState.tracks.end(); ++it){
|
||||
|
||||
// Get RTP socket for selected track
|
||||
Socket::UDPConnection &s = it->second.data;
|
||||
it->second.sorter.setCallback(it->first, insertRTP);
|
||||
|
||||
// Get RTP packets
|
||||
while (s.Receive()){
|
||||
count = 0;
|
||||
receivedPacket = true;
|
||||
bytesRead += (s.data.size());
|
||||
RTP::Packet pack(s.data, s.data.size());
|
||||
|
||||
// Init local and remote SSRC if it was not set
|
||||
if (!it->second.theirSSRC){
|
||||
it->second.theirSSRC = pack.getSSRC();
|
||||
}
|
||||
if (!currentSSRC[it->first]){
|
||||
currentSSRC[it->first] = pack.getSSRC();
|
||||
}
|
||||
// If we still have some packets from the old track in the socket buffer, skip it
|
||||
if (oldSSRC[it->first] == pack.getSSRC()){
|
||||
continue;
|
||||
}
|
||||
// Verify if the SSRC has changed: indicating that a new video is being sent
|
||||
// Either recover, reload or quit at this point
|
||||
if (currentSSRC[it->first] != pack.getSSRC()){
|
||||
WARN_MSG("Sorter for the current track has encountered an error: current SSRC has changed from %u to %u. Trying to recover...", currentSSRC[it->first], pack.getSSRC());
|
||||
oldSSRC[it->first] = currentSSRC[it->first];
|
||||
hasBork = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Let sorter handle RTP specifics
|
||||
it->second.sorter.addPacket(pack);
|
||||
DONTEVEN_MSG("Added %zu B RTP packet to buffer with start time %u and SSRC %u: %s", bytesRead, pack.getTimeStamp(), pack.getSSRC(), pack.toString().c_str());
|
||||
}
|
||||
// Send RTCP packet back to host
|
||||
if (Util::bootSecs() > it->second.rtcpSent + rtcpInterval){
|
||||
it->second.rtcpSent = Util::bootSecs();
|
||||
it->second.pack.sendRTCP_RR(it->second, sendUDP);
|
||||
}
|
||||
}
|
||||
if (!receivedPacket){
|
||||
Util::sleep(waitTime);
|
||||
count++;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Buffers incoming DTSC packets (from SDP tracks -> RTP sorter)
|
||||
void InputSDP::incoming(const DTSC::Packet &pkt){
|
||||
if (!M.getBootMsOffset()){
|
||||
meta.setBootMsOffset(Util::bootMS() - pkt.getTime());
|
||||
packetOffset = 0;
|
||||
setPacketOffset = true;
|
||||
}else if (!setPacketOffset){
|
||||
packetOffset = (Util::bootMS() - pkt.getTime()) - M.getBootMsOffset();
|
||||
setPacketOffset = true;
|
||||
}
|
||||
static DTSC::Packet newPkt;
|
||||
char *pktData;
|
||||
size_t pktDataLen;
|
||||
pkt.getString("data", pktData, pktDataLen);
|
||||
size_t idx = M.trackIDToIndex(pkt.getTrackId(), getpid());
|
||||
|
||||
HIGH_MSG("Buffering new pkt for track %zu->%zu at offset %zu and time %zu", pkt.getTrackId(), idx, packetOffset, pkt.getTime());
|
||||
|
||||
if (idx == INVALID_TRACK_ID){
|
||||
INFO_MSG("Invalid index for track number %zu", pkt.getTrackId());
|
||||
}else{
|
||||
if (!userSelect.count(idx)){
|
||||
WARN_MSG("Reloading track %zu, index %zu", pkt.getTrackId(), idx);
|
||||
userSelect[idx].reload(streamName, idx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
|
||||
}
|
||||
if (userSelect[idx].getStatus() == COMM_STATUS_REQDISCONNECT){
|
||||
Util::logExitReason("buffer requested shutdown");
|
||||
}
|
||||
}
|
||||
|
||||
bufferLivePacket(pkt.getTime() + packetOffset, pkt.getInt("offset"), idx, pktData,
|
||||
pktDataLen, 0, pkt.getFlag("keyframe"));
|
||||
}
|
||||
}// namespace Mist
|
70
src/input/input_sdp.h
Normal file
70
src/input/input_sdp.h
Normal file
|
@ -0,0 +1,70 @@
|
|||
#include "input.h"
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/urireader.h>
|
||||
#include <mist/nal.h>
|
||||
#include <mist/rtp.h>
|
||||
#include <mist/sdp.h>
|
||||
#include <mist/url.h>
|
||||
#include <set>
|
||||
#include <string>
|
||||
|
||||
namespace Mist{
|
||||
class InputSDP : public Input{
|
||||
public:
|
||||
InputSDP(Util::Config *cfg);
|
||||
|
||||
// Buffers incoming DTSC packets (from SDP tracks -> RTP sorter)
|
||||
void incoming(const DTSC::Packet &pkt);
|
||||
|
||||
void incomingRTP(const uint64_t track, const RTP::Packet &p);
|
||||
|
||||
// Compare two c strings char by char
|
||||
bool compareStrings(char* str1, char* str2);
|
||||
|
||||
protected:
|
||||
void streamMainLoop();
|
||||
bool checkArguments();
|
||||
// Overwrite default functions from input
|
||||
bool needHeader(){return false;}
|
||||
bool readHeader(){return true;}
|
||||
// Force to stream > serve
|
||||
bool needsLock(){return false;}
|
||||
// Open connection with input
|
||||
bool openStreamSource();
|
||||
void closeStreamSource();
|
||||
|
||||
// Gets and parses SDP file
|
||||
void parseStreamHeader();
|
||||
// Passes incoming RTP packets to sorter
|
||||
bool parsePacket();
|
||||
// Checks if there are updates available to the SDP file
|
||||
// and updates the SDP file accordingly
|
||||
bool updateSDP();
|
||||
|
||||
// Used to read SDP file
|
||||
HTTP::URIReader reader;
|
||||
// Contains track info
|
||||
SDP::State sdpState;
|
||||
// Total bytes downloaded
|
||||
size_t bytesRead;
|
||||
// Local buffer to read into
|
||||
char* buffer;
|
||||
// Copy of parsed SDP file in order to detect changes
|
||||
char* oldBuffer;
|
||||
|
||||
bool setPacketOffset;
|
||||
int64_t packetOffset;
|
||||
|
||||
// Count amount of pulls without a packet
|
||||
int count;
|
||||
// Flag to re-init SDP state
|
||||
bool hasBork;
|
||||
|
||||
// Map SSRC to tracks in order to recognize when video source changes
|
||||
std::map<size_t, uint32_t> currentSSRC;
|
||||
// Map prev SSRC in order to detect old packages to ignore
|
||||
std::map<size_t, uint32_t> oldSSRC;
|
||||
};
|
||||
}// namespace Mist
|
||||
|
||||
typedef Mist::InputSDP mistIn;
|
Loading…
Add table
Reference in a new issue