From 776cfe1850b844210de0727e63ec0e5d35b714b9 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Thu, 11 Oct 2018 17:51:35 +0200
Subject: [PATCH] Backported various Pro edition changes and general code to
 Free edition

---
 src/input/input.cpp                   | 15 +++---
 src/input/input_buffer.cpp            | 13 +++++
 src/output/output.cpp                 | 48 +++++++++++++++---
 src/output/output.h                   |  1 +
 src/output/output_progressive_mp4.cpp | 72 +++++++++++++++++++--------
 src/output/output_progressive_mp4.h   |  1 +
 src/output/output_rtmp.cpp            | 64 +++---------------------
 7 files changed, 123 insertions(+), 91 deletions(-)

diff --git a/src/input/input.cpp b/src/input/input.cpp
index c44f22d8..5d50cb7f 100644
--- a/src/input/input.cpp
+++ b/src/input/input.cpp
@@ -263,6 +263,7 @@ namespace Mist {
   
   ///Checks in the server configuration if this stream is set to always on or not.
   /// Returns true if it is, or if the stream could not be found in the configuration.
+  /// If the compiled default debug level is < INFO, instead returns false if the stream is not found.
   bool Input::isAlwaysOn(){
     bool ret = true;
     std::string strName = streamName.substr(0, (streamName.find_first_of("+ ")));
@@ -274,6 +275,10 @@ namespace Mist {
       if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){
         ret = false;
       }
+    }else{
+#if DEBUG < DLVL_DEVEL
+      ret = false;
+#endif
     }
     configLock.post();
     return ret;
@@ -331,6 +336,10 @@ namespace Mist {
     // - INPUT_TIMEOUT seconds haven't passed yet,
     // - this is a live stream and at least two of the biggest fragment haven't passed yet,
     bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)));
+    if (!ret && config->is_active && isAlwaysOn()){
+      ret = true;
+      activityCounter = Util::bootSecs();
+    }
     return ret;
   }
 
