From 1c3e143709c22a79e107200ee691d260faa4d405 Mon Sep 17 00:00:00 2001 From: ozzay Date: Thu, 26 Nov 2015 14:13:07 +0100 Subject: [PATCH] Recording functionality by Diederick Huijbers, slightly tweaked. --- lib/http_parser.h | 4 +- lib/stream.cpp | 227 ++++++++++++++++++++++++++ lib/stream.h | 3 + lsp/minified.js | 5 +- lsp/mist.js | 4 + src/input/input_buffer.cpp | 53 ++++++ src/input/input_buffer.h | 4 + src/output/output.cpp | 206 ++++++++++++++++++++++- src/output/output.h | 15 ++ src/output/output_http.cpp | 19 +++ src/output/output_http.h | 7 + src/output/output_httpts.cpp | 7 +- src/output/output_progressive_flv.cpp | 3 +- src/output/output_progressive_mp4.cpp | 3 + 14 files changed, 550 insertions(+), 10 deletions(-) diff --git a/lib/http_parser.h b/lib/http_parser.h index 8f51dd92..207da8c9 100644 --- a/lib/http_parser.h +++ b/lib/http_parser.h @@ -46,11 +46,13 @@ namespace HTTP { unsigned int length; bool headerOnly; ///< If true, do not parse body if the length is a known size. bool bufferChunks; + //this bool was private + bool sendingChunks; + private: bool seenHeaders; bool seenReq; bool getChunks; - bool sendingChunks; unsigned int doingChunk; bool parse(std::string & HTTPbuffer); void parseVars(std::string data); diff --git a/lib/stream.cpp b/lib/stream.cpp index b8ace8b0..50050ad2 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -16,6 +16,12 @@ #include "dtsc.h" #include "triggers.h"//LTS +/* roxlu-begin */ +static std::string strftime_now(const std::string& format); +static void replace_str(std::string& str, const std::string& from, const std::string& to); +static void replace_variables(std::string& str); +/* roxlu-end */ + std::string Util::getTmpFolder() { std::string dir; char * tmp_char = 0; @@ -287,3 +293,224 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } return true; } + +/* roxlu-begin */ +int Util::startRecording(std::string streamname) { + + sanitizeName(streamname); + if (streamname.size() > 100){ + FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); + return -1; + } + + // Attempt to load up configuration and find this stream + IPC::sharedPage mistConfOut("!mistConfig", DEFAULT_CONF_PAGE_SIZE); + IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1); + + //Lock the config to prevent race conditions and corruption issues while reading + configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); + + //Abort if no config available + if (!config){ + FAIL_MSG("Configuration not available, aborting! Is MistController running?"); + configLock.post();//unlock the config semaphore + return -2; + } + + //Find stream base name + std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); + DTSC::Scan streamCfg = config.getMember("streams").getMember(smp); + if (!streamCfg){ + DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); + configLock.post(); + return -3; + } + + // When we have a validate trigger, we execute that first before we continue. + if (Triggers::shouldTrigger("RECORDING_VALIDATE", streamname)) { + std::string validate_result; + Triggers::doTrigger("RECORDING_VALIDATE", streamname, streamname.c_str(), false, validate_result); + INFO_MSG("RECORDING_VALIDATE returned: %s", validate_result.c_str()); + if (validate_result == "0") { + INFO_MSG("RECORDING_VALIDATE: the hook returned 0 so we're not going to create a recording."); + configLock.post(); + return 0; + } + } + + // Should we start an flv output? (We allow hooks to specify custom filenames) + DTSC::Scan recordFilenameConf = streamCfg.getMember("record"); + std::string recordFilename; + + if (Triggers::shouldTrigger("RECORDING_FILEPATH", streamname)) { + + std::string payload = streamname; + std::string filepath_response; + Triggers::doTrigger("RECORDING_FILEPATH", payload, streamname.c_str(), false, filepath_response); /* @todo do we need to handle the return of doTrigger? */ + + if (filepath_response.size() < 1024) { /* @todo is there a MAX_FILEPATH somewhere? */ + recordFilename = filepath_response; + } + else { + FAIL_MSG("The RECORDING_FILEPATH trigger returned a filename which is bigger then our allowed max filename size. Not using returned filepath from hook."); + } + } + + // No filename set through trigger, so use the one one from the stream config. + if (recordFilename.size() == 0) { + recordFilename = recordFilenameConf.asString(); + } + + /*if (recordFilename.size() == 0 + || recordFilename.substr(recordFilename.find_last_of(".") + 1) != "flv") + { + configLock.post(); + return -4; + }*/ + + // The filename can hold variables like current time etc.. + replace_variables(recordFilename); + + INFO_MSG("Filepath that we use for the recording: %s", recordFilename.c_str()); + //to change hardcoding + //determine extension, first find the '.' for extension + size_t pointPlace = recordFilename.rfind("."); + if (pointPlace == std::string::npos){ + FAIL_MSG("no extension found in output name. Aborting recording."); + return -1; + } + std::string fileExtension = recordFilename.substr(pointPlace+1); + DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); + DTSC::Scan output; + std::string output_filepath = ""; + unsigned int outputs_size = outputs.getSize(); + HIGH_MSG("Recording outputs %d",outputs_size); + for (unsigned int i = 0; i vars; + std::string day = strftime_now("%d"); + std::string month = strftime_now("%m"); + std::string year = strftime_now("%Y"); + std::string hour = strftime_now("%H"); + std::string minute = strftime_now("%M"); + std::string seconds = strftime_now("%S"); + std::string datetime = year +"." +month +"." +day +"." +hour +"." +minute +"." +seconds; + + if (0 == day.size()) { + WARN_MSG("Failed to retrieve the current day with strftime_now()."); + } + if (0 == month.size()) { + WARN_MSG("Failed to retrieve the current month with strftime_now()."); + } + if (0 == year.size()) { + WARN_MSG("Failed to retrieve the current year with strftime_now()."); + } + if (0 == hour.size()) { + WARN_MSG("Failed to retrieve the current hour with strftime_now()."); + } + if (0 == minute.size()) { + WARN_MSG("Failed to retrieve the current minute with strftime_now()."); + } + if (0 == seconds.size()) { + WARN_MSG("Failed to retrieve the current seconds with strftime_now()."); + } + + vars.insert(std::pair("$day", day)); + vars.insert(std::pair("$month", month)); + vars.insert(std::pair("$year", year)); + vars.insert(std::pair("$hour", hour)); + vars.insert(std::pair("$minute", minute)); + vars.insert(std::pair("$seconds", seconds)); + vars.insert(std::pair("$datetime", datetime)); + + std::map::iterator it = vars.begin(); + while (it != vars.end()) { + replace(str, it->first, it->second); + ++it; + } +} + +static std::string strftime_now(const std::string& format) { + + time_t rawtime; + struct tm* timeinfo = NULL; + char buffer [80] = { 0 }; + + time(&rawtime); + timeinfo = localtime (&rawtime); + + if (0 == strftime(buffer, 80, format.c_str(), timeinfo)) { + FAIL_MSG("Call to stftime() failed with format: %s, maybe our buffer is not big enough (80 bytes).", format.c_str()); + return ""; + } + + return buffer; +} + +/* roxlu-end */ diff --git a/lib/stream.h b/lib/stream.h index e8cde110..021d94f3 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -11,5 +11,8 @@ namespace Util { void sanitizeName(std::string & streamname); bool streamAlive(std::string & streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true); + /* roxlu-begin */ + int startRecording(std::string streamname); + /* roxlu-end */ JSON::Value getStreamConfig(std::string streamname); } diff --git a/lsp/minified.js b/lsp/minified.js index d557b57c..092117be 100644 --- a/lsp/minified.js +++ b/lsp/minified.js @@ -127,8 +127,9 @@ a[0]+" trigger?")){mist.data.config.triggers[a[0]].splice(a[1],1);mist.data.conf pointer:{main:n,index:"triggeron"},help:"For what event this trigger should activate.",type:"select",select:[["SYSTEM_START","SYSTEM_START: after MistServer boot"],["SYSTEM_STOP","SYSTEM_STOP: right before MistServer shutdown"],["SYSTEM_CONFIG","SYSTEM_CONFIG: after MistServer configurations have changed"],["OUTPUT_START","OUTPUT_START: right after the start command has been send to a protocol"],["OUTPUT_STOP","OUTPUT_STOP: right after the close command has been send to a protocol "],["STREAM_ADD", "STREAM_ADD: right before new stream configured"],["STREAM_CONFIG","STREAM_CONFIG: right before a stream configuration has changed"],["STREAM_REMOVE","STREAM_REMOVE: right before a stream has been deleted"],["STREAM_SOURCE","STREAM_SOURCE: right before stream source is loaded"],["STREAM_LOAD","STREAM_LOAD: right before stream input is loaded in memory"],["STREAM_READY","STREAM_READY: when the stream input is loaded and ready for playback"],["STREAM_UNLOAD","STREAM_UNLOAD: right before the stream input is removed from memory"], ["STREAM_PUSH","STREAM_PUSH: right before an incoming push is accepted"],["STREAM_TRACK_ADD","STREAM_TRACK_ADD: right before a track will be added to a stream; e.g.: additional push received"],["STREAM_TRACK_REMOVE","STREAM_TRACK_REMOVE: right before a track will be removed track from a stream; e.g.: push timeout"],["STREAM_BUFFER","STREAM_BUFFER: when a buffer changes between mostly full or mostly empty"],["RTMP_PUSH_REWRITE","RTMP_PUSH_REWRITE: allows rewriting of RTMP push URLs from external to internal representation before further parsing"], -["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"]],LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}}, -{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)",type:"checklist",checklist:Object.keys(mist.data.streams),LTSonly:!0},$("
"),{label:"Handler (URL or executable)",help:"This can be either an HTTP URL or a full path to an executable.",pointer:{main:n,index:"url"},validate:["required"],type:"str",LTSonly:!0},{label:"Blocking",type:"checkbox",help:"If checked, pauses processing and uses the response of the handler. If the response does not start with 1, true, yes or cont, further processing is aborted. If unchecked, processing is never paused and the response is not checked.", +["RECORDING_VALIDATE","RECORDING_VALIDATE: before recording, check if we this stream is allowed to record."],["RECORDING_FILEPATH","RECORDING_FILEPATH: before recording, hook can return filename to be used. "],["RECORDING_START","RECORDING_START: started a recording"],["RECORDING_STOP","RECORDING_STOP: stopped a recording"],["CONN_OPEN","CONN_OPEN: right after a new incoming connection has been received"],["CONN_CLOSE","CONN_CLOSE: right after a connection has been closed"],["CONN_PLAY","CONN_PLAY: right before a stream playback of a connection"]], +LTSonly:!0,"function":function(){switch($(this).getval()){case "SYSTEM_START":case "SYSTEM_STOP":case "SYSTEM_CONFIG":case "OUTPUT_START":case "OUTPUT_STOP":case "RTMP_PUSH_REWRITE":$("[name=appliesto]").setval([]).closest(".UIelement").hide();break;default:$("[name=appliesto]").closest(".UIelement").show()}}},{label:"Applies to",pointer:{main:n,index:"appliesto"},help:"For triggers that can apply to specific streams, this value decides what streams they are triggered for. (none checked = always triggered)", +type:"checklist",checklist:Object.keys(mist.data.streams),LTSonly:!0},$("
"),{label:"Handler (URL or executable)",help:"This can be either an HTTP URL or a full path to an executable.",pointer:{main:n,index:"url"},validate:["required"],type:"str",LTSonly:!0},{label:"Blocking",type:"checkbox",help:"If checked, pauses processing and uses the response of the handler. If the response does not start with 1, true, yes or cont, further processing is aborted. If unchecked, processing is never paused and the response is not checked.", pointer:{main:n,index:"async"},LTSonly:!0},{label:"Default response",type:"str",help:"For blocking requests, the default response in case the handler cannot be executed for any reason.",pointer:{main:n,index:"default"},LTSonly:!0},{type:"buttons",buttons:[{type:"cancel",label:"Cancel","function":function(){UI.navto("Triggers")}},{type:"save",label:"Save","function":function(){c&&mist.data.config.triggers[c[0]].splice(c[1],1);var a=[n.url,n.async?true:false,typeof n.appliesto!="undefined"?n.appliesto: []];typeof n["default"]!="undefined"&&a.push(n["default"]);n.triggeron in mist.data.config.triggers||(mist.data.config.triggers[n.triggeron]=[]);mist.data.config.triggers[n.triggeron].push(a);mist.send(function(){UI.navto("Triggers")},{config:mist.data.config})}}]}]));$("[name=triggeron]").trigger("change");break;case "Logs":b.append(UI.buildUI([{type:"help",help:"Here you have an overview of all edited settings within MistServer and possible warnings or errors MistServer has encountered. MistServer stores up to 100 logs at a time."}, {label:"Refresh every",type:"select",select:[[10,"10 seconds"],[30,"30 seconds"],[60,"minute"],[300,"5 minutes"]],value:30,"function":function(){clearInterval(UI.interval);UI.interval.set(function(){mist.send(function(){V()})},$(this).val()*1E3)}}]));b.append($("