Robustify accesses to server config

This commit is contained in:
Thulinma 2018-11-28 11:44:07 +01:00
parent d36faa340a
commit ac92e09262
14 changed files with 238 additions and 192 deletions

View file

@ -126,6 +126,7 @@ static inline void show_stackframe(){}
#define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name
#define SHM_STREAM_STATE "MstSTATE%s" //%s stream name #define SHM_STREAM_STATE "MstSTATE%s" //%s stream name
#define SHM_STREAM_CONF "MstSCnf%s" //%s stream name
#define STRMSTAT_OFF 0 #define STRMSTAT_OFF 0
#define STRMSTAT_INIT 1 #define STRMSTAT_INIT 1
#define STRMSTAT_BOOT 2 #define STRMSTAT_BOOT 2
@ -139,11 +140,10 @@ static inline void show_stackframe(){}
#define SHM_TRACK_DATA "MstDATA%s@%lu_%lu" //%s stream name, %lu track ID, %lu page # #define SHM_TRACK_DATA "MstDATA%s@%lu_%lu" //%s stream name, %lu track ID, %lu page #
#define SHM_STATISTICS "MstSTAT" #define SHM_STATISTICS "MstSTAT"
#define SHM_USERS "MstUSER%s" //%s stream name #define SHM_USERS "MstUSER%s" //%s stream name
#define SHM_TRIGGER "MstTRIG%s" //%s trigger name
#define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_LIVE "/MstLIVE%s" //%s stream name
#define SEM_INPUT "/MstInpt%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name
#define SEM_CONF "/MstConfLock" #define SHM_CAPA "MstCapa"
#define SHM_CONF "MstConf" #define SHM_PROTO "MstProt"
#define SHM_STATE_LOGS "MstStateLogs" #define SHM_STATE_LOGS "MstStateLogs"
#define SHM_STATE_ACCS "MstStateAccs" #define SHM_STATE_ACCS "MstStateAccs"
#define SHM_STATE_STREAMS "MstStateStreams" #define SHM_STATE_STREAMS "MstStateStreams"

View file

@ -79,20 +79,17 @@ JSON::Value Util::getStreamConfig(const std::string & streamname){
FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size());
return result; return result;
} }
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false);
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); std::string smp = streamname.substr(0, streamname.find_first_of("+ "));
//check if smp (everything before + or space) exists
DTSC::Scan stream_cfg = config.getMember("streams").getMember(smp); char tmpBuf[NAME_BUFFER_SIZE];
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, smp.c_str());
Util::DTSCShmReader rStrmConf(tmpBuf);
DTSC::Scan stream_cfg = rStrmConf.getScan();
if (!stream_cfg){ if (!stream_cfg){
DEBUG_MSG(DLVL_MEDIUM, "Stream %s not configured", streamname.c_str()); WARN_MSG("Could not get stream '%s' config!", smp.c_str());
}else{
result = stream_cfg.asJSON();
}
configLock.post();//unlock the config semaphore
return result; return result;
}
return stream_cfg.asJSON();
} }
DTSC::Meta Util::getStreamMeta(const std::string & streamname){ DTSC::Meta Util::getStreamMeta(const std::string & streamname){
@ -281,22 +278,17 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider)
JSON::Value ret; JSON::Value ret;
//Attempt to load up configuration and find this stream //Attempt to load up configuration and find this stream
IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); Util::DTSCShmReader rCapa(SHM_CAPA);
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); DTSC::Scan inputs = rCapa.getMember("inputs");
//Lock the config to prevent race conditions and corruption issues while reading //Abort if not available
configLock.wait(); if (!inputs){
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); FAIL_MSG("Capabilities not available, aborting! Is MistController running?");
//Abort if no config available
if (!config){
FAIL_MSG("Configuration not available, aborting! Is MistController running?");
configLock.post();//unlock the config semaphore
return false; return false;
} }
//check in curConf for capabilities-inputs-<naam>-priority/source_match //check in curConf for <naam>-priority/source_match
bool selected = false; bool selected = false;
long long int curPrio = -1; long long int curPrio = -1;
DTSC::Scan inputs = config.getMember("capabilities").getMember("inputs");
DTSC::Scan input; DTSC::Scan input;
unsigned int input_size = inputs.getSize(); unsigned int input_size = inputs.getSize();
bool noProviderNoPick = false; bool noProviderNoPick = false;
@ -350,7 +342,6 @@ JSON::Value Util::getInputBySource(const std::string &filename, bool isProvider)
}else{ }else{
ret = input.asJSON(); ret = input.asJSON();
} }
configLock.post();//unlock the config semaphore
return ret; return ret;
} }
@ -362,3 +353,18 @@ uint8_t Util::getStreamStatus(const std::string & streamname){
return streamStatus.mapped[0]; return streamStatus.mapped[0];
} }
Util::DTSCShmReader::DTSCShmReader(const std::string &pageName){
rPage.init(pageName, 0);
if (rPage){
rAcc = Util::RelAccX(rPage.mapped);
}
}
DTSC::Scan Util::DTSCShmReader::getMember(const std::string &indice){
return DTSC::Scan(rAcc.getPointer("dtsc_data"), rAcc.getSize("dtsc_data")).getMember(indice.c_str());
}
DTSC::Scan Util::DTSCShmReader::getScan(){
return DTSC::Scan(rAcc.getPointer("dtsc_data"), rAcc.getSize("dtsc_data"));
}