@@ -473,12 +482,6 @@ namespace Mist {
           if (!it2->second){
             bufferRemove(it->first, it2->first);
             pageCounter[it->first].erase(it2->first);
-            for (int i = 0; i < 8192; i += 8){
-              unsigned int thisKeyNum = ntohl(((((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
-              if (thisKeyNum == it2->first){
-                (((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) = 0;
-              }
-            }
             change = true;
             break;
           }
diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp
index 95c59f59..027da501 100644
--- a/src/input/input_buffer.cpp
+++ b/src/input/input_buffer.cpp
@@ -351,6 +351,19 @@ namespace Mist {
             bufferLocations[tid].erase(bufferLocations[tid].begin());
           }
           if (pushLocation.count(it->first)){
+            // \todo Debugger says this is null sometimes. It shouldn't be. Figure out why!
+            // For now, this if will prevent crashes in these cases.
+            if (pushLocation[it->first]){
+              //Reset the userpage, to allow repushing from TS
+              IPC::userConnection userConn(pushLocation[it->first]);
+              for (int i = 0; i < SIMUL_TRACKS; i++) {
+                if (userConn.getTrackId(i) == it->first) {
+                  userConn.setTrackId(i, 0);
+                  userConn.setKeynum(i, 0);
+                  break;
+                }
+              }
+            }
             pushLocation.erase(it->first);
           }
           nProxy.curPageNum.erase(it->first);
diff --git a/src/output/output.cpp b/src/output/output.cpp
index c260df6f..0bcc0bb0 100644
--- a/src/output/output.cpp
+++ b/src/output/output.cpp
@@ -74,6 +74,12 @@ namespace Mist{
       DEBUG_MSG(DLVL_WARN, "Warning: MistOut created with closed socket!");
     }
     sentHeader = false;
+    
+    //If we have a streamname option, set internal streamname to that option
+    if (!streamName.size() && config->hasOption("streamname")){
+      streamName = config->getString("streamname");
+    }
+
   }
 
   void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){
@@ -161,7 +167,9 @@ namespace Mist{
   }
  
   bool Output::isReadyForPlay(){
-    if (isPushing()){return true;}
+    static bool recursing = false;
+    if (isPushing() || recursing){return true;}
+    recursing = true;
     if (!isInitialized){initialize();}
     if (!myMeta.tracks.size()){updateMeta();}
     if (myMeta.tracks.size()){
@@ -170,6 +178,7 @@ namespace Mist{
       }
       unsigned int mainTrack = getMainSelectedTrack();
       if (mainTrack && myMeta.tracks.count(mainTrack) && (myMeta.tracks[mainTrack].keys.size() >= 2 || myMeta.tracks[mainTrack].lastms - myMeta.tracks[mainTrack].firstms > 5000)){
+        recursing = false;
         return true;
       }else{
         HIGH_MSG("NOT READY YET (%lu tracks, %lu = %lu keys)", myMeta.tracks.size(), getMainSelectedTrack(), myMeta.tracks[getMainSelectedTrack()].keys.size());
@@ -177,6 +186,7 @@ namespace Mist{
     }else{
       HIGH_MSG("NOT READY YET (%lu tracks)", myMeta.tracks.size());
     }
+    recursing = false;
     return false;
   }
 
@@ -572,11 +582,31 @@ namespace Mist{
     return start;
   }
 
-  ///Return the end time of the selected tracks, or 0 if unknown or live.
+  ///Return the end time of the selected tracks, or 0 if unknown.
   ///Returns the end time of latest track if nothing is selected.
   ///Returns zero if no tracks exist.
   uint64_t Output::endTime(){
-    if (myMeta.live){return 0;}
+    if (!myMeta.tracks.size()){return 0;}
+    uint64_t end = 0;
+    if (selectedTracks.size()){
+      for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
+        if (myMeta.tracks.count(*it)){
+          if (end < myMeta.tracks[*it].lastms){end = myMeta.tracks[*it].lastms;}
+        }
+      }
+    }else{
+      for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
+        if (end < it->second.lastms){end = it->second.lastms;}
+      }
+    }
+    return end;
+  }
+
+  ///Return the most live time stamp of the selected tracks, or 0 if unknown or non-live.
+  ///Returns the time stamp of the newest track if nothing is selected.
+  ///Returns zero if no tracks exist.
+  uint64_t Output::liveTime(){
+    if (!myMeta.live){return 0;}
     if (!myMeta.tracks.size()){return 0;}
     uint64_t end = 0;
     if (selectedTracks.size()){
@@ -944,6 +974,10 @@ namespace Mist{
         dropTrack(nxt.tid, "timeless empty packet");
         return false;
       }
+      //for VoD, check if we've reached the end of the track, if so, drop it
+      if (myMeta.vod && nxt.time > myMeta.tracks[nxt.tid].lastms){
+        dropTrack(nxt.tid, "Reached end of track");
+      }
       //if this is a live stream, we might have just reached the live point.
       //check where the next key is
       nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time);
@@ -954,8 +988,8 @@ namespace Mist{
         if (++emptyCount < 100){
           Util::wait(250);
           //we're waiting for new data to show up
-          if (emptyCount % 8 == 0){
-            reconnect();//reconnect every 2 seconds
+          if (emptyCount % 64 == 0){
+            reconnect();//reconnect every 16 seconds
           }else{
             //updating meta is only useful with live streams
             if (myMeta.live && emptyCount % 4 == 0){
@@ -1097,7 +1131,7 @@ namespace Mist{
     if (now == lastStats && !force){return;}
     lastStats = now;
 
-    EXTREME_MSG("Writing stats: %s, %s, %lu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu);
+    HIGH_MSG("Writing stats: %s, %s, %lu, %llu, %llu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
     if (statsPage.getData()){
       IPC::statExchange tmpEx(statsPage.getData());
       tmpEx.now(now);
@@ -1198,6 +1232,7 @@ namespace Mist{
       return false;
     }
     close(outFile);
+    sought = false;
     return true;
   }
 
@@ -1248,6 +1283,7 @@ namespace Mist{
       Util::wait(1000);
       streamStatus = Util::getStreamStatus(streamName);
       if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){
+        INFO_MSG("Reconnecting to %s buffer... (%u)", streamName.c_str(), streamStatus);
         reconnect();
         streamStatus = Util::getStreamStatus(streamName);
       }
diff --git a/src/output/output.h b/src/output/output.h
index d6daab76..009cc6d3 100644
--- a/src/output/output.h
+++ b/src/output/output.h
@@ -49,6 +49,7 @@ namespace Mist {
       uint64_t currentTime();
       uint64_t startTime();
       uint64_t endTime();
+      uint64_t liveTime();
       void setBlocking(bool blocking);
       void updateMeta();
       bool selectDefaultTracks();
diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp
index ed690583..fe61fbe6 100644
--- a/src/output/output_progressive_mp4.cpp
+++ b/src/output/output_progressive_mp4.cpp
@@ -82,7 +82,10 @@ namespace Mist {
         tmpRes += 16//SMHD Box
           + 16//STSD
           + 36//MP4A
-          + 37 + thisTrack.init.size();//ESDS
+          + 35;
+        if (thisTrack.init.size()){
+          tmpRes += 2 + thisTrack.init.size();//ESDS
+        }
       }
       
       //Unfortunately, for our STTS and CTTS boxes, we need to loop through all parts of the track
@@ -117,6 +120,7 @@ namespace Mist {
     }
     res += 8; //mdat beginning
     fileSize += res;
+    MEDIUM_MSG("H size %llu, file: %llu", res, fileSize);
     return res;
   }
 
@@ -148,6 +152,7 @@ namespace Mist {
     //Construct with duration of -1
     MP4::MVHD mvhdBox(-1);
     //Then override it to set the correct duration
+    uint64_t fms;
     uint64_t firstms = 0xFFFFFFFFFFFFFFull;
     uint64_t lastms = 0;
     for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
@@ -155,6 +160,7 @@ namespace Mist {
       firstms = std::min(firstms, (uint64_t)myMeta.tracks[*it].firstms);
     }
     mvhdBox.setDuration(lastms - firstms);
+    fms = firstms;
     //Set the trackid for the first "empty" track within the file.
     mvhdBox.setTrackID(selectedTracks.size() + 1);
     moovBox.setContent(mvhdBox, moovOffset++);
@@ -176,11 +182,25 @@ namespace Mist {
       MP4::ELST elstBox;
       elstBox.setVersion(0);
       elstBox.setFlags(0);
-      elstBox.setCount(1);
-      elstBox.setSegmentDuration(0, tDuration);
-      elstBox.setMediaTime(0, 0);
-      elstBox.setMediaRateInteger(0, 1);
-      elstBox.setMediaRateFraction(0, 0);
+      if (myMeta.vod && thisTrack.firstms != fms){
+        elstBox.setCount(2);
+
+        elstBox.setSegmentDuration(0, thisTrack.firstms - fms);
+        elstBox.setMediaTime(0, 0xFFFFFFFFull);
+        elstBox.setMediaRateInteger(0, 0);
+        elstBox.setMediaRateFraction(0, 0);
+
+        elstBox.setSegmentDuration(1, tDuration);
+        elstBox.setMediaTime(1, 0);
+        elstBox.setMediaRateInteger(1, 1);
+        elstBox.setMediaRateFraction(1, 0);
+      }else{
+        elstBox.setCount(1);
+        elstBox.setSegmentDuration(0, tDuration);
+        elstBox.setMediaTime(0, 0);
+        elstBox.setMediaRateInteger(0, 1);
+        elstBox.setMediaRateFraction(0, 0);
+      }
 
       edtsBox.setContent(elstBox, 0);
       trakBox.setContent(edtsBox, trakOffset++);
@@ -198,6 +218,9 @@ namespace Mist {
       MP4::MINF minfBox;
       size_t minfOffset = 0;
       
+      MP4::STBL stblBox;
+      unsigned int stblOffset = 0;
+
       //Add a track-type specific box to the MINF box
       if (thisTrack.type == "video") {
         MP4::VMHD vmhdBox;
@@ -214,10 +237,6 @@ namespace Mist {
       dinfBox.setContent(drefBox, 0);
       minfBox.setContent(dinfBox, minfOffset++);
 
-     
-      MP4::STBL stblBox;
-      size_t stblOffset = 0;
-
       //Add STSD box
       MP4::STSD stsdBox(0);
       if (thisTrack.type == "video") {
@@ -346,10 +365,12 @@ namespace Mist {
     //Current values are actual byte offset without header-sized offset
     std::set <keyPart> sortSet;//filling sortset for interleaving parts
     for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
+      DTSC::Track & thisTrack = myMeta.tracks[*subIt];
       keyPart temp;
       temp.trackID = *subIt;
-      temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
+      temp.time = thisTrack.firstms;//timeplace of frame
       temp.index = 0;
+      temp.size = thisTrack.parts[0].getDuration();
       HIGH_MSG("Header sortSet: tid %lu time %lu", temp.trackID, temp.time);
       sortSet.insert(temp);
     }
@@ -372,6 +393,7 @@ namespace Mist {
       if (temp.index + 1< thisTrack.parts.size()) {//Only create new element, when there are new elements to be added 
         temp.time += thisTrack.parts[temp.index].getDuration();
         ++temp.index;
+        temp.size = thisTrack.parts[temp.index].getSize();
         sortSet.insert(temp);
       }
     }
@@ -384,8 +406,9 @@ namespace Mist {
     if (mdatSize < 0xFFFFFFFF){
       Bit::htobl(mdatHeader, mdatSize);
     }
-    header << std::string(mdatHeader, 8);
+    header.write(mdatHeader, 8);
     size += header.str().size();
+    MEDIUM_MSG("Header %llu, file: %llu", header.str().size(), size);
     return header.str();
   }
   
@@ -425,6 +448,7 @@ namespace Mist {
       if (temp.index + 1 < myMeta.tracks[temp.trackID].parts.size()){ //only insert when there are parts left
         temp.time += thisTrack.parts[temp.index].getDuration();
         ++temp.index;
+        temp.size = thisTrack.parts[temp.index].getSize();
         sortSet.insert(temp);
       }
       //Remove just-parsed element
@@ -470,10 +494,12 @@ namespace Mist {
     currPos = 0;
     sortSet.clear();
     for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
+      DTSC::Track & thisTrack = myMeta.tracks[*subIt];
       keyPart temp;
       temp.trackID = *subIt;
-      temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
+      temp.time = thisTrack.firstms;//timeplace of frame
       temp.index = 0;
+      temp.size = thisTrack.parts[temp.index].getSize();
       sortSet.insert(temp);
     }
     if (H.GetHeader("Range") != ""){
@@ -517,12 +543,6 @@ namespace Mist {
       //HTTP_S.StartResponse(HTTP_R, conn);
     }
     leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data
-    if (byteStart < headerSize) {
-      std::string headerData = DTSCMeta2MP4Header(fileSize);
-      myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header
-      leftOver -= std::min(headerSize, byteEnd) - byteStart;
-    }
-    currPos += headerSize;//we're now guaranteed to be past the header point, no matter what
   }
   
   void OutProgressiveMP4::sendNext() {
@@ -534,8 +554,7 @@ namespace Mist {
     thisPacket.getString("data", dataPointer, len);
 
     keyPart thisPart = *sortSet.begin();
-    uint64_t thisSize = myMeta.tracks[thisPart.trackID].parts[thisPart.index].getSize();
-    if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisSize){
+    if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisPart.size){
       if (thisPacket.getTime() > sortSet.begin()->time || thisPacket.getTrackId() > sortSet.begin()->trackID) {
         if (perfect) {
           WARN_MSG("Warning: input is inconsistent. Expected %lu:%lu but got %ld:%llu - cancelling playback", thisPart.trackID, thisPart.time, thisPacket.getTrackId(), thisPacket.getTime());
@@ -543,7 +562,7 @@ namespace Mist {
           myConn.close();
         }
       } else {
-        WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisSize, thisPacket.getTrackId(), thisPacket.getTime(), len);
+        WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisPart.size, thisPacket.getTrackId(), thisPacket.getTime(), len);
       }
       return;
     }
@@ -571,6 +590,7 @@ namespace Mist {
       if (temp.index + 1 < thisTrack.parts.size()) { //only insert when there are parts left
         temp.time += thisTrack.parts[temp.index].getDuration();
         ++temp.index;
+        temp.size = thisTrack.parts[temp.index].getSize();
         sortSet.insert(temp);
       }
 
@@ -584,6 +604,14 @@ namespace Mist {
   }
 
   void OutProgressiveMP4::sendHeader(){
+    //Send the header data
+    uint64_t headerSize = mp4HeaderSize(fileSize);
+    if (byteStart < headerSize){
+      std::string headerData = DTSCMeta2MP4Header(fileSize);
+      myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header
+      leftOver -= std::min(headerSize, byteEnd) - byteStart;
+    }
+    currPos += headerSize;//we're now guaranteed to be past the header point, no matter what
     seek(seekPoint);
     sentHeader = true;
   }
diff --git a/src/output/output_progressive_mp4.h b/src/output/output_progressive_mp4.h
index 99f66ac2..fe49368a 100644
--- a/src/output/output_progressive_mp4.h
+++ b/src/output/output_progressive_mp4.h
@@ -20,6 +20,7 @@ namespace Mist {
       uint64_t time;
       uint64_t byteOffset;//Stores relative bpos for fragmented MP4
       uint64_t index;
+      uint32_t size;
   };
   
   class OutProgressiveMP4 : public HTTPOutput {
diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp
index 2cec1183..84ca7251 100644
--- a/src/output/output_rtmp.cpp
+++ b/src/output/output_rtmp.cpp
@@ -10,6 +10,9 @@
 
 namespace Mist {
   OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
+    lastOutTime = 0;
+    rtmpOffset = 0;
+    bootMsOffset = 0;
     setBlocking(true);
     while (!conn.Received().available(1537) && conn.connected() && config->is_active) {
       conn.spool();
@@ -67,60 +70,6 @@ namespace Mist {
     return false;
   }
 
-  void OutRTMP::parseVars(std::string data){
-    std::string varname;
-    std::string varval;
-    bool trackSwitch = false;
-    // position where a part start (e.g. after &)
-    size_t pos = 0;
-    while (pos < data.length()){
-      size_t nextpos = data.find('&', pos);
-      if (nextpos == std::string::npos){
-        nextpos = data.length();
-      }
-      size_t eq_pos = data.find('=', pos);
-      if (eq_pos < nextpos){
-        // there is a key and value
-        varname = data.substr(pos, eq_pos - pos);
-        varval = data.substr(eq_pos + 1, nextpos - eq_pos - 1);
-      }else{
-        // no value, only a key
-        varname = data.substr(pos, nextpos - pos);
-        varval.clear();
-      }
-
-      if (varname == "track" || varname == "audio" || varname == "video"){
-        long long int selTrack = JSON::Value(varval).asInt();
-        if (myMeta){
-          if (myMeta.tracks.count(selTrack)){
-            std::string & delThis = myMeta.tracks[selTrack].type;
-            for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
-              if (myMeta.tracks[*it].type == delThis){
-                selectedTracks.erase(it);
-                trackSwitch = true;
-                break;
-              }
-            }
-            selectedTracks.insert(selTrack);
-          }
-        }else{
-          selectedTracks.insert(selTrack);
-        }
-      }
-
-      if (nextpos == std::string::npos){
-        // in case the string is gigantic
-        break;
-      }
-      // erase &
-      pos = nextpos + 1;
-    }
-    if (trackSwitch && thisPacket){
-      seek(thisPacket.getTime());
-    }
-  }
-
-
   void OutRTMP::init(Util::Config * cfg){
     Output::init(cfg);
     capa["name"] = "RTMP";
@@ -275,9 +224,9 @@ namespace Mist {
     
     unsigned int timestamp = thisPacket.getTime() - rtmpOffset;
     //make sure we don't go negative
-    if (rtmpOffset > thisPacket.getTime()){
+    if (rtmpOffset > (int64_t)thisPacket.getTime()){
       timestamp = 0;
-      rtmpOffset = thisPacket.getTime();
+      rtmpOffset = (int64_t)thisPacket.getTime();
     }
     
     bool allow_short = RTMPStream::lastsend.count(4);
@@ -638,7 +587,8 @@ namespace Mist {
       if (streamName.find('?') != std::string::npos){
         std::string tmpVars = streamName.substr(streamName.find('?') + 1);
         streamName = streamName.substr(0, streamName.find('?'));
-        parseVars(tmpVars);
+        std::map<std::string, std::string> targetParams;
+        HTTP::parseVars(tmpVars, targetParams);
       }
       
       size_t colonPos = streamName.find(':');