Added USER_NEW trigger.

This commit is contained in:
Thulinma 2015-12-25 02:22:54 +01:00
parent 4db2ea97ed
commit f6e2e95b5a
7 changed files with 64 additions and 4 deletions

View file

@ -706,6 +706,16 @@ namespace IPC {
return result; return result;
} }
///\brief Sets checksum field
void statExchange::setSync(char s) {
data[172] = s;
}
///\brief Gets checksum field
char statExchange::getSync() {
return data[172];
}
///\brief Creates a semaphore guard, locks the semaphore on call ///\brief Creates a semaphore guard, locks the semaphore on call
semGuard::semGuard(semaphore * thisSemaphore) : mySemaphore(thisSemaphore) { semGuard::semGuard(semaphore * thisSemaphore) : mySemaphore(thisSemaphore) {
mySemaphore->wait(); mySemaphore->wait();

View file

@ -11,7 +11,7 @@
#include <semaphore.h> #include <semaphore.h>
#endif #endif
#define STAT_EX_SIZE 174 #define STAT_EX_SIZE 175
#define PLAY_EX_SIZE 2+6*SIMUL_TRACKS #define PLAY_EX_SIZE 2+6*SIMUL_TRACKS
namespace IPC { namespace IPC {
@ -37,6 +37,8 @@ namespace IPC {
void connector(std::string name); void connector(std::string name);
std::string connector(); std::string connector();
void crc(unsigned int sum); void crc(unsigned int sum);
char getSync();
void setSync(char s);
unsigned int crc(); unsigned int crc();
private: private:
///\brief The payload for the stat exchange ///\brief The payload for the stat exchange
@ -49,6 +51,8 @@ namespace IPC {
/// - 100 byte - streamName (name of the stream peer is viewing) /// - 100 byte - streamName (name of the stream peer is viewing)
/// - 20 byte - connector (name of the connector the peer is using) /// - 20 byte - connector (name of the connector the peer is using)
/// - 4 byte - CRC32 of user agent (or zero if none) /// - 4 byte - CRC32 of user agent (or zero if none)
/// - 1 byte sync (was seen by controller yes/no)
/// - (implicit 2 bytes: PID)
char * data; char * data;
}; };

View file

@ -127,8 +127,8 @@ a[0]+" trigger?")){mist.data.config.triggers[a[0]].splice(a[1],1);mist.data.conf
pointer:{main:n,index:"triggeron"},help:"For what event this trigger should activate.",type:"select",select:[["SYSTEM_START","SYSTEM_START: after MistServer boot"],["SYSTEM_STOP","SYSTEM_STOP: right before MistServer shutdown"],["SYSTEM_CONFIG","SYSTEM_CONFIG: after MistServer configurations have changed"],["OUTPUT_START","OUTPUT_START: right after the start command has been send to a protocol"],["OUTPUT_STOP","OUTPUT_STOP: right after the close command has been send to a protocol "],["STREAM_ADD", pointer:{main:n,index:"triggeron"},help:"For what event this trigger should activate.",type:"select",select:[["SYSTEM_START","SYSTEM_START: after MistServer boot"],["SYSTEM_STOP","SYSTEM_STOP: right before MistServer shutdown"],["SYSTEM_CONFIG","SYSTEM_CONFIG: after MistServer configurations have changed"],["OUTPUT_START","OUTPUT_START: right after the start command has been send to a protocol"],["OUTPUT_STOP","OUTPUT_STOP: right after the close command has been send to a protocol "],["STREAM_ADD",
"STREAM_ADD: right before new stream configured"],["STREAM_CONFIG","STREAM_CONFIG: right before a stream configuration has changed"],["STREAM_REMOVE","STREAM_REMOVE: right before a stream has been deleted"],["STREAM_SOURCE","STREAM_SOURCE: right before stream source is loaded"],["STREAM_LOAD","STREAM_LOAD: right before stream input is loaded in memory"],["STREAM_READY","STREAM_READY: when the stream input is loaded and ready for playback"],["STREAM_UNLOAD","STREAM_UNLOAD: right before the stream input is removed from memory"], "STREAM_ADD: right before new stream configured"],["STREAM_CONFIG","STREAM_CONFIG: right before a stream configuration has changed"],["STREAM_REMOVE","STREAM_REMOVE: right before a stream has been deleted"],["STREAM_SOURCE","STREAM_SOURCE: right before stream source is loaded"],["STREAM_LOAD","STREAM_LOAD: right before stream input is loaded in memory"],["STREAM_READY","STREAM_READY: when the stream input is loaded and ready for playback"],["STREAM_UNLOAD","STREAM_UNLOAD: right before the stream input is removed from memory"],
["STREAM_PUSH","STREAM_PUSH: right before an incoming push is accepted"],["STREAM_TRACK_ADD","STREAM_TRACK_ADD: right before a track will be added to a stream; e.g.: additional push received"],["STREAM_TRACK_REMOVE","STREAM_TRACK_REMOVE: right before a track will be removed track from a stream; e.g.: push timeout"],["STREAM_BUFFER","STREAM_BUFFER: when a buffer changes between mostly full or mostly empty"],["RTMP_PUSH_REWRITE","RTMP_PUSH_REWRITE: allows rewriting of RTMP push URLs from external to internal representation before further parsing"], ["STREAM_PUSH","STREAM_PUSH: right before an incoming push is accepted"],["STREAM_TRACK_ADD","STREAM_TRACK_ADD: right before a track will be added to a stream; e.g.: additional push received"],["STREAM_TRACK_REMOVE","STREAM_TRACK_REMOVE: right before a track will be removed track from a stream; e.g.: push timeout"],["STREAM_BUFFER","STREAM_BUFFER: when a buffer changes between mostly full or mostly empty"],["RTMP_PUSH_REWRITE","RTMP_PUSH_REWRITE: allows rewriting of RTMP push URLs from external to internal representation before further parsing"],
["RECORDING_VALIDATE","RECORDING_VALIDATE: before recording, check if we this stream is allowed to record."],["RECORDING_FILEPATH","RECORDING_FILEPATH: before recording, hook can return filename to be used. "],["RECORDING_START","RECORDING_START: started a recording"],["RECORDING_STOP","RECORDING_STOP: stopped a recording"],["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"]], ["RECORDING_VALIDATE","RECORDING_VALIDATE: before recording, check if we this stream is allowed to record."],["RECORDING_FILEPATH","RECORDING_FILEPATH: before recording, hook can return filename to be used. "],["RECORDING_START","RECORDING_START: started a recording"],["RECORDING_STOP","RECORDING_STOP: stopped a recording"],["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"],
LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}},{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)", ["USER_NEW","USER_NEW: A new user connects that hasn't been allowed or denied access before"]],LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}},{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)",
type:"checklist",checklist:Object.keys(mist.data.streams),LTSonly:!0},$("<br>"),{label:"Handler (URL or executable)",help:"This can be either an HTTP URL or a full path to an executable.",pointer:{main:n,index:"url"},validate:["required"],type:"str",LTSonly:!0},{label:"Blocking",type:"checkbox",help:"If checked, pauses processing and uses the response of the handler. If the response does not start with 1, true, yes or cont, further processing is aborted. If unchecked, processing is never paused and the response is not checked.", type:"checklist",checklist:Object.keys(mist.data.streams),LTSonly:!0},$("<br>"),{label:"Handler (URL or executable)",help:"This can be either an HTTP URL or a full path to an executable.",pointer:{main:n,index:"url"},validate:["required"],type:"str",LTSonly:!0},{label:"Blocking",type:"checkbox",help:"If checked, pauses processing and uses the response of the handler. If the response does not start with 1, true, yes or cont, further processing is aborted. If unchecked, processing is never paused and the response is not checked.",
pointer:{main:n,index:"async"},LTSonly:!0},{label:"Default response",type:"str",help:"For blocking requests, the default response in case the handler cannot be executed for any reason.",pointer:{main:n,index:"default"},LTSonly:!0},{type:"buttons",buttons:[{type:"cancel",label:"Cancel","function":function(){UI.navto("Triggers")}},{type:"save",label:"Save","function":function(){c&&mist.data.config.triggers[c[0]].splice(c[1],1);var a=[n.url,n.async?true:false,typeof n.appliesto!="undefined"?n.appliesto: pointer:{main:n,index:"async"},LTSonly:!0},{label:"Default response",type:"str",help:"For blocking requests, the default response in case the handler cannot be executed for any reason.",pointer:{main:n,index:"default"},LTSonly:!0},{type:"buttons",buttons:[{type:"cancel",label:"Cancel","function":function(){UI.navto("Triggers")}},{type:"save",label:"Save","function":function(){c&&mist.data.config.triggers[c[0]].splice(c[1],1);var a=[n.url,n.async?true:false,typeof n.appliesto!="undefined"?n.appliesto:
[]];typeof n["default"]!="undefined"&&a.push(n["default"]);n.triggeron in mist.data.config.triggers||(mist.data.config.triggers[n.triggeron]=[]);mist.data.config.triggers[n.triggeron].push(a);mist.send(function(){UI.navto("Triggers")},{config:mist.data.config})}}]}]));$("[name=triggeron]").trigger("change");break;case "Logs":b.append(UI.buildUI([{type:"help",help:"Here you have an overview of all edited settings within MistServer and possible warnings or errors MistServer has encountered. MistServer stores up to 100 logs at a time."}, []];typeof n["default"]!="undefined"&&a.push(n["default"]);n.triggeron in mist.data.config.triggers||(mist.data.config.triggers[n.triggeron]=[]);mist.data.config.triggers[n.triggeron].push(a);mist.send(function(){UI.navto("Triggers")},{config:mist.data.config})}}]}]));$("[name=triggeron]").trigger("change");break;case "Logs":b.append(UI.buildUI([{type:"help",help:"Here you have an overview of all edited settings within MistServer and possible warnings or errors MistServer has encountered. MistServer stores up to 100 logs at a time."},

View file

@ -3576,7 +3576,8 @@ var UI = {
['RECORDING_STOP', 'RECORDING_STOP: stopped a recording'], ['RECORDING_STOP', 'RECORDING_STOP: stopped a recording'],
['CONN_OPEN', 'CONN_OPEN: right after a new incoming connection has been received'], ['CONN_OPEN', 'CONN_OPEN: right after a new incoming connection has been received'],
['CONN_CLOSE', 'CONN_CLOSE: right after a connection has been closed'], ['CONN_CLOSE', 'CONN_CLOSE: right after a connection has been closed'],
['CONN_PLAY', 'CONN_PLAY: right before a stream playback of a connection'] ['CONN_PLAY', 'CONN_PLAY: right before a stream playback of a connection'],
['USER_NEW', 'USER_NEW: A new user connects that hasn\'t been allowed or denied access before']
], ],
LTSonly: true, LTSonly: true,
'function': function(){ 'function': function(){

View file

@ -127,6 +127,14 @@ void Controller::SharedMemStats(void * config){
/// Updates the given active connection with new stats data. /// Updates the given active connection with new stats data.
void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ void Controller::statSession::update(unsigned long index, IPC::statExchange & data){
//update the sync byte: 0 = requesting fill, 1 = needs checking, > 1 = state known
if (!data.getSync()){
data.setSync(sync);
}else{
if (sync < 2){
sync = data.getSync();
}
}
curConns[index].update(data); curConns[index].update(data);
//store timestamp of last received data, if newer //store timestamp of last received data, if newer
if (data.now() > lastSec){ if (data.now() > lastSec){
@ -171,6 +179,7 @@ void Controller::statSession::finish(unsigned long index){
Controller::statSession::statSession(){ Controller::statSession::statSession(){
firstSec = 0xFFFFFFFFFFFFFFFFull; firstSec = 0xFFFFFFFFFFFFFFFFull;
lastSec = 0; lastSec = 0;
sync = 1;
} }
/// Moves the given connection to the given session /// Moves the given connection to the given session

View file

@ -58,6 +58,7 @@ namespace Controller {
unsigned long long lastSec; unsigned long long lastSec;
std::deque<statStorage> oldConns; std::deque<statStorage> oldConns;
std::map<unsigned long, statStorage> curConns; std::map<unsigned long, statStorage> curConns;
char sync;
public: public:
statSession(); statSession();
void wipeOld(unsigned long long); void wipeOld(unsigned long long);

View file

@ -193,6 +193,14 @@ namespace Mist {
/// output handler name /// output handler name
/// request URL (if any) /// request URL (if any)
/// ~~~~~~~~~~~~~~~ /// ~~~~~~~~~~~~~~~
/// The `"USER_NEW"` trigger is stream-specific, and is ran when a new user first opens a stream. Segmented protcols are unduplicated over the duration of the statistics log (~10 minutes), true streaming protocols (RTMP, RTSP) are not deduplicated as no duplication ever takes place. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// connected client host
/// User agent checksum (CRC32 of User-agent string)
/// output handler name
/// request URL (if any)
/// ~~~~~~~~~~~~~~~
void Output::initialize(){ void Output::initialize(){
if (isInitialized){ if (isInitialized){
return; return;
@ -219,6 +227,33 @@ namespace Mist {
myConn.close(); myConn.close();
} }
} }
if(Triggers::shouldTrigger("USER_NEW", streamName)){
//sync byte 0 = no sync yet, wait for sync from controller...
IPC::statExchange tmpEx(statsPage.getData());
unsigned int i = 0;
while (!tmpEx.getSync() && i < 30){
Util::sleep(100);
stats();
}
HIGH_MSG("USER_NEW sync achieved: %u", (unsigned int)tmpEx.getSync());
//1 = check requested (connection is new)
if (tmpEx.getSync() == 1){
std::string payload = streamName+"\n" + myConn.getHost() +"\n" + JSON::Value((long long)crc).asString() + "\n"+capa["name"].asStringRef()+"\n"+reqUrl;
if (!Triggers::doTrigger("USER_NEW", payload, streamName)){
MEDIUM_MSG("Closing connection because denied by USER_NEW trigger");
myConn.close();
tmpEx.setSync(100);//100 = denied
}else{
tmpEx.setSync(10);//10 = accepted
}
}
//100 = denied
if (tmpEx.getSync() == 100){
myConn.close();
MEDIUM_MSG("Closing connection because denied by USER_NEW sync byte");
}
//anything else = accepted
}
/*LTS-END*/ /*LTS-END*/
} }