From 856043ed5557dd3c55fe79dafd4171e5b4ea777c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 28 Aug 2016 13:12:11 +0200 Subject: [PATCH] Implemented proper tsudp:// protocol push output, removed MistOutTSPush --- CMakeLists.txt | 1 - src/output/output_ts.cpp | 58 ++++++++++++++++++++- src/output/output_ts.h | 8 ++- src/output/output_ts_push.cpp | 96 ----------------------------------- src/output/output_ts_push.h | 18 ------- 5 files changed, 64 insertions(+), 117 deletions(-) delete mode 100644 src/output/output_ts_push.cpp delete mode 100644 src/output/output_ts_push.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d5343e57..7e7a89c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -376,7 +376,6 @@ makeOutput(HTTPTS httpts http ts) makeOutput(HLS hls http ts) makeOutput(Push push)#LTS makeOutput(RTSP rtsp)#LTS -makeOutput(TSPush ts_push ts)#LTS makeOutput(DASH dash_mp4 http)#LTS if (DEFINED WITH_SANITY ) diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index dc547df0..6487f4c3 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -7,8 +7,39 @@ namespace Mist { streamName = config->getString("streamname"); parseData = true; wantRequest = false; + pushOut = false; initialize(); std::string tracks = config->getString("tracks"); + if (config->getString("target").size()){ + std::string target = config->getString("target"); + if (target.substr(0,8) != "tsudp://"){ + FAIL_MSG("Target %s must begin with tsudp://, aborting", target.c_str()); + parseData = false; + myConn.close(); + return; + } + //strip beginning off URL + target.erase(0, 8); + if (target.find(':') == std::string::npos){ + FAIL_MSG("Target %s must contain a port, aborting", target.c_str()); + parseData = false; + myConn.close(); + return; + } + pushOut = true; + udpSize = 5; + sendRepeatingHeaders = true; + if (target.find('?') != std::string::npos){ + std::map vars; + HTTP::parseVars(target.substr(target.find('?')+1), vars); + if (vars.count("tracks")){tracks = vars["tracks"];} + if (vars.count("pkts")){udpSize = atoi(vars["pkts"].c_str());} + } + packetBuffer.reserve(188*udpSize); + int port = atoi(target.substr(target.find(":") + 1).c_str()); + target.erase(target.find(":"));//strip all after the colon + pushSock.SetDestination(target, port); + } unsigned int currTrack = 0; //loop over tracks, add any found track IDs to selectedTracks if (tracks != ""){ @@ -54,9 +85,34 @@ namespace Mist { capa["codecs"][0u][1u].append("AC3"); cfg->addConnectorOptions(8888, capa); config = cfg; + capa["push_urls"].append("tsudp://*"); + + JSON::Value opt; + opt["arg"] = "string"; + opt["default"] = ""; + opt["arg_num"] = 1ll; + opt["help"] = "Target tsudp:// URL to push out towards."; + cfg->addOption("target", opt); } void OutTS::sendTS(const char * tsData, unsigned int len){ - myConn.SendNow(tsData, len); + if (pushOut){ + static int curFilled = 0; + if (curFilled == udpSize){ + pushSock.SendNow(packetBuffer); + packetBuffer.clear(); + packetBuffer.reserve(udpSize * len); + curFilled = 0; + } + packetBuffer.append(tsData, len); + curFilled ++; + }else{ + myConn.SendNow(tsData, len); + } } + + bool OutTS::listenMode(){ + return !(config->getString("target").size()); + } + } diff --git a/src/output/output_ts.h b/src/output/output_ts.h index dfc55be3..a307662f 100644 --- a/src/output/output_ts.h +++ b/src/output/output_ts.h @@ -6,7 +6,13 @@ namespace Mist { OutTS(Socket::Connection & conn); ~OutTS(); static void init(Util::Config * cfg); - void sendTS(const char * tsData, unsigned int len=188); + void sendTS(const char * tsData, unsigned int len=188); + static bool listenMode(); + private: + unsigned int udpSize; + bool pushOut; + std::string packetBuffer; + Socket::UDPConnection pushSock; }; } diff --git a/src/output/output_ts_push.cpp b/src/output/output_ts_push.cpp deleted file mode 100644 index d00f346e..00000000 --- a/src/output/output_ts_push.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include "output_ts_push.h" -#include -#include - -namespace Mist { - OutTSPush::OutTSPush(Socket::Connection & conn) : TSOutput(conn){ - streamName = config->getString("streamname"); - parseData = true; - wantRequest = false; - sendRepeatingHeaders = true; - initialize(); - std::string tracks = config->getString("tracks"); - unsigned int currTrack = 0; - //loop over tracks, add any found track IDs to selectedTracks - if (tracks != ""){ - selectedTracks.clear(); - for (unsigned int i = 0; i < tracks.size(); ++i){ - if (tracks[i] >= '0' && tracks[i] <= '9'){ - currTrack = currTrack*10 + (tracks[i] - '0'); - }else{ - if (currTrack > 0){ - selectedTracks.insert(currTrack); - } - currTrack = 0; - } - } - if (currTrack > 0){ - selectedTracks.insert(currTrack); - } - } - - //For udp pushing, 7 ts packets a time - packetBuffer.reserve(config->getInteger("udpsize") * 188); - std::string host = config->getString("destination"); - if (host.substr(0, 6) == "udp://"){ - host = host.substr(6); - } - int port = atoi(host.substr(host.find(":") + 1).c_str()); - host = host.substr(0, host.find(":")); - pushSock.SetDestination(host, port); - } - - OutTSPush::~OutTSPush() {} - - void OutTSPush::init(Util::Config * cfg){ - Output::init(cfg); - capa["name"] = "TSPush"; - capa["desc"] = "Push raw MPEG/TS over a TCP or UDP socket."; - capa["deps"] = ""; - capa["required"]["streamname"]["name"] = "Stream"; - capa["required"]["streamname"]["help"] = "What streamname to serve. For multiple streams, add this protocol multiple times using different ports."; - capa["required"]["streamname"]["type"] = "str"; - capa["required"]["streamname"]["option"] = "--stream"; - capa["required"]["streamname"]["short"] = "s"; - capa["required"]["destination"]["name"] = "Destination"; - capa["required"]["destination"]["help"] = "Where to push to, in the format protocol://hostname:port. Ie: udp://127.0.0.1:9876"; - capa["required"]["destination"]["type"] = "str"; - capa["required"]["destination"]["option"] = "--destination"; - capa["required"]["destination"]["short"] = "D"; - capa["required"]["udpsize"]["name"] = "UDP Size"; - capa["required"]["udpsize"]["help"] = "The number of TS packets to push in a single UDP datagram"; - capa["required"]["udpsize"]["type"] = "uint"; - capa["required"]["udpsize"]["default"] = 5; - capa["required"]["udpsize"]["option"] = "--udpsize"; - capa["required"]["udpsize"]["short"] = "u"; - capa["optional"]["tracks"]["name"] = "Tracks"; - capa["optional"]["tracks"]["help"] = "The track IDs of the stream that this connector will transmit separated by spaces"; - capa["optional"]["tracks"]["type"] = "str"; - capa["optional"]["tracks"]["option"] = "--tracks"; - capa["optional"]["tracks"]["short"] = "t"; - capa["optional"]["tracks"]["default"] = ""; - capa["codecs"][0u][0u].append("HEVC"); - capa["codecs"][0u][0u].append("H264"); - capa["codecs"][0u][1u].append("AAC"); - capa["codecs"][0u][1u].append("MP3"); - cfg->addBasicConnectorOptions(capa); - config = cfg; - } - - void OutTSPush::fillBuffer(const char * data, size_t dataLen){ - static int curFilled = 0; - if (curFilled == config->getInteger("udpsize")){ - pushSock.SendNow(packetBuffer); - packetBuffer.clear(); - packetBuffer.reserve(config->getInteger("udpsize") * 188); - curFilled = 0; - } - packetBuffer += std::string(data, 188); - curFilled ++; - } - - void OutTSPush::sendTS(const char * tsData, unsigned int len){ - fillBuffer(tsData, len); - } - -} diff --git a/src/output/output_ts_push.h b/src/output/output_ts_push.h deleted file mode 100644 index e9f4ee08..00000000 --- a/src/output/output_ts_push.h +++ /dev/null @@ -1,18 +0,0 @@ -#include "output_ts_base.h" - -namespace Mist { - class OutTSPush : public TSOutput{ - public: - OutTSPush(Socket::Connection & conn); - ~OutTSPush(); - static void init(Util::Config * cfg); - static bool listenMode(){return false;} - void sendTS(const char * tsData, unsigned int len=188); - protected: - void fillBuffer(const char * data, size_t dataLen); - std::string packetBuffer; - Socket::UDPConnection pushSock; - }; -} - -typedef Mist::OutTSPush mistOut;