From 2a028fa309328aec704e14c90ce43840f44dd553 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 25 Jun 2018 14:23:01 +0200 Subject: [PATCH] Added bootMsOffset to DTSC::Meta, RTMP now syncs on it. --- lib/dtsc.h | 3 ++- lib/dtscmeta.cpp | 35 +++++++++++++++++++++++++++++++++-- src/output/output.cpp | 3 +++ src/output/output_rtmp.cpp | 13 ++++++++++++- src/output/output_rtmp.h | 1 + 5 files changed, 51 insertions(+), 4 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index 4ed43109..07ffd81d 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -118,7 +118,7 @@ namespace DTSC { packType getVersion() const; void reInit(Socket::Connection & src); void reInit(const char * data_, unsigned int len, bool noCopy = false); - void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe); + void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset = 0); void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, std::string & result) const; void getInt(const char * identifier, uint64_t & result) const; @@ -375,6 +375,7 @@ namespace DTSC { uint16_t version; long long int moreheader; long long int bufferWindow; + int64_t bootMsOffset;///< Millis to add to packet timestamps to get millis since system boot. std::string sourceURI; }; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 6ff8c0ad..c4bfc38a 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -209,7 +209,7 @@ namespace DTSC { /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// When given a NULL pointer, the data is reserved and memset to 0 /// If given a NULL pointer and a zero size, an empty packet is created. - void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe){ + void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset){ null(); master = true; //time and trackID are part of the 20-byte header. @@ -217,12 +217,13 @@ namespace DTSC { //offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) //bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) //keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) + //bootmsoffset, if != 0, adds 9 bytes (integer type) and 5 bytes (2+namelen) //data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) if (packData && packDataSize < 1){ FAIL_MSG("Attempted to fill a packet with %lli bytes for timestamp %llu, track %llu!", packDataSize, packTime, packTrack); return; } - unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + packDataSize+11; + unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + (bootMsOffset?14:0) + packDataSize+11; resize(sendLen); //set internal variables version = DTSC_V2; @@ -259,6 +260,14 @@ namespace DTSC { memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); offset += 19; } + if (bootMsOffset){ + memcpy(data+offset, "\000\003bmo\001", 6); + tmpLong = htonl((int)(bootMsOffset >> 32)); + memcpy(data+offset+6, (char *)&tmpLong, 4); + tmpLong = htonl((int)(bootMsOffset & 0xFFFFFFFF)); + memcpy(data+offset+10, (char *)&tmpLong, 4); + offset += 14; + } memcpy(data+offset, "\000\004data\002", 7); tmpLong = htonl(packDataSize); memcpy(data+offset+7, (char *)&tmpLong, 4); @@ -1465,6 +1474,7 @@ namespace DTSC { moreheader = 0; merged = false; bufferWindow = 0; + bootMsOffset = 0; } Meta::Meta(const DTSC::Packet & source) { @@ -1479,6 +1489,7 @@ namespace DTSC { merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); + bootMsOffset = source.getInt("bootoffset"); source.getString("source", sourceURI); Scan tmpTracks = source.getScan().getMember("tracks"); unsigned int num = 0; @@ -1501,6 +1512,7 @@ namespace DTSC { live = meta.isMember("live") && meta["live"]; sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; version = meta.isMember("version") ? meta["version"].asInt() : 0; + bootMsOffset = meta.isMember("bootoffset") ? meta["bootoffset"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; if (meta.isMember("buffer_window")) { @@ -1539,6 +1551,9 @@ namespace DTSC { update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size); LTS*/ update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size, ivecLen?ivec:0); + if (!bootMsOffset && pack.hasMember("bmo")){ + bootMsOffset = pack.getInt("bmo"); + } } ///\brief Updates a meta object given a DTSC::Packet with byte position override. @@ -1959,6 +1974,7 @@ namespace DTSC { } } if (version){dataLen += 18;} + if (bootMsOffset){dataLen += 21;} if (sourceURI.size()){dataLen += 13+sourceURI.size();} return dataLen + 8; //add 8 bytes header } @@ -1989,6 +2005,10 @@ namespace DTSC { writePointer(p, "\000\007version\001", 10); writePointer(p, convertLongLong(version), 8); } + if (bootMsOffset) { + writePointer(p, "\000\012bootoffset\001", 13); + writePointer(p, convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { writePointer(p, "\000\006source\002", 9); writePointer(p, convertInt(sourceURI.size()), 4); @@ -2031,6 +2051,10 @@ namespace DTSC { conn.SendNow("\000\007version\001", 10); conn.SendNow(convertLongLong(version), 8); } + if (bootMsOffset) { + conn.SendNow("\000\012bootoffset\001", 10); + conn.SendNow(convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { conn.SendNow("\000\006source\002", 9); conn.SendNow(convertInt(sourceURI.size()), 4); @@ -2137,6 +2161,9 @@ namespace DTSC { if (version) { result["version"] = (long long)version; } + if (bootMsOffset){ + result["bootoffset"] = (long long)bootMsOffset; + } if (sourceURI.size()){ result["source"] = sourceURI; } @@ -2176,6 +2203,9 @@ namespace DTSC { if (sourceURI.size()){ str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; } + if (bootMsOffset){ + str << std::string(indent, ' ') << "Boot MS offset: " << bootMsOffset << std::endl; + } str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; } @@ -2184,6 +2214,7 @@ namespace DTSC { for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { it->second.reset(); } + bootMsOffset = 0; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 6812c01f..0cf9a10f 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1712,6 +1712,7 @@ namespace Mist{ return false; } } + initialize(); return true; } @@ -1751,6 +1752,7 @@ namespace Mist{ DTSC::Meta reMeta; reMeta.reinit(tmpMeta); myMeta.sourceURI = reMeta.sourceURI; + myMeta.bootMsOffset = reMeta.bootMsOffset; } if (liveSem){ liveSem->post(); @@ -1758,6 +1760,7 @@ namespace Mist{ liveSem = 0; } } + nProxy.metaPages.clear(); } } } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index c10b7972..e23e772f 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -13,6 +13,7 @@ namespace Mist{ OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn){ lastOutTime = 0; rtmpOffset = 0; + bootMsOffset = 0; maxbps = config->getInteger("maxkbps")*128; if (config->getString("target").size() && config->getString("target").substr(0, 7) == "rtmp://"){ streamName = config->getString("streamname"); @@ -1088,6 +1089,16 @@ namespace Mist{ F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ uint64_t tagTime = next.timestamp; + if (!bootMsOffset){ + if (myMeta.bootMsOffset){ + bootMsOffset = myMeta.bootMsOffset; + rtmpOffset = (Util::bootMS() - tagTime) - bootMsOffset; + }else{ + bootMsOffset = Util::bootMS() - tagTime; + rtmpOffset = 0; + } + } + tagTime += rtmpOffset; uint64_t & ltt = lastTagTime[reTrack]; //Check for decreasing timestamps - this is a connection error. //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. @@ -1112,7 +1123,7 @@ namespace Mist{ ptr[i+1] = tmpchar; } } - thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe, F.isKeyframe?bootMsOffset:0); ltt = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 4850e194..29be586a 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -21,6 +21,7 @@ namespace Mist { int64_t rtmpOffset; uint64_t lastOutTime; unsigned int maxbps; + int64_t bootMsOffset; std::string app_name; void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);