Implemented proper tsudp:// protocol push output, removed MistOutTSPush

This commit is contained in:
Thulinma 2016-08-28 13:12:11 +02:00
parent 172bdabf36
commit 856043ed55
5 changed files with 64 additions and 117 deletions

View file

@ -376,7 +376,6 @@ makeOutput(HTTPTS httpts http ts)
makeOutput(HLS hls http ts)
makeOutput(Push push)#LTS
makeOutput(RTSP rtsp)#LTS
makeOutput(TSPush ts_push ts)#LTS
makeOutput(DASH dash_mp4 http)#LTS
if (DEFINED WITH_SANITY )

View file

@ -7,8 +7,39 @@ namespace Mist {
streamName = config->getString("streamname");
parseData = true;
wantRequest = false;
pushOut = false;
initialize();
std::string tracks = config->getString("tracks");
if (config->getString("target").size()){
std::string target = config->getString("target");
if (target.substr(0,8) != "tsudp://"){
FAIL_MSG("Target %s must begin with tsudp://, aborting", target.c_str());
parseData = false;
myConn.close();
return;
}
//strip beginning off URL
target.erase(0, 8);
if (target.find(':') == std::string::npos){
FAIL_MSG("Target %s must contain a port, aborting", target.c_str());
parseData = false;
myConn.close();
return;
}
pushOut = true;
udpSize = 5;
sendRepeatingHeaders = true;
if (target.find('?') != std::string::npos){
std::map<std::string, std::string> vars;
HTTP::parseVars(target.substr(target.find('?')+1), vars);
if (vars.count("tracks")){tracks = vars["tracks"];}
if (vars.count("pkts")){udpSize = atoi(vars["pkts"].c_str());}
}
packetBuffer.reserve(188*udpSize);
int port = atoi(target.substr(target.find(":") + 1).c_str());
target.erase(target.find(":"));//strip all after the colon
pushSock.SetDestination(target, port);
}
unsigned int currTrack = 0;
//loop over tracks, add any found track IDs to selectedTracks
if (tracks != ""){
@ -54,9 +85,34 @@ namespace Mist {
capa["codecs"][0u][1u].append("AC3");
cfg->addConnectorOptions(8888, capa);
config = cfg;
capa["push_urls"].append("tsudp://*");
JSON::Value opt;
opt["arg"] = "string";
opt["default"] = "";
opt["arg_num"] = 1ll;
opt["help"] = "Target tsudp:// URL to push out towards.";
cfg->addOption("target", opt);
}
void OutTS::sendTS(const char * tsData, unsigned int len){
myConn.SendNow(tsData, len);
if (pushOut){
static int curFilled = 0;
if (curFilled == udpSize){
pushSock.SendNow(packetBuffer);
packetBuffer.clear();
packetBuffer.reserve(udpSize * len);
curFilled = 0;
}
packetBuffer.append(tsData, len);
curFilled ++;
}else{
myConn.SendNow(tsData, len);
}
}
bool OutTS::listenMode(){
return !(config->getString("target").size());
}
}

View file

@ -7,6 +7,12 @@ namespace Mist {
~OutTS();
static void init(Util::Config * cfg);
void sendTS(const char * tsData, unsigned int len=188);
static bool listenMode();
private:
unsigned int udpSize;
bool pushOut;
std::string packetBuffer;
Socket::UDPConnection pushSock;
};
}

View file

@ -1,96 +0,0 @@
#include "output_ts_push.h"
#include <mist/http_parser.h>
#include <mist/defines.h>
namespace Mist {
OutTSPush::OutTSPush(Socket::Connection & conn) : TSOutput(conn){
streamName = config->getString("streamname");
parseData = true;
wantRequest = false;
sendRepeatingHeaders = true;
initialize();
std::string tracks = config->getString("tracks");
unsigned int currTrack = 0;
//loop over tracks, add any found track IDs to selectedTracks
if (tracks != ""){
selectedTracks.clear();
for (unsigned int i = 0; i < tracks.size(); ++i){
if (tracks[i] >= '0' && tracks[i] <= '9'){
currTrack = currTrack*10 + (tracks[i] - '0');
}else{
if (currTrack > 0){
selectedTracks.insert(currTrack);
}
currTrack = 0;
}
}
if (currTrack > 0){
selectedTracks.insert(currTrack);
}
}
//For udp pushing, 7 ts packets a time
packetBuffer.reserve(config->getInteger("udpsize") * 188);
std::string host = config->getString("destination");
if (host.substr(0, 6) == "udp://"){
host = host.substr(6);
}
int port = atoi(host.substr(host.find(":") + 1).c_str());
host = host.substr(0, host.find(":"));
pushSock.SetDestination(host, port);
}
OutTSPush::~OutTSPush() {}
void OutTSPush::init(Util::Config * cfg){
Output::init(cfg);
capa["name"] = "TSPush";
capa["desc"] = "Push raw MPEG/TS over a TCP or UDP socket.";
capa["deps"] = "";
capa["required"]["streamname"]["name"] = "Stream";
capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add this protocol multiple times using different ports.";
capa["required"]["streamname"]["type"] = "str";
capa["required"]["streamname"]["option"] = "--stream";
capa["required"]["streamname"]["short"] = "s";
capa["required"]["destination"]["name"] = "Destination";
capa["required"]["destination"]["help"] = "Where to push to, in the format protocol://hostname:port. Ie: udp://127.0.0.1:9876";
capa["required"]["destination"]["type"] = "str";
capa["required"]["destination"]["option"] = "--destination";
capa["required"]["destination"]["short"] = "D";
capa["required"]["udpsize"]["name"] = "UDP Size";
capa["required"]["udpsize"]["help"] = "The number of TS packets to push in a single UDP datagram";
capa["required"]["udpsize"]["type"] = "uint";
capa["required"]["udpsize"]["default"] = 5;
capa["required"]["udpsize"]["option"] = "--udpsize";
capa["required"]["udpsize"]["short"] = "u";
capa["optional"]["tracks"]["name"] = "Tracks";
capa["optional"]["tracks"]["help"] = "The track IDs of the stream that this connector will transmit separated by spaces";
capa["optional"]["tracks"]["type"] = "str";
capa["optional"]["tracks"]["option"] = "--tracks";
capa["optional"]["tracks"]["short"] = "t";
capa["optional"]["tracks"]["default"] = "";
capa["codecs"][0u][0u].append("HEVC");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][1u].append("AAC");
capa["codecs"][0u][1u].append("MP3");
cfg->addBasicConnectorOptions(capa);
config = cfg;
}
void OutTSPush::fillBuffer(const char * data, size_t dataLen){
static int curFilled = 0;
if (curFilled == config->getInteger("udpsize")){
pushSock.SendNow(packetBuffer);
packetBuffer.clear();
packetBuffer.reserve(config->getInteger("udpsize") * 188);
curFilled = 0;
}
packetBuffer += std::string(data, 188);
curFilled ++;
}
void OutTSPush::sendTS(const char * tsData, unsigned int len){
fillBuffer(tsData, len);
}
}

View file

@ -1,18 +0,0 @@
#include "output_ts_base.h"
namespace Mist {
class OutTSPush : public TSOutput{
public:
OutTSPush(Socket::Connection & conn);
~OutTSPush();
static void init(Util::Config * cfg);
static bool listenMode(){return false;}
void sendTS(const char * tsData, unsigned int len=188);
protected:
void fillBuffer(const char * data, size_t dataLen);
std::string packetBuffer;
Socket::UDPConnection pushSock;
};
}
typedef Mist::OutTSPush mistOut;