View file

@ -6,6 +6,8 @@
#include "socket.h" #include "socket.h"
#include "json.h" #include "json.h"
#include "dtsc.h" #include "dtsc.h"
#include "shared_memory.h"
#include "util.h"
namespace Util { namespace Util {
std::string getTmpFolder(); std::string getTmpFolder();
@ -16,5 +18,16 @@ namespace Util {
JSON::Value getInputBySource(const std::string & filename, bool isProvider = false); JSON::Value getInputBySource(const std::string & filename, bool isProvider = false);
DTSC::Meta getStreamMeta(const std::string & streamname); DTSC::Meta getStreamMeta(const std::string & streamname);
uint8_t getStreamStatus(const std::string & streamname); uint8_t getStreamStatus(const std::string & streamname);
class DTSCShmReader{
public:
DTSCShmReader(const std::string &pageName);
DTSC::Scan getMember(const std::string &indice);
DTSC::Scan getScan();
private:
IPC::sharedPage rPage;
Util::RelAccX rAcc;
};
} }

View file

@ -6,6 +6,7 @@
#include "defines.h" #include "defines.h"
#include "timing.h" #include "timing.h"
#include "procs.h" #include "procs.h"
#include "dtsc.h"
#include <errno.h> // errno, ENOENT, EEXIST #include <errno.h> // errno, ENOENT, EEXIST
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
@ -564,6 +565,13 @@ namespace Util{
r << std::endl; r << std::endl;
break; break;
} }
case RAX_DTSC:{
char * ptr = getPointer(it->first, i);
size_t sz = getSize(it->first, i);
r << std::endl;
r << DTSC::Scan(ptr, sz).toPrettyString(indent+6) << std::endl;
break;
}
default: r << "[UNIMPLEMENTED]" << std::endl; break; default: r << "[UNIMPLEMENTED]" << std::endl; break;
} }
} }

View file

