diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index d23fd443..49a3a4b1 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -706,6 +706,16 @@ namespace IPC { return result; } + ///\brief Sets checksum field + void statExchange::setSync(char s) { + data[172] = s; + } + + ///\brief Gets checksum field + char statExchange::getSync() { + return data[172]; + } + ///\brief Creates a semaphore guard, locks the semaphore on call semGuard::semGuard(semaphore * thisSemaphore) : mySemaphore(thisSemaphore) { mySemaphore->wait(); diff --git a/lib/shared_memory.h b/lib/shared_memory.h index dd189670..9bac591e 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -11,7 +11,7 @@ #include #endif -#define STAT_EX_SIZE 174 +#define STAT_EX_SIZE 175 #define PLAY_EX_SIZE 2+6*SIMUL_TRACKS namespace IPC { @@ -37,6 +37,8 @@ namespace IPC { void connector(std::string name); std::string connector(); void crc(unsigned int sum); + char getSync(); + void setSync(char s); unsigned int crc(); private: ///\brief The payload for the stat exchange @@ -49,6 +51,8 @@ namespace IPC { /// - 100 byte - streamName (name of the stream peer is viewing) /// - 20 byte - connector (name of the connector the peer is using) /// - 4 byte - CRC32 of user agent (or zero if none) + /// - 1 byte sync (was seen by controller yes/no) + /// - (implicit 2 bytes: PID) char * data; }; diff --git a/lsp/minified.js b/lsp/minified.js index 092117be..f1fee851 100644 --- a/lsp/minified.js +++ b/lsp/minified.js @@ -127,8 +127,8 @@ a[0]+" trigger?")){mist.data.config.triggers[a[0]].splice(a[1],1);mist.data.conf pointer:{main:n,index:"triggeron"},help:"For what event this trigger should activate.",type:"select",select:[["SYSTEM_START","SYSTEM_START: after MistServer boot"],["SYSTEM_STOP","SYSTEM_STOP: right before MistServer shutdown"],["SYSTEM_CONFIG","SYSTEM_CONFIG: after MistServer configurations have changed"],["OUTPUT_START","OUTPUT_START: right after the start command has been send to a protocol"],["OUTPUT_STOP","OUTPUT_STOP: right after the close command has been send to a protocol "],["STREAM_ADD", "STREAM_ADD: right before new stream configured"],["STREAM_CONFIG","STREAM_CONFIG: right before a stream configuration has changed"],["STREAM_REMOVE","STREAM_REMOVE: right before a stream has been deleted"],["STREAM_SOURCE","STREAM_SOURCE: right before stream source is loaded"],["STREAM_LOAD","STREAM_LOAD: right before stream input is loaded in memory"],["STREAM_READY","STREAM_READY: when the stream input is loaded and ready for playback"],["STREAM_UNLOAD","STREAM_UNLOAD: right before the stream input is removed from memory"], ["STREAM_PUSH","STREAM_PUSH: right before an incoming push is accepted"],["STREAM_TRACK_ADD","STREAM_TRACK_ADD: right before a track will be added to a stream; e.g.: additional push received"],["STREAM_TRACK_REMOVE","STREAM_TRACK_REMOVE: right before a track will be removed track from a stream; e.g.: push timeout"],["STREAM_BUFFER","STREAM_BUFFER: when a buffer changes between mostly full or mostly empty"],["RTMP_PUSH_REWRITE","RTMP_PUSH_REWRITE: allows rewriting of RTMP push URLs from external to internal representation before further parsing"], -["RECORDING_VALIDATE","RECORDING_VALIDATE: before recording, check if we this stream is allowed to record."],["RECORDING_FILEPATH","RECORDING_FILEPATH: before recording, hook can return filename to be used. "],["RECORDING_START","RECORDING_START: started a recording"],["RECORDING_STOP","RECORDING_STOP: stopped a recording"],["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"]], -LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}},{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)", +["RECORDING_VALIDATE","RECORDING_VALIDATE: before recording, check if we this stream is allowed to record."],["RECORDING_FILEPATH","RECORDING_FILEPATH: before recording, hook can return filename to be used. "],["RECORDING_START","RECORDING_START: started a recording"],["RECORDING_STOP","RECORDING_STOP: stopped a recording"],["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"], +["USER_NEW","USER_NEW: A new user connects that hasn't been allowed or denied access before"]],LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}},{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)", type:"checklist",checklist:Object.keys(mist.data.streams),LTSonly:!0},$("
"),{label:"Handler (URL or executable)",help:"This can be either an HTTP URL or a full path to an executable.",pointer:{main:n,index:"url"},validate:["required"],type:"str",LTSonly:!0},{label:"Blocking",type:"checkbox",help:"If checked, pauses processing and uses the response of the handler. If the response does not start with 1, true, yes or cont, further processing is aborted. If unchecked, processing is never paused and the response is not checked.", pointer:{main:n,index:"async"},LTSonly:!0},{label:"Default response",type:"str",help:"For blocking requests, the default response in case the handler cannot be executed for any reason.",pointer:{main:n,index:"default"},LTSonly:!0},{type:"buttons",buttons:[{type:"cancel",label:"Cancel","function":function(){UI.navto("Triggers")}},{type:"save",label:"Save","function":function(){c&&mist.data.config.triggers[c[0]].splice(c[1],1);var a=[n.url,n.async?true:false,typeof n.appliesto!="undefined"?n.appliesto: []];typeof n["default"]!="undefined"&&a.push(n["default"]);n.triggeron in mist.data.config.triggers||(mist.data.config.triggers[n.triggeron]=[]);mist.data.config.triggers[n.triggeron].push(a);mist.send(function(){UI.navto("Triggers")},{config:mist.data.config})}}]}]));$("[name=triggeron]").trigger("change");break;case "Logs":b.append(UI.buildUI([{type:"help",help:"Here you have an overview of all edited settings within MistServer and possible warnings or errors MistServer has encountered. MistServer stores up to 100 logs at a time."}, diff --git a/lsp/mist.js b/lsp/mist.js index 9f83e098..2ada6243 100644 --- a/lsp/mist.js +++ b/lsp/mist.js @@ -3576,7 +3576,8 @@ var UI = { ['RECORDING_STOP', 'RECORDING_STOP: stopped a recording'], ['CONN_OPEN', 'CONN_OPEN: right after a new incoming connection has been received'], ['CONN_CLOSE', 'CONN_CLOSE: right after a connection has been closed'], - ['CONN_PLAY', 'CONN_PLAY: right before a stream playback of a connection'] + ['CONN_PLAY', 'CONN_PLAY: right before a stream playback of a connection'], + ['USER_NEW', 'USER_NEW: A new user connects that hasn\'t been allowed or denied access before'] ], LTSonly: true, 'function': function(){ diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index d179cf28..99207f3a 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -127,6 +127,14 @@ void Controller::SharedMemStats(void * config){ /// Updates the given active connection with new stats data. void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ + //update the sync byte: 0 = requesting fill, 1 = needs checking, > 1 = state known + if (!data.getSync()){ + data.setSync(sync); + }else{ + if (sync < 2){ + sync = data.getSync(); + } + } curConns[index].update(data); //store timestamp of last received data, if newer if (data.now() > lastSec){ @@ -171,6 +179,7 @@ void Controller::statSession::finish(unsigned long index){ Controller::statSession::statSession(){ firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; + sync = 1; } /// Moves the given connection to the given session diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 0880de7e..02ad1350 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -58,6 +58,7 @@ namespace Controller { unsigned long long lastSec; std::deque oldConns; std::map curConns; + char sync; public: statSession(); void wipeOld(unsigned long long); diff --git a/src/output/output.cpp b/src/output/output.cpp index db2a468a..c880e523 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -193,6 +193,14 @@ namespace Mist { /// output handler name /// request URL (if any) /// ~~~~~~~~~~~~~~~ + /// The `"USER_NEW"` trigger is stream-specific, and is ran when a new user first opens a stream. Segmented protcols are unduplicated over the duration of the statistics log (~10 minutes), true streaming protocols (RTMP, RTSP) are not deduplicated as no duplication ever takes place. Its payload is: + /// ~~~~~~~~~~~~~~~ + /// streamname + /// connected client host + /// User agent checksum (CRC32 of User-agent string) + /// output handler name + /// request URL (if any) + /// ~~~~~~~~~~~~~~~ void Output::initialize(){ if (isInitialized){ return; @@ -219,6 +227,33 @@ namespace Mist { myConn.close(); } } + if(Triggers::shouldTrigger("USER_NEW", streamName)){ + //sync byte 0 = no sync yet, wait for sync from controller... + IPC::statExchange tmpEx(statsPage.getData()); + unsigned int i = 0; + while (!tmpEx.getSync() && i < 30){ + Util::sleep(100); + stats(); + } + HIGH_MSG("USER_NEW sync achieved: %u", (unsigned int)tmpEx.getSync()); + //1 = check requested (connection is new) + if (tmpEx.getSync() == 1){ + std::string payload = streamName+"\n" + myConn.getHost() +"\n" + JSON::Value((long long)crc).asString() + "\n"+capa["name"].asStringRef()+"\n"+reqUrl; + if (!Triggers::doTrigger("USER_NEW", payload, streamName)){ + MEDIUM_MSG("Closing connection because denied by USER_NEW trigger"); + myConn.close(); + tmpEx.setSync(100);//100 = denied + }else{ + tmpEx.setSync(10);//10 = accepted + } + } + //100 = denied + if (tmpEx.getSync() == 100){ + myConn.close(); + MEDIUM_MSG("Closing connection because denied by USER_NEW sync byte"); + } + //anything else = accepted + } /*LTS-END*/ }