Added support for precise splitting of pushes to disk (i.e. recording) with re-variable-ized target strings at each split

This commit is contained in:
Thulinma 2019-09-28 22:45:28 +02:00
parent a071b365e5
commit 5be878bea5
8 changed files with 97 additions and 13 deletions

View file

@ -509,6 +509,9 @@ pid_t Util::startPush(const std::string &streamname, std::string &target){
return 0;
}
//Set original target string in environment
setenv("MST_ORIG_TARGET", target.c_str(), 1);
// The target can hold variables like current time etc
streamVariables(target, streamname);
@ -554,7 +557,12 @@ pid_t Util::startPush(const std::string &streamname, std::string &target){
(char *)target.c_str(), (char *)NULL};
int stdErr = 2;
return Util::Procs::StartPiped(argv, 0, 0, &stdErr);
//Cache return value so we can do some cleaning before we return
pid_t ret = Util::Procs::StartPiped(argv, 0, 0, &stdErr);
//Clean up environment
unsetenv("MST_ORIG_TARGET");
//Actually return the resulting PID
return ret;
}
uint8_t Util::getStreamStatus(const std::string &streamname){

View file

@ -711,6 +711,19 @@ namespace Mist{
return keyNo;
}
///Returns the timestamp of the next upcoming keyframe after thisPacket, or 0 if that cannot be determined (yet).
uint64_t Output::nextKeyTime(){
DTSC::Track & trk = myMeta.tracks[getMainSelectedTrack()];
if (!trk.keys.size()){
return 0;
}
std::deque<DTSC::Key>::iterator it;
for (it = trk.keys.begin(); it != trk.keys.end(); it++){
if (it->getTime() > lastPacketTime){return it->getTime();}
}
return 0;
}
int Output::pageNumForKey(long unsigned int trackId, long long int keyNum){
if (!nProxy.metaPages.count(trackId) || !nProxy.metaPages[trackId].mapped){
char id[NAME_BUFFER_SIZE];
@ -1085,6 +1098,11 @@ namespace Mist{
seekPos = startRec;
}
//Duration to record in seconds. Overrides recstop.
if (targetParams.count("split")){
long long endRec = atoll(targetParams["split"].c_str())*1000;
INFO_MSG("Will split recording every %lld seconds", atoll(targetParams["split"].c_str()));
targetParams["nxt-split"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
}
if (targetParams.count("duration")){
long long endRec = atoll(targetParams["duration"].c_str())*1000;
targetParams["recstop"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
@ -1254,6 +1272,34 @@ namespace Mist{
Util::wait(millis);
}
/// Called right before sendNext(). Should return true if this is a stopping point.
bool Output::reachedPlannedStop(){
//If we're recording to file and reached the target position, stop
if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) <= lastPacketTime){
INFO_MSG("End of planned recording reached");
return true;
}
//Regardless of playback method, if we've reached the wanted stop point, stop
if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) <= lastPacketTime){
INFO_MSG("End of planned playback reached");
return true;
}
//check if we need to split here
if (inlineRestartCapable() && targetParams.count("split")){
//Make sure that inlineRestartCapable outputs with splitting enabled only stop right before keyframes
//This works because this function is executed right BEFORE sendNext(), causing thisPacket to be the next packet
//in the newly splitted file.
if (!thisPacket.getFlag("keyframe")){return false;}
//is this a split point?
if (targetParams.count("nxt-split") && atoll(targetParams["nxt-split"].c_str()) <= lastPacketTime){
INFO_MSG("Split point reached");
return true;
}
}
//Otherwise, we're not stopping
return false;
}
/// \triggers
/// The `"CONN_OPEN"` trigger is stream-specific, and is ran when a connection is made or passed to a new handler. Its payload is:
/// ~~~~~~~~~~~~~~~
@ -1374,15 +1420,30 @@ namespace Mist{
}
}
if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) < lastPacketTime){
INFO_MSG("End of planned recording reached, shutting down");
if (!onFinish()){
break;
}
}else if (targetParams.count("stop") && atoll(targetParams["stop"].c_str()) < lastPacketTime){
INFO_MSG("End of planned playback reached, shutting down");
if (!onFinish()){
break;
if (reachedPlannedStop()){
const char * origTarget = getenv("MST_ORIG_TARGET");
targetParams.erase("nxt-split");
if (inlineRestartCapable() && origTarget && !reachedPlannedStop()){
std::string newTarget = origTarget;
Util::streamVariables(newTarget, streamName);
if (newTarget.rfind('?') != std::string::npos){
newTarget.erase(newTarget.rfind('?'));
}
INFO_MSG("Switching to next push target filename: %s", newTarget.c_str());
if (!connectToFile(newTarget)){
FAIL_MSG("Failed to open file, aborting: %s", newTarget.c_str());
onFinish();
break;
}
uint64_t endRec = lastPacketTime + atoll(targetParams["split"].c_str())*1000;
targetParams["nxt-split"] = JSON::Value(endRec).asString();
sentHeader = false;
sendHeader();
}else{
if (!onFinish()){
INFO_MSG("Shutting down because planned stopping point reached");
break;
}
}
}
sendNext();
@ -1815,7 +1876,6 @@ namespace Mist{
}
close(outFile);
isRecordingToFile = true;
sought = false;
return true;
}

View file

@ -61,6 +61,7 @@ namespace Mist {
static bool listenMode(){return true;}
uint32_t currTrackCount() const;
virtual bool isReadyForPlay();
virtual bool reachedPlannedStop();
//virtuals. The optional virtuals have default implementations that do as little as possible.
/// This function is called whenever a packet is ready for sending.
/// Inside it, thisPacket is guaranteed to contain a valid packet.
@ -102,6 +103,7 @@ namespace Mist {
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
protected://these are to be messed with by child classes
virtual bool inlineRestartCapable() const{return false;}///< True if the output is capable of restarting mid-stream. This is used for swapping recording files
bool pushing;
std::map<std::string, std::string> targetParams; /*LTS*/
std::string UA; ///< User Agent string, if known.
@ -118,6 +120,7 @@ namespace Mist {
bool isBlocking;///< If true, indicates that myConn is blocking.
uint32_t crc;///< Checksum, if any, for usage in the stats.
unsigned int getKeyForTime(long unsigned int trackId, long long timeStamp);
uint64_t nextKeyTime();
//stream delaying variables
unsigned int maxSkipAhead;///< Maximum ms that we will go ahead of the intended timestamps.

View file

@ -153,6 +153,10 @@ namespace Mist{
}else{
//In live, clusters are aligned with the lookAhead time
newClusterTime = currentClusterTime+(needsLookAhead?needsLookAhead:1);
//EXCEPT if there's a keyframe within the lookAhead window, then align to that keyframe instead
//This makes sure that inlineRestartCapable works as intended
uint64_t nxtKTime = nextKeyTime();
if (nxtKTime && nxtKTime < newClusterTime){newClusterTime = nxtKTime;}
}
EBML::sendElemHead(myConn, EBML::EID_CLUSTER, clusterSize(currentClusterTime, newClusterTime));
EBML::sendElemUInt(myConn, EBML::EID_TIMECODE, currentClusterTime);

View file

@ -10,7 +10,8 @@ namespace Mist{
void sendNext();
virtual void sendHeader();
uint32_t clusterSize(uint64_t start, uint64_t end);
protected:
virtual bool inlineRestartCapable() const{return true;}
private:
bool isRecording();
std::string doctype;

View file

@ -10,6 +10,7 @@ namespace Mist {
void sendNext();
void sendHeader();
private:
virtual bool inlineRestartCapable() const{return true;}
FLV::Tag tag;
bool isRecording();
bool isFileTarget(){return isRecording();}

View file

@ -17,13 +17,18 @@ namespace Mist {
virtual void sendNext();
virtual void sendTS(const char * tsData, unsigned int len=188){};
void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg);
virtual void sendHeader(){
sentHeader = true;
packCounter = 0;
}
protected:
virtual bool inlineRestartCapable() const{return true;}
std::map<unsigned int, bool> first;
std::map<unsigned int, int> contCounters;
int contPAT;
int contPMT;
int contSDT;
unsigned int packCounter; ///\todo update constructors?
unsigned int packCounter;
TS::Packet packData;
bool appleCompat;
uint64_t sendRepeatingHeaders; ///< Amount of ms between PAT/PMT. Zero means do not repeat.

View file

@ -9,6 +9,8 @@ namespace Mist {
void onHTTP();
void sendNext();
void sendHeader();
protected:
virtual bool inlineRestartCapable() const{return true;}
private:
bool isRecording();
bool isFileTarget(){return isRecording();}