@ -74,6 +74,7 @@ namespace Util{
#define RAX_RAW 0x40 #define RAX_RAW 0x40
#define RAX_256RAW 0x44 #define RAX_256RAW 0x44
#define RAX_512RAW 0x45 #define RAX_512RAW 0x45
#define RAX_DTSC 0x50
/// Reliable Access class. /// Reliable Access class.
/// Provides reliable access to memory data structures, using dynamic static offsets and a status /// Provides reliable access to memory data structures, using dynamic static offsets and a status

View file

@ -61,43 +61,28 @@ void createAccount(std::string account){
} }
/// Status monitoring thread. /// Status monitoring thread.
/// Will check outputs, inputs and converters every five seconds /// Will check outputs, inputs and converters every three seconds
void statusMonitor(void *np){ void statusMonitor(void *np){
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
Controller::loadActiveConnectors(); Controller::loadActiveConnectors();
while (Controller::conf.is_active){ while (Controller::conf.is_active){
// this scope prevents the configMutex from being locked constantly // this scope prevents the configMutex from being locked constantly
{ {
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex); tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
bool changed = false;
// checks online protocols, reports changes to status // checks online protocols, reports changes to status
changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"], if (Controller::CheckProtocols(Controller::Storage["config"]["protocols"],
Controller::capabilities); Controller::capabilities)){
Controller::writeProtocols();
}
// checks stream statuses, reports changes to status // checks stream statuses, reports changes to status
changed |= Controller::CheckAllStreams(Controller::Storage["streams"]); Controller::CheckAllStreams(Controller::Storage["streams"]);
// check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second...
if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() &&
!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){
// that failed. We now unlock it, no matter what - and print a warning that it was stuck.
WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config.");
changed = true;
} }
configLock.unlink(); Util::sleep(3000); // wait at least 3 seconds
configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (changed || Controller::configChanged){
Controller::writeConfig();
Controller::configChanged = false;
}
}
Util::sleep(5000); // wait at least 5 seconds
} }
if (Controller::restarting){ if (Controller::restarting){
Controller::prepareActiveConnectorsForReload(); Controller::prepareActiveConnectorsForReload();
}else{ }else{
Controller::prepareActiveConnectorsForShutdown(); Controller::prepareActiveConnectorsForShutdown();
} }
configLock.unlink();
} }
static unsigned long mix(unsigned long a, unsigned long b, unsigned long c){ static unsigned long mix(unsigned long a, unsigned long b, unsigned long c){
@ -254,12 +239,9 @@ int main_loop(int argc, char **argv){
Controller::conf.getOption("username", true)[0u] = Controller::conf.getOption("username", true)[0u] =
Controller::Storage["config"]["controller"]["username"]; Controller::Storage["config"]["controller"]["username"];
} }
{
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.unlink();
}
Controller::writeConfig(); Controller::writeConfig();
Controller::checkAvailProtocols(); Controller::checkAvailProtocols();
Controller::writeCapabilities();
createAccount(Controller::conf.getString("account")); createAccount(Controller::conf.getString("account"));
Controller::conf.activate(); // activate early, so threads aren't killed. Controller::conf.activate(); // activate early, so threads aren't killed.

View file

@ -122,10 +122,6 @@ void Controller::SharedMemStats(void * config){
statServer.parseEach(parseStatistics); statServer.parseEach(parseStatistics);
if (firstRun){ if (firstRun){
firstRun = false; firstRun = false;
servUpOtherBytes = 0;
servDownOtherBytes = 0;
servUpBytes = 0;
servDownBytes = 0;
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
it->second.upBytes = 0; it->second.upBytes = 0;
it->second.downBytes = 0; it->second.downBytes = 0;

View file

@ -211,43 +211,93 @@ namespace Controller {
} }
} }
/// Writes the current config to shared memory to be used in other processes
void writeConfig(){ void writeCapabilities(){
static JSON::Value writeConf; std::string temp = capabilities.toPacked();
bool changed = false; static IPC::sharedPage mistCapaOut(SHM_CAPA, temp.size()+100, true, false);
if (!mistCapaOut.mapped){
FAIL_MSG("Could not open capabilities config for writing! Is shared memory enabled on your system?");
return;
}
Util::RelAccX A(mistCapaOut.mapped, false);
A.addField("dtsc_data", RAX_DTSC, temp.size());
// write config
memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
void writeProtocols(){
static JSON::Value proto_written;
std::set<std::string> skip; std::set<std::string> skip;
skip.insert("online"); skip.insert("online");
skip.insert("error"); skip.insert("error");
if (!writeConf["config"].compareExcept(Storage["config"], skip)){ if (Storage["config"]["protocols"].compareExcept(proto_written, skip)){return;}
writeConf["config"].assignFrom(Storage["config"], skip); proto_written.assignFrom(Storage["config"]["protocols"], skip);
VERYHIGH_MSG("Saving new config because of edit in server config structure"); std::string temp = proto_written.toPacked();
changed = true; static IPC::sharedPage mistProtoOut(SHM_PROTO, temp.size()+100, true, false);
} mistProtoOut.close();
if (!writeConf["streams"].compareExcept(Storage["streams"], skip)){ mistProtoOut.init(SHM_PROTO, temp.size()+100, true, false);
writeConf["streams"].assignFrom(Storage["streams"], skip); if (!mistProtoOut.mapped){
VERYHIGH_MSG("Saving new config because of edit in streams"); FAIL_MSG("Could not open protocol config for writing! Is shared memory enabled on your system?");
changed = true;
}
if (writeConf["capabilities"] != capabilities){
writeConf["capabilities"] = capabilities;
VERYHIGH_MSG("Saving new config because of edit in capabilities");
changed = true;
}
if (!changed){return;}//cancel further processing if no changes
static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true);
if (!mistConfOut.mapped){
FAIL_MSG("Could not open config shared memory storage for writing! Is shared memory enabled on your system?");
return; return;
} }
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); // write config
//lock semaphore {
configLock.wait(); Util::RelAccX A(mistProtoOut.mapped, false);
//write config A.addField("dtsc_data", RAX_DTSC, temp.size());
std::string temp = writeConf.toPacked(); // write config
memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len)); memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
//unlock semaphore A.setRCount(1);
configLock.post(); A.setEndPos(1);
A.setReady();
}
}
void writeStream(const std::string & sName, const JSON::Value & sConf){
static std::map<std::string, JSON::Value> writtenStrms;
static std::map<std::string, IPC::sharedPage> pages;
static std::set<std::string> skip;
if (!skip.size()){
skip.insert("online");
skip.insert("error");
skip.insert("name");
}
if (sConf.isNull()){
writtenStrms.erase(sName);
pages.erase(sName);
return;
}
if (!writtenStrms.count(sName) || !writtenStrms[sName].compareExcept(sConf, skip)){
writtenStrms[sName].assignFrom(sConf, skip);
IPC::sharedPage & P = pages[sName];
std::string temp = writtenStrms[sName].toPacked();
P.close();
char tmpBuf[NAME_BUFFER_SIZE];
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, sName.c_str());
P.init(tmpBuf, temp.size()+100, true, false);
if (!P){
writtenStrms.erase(sName);
pages.erase(sName);
return;
}
Util::RelAccX A(P.mapped, false);
A.addField("dtsc_data", RAX_DTSC, temp.size());
// write config
memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
}
/// Writes the current config to shared memory to be used in other processes
void writeConfig(){
writeProtocols();
jsonForEach(Storage["streams"], it){
writeStream(it.key(), *it);
}
} }
} }

