diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 0caec676..2752b1c5 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -14,6 +14,7 @@ #include "procs.h" #include "bitfields.h" #include "timing.h" +#include "auth.h" #if defined(__CYGWIN__) || defined(_WIN32) #include @@ -619,6 +620,11 @@ namespace IPC { htobl(data + 8, time); } + /// Calculates session ID from CRC, stream name, connector and host. + std::string statExchange::getSessId(){ + return Secure::md5(data+32, 140); + } + ///\brief Gets time currently connected long statExchange::time() { long result; diff --git a/lib/shared_memory.h b/lib/shared_memory.h index 1b665b78..ba9c807b 100644 --- a/lib/shared_memory.h +++ b/lib/shared_memory.h @@ -41,6 +41,7 @@ namespace IPC { void setSync(char s); unsigned int crc(); uint32_t getPID(); + std::string getSessId(); private: ///\brief The payload for the stat exchange /// - 8 byte - now (timestamp of last statistics) diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index a076aad9..4edcfdc0 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -181,6 +181,7 @@ int main_loop(int argc, char ** argv){ Controller::conf.addOption("maxconnsperip", JSON::fromString("{\"long\":\"maxconnsperip\", \"short\":\"M\", \"arg\":\"integer\" \"default\":0, \"help\":\"Max simultaneous sessions per unique IP address. Only enforced if the USER_NEW trigger is in use.\"}")); Controller::conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}")); Controller::conf.addOption("logfile", JSON::fromString("{\"long\":\"logfile\", \"short\":\"L\", \"arg\":\"string\" \"default\":\"\",\"help\":\"Redirect all standard output to a log file, provided with an argument\"}")); + Controller::conf.addOption("accesslog", JSON::fromString("{\"long\":\"accesslog\", \"short\":\"A\", \"arg\":\"string\" \"default\":\"LOG\",\"help\":\"Where to write the access log. If set to 'LOG' (the default), writes to wherever the log is written to. If empty, access logging is turned off. Otherwise, writes to the given filename.\"}")); Controller::conf.addOption("configFile", JSON::fromString("{\"long\":\"config\", \"short\":\"c\", \"arg\":\"string\" \"default\":\"config.json\", \"help\":\"Specify a config file other than default.\"}")); #ifdef UPDATER Controller::conf.addOption("update", JSON::fromString("{\"default\":0, \"help\":\"Check for and install updates before starting.\", \"short\":\"D\", \"long\":\"update\"}")); /*LTS*/ @@ -244,8 +245,14 @@ int main_loop(int argc, char ** argv){ if (Controller::Storage["config"]["controller"]["prometheus"]){ Controller::conf.getOption("prometheus", true)[0u] = Controller::Storage["config"]["controller"]["prometheus"]; } + if (Controller::Storage["config"].isMember("accesslog")){ + Controller::conf.getOption("accesslog", true)[0u] = Controller::Storage["config"]["accesslog"]; + } Controller::maxConnsPerIP = Controller::conf.getInteger("maxconnsperip"); Controller::Storage["config"]["controller"]["prometheus"] = Controller::conf.getString("prometheus"); + Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog"); + Controller::prometheus = Controller::Storage["config"]["controller"]["prometheus"].asStringRef(); + Controller::accesslog = Controller::Storage["config"]["accesslog"].asStringRef(); { IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1); configLock.unlink(); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 54fe491f..f42e8b12 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -71,6 +71,12 @@ void Controller::checkConfig(JSON::Value & in, JSON::Value & out){ INFO_MSG("Debug level set to %u", Util::Config::printDebugLevel); } } + if (out.isMember("controller") && out["controller"].isMember("prometheus")){ + Controller::prometheus = out["controller"]["prometheus"].asStringRef(); + } + if (out.isMember("accesslog")){ + Controller::accesslog = out["accesslog"].asStringRef(); + } } ///\brief Checks an authorization request for a given user. @@ -168,12 +174,12 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ while (conn && logins < 4){ if ((conn.spool() || conn.Received().size()) && H.Read(conn)){ //Catch prometheus requests - if (conf.getString("prometheus").size()){ - if (H.url == "/"+Controller::conf.getString("prometheus")){ + if (Controller::prometheus.size()){ + if (H.url == "/"+Controller::prometheus){ handlePrometheus(H, conn, PROMETHEUS_TEXT); continue; } - if (H.url == "/"+Controller::conf.getString("prometheus")+".json"){ + if (H.url == "/"+Controller::prometheus+".json"){ handlePrometheus(H, conn, PROMETHEUS_JSON); continue; } @@ -602,6 +608,34 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ } } + if (Request.isMember("stop_sessid")){ + if (Request["stop_sessid"].isArray() || Request["stop_sessid"].isObject()){ + jsonForEach(Request["stop_sessid"], it){ + Controller::sessId_shutdown(it->asStringRef()); + } + }else{ + Controller::sessId_shutdown(Request["stop_sessid"].asStringRef()); + } + } + + if (Request.isMember("stop_tag")){ + if (Request["stop_tag"].isArray() || Request["stop_tag"].isObject()){ + jsonForEach(Request["stop_tag"], it){ + Controller::tag_shutdown(it->asStringRef()); + } + }else{ + Controller::tag_shutdown(Request["stop_tag"].asStringRef()); + } + } + + if (Request.isMember("tag_sessid")){ + if (Request["tag_sessid"].isObject()){ + jsonForEach(Request["tag_sessid"], it){ + Controller::sessId_tag(it.key(), it->asStringRef()); + } + } + } + if (Request.isMember("push_start")){ std::string stream; @@ -665,6 +699,7 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ Controller::pushSettings(Request["push_settings"], Response["push_settings"]); } + Controller::configChanged = true; }else{//unauthorized diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index a173b6ef..6fabe609 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -60,6 +60,7 @@ static unsigned long long servOutputs = 0; static unsigned long long servViewers = 0; Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ + ID = "UNSET"; host = dhost; crc = dcrc; streamName = dstreamName; @@ -72,7 +73,7 @@ Controller::sessIndex::sessIndex(){ std::string Controller::sessIndex::toStr(){ std::stringstream s; - s << host << " " << crc << " " << streamName << " " << connector; + s << ID << "(" << host << " " << crc << " " << streamName << " " << connector << ")"; return s.str(); } @@ -83,6 +84,7 @@ Controller::sessIndex::sessIndex(IPC::statExchange & data){ streamName = data.streamName(); connector = data.connector(); crc = data.crc(); + ID = data.getSessId(); } @@ -136,17 +138,7 @@ void Controller::sessions_invalidate(const std::string & streamname){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->first.streamName == streamname){ sessCount++; - it->second.sync = 1; - if (it->second.curConns.size()){ - for (std::map::iterator jt = it->second.curConns.begin(); jt != it->second.curConns.end(); ++jt){ - char * data = statPointer->getIndex(jt->first); - if (data){ - IPC::statExchange tmpEx(data); - tmpEx.setSync(2); - invalidated++; - } - } - } + invalidated += it->second.invalidate(); } } INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str()); @@ -168,6 +160,59 @@ void Controller::sessions_shutdown(JSON::Iter & i){ //not handled, ignore } +///Shuts down the given session +void Controller::sessId_shutdown(const std::string & sessId){ + if (!statPointer){ + FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); + return; + } + unsigned int murdered = 0; + unsigned int sessCount = 0; + tthread::lock_guard guard(statsMutex); + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->first.ID == sessId){ + sessCount++; + murdered += it->second.kill(); + break; + } + } + INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); +} + +///Tags the given session +void Controller::sessId_tag(const std::string & sessId, const std::string & tag){ + if (!statPointer){ + FAIL_MSG("In controller shutdown procedure - cannot tag sessions."); + return; + } + tthread::lock_guard guard(statsMutex); + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->first.ID == sessId){ + it->second.tags.insert(tag); + return; + } + } + WARN_MSG("Session %s not found - cannot tag with %s", sessId.c_str(), tag.c_str()); +} + +///Shuts down sessions with the given tag set +void Controller::tag_shutdown(const std::string & tag){ + if (!statPointer){ + FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); + return; + } + unsigned int murdered = 0; + unsigned int sessCount = 0; + tthread::lock_guard guard(statsMutex); + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->second.tags.count(tag)){ + sessCount++; + murdered += it->second.kill(); + } + } + INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); +} + ///Shuts down all current sessions for the given streamname void Controller::sessions_shutdown(const std::string & streamname, const std::string & protocol){ if (!statPointer){ @@ -178,20 +223,9 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if ((!streamname.size() || it->first.streamName == streamname) && (!protocol.size() || it->first.connector == protocol) && it->second.curConns.size()){ + if ((!streamname.size() || it->first.streamName == streamname) && (!protocol.size() || it->first.connector == protocol)){ sessCount++; - for (std::map::iterator jt = it->second.curConns.begin(); jt != it->second.curConns.end(); ++jt){ - char * data = statPointer->getIndex(jt->first); - if (data){ - IPC::statExchange tmpEx(data); - uint32_t pid = tmpEx.getPID(); - if (pid > 1){ - Util::Procs::Stop(pid); - INFO_MSG("Killing PID %lu", pid); - murdered++; - } - } - } + murdered += it->second.kill(); } } INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str()); @@ -217,8 +251,10 @@ void Controller::SharedMemStats(void * config){ if (sessions.size()){ std::list mustWipe; unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF; + unsigned long long disconnectPoint = Util::epoch() - STATS_DELAY; for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ it->second.wipeOld(cutOffPoint); + it->second.ping(it->first, disconnectPoint); if (!it->second.hasData()){ mustWipe.push_back(it->first); }else{ @@ -273,6 +309,45 @@ void Controller::SharedMemStats(void * config){ } } +/// Forces a re-sync of the session +uint32_t Controller::statSession::invalidate(){ + uint32_t ret = 0; + sync = 1; + if (curConns.size() && statPointer){ + for (std::map::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){ + char * data = statPointer->getIndex(jt->first); + if (data){ + IPC::statExchange tmpEx(data); + tmpEx.setSync(2); + ret++; + } + } + } + return ret; +} + +/// Kills all active connections, sets the session state to denied (sync=100). +uint32_t Controller::statSession::kill(){ + uint32_t ret = 0; + sync = 100; + if (curConns.size() && statPointer){ + for (std::map::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){ + char * data = statPointer->getIndex(jt->first); + if (data){ + IPC::statExchange tmpEx(data); + tmpEx.setSync(100); + uint32_t pid = tmpEx.getPID(); + if (pid > 1){ + Util::Procs::Stop(pid); + INFO_MSG("Killing PID %lu", pid); + } + ret++; + } + } + } + return ret; +} + /// Updates the given active connection with new stats data. void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ //update the sync byte: 0 = requesting fill, 2 = requesting refill, 1 = needs checking, > 1 = state known (100=denied, 10=accepted) @@ -313,14 +388,18 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da long long prevDown = getDown(); long long prevUp = getUp(); curConns[index].update(data); - //store timestamp of last received data, if newer - if (data.now() > lastSec){ - lastSec = data.now(); - } //store timestamp of first received data, if older if (firstSec > data.now()){ firstSec = data.now(); } + //store timestamp of last received data, if newer + if (data.now() > lastSec){ + lastSec = data.now(); + if (!tracked){ + tracked = true; + firstActive = firstSec; + } + } long long currDown = getDown(); long long currUp = getUp(); if (currUp - prevUp < 0 || currDown-prevDown < 0){ @@ -414,6 +493,56 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ } } +void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){ + if (!tracked){return;} + if (lastSec < disconnectPoint){ + if (Controller::accesslog.size()){ + uint64_t duration = lastSec - firstActive; + if (duration < 1){duration = 1;} + if (Controller::accesslog == "LOG"){ + std::stringstream accessStr; + accessStr << "Session <" << index.ID << "> " << index.streamName << " (" << index.connector << ") from " << index.host << " ended after " << duration << "s, avg " << getUp()/duration/1024 << "KB/s up " << getDown()/duration/1024 << "KB/s down."; + if (tags.size()){ + accessStr << " Tags: "; + for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ + accessStr << "[" << *it << "]"; + } + } + accessStr << std::endl; + Controller::Log("ACCS", accessStr.str()); + }else{ + static std::ofstream accLogFile; + static std::string accLogFileName; + if (accLogFileName != Controller::accesslog || !accLogFile.good()){ + accLogFile.close(); + accLogFile.open(Controller::accesslog, std::ios_base::app); + if (!accLogFile.good()){ + FAIL_MSG("Could not open access log file '%s': %s", Controller::accesslog.c_str(), strerror(errno)); + }else{ + accLogFileName = Controller::accesslog; + } + } + if (accLogFile.good()){ + time_t rawtime; + struct tm *timeinfo; + char buffer[100]; + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(buffer, 100, "%F %H:%M:%S", timeinfo); + accLogFile << buffer << ", " << index.ID << ", " << index.streamName << ", " << index.connector << ", " << index.host << ", " << duration << ", " << getUp()/duration/1024 << ", " << getDown()/duration/1024 << ", "; + if (tags.size()){ + for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ + accLogFile << "[" << *it << "]"; + } + } + accLogFile << std::endl; + } + } + } + tracked = false; + } +} + /// Archives the given connection. void Controller::statSession::finish(unsigned long index){ oldConns.push_back(curConns[index]); @@ -422,6 +551,8 @@ void Controller::statSession::finish(unsigned long index){ /// Constructs an empty session Controller::statSession::statSession(){ + firstActive = 0; + tracked = false; firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; sync = 1; diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 11b4df17..276c42a8 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -46,6 +46,7 @@ namespace Controller { sessIndex(std::string host, unsigned int crc, std::string streamName, std::string connector); sessIndex(IPC::statExchange & data); sessIndex(); + std::string ID; std::string host; unsigned int crc; std::string streamName; @@ -73,21 +74,27 @@ namespace Controller { /// Allows for moving of connections to another session. class statSession { private: + uint64_t firstActive; unsigned long long firstSec; unsigned long long lastSec; unsigned long long wipedUp; unsigned long long wipedDown; std::deque oldConns; sessType sessionType; + bool tracked; public: + statSession(); + uint32_t invalidate(); + uint32_t kill(); char sync; std::map curConns; + std::set tags; sessType getSessType(); - statSession(); void wipeOld(unsigned long long); void finish(unsigned long index); void switchOverTo(statSession & newSess, unsigned long index); void update(unsigned long index, IPC::statExchange & data); + void ping(const sessIndex & index, unsigned long long disconnectPoint); unsigned long long getStart(); unsigned long long getEnd(); bool isViewerOn(unsigned long long time); @@ -118,6 +125,9 @@ namespace Controller { void SharedMemStats(void * config); void sessions_invalidate(const std::string & streamname); void sessions_shutdown(JSON::Iter & i); + void sessId_shutdown(const std::string & sessId); + void tag_shutdown(const std::string & tag); + void sessId_tag(const std::string & sessId, const std::string & tag); void sessions_shutdown(const std::string & streamname, const std::string & protocol = ""); bool hasViewers(std::string streamName); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 520bfd70..f37fface 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -13,6 +13,8 @@ ///\brief Holds everything unique to the controller. namespace Controller{ std::string instanceId; /// instanceId (previously uniqId) is first set in controller.cpp before licensing or update calls. + std::string prometheus; + std::string accesslog; Util::Config conf; JSON::Value Storage; ///< Global storage of data. tthread::mutex configMutex; diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 2cf3e2c6..60067a73 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -4,7 +4,9 @@ #include namespace Controller { - extern std::string instanceId; ///global storage of instanceId (previously uniqID) for updater + extern std::string instanceId; ///