From 98e3940079719e49feef42303d037b71ab10cba9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sun, 11 Nov 2018 03:59:23 +0100 Subject: [PATCH] Robustify accesses to server config --- lib/defines.h | 6 +- lib/stream.cpp | 125 +++++++++--------- lib/stream.h | 13 ++ lib/util.cpp | 8 ++ lib/util.h | 1 + src/controller/controller.cpp | 34 ++--- src/controller/controller_capabilities.cpp | 6 - src/controller/controller_statistics.cpp | 6 +- src/controller/controller_storage.cpp | 145 +++++++++++++++------ src/controller/controller_storage.h | 3 + src/controller/controller_streams.cpp | 2 + src/input/input.cpp | 10 +- src/input/input_buffer.cpp | 12 +- src/output/output_http.cpp | 112 ++++++++-------- src/output/output_http_internal.cpp | 103 +++++++-------- 15 files changed, 320 insertions(+), 266 deletions(-) diff --git a/lib/defines.h b/lib/defines.h index 0488017f..bf497575 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -126,6 +126,7 @@ static inline void show_stackframe(){} #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_STREAM_STATE "MstSTATE%s" //%s stream name +#define SHM_STREAM_CONF "MstSCnf%s" //%s stream name #define STRMSTAT_OFF 0 #define STRMSTAT_INIT 1 #define STRMSTAT_BOOT 2 @@ -142,9 +143,10 @@ static inline void show_stackframe(){} #define SHM_TRIGGER "MstTRGR%s" //%s trigger name #define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name -#define SEM_CONF "/MstConfLock" #define SEM_SESSCACHE "/MstSessCacheLock" -#define SHM_CONF "MstConf" +#define SHM_CAPA "MstCapa" +#define SHM_PROTO "MstProt" +#define SHM_PROXY "MstProx" #define SHM_STATE_LOGS "MstStateLogs" #define SHM_STATE_ACCS "MstStateAccs" #define SHM_STATE_STREAMS "MstStateStreams" diff --git a/lib/stream.cpp b/lib/stream.cpp index 2beeb683..caf3cad2 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -129,20 +129,17 @@ JSON::Value Util::getStreamConfig(const std::string & streamname){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return result; } - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - //check if smp (everything before + or space) exists - DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, smp.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan stream_cfg = rStrmConf.getScan(); if (!stream_cfg){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); - }else{ - result = stream_cfg.asJSON(); + WARN_MSG("Could not get stream '%s' config!", smp.c_str()); + return result; } - configLock.post();//unlock the config semaphore - return result; + return stream_cfg.asJSON(); } DTSC::Meta Util::getStreamMeta(const std::string & streamname){ @@ -207,25 +204,20 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir return true; } - /*LTS-START*/ + /* + * OLD CODE FOR HARDLIMITS. + * Maybe re-enable? + * Still sorta-works, but undocumented... { - //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - //Abort if no config available - if (config){ - if (config.getMember("hardlimit_active")) { - configLock.post();//unlock the config semaphore - return false; - } + IPC::ConfigWrapper confLock(15); + if (confLock){ + IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + //Abort if we loaded a config and there is a hardlimit active in it. + if (config && config.getMember("hardlimit_active")){return false;} } - //unlock the config semaphore - configLock.post(); } - /*LTS-END*/ + */ //Find stream base name std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); @@ -368,22 +360,17 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider) JSON::Value ret; //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - //Abort if no config available - if (!config){ - FAIL_MSG("Configuration not available, aborting! Is MistController running?"); - configLock.post();//unlock the config semaphore + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan inputs = rCapa.getMember("inputs"); + //Abort if not available + if (!inputs){ + FAIL_MSG("Capabilities not available, aborting! Is MistController running?"); return false; } - //check in curConf for capabilities-inputs--priority/source_match + //check in curConf for -priority/source_match bool selected = false; long long int curPrio = -1; - DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); DTSC::Scan input; unsigned int input_size = inputs.getSize(); bool noProviderNoPick = false; @@ -437,7 +424,6 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider) }else{ ret = input.asJSON(); } - configLock.post();//unlock the config semaphore return ret; } @@ -462,34 +448,34 @@ pid_t Util::startPush(const std::string & streamname, std::string & target) { streamVariables(target, streamname); //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading - configLock.wait(); - - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); std::string output_bin = ""; - std::string checkTarget = target.substr(0, target.rfind('?')); - unsigned int outputs_size = outputs.getSize(); - for (unsigned int i = 0; i // errno, ENOENT, EEXIST #include #include @@ -564,6 +565,13 @@ namespace Util{ r << std::endl; break; } + case RAX_DTSC:{ + char * ptr = getPointer(it->first, i); + size_t sz = getSize(it->first, i); + r << std::endl; + r << DTSC::Scan(ptr, sz).toPrettyString(indent+6) << std::endl; + break; + } default: r << "[UNIMPLEMENTED]" << std::endl; break; } } diff --git a/lib/util.h b/lib/util.h index a3b9835d..9a4f1854 100644 --- a/lib/util.h +++ b/lib/util.h @@ -74,6 +74,7 @@ namespace Util{ #define RAX_RAW 0x40 #define RAX_256RAW 0x44 #define RAX_512RAW 0x45 + #define RAX_DTSC 0x50 /// Reliable Access class. /// Provides reliable access to memory data structures, using dynamic static offsets and a status diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index dff64b46..3520592e 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -69,43 +69,28 @@ void createAccount(std::string account){ } /// Status monitoring thread. -/// Will check outputs, inputs and converters every five seconds +/// Will check outputs, inputs and converters every three seconds void statusMonitor(void *np){ - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); Controller::loadActiveConnectors(); while (Controller::conf.is_active){ // this scope prevents the configMutex from being locked constantly { tthread::lock_guard guard(Controller::configMutex); - bool changed = false; // checks online protocols, reports changes to status - changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"], - Controller::capabilities); + if (Controller::CheckProtocols(Controller::Storage["config"]["protocols"], + Controller::capabilities)){ + Controller::writeProtocols(); + } // checks stream statuses, reports changes to status - changed |= Controller::CheckAllStreams(Controller::Storage["streams"]); - - // check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second... - if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && - !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){ - // that failed. We now unlock it, no matter what - and print a warning that it was stuck. - WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); - changed = true; - } - configLock.unlink(); - configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (changed || Controller::configChanged){ - Controller::writeConfig(); - Controller::configChanged = false; - } + Controller::CheckAllStreams(Controller::Storage["streams"]); } - Util::sleep(5000); // wait at least 5 seconds + Util::sleep(3000); // wait at least 3 seconds } if (Controller::restarting){ Controller::prepareActiveConnectorsForReload(); }else{ Controller::prepareActiveConnectorsForShutdown(); } - configLock.unlink(); } static unsigned long mix(unsigned long a, unsigned long b, unsigned long c){ @@ -316,13 +301,10 @@ int main_loop(int argc, char **argv){ Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog"); Controller::prometheus = Controller::Storage["config"]["prometheus"].asStringRef(); Controller::accesslog = Controller::Storage["config"]["accesslog"].asStringRef(); - { - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.unlink(); - } Controller::writeConfig(); Controller::checkAvailProtocols(); Controller::checkAvailTriggers(); + Controller::writeCapabilities(); Controller::updateBandwidthConfig(); createAccount(Controller::conf.getString("account")); Controller::conf.activate(); // activate early, so threads aren't killed. diff --git a/src/controller/controller_capabilities.cpp b/src/controller/controller_capabilities.cpp index ba78df57..f35aeb8a 100644 --- a/src/controller/controller_capabilities.cpp +++ b/src/controller/controller_capabilities.cpp @@ -27,12 +27,6 @@ namespace Controller { trgs["SYSTEM_STOP"]["response"] = "always"; trgs["SYSTEM_STOP"]["response_action"] = "If false, aborts shutdown."; - trgs["SYSTEM_CONFIG"]["when"] = "Every time MistServer's global configuration changes"; - trgs["SYSTEM_CONFIG"]["stream_specific"] = false; - trgs["SYSTEM_CONFIG"]["payload"] = "newly active configuration (JSON)"; - trgs["SYSTEM_CONFIG"]["response"] = "ignored"; - trgs["SYSTEM_CONFIG"]["response_action"] = "None."; - trgs["OUTPUT_START"]["when"] = "Before a connector starts listening for connections"; trgs["OUTPUT_START"]["stream_specific"] = false; trgs["OUTPUT_START"]["payload"] = "connector configuration (JSON)"; diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 98fb86a1..796ab3ab 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -1580,9 +1580,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i #if !defined(__CYGWIN__) && !defined(_WIN32) { struct statvfs shmd; - IPC::sharedPage tmpConf(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); - if (tmpConf.mapped && tmpConf.handle){ - fstatvfs(tmpConf.handle, &shmd); + IPC::sharedPage tmpCapa(SHM_CAPA, DEFAULT_CONF_PAGE_SIZE, false, false); + if (tmpCapa.mapped && tmpCapa.handle){ + fstatvfs(tmpCapa.handle, &shmd); shm_free = (shmd.f_bfree*shmd.f_frsize)/1024; shm_total = (shmd.f_blocks*shmd.f_frsize)/1024; } diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 08f436c3..52a11aed 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -218,6 +218,106 @@ namespace Controller{ } } + + void writeCapabilities(){ + std::string temp = capabilities.toPacked(); + static IPC::sharedPage mistCapaOut(SHM_CAPA, temp.size()+100, true, false); + if (!mistCapaOut.mapped){ + FAIL_MSG("Could not open capabilities config for writing! Is shared memory enabled on your system?"); + return; + } + Util::RelAccX A(mistCapaOut.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + + void writeProtocols(){ + static std::string proxy_written; + if (proxy_written != Storage["config"]["trustedproxy"].asStringRef()){ + proxy_written = Storage["config"]["trustedproxy"].asStringRef(); + static IPC::sharedPage mistProxOut(SHM_PROXY, proxy_written.size()+100, true, false); + mistProxOut.close(); + mistProxOut.init(SHM_PROXY, proxy_written.size()+100, true, false); + if (!mistProxOut.mapped){ + FAIL_MSG("Could not open trusted proxy config for writing! Is shared memory enabled on your system?"); + return; + }else{ + Util::RelAccX A(mistProxOut.mapped, false); + A.addField("proxy_data", RAX_STRING, proxy_written.size()); + // write config + memcpy(A.getPointer("proxy_data"), proxy_written.data(), proxy_written.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + } + static JSON::Value proto_written; + std::set skip; + skip.insert("online"); + skip.insert("error"); + if (Storage["config"]["protocols"].compareExcept(proto_written, skip)){return;} + proto_written.assignFrom(Storage["config"]["protocols"], skip); + std::string temp = proto_written.toPacked(); + static IPC::sharedPage mistProtoOut(SHM_PROTO, temp.size()+100, true, false); + mistProtoOut.close(); + mistProtoOut.init(SHM_PROTO, temp.size()+100, true, false); + if (!mistProtoOut.mapped){ + FAIL_MSG("Could not open protocol config for writing! Is shared memory enabled on your system?"); + return; + } + // write config + { + Util::RelAccX A(mistProtoOut.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + } + + void writeStream(const std::string & sName, const JSON::Value & sConf){ + static std::map writtenStrms; + static std::map pages; + static std::set skip; + if (!skip.size()){ + skip.insert("online"); + skip.insert("error"); + skip.insert("name"); + } + if (sConf.isNull()){ + writtenStrms.erase(sName); + pages.erase(sName); + return; + } + if (!writtenStrms.count(sName) || !writtenStrms[sName].compareExcept(sConf, skip)){ + writtenStrms[sName].assignFrom(sConf, skip); + IPC::sharedPage & P = pages[sName]; + std::string temp = writtenStrms[sName].toPacked(); + P.close(); + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, sName.c_str()); + P.init(tmpBuf, temp.size()+100, true, false); + if (!P){ + writtenStrms.erase(sName); + pages.erase(sName); + return; + } + Util::RelAccX A(P.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + } + /// Writes the current config to shared memory to be used in other processes /// \triggers /// The `"SYSTEM_START"` trigger is global, and is ran as soon as the server configuration is first stable. It has no payload. If cancelled, @@ -226,41 +326,10 @@ namespace Controller{ /// The `"SYSTEM_CONFIG"` trigger is global, and is ran every time the server configuration is updated. Its payload is the new configuration in /// JSON format. This trigger cannot be cancelled. void writeConfig(){ - static JSON::Value writeConf; - bool changed = false; - std::set skip; - skip.insert("online"); - skip.insert("error"); - if (!writeConf["config"].compareExcept(Storage["config"], skip)){ - writeConf["config"].assignFrom(Storage["config"], skip); - VERYHIGH_MSG("Saving new config because of edit in server config structure"); - changed = true; + writeProtocols(); + jsonForEach(Storage["streams"], it){ + writeStream(it.key(), *it); } - if (!writeConf["streams"].compareExcept(Storage["streams"], skip)){ - writeConf["streams"].assignFrom(Storage["streams"], skip); - VERYHIGH_MSG("Saving new config because of edit in streams"); - changed = true; - } - if (writeConf["capabilities"] != capabilities){ - writeConf["capabilities"] = capabilities; - VERYHIGH_MSG("Saving new config because of edit in capabilities"); - changed = true; - } - if (!changed){return;}// cancel further processing if no changes - - static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true); - if (!mistConfOut.mapped){ - FAIL_MSG("Could not open config shared memory storage for writing! Is shared memory enabled on your system?"); - return; - } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - // lock semaphore - configLock.wait(); - // write config - std::string temp = writeConf.toPacked(); - memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len)); - // unlock semaphore - configLock.post(); /*LTS-START*/ static std::map pageForType; // should contain one page for every trigger type @@ -269,8 +338,8 @@ namespace Controller{ // for all shm pages that hold triggers pageForType.clear(); - if (writeConf["config"]["triggers"].size()){ - jsonForEach(writeConf["config"]["triggers"], it){ + if (Storage["config"]["triggers"].size()){ + jsonForEach(Storage["config"]["triggers"], it){ snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_TRIGGER, (it.key()).c_str()); pageForType[it.key()].init(tmpBuf, 32 * 1024, true, false); Util::RelAccX tPage(pageForType[it.key()].mapped, false); @@ -349,10 +418,6 @@ namespace Controller{ if (!Triggers::doTrigger("SYSTEM_START")){conf.is_active = false;} serverStartTriggered = true; } - if (Triggers::shouldTrigger("SYSTEM_CONFIG")){ - std::string payload = writeConf.toString(); - Triggers::doTrigger("SYSTEM_CONFIG", payload); - } /*LTS-END*/ } } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 1884ff83..07a6ce2f 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -34,5 +34,8 @@ namespace Controller { void initState(); void deinitState(bool leaveBehind); void writeConfig(); + void writeStream(const std::string & sName, const JSON::Value & sConf); + void writeCapabilities(); + void writeProtocols(); } diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 54860ca1..0774d92b 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -208,6 +208,7 @@ namespace Controller { out[jit.key()]["name"] = jit.key(); Log("STRM", std::string("New stream ") + jit.key()); } + Controller::writeStream(jit.key(), out[jit.key()]); } } @@ -360,6 +361,7 @@ namespace Controller { /*LTS-END*/ Log("STRM", "Deleted stream " + name); out.removeMember(name); + Controller::writeStream(name, JSON::Value());//Null JSON value = delete ++ret; ret *= -1; if (inputProcesses.count(name)){ diff --git a/src/input/input.cpp b/src/input/input.cpp index 1eb70743..26e6931c 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -420,10 +420,11 @@ namespace Mist { bool Input::isAlwaysOn(){ bool ret = true; std::string strName = streamName.substr(0, (streamName.find_first_of("+ "))); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan streamCfg = rStrmConf.getScan(); if (streamCfg){ if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){ ret = false; @@ -433,7 +434,6 @@ namespace Mist { ret = false; #endif } - configLock.post(); return ret; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 17d0b4d9..0466e838 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -946,13 +946,11 @@ namespace Mist { std::string strName = config->getString("streamname"); Util::sanitizeName(strName); strName = strName.substr(0, (strName.find_first_of("+ "))); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!configLock.tryWaitOneSecond()){ - INFO_MSG("Aborting stream config refresh: locking took longer than expected"); - return false; - } - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan streamCfg = rStrmConf.getScan(); long long tmpNum; //if stream is configured and setting is present, use it, always diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 9ea8e0e1..60962e55 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -125,10 +125,8 @@ namespace Mist { } //loop over the connectors - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan capa = rCapa.getMember("connectors"); unsigned int capa_ctr = capa.getSize(); for (unsigned int i = 0; i < capa_ctr; ++i){ DTSC::Scan c = capa.getIndice(i); @@ -161,14 +159,10 @@ namespace Mist { Util::sanitizeName(streamname); H.SetVar("stream", streamname); } - configLock.post(); - configLock.close(); return capa.getIndiceName(i); } } } - configLock.post(); - configLock.close(); return ""; } @@ -353,32 +347,25 @@ namespace Mist { char * argarr[20]; for (int i=0; i<20; i++){argarr[i] = 0;} int id = -1; - - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - unsigned int prots_ctr = prots.getSize(); - + JSON::Value pipedCapa; JSON::Value p;//properties of protocol - if (connector == "HTTP" || connector == "HTTP.exe"){ - //restore from values in the environment, regardless of configged settings - if (getenv("MIST_HTTP_nostreamtext")){ - p["nostreamtext"] = getenv("MIST_HTTP_nostreamtext"); - } - if (getenv("MIST_HTTP_pubaddr")){ - p["pubaddr"] = getenv("MIST_HTTP_pubaddr"); - } - }else{ - //find connector in config - for (unsigned int i=0; i < prots_ctr; ++i){ - if (prots.getIndice(i).getMember("connector").asString() == connector) { - id = i; - break; //pick the first protocol in the list that matches the connector + + + { + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prots = rProto.getScan(); + unsigned int prots_ctr = prots.getSize(); + + if (connector == "HTTP" || connector == "HTTP.exe"){ + //restore from values in the environment, regardless of configged settings + if (getenv("MIST_HTTP_nostreamtext")){ + p["nostreamtext"] = getenv("MIST_HTTP_nostreamtext"); } - } - if (id == -1) { - connector = connector + ".exe"; + if (getenv("MIST_HTTP_pubaddr")){ + p["pubaddr"] = getenv("MIST_HTTP_pubaddr"); + } + }else{ + //find connector in config for (unsigned int i=0; i < prots_ctr; ++i){ if (prots.getIndice(i).getMember("connector").asString() == connector) { id = i; @@ -386,27 +373,33 @@ namespace Mist { } } if (id == -1) { - connector = connector.substr(0, connector.size() - 4); - DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str()); - configLock.post(); - configLock.close(); - return; + connector = connector + ".exe"; + for (unsigned int i=0; i < prots_ctr; ++i){ + if (prots.getIndice(i).getMember("connector").asString() == connector) { + id = i; + break; //pick the first protocol in the list that matches the connector + } + } + if (id == -1) { + connector = connector.substr(0, connector.size() - 4); + ERROR_MSG("No connector found for: %s", connector.c_str()); + return; + } } + //read options from found connector + p = prots.getIndice(id).asJSON(); } - //read options from found connector - p = prots.getIndice(id).asJSON(); + + HIGH_MSG("Connector found: %s", connector.c_str()); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan capa = rCapa.getMember("connectors"); + pipedCapa = capa.getMember(connector).asJSON(); } - - DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str()); + //build arguments for starting output process - std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector; - int argnum = 0; argarr[argnum++] = (char*)tmparg.c_str(); - JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON(); - configLock.post(); - configLock.close(); std::string temphost=getConnectedHost(); std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString(); argarr[argnum++] = (char*)"--ip"; @@ -465,20 +458,19 @@ namespace Mist { trustedProxies.insert("::1"); trustedProxies.insert("127.0.0.1"); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Open server config - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - std::string trustedList = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("trustedproxy").asString(); - configLock.post(); - configLock.close(); - size_t pos = 0; - size_t endPos; - while (pos != std::string::npos){ - endPos = trustedList.find(" ", pos); - trustedProxies.insert(trustedList.substr(pos, endPos - pos)); - pos = endPos; - if (pos != std::string::npos){ - pos++; + IPC::sharedPage rPage(SHM_PROXY, 0, false, false); + if (rPage){ + Util::RelAccX rAcc(rPage.mapped); + std::string trustedList(rAcc.getPointer("proxy_data"), rAcc.getSize("proxy_data")); + size_t pos = 0; + size_t endPos; + while (pos != std::string::npos){ + endPos = trustedList.find(" ", pos); + trustedProxies.insert(trustedList.substr(pos, endPos - pos)); + pos = endPos; + if (pos != std::string::npos){ + pos++; + } } } } diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index b42dabdd..483aa686 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -11,13 +11,13 @@ namespace Mist { /// Helper function to find the protocol entry for a given port number std::string getProtocolForPort(uint16_t portNo){ std::string ret; - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan conns = rCapa.getMember("connectors"); + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prtcls = rProto.getScan(); unsigned int pro_cnt = prtcls.getSize(); for (unsigned int i = 0; i < pro_cnt; ++i){ - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(prtcls.getIndice(i).getMember("connector").asString()); + DTSC::Scan capa = conns.getMember(prtcls.getIndice(i).getMember("connector").asString()); uint16_t port = prtcls.getIndice(i).getMember("port").asInt(); //get the default port if none is set if (!port){ @@ -28,7 +28,6 @@ namespace Mist { break; } } - configLock.post(); if (ret.find(':') != std::string::npos){ ret.erase(ret.find(':')); } @@ -383,16 +382,6 @@ namespace Mist { if (!myConn){ return json_resp; } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - if (!prots){ - json_resp["error"] = "The specified stream is not available on this server."; - configLock.post(); - configLock.close(); - return json_resp; - } bool hasVideo = false; for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ @@ -431,7 +420,17 @@ namespace Mist { it->removeMember("ivecs");/*LTS*/ } json_resp["meta"].removeMember("source"); - + + //Get sources/protocols information + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan connectors = rCapa.getMember("connectors"); + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prots = rProto.getScan(); + if (!prots || !connectors){ + json_resp["error"] = "Server configuration unavailable at this time."; + return json_resp; + } + //create a set for storing source information std::set sources; @@ -444,7 +443,7 @@ namespace Mist { //loop over the connectors. for (unsigned int i = 0; i < prots_ctr; ++i){ std::string cName = prots.getIndice(i).getMember("connector").asString(); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); + DTSC::Scan capa = connectors.getMember(cName); //if the connector has a port, if (capa.getMember("optional").getMember("port")){ HTTP::URL outURL(reqHost); @@ -476,12 +475,11 @@ namespace Mist { std::string cProv = capa.getMember("provides").asString(); //if this connector can be depended upon by other connectors, loop over the rest //check each enabled protocol separately to see if it depends on this connector - DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); - unsigned int capa_lst_ctr = capa_lst.getSize(); + unsigned int capa_lst_ctr = connectors.getSize(); for (unsigned int j = 0; j < capa_lst_ctr; ++j){ //if it depends on this connector and has a URL, list it - if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ - JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); + if (conns.count(connectors.getIndiceName(j)) && connectors.getIndice(j).getMember("deps").asString() == cProv && connectors.getIndice(j).getMember("methods")){ + JSON::Value subcapa_json = connectors.getIndice(j).asJSON(); addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"], useragent); } } @@ -495,8 +493,6 @@ namespace Mist { json_resp["source"].append(*it); } } - configLock.post(); - configLock.close(); return json_resp; } @@ -567,41 +563,38 @@ namespace Mist { // send smil MBR index if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){ std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; - std::string port, url_rel; - - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP"); - unsigned int pro_cnt = prtcls.getSize(); - for (unsigned int i = 0; i < pro_cnt; ++i){ - if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){ - continue; - } - port = prtcls.getIndice(i).getMember("port").asString(); - //get the default port if none is set - if (!port.size()){ - port = capa.getMember("optional").getMember("port").getMember("default").asString(); - } - //extract url - url_rel = capa.getMember("url_rel").asString(); - if (url_rel.find('$')){ - url_rel.resize(url_rel.find('$')); - } - } - std::string trackSources;//this string contains all track sources for MBR smil - initialize(); - if (!myConn){return;} - for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.type == "video"){ - trackSources += "