View file

@ -32,5 +32,8 @@ namespace Controller {
void initState(); void initState();
void deinitState(bool leaveBehind); void deinitState(bool leaveBehind);
void writeConfig(); void writeConfig();
void writeStream(const std::string & sName, const JSON::Value & sConf);
void writeCapabilities();
void writeProtocols();
} }

View file

@ -148,6 +148,7 @@ namespace Controller {
out[jit.key()]["name"] = jit.key(); out[jit.key()]["name"] = jit.key();
Log("STRM", std::string("New stream ") + jit.key()); Log("STRM", std::string("New stream ") + jit.key());
} }
Controller::writeStream(jit.key(), out[jit.key()]);
} }
} }
@ -240,6 +241,7 @@ namespace Controller {
} }
Log("STRM", std::string("Deleted stream ") + name); Log("STRM", std::string("Deleted stream ") + name);
out.removeMember(name); out.removeMember(name);
Controller::writeStream(name, JSON::Value());//Null JSON value = delete
} }
} //Controller namespace } //Controller namespace

View file

@ -298,10 +298,11 @@ namespace Mist {
bool Input::isAlwaysOn(){ bool Input::isAlwaysOn(){
bool ret = true; bool ret = true;
std::string strName = streamName.substr(0, (streamName.find_first_of("+ "))); std::string strName = streamName.substr(0, (streamName.find_first_of("+ ")));
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); char tmpBuf[NAME_BUFFER_SIZE];
configLock.wait(); snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str());
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName); Util::DTSCShmReader rStrmConf(tmpBuf);
DTSC::Scan streamCfg = rStrmConf.getScan();
if (streamCfg){ if (streamCfg){
if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){ if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){
ret = false; ret = false;
@ -311,7 +312,6 @@ namespace Mist {
ret = false; ret = false;
#endif #endif
} }
configLock.post();
return ret; return ret;
} }

