Support limiting output range for most outputs and outgoing pushes

This commit is contained in:
Thulinma 2023-05-16 02:56:45 +02:00
parent 3e2a17ff93
commit 7dbd60b208
21 changed files with 433 additions and 186 deletions

View file

@ -18,7 +18,7 @@
#define PRETTY_ARG_TIME(t) \
(int)(t) / 86400, ((int)(t) % 86400) / 3600, ((int)(t) % 3600) / 60, (int)(t) % 60
#define PRETTY_PRINT_MSTIME "%ud%.2uh%.2um%.2us.%.3u"
#define PRETTY_ARG_MSTIME(t) PRETTY_ARG_TIME(t / 1000), (int)(t % 1000)
#define PRETTY_ARG_MSTIME(t) PRETTY_ARG_TIME((t) / 1000), (int)((t) % 1000)
#if DEBUG > -1
#define APPIDENT APPNAME "/" PACKAGE_VERSION

View file

@ -896,6 +896,7 @@ namespace DTSC{
streamMemBuf = 0;
isMemBuf = false;
isMaster = true;
removeLimiter();
reInit(_streamName, src);
}
@ -907,6 +908,7 @@ namespace DTSC{
streamMemBuf = 0;
isMemBuf = false;
isMaster = master;
removeLimiter();
reInit(_streamName, master, autoBackOff);
}
@ -916,6 +918,7 @@ namespace DTSC{
streamMemBuf = 0;
isMemBuf = false;
isMaster = true;
removeLimiter();
reInit(_streamName, fileName);
}
@ -989,6 +992,7 @@ namespace DTSC{
// Unix Time at zero point of a stream
if (src.hasMember("unixzero")){
setBootMsOffset(src.getMember("unixzero").asInt() - Util::unixMS() + Util::bootMS());
setUTCOffset(src.getMember("unixzero").asInt());
}else{
MEDIUM_MSG("No member \'unixzero\' found in DTSC::Scan. Calculating locally.");
int64_t nowMs = 0;
@ -2001,6 +2005,7 @@ namespace DTSC{
}
uint64_t Meta::getFirstms(size_t trackIdx) const{
const DTSC::Track &t = tracks.at(trackIdx);
if (isLimited && limitMin > t.track.getInt(t.trackFirstmsField)){return limitMin;}
return t.track.getInt(t.trackFirstmsField);
}
@ -2010,6 +2015,7 @@ namespace DTSC{
}
uint64_t Meta::getLastms(size_t trackIdx) const{
const DTSC::Track &t = tracks.find(trackIdx)->second;
if (isLimited && limitMax < t.track.getInt(t.trackLastmsField)){return limitMax;}
return t.track.getInt(t.trackLastmsField);
}
@ -2023,6 +2029,7 @@ namespace DTSC{
}
uint64_t Meta::getDuration(size_t trackIdx) const{
if (isLimited){return getLastms(trackIdx) - getFirstms(trackIdx);}
const DTSC::Track &t = tracks.at(trackIdx);
return t.track.getInt(t.trackLastmsField) - t.track.getInt(t.trackFirstmsField);
}
@ -2117,12 +2124,16 @@ namespace DTSC{
void Meta::setVod(bool vod){
stream.setInt(streamVodField, vod ? 1 : 0);
}
bool Meta::getVod() const{return stream.getInt(streamVodField);}
bool Meta::getVod() const{
return isLimited || stream.getInt(streamVodField);
}
void Meta::setLive(bool live){
stream.setInt(streamLiveField, live ? 1 : 0);
}
bool Meta::getLive() const{return stream.getInt(streamLiveField);}
bool Meta::getLive() const{
return (!isLimited || limitMax == 0xFFFFFFFFFFFFFFFFull) && stream.getInt(streamLiveField);
}
bool Meta::hasBFrames(size_t idx) const{
std::set<size_t> vTracks = getValidTracks();
@ -2272,7 +2283,7 @@ namespace DTSC{
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackIdx,
(uint32_t)t.pages.getInt("firstkey", t.pages.getDeleted()));
IPC::sharedPage p(thisPageName, 20971520);
IPC::sharedPage p(thisPageName, 20971520, false, false);
p.master = true;
// Then delete the page entry
@ -2554,6 +2565,13 @@ namespace DTSC{
const Util::RelAccX &Meta::parts(size_t idx) const{return tracks.at(idx).parts;}
Util::RelAccX &Meta::keys(size_t idx){return tracks.at(idx).keys;}
const Util::RelAccX &Meta::keys(size_t idx) const{return tracks.at(idx).keys;}
const Keys Meta::getKeys(size_t trackIdx) const{
DTSC::Keys k(keys(trackIdx));
if (isLimited){k.applyLimiter(limitMin, limitMax, DTSC::Parts(parts(trackIdx)));}
return k;
}
const Util::RelAccX &Meta::fragments(size_t idx) const{return tracks.at(idx).fragments;}
const Util::RelAccX &Meta::pages(size_t idx) const{return tracks.at(idx).pages;}
Util::RelAccX &Meta::pages(size_t idx){return tracks.at(idx).pages;}
@ -2652,7 +2670,8 @@ namespace DTSC{
uint64_t Meta::getSendLen(bool skipDynamic, std::set<size_t> selectedTracks) const{
uint64_t dataLen = 34; // + (merged ? 17 : 0) + (bufferWindow ? 24 : 0) + 21;
if (getVod()){dataLen += 14;}
if (getLive()){dataLen += 15 + 19;} // 19 for unixzero
if (getLive()){dataLen += 15;}
if (getLive() || getUTCOffset()){dataLen += 19;} // unixzero field
for (std::map<size_t, Track>::const_iterator it = tracks.begin(); it != tracks.end(); it++){
if (!it->second.parts.getPresent()){continue;}
if (!selectedTracks.size() || selectedTracks.count(it->first)){
@ -2804,9 +2823,13 @@ namespace DTSC{
if (getLive()){conn.SendNow("\000\004live\001\000\000\000\000\000\000\000\001", 15);}
conn.SendNow("\000\007version\001", 10);
conn.SendNow(c64(DTSH_VERSION), 8);
if (getLive()){
if (getLive() || getUTCOffset()){
conn.SendNow("\000\010unixzero\001", 11);
conn.SendNow(c64(Util::unixMS() - Util::bootMS() + getBootMsOffset()), 8);
if (getLive()){
conn.SendNow(c64(Util::unixMS() - Util::bootMS() + getBootMsOffset()), 8);
}else{
conn.SendNow(c64(getUTCOffset()), 8);
}
}
if (lVarSize){
conn.SendNow("\000\016inputLocalVars\002", 17);
@ -3272,6 +3295,19 @@ namespace DTSC{
// return is by reference
}
void Meta::removeLimiter(){
isLimited = false;
limitMin = 0;
limitMax = 0;
}
void Meta::applyLimiter(uint64_t min, uint64_t max){
isLimited = true;
limitMin = min;
limitMax = max;
INFO_MSG("Applied limiter from %" PRIu64 " to %" PRIu64, min, max);
}
/// Returns true if the tracks idx1 and idx2 are keyframe aligned
bool Meta::keyTimingsMatch(size_t idx1, size_t idx2) const {
const DTSC::Track &t1 = tracks.at(idx1);
@ -3325,6 +3361,7 @@ namespace DTSC{
partsField = cKeys.getFieldData("parts");
timeField = cKeys.getFieldData("time");
sizeField = cKeys.getFieldData("size");
isLimited = false;
}
Keys::Keys(const Util::RelAccX &_keys) : isConst(true), keys(empty), cKeys(_keys){
@ -3335,23 +3372,143 @@ namespace DTSC{
partsField = cKeys.getFieldData("parts");
timeField = cKeys.getFieldData("time");
sizeField = cKeys.getFieldData("size");
isLimited = false;
}
size_t Keys::getFirstValid() const{return cKeys.getDeleted();}
size_t Keys::getEndValid() const{return cKeys.getEndPos();}
size_t Keys::getFirstValid() const{
return isLimited ? limMin : cKeys.getDeleted();
}
size_t Keys::getEndValid() const{
return isLimited ? limMax : cKeys.getEndPos();
}
size_t Keys::getValidCount() const{return getEndValid() - getFirstValid();}
size_t Keys::getFirstPart(size_t idx) const{return cKeys.getInt(firstPartField, idx);}
size_t Keys::getFirstPart(size_t idx) const{
if (isLimited && idx == limMin){return limMinFirstPart;}
return cKeys.getInt(firstPartField, idx);
}
size_t Keys::getBpos(size_t idx) const{return cKeys.getInt(bposField, idx);}
uint64_t Keys::getDuration(size_t idx) const{return cKeys.getInt(durationField, idx);}
uint64_t Keys::getDuration(size_t idx) const{
if (isLimited && idx + 1 == limMax){return limMaxDuration;}
if (isLimited && idx == limMin){return limMinDuration;}
return cKeys.getInt(durationField, idx);
}
size_t Keys::getNumber(size_t idx) const{return cKeys.getInt(numberField, idx);}
size_t Keys::getParts(size_t idx) const{return cKeys.getInt(partsField, idx);}
uint64_t Keys::getTime(size_t idx) const{return cKeys.getInt(timeField, idx);}
size_t Keys::getParts(size_t idx) const{
if (isLimited && idx + 1 == limMax){return limMaxParts;}
if (isLimited && idx == limMin){return limMinParts;}
return cKeys.getInt(partsField, idx);
}
uint64_t Keys::getTime(size_t idx) const{
if (isLimited && idx == limMin){return limMinTime;}
return cKeys.getInt(timeField, idx);
}
void Keys::setSize(size_t idx, size_t _size){
if (isConst){return;}
keys.setInt(sizeField, _size, idx);
}
size_t Keys::getSize(size_t idx) const{return cKeys.getInt(sizeField, idx);}
size_t Keys::getSize(size_t idx) const{
if (isLimited && idx + 1 == limMax){return limMaxSize;}
if (isLimited && idx == limMin){return limMinSize;}
return cKeys.getInt(sizeField, idx);
}
uint64_t Keys::getTotalPartCount(){
return getParts(getEndValid()-1) + getFirstPart(getEndValid()-1) - getFirstPart(getFirstValid());
}
uint32_t Keys::getIndexForTime(uint64_t timestamp){
uint32_t firstKey = getFirstValid();
uint32_t endKey = getEndValid();
for (size_t i = firstKey; i < endKey; i++){
if (getTime(i) + getDuration(i) > timestamp){return i;}
}
return endKey;
}
void Keys::applyLimiter(uint64_t _min, uint64_t _max, DTSC::Parts _p){
// Determine first and last key available within the limits
// Note: limMax replaces getEndValid(), and is thus one _past_ the end key index!
limMin = getFirstValid();
limMax = getEndValid();
for (size_t i = limMin; i < limMax; i++){
if (getTime(i) <= _min){limMin = i;}
if (getTime(i) >= _max){
limMax = i;
break;
}
}
// We can't have 0 keys, so force at least 1 key in cases where min >= max.
if (limMin >= limMax){limMax = limMin + 1;}
// If the first key is the last key, the override calculation is a little trickier
if (limMin + 1 == limMax){
//Calculate combined first/last key override
{
limMinDuration = 0;
limMinParts = 0;
limMinSize = 0;
limMinFirstPart = getFirstPart(limMin);
limMinTime = getTime(limMin);
size_t partNo = limMinFirstPart;
size_t truePartEnd = partNo + getParts(limMin);
while (partNo < truePartEnd){
if (limMinTime >= _min){
if (limMinTime + limMinDuration >= _max){break;}
++limMinParts;
limMinDuration += _p.getDuration(partNo);
limMinSize += _p.getSize(partNo);
}else{
++limMinFirstPart;
limMinTime += _p.getDuration(partNo);
}
++partNo;
}
limMaxSize = limMinSize;
limMaxParts = limMinParts;
limMaxDuration = limMinDuration;
}
}else{
//Calculate first key overrides
{
limMinDuration = getDuration(limMin);
limMinParts = getParts(limMin);
limMinSize = getSize(limMin);
limMinFirstPart = getFirstPart(limMin);
limMinTime = getTime(limMin);
size_t partNo = limMinFirstPart;
size_t truePartEnd = partNo + limMinParts;
while (partNo < truePartEnd){
if (limMinTime >= _min){break;}
--limMinParts;
limMinDuration -= _p.getDuration(partNo);
limMinSize -= _p.getSize(partNo);
++limMinFirstPart;
limMinTime += _p.getDuration(partNo);
++partNo;
}
}
//Calculate last key overrides
{
limMaxDuration = limMaxParts = limMaxSize = 0;
size_t partNo = getFirstPart(limMax-1);
size_t truePartEnd = partNo + getParts(limMax-1);
uint64_t endTime = getTime(limMax-1);
while (partNo < truePartEnd){
if (endTime + limMaxDuration >= _max){break;}
++limMaxParts;
limMaxDuration += _p.getDuration(partNo);
limMaxSize += _p.getSize(partNo);
++partNo;
}
}
}
HIGH_MSG("Key limiter applied from %" PRIu64 " to %" PRIu64 ", key times %" PRIu64 " to %" PRIu64 ", %lld parts, %lld parts", _min, _max, getTime(limMin), getTime(limMax-1), (long long)limMinParts-(long long)getParts(limMin), (long long)limMaxParts-(long long)getParts(limMax-1));
isLimited = true;
}
Fragments::Fragments(const Util::RelAccX &_fragments) : fragments(_fragments){}
size_t Fragments::getFirstValid() const{return fragments.getDeleted();}

View file

@ -191,8 +191,27 @@ namespace DTSC{
void setSize(size_t idx, size_t _size);
size_t getSize(size_t idx) const;
uint64_t getTotalPartCount();
uint32_t getIndexForTime(uint64_t timestamp);
void applyLimiter(uint64_t _min, uint64_t _max, DTSC::Parts _p);
private:
bool isConst;
bool isLimited;
size_t limMin;
size_t limMax;
//Overrides for max key
size_t limMaxParts;
uint64_t limMaxDuration;
size_t limMaxSize;
//Overrides for min key
size_t limMinParts;
size_t limMinFirstPart;
uint64_t limMinDuration;
uint64_t limMinTime;
size_t limMinSize;
Util::RelAccX empty;
Util::RelAccX &keys;
@ -477,6 +496,8 @@ namespace DTSC{
Util::RelAccX &pages(size_t idx);
const Util::RelAccX &pages(size_t idx) const;
const Keys getKeys(size_t trackIdx) const;
std::string toPrettyString() const;
void remap(const std::string &_streamName = "");
@ -495,6 +516,9 @@ namespace DTSC{
void getHealthJSON(JSON::Value & returnReference) const;
void removeLimiter();
void applyLimiter(uint64_t min, uint64_t max);
protected:
void sBufMem(size_t trackCount = DEFAULT_TRACK_COUNT);
void sBufShm(const std::string &_streamName, size_t trackCount = DEFAULT_TRACK_COUNT, bool master = true, bool autoBackOff = true);
@ -509,6 +533,9 @@ namespace DTSC{
std::map<size_t, IPC::sharedPage> tM;
bool isMaster;
uint64_t limitMin;
uint64_t limitMax;
bool isLimited;
char *streamMemBuf;
bool isMemBuf;

View file

@ -502,13 +502,11 @@ bool FLV::Tag::DTSCMetaInit(const DTSC::Meta &M, std::set<size_t> &selTracks){
int i = 0;
uint64_t mediaLen = 0;
for (std::set<size_t>::iterator it = selTracks.begin(); it != selTracks.end(); it++){
if (M.getLastms(*it) - M.getFirstms(*it) > mediaLen){
mediaLen = M.getLastms(*it) - M.getFirstms(*it);
}
if (M.getDuration(*it) > mediaLen){mediaLen = M.getDuration(*it);}
if (M.getType(*it) == "video"){
trinfo.addContent(AMF::Object("", AMF::AMF0_OBJECT));
trinfo.getContentP(i)->addContent(AMF::Object(
"length", ((double)M.getLastms(*it) / 1000) * ((double)M.getFpks(*it) / 1000.0), AMF::AMF0_NUMBER));
"length", ((double)M.getDuration(*it) / 1000) * ((double)M.getFpks(*it) / 1000.0), AMF::AMF0_NUMBER));
trinfo.getContentP(i)->addContent(AMF::Object("timescale", ((double)M.getFpks(*it) / 1000), AMF::AMF0_NUMBER));
trinfo.getContentP(i)->addContent(AMF::Object("sampledescription", AMF::AMF0_STRICT_ARRAY));
amfdata.getContentP(1)->addContent(AMF::Object("hasVideo", 1, AMF::AMF0_BOOL));
@ -552,7 +550,7 @@ bool FLV::Tag::DTSCMetaInit(const DTSC::Meta &M, std::set<size_t> &selTracks){
if (M.getType(*it) == "audio"){
trinfo.addContent(AMF::Object("", AMF::AMF0_OBJECT));
trinfo.getContentP(i)->addContent(
AMF::Object("length", (double)(M.getLastms(*it) * M.getRate(*it)), AMF::AMF0_NUMBER));
AMF::Object("length", (double)(M.getDuration(*it) * M.getRate(*it)), AMF::AMF0_NUMBER));
trinfo.getContentP(i)->addContent(AMF::Object("timescale", M.getRate(*it), AMF::AMF0_NUMBER));
trinfo.getContentP(i)->addContent(AMF::Object("sampledescription", AMF::AMF0_STRICT_ARRAY));
amfdata.getContentP(1)->addContent(AMF::Object("hasAudio", 1, AMF::AMF0_BOOL));
@ -575,7 +573,7 @@ bool FLV::Tag::DTSCMetaInit(const DTSC::Meta &M, std::set<size_t> &selTracks){
}
}
if (M.getVod()){
amfdata.getContentP(1)->addContent(AMF::Object("duration", mediaLen / 1000, AMF::AMF0_NUMBER));
amfdata.getContentP(1)->addContent(AMF::Object("duration", mediaLen / 1000.0, AMF::AMF0_NUMBER));
}
amfdata.getContentP(1)->addContent(trinfo);