Robustify accesses to server config

This commit is contained in:
Thulinma 2018-11-11 03:59:23 +01:00
parent 6032f236d2
commit 98e3940079
15 changed files with 320 additions and 266 deletions

View file

@ -69,43 +69,28 @@ void createAccount(std::string account){
}
/// Status monitoring thread.
/// Will check outputs, inputs and converters every five seconds
/// Will check outputs, inputs and converters every three seconds
void statusMonitor(void *np){
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
Controller::loadActiveConnectors();
while (Controller::conf.is_active){
// this scope prevents the configMutex from being locked constantly
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
bool changed = false;
// checks online protocols, reports changes to status
changed |= Controller::CheckProtocols(Controller::Storage["config"]["protocols"],
Controller::capabilities);
if (Controller::CheckProtocols(Controller::Storage["config"]["protocols"],
Controller::capabilities)){
Controller::writeProtocols();
}
// checks stream statuses, reports changes to status
changed |= Controller::CheckAllStreams(Controller::Storage["streams"]);
// check if the config semaphore is stuck, by trying to lock it for 5 attempts of 1 second...
if (!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond() &&
!configLock.tryWaitOneSecond() && !configLock.tryWaitOneSecond()){
// that failed. We now unlock it, no matter what - and print a warning that it was stuck.
WARN_MSG("Configuration semaphore was stuck. Force-unlocking it and re-writing config.");
changed = true;
}
configLock.unlink();
configLock.open(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (changed || Controller::configChanged){
Controller::writeConfig();
Controller::configChanged = false;
}
Controller::CheckAllStreams(Controller::Storage["streams"]);
}
Util::sleep(5000); // wait at least 5 seconds
Util::sleep(3000); // wait at least 3 seconds
}
if (Controller::restarting){
Controller::prepareActiveConnectorsForReload();
}else{
Controller::prepareActiveConnectorsForShutdown();
}
configLock.unlink();
}
static unsigned long mix(unsigned long a, unsigned long b, unsigned long c){
@ -316,13 +301,10 @@ int main_loop(int argc, char **argv){
Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog");
Controller::prometheus = Controller::Storage["config"]["prometheus"].asStringRef();
Controller::accesslog = Controller::Storage["config"]["accesslog"].asStringRef();
{
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.unlink();
}
Controller::writeConfig();
Controller::checkAvailProtocols();
Controller::checkAvailTriggers();
Controller::writeCapabilities();
Controller::updateBandwidthConfig();
createAccount(Controller::conf.getString("account"));
Controller::conf.activate(); // activate early, so threads aren't killed.

View file

@ -27,12 +27,6 @@ namespace Controller {
trgs["SYSTEM_STOP"]["response"] = "always";
trgs["SYSTEM_STOP"]["response_action"] = "If false, aborts shutdown.";
trgs["SYSTEM_CONFIG"]["when"] = "Every time MistServer's global configuration changes";
trgs["SYSTEM_CONFIG"]["stream_specific"] = false;
trgs["SYSTEM_CONFIG"]["payload"] = "newly active configuration (JSON)";
trgs["SYSTEM_CONFIG"]["response"] = "ignored";
trgs["SYSTEM_CONFIG"]["response_action"] = "None.";
trgs["OUTPUT_START"]["when"] = "Before a connector starts listening for connections";
trgs["OUTPUT_START"]["stream_specific"] = false;
trgs["OUTPUT_START"]["payload"] = "connector configuration (JSON)";

View file

@ -1580,9 +1580,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
#if !defined(__CYGWIN__) && !defined(_WIN32)
{
struct statvfs shmd;
IPC::sharedPage tmpConf(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false);
if (tmpConf.mapped && tmpConf.handle){
fstatvfs(tmpConf.handle, &shmd);
IPC::sharedPage tmpCapa(SHM_CAPA, DEFAULT_CONF_PAGE_SIZE, false, false);
if (tmpCapa.mapped && tmpCapa.handle){
fstatvfs(tmpCapa.handle, &shmd);
shm_free = (shmd.f_bfree*shmd.f_frsize)/1024;
shm_total = (shmd.f_blocks*shmd.f_frsize)/1024;
}

View file

@ -218,6 +218,106 @@ namespace Controller{
}
}
void writeCapabilities(){
std::string temp = capabilities.toPacked();
static IPC::sharedPage mistCapaOut(SHM_CAPA, temp.size()+100, true, false);
if (!mistCapaOut.mapped){
FAIL_MSG("Could not open capabilities config for writing! Is shared memory enabled on your system?");
return;
}
Util::RelAccX A(mistCapaOut.mapped, false);
A.addField("dtsc_data", RAX_DTSC, temp.size());
// write config
memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
void writeProtocols(){
static std::string proxy_written;
if (proxy_written != Storage["config"]["trustedproxy"].asStringRef()){
proxy_written = Storage["config"]["trustedproxy"].asStringRef();
static IPC::sharedPage mistProxOut(SHM_PROXY, proxy_written.size()+100, true, false);
mistProxOut.close();
mistProxOut.init(SHM_PROXY, proxy_written.size()+100, true, false);
if (!mistProxOut.mapped){
FAIL_MSG("Could not open trusted proxy config for writing! Is shared memory enabled on your system?");
return;
}else{
Util::RelAccX A(mistProxOut.mapped, false);
A.addField("proxy_data", RAX_STRING, proxy_written.size());
// write config
memcpy(A.getPointer("proxy_data"), proxy_written.data(), proxy_written.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
}
static JSON::Value proto_written;
std::set<std::string> skip;
skip.insert("online");
skip.insert("error");
if (Storage["config"]["protocols"].compareExcept(proto_written, skip)){return;}
proto_written.assignFrom(Storage["config"]["protocols"], skip);
std::string temp = proto_written.toPacked();
static IPC::sharedPage mistProtoOut(SHM_PROTO, temp.size()+100, true, false);
mistProtoOut.close();
mistProtoOut.init(SHM_PROTO, temp.size()+100, true, false);
if (!mistProtoOut.mapped){
FAIL_MSG("Could not open protocol config for writing! Is shared memory enabled on your system?");
return;
}
// write config
{
Util::RelAccX A(mistProtoOut.mapped, false);
A.addField("dtsc_data", RAX_DTSC, temp.size());
// write config
memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
}
void writeStream(const std::string & sName, const JSON::Value & sConf){
static std::map<std::string, JSON::Value> writtenStrms;
static std::map<std::string, IPC::sharedPage> pages;
static std::set<std::string> skip;
if (!skip.size()){
skip.insert("online");
skip.insert("error");
skip.insert("name");
}
if (sConf.isNull()){
writtenStrms.erase(sName);
pages.erase(sName);
return;
}
if (!writtenStrms.count(sName) || !writtenStrms[sName].compareExcept(sConf, skip)){
writtenStrms[sName].assignFrom(sConf, skip);
IPC::sharedPage & P = pages[sName];
std::string temp = writtenStrms[sName].toPacked();
P.close();
char tmpBuf[NAME_BUFFER_SIZE];
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_STREAM_CONF, sName.c_str());
P.init(tmpBuf, temp.size()+100, true, false);
if (!P){
writtenStrms.erase(sName);
pages.erase(sName);
return;
}
Util::RelAccX A(P.mapped, false);
A.addField("dtsc_data", RAX_DTSC, temp.size());
// write config
memcpy(A.getPointer("dtsc_data"), temp.data(), temp.size());
A.setRCount(1);
A.setEndPos(1);
A.setReady();
}
}
/// Writes the current config to shared memory to be used in other processes
/// \triggers
/// The `"SYSTEM_START"` trigger is global, and is ran as soon as the server configuration is first stable. It has no payload. If cancelled,
@ -226,41 +326,10 @@ namespace Controller{
/// The `"SYSTEM_CONFIG"` trigger is global, and is ran every time the server configuration is updated. Its payload is the new configuration in
/// JSON format. This trigger cannot be cancelled.
void writeConfig(){
static JSON::Value writeConf;
bool changed = false;
std::set<std::string> skip;
skip.insert("online");
skip.insert("error");
if (!writeConf["config"].compareExcept(Storage["config"], skip)){
writeConf["config"].assignFrom(Storage["config"], skip);
VERYHIGH_MSG("Saving new config because of edit in server config structure");
changed = true;
writeProtocols();
jsonForEach(Storage["streams"], it){
writeStream(it.key(), *it);
}
if (!writeConf["streams"].compareExcept(Storage["streams"], skip)){
writeConf["streams"].assignFrom(Storage["streams"], skip);
VERYHIGH_MSG("Saving new config because of edit in streams");
changed = true;
}
if (writeConf["capabilities"] != capabilities){
writeConf["capabilities"] = capabilities;
VERYHIGH_MSG("Saving new config because of edit in capabilities");
changed = true;
}
if (!changed){return;}// cancel further processing if no changes
static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true);
if (!mistConfOut.mapped){
FAIL_MSG("Could not open config shared memory storage for writing! Is shared memory enabled on your system?");
return;
}
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
// lock semaphore
configLock.wait();
// write config
std::string temp = writeConf.toPacked();
memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len));
// unlock semaphore
configLock.post();
/*LTS-START*/
static std::map<std::string, IPC::sharedPage> pageForType; // should contain one page for every trigger type
@ -269,8 +338,8 @@ namespace Controller{
// for all shm pages that hold triggers
pageForType.clear();
if (writeConf["config"]["triggers"].size()){
jsonForEach(writeConf["config"]["triggers"], it){
if (Storage["config"]["triggers"].size()){
jsonForEach(Storage["config"]["triggers"], it){
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_TRIGGER, (it.key()).c_str());
pageForType[it.key()].init(tmpBuf, 32 * 1024, true, false);
Util::RelAccX tPage(pageForType[it.key()].mapped, false);
@ -349,10 +418,6 @@ namespace Controller{
if (!Triggers::doTrigger("SYSTEM_START")){conf.is_active = false;}
serverStartTriggered = true;
}
if (Triggers::shouldTrigger("SYSTEM_CONFIG")){
std::string payload = writeConf.toString();
Triggers::doTrigger("SYSTEM_CONFIG", payload);
}
/*LTS-END*/
}
}

View file

@ -34,5 +34,8 @@ namespace Controller {
void initState();
void deinitState(bool leaveBehind);
void writeConfig();
void writeStream(const std::string & sName, const JSON::Value & sConf);
void writeCapabilities();
void writeProtocols();
}

View file

@ -208,6 +208,7 @@ namespace Controller {
out[jit.key()]["name"] = jit.key();
Log("STRM", std::string("New stream ") + jit.key());
}
Controller::writeStream(jit.key(), out[jit.key()]);
}
}
@ -360,6 +361,7 @@ namespace Controller {
/*LTS-END*/
Log("STRM", "Deleted stream " + name);
out.removeMember(name);
Controller::writeStream(name, JSON::Value());//Null JSON value = delete
++ret;
ret *= -1;
if (inputProcesses.count(name)){