From 6b88525e2f1b1e57f1f6aec752203b5d40e7551b Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 23 Feb 2023 02:56:18 +0100 Subject: [PATCH] Generic input page loading re-prioritization edit --- src/input/input.cpp | 61 +++++++++++++++++++++++++---- src/input/input.h | 15 +++++++ src/input/input_hls.cpp | 1 + src/output/output.cpp | 6 ++- src/output/output_http_internal.cpp | 2 +- src/output/output_mp4.cpp | 4 +- 6 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index 6e16b783..b5186158 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -19,7 +19,11 @@ namespace Mist{ Util::Config *Input::config = NULL; - void Input::userLeadIn(){connectedUsers = 0;} + void Input::userLeadIn(){ + connectedUsers = 0; + keyLoadPriority.clear(); + } + void Input::userOnActive(size_t id){ ++connectedUsers; size_t track = users.getTrack(id); @@ -36,12 +40,47 @@ namespace Mist{ //But! What if our current key is 20+ seconds long? HAVE YOU THOUGHT OF THAT?! //Exactly! I thought not! So, if the end key number == the first, we increase by one. if (endKey == key){++endKey;} + if (endKey > key + 1000){endKey = key + 1000;} DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); - for (size_t i = key; i <= endKey; i++){bufferFrame(track, i);} + for (size_t i = key; i <= endKey; ){ + + + const Util::RelAccX &tPages = M.pages(track); + if (!tPages.getEndPos()){return;} + DTSC::Keys keys(M.keys(track)); + if (i > keys.getValidCount()){return;} + uint64_t pageIdx = 0; + for (uint64_t j = tPages.getDeleted(); j < tPages.getEndPos(); j++){ + if (tPages.getInt("firstkey", j) > i) break; + pageIdx = j; + } + uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); + if (i == key){ + INFO_MSG("Track %zu key %zu is on page %" PRIu32, track, key, pageNumber); + keyLoadPriority[trackKey(track, pageNumber)] += 10000; + }else{ + keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (key - i); + } + uint64_t cnt = tPages.getInt("keycount", pageIdx); + if (pageNumber + cnt <= i){return;} + i = pageNumber + cnt; + } //Now, we can rest assured that the next ~120 seconds or so is pre-buffered in RAM. } void Input::userOnDisconnect(size_t id){} - void Input::userLeadOut(){} + void Input::userLeadOut(){ + INFO_MSG("Wanna load %zu keys", keyLoadPriority.size()); + if (!keyLoadPriority.size()){return;} + //Make reverse mapping + std::multimap reverse; + for (std::map::iterator i = keyLoadPriority.begin(); i != keyLoadPriority.end(); ++i){ + reverse.insert(std::pair(i->second, i->first)); + } + uint64_t timer = Util::bootMS(); + for (std::multimap::reverse_iterator i = reverse.rbegin(); i != reverse.rend() && Util::bootMS() < timer + 500; ++i){ + bufferFrame(i->second.track, i->second.key); + } + } void Input::reloadClientMeta(){ if (M.getStreamName() != "" && M.getMaster()){return;} @@ -674,10 +713,10 @@ namespace Mist{ // Initialize meta page meta.reInit(streamName, true); }else{ - std::set validTracks = M.getValidTracks(true); - for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ - bufferFrame(*it, 0); - } + //std::set validTracks = M.getValidTracks(true); + //for (std::set::iterator it = validTracks.begin(); it != validTracks.end(); it++){ + // bufferFrame(*it, 0); + //} } meta.setSource(config->getString("input")); @@ -699,6 +738,7 @@ namespace Mist{ activityCounter = Util::bootSecs(); // main serve loop while (keepRunning()){ + uint64_t preMs = Util::bootMS(); // load pages for connected clients on request userLeadIn(); COMM_LOOP(users, userOnActive(id), userOnDisconnect(id)) @@ -726,7 +766,12 @@ namespace Mist{ } } // if not shutting down, wait 1 second before looping - if (config->is_active){Util::wait(INPUT_USER_INTERVAL);} + preMs = Util::bootMS() - preMs; + uint64_t waitMs = INPUT_USER_INTERVAL; + if (preMs >= waitMs){waitMs = 0;}else{waitMs -= preMs;} + if (config->is_active && waitMs){ + Util::wait(waitMs); + } } if (!isThread()){ if (streamStatus){streamStatus.mapped[0] = STRMSTAT_SHUTDOWN;} diff --git a/src/input/input.h b/src/input/input.h index 9163483d..4baf208f 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -20,6 +20,20 @@ namespace Mist{ uint32_t curPart; }; + struct trackKey{ + size_t track; + size_t key; + trackKey(){track = 0; key = 0;} + trackKey(size_t t, size_t k){ + track = t; + key = k; + } + }; + + inline bool operator< (const trackKey a, const trackKey b){ + return a.track < b.track || (a.track == b.track && a.key < b.key); + } + class Input : public InOutBase{ public: Input(Util::Config *cfg); @@ -79,6 +93,7 @@ namespace Mist{ JSON::Value capa; std::map > keyTimes; + std::map keyLoadPriority; // Create server for user pages Comms::Users users; diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index a48e3b98..ebd7a185 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -1012,6 +1012,7 @@ namespace Mist{ /// \brief Override userLeadOut to buffer new data as live packets void inputHLS::userLeadOut(){ + Input::userLeadOut(); if (!isLiveDVR){ return; } diff --git a/src/output/output.cpp b/src/output/output.cpp index a108ef00..c06ac773 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -229,7 +229,7 @@ namespace Mist{ bool Output::isReadyForPlay(){ // If a protocol does not support any codecs, we assume you know what you're doing - if (!capa.isMember("codecs")){return true;} + if (!capa.isMember("codecs") || !capa["codecs"].size() || !capa["codecs"].isArray() || !capa["codecs"][0u].size()){return true;} if (!isInitialized){return false;} meta.reloadReplacedPagesIfNeeded(); if (getSupportedTracks().size()){ @@ -809,6 +809,10 @@ namespace Mist{ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ seekTracks.insert(it->first); } + //Seek all seek positions, first + for (std::set::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){ + userSelect[*it].setKeyNum(M.getKeyNumForTime(*it, pos)); + } bool ret = seekTracks.size(); for (std::set::iterator it = seekTracks.begin(); it != seekTracks.end(); it++){ ret &= seek(*it, pos, false); diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 445d3a61..ff1225cc 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -122,7 +122,7 @@ namespace Mist{ capa["provides"] = "HTTP"; capa["protocol"] = "http://"; capa["url_rel"] = "/$.html"; - capa["codecs"][0u][0u].append("+*"); + capa["codecs"][0u].null(); capa["url_match"].append("/crossdomain.xml"); capa["url_match"].append("/clientaccesspolicy.xml"); capa["url_match"].append("/$.html"); diff --git a/src/output/output_mp4.cpp b/src/output/output_mp4.cpp index f202b2f9..65ad78a9 100644 --- a/src/output/output_mp4.cpp +++ b/src/output/output_mp4.cpp @@ -1452,7 +1452,7 @@ namespace Mist{ fragSeqNum = 0; idleInterval = 1000; maxSkipAhead = 0; - dataWaitTimeout = 450; + if (M.getLive()){dataWaitTimeout = 450;} } void OutMP4::onWebsocketFrame() { @@ -1735,7 +1735,7 @@ namespace Mist{ if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);} } r["data"]["jitter"] = jitter; - if (dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;} + if (M.getLive() && dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;} if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;} webSock->sendFrame(r.toString()); }