Added bootMsOffset to DTSC::Meta, RTMP now syncs on it.

This commit is contained in:
Thulinma 2018-07-04 11:30:38 +02:00
parent bd0b820577
commit 43934cf69c
5 changed files with 52 additions and 5 deletions

View file

@ -118,7 +118,7 @@ namespace DTSC {
packType getVersion() const; packType getVersion() const;
void reInit(Socket::Connection & src); void reInit(Socket::Connection & src);
void reInit(const char * data_, unsigned int len, bool noCopy = false); void reInit(const char * data_, unsigned int len, bool noCopy = false);
void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe); void genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset = 0);
void getString(const char * identifier, char *& result, unsigned int & len) const; void getString(const char * identifier, char *& result, unsigned int & len) const;
void getString(const char * identifier, std::string & result) const; void getString(const char * identifier, std::string & result) const;
void getInt(const char * identifier, uint64_t & result) const; void getInt(const char * identifier, uint64_t & result) const;
@ -348,6 +348,7 @@ namespace DTSC {
uint16_t version; uint16_t version;
long long int moreheader; long long int moreheader;
long long int bufferWindow; long long int bufferWindow;
int64_t bootMsOffset;///< Millis to add to packet timestamps to get millis since system boot.
std::string sourceURI; std::string sourceURI;
}; };

View file

