diff --git a/src/input/input.cpp b/src/input/input.cpp index 0cca27e1..782c1769 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -220,6 +220,10 @@ namespace Mist{ srtTrack = 0; lastBufferCheck = 0; bufferPid = 0; + internalOnly = false; + isBuffer = false; + startTime = Util::bootSecs(); + lastStats = 0; } void Input::checkHeaderTimes(std::string streamFile){ @@ -705,8 +709,7 @@ namespace Mist{ /// ~~~~~~~~~~~~~~~ void Input::serve(){ users.reload(streamName, true); - Comms::Connections statComm; - uint64_t startTime = Util::bootSecs(); + startTime = Util::bootSecs(); if (!M){ // Initialize meta page @@ -719,8 +722,8 @@ namespace Mist{ } meta.setSource(config->getString("input")); - bool internalOnly = (config->getString("input").find("INTERNAL_ONLY") != std::string::npos); - bool isBuffer = (capa["name"].asStringRef() == "Buffer"); + internalOnly = (config->getString("input").find("INTERNAL_ONLY") != std::string::npos); + isBuffer = (capa["name"].asStringRef() == "Buffer"); /*LTS-START*/ if (Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){ @@ -752,18 +755,8 @@ namespace Mist{ }else{ if (connectedUsers && M.getValidTracks().size()){activityCounter = Util::bootSecs();} } - // Connect to stats for INPUT detection - if (!internalOnly && !isBuffer){ - if (!statComm){statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");} - if (statComm){ - uint64_t now = Util::bootSecs(); - statComm.setNow(now); - statComm.setStream(streamName); - statComm.setTime(now - startTime); - statComm.setLastSecond(0); - connStats(statComm); - } - } + + inputServeStats(); // if not shutting down, wait 1 second before looping preMs = Util::bootMS() - preMs; uint64_t waitMs = INPUT_USER_INTERVAL; @@ -784,6 +777,23 @@ namespace Mist{ } } + void Input::inputServeStats(){ + uint64_t now = Util::bootSecs(); + if (now != lastStats){ + if (!internalOnly && !isBuffer){ + if (!statComm){statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");} + if (statComm){ + statComm.setNow(now); + statComm.setStream(streamName); + statComm.setTime(now - startTime); + statComm.setLastSecond(0); + connStats(statComm); + } + } + lastStats = now; + } + } + /// This function checks if an input in serve mode should keep running or not. /// The default implementation checks for interruption by signals and otherwise waits until a /// save amount of time has passed before shutting down. @@ -1424,6 +1434,8 @@ namespace Mist{ uint64_t keyTime = keys.getTime(keyNum); + inputServeStats(); + bool isSrt = (hasSrt && idx == srtTrack); if (isSrt){ srtSource.clear(); @@ -1523,6 +1535,7 @@ namespace Mist{ byteCounter += thisPacket.getDataLen(); lastBuffered = thisTime; } + inputServeStats(); getNext(sourceIdx); } //Sanity check: are we matching the key's data size? diff --git a/src/input/input.h b/src/input/input.h index 4baf208f..6b2b56ad 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -51,6 +51,12 @@ namespace Mist{ virtual bool publishesTracks(){return true;} protected: + bool internalOnly; + bool isBuffer; + Comms::Connections statComm; + uint64_t startTime; + uint64_t lastStats; + virtual bool checkArguments() = 0; virtual bool readHeader(); virtual bool needHeader(){return !readExistingHeader();} @@ -71,6 +77,7 @@ namespace Mist{ virtual void removeUnused(); virtual void convert(); virtual void serve(); + virtual void inputServeStats(); virtual void stream(); virtual std::string getConnectedBinHost(){return std::string("\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001", 16);} virtual size_t streamByteCount(){