diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index d5db4093..9638b737 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -253,6 +253,7 @@ int main_loop(int argc, char ** argv){ } Controller::writeConfig(); Controller::checkAvailProtocols(); + Controller::updateBandwidthConfig(); createAccount(Controller::conf.getString("account")); //if a terminal is connected and we're not logging to file diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index d5d2d4c5..20eb47d9 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -282,6 +282,18 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response Controller::prometheus = out["prometheus"].asStringRef(); } } + if (Request.isMember("bandwidth")){ + if (Request["bandwidth"].isObject()){ + if (Request["bandwidth"].isMember("limit") && Request["bandwidth"]["limit"].isInt()){ + Controller::Storage["bandwidth"]["limit"] = Request["bandwidth"]["limit"]; + } + if (Request["bandwidth"].isMember("exceptions") && Request["bandwidth"]["exceptions"].isArray()){ + Controller::Storage["bandwidth"]["exceptions"] = Request["bandwidth"]["exceptions"]; + } + Controller::updateBandwidthConfig(); + } + Response["bandwidth"] = Controller::Storage["bandwidth"]; + } if (Request.isMember("streams")){ Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]); } diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 3699d027..bef90337 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -43,12 +43,36 @@ bool Controller::killOnExit = KILL_ON_EXIT; tthread::mutex Controller::statsMutex; std::map Controller::activeStreams; unsigned int Controller::maxConnsPerIP = 0; +char noBWCountMatches[1717]; +uint64_t bwLimit = 128*1024*1024;//gigabit default limit /// Session cache shared memory page IPC::sharedPage * shmSessions = 0; /// Lock for the session cache shared memory page IPC::semaphore * cacheLock = 0; +/// Convert bandwidth config into memory format +void Controller::updateBandwidthConfig(){ + size_t offset = 0; + bwLimit = 128*1024*1024;//gigabit default limit + memset(noBWCountMatches, 0, 1717); + if (Storage.isMember("bandwidth")){ + if (Storage["bandwidth"].isMember("limit")){ + bwLimit = Storage["bandwidth"]["limit"].asInt(); + } + if (Storage["bandwidth"].isMember("exceptions")){ + jsonForEach(Storage["bandwidth"]["exceptions"], j){ + std::string newbins = Socket::getBinForms(j->asStringRef()); + if (offset + newbins.size() < 1700){ + memcpy(noBWCountMatches+offset, newbins.data(), newbins.size()); + offset += newbins.size(); + } + } + } + } +} + + //For server-wide totals. Local to this file only. struct streamTotals { unsigned long long upBytes; @@ -61,6 +85,8 @@ struct streamTotals { static std::map streamStats; static unsigned long long servUpBytes = 0; static unsigned long long servDownBytes = 0; +static unsigned long long servUpOtherBytes = 0; +static unsigned long long servDownOtherBytes = 0; static unsigned long long servInputs = 0; static unsigned long long servOutputs = 0; static unsigned long long servViewers = 0; @@ -467,8 +493,29 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da if (currUp - prevUp < 0 || currDown-prevDown < 0){ INFO_MSG("Negative data usage! %lldu/%lldd (u%lld->%lld) in %s over %s, #%lu", currUp-prevUp, currDown-prevDown, prevUp, currUp, data.streamName().c_str(), data.connector().c_str(), index); }else{ - servUpBytes += currUp - prevUp; - servDownBytes += currDown - prevDown; + if (!noBWCount){ + size_t bwMatchOffset = 0; + noBWCount = 1; + while (noBWCountMatches[bwMatchOffset+16] != 0 && bwMatchOffset < 1700){ + if (Socket::matchIPv6Addr(data.host(), std::string(noBWCountMatches+bwMatchOffset, 16), noBWCountMatches[bwMatchOffset+16])){ + noBWCount = 2; + break; + } + bwMatchOffset += 17; + } + if (noBWCount == 2){ + MEDIUM_MSG("Not counting for main bandwidth"); + }else{ + MEDIUM_MSG("Counting connection for main bandwidth"); + } + } + if (noBWCount == 2){ + servUpOtherBytes += currUp - prevUp; + servDownOtherBytes += currDown - prevDown; + }else{ + servUpBytes += currUp - prevUp; + servDownBytes += currDown - prevDown; + } } if (currDown + currUp > COUNTABLE_BYTES){ std::string streamName = data.streamName(); @@ -621,6 +668,7 @@ Controller::statSession::statSession(){ wipedUp = 0; wipedDown = 0; sessionType = SESS_UNSET; + noBWCount = 0; } /// Moves the given connection to the given session @@ -1548,10 +1596,13 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n"; response << "# TYPE mist_bw_total counter\n"; - response << "mist_bw_total{direction=\"up\"} " << bw_up_total << "\n"; - response << "mist_bw_total{direction=\"down\"} " << bw_down_total << "\n\n"; - response << "stat_bw_total{direction=\"up\"} " << servUpBytes << "\n"; - response << "stat_bw_total{direction=\"down\"} " << servDownBytes << "\n\n"; + response << "stat_bw_total{direction=\"up\"} " << bw_up_total << "\n"; + response << "stat_bw_total{direction=\"down\"} " << bw_down_total << "\n\n"; + response << "mist_bw_total{direction=\"up\"} " << servUpBytes << "\n"; + response << "mist_bw_total{direction=\"down\"} " << servDownBytes << "\n\n"; + response << "mist_bw_other{direction=\"up\"} " << servUpOtherBytes << "\n"; + response << "mist_bw_other{direction=\"down\"} " << servDownOtherBytes << "\n\n"; + response << "mist_bw_limit " << bwLimit << "\n\n"; response << "# HELP mist_viewers Number of sessions by type and stream active right now.\n"; response << "# TYPE mist_viewers gauge\n"; @@ -1620,10 +1671,13 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i resp["tot"].append((long long)servViewers); resp["tot"].append((long long)servInputs); resp["tot"].append((long long)servOutputs); - resp["bw"].append((long long)bw_up_total); - resp["bw"].append((long long)bw_down_total); - resp["st"].append((long long)servUpBytes); - resp["st"].append((long long)servDownBytes); + resp["st"].append((long long)bw_up_total); + resp["st"].append((long long)bw_down_total); + resp["bw"].append((long long)servUpBytes); + resp["bw"].append((long long)servDownBytes); + resp["bwlimit"] = (long long)bwLimit; + resp["obw"].append((long long)servUpOtherBytes); + resp["obw"].append((long long)servDownOtherBytes); for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index ce0c61dd..86947bb8 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -25,6 +25,8 @@ namespace Controller { ///This function is ran whenever a stream becomes inactive. void streamStopped(std::string stream); + void updateBandwidthConfig(); + struct statLog { long time; long lastSecond; @@ -82,6 +84,7 @@ namespace Controller { std::deque oldConns; sessType sessionType; bool tracked; + uint8_t noBWCount;///