@ -209,7 +209,7 @@ namespace DTSC {
/// Re-initializes this Packet to contain a generic DTSC packet with the given data fields. /// Re-initializes this Packet to contain a generic DTSC packet with the given data fields.
/// When given a NULL pointer, the data is reserved and memset to 0 /// When given a NULL pointer, the data is reserved and memset to 0
/// If given a NULL pointer and a zero size, an empty packet is created. /// If given a NULL pointer and a zero size, an empty packet is created.
void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe){ void Packet::genericFill(long long packTime, long long packOffset, long long packTrack, const char * packData, long long packDataSize, uint64_t packBytePos, bool isKeyframe, int64_t bootMsOffset){
null(); null();
master = true; master = true;
//time and trackID are part of the 20-byte header. //time and trackID are part of the 20-byte header.
@ -217,12 +217,13 @@ namespace DTSC {
//offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen) //offset, if non-zero, adds 9 bytes (integer type) and 8 bytes (2+namelen)
//bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen) //bpos, if >= 0, adds 9 bytes (integer type) and 6 bytes (2+namelen)
//keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen) //keyframe, if true, adds 9 bytes (integer type) and 10 bytes (2+namelen)
//bootmsoffset, if != 0, adds 9 bytes (integer type) and 5 bytes (2+namelen)
//data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen) //data adds packDataSize+5 bytes (string type) and 6 bytes (2+namelen)
if (packData && packDataSize < 1){ if (packData && packDataSize < 1){
FAIL_MSG("Attempted to fill a packet with %lli bytes!", packDataSize); FAIL_MSG("Attempted to fill a packet with %lli bytes!", packDataSize);
return; return;
} }
unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + packDataSize+11; unsigned int sendLen = 24 + (packOffset?17:0) + (packBytePos?15:0) + (isKeyframe?19:0) + (bootMsOffset?14:0) + packDataSize+11;
resize(sendLen); resize(sendLen);
//set internal variables //set internal variables
version = DTSC_V2; version = DTSC_V2;
@ -259,6 +260,14 @@ namespace DTSC {
memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19); memcpy(data+offset, "\000\010keyframe\001\000\000\000\000\000\000\000\001", 19);
offset += 19; offset += 19;
} }
if (bootMsOffset){
memcpy(data+offset, "\000\003bmo\001", 6);
tmpLong = htonl((int)(bootMsOffset >> 32));
memcpy(data+offset+6, (char *)&tmpLong, 4);
tmpLong = htonl((int)(bootMsOffset & 0xFFFFFFFF));
memcpy(data+offset+10, (char *)&tmpLong, 4);
offset += 14;
}
memcpy(data+offset, "\000\004data\002", 7); memcpy(data+offset, "\000\004data\002", 7);
tmpLong = htonl(packDataSize); tmpLong = htonl(packDataSize);
memcpy(data+offset+7, (char *)&tmpLong, 4); memcpy(data+offset+7, (char *)&tmpLong, 4);
@ -1411,6 +1420,7 @@ namespace DTSC {
moreheader = 0; moreheader = 0;
merged = false; merged = false;
bufferWindow = 0; bufferWindow = 0;
bootMsOffset = 0;
} }
Meta::Meta(const DTSC::Packet & source) { Meta::Meta(const DTSC::Packet & source) {
@ -1425,6 +1435,7 @@ namespace DTSC {
merged = source.getFlag("merged"); merged = source.getFlag("merged");
bufferWindow = source.getInt("buffer_window"); bufferWindow = source.getInt("buffer_window");
moreheader = source.getInt("moreheader"); moreheader = source.getInt("moreheader");
bootMsOffset = source.getInt("bootoffset");
source.getString("source", sourceURI); source.getString("source", sourceURI);
Scan tmpTracks = source.getScan().getMember("tracks"); Scan tmpTracks = source.getScan().getMember("tracks");
unsigned int num = 0; unsigned int num = 0;
@ -1447,6 +1458,7 @@ namespace DTSC {
live = meta.isMember("live") && meta["live"]; live = meta.isMember("live") && meta["live"];
sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : ""; sourceURI = meta.isMember("source") ? meta["source"].asStringRef() : "";
version = meta.isMember("version") ? meta["version"].asInt() : 0; version = meta.isMember("version") ? meta["version"].asInt() : 0;
bootMsOffset = meta.isMember("bootoffset") ? meta["bootoffset"].asInt() : 0;
merged = meta.isMember("merged") && meta["merged"]; merged = meta.isMember("merged") && meta["merged"];
bufferWindow = 0; bufferWindow = 0;
if (meta.isMember("buffer_window")) { if (meta.isMember("buffer_window")) {
@ -1476,6 +1488,9 @@ namespace DTSC {
unsigned int dataLen; unsigned int dataLen;
pack.getString("data", data, dataLen); pack.getString("data", data, dataLen);
update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size); update(pack.getTime(), pack.hasMember("offset")?pack.getInt("offset"):0, pack.getTrackId(), dataLen, pack.hasMember("bpos")?pack.getInt("bpos"):0, pack.hasMember("keyframe"), pack.getDataLen(), segment_size);
if (!bootMsOffset && pack.hasMember("bmo")){
bootMsOffset = pack.getInt("bmo");
}
} }
///\brief Updates a meta object given a DTSC::Packet with byte position override. ///\brief Updates a meta object given a DTSC::Packet with byte position override.
@ -1874,6 +1889,7 @@ namespace DTSC {
} }
} }
if (version){dataLen += 18;} if (version){dataLen += 18;}
if (bootMsOffset){dataLen += 21;}
if (sourceURI.size()){dataLen += 13+sourceURI.size();} if (sourceURI.size()){dataLen += 13+sourceURI.size();}
return dataLen + 8; //add 8 bytes header return dataLen + 8; //add 8 bytes header
} }
@ -1904,6 +1920,10 @@ namespace DTSC {
writePointer(p, "\000\007version\001", 10); writePointer(p, "\000\007version\001", 10);
writePointer(p, convertLongLong(version), 8); writePointer(p, convertLongLong(version), 8);
} }
if (bootMsOffset) {
writePointer(p, "\000\012bootoffset\001", 13);
writePointer(p, convertLongLong(bootMsOffset), 8);
}
if (sourceURI.size()) { if (sourceURI.size()) {
writePointer(p, "\000\006source\002", 9); writePointer(p, "\000\006source\002", 9);
writePointer(p, convertInt(sourceURI.size()), 4); writePointer(p, convertInt(sourceURI.size()), 4);
@ -1946,6 +1966,10 @@ namespace DTSC {
conn.SendNow("\000\007version\001", 10); conn.SendNow("\000\007version\001", 10);
conn.SendNow(convertLongLong(version), 8); conn.SendNow(convertLongLong(version), 8);
} }
if (bootMsOffset) {
conn.SendNow("\000\012bootoffset\001", 10);
conn.SendNow(convertLongLong(bootMsOffset), 8);
}
if (sourceURI.size()) { if (sourceURI.size()) {
conn.SendNow("\000\006source\002", 9); conn.SendNow("\000\006source\002", 9);
conn.SendNow(convertInt(sourceURI.size()), 4); conn.SendNow(convertInt(sourceURI.size()), 4);
@ -2044,6 +2068,9 @@ namespace DTSC {
if (version) { if (version) {
result["version"] = (long long)version; result["version"] = (long long)version;
} }
if (bootMsOffset){
result["bootoffset"] = (long long)bootMsOffset;
}
if (sourceURI.size()){ if (sourceURI.size()){
result["source"] = sourceURI; result["source"] = sourceURI;
} }
@ -2083,6 +2110,9 @@ namespace DTSC {
if (sourceURI.size()){ if (sourceURI.size()){
str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl; str << std::string(indent, ' ') << "Source: " << sourceURI << std::endl;
} }
if (bootMsOffset){
str << std::string(indent, ' ') << "Boot MS offset: " << bootMsOffset << std::endl;
}
str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl; str << std::string(indent, ' ') << "More Header: " << moreheader << std::endl;
} }
@ -2091,6 +2121,7 @@ namespace DTSC {
for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) { for (std::map<unsigned int, Track>::iterator it = tracks.begin(); it != tracks.end(); it++) {
it->second.reset(); it->second.reset();
} }
bootMsOffset = 0;
} }

View file

@ -1187,6 +1187,7 @@ namespace Mist{
return false; return false;
} }
} }
initialize();
return true; return true;
} }
@ -1225,6 +1226,7 @@ namespace Mist{
DTSC::Meta reMeta; DTSC::Meta reMeta;
reMeta.reinit(tmpMeta); reMeta.reinit(tmpMeta);
myMeta.sourceURI = reMeta.sourceURI; myMeta.sourceURI = reMeta.sourceURI;
myMeta.bootMsOffset = reMeta.bootMsOffset;
} }
if (liveSem){ if (liveSem){
liveSem->post(); liveSem->post();
@ -1232,6 +1234,7 @@ namespace Mist{
liveSem = 0; liveSem = 0;
} }
} }
nProxy.metaPages.clear();
} }
} }
} }

