Added global configuration mechanism and defaultStream support

This commit is contained in:
Thulinma 2019-10-29 17:15:08 +01:00
parent 50d1d0e944
commit 7bffdfe644
9 changed files with 130 additions and 3 deletions

View file

@ -133,6 +133,7 @@ static inline void show_stackframe(){}
#define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name #define SHM_STREAM_INDEX "MstSTRM%s" //%s stream name
#define SHM_STREAM_STATE "MstSTATE%s" //%s stream name #define SHM_STREAM_STATE "MstSTATE%s" //%s stream name
#define SHM_STREAM_CONF "MstSCnf%s" //%s stream name #define SHM_STREAM_CONF "MstSCnf%s" //%s stream name
#define SHM_GLOBAL_CONF "MstGlobalConfig"
#define STRMSTAT_OFF 0 #define STRMSTAT_OFF 0
#define STRMSTAT_INIT 1 #define STRMSTAT_INIT 1
#define STRMSTAT_BOOT 2 #define STRMSTAT_BOOT 2

View file

@ -205,12 +205,44 @@ JSON::Value Util::getStreamConfig(const std::string &streamname){
Util::DTSCShmReader rStrmConf(tmpBuf); Util::DTSCShmReader rStrmConf(tmpBuf);
DTSC::Scan stream_cfg = rStrmConf.getScan(); DTSC::Scan stream_cfg = rStrmConf.getScan();
if (!stream_cfg){ if (!stream_cfg){
WARN_MSG("Could not get stream '%s' config!", smp.c_str()); if (!Util::getGlobalConfig("defaultStream")){
WARN_MSG("Could not get stream '%s' config!", smp.c_str());
}else{
INFO_MSG("Could not get stream '%s' config, not emitting WARN message because fallback is configured", smp.c_str());
}
return result; return result;
} }
return stream_cfg.asJSON(); return stream_cfg.asJSON();
} }
JSON::Value Util::getGlobalConfig(const std::string &optionName){
IPC::sharedPage globCfg(SHM_GLOBAL_CONF);
if (!globCfg.mapped){
FAIL_MSG("Could not open global configuration options to read setting for '%s'", optionName.c_str());
return JSON::Value();
}
Util::RelAccX cfgData(globCfg.mapped);
if (!cfgData.isReady()){
FAIL_MSG("Global configuration options not ready; cannot read setting for '%s'", optionName.c_str());
return JSON::Value();
}
Util::RelAccXFieldData dataField = cfgData.getFieldData(optionName);
switch (dataField.type & 0xF0){
case RAX_INT:
case RAX_UINT:
//Integer types, return JSON::Value integer
return JSON::Value(cfgData.getInt(dataField));
case RAX_RAW:
case RAX_STRING:
//String types, return JSON::Value string
return JSON::Value(std::string(cfgData.getPointer(dataField), cfgData.getSize(optionName)));
default:
//Unimplemented types
FAIL_MSG("Global configuration setting for '%s' is not an implemented datatype!", optionName.c_str());
return JSON::Value();
}
}
DTSC::Meta Util::getStreamMeta(const std::string &streamname){ DTSC::Meta Util::getStreamMeta(const std::string &streamname){
DTSC::Meta ret; DTSC::Meta ret;
char pageId[NAME_BUFFER_SIZE]; char pageId[NAME_BUFFER_SIZE];

View file

@ -17,6 +17,7 @@ namespace Util {
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false, const std::map<std::string, std::string> & overrides = std::map<std::string, std::string>(), pid_t * spawn_pid = NULL); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false, const std::map<std::string, std::string> & overrides = std::map<std::string, std::string>(), pid_t * spawn_pid = NULL);
int startPush(const std::string & streamname, std::string & target); int startPush(const std::string & streamname, std::string & target);
JSON::Value getStreamConfig(const std::string & streamname); JSON::Value getStreamConfig(const std::string & streamname);
JSON::Value getGlobalConfig(const std::string & optionName);
JSON::Value getInputBySource(const std::string & filename, bool isProvider = false); JSON::Value getInputBySource(const std::string & filename, bool isProvider = false);
DTSC::Meta getStreamMeta(const std::string & streamname); DTSC::Meta getStreamMeta(const std::string & streamname);
uint8_t getStreamStatus(const std::string & streamname); uint8_t getStreamStatus(const std::string & streamname);

View file

@ -494,6 +494,9 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response
out["prometheus"] = in["prometheus"]; out["prometheus"] = in["prometheus"];
Controller::prometheus = out["prometheus"].asStringRef(); Controller::prometheus = out["prometheus"].asStringRef();
} }
if (in.isMember("defaultStream")){
out["defaultStream"] = in["defaultStream"];
}
} }
if (Request.isMember("bandwidth")){ if (Request.isMember("bandwidth")){
if (Request["bandwidth"].isObject()){ if (Request["bandwidth"].isObject()){

View file

@ -160,6 +160,13 @@ namespace Controller {
trgs["LIVE_BANDWIDTH"]["response"] = "always"; trgs["LIVE_BANDWIDTH"]["response"] = "always";
trgs["LIVE_BANDWIDTH"]["response_action"] = "If false, shuts down the stream buffer."; trgs["LIVE_BANDWIDTH"]["response_action"] = "If false, shuts down the stream buffer.";
trgs["LIVE_BANDWIDTH"]["argument"] = "Triggers only if current bytes per second exceeds this amount (integer)"; trgs["LIVE_BANDWIDTH"]["argument"] = "Triggers only if current bytes per second exceeds this amount (integer)";
trgs["DEFAULT_STREAM"]["when"] = "When any user attempts to open a stream that cannot be opened (because it is either offline or not configured), allows rewriting the stream to a different one as fallback. Supports variable substitution.";
trgs["DEFAULT_STREAM"]["stream_specific"] = true;
trgs["DEFAULT_STREAM"]["payload"] = "current defaultStream setting (string)\nrequested stream name (string)\nviewer host (string)\noutput type (string)\nfull request URL (string, may be blank for non-URL-based requests!)";
trgs["DEFAULT_STREAM"]["response"] = "always";
trgs["DEFAULT_STREAM"]["response_action"] = "Overrides the default stream setting (for this view) to the response value. If empty, fails loading the stream and returns an error to the viewer/user.";
} }
///Aquire list of available protocols, storing in global 'capabilities' JSON::Value. ///Aquire list of available protocols, storing in global 'capabilities' JSON::Value.

View file

@ -334,6 +334,27 @@ namespace Controller{
writeStream(it.key(), *it); writeStream(it.key(), *it);
} }
{
//Global configuration options, if any
IPC::sharedPage globCfg;
globCfg.init(SHM_GLOBAL_CONF, 4096, false, false);
if (!globCfg.mapped){
globCfg.init(SHM_GLOBAL_CONF, 4096, true, false);
}
if (globCfg.mapped){
Util::RelAccX globAccX(globCfg.mapped, false);
if (!globAccX.isReady()){
globAccX.addField("defaultStream", RAX_128STRING);
globAccX.setRCount(1);
globAccX.setEndPos(1);
globAccX.setReady();
}
globAccX.setString("defaultStream", Storage["config"]["defaultStream"].asStringRef());
globCfg.master = false;//leave the page after closing
}
}
/*LTS-START*/ /*LTS-START*/
static std::map<std::string, IPC::sharedPage> pageForType; // should contain one page for every trigger type static std::map<std::string, IPC::sharedPage> pageForType; // should contain one page for every trigger type
static JSON::Value writtenTrigs; static JSON::Value writtenTrigs;

View file

@ -372,8 +372,31 @@ namespace Mist{
} }
}else{ }else{
if (!Util::startInput(streamName, "", true, isPushing())){ if (!Util::startInput(streamName, "", true, isPushing())){
onFail("Stream open failed", true); JSON::Value defStrmJson = Util::getGlobalConfig("defaultStream");
return; std::string defStrm = defStrmJson.asString();
if(Triggers::shouldTrigger("DEFAULT_STREAM", streamName)){
std::string payload = defStrm+"\n"+streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
//The return value is ignored, because the response (defStrm in this case) tells us what to do next, if anything.
Triggers::doTrigger("DEFAULT_STREAM", payload, streamName, false, defStrm);
}
if (!defStrm.size()){
onFail("Stream open failed", true);
return;
}
std::string newStrm = defStrm;
Util::streamVariables(newStrm, streamName, "");
if (streamName == newStrm){
onFail("Stream open failed; nothing to fall back to ("+defStrm+" == "+newStrm+")", true);
return;
}
INFO_MSG("Stream open failed; falling back to default stream '%s' -> '%s'", defStrm.c_str(), newStrm.c_str());
std::string origStream = streamName;
streamName = newStrm;
Util::Config::streamName = streamName;
if (!Util::startInput(streamName, "", true, isPushing())){
onFail("Stream open failed (fallback stream for '"+origStream+"')", true);
return;
}
} }
} }
disconnect(); disconnect();