View file

@ -739,13 +739,11 @@ namespace Mist {
std::string strName = config->getString("streamname"); std::string strName = config->getString("streamname");
Util::sanitizeName(strName); Util::sanitizeName(strName);
strName = strName.substr(0, (strName.find_first_of("+ "))); strName = strName.substr(0, (strName.find_first_of("+ ")));
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); char tmpBuf[NAME_BUFFER_SIZE];
if (!configLock.tryWaitOneSecond()){ snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, strName.c_str());
INFO_MSG("Aborting stream config refresh: locking took longer than expected"); Util::DTSCShmReader rStrmConf(tmpBuf);
return false; DTSC::Scan streamCfg = rStrmConf.getScan();
}
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName);
long long tmpNum; long long tmpNum;
//if stream is configured and setting is present, use it, always //if stream is configured and setting is present, use it, always

View file

@ -123,10 +123,8 @@ namespace Mist {
} }
//loop over the connectors //loop over the connectors
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); Util::DTSCShmReader rCapa(SHM_CAPA);
configLock.wait(); DTSC::Scan capa = rCapa.getMember("connectors");
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors");
unsigned int capa_ctr = capa.getSize(); unsigned int capa_ctr = capa.getSize();
for (unsigned int i = 0; i < capa_ctr; ++i){ for (unsigned int i = 0; i < capa_ctr; ++i){
DTSC::Scan c = capa.getIndice(i); DTSC::Scan c = capa.getIndice(i);
@ -159,14 +157,10 @@ namespace Mist {
Util::sanitizeName(streamname); Util::sanitizeName(streamname);
H.SetVar("stream", streamname); H.SetVar("stream", streamname);
} }
configLock.post();
configLock.close();
return capa.getIndiceName(i); return capa.getIndiceName(i);
} }
} }
} }
configLock.post();
configLock.close();
return ""; return "";
} }
@ -323,14 +317,16 @@ namespace Mist {
char * argarr[20]; char * argarr[20];
for (int i=0; i<20; i++){argarr[i] = 0;} for (int i=0; i<20; i++){argarr[i] = 0;}
int id = -1; int id = -1;
JSON::Value pipedCapa;
JSON::Value p;//properties of protocol
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait(); {
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); Util::DTSCShmReader rProto(SHM_PROTO);
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); DTSC::Scan prots = rProto.getScan();
unsigned int prots_ctr = prots.getSize(); unsigned int prots_ctr = prots.getSize();
JSON::Value p;//properties of protocol //find connector in config
for (unsigned int i=0; i < prots_ctr; ++i){ for (unsigned int i=0; i < prots_ctr; ++i){
if (prots.getIndice(i).getMember("connector").asString() == connector) { if (prots.getIndice(i).getMember("connector").asString() == connector) {
id = i; id = i;
@ -347,25 +343,23 @@ namespace Mist {
} }
if (id == -1) { if (id == -1) {
connector = connector.substr(0, connector.size() - 4); connector = connector.substr(0, connector.size() - 4);
DEBUG_MSG(DLVL_ERROR, "No connector found for: %s", connector.c_str()); ERROR_MSG("No connector found for: %s", connector.c_str());
configLock.post();
configLock.close();
return; return;
} }
} }
//read options from found connector //read options from found connector
p = prots.getIndice(id).asJSON(); p = prots.getIndice(id).asJSON();
DEBUG_MSG(DLVL_HIGH, "Connector found: %s", connector.c_str()); HIGH_MSG("Connector found: %s", connector.c_str());
Util::DTSCShmReader rCapa(SHM_CAPA);
DTSC::Scan capa = rCapa.getMember("connectors");
pipedCapa = capa.getMember(connector).asJSON();
}
//build arguments for starting output process //build arguments for starting output process
std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector; std::string tmparg = Util::getMyPath() + std::string("MistOut") + connector;
int argnum = 0; int argnum = 0;
argarr[argnum++] = (char*)tmparg.c_str(); argarr[argnum++] = (char*)tmparg.c_str();
JSON::Value pipedCapa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(connector).asJSON();
configLock.post();
configLock.close();
std::string temphost=getConnectedHost(); std::string temphost=getConnectedHost();
std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString(); std::string debuglevel = JSON::Value((long long)Util::Config::printDebugLevel).asString();
argarr[argnum++] = (char*)"--ip"; argarr[argnum++] = (char*)"--ip";

