DTSC pull input now supports keeping stream UTC offset intact between servers, plus a local fallback calculation for older versions

This commit is contained in:
Siddarth Tegginamani 2021-11-10 13:00:20 +01:00 committed by Thulinma
parent 9403d34eb4
commit 4a866305e5
4 changed files with 35 additions and 2 deletions

View file

@ -985,6 +985,20 @@ namespace DTSC{
for (int i = 0; i < tNum; i++){
addTrackFrom(src.getMember("tracks").getIndice(i));
}
// Unix Time at zero point of a stream
if (src.hasMember("unixzero")){
setBootMsOffset(src.getMember("unixzero").asInt() - Util::unixMS() + Util::bootMS());
}else{
MEDIUM_MSG("No member \'unixzero\' found in DTSC::Scan. Calculating locally.");
int64_t lastMs = 0;
for (std::map<size_t, Track>::iterator it = tracks.begin(); it != tracks.end(); it++){
if (it->second.track.getInt(it->second.trackLastmsField) > lastMs){
lastMs = it->second.track.getInt(it->second.trackLastmsField);
}
}
setBootMsOffset(Util::bootMS() - lastMs);
}
}
void Meta::addTrackFrom(const DTSC::Scan &trak){
@ -2620,7 +2634,7 @@ namespace DTSC{
uint64_t Meta::getSendLen(bool skipDynamic, std::set<size_t> selectedTracks) const{
uint64_t dataLen = 34; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21;
if (getVod()){dataLen += 14;}
if (getLive()){dataLen += 15;}
if (getLive()){dataLen += 15 + 19;} // 19 for unixzero
for (std::map<size_t, Track>::const_iterator it = tracks.begin(); it != tracks.end(); it++){
if (!it->second.parts.getPresent()){continue;}
if (!selectedTracks.size() || selectedTracks.count(it->first)){
@ -2880,6 +2894,10 @@ namespace DTSC{
if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);}
conn.SendNow("\000\007version\001", 10);
conn.SendNow(c64(DTSH_VERSION), 8);
if (getLive()){
conn.SendNow("\000\010unixzero\001", 11);
conn.SendNow(c64(Util::unixMS() - Util::bootMS() + getBootMsOffset()), 8);
}
conn.SendNow("\000\006tracks\340", 9);
for (std::set<size_t>::const_iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
std::string tmp = getTrackIdentifier(*it, true);

View file

@ -166,6 +166,7 @@ namespace Mist{
DTSC::Meta nM("", metaPack.getScan());
meta.reInit(streamName, false);
meta.merge(nM, true, false);
meta.setBootMsOffset(nM.getBootMsOffset());
std::set<size_t> validTracks = M.getMySourceTracks(getpid());
userSelect.clear();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); ++it){

View file

@ -244,6 +244,20 @@ namespace Mist{
}
}
meta.reloadReplacedPagesIfNeeded();
// Unix Time at zero point of a stream
if (metaScan.hasMember("unixzero")){
meta.setBootMsOffset(metaScan.getMember("unixzero").asInt() - Util::unixMS() + Util::bootMS());
}else{
MEDIUM_MSG("No member \'unixzero\' found in DTSC::Scan. Calculating locally.");
int64_t lastMs = 0;
std::set<size_t> tracks = M.getValidTracks();
for (std::set<size_t>::iterator it = tracks.begin(); it != tracks.end(); it++){
if (M.getLastms(*it) > lastMs){
lastMs = M.getLastms(*it);
}
}
meta.setBootMsOffset(Util::bootMS() - lastMs);
}
std::stringstream rep;
rep << "DTSC_HEAD parsed, we went from " << prevTracks << " to " << meta.getValidTracks().size() << " tracks. Bring on those data packets!";
sendOk(rep.str());

View file

@ -20,7 +20,7 @@ JSON::Value & pData = pStat["proc_status_update"]["status"];
tthread::mutex statsMutex;
uint64_t statSinkMs = 0;
uint64_t statSourceMs = 0;
uint64_t bootMsOffset = 0;
int64_t bootMsOffset = 0;
namespace Mist{