diff --git a/src/analysers/load_analyser.cpp b/src/analysers/load_analyser.cpp index 650803fb..80344519 100644 --- a/src/analysers/load_analyser.cpp +++ b/src/analysers/load_analyser.cpp @@ -23,8 +23,8 @@ class hostDetails{ tthread::mutex * hostMutex; std::map streams; unsigned int cpu; - unsigned int ramMax; - unsigned int ramCurr; + unsigned long long ramMax; + unsigned long long ramCurr; unsigned int upSpeed; unsigned int downSpeed; unsigned int total; @@ -95,29 +95,47 @@ class hostDetails{ //Next, we add 200 points if the stream is already available. if (streams.count(s)){score += 200;} //Finally, account for bandwidth. We again scale from 0 to 1000 where 1000 is perfect. - score += (1000 - (((upSpeed + addBandwidth) * 1000) / availBandwidth)); - MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, (1000 - ((upSpeed * 1000) / availBandwidth)), (addBandwidth * 1000)/availBandwidth, availBandwidth / 1024 / 1024); + long long bwscore = (1000 - ((upSpeed * 1000) / availBandwidth)); + if (bwscore < 0){bwscore = 0;} + long long bw_sub = ((addBandwidth * 1000) / availBandwidth); + if (bwscore - bw_sub < 0){bw_sub = bwscore;} + score += (bwscore - bw_sub); + MEDIUM_MSG("CPU %u, RAM %u, Stream %u, BW %u (-%u) (max %llu MB/s)", 1000-cpu, 1000-((ramCurr * 1000) / ramMax), streams.count(s)?200:0, bwscore, bw_sub, availBandwidth / 1024 / 1024); return score; } void addViewer(std::string & s){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); + unsigned long long toAdd = 0; if (streams.count(s)){ - addBandwidth += streams[s].bandwidth; + toAdd = streams[s].bandwidth; }else{ if (total){ - addBandwidth += (upSpeed + downSpeed) / total; + toAdd = (upSpeed + downSpeed) / total; }else{ - addBandwidth += (upSpeed + downSpeed) + 100000; + toAdd = (upSpeed + downSpeed) + 100000; } } + //ensure reasonable limits of bandwidth guesses + if (toAdd < 64*1024){toAdd = 64*1024;}//minimum of 0.5 mbps + if (toAdd > 1024*1024){toAdd = 1024*1024;}//maximum of 8 mbps + addBandwidth += toAdd; } void update(JSON::Value & d){ if (!hostMutex){hostMutex = new tthread::mutex();} tthread::lock_guard guard(*hostMutex); cpu = d["cpu"].asInt(); - ramMax = d["mem_total"].asInt(); - ramCurr = d["mem_used"].asInt(); + long long nRamMax = d["mem_total"].asInt(); + long long nRamCur = d["mem_used"].asInt(); + long long nShmMax = d["shm_total"].asInt(); + long long nShmCur = d["shm_used"].asInt(); + if (((nRamCur + nShmCur)*1000) / nRamMax > (nShmCur*1000) / nShmMax){ + ramMax = nRamMax; + ramCurr = nRamCur + nShmCur; + }else{ + ramMax = nShmMax; + ramCurr = nShmCur; + } total = d["curr"][0u].asInt(); unsigned long long currUp = d["bw"][0u].asInt(), currDown = d["bw"][1u].asInt(); unsigned int timeDiff = 0; @@ -155,6 +173,19 @@ class hostDetails{ } strm.prevTotal = currTotal; } + if (streams.size()){ + std::set eraseList; + for (std::map::iterator it = streams.begin(); it != streams.end(); ++it){ + if (!d["streams"].isMember(it->first)){ + eraseList.insert(it->first); + } + } + for (std::set::iterator it = eraseList.begin(); it != eraseList.end(); ++it){ + streams.erase(*it); + } + } + }else{ + streams.clear(); } addBandwidth *= 0.9; } @@ -215,6 +246,7 @@ int handleRequest(Socket::Connection & conn){ }//if HTTP request received } conn.close(); + return 0; } void handleServer(void * servName){ @@ -263,8 +295,12 @@ void handleServer(void * servName){ H.SendRequest(servConn); H.Clean(); unsigned int startTime = Util::epoch(); - while (servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ - Util::sleep(100); + while (cfg->is_active && servConn && !((servConn.spool() || servConn.Received().size()) && H.Read(servConn))){ + if (Util::epoch() - startTime > 2){ + servConn.close(); + H.Clean(); + } + Util::sleep(250); } JSON::Value servData = JSON::fromString(H.body); if (!servData){