View file

@ -11,13 +11,13 @@ namespace Mist {
/// Helper function to find the protocol entry for a given port number /// Helper function to find the protocol entry for a given port number
std::string getProtocolForPort(uint16_t portNo){ std::string getProtocolForPort(uint16_t portNo){
std::string ret; std::string ret;
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); Util::DTSCShmReader rCapa(SHM_CAPA);
configLock.wait(); DTSC::Scan conns = rCapa.getMember("connectors");
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); Util::DTSCShmReader rProto(SHM_PROTO);
DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); DTSC::Scan prtcls = rProto.getScan();
unsigned int pro_cnt = prtcls.getSize(); unsigned int pro_cnt = prtcls.getSize();
for (unsigned int i = 0; i < pro_cnt; ++i){ for (unsigned int i = 0; i < pro_cnt; ++i){
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(prtcls.getIndice(i).getMember("connector").asString()); DTSC::Scan capa = conns.getMember(prtcls.getIndice(i).getMember("connector").asString());
uint16_t port = prtcls.getIndice(i).getMember("port").asInt(); uint16_t port = prtcls.getIndice(i).getMember("port").asInt();
//get the default port if none is set //get the default port if none is set
if (!port){ if (!port){
@ -28,7 +28,6 @@ namespace Mist {
break; break;
} }
} }
configLock.post();
if (ret.find(':') != std::string::npos){ if (ret.find(':') != std::string::npos){
ret.erase(ret.find(':')); ret.erase(ret.find(':'));
} }
@ -348,16 +347,6 @@ namespace Mist {
if (!myConn){ if (!myConn){
return json_resp; return json_resp;
} }
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE);
DTSC::Scan prots = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols");
if (!prots){
json_resp["error"] = "The specified stream is not available on this server.";
configLock.post();
configLock.close();
return json_resp;
}
bool hasVideo = false; bool hasVideo = false;
for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
@ -396,6 +385,16 @@ namespace Mist {
} }
json_resp["meta"].removeMember("source"); json_resp["meta"].removeMember("source");
//Get sources/protocols information
Util::DTSCShmReader rCapa(SHM_CAPA);
DTSC::Scan connectors = rCapa.getMember("connectors");
Util::DTSCShmReader rProto(SHM_PROTO);
DTSC::Scan prots = rProto.getScan();
if (!prots || !connectors){
json_resp["error"] = "Server configuration unavailable at this time.";
return json_resp;
}
//create a set for storing source information //create a set for storing source information
std::set<JSON::Value, sourceCompare> sources; std::set<JSON::Value, sourceCompare> sources;
@ -408,7 +407,7 @@ namespace Mist {
//loop over the connectors. //loop over the connectors.
for (unsigned int i = 0; i < prots_ctr; ++i){ for (unsigned int i = 0; i < prots_ctr; ++i){
std::string cName = prots.getIndice(i).getMember("connector").asString(); std::string cName = prots.getIndice(i).getMember("connector").asString();
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember(cName); DTSC::Scan capa = connectors.getMember(cName);
//if the connector has a port, //if the connector has a port,
if (capa.getMember("optional").getMember("port")){ if (capa.getMember("optional").getMember("port")){
HTTP::URL outURL(reqHost); HTTP::URL outURL(reqHost);
@ -431,12 +430,11 @@ namespace Mist {
std::string cProv = capa.getMember("provides").asString(); std::string cProv = capa.getMember("provides").asString();
//if this connector can be depended upon by other connectors, loop over the rest //if this connector can be depended upon by other connectors, loop over the rest
//check each enabled protocol separately to see if it depends on this connector //check each enabled protocol separately to see if it depends on this connector
DTSC::Scan capa_lst = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors"); unsigned int capa_lst_ctr = connectors.getSize();
unsigned int capa_lst_ctr = capa_lst.getSize();
for (unsigned int j = 0; j < capa_lst_ctr; ++j){ for (unsigned int j = 0; j < capa_lst_ctr; ++j){
//if it depends on this connector and has a URL, list it //if it depends on this connector and has a URL, list it
if (conns.count(capa_lst.getIndiceName(j)) && capa_lst.getIndice(j).getMember("deps").asString() == cProv && capa_lst.getIndice(j).getMember("methods")){ if (conns.count(connectors.getIndiceName(j)) && connectors.getIndice(j).getMember("deps").asString() == cProv && connectors.getIndice(j).getMember("methods")){
JSON::Value subcapa_json = capa_lst.getIndice(j).asJSON(); JSON::Value subcapa_json = connectors.getIndice(j).asJSON();
addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"], useragent); addSources(streamName, sources, outURL, subcapa_json, json_resp["meta"], useragent);
} }
} }
@ -450,8 +448,6 @@ namespace Mist {
json_resp["source"].append(*it); json_resp["source"].append(*it);
} }
} }
configLock.post();
configLock.close();
return json_resp; return json_resp;
} }
@ -522,14 +518,13 @@ namespace Mist {
// send smil MBR index // send smil MBR index
if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){ if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){
std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; std::string reqHost = HTTP::URL(H.GetHeader("Host")).host;
std::string port, url_rel; std::string port, url_rel;
std::string trackSources;//this string contains all track sources for MBR smil
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); {
configLock.wait(); Util::DTSCShmReader rProto(SHM_PROTO);
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); DTSC::Scan prtcls = rProto.getScan();
DTSC::Scan prtcls = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("config").getMember("protocols"); Util::DTSCShmReader rCapa(SHM_CAPA);
DTSC::Scan capa = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("capabilities").getMember("connectors").getMember("RTMP"); DTSC::Scan capa = rCapa.getMember("connectors").getMember("RTMP");
unsigned int pro_cnt = prtcls.getSize(); unsigned int pro_cnt = prtcls.getSize();
for (unsigned int i = 0; i < pro_cnt; ++i){ for (unsigned int i = 0; i < pro_cnt; ++i){
if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){ if (prtcls.getIndice(i).getMember("connector").asString() != "RTMP"){
@ -547,7 +542,6 @@ namespace Mist {
} }
} }
std::string trackSources;//this string contains all track sources for MBR smil
initialize(); initialize();
if (!myConn){return;} if (!myConn){return;}
for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ for (std::map<unsigned int, DTSC::Track>::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){
@ -555,8 +549,7 @@ namespace Mist {
trackSources += " <video src='"+ streamName + "?track=" + JSON::Value((long long)trit->first).asString() + "' height='" + JSON::Value((long long)trit->second.height).asString() + "' system-bitrate='" + JSON::Value((long long)trit->second.bps).asString() + "' width='" + JSON::Value((long long)trit->second.width).asString() + "' />\n"; trackSources += " <video src='"+ streamName + "?track=" + JSON::Value((long long)trit->first).asString() + "' height='" + JSON::Value((long long)trit->second.height).asString() + "' system-bitrate='" + JSON::Value((long long)trit->second.bps).asString() + "' width='" + JSON::Value((long long)trit->second.width).asString() + "' />\n";
} }
} }
configLock.post(); }
configLock.close();
H.Clean(); H.Clean();
H.SetHeader("Content-Type", "application/smil"); H.SetHeader("Content-Type", "application/smil");