Updated triggers, implemented LIVE_BANDWIDTH trigger

This commit is contained in:
Thulinma 2017-01-19 15:07:36 +01:00
parent 6a68d86a0e
commit cab87a6425
6 changed files with 319 additions and 324 deletions

View file

@ -1,17 +1,18 @@
#include <sys/stat.h>
#include <iostream>
#include <fstream>
#include <algorithm>
#include <mist/timing.h>
#include <mist/shared_memory.h>
#include <mist/defines.h>
#include "controller_storage.h"
#include "controller_capabilities.h"
#include <mist/triggers.h>//LTS
#include <algorithm>
#include <fstream>
#include <iostream>
#include <mist/defines.h>
#include <mist/shared_memory.h>
#include <mist/timing.h>
#include <mist/triggers.h> //LTS
#include <mist/util.h> //LTS
#include <sys/stat.h>
///\brief Holds everything unique to the controller.
namespace Controller {
std::string instanceId; ///instanceId (previously uniqId) is first set in controller.cpp before licensing or update calls.
namespace Controller{
std::string instanceId; /// instanceId (previously uniqId) is first set in controller.cpp before licensing or update calls.
Util::Config conf;
JSON::Value Storage; ///< Global storage of data.
tthread::mutex configMutex;
@ -30,13 +31,13 @@ namespace Controller {
m.append(kind);
m.append(message);
Storage["log"].append(m);
Storage["log"].shrink(100); //limit to 100 log messages
Storage["log"].shrink(100); // limit to 100 log messages
time_t rawtime;
struct tm * timeinfo;
char buffer [100];
time (&rawtime);
timeinfo = localtime (&rawtime);
strftime(buffer,100,"%F %H:%M:%S",timeinfo);
struct tm *timeinfo;
char buffer[100];
time(&rawtime);
timeinfo = localtime(&rawtime);
strftime(buffer, 100, "%F %H:%M:%S", timeinfo);
std::cout << "[" << buffer << "] " << kind << ": " << message << std::endl;
logCounter++;
}
@ -51,27 +52,23 @@ namespace Controller {
File.close();
return File.good();
}
/// Handles output of a Mist application, detecting and catching debug messages.
/// Debug messages are automatically converted into Log messages.
/// Closes the file descriptor on read error.
/// \param err File descriptor of the stderr output of the process to monitor.
void handleMsg(void * err){
void handleMsg(void *err){
char buf[1024];
FILE * output = fdopen((long long int)err, "r");
FILE *output = fdopen((long long int)err, "r");
while (fgets(buf, 1024, output)){
unsigned int i = 0;
while (i < 9 && buf[i] != '|' && buf[i] != 0){
++i;
}
while (i < 9 && buf[i] != '|' && buf[i] != 0){++i;}
unsigned int j = i;
while (j < 1024 && buf[j] != '\n' && buf[j] != 0){
++j;
}
while (j < 1024 && buf[j] != '\n' && buf[j] != 0){++j;}
buf[j] = 0;
if(i < 9){
if (i < 9){
buf[i] = 0;
Log(buf,buf+i+1);
Log(buf, buf + i + 1);
}else{
printf("%s", buf);
}
@ -80,12 +77,14 @@ namespace Controller {
fclose(output);
close((long long int)err);
}
/// Writes the current config to shared memory to be used in other processes
/// \triggers
/// The `"SYSTEM_START"` trigger is global, and is ran as soon as the server configuration is first stable. It has no payload. If cancelled, the system immediately shuts down again.
/// \triggers
/// The `"SYSTEM_START"` trigger is global, and is ran as soon as the server configuration is first stable. It has no payload. If cancelled,
/// the system immediately shuts down again.
/// \n
/// The `"SYSTEM_CONFIG"` trigger is global, and is ran every time the server configuration is updated. Its payload is the new configuration in JSON format. This trigger cannot be cancelled.
/// The `"SYSTEM_CONFIG"` trigger is global, and is ran every time the server configuration is updated. Its payload is the new configuration in
/// JSON format. This trigger cannot be cancelled.
void writeConfig(){
static JSON::Value writeConf;
bool changed = false;
@ -104,7 +103,7 @@ namespace Controller {
VERYHIGH_MSG("Saving new config because of edit in capabilities");
changed = true;
}
if (!changed){return;}//cancel further processing if no changes
if (!changed){return;}// cancel further processing if no changes
static IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, true);
if (!mistConfOut.mapped){
@ -112,90 +111,99 @@ namespace Controller {
return;
}
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
//lock semaphore
// lock semaphore
configLock.wait();
//write config
// write config
std::string temp = writeConf.toPacked();
memcpy(mistConfOut.mapped, temp.data(), std::min(temp.size(), (size_t)mistConfOut.len));
//unlock semaphore
// unlock semaphore
configLock.post();
/*LTS-START*/
static std::map<std::string,IPC::sharedPage> pageForType; //should contain one page for every trigger type
static std::map<std::string, IPC::sharedPage> pageForType; // should contain one page for every trigger type
char tmpBuf[NAME_BUFFER_SIZE];
//for all shm pages that hold triggers
// for all shm pages that hold triggers
pageForType.clear();
if( writeConf["config"]["triggers"].size() ){//if triggers are defined...
jsonForEach(writeConf["config"]["triggers"], it){//for all types defined in config
snprintf(tmpBuf,NAME_BUFFER_SIZE,SHM_TRIGGER,(it.key()).c_str()); //create page
pageForType[it.key()].init(tmpBuf, 8*1024, true, false);// todo: should this be false/why??
char * bytePos=pageForType[it.key()].mapped;
//write data to page
jsonForEach(*it, triggIt){ //for all defined
unsigned int tmpUrlSize=(*triggIt)[(unsigned int) 0].asStringRef().size();
unsigned int tmpStreamNames=0;// (*triggIt)[2ul].packedSize();
std::string namesArray="";
if( (triggIt->size() >= 3) && (*triggIt)[2ul].size()){
jsonForEach((*triggIt)[2ul], shIt){
unsigned int tmpLen=shIt->asString().size();
tmpStreamNames+= 4+tmpLen;
//INFO_MSG("adding string: %s len: %d", shIt->asString().c_str() , tmpLen );
((unsigned int*)tmpBuf)[0] = tmpLen; //NOTE: namesArray may be replaced by writing directly to tmpBuf.
namesArray.append(tmpBuf,4);
namesArray.append(shIt->asString());
if (writeConf["config"]["triggers"].size()){
jsonForEach(writeConf["config"]["triggers"], it){
snprintf(tmpBuf, NAME_BUFFER_SIZE, SHM_TRIGGER, (it.key()).c_str());
pageForType[it.key()].init(tmpBuf, 32 * 1024, true, false);
Util::RelAccX tPage(pageForType[it.key()].mapped, false);
tPage.addField("url", RAX_128STRING);
tPage.addField("sync", RAX_UINT);
tPage.addField("streams", RAX_256RAW);
tPage.addField("params", RAX_128STRING);
tPage.setReady();
uint32_t i = 0;
uint32_t max = (32 * 1024 - tPage.getOffset()) / tPage.getRSize();
// write data to page
jsonForEach(*it, triggIt){
if (i >= max){
ERROR_MSG("Not all %s triggers fit on the memory page!", (it.key()).c_str());
break;
}
if (triggIt->isArray()){
tPage.setString("url", (*triggIt)[0u].asStringRef(), i);
tPage.setInt("sync", ((*triggIt)[1u].asBool() ? 1 : 0), i);
char *strmP = tPage.getPointer("streams", i);
if (strmP){
((unsigned int *)strmP)[0] = 0; // reset first 4 bytes of stream list pointer
if ((triggIt->size() >= 3) && (*triggIt)[2u].size()){
std::string namesArray;
jsonForEach((*triggIt)[2u], shIt){
((unsigned int *)tmpBuf)[0] = shIt->asString().size();
namesArray.append(tmpBuf, 4);
namesArray.append(shIt->asString());
}
if (namesArray.size()){memcpy(strmP, namesArray.data(), std::min(namesArray.size(), (size_t)256));}
}
}
}
unsigned int totalLen=9+tmpUrlSize+tmpStreamNames; //4Btotal len, 4Burl len ,XB tmpurl, 1B sync , XB tmpstreamnames
if(totalLen > (pageForType[it.key()].len-(bytePos-pageForType[it.key()].mapped)) ){ //check if totalLen fits on the page
ERROR_MSG("trigger does not fit on page. size: %d bytes left on page: %d skipping.",totalLen,(pageForType[it.key()].len-(bytePos-pageForType[it.key()].mapped))); //doesnt fit
continue;
}
((unsigned int*)bytePos)[0] = totalLen;
bytePos+=4;
((unsigned int*)bytePos)[0] = tmpUrlSize;
bytePos+=4;
memcpy(bytePos, (*triggIt)[(unsigned int) 0].asStringRef().data(), (*triggIt)[(unsigned int) 0].asStringRef().size());
bytePos+=(*triggIt)[(unsigned int) 0].asStringRef().size();
(bytePos++)[0] = (*triggIt)[1ul].asBool() ? '\001' : '\000';
if(tmpStreamNames){
memcpy(bytePos,namesArray.data(),tmpStreamNames); //contains a string of 4Blen,XBstring pairs
bytePos+=tmpStreamNames;
if (triggIt->isObject()){
if (!triggIt->isMember("handler")){continue;}
tPage.setString("url", (*triggIt)["handler"].asStringRef(), i);
tPage.setInt("sync", ((*triggIt)["sync"].asBool() ? 1 : 0), i);
char *strmP = tPage.getPointer("streams", i);
if (strmP){
((unsigned int *)strmP)[0] = 0; // reset first 4 bytes of stream list pointer
if ((triggIt->isMember("streams")) && (*triggIt)["streams"].size()){
std::string namesArray;
jsonForEach((*triggIt)["streams"], shIt){
((unsigned int *)tmpBuf)[0] = shIt->asString().size();
namesArray.append(tmpBuf, 4);
namesArray.append(shIt->asString());
}
if (namesArray.size()){memcpy(strmP, namesArray.data(), std::min(namesArray.size(), (size_t)256));}
}
}
if (triggIt->isMember("params")){
tPage.setString("params", (*triggIt)["params"].asStringRef(), i);
}else{
tPage.setString("params", "", i);
}
}
++i;
}
}
tPage.setRCount(std::min(i, max));
}
}
static bool serverStartTriggered;
if(!serverStartTriggered){
if (!Triggers::doTrigger("SYSTEM_START")){
conf.is_active = false;
static bool serverStartTriggered;
if (!serverStartTriggered){
if (!Triggers::doTrigger("SYSTEM_START")){conf.is_active = false;}
serverStartTriggered++;
}
serverStartTriggered++;
}
if (Triggers::shouldTrigger("SYSTEM_CONFIG")){
std::string payload = writeConf.toString();
Triggers::doTrigger("SYSTEM_CONFIG", payload);
}
/*LTS-END*/
if (Triggers::shouldTrigger("SYSTEM_CONFIG")){
std::string payload = writeConf.toString();
Triggers::doTrigger("SYSTEM_CONFIG", payload);
}
/*LTS-END*/
}
}
/*NOTES:
4B size (total size of entry 9B+XB(URL)+ 0..XB(nameArrayLen)) (if 0x00, stop reading)
4B url_len
XB url
1B async
for(number of strings)
4B stringLen
XB string
)
*/

View file

@ -253,6 +253,14 @@ namespace Mist {
//return is by reference
}
/*LTS-START*/
static bool liveBW(const char * param, const void * bwPtr){
if (!param || !bwPtr){return false;}
INFO_MSG("Comparing %s to %lu", param, *((uint32_t*)bwPtr));
return JSON::Value(param).asInt() <= *((uint32_t*)bwPtr);
}
/*LTS-END*/
/// \triggers
/// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is:
/// ~~~~~~~~~~~~~~~
@ -263,10 +271,13 @@ namespace Mist {
void inputBuffer::updateMeta() {
static bool wentDry = false;
static long long unsigned int lastFragCount = 0xFFFFull;
static uint32_t lastBPS = 0;/*LTS*/
uint32_t currBPS = 0;
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0;
long long unsigned int fragCount = 0xFFFFull;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
currBPS += it->second.bps; /*LTS*/
if (it->second.type == "meta" || !it->second.type.size()) {
continue;
}
@ -290,20 +301,31 @@ namespace Mist {
}
}
/*LTS-START*/
if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && Triggers::shouldTrigger("STREAM_BUFFER")){
if (currBPS != lastBPS){
lastBPS = currBPS;
if (Triggers::shouldTrigger("LIVE_BANDWIDTH", streamName, liveBW, &lastBPS)){
std::string payload = streamName + "\n" + JSON::Value((long long)lastBPS).asStringRef();
if (!Triggers::doTrigger("LIVE_BANDWIDTH", payload, streamName)){
WARN_MSG("Shutting down buffer because bandwidth limit reached!");
config->is_active = false;
userPage.finishEach();
}
}
}
if (fragCount >= FRAG_BOOT && fragCount != 0xFFFFull && Triggers::shouldTrigger("STREAM_BUFFER", streamName)){
JSON::Value stream_details;
fillBufferDetails(stream_details);
if (lastFragCount == 0xFFFFull) {
std::string payload = config->getString("streamname") + "\nFULL\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
std::string payload = streamName + "\nFULL\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, streamName);
}else{
if (stream_details.isMember("issues") != wentDry){
if (stream_details.isMember("issues")){
std::string payload = config->getString("streamname") + "\nDRY\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
std::string payload = streamName + "\nDRY\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, streamName);
}else{
std::string payload = config->getString("streamname") + "\nRECOVER\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, config->getString("streamname"));
std::string payload = streamName + "\nRECOVER\n" + stream_details.toString();
Triggers::doTrigger("STREAM_BUFFER", payload, streamName);
}
}
}