diff --git a/lib/defines.h b/lib/defines.h index c15c6f2a..201faf87 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -126,6 +126,7 @@ static inline void show_stackframe(){} #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_STREAM_STATE "MstSTATE%s" //%s stream name +#define SHM_STREAM_CONF "MstSCnf%s" //%s stream name #define STRMSTAT_OFF 0 #define STRMSTAT_INIT 1 #define STRMSTAT_BOOT 2 @@ -139,11 +140,10 @@ static inline void show_stackframe(){} #define SHM_TRACK_DATA "MstDATA%s@%lu_%lu" //%s stream name, %lu track ID, %lu page # #define SHM_STATISTICS "MstSTAT" #define SHM_USERS "MstUSER%s" //%s stream name -#define SHM_TRIGGER "MstTRIG%s" //%s trigger name #define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name -#define SEM_CONF "/MstConfLock" -#define SHM_CONF "MstConf" +#define SHM_CAPA "MstCapa" +#define SHM_PROTO "MstProt" #define SHM_STATE_LOGS "MstStateLogs" #define SHM_STATE_ACCS "MstStateAccs" #define SHM_STATE_STREAMS "MstStateStreams" diff --git a/lib/stream.cpp b/lib/stream.cpp index faeeb5f6..da09f5d5 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -79,20 +79,17 @@ JSON::Value Util::getStreamConfig(const std::string & streamname){ FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); return result; } - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - //check if smp (everything before + or space) exists - DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, smp.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan stream_cfg = rStrmConf.getScan(); if (!stream_cfg){ - DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); - }else{ - result = stream_cfg.asJSON(); + WARN_MSG("Could not get stream '%s' config!", smp.c_str()); + return result; } - configLock.post();//unlock the config semaphore - return result; + return stream_cfg.asJSON(); } DTSC::Meta Util::getStreamMeta(const std::string & streamname){ @@ -281,22 +278,17 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider) JSON::Value ret; //Attempt to load up configuration and find this stream - IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading - configLock.wait(); - DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - //Abort if no config available - if (!config){ - FAIL_MSG("Configuration not available, aborting! Is MistController running?"); - configLock.post();//unlock the config semaphore + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan inputs = rCapa.getMember("inputs"); + //Abort if not available + if (!inputs){ + FAIL_MSG("Capabilities not available, aborting! Is MistController running?"); return false; } - //check in curConf for capabilities-inputs--priority/source_match + //check in curConf for -priority/source_match bool selected = false; long long int curPrio = -1; - DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs"); DTSC::Scan input; unsigned int input_size = inputs.getSize(); bool noProviderNoPick = false; @@ -350,7 +342,6 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider) }else{ ret = input.asJSON(); } - configLock.post();//unlock the config semaphore return ret; } @@ -362,3 +353,18 @@ uint8_t Util::getStreamStatus(const std::string & streamname){ return streamStatus.mapped[0]; } +Util::DTSCShmReader::DTSCShmReader(const std::string &pageName){ + rPage.init(pageName, 0); + if (rPage){ + rAcc = Util::RelAccX(rPage.mapped); + } +} + +DTSC::Scan Util::DTSCShmReader::getMember(const std::string &indice){ + return DTSC::Scan(rAcc.getPointer("dtsc_data"), rAcc.getSize("dtsc_data")).getMember(indice.c_str()); +} + +DTSC::Scan Util::DTSCShmReader::getScan(){ + return DTSC::Scan(rAcc.getPointer("dtsc_data"), rAcc.getSize("dtsc_data")); +} + diff --git a/lib/stream.h b/lib/stream.h index 1d89c892..81c8bd5e 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -6,6 +6,8 @@ #include "socket.h" #include "json.h" #include "dtsc.h" +#include "shared_memory.h" +#include "util.h" namespace Util { std::string getTmpFolder(); @@ -16,5 +18,16 @@ namespace Util { JSON::Value getInputBySource(const std::string & filename, bool isProvider = false); DTSC::Meta getStreamMeta(const std::string & streamname); uint8_t getStreamStatus(const std::string & streamname); + + class DTSCShmReader{ + public: + DTSCShmReader(const std::string &pageName); + DTSC::Scan getMember(const std::string &indice); + DTSC::Scan getScan(); + private: + IPC::sharedPage rPage; + Util::RelAccX rAcc; + }; + } diff --git a/lib/util.cpp b/lib/util.cpp index 728bcfbb..7cf4d868 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -6,6 +6,7 @@ #include "defines.h" #include "timing.h" #include "procs.h" +#include "dtsc.h" #include // errno, ENOENT, EEXIST #include #include @@ -564,6 +565,13 @@ namespace Util{ r << std::endl; break; } + case RAX_DTSC:{ + char * ptr = getPointer(it->first, i); + size_t sz = getSize(it->first, i); + r << std::endl; + r << DTSC::Scan(ptr, sz).toPrettyString(indent+6) << std::endl; + break; + } default: r << "[UNIMPLEMENTED]" << std::endl; break; } } diff --git a/lib/util.h b/lib/util.h index a3b9835d..9a4f1854 100644 --- a/lib/util.h +++ b/lib/util.h @@ -74,6 +74,7 @@ namespace Util{ #define RAX_RAW 0x40 #define RAX_256RAW 0x44 #define RAX_512RAW 0x45 + #define RAX_DTSC 0x50 /// Reliable Access class. /// Provides reliable access to memory data structures, using dynamic static offsets and a status diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 7ce1e016..aa289415 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -61,43 +61,28 @@ void createAccount(std::string account){ } /// Status monitoring thread. -/// Will check outputs, inputs and converters every five seconds +/// Will check outputs, inputs and converters every three seconds void statusMonitor(void *np){ - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); Controller::loadActiveConnectors(); while (Controller::conf.is_active){ // this scope prevents the configMutex from being locked constantly { tthread::lock_guard guard(Controller::configMutex); - bool changed = false; // checks online protocols, reports changes to status - changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"], - Controller::capabilities); + if (Controller::CheckProtocols(Controller::Storage["config"]["protocols"], + Controller::capabilities)){ + Controller::writeProtocols(); + } // checks stream statuses, reports changes to status - changed |= Controller::CheckAllStreams(Controller::Storage["streams"]); - - // check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second... - if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() && - !configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){ - // that failed. We now unlock it, no matter what - and print a warning that it was stuck. - WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config."); - changed = true; - } - configLock.unlink(); - configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (changed || Controller::configChanged){ - Controller::writeConfig(); - Controller::configChanged = false; - } + Controller::CheckAllStreams(Controller::Storage["streams"]); } - Util::sleep(5000); // wait at least 5 seconds + Util::sleep(3000); // wait at least 3 seconds } if (Controller::restarting){ Controller::prepareActiveConnectorsForReload(); }else{ Controller::prepareActiveConnectorsForShutdown(); } - configLock.unlink(); } static unsigned long mix(unsigned long a, unsigned long b, unsigned long c){ @@ -254,12 +239,9 @@ int main_loop(int argc, char **argv){ Controller::conf.getOption("username", true)[0u] = Controller::Storage["config"]["controller"]["username"]; } - { - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.unlink(); - } Controller::writeConfig(); Controller::checkAvailProtocols(); + Controller::writeCapabilities(); createAccount(Controller::conf.getString("account")); Controller::conf.activate(); // activate early, so threads aren't killed. diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 5e4ba7be..f79ff417 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -122,10 +122,6 @@ void Controller::SharedMemStats(void * config){ statServer.parseEach(parseStatistics); if (firstRun){ firstRun = false; - servUpOtherBytes = 0; - servDownOtherBytes = 0; - servUpBytes = 0; - servDownBytes = 0; for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ it->second.upBytes = 0; it->second.downBytes = 0; diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 1a6b0901..10f61d79 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -211,43 +211,93 @@ namespace Controller { } } - /// Writes the current config to shared memory to be used in other processes - void writeConfig(){ - static JSON::Value writeConf; - bool changed = false; + + void writeCapabilities(){ + std::string temp = capabilities.toPacked(); + static IPC::sharedPage mistCapaOut(SHM_CAPA, temp.size()+100, true, false); + if (!mistCapaOut.mapped){ + FAIL_MSG("Could not open capabilities config for writing! Is shared memory enabled on your system?"); + return; + } + Util::RelAccX A(mistCapaOut.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + + void writeProtocols(){ + static JSON::Value proto_written; std::set skip; skip.insert("online"); skip.insert("error"); - if (!writeConf["config"].compareExcept(Storage["config"], skip)){ - writeConf["config"].assignFrom(Storage["config"], skip); - VERYHIGH_MSG("Saving new config because of edit in server config structure"); - changed = true; - } - if (!writeConf["streams"].compareExcept(Storage["streams"], skip)){ - writeConf["streams"].assignFrom(Storage["streams"], skip); - VERYHIGH_MSG("Saving new config because of edit in streams"); - changed = true; - } - if (writeConf["capabilities"] != capabilities){ - writeConf["capabilities"] = capabilities; - VERYHIGH_MSG("Saving new config because of edit in capabilities"); - changed = true; - } - if (!changed){return;}//cancel further processing if no changes - - static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true); - if (!mistConfOut.mapped){ - FAIL_MSG("Could not open config shared memory storage for writing! Is shared memory enabled on your system?"); + if (Storage["config"]["protocols"].compareExcept(proto_written, skip)){return;} + proto_written.assignFrom(Storage["config"]["protocols"], skip); + std::string temp = proto_written.toPacked(); + static IPC::sharedPage mistProtoOut(SHM_PROTO, temp.size()+100, true, false); + mistProtoOut.close(); + mistProtoOut.init(SHM_PROTO, temp.size()+100, true, false); + if (!mistProtoOut.mapped){ + FAIL_MSG("Could not open protocol config for writing! Is shared memory enabled on your system?"); return; } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //lock semaphore - configLock.wait(); - //write config - std::string temp = writeConf.toPacked(); - memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len)); - //unlock semaphore - configLock.post(); + // write config + { + Util::RelAccX A(mistProtoOut.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + } + + void writeStream(const std::string & sName, const JSON::Value & sConf){ + static std::map writtenStrms; + static std::map pages; + static std::set skip; + if (!skip.size()){ + skip.insert("online"); + skip.insert("error"); + skip.insert("name"); + } + if (sConf.isNull()){ + writtenStrms.erase(sName); + pages.erase(sName); + return; + } + if (!writtenStrms.count(sName) || !writtenStrms[sName].compareExcept(sConf, skip)){ + writtenStrms[sName].assignFrom(sConf, skip); + IPC::sharedPage & P = pages[sName]; + std::string temp = writtenStrms[sName].toPacked(); + P.close(); + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, sName.c_str()); + P.init(tmpBuf, temp.size()+100, true, false); + if (!P){ + writtenStrms.erase(sName); + pages.erase(sName); + return; + } + Util::RelAccX A(P.mapped, false); + A.addField("dtsc_data", RAX_DTSC, temp.size()); + // write config + memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size()); + A.setRCount(1); + A.setEndPos(1); + A.setReady(); + } + } + + /// Writes the current config to shared memory to be used in other processes + void writeConfig(){ + writeProtocols(); + jsonForEach(Storage["streams"], it){ + writeStream(it.key(), *it); + } } } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 525715ea..6ef2a150 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -32,5 +32,8 @@ namespace Controller { void initState(); void deinitState(bool leaveBehind); void writeConfig(); + void writeStream(const std::string & sName, const JSON::Value & sConf); + void writeCapabilities(); + void writeProtocols(); } diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index a81394a9..ecc93d0c 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -148,6 +148,7 @@ namespace Controller { out[jit.key()]["name"] = jit.key(); Log("STRM", std::string("New stream ") + jit.key()); } + Controller::writeStream(jit.key(), out[jit.key()]); } } @@ -240,6 +241,7 @@ namespace Controller { } Log("STRM", std::string("Deleted stream ") + name); out.removeMember(name); + Controller::writeStream(name, JSON::Value());//Null JSON value = delete } } //Controller namespace diff --git a/src/input/input.cpp b/src/input/input.cpp index 3b4f48f6..52e17f6e 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -298,10 +298,11 @@ namespace Mist { bool Input::isAlwaysOn(){ bool ret = true; std::string strName = streamName.substr(0, (streamName.find_first_of("+ "))); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan streamCfg = rStrmConf.getScan(); if (streamCfg){ if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){ ret = false; @@ -311,7 +312,6 @@ namespace Mist { ret = false; #endif } - configLock.post(); return ret; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index e1c6147e..721e9fa8 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -739,13 +739,11 @@ namespace Mist { std::string strName = config->getString("streamname"); Util::sanitizeName(strName); strName = strName.substr(0, (strName.find_first_of("+ "))); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!configLock.tryWaitOneSecond()){ - INFO_MSG("Aborting stream config refresh: locking took longer than expected"); - return false; - } - DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); + + char tmpBuf[NAME_BUFFER_SIZE]; + snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str()); + Util::DTSCShmReader rStrmConf(tmpBuf); + DTSC::Scan streamCfg = rStrmConf.getScan(); long long tmpNum; //if stream is configured and setting is present, use it, always diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 07d92779..e52322f6 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -123,10 +123,8 @@ namespace Mist { } //loop over the connectors - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan capa = rCapa.getMember("connectors"); unsigned int capa_ctr = capa.getSize(); for (unsigned int i = 0; i < capa_ctr; ++i){ DTSC::Scan c = capa.getIndice(i); @@ -159,14 +157,10 @@ namespace Mist { Util::sanitizeName(streamname); H.SetVar("stream", streamname); } - configLock.post(); - configLock.close(); return capa.getIndiceName(i); } } } - configLock.post(); - configLock.close(); return ""; } @@ -323,22 +317,16 @@ namespace Mist { char * argarr[20]; for (int i=0; i<20; i++){argarr[i] = 0;} int id = -1; - - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - unsigned int prots_ctr = prots.getSize(); - + JSON::Value pipedCapa; JSON::Value p;//properties of protocol - for (unsigned int i=0; i < prots_ctr; ++i){ - if (prots.getIndice(i).getMember("connector").asString() == connector) { - id = i; - break; //pick the first protocol in the list that matches the connector - } - } - if (id == -1) { - connector = connector + ".exe"; + + + { + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prots = rProto.getScan(); + unsigned int prots_ctr = prots.getSize(); + + //find connector in config for (unsigned int i=0; i < prots_ctr; ++i){ if (prots.getIndice(i).getMember("connector").asString() == connector) { id = i; @@ -346,26 +334,32 @@ namespace Mist { } } if (id == -1) { - connector = connector.substr(0, connector.size() - 4); - DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str()); - configLock.post(); - configLock.close(); - return; + connector = connector + ".exe"; + for (unsigned int i=0; i < prots_ctr; ++i){ + if (prots.getIndice(i).getMember("connector").asString() == connector) { + id = i; + break; //pick the first protocol in the list that matches the connector + } + } + if (id == -1) { + connector = connector.substr(0, connector.size() - 4); + ERROR_MSG("No connector found for: %s", connector.c_str()); + return; + } } + //read options from found connector + p = prots.getIndice(id).asJSON(); + + HIGH_MSG("Connector found: %s", connector.c_str()); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan capa = rCapa.getMember("connectors"); + pipedCapa = capa.getMember(connector).asJSON(); } - //read options from found connector - p = prots.getIndice(id).asJSON(); - - DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str()); + //build arguments for starting output process - std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector; - int argnum = 0; argarr[argnum++] = (char*)tmparg.c_str(); - JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON(); - configLock.post(); - configLock.close(); std::string temphost=getConnectedHost(); std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString(); argarr[argnum++] = (char*)"--ip"; diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 54cdb0ab..b6dbcce3 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -11,13 +11,13 @@ namespace Mist { /// Helper function to find the protocol entry for a given port number std::string getProtocolForPort(uint16_t portNo){ std::string ret; - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan conns = rCapa.getMember("connectors"); + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prtcls = rProto.getScan(); unsigned int pro_cnt = prtcls.getSize(); for (unsigned int i = 0; i < pro_cnt; ++i){ - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(prtcls.getIndice(i).getMember("connector").asString()); + DTSC::Scan capa = conns.getMember(prtcls.getIndice(i).getMember("connector").asString()); uint16_t port = prtcls.getIndice(i).getMember("port").asInt(); //get the default port if none is set if (!port){ @@ -28,7 +28,6 @@ namespace Mist { break; } } - configLock.post(); if (ret.find(':') != std::string::npos){ ret.erase(ret.find(':')); } @@ -348,16 +347,6 @@ namespace Mist { if (!myConn){ return json_resp; } - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - if (!prots){ - json_resp["error"] = "The specified stream is not available on this server."; - configLock.post(); - configLock.close(); - return json_resp; - } bool hasVideo = false; for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ @@ -395,7 +384,17 @@ namespace Mist { it->removeMember("parts"); } json_resp["meta"].removeMember("source"); - + + //Get sources/protocols information + Util::DTSCShmReader rCapa(SHM_CAPA); + DTSC::Scan connectors = rCapa.getMember("connectors"); + Util::DTSCShmReader rProto(SHM_PROTO); + DTSC::Scan prots = rProto.getScan(); + if (!prots || !connectors){ + json_resp["error"] = "Server configuration unavailable at this time."; + return json_resp; + } + //create a set for storing source information std::set sources; @@ -408,7 +407,7 @@ namespace Mist { //loop over the connectors. for (unsigned int i = 0; i < prots_ctr; ++i){ std::string cName = prots.getIndice(i).getMember("connector").asString(); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); + DTSC::Scan capa = connectors.getMember(cName); //if the connector has a port, if (capa.getMember("optional").getMember("port")){ HTTP::URL outURL(reqHost); @@ -431,12 +430,11 @@ namespace Mist { std::string cProv = capa.getMember("provides").asString(); //if this connector can be depended upon by other connectors, loop over the rest //check each enabled protocol separately to see if it depends on this connector - DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); - unsigned int capa_lst_ctr = capa_lst.getSize(); + unsigned int capa_lst_ctr = connectors.getSize(); for (unsigned int j = 0; j < capa_lst_ctr; ++j){ //if it depends on this connector and has a URL, list it - if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ - JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); + if (conns.count(connectors.getIndiceName(j)) && connectors.getIndice(j).getMember("deps").asString() == cProv && connectors.getIndice(j).getMember("methods")){ + JSON::Value subcapa_json = connectors.getIndice(j).asJSON(); addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"], useragent); } } @@ -450,8 +448,6 @@ namespace Mist { json_resp["source"].append(*it); } } - configLock.post(); - configLock.close(); return json_resp; } @@ -522,41 +518,38 @@ namespace Mist { // send smil MBR index if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){ std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; - std::string port, url_rel; - - IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - configLock.wait(); - IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); - DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); - DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP"); - unsigned int pro_cnt = prtcls.getSize(); - for (unsigned int i = 0; i < pro_cnt; ++i){ - if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){ - continue; - } - port = prtcls.getIndice(i).getMember("port").asString(); - //get the default port if none is set - if (!port.size()){ - port = capa.getMember("optional").getMember("port").getMember("default").asString(); - } - //extract url - url_rel = capa.getMember("url_rel").asString(); - if (url_rel.find('$')){ - url_rel.resize(url_rel.find('$')); - } - } - std::string trackSources;//this string contains all track sources for MBR smil - initialize(); - if (!myConn){return;} - for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ - if (trit->second.type == "video"){ - trackSources += "