View file

@ -6,6 +6,7 @@
#include "flashPlayer.h" #include "flashPlayer.h"
#include "oldFlashPlayer.h" #include "oldFlashPlayer.h"
#include <mist/websocket.h> #include <mist/websocket.h>
#include <mist/triggers.h>
namespace Mist { namespace Mist {
/// Helper function to find the protocol entry for a given port number /// Helper function to find the protocol entry for a given port number
@ -358,8 +359,36 @@ namespace Mist {
if (config->getString("nostreamtext") != ""){ if (config->getString("nostreamtext") != ""){
json_resp["on_error"] = config->getString("nostreamtext"); json_resp["on_error"] = config->getString("nostreamtext");
} }
//Make note of any defaultStream-based redirection
if (origStreamName.size() && origStreamName != streamName){
json_resp["redirected"].append(origStreamName);
json_resp["redirected"].append(streamName);
}
uint8_t streamStatus = Util::getStreamStatus(streamName); uint8_t streamStatus = Util::getStreamStatus(streamName);
if (streamStatus != STRMSTAT_READY){ if (streamStatus != STRMSTAT_READY){
//If we haven't rewritten the stream name yet to a fallback, attempt to do so
if (origStreamName == streamName){
JSON::Value defStrmJson = Util::getGlobalConfig("defaultStream");
std::string defStrm = defStrmJson.asString();
if(Triggers::shouldTrigger("DEFAULT_STREAM", streamName)){
std::string payload = defStrm+"\n"+streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
//The return value is ignored, because the response (defStrm in this case) tells us what to do next, if anything.
Triggers::doTrigger("DEFAULT_STREAM", payload, streamName, false, defStrm);
}
if (defStrm.size()){
std::string newStrm = defStrm;
Util::streamVariables(newStrm, streamName, "");
if (streamName != newStrm){
INFO_MSG("Falling back to default stream '%s' -> '%s'", defStrm.c_str(), newStrm.c_str());
origStreamName = streamName;
streamName = newStrm;
Util::Config::streamName = streamName;
reconnect();
return getStatusJSON(reqHost, useragent);
}
}
origStreamName.clear();//no fallback, don't check again
}
switch (streamStatus){ switch (streamStatus){
case STRMSTAT_OFF: case STRMSTAT_OFF:
json_resp["error"] = "Stream is offline"; json_resp["error"] = "Stream is offline";
@ -504,6 +533,7 @@ namespace Mist {
} }
void OutHTTP::onHTTP(){ void OutHTTP::onHTTP(){
origStreamName = streamName;
std::string method = H.method; std::string method = H.method;
//Handle certbot validations //Handle certbot validations
@ -907,6 +937,8 @@ namespace Mist {
Util::startInput(streamName, "", true, false); Util::startInput(streamName, "", true, false);
char pageName[NAME_BUFFER_SIZE]; char pageName[NAME_BUFFER_SIZE];
std::string currStreamName;
currStreamName = streamName;
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str()); snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
IPC::sharedPage streamStatus(pageName, 1, false, false); IPC::sharedPage streamStatus(pageName, 1, false, false);
uint8_t prevState, newState, metaCounter; uint8_t prevState, newState, metaCounter;
@ -925,6 +957,11 @@ namespace Mist {
disconnect(); disconnect();
} }
JSON::Value resp = getStatusJSON(reqHost, useragent); JSON::Value resp = getStatusJSON(reqHost, useragent);
if (currStreamName != streamName){
currStreamName = streamName;
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_STATE, streamName.c_str());
streamStatus.close();
}
ws.sendFrame(resp.toString()); ws.sendFrame(resp.toString());
prevState = newState; prevState = newState;
}else{ }else{

View file

@ -20,6 +20,8 @@ namespace Mist {
virtual bool onFinish(){ virtual bool onFinish(){
return stayConnected; return stayConnected;
} }
private:
std::string origStreamName;
}; };
} }