Generic input page loading re-prioritization edit

This commit is contained in:
Thulinma 2023-02-23 02:56:18 +01:00
parent 0c716714df
commit 6b88525e2f
6 changed files with 77 additions and 12 deletions

View file

@ -19,7 +19,11 @@
namespace Mist{ namespace Mist{
Util::Config *Input::config = NULL; Util::Config *Input::config = NULL;
void Input::userLeadIn(){connectedUsers = 0;} void Input::userLeadIn(){
connectedUsers = 0;
keyLoadPriority.clear();
}
void Input::userOnActive(size_t id){ void Input::userOnActive(size_t id){
++connectedUsers; ++connectedUsers;
size_t track = users.getTrack(id); size_t track = users.getTrack(id);
@ -36,12 +40,47 @@ namespace Mist{
//But! What if our current key is 20+ seconds long? HAVE YOU THOUGHT OF THAT?! //But! What if our current key is 20+ seconds long? HAVE YOU THOUGHT OF THAT?!
//Exactly! I thought not! So, if the end key number == the first, we increase by one. //Exactly! I thought not! So, if the end key number == the first, we increase by one.
if (endKey == key){++endKey;} if (endKey == key){++endKey;}
if (endKey > key + 1000){endKey = key + 1000;}
DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time);
for (size_t i = key; i <= endKey; i++){bufferFrame(track, i);} for (size_t i = key; i <= endKey; ){
const Util::RelAccX &tPages = M.pages(track);
if (!tPages.getEndPos()){return;}
DTSC::Keys keys(M.keys(track));
if (i > keys.getValidCount()){return;}
uint64_t pageIdx = 0;
for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){
if (tPages.getInt("firstkey", j) > i) break;
pageIdx = j;
}
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
if (i == key){
INFO_MSG("Track %zu key %zu is on page %" PRIu32, track, key, pageNumber);
keyLoadPriority[trackKey(track, pageNumber)] += 10000;
}else{
keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (key - i);
}
uint64_t cnt = tPages.getInt("keycount", pageIdx);
if (pageNumber + cnt <= i){return;}
i = pageNumber + cnt;
}
//Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM. //Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM.
} }
void Input::userOnDisconnect(size_t id){} void Input::userOnDisconnect(size_t id){}
void Input::userLeadOut(){} void Input::userLeadOut(){
INFO_MSG("Wanna load %zu keys", keyLoadPriority.size());
if (!keyLoadPriority.size()){return;}
//Make reverse mapping
std::multimap<uint64_t, trackKey> reverse;
for (std::map<trackKey, uint64_t>::iterator i = keyLoadPriority.begin(); i != keyLoadPriority.end(); ++i){
reverse.insert(std::pair<uint64_t, trackKey>(i->second, i->first));
}
uint64_t timer = Util::bootMS();
for (std::multimap<uint64_t, trackKey>::reverse_iterator i = reverse.rbegin(); i != reverse.rend() && Util::bootMS() < timer + 500; ++i){
bufferFrame(i->second.track, i->second.key);
}
}
void Input::reloadClientMeta(){ void Input::reloadClientMeta(){
if (M.getStreamName() != "" && M.getMaster()){return;} if (M.getStreamName() != "" && M.getMaster()){return;}
@ -674,10 +713,10 @@ namespace Mist{
// Initialize meta page // Initialize meta page
meta.reInit(streamName, true); meta.reInit(streamName, true);
}else{ }else{
std::set<size_t> validTracks = M.getValidTracks(true); //std::set<size_t> validTracks = M.getValidTracks(true);
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){ //for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
bufferFrame(*it, 0); // bufferFrame(*it, 0);
} //}
} }
meta.setSource(config->getString("input")); meta.setSource(config->getString("input"));
@ -699,6 +738,7 @@ namespace Mist{
activityCounter = Util::bootSecs(); activityCounter = Util::bootSecs();
// main serve loop // main serve loop
while (keepRunning()){ while (keepRunning()){
uint64_t preMs = Util::bootMS();
// load pages for connected clients on request // load pages for connected clients on request
userLeadIn(); userLeadIn();
COMM_LOOP(users, userOnActive(id), userOnDisconnect(id)) COMM_LOOP(users, userOnActive(id), userOnDisconnect(id))
@ -726,7 +766,12 @@ namespace Mist{
} }
} }
// if not shutting down, wait 1 second before looping // if not shutting down, wait 1 second before looping
if (config->is_active){Util::wait(INPUT_USER_INTERVAL);} preMs = Util::bootMS() - preMs;
uint64_t waitMs = INPUT_USER_INTERVAL;
if (preMs >= waitMs){waitMs = 0;}else{waitMs -= preMs;}
if (config->is_active && waitMs){
Util::wait(waitMs);
}
} }
if (!isThread()){ if (!isThread()){
if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;}

View file

@ -20,6 +20,20 @@ namespace Mist{
uint32_t curPart; uint32_t curPart;
}; };
struct trackKey{
size_t track;
size_t key;
trackKey(){track = 0; key = 0;}
trackKey(size_t t, size_t k){
track = t;
key = k;
}
};
inline bool operator< (const trackKey a, const trackKey b){
return a.track < b.track || (a.track == b.track && a.key < b.key);
}
class Input : public InOutBase{ class Input : public InOutBase{
public: public:
Input(Util::Config *cfg); Input(Util::Config *cfg);
@ -79,6 +93,7 @@ namespace Mist{
JSON::Value capa; JSON::Value capa;
std::map<size_t, std::set<uint64_t> > keyTimes; std::map<size_t, std::set<uint64_t> > keyTimes;
std::map<trackKey, uint64_t> keyLoadPriority;
// Create server for user pages // Create server for user pages
Comms::Users users; Comms::Users users;

View file

@ -1012,6 +1012,7 @@ namespace Mist{
/// \brief Override userLeadOut to buffer new data as live packets /// \brief Override userLeadOut to buffer new data as live packets
void inputHLS::userLeadOut(){ void inputHLS::userLeadOut(){
Input::userLeadOut();
if (!isLiveDVR){ if (!isLiveDVR){
return; return;
} }

View file

@ -229,7 +229,7 @@ namespace Mist{
bool Output::isReadyForPlay(){ bool Output::isReadyForPlay(){
// If a protocol does not support any codecs, we assume you know what you're doing // If a protocol does not support any codecs, we assume you know what you're doing
if (!capa.isMember("codecs")){return true;} if (!capa.isMember("codecs") || !capa["codecs"].size() || !capa["codecs"].isArray() || !capa["codecs"][0u].size()){return true;}
if (!isInitialized){return false;} if (!isInitialized){return false;}
meta.reloadReplacedPagesIfNeeded(); meta.reloadReplacedPagesIfNeeded();
if (getSupportedTracks().size()){ if (getSupportedTracks().size()){
@ -809,6 +809,10 @@ namespace Mist{
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){ for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
seekTracks.insert(it->first); seekTracks.insert(it->first);
} }
//Seek all seek positions, first
for (std::set<size_t>::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){
userSelect[*it].setKeyNum(M.getKeyNumForTime(*it, pos));
}
bool ret = seekTracks.size(); bool ret = seekTracks.size();
for (std::set<size_t>::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){ for (std::set<size_t>::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){
ret &= seek(*it, pos, false); ret &= seek(*it, pos, false);

View file

@ -122,7 +122,7 @@ namespace Mist{
capa["provides"] = "HTTP"; capa["provides"] = "HTTP";
capa["protocol"] = "http://"; capa["protocol"] = "http://";
capa["url_rel"] = "/$.html"; capa["url_rel"] = "/$.html";
capa["codecs"][0u][0u].append("+*"); capa["codecs"][0u].null();
capa["url_match"].append("/crossdomain.xml"); capa["url_match"].append("/crossdomain.xml");
capa["url_match"].append("/clientaccesspolicy.xml"); capa["url_match"].append("/clientaccesspolicy.xml");
capa["url_match"].append("/$.html"); capa["url_match"].append("/$.html");

View file

@ -1452,7 +1452,7 @@ namespace Mist{
fragSeqNum = 0; fragSeqNum = 0;
idleInterval = 1000; idleInterval = 1000;
maxSkipAhead = 0; maxSkipAhead = 0;
dataWaitTimeout = 450; if (M.getLive()){dataWaitTimeout = 450;}
} }
void OutMP4::onWebsocketFrame() { void OutMP4::onWebsocketFrame() {
@ -1735,7 +1735,7 @@ namespace Mist{
if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);} if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);}
} }
r["data"]["jitter"] = jitter; r["data"]["jitter"] = jitter;
if (dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;} if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;}
if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;} if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;}
webSock->sendFrame(r.toString()); webSock->sendFrame(r.toString());
} }