From 43934cf69c17d9b2d74e5e1253def3d7be243fe5 Mon Sep 17 00:00:00 2001
From: Thulinma <jaron@vietors.com>
Date: Wed, 4 Jul 2018 11:30:38 +0200
Subject: [PATCH] Added bootMsOffset to DTSC::Meta, RTMP now syncs on it.

---
 lib/dtsc.h                 |  3 ++-
 lib/dtscmeta.cpp           | 35 +++++++++++++++++++++++++++++++++--
 src/output/output.cpp      |  3 +++
 src/output/output_rtmp.cpp | 12 +++++++++++-
 src/output/output_rtmp.h   |  4 +++-
 5 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/lib/dtsc.h b/lib/dtsc.h
index 428fa3c2..472a34c4 100644
--- a/lib/dtsc.h
+++ b/lib/dtsc.h
@@ -118,7 +118,7 @@ namespace DTSC {
       packType getVersion() const;
       void reInit(Socket::Connection & src);
       void reInit(const char * data_, unsigned int len, bool noCopy = false);
-      void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe);
+      void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset = 0);
       void getString(const char * identifier, char *& result, unsigned int & len) const;
       void getString(const char * identifier, std::string & result) const;
       void getInt(const char * identifier, uint64_t & result) const;
@@ -348,6 +348,7 @@ namespace DTSC {
       uint16_t version;
       long long int moreheader;
       long long int bufferWindow;
+      int64_t bootMsOffset;///< Millis to add to packet timestamps to get millis since system boot.
       std::string sourceURI;
   };
 
diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp
index a1833849..d6be464d 100644
--- a/lib/dtscmeta.cpp
+++ b/lib/dtscmeta.cpp
@@ -209,7 +209,7 @@ namespace DTSC {
   /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields.
   /// When given a NULL pointer, the data is reserved and memset to 0
   /// If given a NULL pointer and a zero size, an empty packet is created.
-  void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe){
+  void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset){
     null();
     master = true;
     //time and trackID are part of the 20-byte header.
@@ -217,12 +217,13 @@ namespace DTSC {
     //offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen)
     //bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen)
     //keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen)
+    //bootmsoffset, if != 0, adds 9 bytes (integer type) and 5 bytes (2+namelen)
     //data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen)
     if (packData && packDataSize < 1){
       FAIL_MSG("Attempted to fill a packet with %lli bytes!", packDataSize);
       return;
     }
-    unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + packDataSize+11;
+    unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + (bootMsOffset?14:0) + packDataSize+11;
     resize(sendLen);
     //set internal variables
     version = DTSC_V2;
@@ -259,6 +260,14 @@ namespace DTSC {
       memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19);
       offset += 19;
     }
+    if (bootMsOffset){
+      memcpy(data+offset, "\000\003bmo\001", 6);
+      tmpLong = htonl((int)(bootMsOffset >> 32));
+      memcpy(data+offset+6, (char *)&tmpLong, 4);
+      tmpLong = htonl((int)(bootMsOffset & 0xFFFFFFFF));
+      memcpy(data+offset+10, (char *)&tmpLong, 4);
+      offset += 14;
+    }
     memcpy(data+offset, "\000\004data\002", 7);
     tmpLong = htonl(packDataSize);
     memcpy(data+offset+7, (char *)&tmpLong, 4);
@@ -1411,6 +1420,7 @@ namespace DTSC {
     moreheader = 0;
     merged = false;
     bufferWindow = 0;
+    bootMsOffset = 0;
   }
 
   Meta::Meta(const DTSC::Packet & source) {
@@ -1425,6 +1435,7 @@ namespace DTSC {
     merged = source.getFlag("merged");
     bufferWindow = source.getInt("buffer_window");
     moreheader = source.getInt("moreheader");
+    bootMsOffset = source.getInt("bootoffset");
     source.getString("source", sourceURI);
     Scan tmpTracks = source.getScan().getMember("tracks");
     unsigned int num = 0;
@@ -1447,6 +1458,7 @@ namespace DTSC {
     live = meta.isMember("live") && meta["live"];
     sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : "";
     version = meta.isMember("version") ? meta["version"].asInt() : 0;
+    bootMsOffset = meta.isMember("bootoffset") ? meta["bootoffset"].asInt() : 0;
     merged = meta.isMember("merged") && meta["merged"];
     bufferWindow = 0;
     if (meta.isMember("buffer_window")) {
@@ -1476,6 +1488,9 @@ namespace DTSC {
     unsigned int dataLen;
     pack.getString("data", data, dataLen);
     update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size);
+    if (!bootMsOffset && pack.hasMember("bmo")){
+      bootMsOffset = pack.getInt("bmo");
+    }
   }
 
   ///\brief Updates a meta object given a DTSC::Packet with byte position override.
@@ -1874,6 +1889,7 @@ namespace DTSC {
       }
     }
     if (version){dataLen += 18;}
+    if (bootMsOffset){dataLen += 21;}
     if (sourceURI.size()){dataLen += 13+sourceURI.size();}
     return dataLen + 8; //add 8 bytes header
   }
@@ -1904,6 +1920,10 @@ namespace DTSC {
       writePointer(p, "\000\007version\001", 10);
       writePointer(p, convertLongLong(version), 8);
     }
+    if (bootMsOffset) {
+      writePointer(p, "\000\012bootoffset\001", 13);
+      writePointer(p, convertLongLong(bootMsOffset), 8);
+    }
     if (sourceURI.size()) {
       writePointer(p, "\000\006source\002", 9);
       writePointer(p, convertInt(sourceURI.size()), 4);
@@ -1946,6 +1966,10 @@ namespace DTSC {
       conn.SendNow("\000\007version\001", 10);
       conn.SendNow(convertLongLong(version), 8);
     }
+    if (bootMsOffset) {
+      conn.SendNow("\000\012bootoffset\001", 10);
+      conn.SendNow(convertLongLong(bootMsOffset), 8);
+    }
     if (sourceURI.size()) {
       conn.SendNow("\000\006source\002", 9);
       conn.SendNow(convertInt(sourceURI.size()), 4);
@@ -2044,6 +2068,9 @@ namespace DTSC {
     if (version) {
       result["version"] = (long long)version;
     }
+    if (bootMsOffset){
+      result["bootoffset"] = (long long)bootMsOffset;
+    }
     if (sourceURI.size()){
       result["source"] = sourceURI;
     }
@@ -2083,6 +2110,9 @@ namespace DTSC {
     if (sourceURI.size()){
       str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl;
     }
+    if (bootMsOffset){
+      str << std::string(indent, ' ') << "Boot MS offset: " << bootMsOffset << std::endl;
+    }
     str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl;
   }
 
@@ -2091,6 +2121,7 @@ namespace DTSC {
     for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
       it->second.reset();
     }
+    bootMsOffset = 0;
   }
 
 
diff --git a/src/output/output.cpp b/src/output/output.cpp
index 87bdd79b..20d94182 100644
--- a/src/output/output.cpp
+++ b/src/output/output.cpp
@@ -1187,6 +1187,7 @@ namespace Mist{
         return false;
       }
     }
+    initialize();
     return true;
   }
 
@@ -1225,6 +1226,7 @@ namespace Mist{
             DTSC::Meta reMeta;
             reMeta.reinit(tmpMeta);
             myMeta.sourceURI = reMeta.sourceURI;
+            myMeta.bootMsOffset = reMeta.bootMsOffset;
           }
           if (liveSem){
             liveSem->post();
@@ -1232,6 +1234,7 @@ namespace Mist{
             liveSem = 0;
           }
         }
+        nProxy.metaPages.clear();
       }
     }
   }
diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp
index fc46a3c3..b0115571 100644
--- a/src/output/output_rtmp.cpp
+++ b/src/output/output_rtmp.cpp
@@ -925,6 +925,16 @@ namespace Mist {
           F.toMeta(myMeta, *amf_storage, reTrack);
           if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){
             uint64_t tagTime = next.timestamp;
+            if (!bootMsOffset){
+              if (myMeta.bootMsOffset){
+                bootMsOffset = myMeta.bootMsOffset;
+                rtmpOffset = (Util::bootMS() - tagTime) - bootMsOffset;
+              }else{
+                bootMsOffset = Util::bootMS() - tagTime;
+                rtmpOffset = 0;
+              }
+            }
+            tagTime += rtmpOffset;
             uint64_t & ltt = lastTagTime[reTrack];
             //Check for decreasing timestamps - this is a connection error.
             //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set.
@@ -949,7 +959,7 @@ namespace Mist {
                 ptr[i+1] = tmpchar;
               }
             }
-            thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
+            thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe, F.isKeyframe?bootMsOffset:0);
             ltt = tagTime;
             if (!nProxy.userClient.getData()){
               char userPageName[NAME_BUFFER_SIZE];
diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h
index 3256c6b5..591ccad3 100644
--- a/src/output/output_rtmp.h
+++ b/src/output/output_rtmp.h
@@ -15,8 +15,10 @@ namespace Mist {
       void sendHeader();
       bool onFinish();
     protected:
-      uint64_t rtmpOffset;
       void parseVars(std::string data);
+      int64_t rtmpOffset;
+      uint64_t lastOutTime;
+      int64_t bootMsOffset;
       std::string app_name;
       void parseChunk(Socket::Buffer & inputBuffer);
       void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);