diff --git a/lib/stream.cpp b/lib/stream.cpp index 4de017bd..daf37550 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -509,6 +509,9 @@ pid_t Util::startPush(const std::string &streamname, std::string &target){ return 0; } + //Set original target string in environment + setenv("MST_ORIG_TARGET", target.c_str(), 1); + // The target can hold variables like current time etc streamVariables(target, streamname); @@ -554,7 +557,12 @@ pid_t Util::startPush(const std::string &streamname, std::string &target){ (char *)target.c_str(), (char *)NULL}; int stdErr = 2; - return Util::Procs::StartPiped(argv, 0, 0, &stdErr); + //Cache return value so we can do some cleaning before we return + pid_t ret = Util::Procs::StartPiped(argv, 0, 0, &stdErr); + //Clean up environment + unsetenv("MST_ORIG_TARGET"); + //Actually return the resulting PID + return ret; } uint8_t Util::getStreamStatus(const std::string &streamname){ diff --git a/src/output/output.cpp b/src/output/output.cpp index 414d96f0..64fc1641 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -710,6 +710,19 @@ namespace Mist{ } return keyNo; } + + ///Returns the timestamp of the next upcoming keyframe after thisPacket, or 0 if that cannot be determined (yet). + uint64_t Output::nextKeyTime(){ + DTSC::Track & trk = myMeta.tracks[getMainSelectedTrack()]; + if (!trk.keys.size()){ + return 0; + } + std::deque::iterator it; + for (it = trk.keys.begin(); it != trk.keys.end(); it++){ + if (it->getTime() > lastPacketTime){return it->getTime();} + } + return 0; + } int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){ if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){ @@ -1085,6 +1098,11 @@ namespace Mist{ seekPos = startRec; } //Duration to record in seconds. Overrides recstop. + if (targetParams.count("split")){ + long long endRec = atoll(targetParams["split"].c_str())*1000; + INFO_MSG("Will split recording every %lld seconds", atoll(targetParams["split"].c_str())); + targetParams["nxt-split"] = JSON::Value((int64_t)(seekPos + endRec)).asString(); + } if (targetParams.count("duration")){ long long endRec = atoll(targetParams["duration"].c_str())*1000; targetParams["recstop"] = JSON::Value((int64_t)(seekPos + endRec)).asString(); @@ -1253,6 +1271,34 @@ namespace Mist{ } Util::wait(millis); } + + /// Called right before sendNext(). Should return true if this is a stopping point. + bool Output::reachedPlannedStop(){ + //If we're recording to file and reached the target position, stop + if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) <= lastPacketTime){ + INFO_MSG("End of planned recording reached"); + return true; + } + //Regardless of playback method, if we've reached the wanted stop point, stop + if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= lastPacketTime){ + INFO_MSG("End of planned playback reached"); + return true; + } + //check if we need to split here + if (inlineRestartCapable() && targetParams.count("split")){ + //Make sure that inlineRestartCapable outputs with splitting enabled only stop right before keyframes + //This works because this function is executed right BEFORE sendNext(), causing thisPacket to be the next packet + //in the newly splitted file. + if (!thisPacket.getFlag("keyframe")){return false;} + //is this a split point? + if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){ + INFO_MSG("Split point reached"); + return true; + } + } + //Otherwise, we're not stopping + return false; + } /// \triggers /// The `"CONN_OPEN"` trigger is stream-specific, and is ran when a connection is made or passed to a new handler. Its payload is: @@ -1374,15 +1420,30 @@ namespace Mist{ } } - if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) < lastPacketTime){ - INFO_MSG("End of planned recording reached, shutting down"); - if (!onFinish()){ - break; - } - }else if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) < lastPacketTime){ - INFO_MSG("End of planned playback reached, shutting down"); - if (!onFinish()){ - break; + if (reachedPlannedStop()){ + const char * origTarget = getenv("MST_ORIG_TARGET"); + targetParams.erase("nxt-split"); + if (inlineRestartCapable() && origTarget && !reachedPlannedStop()){ + std::string newTarget = origTarget; + Util::streamVariables(newTarget, streamName); + if (newTarget.rfind('?') != std::string::npos){ + newTarget.erase(newTarget.rfind('?')); + } + INFO_MSG("Switching to next push target filename: %s", newTarget.c_str()); + if (!connectToFile(newTarget)){ + FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str()); + onFinish(); + break; + } + uint64_t endRec = lastPacketTime + atoll(targetParams["split"].c_str())*1000; + targetParams["nxt-split"] = JSON::Value(endRec).asString(); + sentHeader = false; + sendHeader(); + }else{ + if (!onFinish()){ + INFO_MSG("Shutting down because planned stopping point reached"); + break; + } } } sendNext(); @@ -1815,7 +1876,6 @@ namespace Mist{ } close(outFile); isRecordingToFile = true; - sought = false; return true; } diff --git a/src/output/output.h b/src/output/output.h index 2db11467..1e6765d9 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -61,6 +61,7 @@ namespace Mist { static bool listenMode(){return true;} uint32_t currTrackCount() const; virtual bool isReadyForPlay(); + virtual bool reachedPlannedStop(); //virtuals. The optional virtuals have default implementations that do as little as possible. /// This function is called whenever a packet is ready for sending. /// Inside it, thisPacket is guaranteed to contain a valid packet. @@ -102,6 +103,7 @@ namespace Mist { std::set buffer;///< A sorted list of next-to-be-loaded packets. bool sought;/// targetParams; /*LTS*/ std::string UA; ///< User Agent string, if known. @@ -118,6 +120,7 @@ namespace Mist { bool isBlocking;///< If true, indicates that myConn is blocking. uint32_t crc;///< Checksum, if any, for usage in the stats. unsigned int getKeyForTime(long unsigned int trackId, long long timeStamp); + uint64_t nextKeyTime(); //stream delaying variables unsigned int maxSkipAhead;///< Maximum ms that we will go ahead of the intended timestamps. diff --git a/src/output/output_ebml.cpp b/src/output/output_ebml.cpp index b4638ef0..ea7e1f72 100644 --- a/src/output/output_ebml.cpp +++ b/src/output/output_ebml.cpp @@ -153,6 +153,10 @@ namespace Mist{ }else{ //In live, clusters are aligned with the lookAhead time newClusterTime = currentClusterTime+(needsLookAhead?needsLookAhead:1); + //EXCEPT if there's a keyframe within the lookAhead window, then align to that keyframe instead + //This makes sure that inlineRestartCapable works as intended + uint64_t nxtKTime = nextKeyTime(); + if (nxtKTime && nxtKTime < newClusterTime){newClusterTime = nxtKTime;} } EBML::sendElemHead(myConn, EBML::EID_CLUSTER, clusterSize(currentClusterTime, newClusterTime)); EBML::sendElemUInt(myConn, EBML::EID_TIMECODE, currentClusterTime); diff --git a/src/output/output_ebml.h b/src/output/output_ebml.h index 25dda375..9cb5ea3c 100644 --- a/src/output/output_ebml.h +++ b/src/output/output_ebml.h @@ -10,7 +10,8 @@ namespace Mist{ void sendNext(); virtual void sendHeader(); uint32_t clusterSize(uint64_t start, uint64_t end); - + protected: + virtual bool inlineRestartCapable() const{return true;} private: bool isRecording(); std::string doctype; diff --git a/src/output/output_progressive_flv.h b/src/output/output_progressive_flv.h index 5b957c5e..6a506e38 100644 --- a/src/output/output_progressive_flv.h +++ b/src/output/output_progressive_flv.h @@ -10,6 +10,7 @@ namespace Mist { void sendNext(); void sendHeader(); private: + virtual bool inlineRestartCapable() const{return true;} FLV::Tag tag; bool isRecording(); bool isFileTarget(){return isRecording();} diff --git a/src/output/output_ts_base.h b/src/output/output_ts_base.h index d1b041ef..f113a2db 100644 --- a/src/output/output_ts_base.h +++ b/src/output/output_ts_base.h @@ -17,13 +17,18 @@ namespace Mist { virtual void sendNext(); virtual void sendTS(const char * tsData, unsigned int len=188){}; void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg); + virtual void sendHeader(){ + sentHeader = true; + packCounter = 0; + } protected: + virtual bool inlineRestartCapable() const{return true;} std::map first; std::map contCounters; int contPAT; int contPMT; int contSDT; - unsigned int packCounter; ///\todo update constructors? + unsigned int packCounter; TS::Packet packData; bool appleCompat; uint64_t sendRepeatingHeaders; ///< Amount of ms between PAT/PMT. Zero means do not repeat. diff --git a/src/output/output_wav.h b/src/output/output_wav.h index d7f3a701..e0ee4709 100644 --- a/src/output/output_wav.h +++ b/src/output/output_wav.h @@ -9,6 +9,8 @@ namespace Mist { void onHTTP(); void sendNext(); void sendHeader(); + protected: + virtual bool inlineRestartCapable() const{return true;} private: bool isRecording(); bool isFileTarget(){return isRecording();}