From 8fbdafb288de60f88dac9a5b042e3e0cfc9ad788 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 31 Aug 2017 15:41:46 +0200 Subject: [PATCH] Generalized recording, pimped. --- lib/stream.cpp | 2 +- src/output/output.cpp | 126 ++++++++++++++++---------- src/output/output.h | 3 +- src/output/output_http.cpp | 3 - src/output/output_httpts.h | 1 + src/output/output_progressive_flv.h | 1 + src/output/output_progressive_mp3.cpp | 23 +---- src/output/output_progressive_mp3.h | 1 + src/output/output_wav.h | 1 + 9 files changed, 85 insertions(+), 76 deletions(-) diff --git a/lib/stream.cpp b/lib/stream.cpp index d4b61295..5a2f5293 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -374,7 +374,7 @@ pid_t Util::startPush(const std::string & streamname, std::string & target) { DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); std::string output_bin = ""; - std::string checkTarget = target.substr(0, target.find('?')); + std::string checkTarget = target.substr(0, target.rfind('?')); unsigned int outputs_size = outputs.getSize(); for (unsigned int i = 0; igetString("streamname").size()){ + //If we have a streamname option, set internal streamname to that option + if (!streamName.size() && config->hasOption("streamname")){ streamName = config->getString("streamname"); } - if(capa.isMember("push_urls")){ + /*LTS-START*/ + // If we have a target, scan for trailing ?, remove it, parse into targetParams + if (config->hasOption("target")){ std::string tgt = config->getString("target"); - struct stat tgtStat; - if (tgt.size()){ - if(stat(tgt.substr(0, tgt.rfind('/')).c_str(), &tgtStat) != 0){ - INFO_MSG("could not stat %s", tgt.substr(0, tgt.rfind('/')).c_str()); - return; - } - if (!streamName.size()){ - WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["nama"].asString().c_str()); - conn.close(); - return; - } - if (tgt == "-"){ - parseData = true; - wantRequest = false; - INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), capa["nama"].asString().c_str()); - return; - } - std::string params = tgt.substr(tgt.find('?') + 1); - tgt = tgt.substr(0, tgt.find('?')); - if (connectToFile(tgt)){ - parseData = true; - wantRequest = false; - INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), tgt.c_str(), capa["nama"].asString().c_str()); - - HTTP::parseVars(params, recParams); - }else{ - conn.close(); - } + if (tgt.rfind('?') != std::string::npos){ + INFO_MSG("Stripping target options: %s", tgt.substr(tgt.rfind('?') + 1).c_str()); + HTTP::parseVars(tgt.substr(tgt.rfind('?') + 1), targetParams); + config->getOption("target", true).append(tgt.substr(0, tgt.rfind('?'))); + }else{ + INFO_MSG("Not modifying target (%s), no options present", tgt.c_str()); } } + /*LTS-END*/ + } + + bool Output::isFileTarget(){ + INFO_MSG("Default file target handler (false)"); + return false; } void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){ @@ -820,27 +807,29 @@ namespace Mist{ if (good){break;} } } + /*LTS-START*/ if (isRecordingToFile){ - if (recParams.count("recuntil")){ - long long endRec = atoll(recParams["recuntil"].c_str()); + if (targetParams.count("recstop")){ + long long endRec = atoll(targetParams["recstop"].c_str()); if (endRec < startTime()){ - FAIL_MSG("Record range not available anymore"); - config->is_active = false; + FAIL_MSG("Entire recording range is in the past"); + onFail(); return; } + INFO_MSG("Recording will stop at %lld", endRec); } - if (recParams.count("recfrom") && atoll(recParams["recfrom"].c_str()) != 0){ + if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){ unsigned long int mainTrack = getMainSelectedTrack(); - long long startRec = atoll(recParams["recfrom"].c_str()); + long long startRec = atoll(targetParams["recstart"].c_str()); if (startRec > myMeta.tracks[mainTrack].lastms){ if (myMeta.vod){ - FAIL_MSG("Record range out of bounds on vod file"); - config->is_active = false; + FAIL_MSG("Recording start past end of non-live source"); + onFail(); return; } long unsigned int streamAvail = myMeta.tracks[mainTrack].lastms; long unsigned int lastUpdated = Util::getMS(); - while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail){ + while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){ Util::sleep(500); updateMeta(); if (myMeta.tracks[mainTrack].lastms > streamAvail){ @@ -851,12 +840,14 @@ namespace Mist{ } } if (startRec < startTime()){ - WARN_MSG("Record begin @ %llu ms not available, starting at %llu ms instead", startRec, startTime()); + WARN_MSG("Record begin at %llu ms not available, starting at %llu ms instead", startRec, startTime()); startRec = startTime(); } + INFO_MSG("Recording will start at %lld", startRec); seekPos = startRec; } } + /*LTS-END*/ MEDIUM_MSG("Initial seek to %llums", seekPos); seek(seekPos); } @@ -944,6 +935,36 @@ namespace Mist{ /// ~~~~~~~~~~~~~~~ int Output::run(){ /*LTS-START*/ + //Connect to file target, if needed + if(isFileTarget()){ + if (!streamName.size()){ + WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str()); + myConn.close(); + return 2; + } + initialize(); + if (!myMeta.tracks.size() || !selectedTracks.size() || !keepGoing()){ + INFO_MSG("Stream not available - aborting"); + myConn.close(); + }else{ + if (config->getString("target") == "-"){ + parseData = true; + wantRequest = false; + if (!targetParams.count("realtime")){realTime = 0;} + INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), capa["name"].asString().c_str()); + }else{ + if (connectToFile(config->getString("target"))){ + parseData = true; + wantRequest = false; + if (!targetParams.count("realtime")){realTime = 0;} + INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), config->getString("target").c_str(), capa["name"].asString().c_str()); + }else{ + myConn.close(); + } + } + } + } + //Handle CONN_OPEN trigger, if needed if(Triggers::shouldTrigger("CONN_OPEN", streamName)){ std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; if (!Triggers::doTrigger("CONN_OPEN", payload, streamName)){ @@ -1012,19 +1033,21 @@ namespace Mist{ } } - if (isRecordingToFile && recParams.count("recuntil") && atoll(recParams["recuntil"].c_str()) < lastPacketTime){ - config->is_active = false; - }else{ - sendNext(); + if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) < lastPacketTime){ + INFO_MSG("End of planned recording reached, shutting down"); + if (!onFinish()){ + break; + } } + sendNext(); }else{ - /*LTS-START*/ + INFO_MSG("Shutting down because of stream end"); + /*LTS-START*/ if(Triggers::shouldTrigger("CONN_STOP", streamName)){ std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"; Triggers::doTrigger("CONN_STOP", payload, streamName); } /*LTS-END*/ - INFO_MSG("Shutting down because of stream end"); if (!onFinish()){ break; } @@ -1142,7 +1165,7 @@ namespace Mist{ //actually drop what we found. //if both of the above cases occur, the next prepareNext iteration will take care of that for (std::set::iterator it = dropTracks.begin(); it != dropTracks.end(); ++it){ - dropTrack(*it, "seek/select mismatch", true); + dropTrack(*it, "seek/select mismatch"); } return false; } @@ -1150,7 +1173,7 @@ namespace Mist{ sortedPageInfo nxt = *(buffer.begin()); if (!myMeta.tracks.count(nxt.tid)){ - dropTrack(nxt.tid, "disappeared from metadata", true); + dropTrack(nxt.tid, "disappeared from metadata"); return false; } @@ -1174,7 +1197,7 @@ namespace Mist{ buffer.insert(nxt); } }else{ - dropTrack(nxt.tid, "page load failure", true); + dropTrack(nxt.tid, "page load failure"); } return false; } @@ -1186,6 +1209,10 @@ namespace Mist{ dropTrack(nxt.tid, "timeless empty packet"); return false; } + //for VoD, check if we've reached the end of the track, if so, drop it + if (myMeta.vod && nxt.time > myMeta.tracks[nxt.tid].lastms){ + dropTrack(nxt.tid, "Reached end of track", false); + } //if this is a live stream, we might have just reached the live point. //check where the next key is nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time); @@ -1452,6 +1479,7 @@ namespace Mist{ } close(outFile); isRecordingToFile = true; + sought = false; return true; } diff --git a/src/output/output.h b/src/output/output.h index 4ae2bd32..36b8f124 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -96,13 +96,13 @@ namespace Mist { int pageNumForKey(long unsigned int trackId, long long int keyNum); int pageNumMax(long unsigned int trackId); bool isRecordingToFile; - std::map recParams; unsigned int lastStats;///