From f8b9db9dcdaa671abc8ac7a64a91812a5f660a35 Mon Sep 17 00:00:00 2001 From: Erik Zandvliet <erik.zandvliet@ddvtech.com> Date: Thu, 24 Aug 2017 12:12:32 +0200 Subject: [PATCH] Generalized recording, added recfrom and recuntil flags --- lib/stream.cpp | 5 +- src/output/output.cpp | 83 ++++++++++++++++++++++++++- src/output/output.h | 1 + src/output/output_httpts.cpp | 20 ------- src/output/output_progressive_flv.cpp | 29 +--------- src/output/output_wav.cpp | 30 +--------- 6 files changed, 87 insertions(+), 81 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index e711a17b..d4b61295 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -374,6 +374,7 @@ pid_t Util::startPush(const std::string & streamname, std::string & target) { DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); std::string output_bin = ""; + std::string checkTarget = target.substr(0, target.find('?')); unsigned int outputs_size = outputs.getSize(); for (unsigned int i = 0; i<outputs_size && !output_bin.size(); ++i){ DTSC::Scan output = outputs.getIndice(i); @@ -383,9 +384,9 @@ pid_t Util::startPush(const std::string & streamname, std::string & target) { std::string tar_match = output.getMember("push_urls").getIndice(j).asString(); std::string front = tar_match.substr(0,tar_match.find('*')); std::string back = tar_match.substr(tar_match.find('*')+1); - MEDIUM_MSG("Checking output %s: %s (%s)", outputs.getIndiceName(i).c_str(), output.getMember("name").asString().c_str(), target.c_str()); + MEDIUM_MSG("Checking output %s: %s (%s)", outputs.getIndiceName(i).c_str(), output.getMember("name").asString().c_str(), checkTarget.c_str()); - if (target.substr(0,front.size()) == front && target.substr(target.size()-back.size()) == back){ + if (checkTarget.substr(0,front.size()) == front && checkTarget.substr(checkTarget.size()-back.size()) == back){ output_bin = Util::getMyPath() + "MistOut" + output.getMember("name").asString(); break; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 4f82a1f0..3e6e5612 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -19,6 +19,7 @@ #include <arpa/inet.h> #include <sys/socket.h> #include <netdb.h> +#include <sys/stat.h> /*LTS-END*/ namespace Mist{ @@ -85,6 +86,43 @@ namespace Mist{ } sentHeader = false; isRecordingToFile = false; + + if (config->getString("streamname").size()){ + streamName = config->getString("streamname"); + } + + if(capa.isMember("push_urls")){ + std::string tgt = config->getString("target"); + struct stat tgtStat; + if (tgt.size()){ + if(stat(tgt.substr(0, tgt.rfind('/')).c_str(), &tgtStat) != 0){ + INFO_MSG("could not stat %s", tgt.substr(0, tgt.rfind('/')).c_str()); + return; + } + if (!streamName.size()){ + WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["nama"].asString().c_str()); + conn.close(); + return; + } + if (tgt == "-"){ + parseData = true; + wantRequest = false; + INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), capa["nama"].asString().c_str()); + return; + } + std::string params = tgt.substr(tgt.find('?') + 1); + tgt = tgt.substr(0, tgt.find('?')); + if (connectToFile(tgt)){ + parseData = true; + wantRequest = false; + INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), tgt.c_str(), capa["nama"].asString().c_str()); + + HTTP::parseVars(params, recParams); + }else{ + conn.close(); + } + } + } } void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){ @@ -781,6 +819,43 @@ namespace Mist{ //if yes, seek here if (good){break;} } + } + if (isRecordingToFile){ + if (recParams.count("recuntil")){ + long long endRec = atoll(recParams["recuntil"].c_str()); + if (endRec < startTime()){ + FAIL_MSG("Record range not available anymore"); + config->is_active = false; + return; + } + } + if (recParams.count("recfrom") && atoll(recParams["recfrom"].c_str()) != 0){ + unsigned long int mainTrack = getMainSelectedTrack(); + long long startRec = atoll(recParams["recfrom"].c_str()); + if (startRec > myMeta.tracks[mainTrack].lastms){ + if (myMeta.vod){ + FAIL_MSG("Record range out of bounds on vod file"); + config->is_active = false; + return; + } + long unsigned int streamAvail = myMeta.tracks[mainTrack].lastms; + long unsigned int lastUpdated = Util::getMS(); + while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail){ + Util::sleep(500); + updateMeta(); + if (myMeta.tracks[mainTrack].lastms > streamAvail){ + stats(); + streamAvail = myMeta.tracks[mainTrack].lastms; + lastUpdated = Util::getMS(); + } + } + } + if (startRec < startTime()){ + WARN_MSG("Record begin @ %llu ms not available, starting at %llu ms instead", startRec, startTime()); + startRec = startTime(); + } + seekPos = startRec; + } } MEDIUM_MSG("Initial seek to %llums", seekPos); seek(seekPos); @@ -936,8 +1011,12 @@ namespace Mist{ needsLookAhead = 0; } } - - sendNext(); + + if (isRecordingToFile && recParams.count("recuntil") && atoll(recParams["recuntil"].c_str()) < lastPacketTime){ + config->is_active = false; + }else{ + sendNext(); + } }else{ /*LTS-START*/ if(Triggers::shouldTrigger("CONN_STOP", streamName)){ diff --git a/src/output/output.h b/src/output/output.h index d2bcde61..4ae2bd32 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -96,6 +96,7 @@ namespace Mist { int pageNumForKey(long unsigned int trackId, long long int keyNum); int pageNumMax(long unsigned int trackId); bool isRecordingToFile; + std::map<std::string, std::string> recParams; unsigned int lastStats;///<Time of last sending of stats. std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes. std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets. diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index ddc26d2f..7d4cdfc4 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -7,26 +7,6 @@ namespace Mist { OutHTTPTS::OutHTTPTS(Socket::Connection & conn) : TSOutput(conn){ sendRepeatingHeaders = 500;//PAT/PMT every 500ms (DVB spec) - if (config->getString("target").size()){ - if (!streamName.size()){ - WARN_MSG("Recording unconnected TS output to file! Cancelled."); - conn.close(); - return; - } - if (config->getString("target") == "-"){ - parseData = true; - wantRequest = false; - INFO_MSG("Outputting %s to stdout in TS format", streamName.c_str()); - return; - } - if (connectToFile(config->getString("target"))){ - parseData = true; - wantRequest = false; - INFO_MSG("Recording %s to %s in TS format", streamName.c_str(), config->getString("target").c_str()); - }else{ - conn.close(); - } - } } OutHTTPTS::~OutHTTPTS() {} diff --git a/src/output/output_progressive_flv.cpp b/src/output/output_progressive_flv.cpp index 401bab82..5cd9efaf 100644 --- a/src/output/output_progressive_flv.cpp +++ b/src/output/output_progressive_flv.cpp @@ -1,34 +1,7 @@ #include "output_progressive_flv.h" namespace Mist { - OutProgressiveFLV::OutProgressiveFLV(Socket::Connection & conn) : HTTPOutput(conn){ - if (config->getString("target").size()){ - initialize(); - if (!streamName.size()){ - WARN_MSG("Recording unconnected FLV output to file! Cancelled."); - conn.close(); - return; - } - if (config->getString("target") == "-"){ - parseData = true; - wantRequest = false; - INFO_MSG("Outputting %s to stdout in FLV format", streamName.c_str()); - return; - } - if (!myMeta.tracks.size()){ - INFO_MSG("Stream not available - aborting"); - conn.close(); - return; - } - if (connectToFile(config->getString("target"))){ - parseData = true; - wantRequest = false; - INFO_MSG("Recording %s to %s in FLV format", streamName.c_str(), config->getString("target").c_str()); - return; - } - conn.close(); - } - } + OutProgressiveFLV::OutProgressiveFLV(Socket::Connection & conn) : HTTPOutput(conn){} void OutProgressiveFLV::init(Util::Config * cfg){ HTTPOutput::init(cfg); diff --git a/src/output/output_wav.cpp b/src/output/output_wav.cpp index c3453b07..e4198dd5 100644 --- a/src/output/output_wav.cpp +++ b/src/output/output_wav.cpp @@ -3,35 +3,7 @@ #include <mist/util.h> namespace Mist{ - OutWAV::OutWAV(Socket::Connection &conn) : HTTPOutput(conn){ - if (config->getString("target").size()){ - initialize(); - if (!streamName.size()){ - WARN_MSG("Recording unconnected WAV output to file! Cancelled."); - conn.close(); - return; - } - if (config->getString("target") == "-"){ - parseData = true; - wantRequest = false; - INFO_MSG("Outputting %s to stdout in WAV format", streamName.c_str()); - return; - } - if (!myMeta.tracks.size()){ - INFO_MSG("Stream not available - aborting"); - conn.close(); - return; - } - if (connectToFile(config->getString("target"))){ - parseData = true; - wantRequest = false; - INFO_MSG("Recording %s to %s in WAV format", streamName.c_str(), - config->getString("target").c_str()); - return; - } - conn.close(); - } - } + OutWAV::OutWAV(Socket::Connection &conn) : HTTPOutput(conn){} void OutWAV::init(Util::Config *cfg){ HTTPOutput::init(cfg);