From 200e1e4a1ce3ffbe91b58be04940870e4930b737 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Thu, 8 Jul 2021 14:02:50 +0200
Subject: [PATCH] WS/MP4 reliability edits: - Stream selecting now obeys new
 "maxdelay" capa entry - Output::liveSeek now takes an optional bool argument
 to indicate only rate control should be applied (no seeking) -
 dataWaitTimeout is now configurable per-output, defaults to the old 25s -
 WS/MP4 uses the new liveSeek with rate-control only - WS/MP4 uses the new
 dataWaitTimeout and sets it to 4.5s - WS/MP4 uses the new maxdelay capa, sets
 it to 5s - WS/MP4 will now auto-reselect tracks if a track is dropped for
 data wait timeout or disappeared from metadata reasons - Added support for
 jitter information in WS/MP4 protocol - Robustify sendWebsocketCodecData
 being ran when sendHeader is ran - Fix race condition when switching video
 tracks before previous video track has sent a single frame

---
 lib/dtsc.cpp              |   2 +-
 lib/stream.cpp            |  19 ++++-
 src/output/output.cpp     |   7 +-
 src/output/output.h       |   3 +-
 src/output/output_mp4.cpp | 153 +++++++++++++++++++++-----------------
 src/output/output_mp4.h   |   2 +
 6 files changed, 113 insertions(+), 73 deletions(-)

diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp
index acfd9c0f..133038a6 100644
--- a/lib/dtsc.cpp
+++ b/lib/dtsc.cpp
@@ -2294,7 +2294,7 @@ namespace DTSC{
             MEDIUM_MSG("Jitter lowered from %" PRIu64 " to %" PRIu64 " ms", maxJitter, curJitter);
             maxJitter = curJitter;
           }
-          curJitter = maxJitter*0.75;
+          curJitter = maxJitter*0.90;
         }
         ++x;
         trueTime[x % 8] = curMs;
diff --git a/lib/stream.cpp b/lib/stream.cpp
index d7861ad6..1458a71c 100644
--- a/lib/stream.cpp
+++ b/lib/stream.cpp
@@ -1139,7 +1139,7 @@ std::set<size_t> Util::wouldSelect(const DTSC::Meta &M, const std::string &track
 std::set<size_t> Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value &capa,
                                           const std::string &type, const std::string &UA){
   std::set<size_t> validTracks = M.getValidTracks();
-
+  uint64_t maxLastMs = 0;
   std::set<size_t> toRemove;
   for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
     // Remove unrequested tracks
@@ -1208,10 +1208,27 @@ std::set<size_t> Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value
         toRemove.insert(*it);
       }
     }
+    //not removing this track? Keep track of highest lastMs
+    if (capa.isMember("maxdelay")){
+      uint64_t lMs = M.getLastms(*it);
+      if (lMs > maxLastMs){maxLastMs = lMs;}
+    }
   }
   for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
     validTracks.erase(*it);
   }
+  //if there is a max delay configured, remove tracks that are further behind than this
+  if (capa.isMember("maxdelay")){
+    uint64_t maxDelay = capa["maxdelay"].asInt();
+    if (maxDelay > maxLastMs){maxDelay = maxLastMs;}
+    toRemove.clear();
+    for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
+      if (M.getLastms(*it) < maxLastMs - maxDelay){toRemove.insert(*it);}
+    }
+    for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
+      validTracks.erase(*it);
+    }
+  }
   return validTracks;
 }
 
