From 08953540f6ac32e3259d396cc446dc3d8a5eb1a1 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 2 Oct 2014 15:38:59 +0200 Subject: [PATCH] New input starting method based on JSON capabilities. --- lib/dtsc.h | 1 + lib/dtscmeta.cpp | 52 ++++++++++-- lib/ftp.cpp | 10 --- lib/stream.cpp | 205 +++++++++++++++++++++++++---------------------- lib/stream.h | 10 +-- 5 files changed, 159 insertions(+), 119 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index 9ff6b4a7..a764d44c 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -79,6 +79,7 @@ namespace DTSC { Scan getMember(const char * indice); Scan getMember(const char * indice, const unsigned int ind_len); Scan getIndice(unsigned int num); + std::string getIndiceName(unsigned int num); unsigned int getSize(); char getType(); diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 393e6f09..1e049939 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -506,6 +506,33 @@ namespace DTSC { return Scan(); } + /// Returns the name of the num-th member of this object. + /// Returns an empty string on error or when not an object. + std::string Scan::getIndiceName(unsigned int num) { + if (getType() == DTSC_OBJ || getType() == DTSC_CON) { + char * i = p + 1; + unsigned int arr_indice = 0; + //object, scan contents + while (i[0] + i[1] != 0 && i < p + len) { //while not encountering 0x0000 (we assume 0x0000EE) + if (i + 2 >= p + len) { + return "";//out of packet! + } + unsigned int strlen = i[0] * 256 + i[1]; + i += 2; + if (arr_indice == num) { + return std::string(i, strlen); + } else { + arr_indice++; + i = skipDTSC(i + strlen, p + len); + if (!i) { + return ""; + } + } + } + } + return ""; + } + /// Returns the first byte of this DTSC value, or 0 on error. char Scan::getType() { if (!p) { @@ -555,13 +582,26 @@ namespace DTSC { } /// Returns the string value of this DTSC string value. - /// Uses getString internally, does no conversion. + /// Uses getString internally, if a string. + /// Converts integer values to strings. /// Returns an empty string on error. std::string Scan::asString() { - char * str; - unsigned int strlen; - getString(str, strlen); - return std::string(str, strlen); + switch (getType()) { + case DTSC_INT:{ + std::stringstream st; + st << asInt(); + return st.str(); + } + break; + case DTSC_STR:{ + char * str; + unsigned int strlen; + getString(str, strlen); + return std::string(str, strlen); + } + break; + } + return ""; } /// Sets result to a pointer to the string, and strlen to the lenght of it. @@ -585,7 +625,7 @@ namespace DTSC { JSON::Value Scan::asJSON(){ JSON::Value result; unsigned int i = 0; - JSON::fromDTMI2((const unsigned char*)p, len, i, result); + JSON::fromDTMI((const unsigned char*)p, len, i, result); return result; } diff --git a/lib/ftp.cpp b/lib/ftp.cpp index 96ebe17c..286a8472 100644 --- a/lib/ftp.cpp +++ b/lib/ftp.cpp @@ -22,16 +22,6 @@ FTP::User::User(Socket::Connection NewConnection, std::map-priority/source_match + DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); + DTSC::Scan input; + unsigned int input_size = inputs.getSize(); + for (unsigned int i = 0; i < input_size; ++i){ + input = inputs.getIndice(i); + + //if match voor current stream && priority is hoger dan wat we al hebben + if (curPrio < input.getMember("priority").asInt()){ + std::string source = input.getMember("source_match").asString(); + std::string front = source.substr(0,source.find('*')); + std::string back = source.substr(source.find('*')+1); + DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str()); + + if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ + player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString(); + curPrio = input.getMember("priority").asInt(); + selected = true; + } } } - DEBUG_MSG(DLVL_ERROR, "Stream not found: %s", streamname.c_str()); - return false; -} + + if (!selected){ + configLock.post();//unlock the config semaphore + FAIL_MSG("No compatible input found for stream %s: %s", streamname.c_str(), filename.c_str()); + return false; + } -/// Create a stream on the system. -/// Filters the streamname, removing invalid characters and -/// converting all letters to lowercase. -/// If a '?' character is found, everything following that character is deleted. -Socket::Server Util::Stream::makeLive(std::string streamname) { - sanitizeName(streamname); - std::string loc = getTmpFolder() + "stream_" + streamname; - //create and return the Socket::Server - return Socket::Server(loc); + //copy the neccessary arguments to separate storage so we can unlock the config semaphore safely + std::map str_args; + //check required parameters + DTSC::Scan required = input.getMember("required"); + unsigned int req_size = required.getSize(); + for (unsigned int i = 0; i < req_size; ++i){ + std::string opt = required.getIndiceName(i); + if (!stream_cfg.getMember(opt)){ + configLock.post();//unlock the config semaphore + FAIL_MSG("Required parameter %s for stream %s missing", opt.c_str(), streamname.c_str()); + return false; + } + str_args[required.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString(); + } + //check optional parameters + DTSC::Scan optional = input.getMember("optional"); + unsigned int opt_size = optional.getSize(); + for (unsigned int i = 0; i < opt_size; ++i){ + std::string opt = optional.getIndiceName(i); + if (stream_cfg.getMember(opt)){ + str_args[optional.getIndice(i).getMember("option").asString()] = stream_cfg.getMember(opt).asString(); + } + } + + //finally, unlock the config semaphore + configLock.post(); + + INFO_MSG("Starting %s -p -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); + char * argv[30] = {(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()}; + int argNum = 3; + std::string debugLvl; + if (Util::Config::printDebugLevel != DEBUG && !str_args.count("--debug")){ + debugLvl = JSON::Value((long long)Util::Config::printDebugLevel).asString(); + argv[++argNum] = (char *)"--debug"; + argv[++argNum] = (char *)debugLvl.c_str(); + } + for (std::map::iterator it = str_args.begin(); it != str_args.end(); ++it){ + argv[++argNum] = (char *)it->first.c_str(); + argv[++argNum] = (char *)it->second.c_str(); + } + argv[++argNum] = (char *)0; + + int pid = 1; + if (forkFirst){ + pid = fork(); + if (pid == -1) { + FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); + return false; + } + } + if (pid == 0){ + execvp(argv[0], argv); + FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); + _exit(42); + } + return true; } diff --git a/lib/stream.h b/lib/stream.h index 259d39e0..1db6c471 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -7,12 +7,6 @@ namespace Util { std::string getTmpFolder(); - class Stream { - public: - static void sanitizeName(std::string & streamname); - static bool getLive(std::string streamname); - static bool getVod(std::string filename, std::string streamname); - static bool getStream(std::string streamname); - static Socket::Server makeLive(std::string streamname); - }; + void sanitizeName(std::string & streamname); + bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); }