From 263dee7b2551178d17025ea1b8cc9d3a3c9be850 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 10 May 2016 00:18:30 +0200 Subject: [PATCH] Added push-related API calls to controller, made outputs able to wait for playable streams. --- CMakeLists.txt | 2 + lib/procs.h | 2 +- lib/stream.cpp | 167 +++++----------- lib/stream.h | 3 +- src/controller/controller_api.cpp | 47 +++++ src/controller/controller_push.cpp | 120 ++++++++++++ src/controller/controller_push.h | 19 ++ src/controller/controller_statistics.cpp | 36 ++++ src/controller/controller_statistics.h | 8 + src/input/input_buffer.cpp | 97 ---------- src/input/input_buffer.h | 11 +- src/output/output.cpp | 230 +++-------------------- src/output/output.h | 12 +- src/output/output_http.cpp | 17 +- src/output/output_http.h | 3 - src/output/output_rtmp.cpp | 10 + src/output/output_rtmp.h | 2 + 17 files changed, 329 insertions(+), 457 deletions(-) create mode 100644 src/controller/controller_push.cpp create mode 100644 src/controller/controller_push.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e18702a..a69cdf79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,6 +481,7 @@ set(controllerHeaders ${SOURCE_DIR}/src/controller/controller_updater.h ${SOURCE_DIR}/src/controller/controller_capabilities.h ${SOURCE_DIR}/src/controller/controller_streams.h + ${SOURCE_DIR}/src/controller/controller_push.h ) ######################################## @@ -497,6 +498,7 @@ set(controllerSources ${SOURCE_DIR}/src/controller/controller_capabilities.cpp ${SOURCE_DIR}/src/controller/controller_uplink.cpp ${SOURCE_DIR}/src/controller/controller_api.cpp + ${SOURCE_DIR}/src/controller/controller_push.cpp ) ######################################## # MistController - Build # diff --git a/lib/procs.h b/lib/procs.h index 033ae55c..ea4bc816 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -34,5 +34,5 @@ namespace Util { static bool isRunning(pid_t pid); static std::set socketList; ///< Holds sockets that should be closed before forking }; - } + diff --git a/lib/stream.cpp b/lib/stream.cpp index ae4e450c..d00084ac 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -219,7 +219,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir std::string source = input.getMember("source_match").getIndice(j).asString(); std::string front = source.substr(0,source.find('*')); std::string back = source.substr(source.find('*')+1); - DEBUG_MSG(DLVL_MEDIUM, "Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str()); + MEDIUM_MSG("Checking input %s: %s (%s)", inputs.getIndiceName(i).c_str(), input.getMember("name").asString().c_str(), source.c_str()); if (filename.substr(0,front.size()) == front && filename.substr(filename.size()-back.size()) == back){ player_bin = Util::getMyPath() + "MistIn" + input.getMember("name").asString(); @@ -319,150 +319,77 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir return streamAlive(streamname); } -/* roxlu-begin */ -int Util::startRecording(std::string streamname) { +/// Attempt to start a push for streamname to target. +/// Both streamname and target may be changed by this function: +/// - streamname is sanitized to a permissible streamname +/// - target gets variables replaced and may be altered by the RECORDING_START trigger response. +/// Attempts to match the altered target to an output that can push to it. +pid_t Util::startPush(std::string & streamname, std::string & target) { sanitizeName(streamname); - if (streamname.size() > 100){ - FAIL_MSG("Stream opening denied: %s is longer than 100 characters (%lu).", streamname.c_str(), streamname.size()); - return -1; + + if (Triggers::shouldTrigger("PUSH_OUT_START", streamname)) { + std::string payload = streamname+"\n"+target; + std::string filepath_response; + Triggers::doTrigger("PUSH_OUT_START", payload, streamname.c_str(), false, filepath_response); + target = filepath_response; + } + if (!target.size()){ + INFO_MSG("Aborting push of stream %s - target is empty", streamname.c_str()); + return 0; } - // Attempt to load up configuration and find this stream + // The target can hold variables like current time etc + replace_variables(target); + replace(target, "$stream", streamname); + + //Attempt to load up configuration and find this stream IPC::sharedPage mistConfOut(SHM_CONF, DEFAULT_CONF_PAGE_SIZE); IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); - //Lock the config to prevent race conditions and corruption issues while reading configLock.wait(); + DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); - - //Abort if no config available - if (!config){ - FAIL_MSG("Configuration not available, aborting! Is MistController running?"); - configLock.post();//unlock the config semaphore - return -2; - } - - //Find stream base name - std::string smp = streamname.substr(0, streamname.find_first_of("+ ")); - DTSC::Scan streamCfg = config.getMember("streams").getMember(smp); - if (!streamCfg){ - DEBUG_MSG(DLVL_HIGH, "Stream %s not configured - attempting to ignore", streamname.c_str()); - configLock.post(); - return -3; - } - - // When we have a validate trigger, we execute that first before we continue. - if (Triggers::shouldTrigger("RECORDING_VALIDATE", streamname)) { - std::string validate_result; - Triggers::doTrigger("RECORDING_VALIDATE", streamname, streamname.c_str(), false, validate_result); - INFO_MSG("RECORDING_VALIDATE returned: %s", validate_result.c_str()); - if (validate_result == "0") { - INFO_MSG("RECORDING_VALIDATE: the hook returned 0 so we're not going to create a recording."); - configLock.post(); - return 0; - } - } - - // Should we start an flv output? (We allow hooks to specify custom filenames) - DTSC::Scan recordFilenameConf = streamCfg.getMember("record"); - std::string recordFilename; - - if (Triggers::shouldTrigger("RECORDING_FILEPATH", streamname)) { - - std::string payload = streamname; - std::string filepath_response; - Triggers::doTrigger("RECORDING_FILEPATH", payload, streamname.c_str(), false, filepath_response); /* @todo do we need to handle the return of doTrigger? */ - - if (filepath_response.size() < 1024) { /* @todo is there a MAX_FILEPATH somewhere? */ - recordFilename = filepath_response; - } - else { - FAIL_MSG("The RECORDING_FILEPATH trigger returned a filename which is bigger then our allowed max filename size. Not using returned filepath from hook."); - } - } - - // No filename set through trigger, so use the one one from the stream config. - if (recordFilename.size() == 0) { - recordFilename = recordFilenameConf.asString(); - } - - /*if (recordFilename.size() == 0 - || recordFilename.substr(recordFilename.find_last_of(".") + 1) != "flv") - { - configLock.post(); - return -4; - }*/ - - // The filename can hold variables like current time etc.. - replace_variables(recordFilename); - replace(recordFilename, "$stream", streamname); - - INFO_MSG("Filepath that we use for the recording: %s", recordFilename.c_str()); - //to change hardcoding - //determine extension, first find the '.' for extension - size_t pointPlace = recordFilename.rfind("."); - if (pointPlace == std::string::npos){ - FAIL_MSG("no extension found in output name. Aborting recording."); - return -1; - } - std::string fileExtension = recordFilename.substr(pointPlace+1); DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); - DTSC::Scan output; - std::string output_filepath = ""; + std::string output_bin = ""; unsigned int outputs_size = outputs.getSize(); - HIGH_MSG("Recording outputs %d",outputs_size); - for (unsigned int i = 0; iasInt()); + } + }else{ + Controller::stopPush(Request["push_stop"].asInt()); + } + } + + if (Request.isMember("push_auto_add")){ + Controller::addPush(Request["push_auto_add"]); + } + + if (Request.isMember("push_auto_remove")){ + if (Request["push_auto_remove"].isArray()){ + jsonForEach(Request["push_auto_remove"], it){ + Controller::removePush(*it); + } + }else{ + Controller::removePush(Request["push_auto_remove"]); + } + } + + if (Request.isMember("push_auto_list")){ + Response["push_auto_list"] = Controller::Storage["autopushes"]; + } + Controller::configChanged = true; }else{//unauthorized diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp new file mode 100644 index 00000000..ff7667a5 --- /dev/null +++ b/src/controller/controller_push.cpp @@ -0,0 +1,120 @@ +#include +#include +#include +#include +#include +#include +#include "controller_storage.h" +#include "controller_statistics.h" + +namespace Controller { + + /// Internal list of currently active pushes + std::map activePushes; + + /// Immediately starts a push for the given stream to the given target. + /// Simply calls Util::startPush and stores the resulting PID in the local activePushes map. + void startPush(std::string & stream, std::string & target){ + pid_t ret = Util::startPush(stream, target); + if (ret){ + JSON::Value push; + push.append((long long)ret); + push.append(stream); + push.append(target); + activePushes[ret] = push; + } + } + + /// Immediately stops a push with the given ID + void stopPush(unsigned int ID){ + Util::Procs::Stop(ID); + } + + /// Gives a list of all currently active pushes + void listPush(JSON::Value & output){ + output.null(); + std::set toWipe; + for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ + if (Util::Procs::isActive(it->first)){ + output.append(it->second); + }else{ + toWipe.insert(it->first); + } + } + while (toWipe.size()){ + activePushes.erase(*toWipe.begin()); + toWipe.erase(toWipe.begin()); + } + } + + /// Adds a push to the list of auto-pushes. + /// Auto-starts currently active matches immediately. + void addPush(JSON::Value & request){ + JSON::Value newPush; + if (request.isArray()){ + newPush = request; + }else{ + newPush.append(request["stream"]); + newPush.append(request["target"]); + } + Controller::Storage["autopushes"].append(newPush); + if (activeStreams.size()){ + const std::string & pStr = newPush[0u].asStringRef(); + std::string target = newPush[1u].asStringRef(); + for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ + std::string streamname = it->first; + if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + startPush(streamname, target); + } + } + } + } + + /// Removes a push from the list of auto-pushes. + /// Does not stop currently active matching pushes. + void removePush(const JSON::Value & request){ + JSON::Value delPush; + if (request.isString()){ + return removePush(request.asStringRef()); + } + if (request.isArray()){ + delPush = request; + }else{ + delPush.append(request["stream"]); + delPush.append(request["target"]); + } + JSON::Value newautopushes; + jsonForEach(Controller::Storage["autopushes"], it){ + if ((*it) != delPush){ + newautopushes.append(*it); + } + } + Controller::Storage["autopushes"] = newautopushes; + } + + /// Removes a push from the list of auto-pushes. + /// Does not stop currently active matching pushes. + void removePush(const std::string & streamname){ + JSON::Value newautopushes; + jsonForEach(Controller::Storage["autopushes"], it){ + if ((*it)[0u] != streamname){ + newautopushes.append(*it); + } + } + Controller::Storage["autopushes"] = newautopushes; + } + + /// Starts all configured auto pushes for the given stream. + void doAutoPush(std::string & streamname){ + jsonForEach(Controller::Storage["autopushes"], it){ + const std::string & pStr = (*it)[0u].asStringRef(); + if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + std::string stream = streamname; + std::string target = (*it)[1u]; + startPush(stream, target); + } + } + } + +} + diff --git a/src/controller/controller_push.h b/src/controller/controller_push.h new file mode 100644 index 00000000..f2ca128a --- /dev/null +++ b/src/controller/controller_push.h @@ -0,0 +1,19 @@ +#include +#include +#include +#include + +namespace Controller { + //Functions for current pushes, start/stop/list + void startPush(std::string & streamname, std::string & target); + void stopPush(unsigned int ID); + void listPush(JSON::Value & output); + + //Functions for automated pushes, add/remove + void addPush(JSON::Value & request); + void removePush(const JSON::Value & request); + void removePush(const std::string & streamname); + + void doAutoPush(std::string & streamname); +} + diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 6cf912e7..0017b3c5 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -6,6 +6,7 @@ #include #include "controller_statistics.h" #include "controller_limits.h" +#include "controller_push.h" #ifndef KILL_ON_EXIT #define KILL_ON_EXIT false @@ -40,6 +41,7 @@ std::map Controller::sessions; / std::map Controller::connToSession; ///< Map of socket IDs to session info. bool Controller::killOnExit = KILL_ON_EXIT; tthread::mutex Controller::statsMutex; +std::map Controller::activeStreams; //For server-wide totals. Local to this file only. struct streamTotals { @@ -118,12 +120,26 @@ void Controller::killStatistics(char * data, size_t len, unsigned int id){ (*(data - 1)) = 128;//Send disconnect message; } + +///This function is ran whenever a stream becomes active. +void Controller::streamStarted(std::string stream){ + INFO_MSG("Stream %s became active", stream.c_str()); + Controller::doAutoPush(stream); +} + +///This function is ran whenever a stream becomes active. +void Controller::streamStopped(std::string stream){ + INFO_MSG("Stream %s became inactive", stream.c_str()); +} + + /// This function runs as a thread and roughly once per second retrieves /// statistics from all connected clients, as well as wipes /// old statistics that have disconnected over 10 minutes ago. void Controller::SharedMemStats(void * config){ DEBUG_MSG(DLVL_HIGH, "Starting stats thread"); IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); + std::set inactiveStreams; while(((Util::Config*)config)->is_active){ { tthread::lock_guard guard(statsMutex); @@ -144,6 +160,18 @@ void Controller::SharedMemStats(void * config){ mustWipe.pop_front(); } } + if (activeStreams.size()){ + for (std::map::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ + if (++it->second > 1){ + streamStopped(it->first); + inactiveStreams.insert(it->first); + } + } + while (inactiveStreams.size()){ + activeStreams.erase(*inactiveStreams.begin()); + inactiveStreams.erase(inactiveStreams.begin()); + } + } Controller::checkServerLimits(); /*LTS*/ } Util::sleep(1000); @@ -571,6 +599,14 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){ //the data is no longer valid - connection has gone away, store for later sessions[idx].finish(id); connToSession.erase(id); + }else{ + std::string strmName = tmpEx.streamName(); + if (strmName.size()){ + if (!activeStreams.count(strmName)){ + streamStarted(strmName); + } + activeStreams[strmName] = 0; + } } /*LTS-START*/ //if (counter < 125 && Controller::isBlacklisted(tmpEx.host(), ID, tmpEx.time())){ diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index e2a7971c..f9d97e42 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -1,3 +1,4 @@ +#pragma once #include #include #include @@ -15,6 +16,13 @@ namespace Controller { extern bool killOnExit; + + //These functions keep track of which streams are currently active. + extern std::map activeStreams; + ///This function is ran whenever a stream becomes active. + void streamStarted(std::string stream); + ///This function is ran whenever a stream becomes active. + void streamStopped(std::string stream); struct statLog { long time; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 329cbe90..8ab05e57 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -39,19 +39,6 @@ namespace Mist { capa["optional"]["DVR"]["default"] = 50000LL; /*LTS-start*/ option.null(); - option["arg"] = "string"; - option["long"] = "record"; - option["short"] = "r"; - option["help"] = "Record the stream to a file"; - option["value"].append(""); - config->addOption("record", option); - capa["optional"]["record"]["name"] = "Record to file"; - capa["optional"]["record"]["help"] = "Filename to record the stream to."; - capa["optional"]["record"]["option"] = "--record"; - capa["optional"]["record"]["type"] = "str"; - capa["optional"]["record"]["default"] = ""; - option.null(); - option["arg"] = "integer"; option["long"] = "cut"; option["short"] = "c"; @@ -135,7 +122,6 @@ namespace Mist { cutTime = 0; segmentSize = 5000; hasPush = false; - recordingPid = -1; resumeMode = false; } @@ -321,14 +307,6 @@ namespace Mist { DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); //remove all parts of this key for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { - /*LTS-START*/ - if (recFile.is_open()) { - if (!recMeta.tracks.count(tid)) { - recMeta.tracks[tid] = myMeta.tracks[tid]; - recMeta.tracks[tid].reset(); - } - } - /*LTS-END*/ myMeta.tracks[tid].parts.pop_front(); } //remove the key itself @@ -952,81 +930,6 @@ namespace Mist { DEBUG_MSG(DLVL_DEVEL, "Setting segmentSize from %u to new value of %lli", segmentSize, tmpNum); segmentSize = tmpNum; } - - /* - //if stream is configured and setting is present, use it, always - std::string rec; - if (streamCfg && streamCfg.getMember("record")){ - rec = streamCfg.getMember("record").asInt(); - } else { - if (streamCfg){ - //otherwise, if stream is configured use the default - rec = config->getOption("record", true)[0u].asString(); - } else { - //if not, use the commandline argument - rec = config->getOption("record").asString(); - } - } - //if the new value is different, print a message and apply it - if (recName != rec){ - //close currently recording file, for we should open a new one - DEBUG_MSG(DLVL_DEVEL, "Stopping recording of %s to %s", config->getString("streamname").c_str(), recName.c_str()); - recFile.close(); - recMeta.tracks.clear(); - recName = rec; - } - if (recName != "" && !recFile.is_open()){ - DEBUG_MSG(DLVL_DEVEL, "Starting recording of %s to %s", config->getString("streamname").c_str(), recName.c_str()); - recFile.open(recName.c_str()); - if (recFile.fail()){ - DEBUG_MSG(DLVL_DEVEL, "Error occured during record opening: %s", strerror(errno)); - } - recBpos = 0; - } - */ - - /* roxlu-begin */ - // check if we have a video track with a keyframe, otherwise the mp4 output will fail. - // @todo as the mp4 recording was not working perfectly I focussed on getting it - // to work for .flv. This seems to work perfectly but ofc. we want to make it work - // for .mp4 too at some point. - bool has_keyframes = false; - std::map::iterator it = myMeta.tracks.begin(); - while (it != myMeta.tracks.end()) { - - DTSC::Track & tr = it->second; - if (tr.type != "video") { - ++it; - continue; - } - - if (tr.keys.size() > 0) { - has_keyframes = true; - break; - } - ++it; - } - - if (streamCfg && streamCfg.getMember("record") && streamCfg.getMember("record").asString().size() > 0 && has_keyframes) { - - // @todo check if output is already running ? - if (recordingPid == -1 && config != NULL) { - - INFO_MSG("The stream %s has a value specified for the recording. We're goint to start an output and record into %s", config->getString("streamname").c_str(), streamCfg.getMember("record").asString().c_str()); - - configLock.post(); - configLock.close(); - recordingPid = Util::startRecording(config->getString("streamname")); - if (recordingPid < 0) { - FAIL_MSG("Failed to start the recording for %s", config->getString("streamname").c_str()); - } - INFO_MSG("We started an output for recording with PID: %d", recordingPid); - return true; - } - } - /* roxlu-end */ - - /*LTS-END*/ configLock.post(); configLock.close(); diff --git a/src/input/input_buffer.h b/src/input/input_buffer.h index b360523a..21221216 100644 --- a/src/input/input_buffer.h +++ b/src/input/input_buffer.h @@ -40,17 +40,8 @@ namespace Mist { std::map > bufferLocations; std::map pushLocation; inputBuffer * singleton; - - std::string recName;/*LTS*/ - DTSC::Meta recMeta;/*LTS*/ - std::ofstream recFile;/*LTS*/ - long long int recBpos;/*LTS*/ - //This is used for an ugly fix to prevent metadata from dissapearing in some cases. + //This is used for an ugly fix to prevent metadata from disappearing in some cases. std::map initData; - - /* begin-roxlu */ - int recordingPid; // pid of the process that does the recording. Currently only MP4 supported. - /* end-roxlu */ }; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 4a092b16..c61a1ff9 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1,62 +1,3 @@ -/// Recording to file -/// -/// Currently MistServer has basic support for recording for which the -/// functionality is spread over a couple of files. The general flow in -/// mist (this is my understanding and I'm a newb to MistServer, roxlu), -/// is like this: -/// -/// The controller creates a couple of protocol handlers, e.g. for -/// RTMP. When a new live connection is made, an output is created through -/// this protocol handler. In the case of a live source, all received -/// data is passed into a inputBuffer object (see input_buffer.cpp). -/// -/// So, when the inputBuffer is created, the `setup()` function is -/// called. In this function the `config` object is available that holds -/// the configuration values for the specific stream. This is also where a -/// recording gets initialized. -/// -/// An recording is initialized by starting another output with a call to -/// `startRecording()`. `startRecording()` forks the current process and -/// then calls `execvp()` to take over the child process with -/// e.g. `MistOutFLV()`. When `execvp()` starts the other process (that -/// records the data), it passes the `--outputFilename` command line -/// argument. -/// -/// Each output checks if it's started with the `--outputFilename` flag; -/// this is done in the constructor of `Output`. In Output, it opens the -/// given filename and uses `dup2()` which makes sure that all `stdout` -/// data is written into the recording file. -/// -/// Though, because some or probably most outputs also write HTTP to -/// stdout, I created the function `HTTPOutput::sendResponse()` which -/// checks if the current output is creating a recording. When creating a -/// recording it simply skips the HTTP output. -/// -/// +-------------------------+ -/// | inputBuffer::setup() | -/// +-------+-----------------+ -/// | -/// o---- calls Util::startRecording() (stream.cpp) -/// | -/// v -/// +------------------------+ -/// | stream::startRecording | -> Kicks off output app with --outputFilename -/// +-------+----------------+ -/// | -/// v -/// +----------------+ -/// | MistOut[XXX] | -> Checks if started with --outputFilename, -/// +----------------+ in Output::Output() and starts recording. -/// -/// The following files contain updates that were made for the recording: -/// -/// - stream.cpp - startRecording() -/// - output.cpp - Output(), - added --outputFilename option -/// ~Output(), - closes the filedescriptor if opened. -/// openOutputFileForRecording() - opens the output file descriptor, uses dup2(). -/// closeOutputFileForRecording() - closes the output file descriptor. -/// - input_buffer.cpp - setup() - executes an MistOut[XXX] app. - #include #include #include @@ -101,12 +42,6 @@ namespace Mist { capa["optional"]["startpos"]["short"] = "P"; capa["optional"]["startpos"]["default"] = (long long)500; capa["optional"]["startpos"]["type"] = "uint"; - capa["optional"]["outputfilename"]["type"] = "str"; - capa["optional"]["outputfilename"]["name"] = "outputfilename"; - capa["optional"]["outputfilename"]["help"] = "Name of the file into which we write the recording."; - capa["optional"]["outputfilename"]["option"] = "--outputFilename"; - capa["optional"]["outputfilename"]["short"] = "O"; - capa["optional"]["outputfilename"]["default"] = ""; } Output::Output(Socket::Connection & conn) : myConn(conn) { @@ -129,16 +64,6 @@ namespace Mist { DEBUG_MSG(DLVL_WARN, "Warning: MistOut created with closed socket!"); } sentHeader = false; - /* begin-roxlu */ - outputFileDescriptor = -1; - - // When the stream has a output filename defined we open it so we can start recording. - if (config != NULL - && config->getString("outputfilename").size() != 0) - { - openOutputFileForRecording(); - } - /* end-roxlu */ } void Output::setBlocking(bool blocking){ @@ -146,14 +71,6 @@ namespace Mist { myConn.setBlocking(isBlocking); } - /*begin-roxlu*/ - Output::~Output(){ - if (config != NULL && config->getString("outputfilename").size() != 0){ - closeOutputFileForRecording(); - } - } - /*end-roxlu*/ - void Output::updateMeta(){ //read metadata from page to myMeta variable if (nProxy.metaPages[0].mapped){ @@ -298,6 +215,29 @@ namespace Mist { return; } updateMeta(); + if (myMeta.live && needsPlayableKeys()){ + bool waitALittleLonger = true; + unsigned int maxWaits = 15; + while (waitALittleLonger){ + waitALittleLonger = true; + if (myMeta.tracks.size()){ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (it->second.keys.size() >= needsPlayableKeys()){ + waitALittleLonger = false; + break; + } + } + } + if (waitALittleLonger){ + Util::sleep(1000); + if (--maxWaits == 0){ + FAIL_MSG("Giving up waiting for playable tracks"); + waitALittleLonger = false; + } + updateMeta(); + } + } + } } void Output::selectDefaultTracks(){ @@ -409,11 +349,6 @@ namespace Mist { DEBUG_MSG(DLVL_MEDIUM, "Selected tracks: %s (%lu)", selected.str().c_str(), selectedTracks.size()); } - /*begin-roxlu*/ - // Added this check while working on the recording, because when the output cant - // select a track it means it won't be able to start the recording. Therefore - // when we don't see this explicitly it makes debugging the recording feature - // a bit painfull :) if (selectedTracks.size() == 0) { INSANE_MSG("We didn't find any tracks which that we can use. selectedTrack.size() is 0."); for (std::map::iterator trit = myMeta.tracks.begin(); trit != myMeta.tracks.end(); trit++){ @@ -445,7 +380,6 @@ namespace Mist { } } } - /*end-roxlu*/ } /// Clears the buffer, sets parseData to false, and generally makes not very much happen at all. @@ -927,25 +861,6 @@ namespace Mist { } if ( !sentHeader){ DEBUG_MSG(DLVL_DONTEVEN, "sendHeader"); - bool waitLonger = false; - if (!myMeta.tracks.size()){ - waitLonger = true; - }else{ - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - if (!it->second.keys.size()){ - waitLonger = true; - break; - } - } - } - if (waitLonger){ - updateMeta(); - Util::sleep(1000); - static unsigned int metaTries = 0; - if(++metaTries < 7){ - continue; - } - } sendHeader(); } prepareNext(); @@ -1249,108 +1164,23 @@ namespace Mist { sentHeader = true; } - /*begin-roxlu*/ - bool Output::openOutputFileForRecording() { - - if (NULL == config) { - FAIL_MSG("Cannot open the output file for recording because the config member is NULL and we can't check if we actually want a recording."); - return false; - } - - // We won't open the output file when the user didn't set the outputfile through the admin. - if (config->getString("outputfilename").size() == 0) { - FAIL_MSG("Cannot open the output file for recording because the given name is empty."); - return false; - } - - if (outputFileDescriptor != -1) { - FAIL_MSG("Cannot open the output file for recording because it seems that it's already open. Make sure it's closed correctly."); - return false; - } - - // The RECORDING_START trigger needs to be execute before we open the file because - // the trigger may need to create some directories where we need to save the recording. - if (Triggers::shouldTrigger("RECORDING_START")) { - - if (0 == config->getString("streamname").size()) { - ERROR_MSG("Streamname is empty; the RECORDING_START trigger will not know what stream started it's recording. We do execute the trigger."); - } - - std::string payload = config->getString("streamname"); - Triggers::doTrigger("RECORDING_START", payload, streamName.c_str()); - } - - // Open the output file. + bool Output::connectToFile(std::string file) { int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int mode = O_RDWR | O_CREAT | O_TRUNC; - - outputFileDescriptor = open(config->getString("outputfilename").c_str(), mode, flags); - if (outputFileDescriptor < 0) { - ERROR_MSG("Failed to open the file that we want to use to store the recording, error: %s", strerror(errno)); + int outFile = open(file.c_str(), mode, flags); + if (outFile < 0) { + ERROR_MSG("Failed to open file %s, error: %s", file.c_str(), strerror(errno)); return false; } - // Make a copy of the socket into outputFileDescriptor. Whenever we write to the socket we write to file. - int r = dup2(outputFileDescriptor, myConn.getSocket()); + int r = dup2(outFile, myConn.getSocket()); if (r == -1) { ERROR_MSG("Failed to create an alias for the socket using dup2: %s.", strerror(errno)); return false; } - - //make this output ready for recording to file - onRecord(); - - INFO_MSG("Opened %s for recording.", config->getString("outputfilename").c_str()); - + close(outFile); return true; } - bool Output::closeOutputFileForRecording() { - - if (config == NULL) { - ERROR_MSG("Config member is NULL, we cannot close the output file for the recording."); - return false; - } - - if (outputFileDescriptor == -1) { - ERROR_MSG("Requested to close the output file for the recording, but we're not making a recording."); - return false; - } - - if (config->getString("outputfilename").size() == 0) { - ERROR_MSG("Requested to close the output file for the recording, but the output filename is empty; not supposed to happen. We're still going to close the file descriptor though."); - } - - if (close(outputFileDescriptor) < 0) { - FAIL_MSG("Error: failed to close the output file: %s. We're resetting the file descriptor anyway.", strerror(errno)); - } - - outputFileDescriptor = -1; - - INFO_MSG("Close the file for the recording: %s", config->getString("outputfilename").c_str()); - - if (Triggers::shouldTrigger("RECORDING_STOP")) { - - if (0 == config->getString("streamname").size()) { - ERROR_MSG("Streamname is empty; the RECORDING_STOP trigger will not know what stream stopped it's recording. We do execute the trigger."); - } - - std::string payload; - payload = config->getString("streamname") +"\n"; - payload += config->getString("outputfilename"); - - Triggers::doTrigger("RECORDING_STOP", payload, streamName.c_str()); - } - - return true; - } - /*end-roxlu*/ - bool Output::recording(){ - if (config->getString("outputfilename").size() > 0) { - DONTEVEN_MSG("We're not sending a HTTP response because we're currently creating a recording."); - return true; - } - return false; - } - } + diff --git a/src/output/output.h b/src/output/output.h index 31af830b..3e0fbb5b 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -42,7 +42,6 @@ namespace Mist { public: //constructor and destructor Output(Socket::Connection & conn); - virtual ~Output(); //static members for initialization and capabilities static void init(Util::Config * cfg); static JSON::Value capa; @@ -63,11 +62,9 @@ namespace Mist { long unsigned int getMainSelectedTrack(); void updateMeta(); void selectDefaultTracks(); - /*begin-roxlu*/ - bool openOutputFileForRecording(); // Opens the output file and uses dup2() to make sure that all stdout is written into a file. - bool closeOutputFileForRecording(); // Closes the output file into which we're writing and resets the file descriptor. - /*end-roxlu*/ + bool connectToFile(std::string file); static bool listenMode(){return true;} + virtual unsigned int needsPlayableKeys(){return 2;} //virtuals. The optional virtuals have default implementations that do as little as possible. virtual void sendNext() {}//REQUIRED! Others are optional. virtual void prepareNext(); @@ -132,10 +129,7 @@ namespace Mist { bool sentHeader;///< If false, triggers sendHeader if parseData is true. std::map bookKeeping; - /*begin-roxlu*/ - int outputFileDescriptor; // Write output into this file. - /*end-roxlu*/ - bool recording(); + virtual bool isRecording(){return false;}; }; } diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 2cffaab5..0505f294 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -400,21 +400,6 @@ namespace Mist { return trustedProxies.count(ip) > 0; } /*LTS-END*/ - /*begin-roxlu*/ - void HTTPOutput::sendResponse(std::string message, std::string code) { - - // Only send output when we're not creating a recording. - if (recording()) return; - - if (code.size() == 0) { - WARN_MSG("Requested to send a HTTP response but the given code is empty. Trying though."); - } - if (message.size() == 0) { - WARN_MSG("Requested to send a HTTP response but the given message is empty. Trying though."); - } - - H.SendResponse(code, message, myConn); - } - /*end-roxlu*/ } + diff --git a/src/output/output_http.h b/src/output/output_http.h index 18791979..75a9d368 100644 --- a/src/output/output_http.h +++ b/src/output/output_http.h @@ -21,9 +21,6 @@ namespace Mist { static bool listenMode(){return false;} void reConnector(std::string & connector); std::string getHandler(); - /*begin-roxlu*/ - void sendResponse(std::string message = "OK", std::string code = "200"); - /*end-roxlu*/ protected: HTTP::Parser H; std::string getConnectedHost();//LTS diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 227f1574..b81493ce 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -34,9 +34,18 @@ namespace Mist { setBlocking(false); maxSkipAhead = 1500; minSkipAhead = 500; + isPushing = false; } OutRTMP::~OutRTMP() {} + + unsigned int OutRTMP::needsPlayableKeys(){ + if (isPushing){ + return 0; + }else{ + return 2; + } + } void OutRTMP::parseVars(std::string data){ std::string varname; @@ -575,6 +584,7 @@ namespace Mist { configLock.post(); configLock.close(); if (!myConn){return;}//do not initialize if rejected + isPushing = true; initialize(); } //send a _result reply diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 5eacdae5..277e43b2 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -14,7 +14,9 @@ namespace Mist { void onRequest(); void sendNext(); void sendHeader(); + unsigned int needsPlayableKeys(); protected: + bool isPushing; void parseVars(std::string data); std::string app_name; void parseChunk(Socket::Buffer & inputBuffer);