diff --git a/src/output/output.cpp b/src/output/output.cpp
index 28894376..2982f605 100644
--- a/src/output/output.cpp
+++ b/src/output/output.cpp
@@ -86,6 +86,7 @@ namespace Mist{
   }
 
   Output::Output(Socket::Connection &conn) : myConn(conn){
+    dataWaitTimeout = 2500;
     pushing = false;
     recursingSync = false;
     firstTime = 0;
@@ -1097,7 +1098,7 @@ namespace Mist{
   /// This function attempts to forward playback in live streams to a more live point.
   /// It seeks to the last sync'ed keyframe of the main track, no closer than needsLookAhead+minKeepAway ms from the end.
   /// Aborts if not live, there is no main track or it has no keyframes.
-  bool Output::liveSeek(){
+  bool Output::liveSeek(bool rateOnly){
     if (!realTime){return false;}//Makes no sense when playing in turbo mode
     uint64_t seekPos = 0;
     if (!meta.getLive()){return false;}
@@ -1113,7 +1114,7 @@ namespace Mist{
       if (lMs - mKa - needsLookAhead - extraKeepAway > cTime + 50){
         // We need to speed up!
         uint64_t diff = (lMs - mKa - needsLookAhead - extraKeepAway) - cTime;
-        if (diff > 3000){
+        if (!rateOnly && diff > 3000){
           noReturn = true;
           newSpeed = 1000;
         }else if (diff > 1000){
@@ -1687,7 +1688,7 @@ namespace Mist{
           //Okay, there's no next page yet, and no next packet on this page either.
           //That means we're waiting for data to show up, somewhere.
           // after ~25 seconds, give up and drop the track.
-          if (++emptyCount >= 2500){
+          if (++emptyCount >= dataWaitTimeout){
             dropTrack(nxt.tid, "EOP: data wait timeout");
             return false;
           }
diff --git a/src/output/output.h b/src/output/output.h
index 5ec379f8..84d17482 100644
--- a/src/output/output.h
+++ b/src/output/output.h
@@ -62,7 +62,7 @@ namespace Mist{
     static void listener(Util::Config &conf, int (*callback)(Socket::Connection &S));
     virtual void initialSeek();
     uint64_t getMinKeepAway();
-    virtual bool liveSeek();
+    virtual bool liveSeek(bool rateOnly = false);
     virtual bool onFinish(){return false;}
     void reconnect();
     void disconnect();
@@ -115,6 +115,7 @@ namespace Mist{
     uint64_t uaDelay;                                ///< Seconds to wait before setting the UA.
     uint64_t lastRecv;
     uint64_t extraKeepAway;
+    uint64_t dataWaitTimeout; ///< How long to wait for new packets before dropping a track, in tens of milliseconds.
     uint64_t firstTime; ///< Time of first packet after last seek. Used for real-time sending.
     virtual std::string getConnectedHost();
     virtual std::string getConnectedBinHost();
diff --git a/src/output/output_mp4.cpp b/src/output/output_mp4.cpp
index d1f9ebbe..c7a7a078 100644
--- a/src/output/output_mp4.cpp
+++ b/src/output/output_mp4.cpp
@@ -1243,7 +1243,6 @@ namespace Mist{
         dropTrack(prevVidTrack, "Smoothly switching to new video track", false);
         prevVidTrack = INVALID_TRACK_ID;
         onIdle();
-        sendWebsocketCodecData("tracks");
         sendHeader();
 
 /*
@@ -1281,7 +1280,7 @@ namespace Mist{
       webBuf.append(dataPointer, len);
       webSock->sendFrame(webBuf, webBuf.size(), 2);
 
-      if (stayLive && thisPacket.getFlag("keyframe")){liveSeek();}
+      if (stayLive && thisPacket.getFlag("keyframe")){liveSeek(true);}
       // We must return here, the rest of this function won't work for websockets. 
       return;
     }
@@ -1395,7 +1394,11 @@ namespace Mist{
   void OutMP4::sendHeader(){
 
     if (webSock) {
-
+      if (!sentHeader){
+        sendWebsocketCodecData("codec_data");
+      }else{
+        sendWebsocketCodecData("tracks");
+      }
       JSON::Value r;
       r["type"] = "info";
       r["data"]["msg"] = "Sending header";
@@ -1445,9 +1448,11 @@ namespace Mist{
 
   void OutMP4::onWebsocketConnect() {
     capa["name"] = "MP4/WS";
+    capa["maxdelay"] = 5000;
     fragSeqNum = 0;
     idleInterval = 1000;
     maxSkipAhead = 0;
+    dataWaitTimeout = 450;
   }
 
   void OutMP4::onWebsocketFrame() {
@@ -1482,7 +1487,7 @@ namespace Mist{
         }
       }
       selectDefaultTracks();
-      sendWebsocketCodecData("codec_data");
+      sendHeader();
     }else if (command["type"] == "seek") {
       handleWebsocketSeek(command);
     }else if (command["type"] == "pause") {
@@ -1513,71 +1518,11 @@ namespace Mist{
           targetParams.erase("video");
         }
       }
-      // Remember the previous video track, if any.
-      std::set<size_t> prevSelTracks;
-      prevVidTrack = INVALID_TRACK_ID;
-      for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
-        prevSelTracks.insert(it->first);
-        if (M.getType(it->first) == "video"){
-          prevVidTrack = it->first;
-        }
-      }
-      if (selectDefaultTracks()) {
-        uint64_t seekTarget = currentTime();
-        if (command.isMember("seek_time")){
-          seekTarget = command["seek_time"].asInt();
-          prevVidTrack = INVALID_TRACK_ID;
-        }
-        // Add the previous video track back, if we had one.
-        if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack)){
-          userSelect[prevVidTrack].reload(streamName, prevVidTrack);
-          seek(seekTarget);
-          std::set<size_t> newSelTracks;
-          newSelTracks.insert(prevVidTrack);
-          for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
-            if (M.getType(it->first) != "video"){
-              newSelTracks.insert(it->first);
-            }
-          }
-          if (prevSelTracks != newSelTracks){
-            seek(seekTarget, true);
-            realTime = 0;
-            forwardTo = seekTarget;
-            sendWebsocketCodecData(command["type"]);
-            sendHeader();
-            JSON::Value r;
-            r["type"] = "set_speed";
-            if (target_rate == 0.0){
-              r["data"]["play_rate_prev"] = "auto";
-            }else{
-              r["data"]["play_rate_prev"] = target_rate;
-            }
-            r["data"]["play_rate_curr"] = "fast-forward";
-            webSock->sendFrame(r.toString());
-          }
-        }else{
-          prevVidTrack = INVALID_TRACK_ID;
-          seek(seekTarget, true);
-          realTime = 0;
-          forwardTo = seekTarget;
-          sendWebsocketCodecData(command["type"]);
-          sendHeader();
-          JSON::Value r;
-          r["type"] = "set_speed";
-          if (target_rate == 0.0){
-            r["data"]["play_rate_prev"] = "auto";
-          }else{
-            r["data"]["play_rate_prev"] = target_rate;
-          }
-          r["data"]["play_rate_curr"] = "fast-forward";
-          webSock->sendFrame(r.toString());
-        }
-        onIdle();
-        return;
+      if (command.isMember("seek_time")){
+        possiblyReselectTracks(command["seek_time"].asInt());
       }else{
-        prevVidTrack = INVALID_TRACK_ID;
+        possiblyReselectTracks(currentTime());
       }
-      onIdle();
       return;
     }else if (command["type"] == "set_speed") {
       handleWebsocketSetSpeed(command);
@@ -1590,6 +1535,68 @@ namespace Mist{
     }
   }
 
+  bool OutMP4::possiblyReselectTracks(uint64_t seekTarget){
+    // Remember the previous video track, if any.
+    std::set<size_t> prevSelTracks;
+    prevVidTrack = INVALID_TRACK_ID;
+    for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
+      prevSelTracks.insert(it->first);
+      if (M.getType(it->first) == "video"){
+        prevVidTrack = it->first;
+      }
+    }
+    if (!selectDefaultTracks()) {
+      prevVidTrack = INVALID_TRACK_ID;
+      onIdle();
+      return false;
+    }
+    if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;}
+    // Add the previous video track back, if we had one.
+    if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack)){
+      userSelect[prevVidTrack].reload(streamName, prevVidTrack);
+      seek(seekTarget);
+      std::set<size_t> newSelTracks;
+      newSelTracks.insert(prevVidTrack);
+      for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
+        if (M.getType(it->first) != "video"){
+          newSelTracks.insert(it->first);
+        }
+      }
+      if (prevSelTracks != newSelTracks){
+        seek(seekTarget, true);
+        realTime = 0;
+        forwardTo = seekTarget;
+        sendHeader();
+        JSON::Value r;
+        r["type"] = "set_speed";
+        if (target_rate == 0.0){
+          r["data"]["play_rate_prev"] = "auto";
+        }else{
+          r["data"]["play_rate_prev"] = target_rate;
+        }
+        r["data"]["play_rate_curr"] = "fast-forward";
+        webSock->sendFrame(r.toString());
+      }
+    }else{
+      prevVidTrack = INVALID_TRACK_ID;
+      seek(seekTarget, true);
+      realTime = 0;
+      forwardTo = seekTarget;
+      sendHeader();
+      JSON::Value r;
+      r["type"] = "set_speed";
+      if (target_rate == 0.0){
+        r["data"]["play_rate_prev"] = "auto";
+      }else{
+        r["data"]["play_rate_prev"] = target_rate;
+      }
+      r["data"]["play_rate_curr"] = "fast-forward";
+      webSock->sendFrame(r.toString());
+    }
+    onIdle();
+    return true;
+  }
+
   void OutMP4::sendWebsocketCodecData(const std::string& type) {
     JSON::Value r;
     r["type"] = type;
@@ -1717,9 +1724,14 @@ namespace Mist{
         r["data"]["play_rate_curr"] = target_rate;
       }
     }
+    uint64_t jitter = 0;
     for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
       r["data"]["tracks"].append(it->first);
+      if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);}
     }
+    r["data"]["jitter"] = jitter;
+    if (dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;}
+    if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;}
     webSock->sendFrame(r.toString());
   }
 
@@ -1739,5 +1751,12 @@ namespace Mist{
     return false;
   }
 
+  void OutMP4::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){
+    if (webSock && (reason == "EOP: data wait timeout" || reason == "disappeared from metadata") && possiblyReselectTracks(currentTime())){
+      return;
+    }
+    return Output::dropTrack(trackId, reason, probablyBad);
+  }
+
 }// namespace Mist
 
diff --git a/src/output/output_mp4.h b/src/output/output_mp4.h
index 7d137f73..05e1c927 100644
--- a/src/output/output_mp4.h
+++ b/src/output/output_mp4.h
@@ -108,7 +108,9 @@ namespace Mist{
     void onWebsocketFrame();
     void onIdle();
     virtual bool onFinish();
+    virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
   protected:
+    bool possiblyReselectTracks(uint64_t seekTarget);
     void sendWebsocketCodecData(const std::string& type);
     bool handleWebsocketSeek(JSON::Value& command);
     bool handleWebsocketSetSpeed(JSON::Value& command);