diff --git a/lib/dtsc.h b/lib/dtsc.h index 428fa3c2..472a34c4 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -118,7 +118,7 @@ namespace DTSC { packType getVersion() const; void reInit(Socket::Connection & src); void reInit(const char * data_, unsigned int len, bool noCopy = false); - void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe); + void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset = 0); void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, std::string & result) const; void getInt(const char * identifier, uint64_t & result) const; @@ -348,6 +348,7 @@ namespace DTSC { uint16_t version; long long int moreheader; long long int bufferWindow; + int64_t bootMsOffset;///< Millis to add to packet timestamps to get millis since system boot. std::string sourceURI; }; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index a1833849..d6be464d 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -209,7 +209,7 @@ namespace DTSC { /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// When given a NULL pointer, the data is reserved and memset to 0 /// If given a NULL pointer and a zero size, an empty packet is created. - void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe){ + void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset){ null(); master = true; //time and trackID are part of the 20-byte header. @@ -217,12 +217,13 @@ namespace DTSC { //offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) //bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) //keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) + //bootmsoffset, if != 0, adds 9 bytes (integer type) and 5 bytes (2+namelen) //data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) if (packData && packDataSize < 1){ FAIL_MSG("Attempted to fill a packet with %lli bytes!", packDataSize); return; } - unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + packDataSize+11; + unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + (bootMsOffset?14:0) + packDataSize+11; resize(sendLen); //set internal variables version = DTSC_V2; @@ -259,6 +260,14 @@ namespace DTSC { memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); offset += 19; } + if (bootMsOffset){ + memcpy(data+offset, "\000\003bmo\001", 6); + tmpLong = htonl((int)(bootMsOffset >> 32)); + memcpy(data+offset+6, (char *)&tmpLong, 4); + tmpLong = htonl((int)(bootMsOffset & 0xFFFFFFFF)); + memcpy(data+offset+10, (char *)&tmpLong, 4); + offset += 14; + } memcpy(data+offset, "\000\004data\002", 7); tmpLong = htonl(packDataSize); memcpy(data+offset+7, (char *)&tmpLong, 4); @@ -1411,6 +1420,7 @@ namespace DTSC { moreheader = 0; merged = false; bufferWindow = 0; + bootMsOffset = 0; } Meta::Meta(const DTSC::Packet & source) { @@ -1425,6 +1435,7 @@ namespace DTSC { merged = source.getFlag("merged"); bufferWindow = source.getInt("buffer_window"); moreheader = source.getInt("moreheader"); + bootMsOffset = source.getInt("bootoffset"); source.getString("source", sourceURI); Scan tmpTracks = source.getScan().getMember("tracks"); unsigned int num = 0; @@ -1447,6 +1458,7 @@ namespace DTSC { live = meta.isMember("live") && meta["live"]; sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; version = meta.isMember("version") ? meta["version"].asInt() : 0; + bootMsOffset = meta.isMember("bootoffset") ? meta["bootoffset"].asInt() : 0; merged = meta.isMember("merged") && meta["merged"]; bufferWindow = 0; if (meta.isMember("buffer_window")) { @@ -1476,6 +1488,9 @@ namespace DTSC { unsigned int dataLen; pack.getString("data", data, dataLen); update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size); + if (!bootMsOffset && pack.hasMember("bmo")){ + bootMsOffset = pack.getInt("bmo"); + } } ///\brief Updates a meta object given a DTSC::Packet with byte position override. @@ -1874,6 +1889,7 @@ namespace DTSC { } } if (version){dataLen += 18;} + if (bootMsOffset){dataLen += 21;} if (sourceURI.size()){dataLen += 13+sourceURI.size();} return dataLen + 8; //add 8 bytes header } @@ -1904,6 +1920,10 @@ namespace DTSC { writePointer(p, "\000\007version\001", 10); writePointer(p, convertLongLong(version), 8); } + if (bootMsOffset) { + writePointer(p, "\000\012bootoffset\001", 13); + writePointer(p, convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { writePointer(p, "\000\006source\002", 9); writePointer(p, convertInt(sourceURI.size()), 4); @@ -1946,6 +1966,10 @@ namespace DTSC { conn.SendNow("\000\007version\001", 10); conn.SendNow(convertLongLong(version), 8); } + if (bootMsOffset) { + conn.SendNow("\000\012bootoffset\001", 10); + conn.SendNow(convertLongLong(bootMsOffset), 8); + } if (sourceURI.size()) { conn.SendNow("\000\006source\002", 9); conn.SendNow(convertInt(sourceURI.size()), 4); @@ -2044,6 +2068,9 @@ namespace DTSC { if (version) { result["version"] = (long long)version; } + if (bootMsOffset){ + result["bootoffset"] = (long long)bootMsOffset; + } if (sourceURI.size()){ result["source"] = sourceURI; } @@ -2083,6 +2110,9 @@ namespace DTSC { if (sourceURI.size()){ str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; } + if (bootMsOffset){ + str << std::string(indent, ' ') << "Boot MS offset: " << bootMsOffset << std::endl; + } str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; } @@ -2091,6 +2121,7 @@ namespace DTSC { for (std::map::iterator it = tracks.begin(); it != tracks.end(); it++) { it->second.reset(); } + bootMsOffset = 0; } diff --git a/src/output/output.cpp b/src/output/output.cpp index 87bdd79b..20d94182 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1187,6 +1187,7 @@ namespace Mist{ return false; } } + initialize(); return true; } @@ -1225,6 +1226,7 @@ namespace Mist{ DTSC::Meta reMeta; reMeta.reinit(tmpMeta); myMeta.sourceURI = reMeta.sourceURI; + myMeta.bootMsOffset = reMeta.bootMsOffset; } if (liveSem){ liveSem->post(); @@ -1232,6 +1234,7 @@ namespace Mist{ liveSem = 0; } } + nProxy.metaPages.clear(); } } } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index fc46a3c3..b0115571 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -925,6 +925,16 @@ namespace Mist { F.toMeta(myMeta, *amf_storage, reTrack); if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ uint64_t tagTime = next.timestamp; + if (!bootMsOffset){ + if (myMeta.bootMsOffset){ + bootMsOffset = myMeta.bootMsOffset; + rtmpOffset = (Util::bootMS() - tagTime) - bootMsOffset; + }else{ + bootMsOffset = Util::bootMS() - tagTime; + rtmpOffset = 0; + } + } + tagTime += rtmpOffset; uint64_t & ltt = lastTagTime[reTrack]; //Check for decreasing timestamps - this is a connection error. //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. @@ -949,7 +959,7 @@ namespace Mist { ptr[i+1] = tmpchar; } } - thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); + thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe, F.isKeyframe?bootMsOffset:0); ltt = tagTime; if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; diff --git a/src/output/output_rtmp.h b/src/output/output_rtmp.h index 3256c6b5..591ccad3 100644 --- a/src/output/output_rtmp.h +++ b/src/output/output_rtmp.h @@ -15,8 +15,10 @@ namespace Mist { void sendHeader(); bool onFinish(); protected: - uint64_t rtmpOffset; void parseVars(std::string data); + int64_t rtmpOffset; + uint64_t lastOutTime; + int64_t bootMsOffset; std::string app_name; void parseChunk(Socket::Buffer & inputBuffer); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);