View file

@ -925,6 +925,16 @@ namespace Mist {
F.toMeta(myMeta, *amf_storage, reTrack); F.toMeta(myMeta, *amf_storage, reTrack);
if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){
uint64_t tagTime = next.timestamp; uint64_t tagTime = next.timestamp;
if (!bootMsOffset){
if (myMeta.bootMsOffset){
bootMsOffset = myMeta.bootMsOffset;
rtmpOffset = (Util::bootMS() - tagTime) - bootMsOffset;
}else{
bootMsOffset = Util::bootMS() - tagTime;
rtmpOffset = 0;
}
}
tagTime += rtmpOffset;
uint64_t & ltt = lastTagTime[reTrack]; uint64_t & ltt = lastTagTime[reTrack];
//Check for decreasing timestamps - this is a connection error. //Check for decreasing timestamps - this is a connection error.
//We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set. //We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set.
@ -949,7 +959,7 @@ namespace Mist {
ptr[i+1] = tmpchar; ptr[i+1] = tmpchar;
} }
} }
thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe, F.isKeyframe?bootMsOffset:0);
ltt = tagTime; ltt = tagTime;
if (!nProxy.userClient.getData()){ if (!nProxy.userClient.getData()){
char userPageName[NAME_BUFFER_SIZE]; char userPageName[NAME_BUFFER_SIZE];

View file

@ -15,8 +15,10 @@ namespace Mist {
void sendHeader(); void sendHeader();
bool onFinish(); bool onFinish();
protected: protected:
uint64_t rtmpOffset;
void parseVars(std::string data); void parseVars(std::string data);
int64_t rtmpOffset;
uint64_t lastOutTime;
int64_t bootMsOffset;
std::string app_name; std::string app_name;
void parseChunk(Socket::Buffer & inputBuffer); void parseChunk(Socket::Buffer & inputBuffer);
void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId); void parseAMFCommand(AMF::Object & amfData, int messageType, int streamId);