From 4c9c6fa7baa2d8cac7f23ad9fe83f3b12b35732c Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 30 May 2016 15:17:54 +0200 Subject: [PATCH 1/7] Backported various little edits from Pro edition. --- .gitignore | 1 + CMakeLists.txt | 6 +- Doxyfile.in | 2 +- lib/CMakeLists.txt | 69 --- lib/bitfields.h | 2 +- lib/config.cpp | 207 +++---- lib/config.h | 5 +- lib/defines.h | 26 +- lib/dtsc.cpp | 58 +- lib/dtsc.h | 45 +- lib/dtscmeta.cpp | 218 +++++-- lib/flv_tag.cpp | 12 + lib/h264.cpp | 10 +- lib/h264.h | 5 +- lib/http_parser.cpp | 18 +- lib/http_parser.h | 5 +- lib/mp4.cpp | 14 +- lib/mp4_generic.cpp | 11 +- lib/mp4_generic.h | 3 +- lib/mp4_ms.cpp | 67 ++ lib/mp4_ms.h | 13 + lib/procs.cpp | 136 +++- lib/procs.h | 15 +- lib/rtmpchunks.cpp | 14 +- lib/rtmpchunks.h | 4 + lib/shared_memory.cpp | 153 ++++- lib/shared_memory.h | 16 +- lib/socket.cpp | 10 +- lib/stream.cpp | 59 +- lib/stream.h | 3 + lib/ts_packet.cpp | 123 ++-- lib/ts_packet.h | 16 +- lib/util.cpp | 40 ++ lib/util.h | 6 + lsp/mist.js | 4 +- src/CMakeLists.txt | 101 --- src/analysers/dtsc_analyser.cpp | 9 +- src/analysers/mp4_analyser.cpp | 7 + src/controller/controller.cpp | 34 +- src/controller/controller_api.cpp | 5 +- src/controller/controller_capabilities.cpp | 4 + src/controller/controller_statistics.cpp | 50 +- src/controller/controller_statistics.h | 6 + src/controller/controller_storage.cpp | 7 +- src/controller/controller_storage.h | 1 + src/controller/controller_streams.cpp | 11 +- src/controller/controller_streams.h | 2 + src/input/input.cpp | 95 +-- src/input/input.h | 15 +- src/input/input_buffer.cpp | 272 +++++--- src/input/input_buffer.h | 6 +- src/input/input_dtsc.cpp | 195 +++++- src/input/input_dtsc.h | 6 + src/input/input_mp3.cpp | 2 +- src/input/mist_in.cpp | 36 +- src/io.cpp | 233 ++++--- src/io.h | 53 +- src/output/mist_out.cpp | 2 + src/output/output.cpp | 685 +++++++++++++-------- src/output/output.h | 17 +- src/output/output_hds.cpp | 4 +- src/output/output_hls.cpp | 54 +- src/output/output_hls.h | 4 +- src/output/output_hss.cpp | 23 +- src/output/output_hss.h | 2 - src/output/output_http.cpp | 27 +- src/output/output_http_internal.cpp | 14 +- src/output/output_httpts.cpp | 10 +- src/output/output_httpts.h | 7 - src/output/output_progressive_flv.cpp | 1 - src/output/output_progressive_flv.h | 1 - src/output/output_progressive_mp3.cpp | 1 - src/output/output_progressive_mp3.h | 1 - src/output/output_raw.cpp | 8 +- src/output/output_rtmp.cpp | 178 +++--- src/output/output_rtmp.h | 4 +- src/output/output_ts.cpp | 7 +- src/output/output_ts_base.cpp | 4 +- 78 files changed, 2334 insertions(+), 1266 deletions(-) delete mode 100644 lib/CMakeLists.txt create mode 100644 lib/util.cpp create mode 100644 lib/util.h delete mode 100644 src/CMakeLists.txt diff --git a/.gitignore b/.gitignore index b1739226..34faf555 100644 --- a/.gitignore +++ b/.gitignore @@ -55,4 +55,5 @@ build.ninja rules.ninja .ninja_log .ninja_deps +aes_ctr128 diff --git a/CMakeLists.txt b/CMakeLists.txt index 9635144f..f05b4c72 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ set(libHeaders ${SOURCE_DIR}/lib/defines.h ${SOURCE_DIR}/lib/dtsc.h ${SOURCE_DIR}/lib/flv_tag.h + ${SOURCE_DIR}/lib/h264.h ${SOURCE_DIR}/lib/http_parser.h ${SOURCE_DIR}/lib/json.h ${SOURCE_DIR}/lib/mp4_adobe.h @@ -120,6 +121,7 @@ set(libHeaders ${SOURCE_DIR}/lib/timing.h ${SOURCE_DIR}/lib/tinythread.h ${SOURCE_DIR}/lib/ts_packet.h + ${SOURCE_DIR}/lib/util.h ${SOURCE_DIR}/lib/vorbis.h ) @@ -136,6 +138,7 @@ set(libSources ${SOURCE_DIR}/lib/dtsc.cpp ${SOURCE_DIR}/lib/dtscmeta.cpp ${SOURCE_DIR}/lib/flv_tag.cpp + ${SOURCE_DIR}/lib/h264.cpp ${SOURCE_DIR}/lib/http_parser.cpp ${SOURCE_DIR}/lib/json.cpp ${SOURCE_DIR}/lib/mp4_adobe.cpp @@ -153,6 +156,7 @@ set(libSources ${SOURCE_DIR}/lib/timing.cpp ${SOURCE_DIR}/lib/tinythread.cpp ${SOURCE_DIR}/lib/ts_packet.cpp + ${SOURCE_DIR}/lib/util.cpp ${SOURCE_DIR}/lib/vorbis.cpp ) @@ -245,9 +249,9 @@ makeInput(Buffer buffer) ######################################## macro(makeOutput outputName format) #Parse all extra arguments, for http and ts flags + SET (tsBaseClass Output) if (";${ARGN};" MATCHES ";http;") SET(httpOutput src/output/output_http.cpp) - SET(tsBaseClass Output) if (";${ARGN};" MATCHES ";ts;") SET(tsBaseClass HTTPOutput) endif() diff --git a/Doxyfile.in b/Doxyfile.in index 2c30f767..ca69803f 100644 --- a/Doxyfile.in +++ b/Doxyfile.in @@ -228,7 +228,7 @@ TAB_SIZE = 2 # "Side Effects:". You can put \n's in the value part of an alias to insert # newlines. -ALIASES = "api=\xrefitem api \"API call\" \"API calls\"" +ALIASES = "api=\xrefitem api \"API call\" \"API calls\"" "triggers=\xrefitem triggers \"Trigger\" \"Triggers\"" # This tag can be used to specify a number of word-keyword mappings (TCL only). # A mapping has the form "name=value". For example adding "class=itcl::class" diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt deleted file mode 100644 index 4b84051d..00000000 --- a/lib/CMakeLists.txt +++ /dev/null @@ -1,69 +0,0 @@ -add_library ( mist SHARED - amf.cpp - amf.h - auth.cpp - auth.h - base64.cpp - base64.h - bitfields.cpp - bitfields.h - bitstream.cpp - bitstream.h - checksum.h - CMakeLists.txt - config.cpp - config.h - converter.cpp - converter.h - defines.h - dtsc.cpp - dtsc.h - dtscmeta.cpp - filesystem.cpp - filesystem.h - flv_tag.cpp - flv_tag.h - ftp.cpp - ftp.h - http_parser.cpp - http_parser.h - json.cpp - json.h - mp4_adobe.cpp - mp4_adobe.h - mp4.cpp - mp4_generic.cpp - mp4_generic.h - mp4.h - mp4_ms.cpp - mp4_ms.h - nal.cpp - nal.h - ogg.cpp - ogg.h - procs.cpp - procs.h - rtmpchunks.cpp - rtmpchunks.h - shared_memory.cpp - shared_memory.h - socket.cpp - socket.h - stream.cpp - stream.h - theora.cpp - theora.h - timing.cpp - timing.h - tinythread.cpp - tinythread.h - ts_packet.cpp - ts_packet.h - vorbis.cpp - vorbis.h -) - -target_link_libraries( mist - -lpthread - -lrt -) diff --git a/lib/bitfields.h b/lib/bitfields.h index 0ca119a4..8fb8dc98 100644 --- a/lib/bitfields.h +++ b/lib/bitfields.h @@ -15,7 +15,7 @@ namespace Bit{ //Host to binary/binary to host functions - similar to kernel ntoh/hton functions. /// Retrieves a short in network order from the pointer p. - inline unsigned short btohs(char * p) { + inline unsigned short btohs(const char * p) { return ((unsigned short)p[0] << 8) | p[1]; } diff --git a/lib/config.cpp b/lib/config.cpp index 15629e4d..7f3990c8 100644 --- a/lib/config.cpp +++ b/lib/config.cpp @@ -32,6 +32,7 @@ #include #include #include //for getMyExec +#include "procs.h" bool Util::Config::is_active = false; unsigned int Util::Config::printDebugLevel = DEBUG;// @@ -69,8 +70,6 @@ Util::Config::Config(std::string cmd) { /// { /// "short":"o", //The short option letter /// "long":"onName", //The long option -/// "short_off":"n", //The short option-off letter -/// "long_off":"offName", //The long option-off /// "arg":"integer", //The type of argument, if required. /// "value":[], //The default value(s) for this option if it is not given on the commandline. /// "arg_num":1, //The count this value has on the commandline, after all the options have been processed. @@ -88,9 +87,6 @@ void Util::Config::addOption(std::string optname, JSON::Value option) { if (it->isMember("long")) { long_count++; } - if (it->isMember("long_off")) { - long_count++; - } } } @@ -110,12 +106,6 @@ void Util::Config::printHelp(std::ostream & output) { longest = current; } current = 0; - if (it->isMember("long_off")) { - current += (*it)["long_off"].asString().size() + 4; - } - if (it->isMember("short_off")) { - current += (*it)["short_off"].asString().size() + 3; - } if (current > longest) { longest = current; } @@ -158,26 +148,6 @@ void Util::Config::printHelp(std::ostream & output) { output << f << (*it)["help"].asString() << std::endl; } } - if (it->isMember("long_off") || it->isMember("short_off")) { - if (it->isMember("long_off") && it->isMember("short_off")) { - f = "--" + (*it)["long_off"].asString() + ", -" + (*it)["short_off"].asString(); - } else { - if (it->isMember("long_off")) { - f = "--" + (*it)["long_off"].asString(); - } - if (it->isMember("short_off")) { - f = "-" + (*it)["short_off"].asString(); - } - } - while (f.size() < longest) { - f.append(" "); - } - if (it->isMember("arg")) { - output << f << "(" << (*it)["arg"].asString() << ") " << (*it)["help"].asString() << std::endl; - } else { - output << f << (*it)["help"].asString() << std::endl; - } - } if (it->isMember("arg_num")) { f = it.key(); while (f.size() < longest) { @@ -204,12 +174,6 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { shortopts += ":"; } } - if (it->isMember("short_off")) { - shortopts += (*it)["short_off"].asString(); - if (it->isMember("arg")) { - shortopts += ":"; - } - } if (it->isMember("long")) { longOpts[long_i].name = (*it)["long"].asStringRef().c_str(); longOpts[long_i].val = (*it)["short"].asString()[0]; @@ -218,14 +182,6 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { } long_i++; } - if (it->isMember("long_off")) { - longOpts[long_i].name = (*it)["long_off"].asStringRef().c_str(); - longOpts[long_i].val = (*it)["short_off"].asString()[0]; - if (it->isMember("arg")) { - longOpts[long_i].has_arg = 1; - } - long_i++; - } if (it->isMember("arg_num") && !(it->isMember("value") && (*it)["value"].size())) { if ((*it)["arg_num"].asInt() > arg_count) { arg_count = (*it)["arg_num"].asInt(); @@ -263,9 +219,6 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { } break; } - if (it->isMember("short_off") && (*it)["short_off"].asString()[0] == opt) { - (*it)["value"].append((long long int)0); - } } break; } @@ -289,6 +242,10 @@ bool Util::Config::parseArgs(int & argc, char ** & argv) { return true; } +bool Util::Config::hasOption(const std::string & optname) { + return vals.isMember(optname); +} + /// Returns a reference to the current value of an option or default if none was set. /// If the option does not exist, this exits the application with a return code of 37. JSON::Value & Util::Config::getOption(std::string optname, bool asArray) { @@ -298,12 +255,18 @@ JSON::Value & Util::Config::getOption(std::string optname, bool asArray) { } if (!vals[optname].isMember("value") || !vals[optname]["value"].isArray()) { vals[optname]["value"].append(JSON::Value()); + vals[optname]["value"].shrink(0); } if (asArray) { return vals[optname]["value"]; } else { int n = vals[optname]["value"].size(); - return vals[optname]["value"][n - 1]; + if (!n){ + static JSON::Value empty = ""; + return empty; + }else{ + return vals[optname]["value"][n - 1]; + } } } @@ -341,6 +304,7 @@ static void callThreadCallback(void * cDataArg) { } int Util::Config::threadServer(Socket::Server & server_socket, int (*callback)(Socket::Connection &)) { + Util::Procs::socketList.insert(server_socket.getSocket()); while (is_active && server_socket.connected()) { Socket::Connection S = server_socket.accept(); if (S.connected()) { //check if the new connection is valid @@ -356,11 +320,13 @@ int Util::Config::threadServer(Socket::Server & server_socket, int (*callback)(S Util::sleep(10); //sleep 10ms } } + Util::Procs::socketList.erase(server_socket.getSocket()); server_socket.close(); return 0; } int Util::Config::forkServer(Socket::Server & server_socket, int (*callback)(Socket::Connection &)) { + Util::Procs::socketList.insert(server_socket.getSocket()); while (is_active && server_socket.connected()) { Socket::Connection S = server_socket.accept(); if (S.connected()) { //check if the new connection is valid @@ -376,6 +342,7 @@ int Util::Config::forkServer(Socket::Server & server_socket, int (*callback)(Soc Util::sleep(10); //sleep 10ms } } + Util::Procs::socketList.erase(server_socket.getSocket()); server_socket.close(); return 0; } @@ -385,8 +352,8 @@ int Util::Config::serveThreadedSocket(int (*callback)(Socket::Connection &)) { if (vals.isMember("socket")) { server_socket = Socket::Server(Util::getTmpFolder() + getString("socket")); } - if (vals.isMember("listen_port") && vals.isMember("listen_interface")) { - server_socket = Socket::Server(getInteger("listen_port"), getString("listen_interface"), false); + if (vals.isMember("port") && vals.isMember("interface")) { + server_socket = Socket::Server(getInteger("port"), getString("interface"), false); } if (!server_socket.connected()) { DEBUG_MSG(DLVL_DEVEL, "Failure to open socket"); @@ -402,8 +369,8 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection & S)) { if (vals.isMember("socket")) { server_socket = Socket::Server(Util::getTmpFolder() + getString("socket")); } - if (vals.isMember("listen_port") && vals.isMember("listen_interface")) { - server_socket = Socket::Server(getInteger("listen_port"), getString("listen_interface"), false); + if (vals.isMember("port") && vals.isMember("interface")) { + server_socket = Socket::Server(getInteger("port"), getString("interface"), false); } if (!server_socket.connected()) { DEBUG_MSG(DLVL_DEVEL, "Failure to open socket"); @@ -416,7 +383,6 @@ int Util::Config::serveForkedSocket(int (*callback)(Socket::Connection & S)) { /// Activated the stored config. This will: /// - Drop permissions to the stored "username", if any. -/// - Daemonize the process if "daemonize" exists and is true. /// - Set is_active to true. /// - Set up a signal handler to set is_active to false for the SIGINT, SIGHUP and SIGTERM signals. void Util::Config::activate() { @@ -424,14 +390,6 @@ void Util::Config::activate() { setUser(getString("username")); vals.removeMember("username"); } - if (vals.isMember("daemonize") && getBool("daemonize")) { - if (vals.isMember("logfile") && getString("logfile") != "") { - Daemonize(true); - } else { - Daemonize(false); - } - vals.removeMember("daemonize"); - } struct sigaction new_action; struct sigaction cur_action; new_action.sa_sigaction = signal_handler; @@ -476,33 +434,78 @@ void Util::Config::signal_handler(int signum, siginfo_t * sigInfo, void * ignore } } //signal_handler + +/// Adds the options from the given JSON capabilities structure. +/// Recurses into optional and required, added options as needed. +void Util::Config::addOptionsFromCapabilities(const JSON::Value & capa){ + //First add the required options. + if (capa.isMember("required") && capa["required"].size()){ + jsonForEachConst(capa["required"], it){ + if (!it->isMember("short") || !it->isMember("option") || !it->isMember("type")){ + FAIL_MSG("Incomplete required option: %s", it.key().c_str()); + continue; + } + JSON::Value opt; + opt["short"] = (*it)["short"]; + opt["long"] = (*it)["option"].asStringRef().substr(2); + if (it->isMember("type")){ + //int, uint, debug, select, str + if ((*it)["type"].asStringRef() == "int" || (*it)["type"].asStringRef() == "uint"){ + opt["arg"] = "integer"; + }else{ + opt["arg"] = "string"; + } + } + if (it->isMember("default")){ + opt["value"].append((*it)["default"]); + } + opt["help"] = (*it)["help"]; + addOption(it.key(), opt); + } + } + //Then, the optionals. + if (capa.isMember("optional") && capa["optional"].size()){ + jsonForEachConst(capa["optional"], it){ + if (it.key() == "debug"){continue;} + if (!it->isMember("short") || !it->isMember("option") || !it->isMember("default")){ + FAIL_MSG("Incomplete optional option: %s", it.key().c_str()); + continue; + } + JSON::Value opt; + opt["short"] = (*it)["short"]; + opt["long"] = (*it)["option"].asStringRef().substr(2); + if (it->isMember("type")){ + //int, uint, debug, select, str + if ((*it)["type"].asStringRef() == "int" || (*it)["type"].asStringRef() == "uint"){ + opt["arg"] = "integer"; + }else{ + opt["arg"] = "string"; + } + } + if (it->isMember("default")){ + opt["value"].append((*it)["default"]); + } + opt["help"] = (*it)["help"]; + addOption(it.key(), opt); + } + } +} + /// Adds the default connector options. Also updates the capabilities structure with the default options. /// Besides the options addBasicConnectorOptions adds, this function also adds port and interface options. void Util::Config::addConnectorOptions(int port, JSON::Value & capabilities) { - JSON::Value option; - option.null(); - option["long"] = "port"; - option["short"] = "p"; - option["arg"] = "integer"; - option["help"] = "TCP port to listen on"; - option["value"].append((long long)port); - addOption("listen_port", option); capabilities["optional"]["port"]["name"] = "TCP port"; - capabilities["optional"]["port"]["help"] = "TCP port to listen on - default if unprovided is " + option["value"][0u].asString(); + capabilities["optional"]["port"]["help"] = "TCP port to listen on"; capabilities["optional"]["port"]["type"] = "uint"; + capabilities["optional"]["port"]["short"] = "p"; capabilities["optional"]["port"]["option"] = "--port"; - capabilities["optional"]["port"]["default"] = option["value"][0u]; + capabilities["optional"]["port"]["default"] = (long long)port; - option.null(); - option["long"] = "interface"; - option["short"] = "i"; - option["arg"] = "string"; - option["help"] = "Interface address to listen on, or 0.0.0.0 for all available interfaces."; - option["value"].append("0.0.0.0"); - addOption("listen_interface", option); capabilities["optional"]["interface"]["name"] = "Interface"; - capabilities["optional"]["interface"]["help"] = "Address of the interface to listen on - default if unprovided is all interfaces"; + capabilities["optional"]["interface"]["help"] = "Address of the interface to listen on"; + capabilities["optional"]["interface"]["default"] = "0.0.0.0"; capabilities["optional"]["interface"]["option"] = "--interface"; + capabilities["optional"]["interface"]["short"] = "i"; capabilities["optional"]["interface"]["type"] = "str"; addBasicConnectorOptions(capabilities); @@ -510,38 +513,16 @@ void Util::Config::addConnectorOptions(int port, JSON::Value & capabilities) { /// Adds the default connector options. Also updates the capabilities structure with the default options. void Util::Config::addBasicConnectorOptions(JSON::Value & capabilities) { - JSON::Value option; - option.null(); - option["long"] = "username"; - option["short"] = "u"; - option["arg"] = "string"; - option["help"] = "Username to drop privileges to, or root to not drop provileges."; - option["value"].append("root"); - addOption("username", option); capabilities["optional"]["username"]["name"] = "Username"; capabilities["optional"]["username"]["help"] = "Username to drop privileges to - default if unprovided means do not drop privileges"; capabilities["optional"]["username"]["option"] = "--username"; + capabilities["optional"]["username"]["short"] = "u"; + capabilities["optional"]["username"]["default"] = "root"; capabilities["optional"]["username"]["type"] = "str"; + addOptionsFromCapabilities(capabilities); - if (capabilities.isMember("socket")) { - option.null(); - option["arg"] = "string"; - option["help"] = "Socket name that can be connected to for this connector."; - option["value"].append(capabilities["socket"]); - addOption("socket", option); - } - - option.null(); - option["long"] = "daemon"; - option["short"] = "d"; - option["long_off"] = "nodaemon"; - option["short_off"] = "n"; - option["help"] = "Whether or not to daemonize the process after starting."; - option["value"].append(0ll); - addOption("daemonize", option); - - option.null(); + JSON::Value option; option["long"] = "json"; option["short"] = "j"; option["help"] = "Output connector info in JSON format, then exit."; @@ -631,17 +612,3 @@ void Util::setUser(std::string username) { } } -/// Will turn the current process into a daemon. -/// Works by calling daemon(1,0): -/// Does not change directory to root. -/// Does redirect output to /dev/null -void Util::Daemonize(bool notClose) { - DEBUG_MSG(DLVL_DEVEL, "Going into background mode..."); - int noClose = 0; - if (notClose) { - noClose = 1; - } - if (daemon(1, noClose) < 0) { - DEBUG_MSG(DLVL_ERROR, "Failed to daemonize: %s", strerror(errno)); - } -} diff --git a/lib/config.h b/lib/config.h index af3d2c26..bdefe569 100644 --- a/lib/config.h +++ b/lib/config.h @@ -30,6 +30,7 @@ namespace Util { void addOption(std::string optname, JSON::Value option); void printHelp(std::ostream & output); bool parseArgs(int & argc, char ** & argv); + bool hasOption(const std::string & optname); JSON::Value & getOption(std::string optname, bool asArray = false); std::string getString(std::string optname); long long int getInteger(std::string optname); @@ -40,6 +41,7 @@ namespace Util { int serveThreadedSocket(int (*callback)(Socket::Connection & S)); int serveForkedSocket(int (*callback)(Socket::Connection & S)); int servePlainSocket(int (*callback)(Socket::Connection & S)); + void addOptionsFromCapabilities(const JSON::Value & capabilities); void addBasicConnectorOptions(JSON::Value & capabilities); void addConnectorOptions(int port, JSON::Value & capabilities); }; @@ -53,7 +55,4 @@ namespace Util { /// Will set the active user to the named username. void setUser(std::string user); - /// Will turn the current process into a daemon. - void Daemonize(bool notClose = false); - } diff --git a/lib/defines.h b/lib/defines.h index 917f9fe5..ea7583bf 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -57,11 +57,28 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #endif + +#ifndef SHM_DATASIZE +#define SHM_DATASIZE 25 +#endif + + +#ifndef STATS_DELAY +#define STATS_DELAY 15 +#endif + +#ifndef INPUT_TIMEOUT +#define INPUT_TIMEOUT STATS_DELAY +#endif + /// The size used for stream header pages under Windows, where they cannot be size-detected. #define DEFAULT_META_PAGE_SIZE 16 * 1024 * 1024 +/// The size used for stream header pages under Windows, where they cannot be size-detected. +#define DEFAULT_STRM_PAGE_SIZE 4 * 1024 * 1024 + /// The size used for stream data pages under Windows, where they cannot be size-detected. -#define DEFAULT_DATA_PAGE_SIZE 25 * 1024 * 1024 +#define DEFAULT_DATA_PAGE_SIZE SHM_DATASIZE * 1024 * 1024 /// The size used for server configuration pages. #define DEFAULT_CONF_PAGE_SIZE 4 * 1024 * 1024 @@ -72,10 +89,15 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", " #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_TRACK_META "MstTRAK%s@%lu" //%s stream name, %lu track ID #define SHM_TRACK_INDEX "MstTRID%s@%lu" //%s stream name, %lu track ID +#define SHM_TRACK_INDEX_SIZE 8192 #define SHM_TRACK_DATA "MstDATA%s@%lu_%lu" //%s stream name, %lu track ID, %lu page # #define SHM_STATISTICS "MstSTAT" #define SHM_USERS "MstUSER%s" //%s stream name -#define SEM_LIVE "MstLIVE%s" //%s stream name +#define SHM_TRIGGER "MstTRIG%s" //%s trigger name +#define SEM_LIVE "/MstLIVE%s" //%s stream name +#define SEM_INPUT "/MstInpt%s" //%s stream name +#define SEM_CONF "/MstConfLock" +#define SHM_CONF "MstConf" #define NAME_BUFFER_SIZE 200 //char buffer size for snprintf'ing shm filenames #define SIMUL_TRACKS 10 diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 4f25c07c..ad02c164 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -9,6 +9,7 @@ char DTSC::Magic_Header[] = "DTSC"; char DTSC::Magic_Packet[] = "DTPD"; char DTSC::Magic_Packet2[] = "DTP2"; +char DTSC::Magic_Command[] = "DTCM"; DTSC::File::File() { F = 0; @@ -32,8 +33,7 @@ DTSC::File & DTSC::File::operator =(const File & rhs) { if (rhs.myPack) { myPack = rhs.myPack; } - metaStorage = rhs.metaStorage; - metadata = metaStorage; + metadata = rhs.metadata; currtime = rhs.currtime; lastreadpos = rhs.lastreadpos; headerSize = rhs.headerSize; @@ -67,7 +67,7 @@ DTSC::File::File(std::string filename, bool create) { } created = create; if (!F) { - DEBUG_MSG(DLVL_ERROR, "Could not open file %s", filename.c_str()); + HIGH_MSG("Could not open file %s", filename.c_str()); return; } fseek(F, 0, SEEK_END); @@ -83,7 +83,7 @@ DTSC::File::File(std::string filename, bool create) { return; } if (memcmp(buffer, DTSC::Magic_Header, 4) != 0) { - if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0) { + if (memcmp(buffer, DTSC::Magic_Packet2, 4) != 0 && memcmp(buffer, DTSC::Magic_Packet, 4) != 0 && memcmp(buffer, DTSC::Magic_Command, 4) != 0) { DEBUG_MSG(DLVL_ERROR, "%s is not a valid DTSC file", filename.c_str()); fclose(F); F = 0; @@ -113,8 +113,7 @@ DTSC::File::File(std::string filename, bool create) { fseek(F, 0, SEEK_SET); File Fhead(filename + ".dtsh"); if (Fhead) { - metaStorage = Fhead.metaStorage; - metadata = metaStorage; + metadata = Fhead.metadata; } } currframe = 0; @@ -346,8 +345,9 @@ void DTSC::File::seekNext() { } void DTSC::File::parseNext(){ + char header_buffer[4] = {0, 0, 0, 0}; lastreadpos = ftell(F); - if (fread(buffer, 4, 1, F) != 1) { + if (fread(header_buffer, 4, 1, F) != 1) { if (feof(F)) { DEBUG_MSG(DLVL_DEVEL, "End of file reached @ %d", (int)lastreadpos); } else { @@ -356,55 +356,26 @@ void DTSC::File::parseNext(){ myPack.null(); return; } - if (memcmp(buffer, DTSC::Magic_Header, 4) == 0) { - if (lastreadpos != 0) { - readHeader(lastreadpos); - std::string tmp = metaStorage.toNetPacked(); - myPack.reInit(tmp.data(), tmp.size()); - DEBUG_MSG(DLVL_DEVEL, "Read another header"); - } else { - if (fread(buffer, 4, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read header size @ %d", (int)lastreadpos); - myPack.null(); - return; - } - long packSize = ntohl(((unsigned long *)buffer)[0]); - std::string strBuffer = "DTSC"; - strBuffer.append((char *)buffer, 4); - strBuffer.resize(packSize + 8); - if (fread((void *)(strBuffer.c_str() + 8), packSize, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read header @ %d", (int)lastreadpos); - myPack.null(); - return; - } - myPack.reInit(strBuffer.data(), strBuffer.size()); - } - return; - } long long unsigned int version = 0; - if (memcmp(buffer, DTSC::Magic_Packet, 4) == 0) { + if (memcmp(header_buffer, DTSC::Magic_Packet, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Command, 4) == 0 || memcmp(header_buffer, DTSC::Magic_Header, 4) == 0) { version = 1; } - if (memcmp(buffer, DTSC::Magic_Packet2, 4) == 0) { + if (memcmp(header_buffer, DTSC::Magic_Packet2, 4) == 0) { version = 2; } if (version == 0) { - DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x - %.4s != %.4s @ %d", (unsigned int)lastreadpos, (char *)buffer, DTSC::Magic_Packet2, (int)lastreadpos); + DEBUG_MSG(DLVL_ERROR, "Invalid packet header @ %#x: %.4s", (unsigned int)lastreadpos, (char *)buffer); myPack.null(); return; } if (fread(buffer, 4, 1, F) != 1) { - DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %d", (int)lastreadpos); + DEBUG_MSG(DLVL_ERROR, "Could not read packet size @ %#x", (unsigned int)lastreadpos); myPack.null(); return; } long packSize = ntohl(((unsigned long *)buffer)[0]); char * packBuffer = (char *)malloc(packSize + 8); - if (version == 1) { - memcpy(packBuffer, "DTPD", 4); - } else { - memcpy(packBuffer, "DTP2", 4); - } + memcpy(packBuffer, header_buffer, 4); memcpy(packBuffer + 4, buffer, 4); if (fread((void *)(packBuffer + 8), packSize, 1, F) != 1) { DEBUG_MSG(DLVL_ERROR, "Could not read packet @ %d", (int)lastreadpos); @@ -432,6 +403,11 @@ bool DTSC::File::seek_time(unsigned int ms, unsigned int trackNo, bool forceSeek if (!forceSeek && myPack && ms >= myPack.getTime() && trackNo >= myPack.getTrackId()) { tmpPos.seekTime = myPack.getTime(); tmpPos.bytePos = getBytePos(); + /* + if (trackNo == myPack.getTrackId()){ + tmpPos.bytePos += myPack.getDataLen(); + } + */ } else { tmpPos.seekTime = 0; tmpPos.bytePos = 0; diff --git a/lib/dtsc.h b/lib/dtsc.h index 4b2de033..5d70e893 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -34,6 +34,7 @@ namespace DTSC { extern char Magic_Header[]; ///< The magic bytes for a DTSC header extern char Magic_Packet[]; ///< The magic bytes for a DTSC packet extern char Magic_Packet2[]; ///< The magic bytes for a DTSC packet version 2 + extern char Magic_Command[]; ///< The magic bytes for a DTCM packet ///\brief A simple structure used for ordering byte seek positions. struct seekPos { @@ -61,7 +62,8 @@ namespace DTSC { DTSC_INVALID, DTSC_HEAD, DTSC_V1, - DTSC_V2 + DTSC_V2, + DTCM }; /// This class allows scanning through raw binary format DTSC data. @@ -107,6 +109,7 @@ namespace DTSC { void operator = (const Packet & rhs); operator bool() const; packType getVersion() const; + void reInit(Socket::Connection & src); void reInit(const char * data_, unsigned int len, bool noCopy = false); void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, long long packBytePos, bool isKeyframe); void getString(const char * identifier, char *& result, unsigned int & len) const; @@ -205,6 +208,17 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: +#ifdef BIGMETA +#define PACKED_KEY_SIZE 25 + ///\brief Data storage for this Key. + /// + /// - 8 bytes: MSB storage of the position of the first packet of this keyframe within the file. + /// - 3 bytes: MSB storage of the duration of this keyframe. + /// - 4 bytes: MSB storage of the number of this keyframe. + /// - 2 bytes: MSB storage of the amount of parts in this keyframe. + /// - 8 bytes: MSB storage of the timestamp associated with this keyframe's first packet. +#else +#define PACKED_KEY_SIZE 16 ///\brief Data storage for this Key. /// /// - 5 bytes: MSB storage of the position of the first packet of this keyframe within the file. @@ -212,7 +226,8 @@ namespace DTSC { /// - 2 bytes: MSB storage of the number of this keyframe. /// - 2 bytes: MSB storage of the amount of parts in this keyframe. /// - 4 bytes: MSB storage of the timestamp associated with this keyframe's first packet. - char data[16]; +#endif + char data[PACKED_KEY_SIZE]; }; ///\brief Basic class for storage of data associated with fragments. @@ -229,13 +244,24 @@ namespace DTSC { char * getData(); void toPrettyString(std::ostream & str, int indent = 0); private: - ///\Brief Data storage for this Fragment. +#ifdef BIGMETA +#define PACKED_FRAGMENT_SIZE 13 + ///\brief Data storage for this Fragment. + /// + /// - 4 bytes: duration (in milliseconds) + /// - 1 byte: length (amount of keyframes) + /// - 4 bytes: number of first keyframe in fragment + /// - 4 bytes: size of fragment in bytes +#else +#define PACKED_FRAGMENT_SIZE 11 + ///\brief Data storage for this Fragment. /// /// - 4 bytes: duration (in milliseconds) /// - 1 byte: length (amount of keyframes) /// - 2 bytes: number of first keyframe in fragment /// - 4 bytes: size of fragment in bytes - char data[11]; +#endif + char data[PACKED_FRAGMENT_SIZE]; }; ///\brief Class for storage of track data @@ -249,10 +275,10 @@ namespace DTSC { return (parts.size() && keySizes.size() && (keySizes.size() == keys.size())); } void update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size = 5000); - int getSendLen(); - void send(Socket::Connection & conn); + int getSendLen(bool skipDynamic = false); + void send(Socket::Connection & conn, bool skipDynamic = false); void writeTo(char *& p); - JSON::Value toJSON(); + JSON::Value toJSON(bool skipDynamic = false); std::deque fragments; std::deque keys; std::deque keySizes; @@ -302,8 +328,8 @@ namespace DTSC { void updatePosOverride(DTSC::Packet & pack, unsigned long bpos); void update(JSON::Value & pack, unsigned long segment_size = 5000); void update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize = 0, unsigned long segment_size = 5000); - unsigned int getSendLen(); - void send(Socket::Connection & conn); + unsigned int getSendLen(bool skipDynamic = false, std::set selectedTracks = std::set()); + void send(Socket::Connection & conn, bool skipDynamic = false, std::set selectedTracks = std::set()); void writeTo(char * p); JSON::Value toJSON(); void reset(); @@ -348,7 +374,6 @@ namespace DTSC { long int endPos; void readHeader(int pos); DTSC::Packet myPack; - JSON::Value metaStorage; Meta metadata; std::map trackMapping; long long int currtime; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index d431f222..1913925e 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -109,6 +109,32 @@ namespace DTSC { } } + void Packet::reInit(Socket::Connection & src) { + int sleepCount = 0; + null(); + int toReceive = 0; + while (src.connected()){ + if (!toReceive && src.Received().available(8)){ + if (src.Received().copy(2) != "DT"){ + INFO_MSG("Invalid DTSC Packet header encountered (%s)", src.Received().copy(4).c_str()); + break; + } + toReceive = Bit::btohl(src.Received().copy(8).data() + 4); + } + if (toReceive && src.Received().available(toReceive + 8)){ + std::string dataBuf = src.Received().remove(toReceive + 8); + reInit(dataBuf.data(), dataBuf.size()); + return; + } + if(!src.spool()){ + if (sleepCount++ > 60){ + return; + } + Util::sleep(100); + } + } + } + ///\brief Initializes a packet with new data ///\param data_ The new data for the packet ///\param len The length of the data pointed to by data_ @@ -159,11 +185,15 @@ namespace DTSC { if (!memcmp(data, Magic_Header, 4)) { version = DTSC_HEAD; } else { + if (!memcmp(data, Magic_Command, 4)) { + version = DTCM; + } else { DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with invalid header"); return; } } } + } } else { DEBUG_MSG(DLVL_FAIL, "ReInit received a packet with size < 4"); return; @@ -248,7 +278,7 @@ namespace DTSC { if (p[0] == DTSC_OBJ || p[0] == DTSC_CON) { p++; //object, scan contents - while (p[0] + p[1] != 0 && p < max) { //while not encountering 0x0000 (we assume 0x0000EE) + while (p < max && p[0] + p[1] != 0) { //while not encountering 0x0000 (we assume 0x0000EE) if (p + 2 >= max) { return 0;//out of packet! } @@ -264,7 +294,7 @@ namespace DTSC { if (p[0] == DTSC_ARR) { p++; //array, scan contents - while (p[0] + p[1] != 0 && p < max) { //while not encountering 0x0000 (we assume 0x0000EE) + while (p < max && p[0] + p[1] != 0) { //while not encountering 0x0000 (we assume 0x0000EE) //search through contents... p = skipDTSC(p, max); if (!p) { @@ -844,53 +874,93 @@ namespace DTSC { ///\brief Returns the byteposition of a keyframe unsigned long long Key::getBpos() { +#ifdef BIGMETA + return Bit::btohll(data); +#else return (((unsigned long long)data[0] << 32) | (data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]); +#endif } void Key::setBpos(unsigned long long newBpos) { +#ifdef BIGMETA + Bit::htobll(data, newBpos); +#else data[4] = newBpos & 0xFF; data[3] = (newBpos >> 8) & 0xFF; data[2] = (newBpos >> 16) & 0xFF; data[1] = (newBpos >> 24) & 0xFF; data[0] = (newBpos >> 32) & 0xFF; +#endif } unsigned long Key::getLength() { +#ifdef BIGMETA + return Bit::btoh24(data+8); +#else return Bit::btoh24(data+5); +#endif } void Key::setLength(unsigned long newLength) { +#ifdef BIGMETA + Bit::htob24(data+8, newLength); +#else Bit::htob24(data+5, newLength); +#endif } ///\brief Returns the number of a keyframe unsigned long Key::getNumber() { +#ifdef BIGMETA + return Bit::btohl(data + 11); +#else return Bit::btohs(data + 8); +#endif } ///\brief Sets the number of a keyframe void Key::setNumber(unsigned long newNumber) { +#ifdef BIGMETA + Bit::htobl(data + 11, newNumber); +#else Bit::htobs(data + 8, newNumber); +#endif } ///\brief Returns the number of parts of a keyframe unsigned short Key::getParts() { +#ifdef BIGMETA + return Bit::btohs(data + 15); +#else return Bit::btohs(data + 10); +#endif } ///\brief Sets the number of parts of a keyframe void Key::setParts(unsigned short newParts) { +#ifdef BIGMETA + Bit::htobs(data + 15, newParts); +#else Bit::htobs(data + 10, newParts); +#endif } ///\brief Returns the timestamp of a keyframe unsigned long long Key::getTime() { +#ifdef BIGMETA + return Bit::btohll(data + 17); +#else return Bit::btohl(data + 12); +#endif } ///\brief Sets the timestamp of a keyframe void Key::setTime(unsigned long long newTime) { +#ifdef BIGMETA + Bit::htobll(data + 17, newTime); +#else Bit::htobl(data + 12, newTime); +#endif } ///\brief Returns the data of this keyframe struct @@ -927,22 +997,38 @@ namespace DTSC { ///\brief Returns the number of the first keyframe in this fragment unsigned long Fragment::getNumber() { +#ifdef BIGMETA + return Bit::btohl(data + 5); +#else return Bit::btohs(data + 5); +#endif } ///\brief Sets the number of the first keyframe in this fragment void Fragment::setNumber(unsigned long newNumber) { +#ifdef BIGMETA + Bit::htobl(data + 5, newNumber); +#else Bit::htobs(data + 5, newNumber); +#endif } ///\brief Returns the size of a fragment unsigned long Fragment::getSize() { +#ifdef BIGMETA + return Bit::btohl(data + 9); +#else return Bit::btohl(data + 7); +#endif } ///\brief Sets the size of a fragement void Fragment::setSize(unsigned long newSize) { +#ifdef BIGMETA + Bit::htobl(data + 9, newSize); +#else Bit::htobl(data + 7, newSize); +#endif } ///\brief Returns thte data of this fragment structure @@ -976,11 +1062,11 @@ namespace DTSC { Track::Track(JSON::Value & trackRef) { if (trackRef.isMember("fragments") && trackRef["fragments"].isString()) { Fragment * tmp = (Fragment *)trackRef["fragments"].asStringRef().data(); - fragments = std::deque(tmp, tmp + (trackRef["fragments"].asStringRef().size() / 11)); + fragments = std::deque(tmp, tmp + (trackRef["fragments"].asStringRef().size() / PACKED_FRAGMENT_SIZE)); } if (trackRef.isMember("keys") && trackRef["keys"].isString()) { Key * tmp = (Key *)trackRef["keys"].asStringRef().data(); - keys = std::deque(tmp, tmp + (trackRef["keys"].asStringRef().size() / 16)); + keys = std::deque(tmp, tmp + (trackRef["keys"].asStringRef().size() / PACKED_KEY_SIZE)); } if (trackRef.isMember("parts") && trackRef["parts"].isString()) { Part * tmp = (Part *)trackRef["parts"].asStringRef().data(); @@ -1018,13 +1104,13 @@ namespace DTSC { char * tmp = 0; unsigned int tmplen = 0; trackRef.getMember("fragments").getString(tmp, tmplen); - fragments = std::deque((Fragment *)tmp, ((Fragment *)tmp) + (tmplen / 11)); + fragments = std::deque((Fragment *)tmp, ((Fragment *)tmp) + (tmplen / PACKED_FRAGMENT_SIZE)); } if (trackRef.getMember("keys").getType() == DTSC_STR) { char * tmp = 0; unsigned int tmplen = 0; trackRef.getMember("keys").getString(tmp, tmplen); - keys = std::deque((Key *)tmp, ((Key *)tmp) + (tmplen / 16)); + keys = std::deque((Key *)tmp, ((Key *)tmp) + (tmplen / PACKED_KEY_SIZE)); } if (trackRef.getMember("parts").getType() == DTSC_STR) { char * tmp = 0; @@ -1064,7 +1150,13 @@ namespace DTSC { ///Will also insert keyframes on non-video tracks, and creates fragments void Track::update(long long packTime, long long packOffset, long long packDataSize, long long packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size) { if ((unsigned long long)packTime < lastms) { - DEBUG_MSG(DLVL_WARN, "Received packets for track %u in wrong order (%lld < %llu) - ignoring!", trackID, packTime, lastms); + static bool warned = false; + if (!warned){ + ERROR_MSG("Received packets for track %u in wrong order (%lld < %llu) - ignoring! Further messages on HIGH level.", trackID, packTime, lastms); + warned = true; + }else{ + HIGH_MSG("Received packets for track %u in wrong order (%lld < %llu) - ignoring! Further messages on HIGH level.", trackID, packTime, lastms); + } return; } Part newPart; @@ -1384,20 +1476,22 @@ namespace DTSC { } ///\brief Determines the "packed" size of a track - int Track::getSendLen() { - int result = 146 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); - result += fragments.size() * 11; - result += keys.size() * 16; + int Track::getSendLen(bool skipDynamic) { + int result = 107 + init.size() + codec.size() + type.size() + getWritableIdentifier().size(); + if (!skipDynamic){ + result += fragments.size() * PACKED_FRAGMENT_SIZE + 16; + result += keys.size() * PACKED_KEY_SIZE + 11; if (keySizes.size()){ - result += 11 + (keySizes.size() * 4) + 4; + result += (keySizes.size() * 4) + 15; + } + result += parts.size() * 9 + 12; } - result += parts.size() * 9; if (type == "audio") { result += 49; } else if (type == "video") { result += 48; } - if (missedFrags) { + if (!skipDynamic && missedFrags) { result += 23; } return result; @@ -1420,19 +1514,23 @@ namespace DTSC { ///\brief Writes a track to a pointer void Track::writeTo(char *& p) { + std::deque::iterator firstFrag = fragments.begin(); + if (fragments.size() && (&firstFrag) == 0){ + return; + } std::string trackIdent = getWritableIdentifier(); writePointer(p, convertShort(trackIdent.size()), 2); writePointer(p, trackIdent); writePointer(p, "\340", 1);//Begin track object writePointer(p, "\000\011fragments\002", 12); - writePointer(p, convertInt(fragments.size() * 11), 4); + writePointer(p, convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4); for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { - writePointer(p, it->getData(), 11); + writePointer(p, it->getData(), PACKED_FRAGMENT_SIZE); } writePointer(p, "\000\004keys\002", 7); - writePointer(p, convertInt(keys.size() * 16), 4); + writePointer(p, convertInt(keys.size() * PACKED_KEY_SIZE), 4); for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { - writePointer(p, it->getData(), 16); + writePointer(p, it->getData(), PACKED_KEY_SIZE); } writePointer(p, "\000\010keysizes\002,", 11); writePointer(p, convertInt(keySizes.size() * 4), 4); @@ -1490,19 +1588,20 @@ namespace DTSC { } ///\brief Writes a track to a socket - void Track::send(Socket::Connection & conn) { + void Track::send(Socket::Connection & conn, bool skipDynamic) { conn.SendNow(convertShort(getWritableIdentifier().size()), 2); conn.SendNow(getWritableIdentifier()); conn.SendNow("\340", 1);//Begin track object + if (!skipDynamic){ conn.SendNow("\000\011fragments\002", 12); - conn.SendNow(convertInt(fragments.size() * 11), 4); + conn.SendNow(convertInt(fragments.size() * PACKED_FRAGMENT_SIZE), 4); for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { - conn.SendNow(it->getData(), 11); + conn.SendNow(it->getData(), PACKED_FRAGMENT_SIZE); } conn.SendNow("\000\004keys\002", 7); - conn.SendNow(convertInt(keys.size() * 16), 4); + conn.SendNow(convertInt(keys.size() * PACKED_KEY_SIZE), 4); for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { - conn.SendNow(it->getData(), 16); + conn.SendNow(it->getData(), PACKED_KEY_SIZE); } conn.SendNow("\000\010keysizes\002,", 11); conn.SendNow(convertInt(keySizes.size() * 4), 4); @@ -1520,9 +1619,10 @@ namespace DTSC { for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { conn.SendNow(it->getData(), 9); } + } conn.SendNow("\000\007trackid\001", 10); conn.SendNow(convertLongLong(trackID), 8); - if (missedFrags) { + if (!skipDynamic && missedFrags) { conn.SendNow("\000\014missed_frags\001", 15); conn.SendNow(convertLongLong(missedFrags), 8); } @@ -1560,10 +1660,12 @@ namespace DTSC { } ///\brief Determines the "packed" size of a meta object - unsigned int Meta::getSendLen() { + unsigned int Meta::getSendLen(bool skipDynamic, std::set selectedTracks) { unsigned int dataLen = 16 + (vod ? 14 : 0) + (live ? 15 : 0) + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21; for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - dataLen += it->second.getSendLen(); + if (!selectedTracks.size() || selectedTracks.count(it->first)){ + dataLen += it->second.getSendLen(skipDynamic); + } } return dataLen + 8; //add 8 bytes header } @@ -1600,13 +1702,15 @@ namespace DTSC { } ///\brief Writes a meta object to a socket - void Meta::send(Socket::Connection & conn) { - int dataLen = getSendLen() - 8; //strip 8 bytes header + void Meta::send(Socket::Connection & conn, bool skipDynamic, std::set selectedTracks) { + int dataLen = getSendLen(skipDynamic, selectedTracks) - 8; //strip 8 bytes header conn.SendNow(DTSC::Magic_Header, 4); conn.SendNow(convertInt(dataLen), 4); conn.SendNow("\340\000\006tracks\340", 10); for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { - it->second.send(conn); + if (!selectedTracks.size() || selectedTracks.count(it->first)){ + it->second.send(conn, skipDynamic); + } } conn.SendNow("\000\000\356", 3);//End tracks object if (vod) { @@ -1631,35 +1735,38 @@ namespace DTSC { } ///\brief Converts a track to a JSON::Value - JSON::Value Track::toJSON() { + JSON::Value Track::toJSON(bool skipDynamic) { JSON::Value result; std::string tmp; - tmp.reserve(fragments.size() * 11); - for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { - tmp.append(it->getData(), 11); + if (!skipDynamic) { + tmp.reserve(fragments.size() * PACKED_FRAGMENT_SIZE); + for (std::deque::iterator it = fragments.begin(); it != fragments.end(); it++) { + tmp.append(it->getData(), PACKED_FRAGMENT_SIZE); + } + result["fragments"] = tmp; + tmp = ""; + tmp.reserve(keys.size() * PACKED_KEY_SIZE); + for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { + tmp.append(it->getData(), PACKED_KEY_SIZE); + } + result["keys"] = tmp; + tmp = ""; + tmp.reserve(keySizes.size() * 4); + for (unsigned int i = 0; i < keySizes.size(); i++){ + tmp += (char)((keySizes[i] >> 24)); + tmp += (char)((keySizes[i] >> 16)); + tmp += (char)((keySizes[i] >> 8)); + tmp += (char)(keySizes[i]); + } + result["keysizes"] = tmp; + tmp = ""; + tmp.reserve(parts.size() * 9); + for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { + tmp.append(it->getData(), 9); + } + result["parts"] = tmp; } - result["fragments"] = tmp; - tmp = ""; - tmp.reserve(keys.size() * 16); - for (std::deque::iterator it = keys.begin(); it != keys.end(); it++) { - tmp.append(it->getData(), 16); - } - result["keys"] = tmp; - tmp = ""; - tmp.reserve(keySizes.size() * 4); - for (unsigned int i = 0; i < keySizes.size(); i++){ - tmp += (char)((keySizes[i] >> 24)); - tmp += (char)((keySizes[i] >> 16)); - tmp += (char)((keySizes[i] >> 8)); - tmp += (char)(keySizes[i]); - } - result["keysizes"] = tmp; - tmp = ""; - tmp.reserve(parts.size() * 9); - for (std::deque::iterator it = parts.begin(); it != parts.end(); it++) { - tmp.append(it->getData(), 9); - } - result["parts"] = tmp; + result["init"] = init; result["trackid"] = trackID; result["firstms"] = (long long)firstms; result["lastms"] = (long long)lastms; @@ -1669,7 +1776,6 @@ namespace DTSC { } result["codec"] = codec; result["type"] = type; - result["init"] = init; if (type == "audio") { result["rate"] = rate; result["size"] = size; diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 9d2289e5..939a96c6 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -11,6 +11,9 @@ #include //memcpy #include + +#include "h264.h" //Needed for init data parsing in case of invalid values from FLV init + /// Holds the last FLV header parsed. /// Defaults to a audio+video header on FLV version 0x01 if no header received yet. char FLV::Header[13] = {'F', 'L', 'V', 0x01, 0x05, 0, 0, 0, 0x09, 0, 0, 0, 0}; @@ -1099,6 +1102,15 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u } metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } + ///this is a hacky way around invalid FLV data (since it gets ignored nearly everywhere, but we do need correct data... + if (!metadata.tracks[reTrack].width || !metadata.tracks[reTrack].height || !metadata.tracks[reTrack].fpks){ + h264::sequenceParameterSet sps; + sps.fromDTSCInit(metadata.tracks[reTrack].init); + h264::SPSMeta spsChar = sps.getCharacteristics(); + metadata.tracks[reTrack].width = spsChar.width; + metadata.tracks[reTrack].height = spsChar.height; + metadata.tracks[reTrack].fpks = spsChar.fps * 1000; + } pack_out.null(); return pack_out; //skip rest of parsing, get next tag. } diff --git a/lib/h264.cpp b/lib/h264.cpp index 6d6efd3b..a256b61e 100644 --- a/lib/h264.cpp +++ b/lib/h264.cpp @@ -81,6 +81,12 @@ namespace h264 { sequenceParameterSet::sequenceParameterSet(const char * _data, unsigned long _dataLen) : data(_data), dataLen(_dataLen) {} + //DTSC Initdata is the payload for an avcc box. init[8+] is data, init[6-7] is a network-encoded length + void sequenceParameterSet::fromDTSCInit(const std::string & dtscInit){ + data = dtscInit.data() + 8; + dataLen = Bit::btohs(dtscInit.data() + 6); + } + SPSMeta sequenceParameterSet::getCharacteristics() const { SPSMeta result; @@ -107,8 +113,10 @@ namespace h264 { } char profileIdc = bs.get(8); + result.profile = profileIdc; //Start skipping unused data - bs.skip(16); + bs.skip(8); + result.level = bs.get(8); bs.getUExpGolomb(); if (profileIdc == 100 || profileIdc == 110 || profileIdc == 122 || profileIdc == 244 || profileIdc == 44 || profileIdc == 83 || profileIdc == 86 || profileIdc == 118 || profileIdc == 128) { //chroma format idc diff --git a/lib/h264.h b/lib/h264.h index 28cc693b..48fcfcc9 100644 --- a/lib/h264.h +++ b/lib/h264.h @@ -12,6 +12,8 @@ namespace h264 { unsigned int width; unsigned int height; double fps; + uint8_t profile; + uint8_t level; }; ///Class for analyzing generic nal units @@ -50,7 +52,8 @@ namespace h264 { class sequenceParameterSet { public: - sequenceParameterSet(const char * _data, unsigned long _dataLen); + sequenceParameterSet(const char * _data = NULL, unsigned long _dataLen = 0); + void fromDTSCInit(const std::string & dtscInit); SPSMeta getCharacteristics() const; private: const char * data; diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index 33b9cb53..d7efe6a6 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -181,13 +181,11 @@ void HTTP::Parser::StartResponse(HTTP::Parser & request, Socket::Connection & co StartResponse("200", "OK", request, conn, bufferAllChunks); } -/// After receiving a header with this object, this function call will: -/// - Forward the headers to the 'to' Socket::Connection. +/// After receiving a header with this object, and after a call with SendResponse/SendRequest with this object, this function call will: /// - Retrieve all the body from the 'from' Socket::Connection. /// - Forward those contents as-is to the 'to' Socket::Connection. /// It blocks until completed or either of the connections reaches an error state. void HTTP::Parser::Proxy(Socket::Connection & from, Socket::Connection & to) { - SendResponse(url, method, to); if (getChunks) { unsigned int proxyingChunk = 0; while (to.connected() && from.connected()) { @@ -315,6 +313,20 @@ std::string HTTP::Parser::GetVar(std::string i) { return vars[i]; } +std::string HTTP::Parser::allVars(){ + std::string ret; + if (!vars.size()){return ret;} + for (std::map::iterator it = vars.begin(); it != vars.end(); ++it){ + if (ret.size() > 1){ + ret += "&"; + }else{ + ret += "?"; + } + ret += it->first + "=" + Encodings::URL::encode(it->second); + } + return ret; +} + /// Sets header i to string value v. void HTTP::Parser::SetHeader(std::string i, std::string v) { Trim(i); diff --git a/lib/http_parser.h b/lib/http_parser.h index c728565c..4e304038 100644 --- a/lib/http_parser.h +++ b/lib/http_parser.h @@ -19,6 +19,7 @@ namespace HTTP { std::string GetHeader(std::string i); std::string GetVar(std::string i); std::string getUrl(); + std::string allVars(); void SetHeader(std::string i, std::string v); void SetHeader(std::string i, long long v); void setCORSHeaders(); @@ -44,11 +45,13 @@ namespace HTTP { unsigned int length; bool headerOnly; ///< If true, do not parse body if the length is a known size. bool bufferChunks; + //this bool was private + bool sendingChunks; + private: bool seenHeaders; bool seenReq; bool getChunks; - bool sendingChunks; unsigned int doingChunk; bool parse(std::string & HTTPbuffer); void parseVars(std::string data); diff --git a/lib/mp4.cpp b/lib/mp4.cpp index 307bac7d..64c4a039 100644 --- a/lib/mp4.cpp +++ b/lib/mp4.cpp @@ -104,6 +104,7 @@ namespace MP4 { } } else if (size == 0) { fseek(newData, 0, SEEK_END); + return true; } DONTEVEN_MSG("skipping size 0x%.8lX", size); if (fseek(newData, pos + size, SEEK_SET) == 0) { @@ -132,6 +133,10 @@ namespace MP4 { return false; } } + if (size == 0){//no else if, because the extended size MAY be 0... + fseek(newData, 0, SEEK_END); + size = ftell(newData) - pos; + } fseek(newData, pos, SEEK_SET); data = (char *)realloc(data, size); data_size = size; @@ -160,6 +165,9 @@ namespace MP4 { return false; } } + if (size == 0){ + size = newData.size(); + } if (newData.size() >= size) { data = (char *)realloc(data, size); data_size = size; @@ -383,9 +391,10 @@ namespace MP4 { default: break; } - std::string retval = std::string(indent, ' ') + "Unimplemented pretty-printing for box " + std::string(data + 4, 4) + "\n"; + std::stringstream retval; + retval << std::string(indent, ' ') << "Unimplemented pretty-printing for box " << std::string(data + 4, 4) << " (" << ntohl(((int*)data)[0]) << ")\n"; /// \todo Implement hexdump for unimplemented boxes? - return retval; + return retval.str(); } /// Sets the 8 bits integer at the given index. @@ -807,4 +816,5 @@ namespace MP4 { } return r.str(); } + } diff --git a/lib/mp4_generic.cpp b/lib/mp4_generic.cpp index d4612b26..0c9c8591 100644 --- a/lib/mp4_generic.cpp +++ b/lib/mp4_generic.cpp @@ -1091,10 +1091,15 @@ namespace MP4 { return; } } + memset(data + payloadOffset + 4, 0, 4); memcpy(data + payloadOffset + 4, newMinorVersion, 4); } std::string FTYP::getMinorVersion() { + static char zero[4] = {0,0,0,0}; + if (memcmp(zero, data+payloadOffset+4, 4) == 0){ + return ""; + } return std::string(data + payloadOffset + 4, 4); } @@ -2335,7 +2340,8 @@ namespace MP4 { setEntryCount(no + 1); } setInt32(newCTTSEntry.sampleCount, 8 + no * 8); - setInt32(newCTTSEntry.sampleOffset, 8 + (no * 8) + 4); + setInt32(*(reinterpret_cast(&newCTTSEntry.sampleOffset)), 8 + (no * 8) + 4); + } CTTSEntry CTTS::getCTTSEntry(uint32_t no) { @@ -2345,7 +2351,8 @@ namespace MP4 { return inval; } retval.sampleCount = getInt32(8 + (no * 8)); - retval.sampleOffset = getInt32(8 + (no * 8) + 4); + uint32_t tmp = getInt32(8 + (no * 8) + 4); + retval.sampleOffset = *(reinterpret_cast(&tmp)); return retval; } diff --git a/lib/mp4_generic.h b/lib/mp4_generic.h index ad66fcf6..1f760f5e 100644 --- a/lib/mp4_generic.h +++ b/lib/mp4_generic.h @@ -486,7 +486,7 @@ namespace MP4 { struct CTTSEntry { uint32_t sampleCount; - uint32_t sampleOffset; + int32_t sampleOffset; }; class CTTS: public fullBox { @@ -714,3 +714,4 @@ namespace MP4 { std::string toPrettyString(uint32_t indent = 0); }; } + diff --git a/lib/mp4_ms.cpp b/lib/mp4_ms.cpp index 47424428..fa8e61b2 100644 --- a/lib/mp4_ms.cpp +++ b/lib/mp4_ms.cpp @@ -127,6 +127,9 @@ namespace MP4 { if (UUID == "d4807ef2-ca39-4695-8e54-26cb9e46a79f") { return ((UUID_TrackFragmentReference *)this)->toPrettyString(indent); } + if (UUID == "6d1d9b05-42d5-44e6-80e2-141daff757b2") { + return ((UUID_TFXD *)this)->toPrettyString(indent); + } std::stringstream r; r << std::string(indent, ' ') << "[uuid] Extension box (" << boxedSize() << ")" << std::endl; r << std::string(indent + 1, ' ') << "UUID: " << UUID << std::endl; @@ -205,4 +208,68 @@ namespace MP4 { } return r.str(); } + + UUID_TFXD::UUID_TFXD() { + setUUID((std::string)"6d1d9b05-42d5-44e6-80e2-141daff757b2"); + setVersion(0); + setFlags(0); + } + + void UUID_TFXD::setVersion(uint32_t newVersion) { + setInt8(newVersion, 16); + } + + uint32_t UUID_TFXD::getVersion() { + return getInt8(16); + } + + void UUID_TFXD::setFlags(uint32_t newFlags) { + setInt24(newFlags, 17); + } + + uint32_t UUID_TFXD::getFlags() { + return getInt24(17); + } + + void UUID_TFXD::setTime(uint64_t newTime) { + if (getVersion() == 0) { + setInt32(newTime, 20); + } else { + setInt64(newTime, 20); + } + } + + uint64_t UUID_TFXD::getTime() { + if (getVersion() == 0) { + return getInt32(20); + } else { + return getInt64(20); + } + } + + void UUID_TFXD::setDuration(uint64_t newDuration) { + if (getVersion() == 0) { + setInt32(newDuration, 24); + } else { + setInt64(newDuration, 28); + } + } + + uint64_t UUID_TFXD::getDuration() { + if (getVersion() == 0) { + return getInt32(24); + } else { + return getInt64(28); + } + } + + std::string UUID_TFXD::toPrettyString(uint32_t indent) { + std::stringstream r; + setUUID((std::string)"6d1d9b05-42d5-44e6-80e2-141daff757b2"); + r << std::string(indent, ' ') << "[6d1d9b05-42d5-44e6-80e2-141daff757b2] TFXD Box (" << boxedSize() << ")" << std::endl; + r << std::string(indent + 1, ' ') << "Version: " << getVersion() << std::endl; + r << std::string(indent + 1, ' ') << "Time = " << getTime() << std::endl; + r << std::string(indent + 1, ' ') << "Duration = " << getDuration() << std::endl; + return r.str(); + } } diff --git a/lib/mp4_ms.h b/lib/mp4_ms.h index 882f01dc..e00b1340 100644 --- a/lib/mp4_ms.h +++ b/lib/mp4_ms.h @@ -37,4 +37,17 @@ namespace MP4 { std::string toPrettyString(uint32_t indent = 0); }; + class UUID_TFXD: public UUID { + public: + UUID_TFXD(); + void setVersion(uint32_t newVersion); + uint32_t getVersion(); + void setFlags(uint32_t newFlags); + uint32_t getFlags(); + void setTime(uint64_t newTime); + uint64_t getTime(); + void setDuration(uint64_t newDuration); + uint64_t getDuration(); + std::string toPrettyString(uint32_t indent = 0); + }; } diff --git a/lib/procs.cpp b/lib/procs.cpp index 9d51b3d4..ba984b10 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -22,13 +22,31 @@ #include "timing.h" std::set Util::Procs::plist; +std::set Util::Procs::socketList; bool Util::Procs::handler_set = false; +bool Util::Procs::thread_handler = false; +tthread::mutex Util::Procs::plistMutex; +tthread::thread * Util::Procs::reaper_thread = 0; - -static bool childRunning(pid_t p) { - pid_t ret = waitpid(p, 0, WNOHANG); +/// Local-only function. Attempts to reap child and returns current running status. +bool Util::Procs::childRunning(pid_t p) { + int status; + pid_t ret = waitpid(p, &status, WNOHANG); if (ret == p) { + tthread::lock_guard guard(plistMutex); + int exitcode = -1; + if (WIFEXITED(status)) { + exitcode = WEXITSTATUS(status); + } else if (WIFSIGNALED(status)) { + exitcode = -WTERMSIG(status); + } + if (plist.count(ret)) { + HIGH_MSG("Process %d fully terminated with code %d", ret, exitcode); + plist.erase(ret); + } else { + HIGH_MSG("Child process %d exited with code %d", ret, exitcode); + } return false; } if (ret < 0 && errno == EINTR) { @@ -48,7 +66,17 @@ bool Util::Procs::isRunning(pid_t pid){ /// all remaining children. Waits one more second for cleanup to finish, then exits. void Util::Procs::exit_handler() { int waiting = 0; - std::set listcopy = plist; + std::set listcopy; + { + tthread::lock_guard guard(plistMutex); + listcopy = plist; + thread_handler = false; + } + if (reaper_thread){ + reaper_thread->join(); + delete reaper_thread; + reaper_thread = 0; + } std::set::iterator it; if (listcopy.empty()) { return; @@ -62,7 +90,7 @@ void Util::Procs::exit_handler() { break; } if (!listcopy.empty()) { - Util::sleep(20); + Util::wait(20); ++waiting; } } @@ -71,7 +99,7 @@ void Util::Procs::exit_handler() { return; } - DEBUG_MSG(DLVL_DEVEL, "Sending SIGINT to remaining %d children", (int)listcopy.size()); + WARN_MSG("Sending SIGINT to remaining %d children", (int)listcopy.size()); //send sigint to all remaining if (!listcopy.empty()) { for (it = listcopy.begin(); it != listcopy.end(); it++) { @@ -80,7 +108,7 @@ void Util::Procs::exit_handler() { } } - DEBUG_MSG(DLVL_DEVEL, "Waiting up to 5 seconds for %d children to terminate.", (int)listcopy.size()); + INFO_MSG("Waiting up to 5 seconds for %d children to terminate.", (int)listcopy.size()); waiting = 0; //wait up to 5 seconds for applications to shut down while (!listcopy.empty() && waiting <= 250) { @@ -90,7 +118,7 @@ void Util::Procs::exit_handler() { break; } if (!listcopy.empty()) { - Util::sleep(20); + Util::wait(20); ++waiting; } } @@ -99,7 +127,7 @@ void Util::Procs::exit_handler() { return; } - DEBUG_MSG(DLVL_DEVEL, "Sending SIGKILL to remaining %d children", (int)listcopy.size()); + ERROR_MSG("Sending SIGKILL to remaining %d children", (int)listcopy.size()); //send sigkill to all remaining if (!listcopy.empty()) { for (it = listcopy.begin(); it != listcopy.end(); it++) { @@ -108,7 +136,7 @@ void Util::Procs::exit_handler() { } } - DEBUG_MSG(DLVL_DEVEL, "Waiting up to a second for %d children to terminate.", (int)listcopy.size()); + INFO_MSG("Waiting up to a second for %d children to terminate.", (int)listcopy.size()); waiting = 0; //wait up to 1 second for applications to shut down while (!listcopy.empty() && waiting <= 50) { @@ -118,7 +146,7 @@ void Util::Procs::exit_handler() { break; } if (!listcopy.empty()) { - Util::sleep(20); + Util::wait(20); ++waiting; } } @@ -126,14 +154,17 @@ void Util::Procs::exit_handler() { if (listcopy.empty()) { return; } - DEBUG_MSG(DLVL_DEVEL, "Giving up with %d children left.", (int)listcopy.size()); - + FAIL_MSG("Giving up with %d children left.", (int)listcopy.size()); } /// Sets up exit and childsig handlers. +/// Spawns grim_reaper. exit handler despawns grim_reaper /// Called by every Start* function. void Util::Procs::setHandler() { + tthread::lock_guard guard(plistMutex); if (!handler_set) { + thread_handler = true; + reaper_thread = new tthread::thread(grim_reaper, 0); struct sigaction new_action; new_action.sa_handler = childsig_handler; sigemptyset(&new_action.sa_mask); @@ -144,19 +175,21 @@ void Util::Procs::setHandler() { } } - -/// Used internally to capture child signals and update plist. -void Util::Procs::childsig_handler(int signum) { - if (signum != SIGCHLD) { - return; - } +///Thread that loops until thread_handler is false. +///Reaps available children and then sleeps for a second. +///Not done in signal handler so we can use a mutex to prevent race conditions. +void Util::Procs::grim_reaper(void * n){ + VERYHIGH_MSG("Grim reaper start"); + while (thread_handler){ + { + tthread::lock_guard guard(plistMutex); int status; pid_t ret = -1; while (ret != 0) { ret = waitpid(-1, &status, WNOHANG); if (ret <= 0) { //ignore, would block otherwise if (ret == 0 || errno != EINTR) { - return; + break; } continue; } @@ -166,20 +199,25 @@ void Util::Procs::childsig_handler(int signum) { } else if (WIFSIGNALED(status)) { exitcode = -WTERMSIG(status); } else { // not possible - return; + break; } - + if (plist.count(ret)) { + HIGH_MSG("Process %d fully terminated with code %d", ret, exitcode); plist.erase(ret); -#if DEBUG >= DLVL_HIGH - if (!isActive(pname)) { - DEBUG_MSG(DLVL_HIGH, "Process %d fully terminated", ret); } else { - DEBUG_MSG(DLVL_HIGH, "Child process %d exited", ret); + HIGH_MSG("Child process %d exited with code %d", ret, exitcode); } -#endif - } } + Util::sleep(500); + } + VERYHIGH_MSG("Grim reaper stop"); +} + +/// Ignores everything. Separate thread handles waiting for children. +void Util::Procs::childsig_handler(int signum) { + return; +} /// Runs the given command and returns the stdout output as a string. @@ -187,7 +225,7 @@ std::string Util::Procs::getOutputOf(char * const * argv) { std::string ret; int fin = 0, fout = -1, ferr = 0; pid_t myProc = StartPiped(argv, &fin, &fout, &ferr); - while (isActive(myProc)) { + while (childRunning(myProc)) { Util::sleep(100); } FILE * outFile = fdopen(fout, "r"); @@ -201,6 +239,31 @@ std::string Util::Procs::getOutputOf(char * const * argv) { return ret; } + +///This function prepares a deque for getOutputOf and automatically inserts a NULL at the end of the char* const* +char* const* Util::Procs::dequeToArgv(std::deque & argDeq){ + char** ret = (char**)malloc((argDeq.size()+1)*sizeof(char*)); + for (int i = 0; i & argDeq){ + std::string ret; + char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command + ret = getOutputOf(argv); + return ret; +} + +pid_t Util::Procs::StartPiped(std::deque & argDeq, int * fdin, int * fdout, int * fderr) { + pid_t ret; + char* const* argv = dequeToArgv(argDeq);//Note: Do not edit deque before executing command + ret = Util::Procs::StartPiped(argv, fdin, fdout, fderr); + return ret; +} + /// Starts a new process with given fds if the name is not already active. /// \return 0 if process was not started, process PID otherwise. /// \arg argv Command for this process. @@ -258,6 +321,10 @@ pid_t Util::Procs::StartPiped(char * const * argv, int * fdin, int * fdout, int } pid = fork(); if (pid == 0) { //child + //Close all sockets in the socketList + for (std::set::iterator it = Util::Procs::socketList.begin(); it != Util::Procs::socketList.end(); ++it){ + close(*it); + } if (!fdin) { dup2(devnull, STDIN_FILENO); } else if (*fdin == -1) { @@ -319,7 +386,10 @@ pid_t Util::Procs::StartPiped(char * const * argv, int * fdin, int * fdout, int } return 0; } else { //parent + { + tthread::lock_guard guard(plistMutex); plist.insert(pid); + } DEBUG_MSG(DLVL_HIGH, "Piped process %s started, PID %d", argv[0], pid); if (devnull != -1) { close(devnull); @@ -354,7 +424,11 @@ void Util::Procs::Murder(pid_t name) { /// (Attempts to) stop all running child processes. void Util::Procs::StopAll() { - std::set listcopy = plist; + std::set listcopy; + { + tthread::lock_guard guard(plistMutex); + listcopy = plist; + } std::set::iterator it; for (it = listcopy.begin(); it != listcopy.end(); it++) { Stop(*it); @@ -363,11 +437,13 @@ void Util::Procs::StopAll() { /// Returns the number of active child processes. int Util::Procs::Count() { + tthread::lock_guard guard(plistMutex); return plist.size(); } /// Returns true if a process with this PID is currently active. bool Util::Procs::isActive(pid_t name) { + tthread::lock_guard guard(plistMutex); return (plist.count(name) == 1) && (kill(name, 0) == 0); } diff --git a/lib/procs.h b/lib/procs.h index 4a06c5f0..b74ed258 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include "tinythread.h" /// Contains utility code, not directly related to streaming media namespace Util { @@ -13,21 +15,30 @@ namespace Util { /// Deals with spawning, monitoring and stopping child processes class Procs { private: - static std::set plist; ///< Holds active processes + static bool childRunning(pid_t p); + static tthread::mutex plistMutex; + static tthread::thread * reaper_thread; + static std::set plist; ///< Holds active process list. static bool handler_set; ///< If true, the sigchld handler has been setup. + static bool thread_handler;///< True while thread handler should be running. static void childsig_handler(int signum); static void exit_handler(); static void runCmd(std::string & cmd); static void setHandler(); + static char* const* dequeToArgv(std::deque & argDeq); + static void grim_reaper(void * n); public: static std::string getOutputOf(char * const * argv); + static std::string getOutputOf(std::deque & argDeq); static pid_t StartPiped(char * const * argv, int * fdin, int * fdout, int * fderr); + static pid_t StartPiped(std::deque & argDeq, int * fdin, int * fdout, int * fderr); static void Stop(pid_t name); static void Murder(pid_t name); static void StopAll(); static int Count(); static bool isActive(pid_t name); static bool isRunning(pid_t pid); + static std::set socketList; ///< Holds sockets that should be closed before forking }; - } + diff --git a/lib/rtmpchunks.cpp b/lib/rtmpchunks.cpp index 16db90e5..25b2795b 100644 --- a/lib/rtmpchunks.cpp +++ b/lib/rtmpchunks.cpp @@ -7,10 +7,6 @@ #include "timing.h" #include "auth.h" -#ifndef FILLER_DATA -#define FILLER_DATA "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent commodo vulputate urna eu commodo. Cras tempor velit nec nulla placerat volutpat. Proin eleifend blandit quam sit amet suscipit. Pellentesque vitae tristique lorem. Maecenas facilisis consequat neque, vitae iaculis eros vulputate ut. Suspendisse ut arcu non eros vestibulum pulvinar id sed erat. Nam dictum tellus vel tellus rhoncus ut mollis tellus fermentum. Fusce volutpat consectetur ante, in mollis nisi euismod vulputate. Curabitur vitae facilisis ligula. Sed sed gravida dolor. Integer eu eros a dolor lobortis ullamcorper. Mauris interdum elit non neque interdum dictum. Suspendisse imperdiet eros sed sapien cursus pulvinar. Vestibulum ut dolor lectus, id commodo elit. Cras convallis varius leo eu porta. Duis luctus sapien nec dui adipiscing quis interdum nunc congue. Morbi pharetra aliquet mauris vitae tristique. Etiam feugiat sapien quis augue elementum id ultricies magna vulputate. Phasellus luctus, leo id egestas consequat, eros tortor commodo neque, vitae hendrerit nunc sem ut odio." -#endif - std::string RTMPStream::handshake_in; ///< Input for the handshake. std::string RTMPStream::handshake_out; ///< Output for the handshake. @@ -205,7 +201,7 @@ RTMPStream::Chunk::Chunk() { std::string & RTMPStream::SendChunk(unsigned int cs_id, unsigned char msg_type_id, unsigned int msg_stream_id, std::string data) { static RTMPStream::Chunk ch; ch.cs_id = cs_id; - ch.timestamp = Util::getMS(); + ch.timestamp = 0; ch.len = data.size(); ch.real_len = data.size(); ch.len_left = 0; @@ -458,16 +454,16 @@ bool RTMPStream::Chunk::Parse(Socket::Buffer & buffer) { DEBUG_MSG(DLVL_DONTEVEN, "Parsing RTMP chunk result: len_left=%d, real_len=%d", len_left, real_len); - //read extended timestamp, if neccesary - if (ts_header == 0x00ffffff) { + //read extended timestamp, if necessary + if (ts_header == 0x00ffffff && headertype != 0xC0) { if (!buffer.available(i + 4)) { return false; } //can't read timestamp indata = buffer.copy(i + 4); - timestamp += indata[i++ ]; + timestamp = indata[i++ ]; timestamp += indata[i++ ] * 256; timestamp += indata[i++ ] * 256 * 256; - timestamp = indata[i++ ] * 256 * 256 * 256; + timestamp += indata[i++ ] * 256 * 256 * 256; ts_delta = timestamp; } diff --git a/lib/rtmpchunks.h b/lib/rtmpchunks.h index 88080d74..b5de6d3b 100644 --- a/lib/rtmpchunks.h +++ b/lib/rtmpchunks.h @@ -10,6 +10,10 @@ #include #include "socket.h" +#ifndef FILLER_DATA +#define FILLER_DATA "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Praesent commodo vulputate urna eu commodo. Cras tempor velit nec nulla placerat volutpat. Proin eleifend blandit quam sit amet suscipit. Pellentesque vitae tristique lorem. Maecenas facilisis consequat neque, vitae iaculis eros vulputate ut. Suspendisse ut arcu non eros vestibulum pulvinar id sed erat. Nam dictum tellus vel tellus rhoncus ut mollis tellus fermentum. Fusce volutpat consectetur ante, in mollis nisi euismod vulputate. Curabitur vitae facilisis ligula. Sed sed gravida dolor. Integer eu eros a dolor lobortis ullamcorper. Mauris interdum elit non neque interdum dictum. Suspendisse imperdiet eros sed sapien cursus pulvinar. Vestibulum ut dolor lectus, id commodo elit. Cras convallis varius leo eu porta. Duis luctus sapien nec dui adipiscing quis interdum nunc congue. Morbi pharetra aliquet mauris vitae tristique. Etiam feugiat sapien quis augue elementum id ultricies magna vulputate. Phasellus luctus, leo id egestas consequat, eros tortor commodo neque, vitae hendrerit nunc sem ut odio." +#endif + //forward declaration of FLV::Tag to avoid circular dependencies. namespace FLV { class Tag; diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index d23fd443..78ad91b0 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -95,13 +95,13 @@ namespace IPC { ///\param oflag The flags with which to open the semaphore ///\param mode The mode in which to create the semaphore, if O_CREAT is given in oflag, ignored otherwise ///\param value The initial value of the semaphore if O_CREAT is given in oflag, ignored otherwise - semaphore::semaphore(const char * name, int oflag, mode_t mode, unsigned int value) { + semaphore::semaphore(const char * name, int oflag, mode_t mode, unsigned int value, bool noWait) { #if defined(__CYGWIN__) || defined(_WIN32) mySem = 0; #else mySem = SEM_FAILED; #endif - open(name, oflag, mode, value); + open(name, oflag, mode, value, noWait); } ///\brief The deconstructor @@ -126,13 +126,13 @@ namespace IPC { ///\param oflag The flags with which to open the semaphore ///\param mode The mode in which to create the semaphore, if O_CREAT is given in oflag, ignored otherwise ///\param value The initial value of the semaphore if O_CREAT is given in oflag, ignored otherwise - void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value) { + void semaphore::open(const char * name, int oflag, mode_t mode, unsigned int value, bool noWait) { close(); int timer = 0; while (!(*this) && timer++ < 10) { #if defined(__CYGWIN__) || defined(_WIN32) std::string semaName = "Global\\"; - semaName += name; + semaName += (name+1); if (oflag & O_CREAT) { if (oflag & O_EXCL) { //attempt opening, if succes, close handle and return false; @@ -165,7 +165,7 @@ namespace IPC { mySem = sem_open(name, oflag); } if (!(*this)) { - if (errno == ENOENT) { + if (errno == ENOENT && !noWait) { Util::wait(500); } else { break; @@ -405,7 +405,7 @@ namespace IPC { int i = 0; do { if (i != 0) { - Util::sleep(1000); + Util::wait(1000); } handle = OpenFileMappingA(FILE_MAP_ALL_ACCESS, FALSE, name.c_str()); i++; @@ -438,14 +438,14 @@ namespace IPC { int i = 0; while (i < 10 && handle == -1 && autoBackoff) { i++; - Util::sleep(1000); + Util::wait(1000); handle = shm_open(name.c_str(), O_RDWR, ACCESSPERMS); } } } if (handle == -1) { if (!master_ && autoBackoff) { - FAIL_MSG("shm_open for page %s failed: %s", name.c_str(), strerror(errno)); + HIGH_MSG("shm_open for page %s failed: %s", name.c_str(), strerror(errno)); } return; } @@ -558,7 +558,7 @@ namespace IPC { int i = 0; while (i < 10 && handle == -1 && autoBackoff) { i++; - Util::sleep(1000); + Util::wait(1000); handle = open(std::string(Util::getTmpFolder() + name).c_str(), O_RDWR, (mode_t)0600); } } @@ -676,7 +676,7 @@ namespace IPC { if (splitChar != std::string::npos) { name[splitChar] = '+'; } - memcpy(data + 48, name.c_str(), std::min((int)name.size(), 100)); + snprintf(data+48, 100, "%s", name.c_str()); } ///\brief Gets the name of the stream this user is viewing @@ -686,7 +686,7 @@ namespace IPC { ///\brief Sets the name of the connector through which this user is viewing void statExchange::connector(std::string name) { - memcpy(data + 148, name.c_str(), std::min((int)name.size(), 20)); + snprintf(data+148, 20, "%s", name.c_str()); } ///\brief Gets the name of the connector through which this user is viewing @@ -706,6 +706,16 @@ namespace IPC { return result; } + ///\brief Sets checksum field + void statExchange::setSync(char s) { + data[172] = s; + } + + ///\brief Gets checksum field + char statExchange::getSync() { + return data[172]; + } + ///\brief Creates a semaphore guard, locks the semaphore on call semGuard::semGuard(semaphore * thisSemaphore) : mySemaphore(thisSemaphore) { mySemaphore->wait(); @@ -762,6 +772,7 @@ namespace IPC { ///\brief The deconstructor sharedServer::~sharedServer() { + finishEach(); mySemaphore.close(); mySemaphore.unlink(); } @@ -818,6 +829,62 @@ namespace IPC { return false; } + ///Disconnect all connected users + void sharedServer::finishEach(){ + if (!hasCounter){ + return; + } + for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { + if (!it->mapped || !it->len) { + break; + } + unsigned int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { + it->mapped[offset] = 126; + offset += payLen + (hasCounter ? 1 : 0); + } + } + } + + ///Returns a pointer to the data for the given index. + ///Returns null on error or if index is empty. + char * sharedServer::getIndex(unsigned int requestId){ + char * empty = 0; + if (!hasCounter) { + empty = (char *)malloc(payLen * sizeof(char)); + memset(empty, 0, payLen); + } + semGuard tmpGuard(&mySemaphore); + unsigned int id = 0; + for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { + if (!it->mapped || !it->len) { + DEBUG_MSG(DLVL_FAIL, "Something went terribly wrong?"); + return 0; + } + unsigned int offset = 0; + while (offset + payLen + (hasCounter ? 1 : 0) <= it->len) { + if (id == requestId){ + if (hasCounter) { + if (it->mapped[offset] != 0) { + return it->mapped + offset + 1; + }else{ + return 0; + } + } else { + if (memcmp(empty, it->mapped + offset, payLen)) { + return it->mapped + offset; + }else{ + return 0; + } + } + } + offset += payLen + (hasCounter ? 1 : 0); + id ++; + } + } + return 0; + } + ///\brief Parse each of the possible payload pieces, and runs a callback on it if in use. void sharedServer::parseEach(void (*callback)(char * data, size_t len, unsigned int id)) { char * empty = 0; @@ -829,6 +896,7 @@ namespace IPC { unsigned int id = 0; unsigned int userCount = 0; unsigned int emptyCount = 0; + connectedUsers = 0; for (std::set::iterator it = myPages.begin(); it != myPages.end(); it++) { if (!it->mapped || !it->len) { DEBUG_MSG(DLVL_FAIL, "Something went terribly wrong?"); @@ -842,28 +910,25 @@ namespace IPC { char * counter = it->mapped + offset; //increase the count if needed ++userCount; + if (*counter & 0x80){ + connectedUsers++; + } if (id >= amount) { amount = id + 1; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } - unsigned short tmpPID = *((unsigned short *)(it->mapped + 1 + offset + payLen - 2)); - if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127 || *counter == 254 || *counter == 255)) { - WARN_MSG("process disappeared, timing out. (pid %d)", tmpPID); + uint32_t tmpPID = *((uint32_t *)(it->mapped + 1 + offset + payLen - 4)); + if (!Util::Procs::isRunning(tmpPID) && !(*counter == 126 || *counter == 127)){ + WARN_MSG("process disappeared, timing out. (pid %lu)", tmpPID); *counter = 126; //if process is already dead, instant timeout. } callback(it->mapped + offset + 1, payLen, id); switch (*counter) { case 127: - DEBUG_MSG(DLVL_HIGH, "Client %u requested disconnect", id); + HIGH_MSG("Client %u requested disconnect", id); break; case 126: - DEBUG_MSG(DLVL_WARN, "Client %u timed out", id); - break; - case 255: - DEBUG_MSG(DLVL_HIGH, "Client %u disconnected on request", id); - break; - case 254: - DEBUG_MSG(DLVL_WARN, "Client %u disconnect timed out", id); + HIGH_MSG("Client %u timed out", id); break; default: #ifndef NOCRASHCHECK @@ -883,7 +948,7 @@ namespace IPC { #endif break; } - if (*counter == 127 || *counter == 126 || *counter == 255 || *counter == 254) { + if (*counter == 127 || *counter == 126){ memset(it->mapped + offset + 1, 0, payLen); it->mapped[offset] = 0; } else { @@ -895,7 +960,7 @@ namespace IPC { //bring the counter down if this was the last element if (id == amount - 1) { amount = id; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point break; @@ -907,7 +972,7 @@ namespace IPC { //increase the count if needed if (id >= amount) { amount = id + 1; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } callback(it->mapped + offset, payLen, id); } else { @@ -916,7 +981,7 @@ namespace IPC { //bring the counter down if this was the last element if (id == amount - 1) { amount = id; - DEBUG_MSG(DLVL_VERYHIGH, "Shared memory %s is now at count %u", baseName.c_str(), amount); + VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point if (empty) { @@ -952,12 +1017,14 @@ namespace IPC { hasCounter = 0; payLen = 0; offsetOnPage = 0; + countAsViewer= true; } ///\brief Copy constructor for sharedClients ///\param rhs The client ro copy sharedClient::sharedClient(const sharedClient & rhs) { + countAsViewer = rhs.countAsViewer; baseName = rhs.baseName; payLen = rhs.payLen; hasCounter = rhs.hasCounter; @@ -978,6 +1045,7 @@ namespace IPC { ///\brief Assignment operator void sharedClient::operator =(const sharedClient & rhs) { + countAsViewer = rhs.countAsViewer; baseName = rhs.baseName; payLen = rhs.payLen; hasCounter = rhs.hasCounter; @@ -1001,6 +1069,7 @@ namespace IPC { ///\param len The size of the payload to allocate ///\param withCounter Whether or not this payload has a counter sharedClient::sharedClient(std::string name, int len, bool withCounter) : baseName("/" + name), payLen(len), offsetOnPage(-1), hasCounter(withCounter) { + countAsViewer = true; #ifdef __APPLE__ //note: O_CREAT is only needed for mac, probably mySemaphore.open(baseName.c_str(), O_RDWR | O_CREAT, 0); @@ -1035,7 +1104,7 @@ namespace IPC { offsetOnPage = offset; if (hasCounter) { myPage.mapped[offset] = 1; - *((unsigned short *)(myPage.mapped + 1 + offset + len - 2)) = getpid(); + *((uint32_t *)(myPage.mapped + 1 + offset + len - 4)) = getpid(); } break; } @@ -1058,6 +1127,8 @@ namespace IPC { ///\brief The deconstructor sharedClient::~sharedClient() { mySemaphore.close(); + + } ///\brief Writes data to the shared data @@ -1079,7 +1150,7 @@ namespace IPC { } if (myPage.mapped) { semGuard tmpGuard(&mySemaphore); - myPage.mapped[offsetOnPage] = 127; + myPage.mapped[offsetOnPage] = 126; } } @@ -1089,13 +1160,20 @@ namespace IPC { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element without counters"); return; } - if (myPage.mapped[offsetOnPage] < 128) { - myPage.mapped[offsetOnPage] = 1; + if ((myPage.mapped[offsetOnPage] & 0x7F) < 126) { + myPage.mapped[offsetOnPage] = (countAsViewer ? 0x81 : 0x01); } else { DEBUG_MSG(DLVL_WARN, "Trying to keep-alive an element that needs to timeout, ignoring"); } } + bool sharedClient::isAlive() { + if (!hasCounter) { + return true; + } + return (myPage.mapped[offsetOnPage] & 0x7F) < 126; + } + ///\brief Get a pointer to the data of this client char * sharedClient::getData() { if (!myPage.mapped) { @@ -1104,8 +1182,21 @@ namespace IPC { return (myPage.mapped + offsetOnPage + (hasCounter ? 1 : 0)); } + int sharedClient::getCounter() { + if (!hasCounter){ + return -1; + } + if (!myPage.mapped) { + return 0; + } + return *(myPage.mapped + offsetOnPage); + } + userConnection::userConnection(char * _data) { data = _data; + if (!data){ + WARN_MSG("userConnection created with null pointer!"); + } } unsigned long userConnection::getTrackId(size_t offset) const { diff --git a/lib/shared_memory.h b/lib/shared_memory.h index dd189670..bc8906f2 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -11,7 +11,7 @@ #include #endif -#define STAT_EX_SIZE 174 +#define STAT_EX_SIZE 177 #define PLAY_EX_SIZE 2+6*SIMUL_TRACKS namespace IPC { @@ -37,6 +37,8 @@ namespace IPC { void connector(std::string name); std::string connector(); void crc(unsigned int sum); + char getSync(); + void setSync(char s); unsigned int crc(); private: ///\brief The payload for the stat exchange @@ -49,6 +51,8 @@ namespace IPC { /// - 100 byte - streamName (name of the stream peer is viewing) /// - 20 byte - connector (name of the connector the peer is using) /// - 4 byte - CRC32 of user agent (or zero if none) + /// - 1 byte sync (was seen by controller yes/no) + /// - (implicit 4 bytes: PID) char * data; }; @@ -56,10 +60,10 @@ namespace IPC { class semaphore { public: semaphore(); - semaphore(const char * name, int oflag, mode_t mode, unsigned int value); + semaphore(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0, bool noWait = false); ~semaphore(); operator bool() const; - void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0); + void open(const char * name, int oflag, mode_t mode = 0, unsigned int value = 0, bool noWait = false); int getVal() const; void post(); void wait(); @@ -177,9 +181,11 @@ namespace IPC { void init(std::string name, int len, bool withCounter = false); ~sharedServer(); void parseEach(void (*callback)(char * data, size_t len, unsigned int id)); + char * getIndex(unsigned int id); operator bool() const; ///\brief The amount of connected clients unsigned int amount; + unsigned int connectedUsers; private: bool isInUse(unsigned int id); void newPage(); @@ -194,6 +200,7 @@ namespace IPC { semaphore mySemaphore; ///\brief Whether the payload has a counter, if so, it is added in front of the payload bool hasCounter; + void finishEach(); }; ///\brief The client part of a server/client model for shared memory. @@ -215,7 +222,10 @@ namespace IPC { void write(char * data, int len); void finish(); void keepAlive(); + bool isAlive(); char * getData(); + int getCounter(); + bool countAsViewer; private: ///\brief The basename of the shared pages. std::string baseName; diff --git a/lib/socket.cpp b/lib/socket.cpp index a8561962..dbad1fa2 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -510,11 +510,8 @@ unsigned int Socket::Connection::iwrite(const void * buffer, int len) { return 0; break; default: - if (errno != EPIPE && errno != ECONNRESET) { Error = true; - remotehost = strerror(errno); - DEBUG_MSG(DLVL_WARN, "Could not iwrite data! Error: %s", remotehost.c_str()); - } + INSANE_MSG("Could not iwrite data! Error: %s", strerror(errno)); close(); return 0; break; @@ -555,11 +552,8 @@ int Socket::Connection::iread(void * buffer, int len, int flags) { return 0; break; default: - if (errno != EPIPE) { Error = true; - remotehost = strerror(errno); - DEBUG_MSG(DLVL_WARN, "Could not iread data! Error: %s", remotehost.c_str()); - } + INSANE_MSG("Could not iread data! Error: %s", strerror(errno)); close(); return 0; break; diff --git a/lib/stream.cpp b/lib/stream.cpp index 27422962..bed76b43 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -73,10 +73,37 @@ void Util::sanitizeName(std::string & streamname) { } } +JSON::Value Util::getStreamConfig(std::string streamname){ + JSON::Value result; + if (streamname.size() > 100){ + FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); + return result; + } + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + + sanitizeName(streamname); + std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); + //check if smp (everything before + or space) exists + DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + if (!stream_cfg){ + DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); + }else{ + result = stream_cfg.asJSON(); + } + configLock.post();//unlock the config semaphore + return result; +} + /// Checks if the given streamname has an active input serving it. Returns true if this is the case. /// Assumes the streamname has already been through sanitizeName()! bool Util::streamAlive(std::string & streamname){ - IPC::semaphore playerLock(std::string("/lock_" + streamname).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamname.c_str()); + IPC::semaphore playerLock(semName, O_RDWR, ACCESSPERMS, 1, true); + if (!playerLock){return false;} if (!playerLock.tryWait()) { playerLock.close(); return true; @@ -109,8 +136,8 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); //Lock the config to prevent race conditions and corruption issues while reading configLock.wait(); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); @@ -149,7 +176,21 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir input = inputs.getIndice(i); //if match voor current stream && priority is hoger dan wat we al hebben - if (curPrio < input.getMember("priority").asInt()){ + if (input.getMember("source_match") && curPrio < input.getMember("priority").asInt()){ + if (input.getMember("source_match").getSize()){ + for(unsigned int j = 0; j < input.getMember("source_match").getSize(); ++j){ + std::string source = input.getMember("source_match").getIndice(j).asString(); + std::string front = source.substr(0,source.find('*')); + std::string back = source.substr(source.find('*')+1); + MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str()); + + if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ + player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString(); + curPrio = input.getMember("priority").asInt(); + selected = true; + } + } + }else{ std::string source = input.getMember("source_match").asString(); std::string front = source.substr(0,source.find('*')); std::string back = source.substr(source.find('*')+1); @@ -160,6 +201,8 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir curPrio = input.getMember("priority").asInt(); selected = true; } + } + } } @@ -230,5 +273,11 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); _exit(42); } - return true; + + unsigned int waiting = 0; + while (!streamAlive(streamname) && ++waiting < 40){ + Util::wait(250); + } + return streamAlive(streamname); } + diff --git a/lib/stream.h b/lib/stream.h index 07c935ea..6b2e7759 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -4,10 +4,13 @@ #pragma once #include #include "socket.h" +#include "json.h" namespace Util { std::string getTmpFolder(); void sanitizeName(std::string & streamname); bool streamAlive(std::string & streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); + JSON::Value getStreamConfig(std::string streamname); } + diff --git a/lib/ts_packet.cpp b/lib/ts_packet.cpp index ac41247c..b127a35f 100644 --- a/lib/ts_packet.cpp +++ b/lib/ts_packet.cpp @@ -25,6 +25,11 @@ namespace TS { clear(); } + Packet::Packet(const Packet & rhs){ + memcpy(strBuf, rhs.strBuf, 188); + pos = 188; + } + /// This function fills a Packet from a file. /// It fills the content with the next 188 bytes int he file. /// \param Data The data to be read into the packet. @@ -34,11 +39,11 @@ namespace TS { if (!fread((void *)strBuf, 188, 1, data)) { return false; } - pos=188; if (strBuf[0] != 0x47){ HIGH_MSG("Failed to read a good packet on pos %lld", bPos); return false; } + pos=188; return true; } @@ -412,7 +417,7 @@ namespace TS { /// \return A character pointer to the internal packet buffer data const char * Packet::checkAndGetBuffer() const{ if (pos != 188) { - DEBUG_MSG(DLVL_ERROR, "Size invalid (%d) - invalid data from this point on", pos); + DEBUG_MSG(DLVL_HIGH, "Size invalid (%d) - invalid data from this point on", pos); } return strBuf; } @@ -564,6 +569,11 @@ namespace TS { } + ProgramAssociationTable & ProgramAssociationTable::operator = (const Packet & rhs){ + memcpy(strBuf, rhs.checkAndGetBuffer(), 188); + pos = 188; + return *this; + } ///Retrieves the current addStuffingoffset value for a PAT char ProgramAssociationTable::getOffset() const{ unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); @@ -680,6 +690,10 @@ namespace TS { return data[0]; } + void ProgramMappingEntry::setStreamType(int newType){ + data[0] = newType; + } + std::string ProgramMappingEntry::getCodec() const{ switch (getStreamType()){ case 0x01: @@ -730,10 +744,25 @@ namespace TS { return ((data[1] << 8) | data[2]) & 0x1FFF; } + void ProgramMappingEntry::setElementaryPid(int newElementaryPid) { + data[1] = newElementaryPid >> 8 & 0x1F; + data[2] = newElementaryPid & 0xFF; + } + int ProgramMappingEntry::getESInfoLength() const{ return ((data[3] << 8) | data[4]) & 0x0FFF; } + const char * ProgramMappingEntry::getESInfo() const{ + return data + 5; + } + + void ProgramMappingEntry::setESInfo(const std::string & newInfo){ + data[3] = (newInfo.size() >> 8) & 0x0F; + data[4] = newInfo.size() & 0xFF; + memcpy(data + 5, newInfo.data(), newInfo.size()); + } + void ProgramMappingEntry::advance(){ if (!(*this)) { return; @@ -749,6 +778,12 @@ namespace TS { pos=4; } + ProgramMappingTable & ProgramMappingTable::operator = (const Packet & rhs) { + memcpy(strBuf, rhs.checkAndGetBuffer(), 188); + pos = 188; + return *this; + } + char ProgramMappingTable::getOffset() const{ unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0); return strBuf[loc]; @@ -868,14 +903,6 @@ namespace TS { strBuf[loc+1] = (char)newVal; } - short ProgramMappingTable::getProgramCount() const{ - return (getSectionLength() - 13) / 5; - } - - void ProgramMappingTable::setProgramCount(short newVal) { - setSectionLength(newVal * 5 + 13); - } - ProgramMappingEntry ProgramMappingTable::getEntry(int index) const{ int dataOffset = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset(); ProgramMappingEntry res((char*)(strBuf + dataOffset + 13 + getProgramInfoLength()), (char*)(strBuf + dataOffset + getSectionLength()) ); @@ -885,59 +912,6 @@ namespace TS { return res; } - char ProgramMappingTable::getStreamType(short index) const{ - if (index > getProgramCount()) { - return 0; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); - return strBuf[loc + (index * 5)]; - } - - void ProgramMappingTable::setStreamType(char newVal, short index) { - if (index > getProgramCount()) { - return; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); //TODO - updPos(loc+(index*5)+1); - strBuf[loc + (index * 5)] = newVal; - } - - short ProgramMappingTable::getElementaryPID(short index) const{ - if (index > getProgramCount()) { - return 0; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); - return (((short)strBuf[loc + (index * 5) + 1] & 0x1F) << 8) | strBuf[loc + (index * 5) + 2]; - } - - void ProgramMappingTable::setElementaryPID(short newVal, short index) { - if (index > getProgramCount()) { - return; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); - updPos(loc+(index*5)+3); - strBuf[loc + (index * 5)+1] = ((newVal >> 8) & 0x1F )| 0xE0; - strBuf[loc + (index * 5)+2] = (char)newVal; - } - - short ProgramMappingTable::getESInfoLength(short index) const{ - if (index > getProgramCount()) { - return 0; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); - return (((short)strBuf[loc + (index * 5) + 3] & 0x0F) << 8) | strBuf[loc + (index * 5) + 4]; - } - - void ProgramMappingTable::setESInfoLength(short newVal, short index) { - if (index > getProgramCount()) { - return; - } - unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + 13 + getProgramInfoLength(); - updPos(loc+(index*5)+5); - strBuf[loc + (index * 5)+3] = ((newVal >> 8) & 0x0F) | 0xF0; - strBuf[loc + (index * 5)+4] = (char)newVal; - } - int ProgramMappingTable::getCRC() const{ unsigned int loc = 4 + (getAdaptationField() > 1 ? getAdaptationFieldLen() + 1 : 0) + getOffset() + getSectionLength(); return ((int)(strBuf[loc]) << 24) | ((int)(strBuf[loc + 1]) << 16) | ((int)(strBuf[loc + 2]) << 8) | strBuf[loc + 3]; @@ -995,7 +969,14 @@ namespace TS { PMT.setPID(4096); PMT.setTableId(2); //section length met 2 tracks: 0xB017 - PMT.setSectionLength(0xB00D + (selectedTracks.size() * 5)); + int sectionLen = 0; + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + sectionLen += 5; + if (myMeta.tracks[*it].codec == "ID3"){ + sectionLen += myMeta.tracks[*it].init.size(); + } + } + PMT.setSectionLength(0xB00D + sectionLen); PMT.setProgramNumber(1); PMT.setVersionNumber(0); PMT.setCurrentNextIndicator(0); @@ -1012,20 +993,20 @@ namespace TS { if (vidTrack == -1){ vidTrack = *(selectedTracks.begin()); } - PMT.setPCRPID(0x100 + vidTrack - 1); + PMT.setPCRPID(vidTrack); PMT.setProgramInfoLength(0); short id = 0; + ProgramMappingEntry entry = PMT.getEntry(0); for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + entry.setElementaryPid(*it); if (myMeta.tracks[*it].codec == "H264"){ - PMT.setStreamType(0x1B,id); + entry.setStreamType(0x1B); }else if (myMeta.tracks[*it].codec == "AAC"){ - PMT.setStreamType(0x0F,id); + entry.setStreamType(0x0F); }else if (myMeta.tracks[*it].codec == "MP3"){ - PMT.setStreamType(0x03,id); + entry.setStreamType(0x03); } - PMT.setElementaryPID(0x100 + (*it) - 1, id); - PMT.setESInfoLength(0,id); - id++; + entry.advance(); } PMT.calcCRC(); return PMT.checkAndGetBuffer(); diff --git a/lib/ts_packet.h b/lib/ts_packet.h index 7c0d370c..7c6e3b4f 100644 --- a/lib/ts_packet.h +++ b/lib/ts_packet.h @@ -22,6 +22,7 @@ namespace TS { public: //Constructors and fillers Packet(); + Packet(const Packet & rhs); ~Packet(); bool FromPointer(const char * data); bool FromFile(FILE * data); @@ -83,6 +84,7 @@ namespace TS { class ProgramAssociationTable : public Packet { public: + ProgramAssociationTable & operator = (const Packet & rhs); char getOffset() const; char getTableId() const; short getSectionLength() const; @@ -105,11 +107,14 @@ namespace TS { operator bool() const; int getStreamType() const; + void setStreamType(int newType); std::string getCodec() const; std::string getStreamTypeString() const; int getElementaryPid() const; + void setElementaryPid(int newElementaryPid); int getESInfoLength() const; - char * getESInfo() const; + const char * getESInfo() const; + void setESInfo(const std::string & newInfo); void advance(); private: char* data; @@ -119,6 +124,7 @@ namespace TS { class ProgramMappingTable : public Packet { public: ProgramMappingTable(); + ProgramMappingTable & operator = (const Packet & rhs); char getOffset() const; void setOffset(char newVal); char getTableId() const; @@ -139,15 +145,7 @@ namespace TS { void setPCRPID(short newVal); short getProgramInfoLength() const; void setProgramInfoLength(short newVal); - short getProgramCount() const; - void setProgramCount(short newVal); ProgramMappingEntry getEntry(int index) const; - void setStreamType(char newVal, short index); - char getStreamType(short index) const; - void setElementaryPID(short newVal, short index); - short getElementaryPID(short index) const; - void setESInfoLength(short newVal,short index); - short getESInfoLength(short index) const; int getCRC() const; void calcCRC(); std::string toPrettyString(size_t indent) const; diff --git a/lib/util.cpp b/lib/util.cpp new file mode 100644 index 00000000..2c44cf47 --- /dev/null +++ b/lib/util.cpp @@ -0,0 +1,40 @@ +#include "util.h" +#include + +namespace Util { + bool stringScan(const std::string & src, const std::string & pattern, std::deque & result){ + result.clear(); + std::deque positions; + size_t pos = pattern.find("%", 0); + while (pos != std::string::npos){ + positions.push_back(pos); + pos = pattern.find("%", pos + 1); + } + if (positions.size() == 0){ + return false; + } + size_t sourcePos = 0; + size_t patternPos = 0; + std::deque::iterator posIter = positions.begin(); + while (sourcePos != std::string::npos){ + //Match first part of the string + if (pattern.substr(patternPos, *posIter - patternPos) != src.substr(sourcePos, *posIter - patternPos)){ + break; + } + sourcePos += *posIter - patternPos; + std::deque::iterator nxtIter = posIter + 1; + if (nxtIter != positions.end()){ + patternPos = *posIter+2; + size_t tmpPos = src.find(pattern.substr(*posIter+2, *nxtIter - patternPos), sourcePos); + result.push_back(src.substr(sourcePos, tmpPos - sourcePos)); + sourcePos = tmpPos; + }else{ + result.push_back(src.substr(sourcePos)); + sourcePos = std::string::npos; + } + posIter++; + } + return result.size() == positions.size(); + } +} + diff --git a/lib/util.h b/lib/util.h new file mode 100644 index 00000000..df0a27b9 --- /dev/null +++ b/lib/util.h @@ -0,0 +1,6 @@ +#include +#include + +namespace Util { + bool stringScan(const std::string & src, const std::string & pattern, std::deque & result); +} diff --git a/lsp/mist.js b/lsp/mist.js index 29233b25..5cbfa602 100644 --- a/lsp/mist.js +++ b/lsp/mist.js @@ -1924,7 +1924,7 @@ var UI = { help: 'You can set the amount of debug information MistServer saves in the log. A full reboot of MistServer is required before some components of MistServer can post debug information.' },{ type: 'checkbox', - label: 'Force JSON file save', + label: 'Force configurations save', pointer: { main: mist.data, index: 'save' @@ -3464,7 +3464,7 @@ var UI = { },{ label: 'Target', type: 'str', - help: 'Where the stream will be pushed to.
Valid formats:
  • '+target_match.join('
  • ')+'
', + help: 'Where the stream will be pushed to.
Valid formats:
  • '+target_match.join('
  • ')+'
Valid text replacements: