diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e9bd644..6af16808 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -592,6 +592,17 @@ makeOutput(RTSP rtsp)#LTS makeOutput(WAV wav)#LTS makeOutput(SDP sdp http) +add_executable(MistSession + ${BINARY_DIR}/mist/.headers + src/session.cpp +) +install( + TARGETS MistSession + DESTINATION bin +) +target_link_libraries(MistSession mist) + + add_executable(MistProcFFMPEG ${BINARY_DIR}/mist/.headers src/process/process_ffmpeg.cpp diff --git a/lib/comms.cpp b/lib/comms.cpp index 108c686a..85f1e6d7 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -7,6 +7,7 @@ #include "timing.h" #include #include +#include "config.h" namespace Comms{ Comms::Comms(){ @@ -141,200 +142,45 @@ namespace Comms{ } } - Statistics::Statistics() : Comms(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);} + Sessions::Sessions() : Connections(){sem.open(SEM_STATISTICS, O_CREAT | O_RDWR, ACCESSPERMS, 1);} - void Statistics::unload(){ - if (index != INVALID_RECORD_INDEX){ - setStatus(COMM_STATUS_DISCONNECT | getStatus()); - } - index = INVALID_RECORD_INDEX; - } - - void Statistics::reload(bool _master, bool reIssue){ + void Sessions::reload(bool _master, bool reIssue){ Comms::reload(COMMS_STATISTICS, COMMS_STATISTICS_INITSIZE, _master, reIssue); } - void Statistics::addFields(){ - Comms::addFields(); - dataAccX.addField("sync", RAX_UINT); - dataAccX.addField("now", RAX_64UINT); - dataAccX.addField("time", RAX_64UINT); - dataAccX.addField("lastsecond", RAX_64UINT); - dataAccX.addField("down", RAX_64UINT); - dataAccX.addField("up", RAX_64UINT); - dataAccX.addField("host", RAX_RAW, 16); - dataAccX.addField("stream", RAX_STRING, 100); - dataAccX.addField("connector", RAX_STRING, 20); - dataAccX.addField("crc", RAX_32UINT); - dataAccX.addField("pktcount", RAX_64UINT); - dataAccX.addField("pktloss", RAX_64UINT); - dataAccX.addField("pktretrans", RAX_64UINT); - } - - void Statistics::nullFields(){ - Comms::nullFields(); - setCRC(0); - setConnector(""); - setStream(""); - setHost(""); - setUp(0); - setDown(0); - setLastSecond(0); - setTime(0); - setNow(0); - setSync(0); - setPacketCount(0); - setPacketLostCount(0); - setPacketRetransmitCount(0); - } - - void Statistics::fieldAccess(){ - Comms::fieldAccess(); - sync = dataAccX.getFieldAccX("sync"); - now = dataAccX.getFieldAccX("now"); - time = dataAccX.getFieldAccX("time"); - lastSecond = dataAccX.getFieldAccX("lastsecond"); - down = dataAccX.getFieldAccX("down"); - up = dataAccX.getFieldAccX("up"); - host = dataAccX.getFieldAccX("host"); - stream = dataAccX.getFieldAccX("stream"); - connector = dataAccX.getFieldAccX("connector"); - crc = dataAccX.getFieldAccX("crc"); - pktcount = dataAccX.getFieldAccX("pktcount"); - pktloss = dataAccX.getFieldAccX("pktloss"); - pktretrans = dataAccX.getFieldAccX("pktretrans"); - } - - uint8_t Statistics::getSync() const{return sync.uint(index);} - uint8_t Statistics::getSync(size_t idx) const{return (master ? sync.uint(idx) : 0);} - void Statistics::setSync(uint8_t _sync){sync.set(_sync, index);} - void Statistics::setSync(uint8_t _sync, size_t idx){ + std::string Sessions::getSessId() const{return sessId.string(index);} + std::string Sessions::getSessId(size_t idx) const{return (master ? sessId.string(idx) : 0);} + void Sessions::setSessId(std::string _sid){sessId.set(_sid, index);} + void Sessions::setSessId(std::string _sid, size_t idx){ if (!master){return;} - sync.set(_sync, idx); + sessId.set(_sid, idx); } - uint64_t Statistics::getNow() const{return now.uint(index);} - uint64_t Statistics::getNow(size_t idx) const{return (master ? now.uint(idx) : 0);} - void Statistics::setNow(uint64_t _now){now.set(_now, index);} - void Statistics::setNow(uint64_t _now, size_t idx){ - if (!master){return;} - now.set(_now, idx); + bool Sessions::sessIdExists(std::string _sid){ + for (size_t i = 0; i < recordCount(); i++){ + if (getStatus(i) == COMM_STATUS_INVALID || (getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} + if (getSessId(i) == _sid){ + if (Util::Procs::isRunning(getPid(i))){ + return true; + } + } + } + return false; } - uint64_t Statistics::getTime() const{return time.uint(index);} - uint64_t Statistics::getTime(size_t idx) const{return (master ? time.uint(idx) : 0);} - void Statistics::setTime(uint64_t _time){time.set(_time, index);} - void Statistics::setTime(uint64_t _time, size_t idx){ - if (!master){return;} - time.set(_time, idx); + void Sessions::addFields(){ + Connections::addFields(); + dataAccX.addField("sessid", RAX_STRING, 80); } - uint64_t Statistics::getLastSecond() const{return lastSecond.uint(index);} - uint64_t Statistics::getLastSecond(size_t idx) const{ - return (master ? lastSecond.uint(idx) : 0); - } - void Statistics::setLastSecond(uint64_t _lastSecond){lastSecond.set(_lastSecond, index);} - void Statistics::setLastSecond(uint64_t _lastSecond, size_t idx){ - if (!master){return;} - lastSecond.set(_lastSecond, idx); + void Sessions::nullFields(){ + Connections::nullFields(); + setSessId(""); } - uint64_t Statistics::getDown() const{return down.uint(index);} - uint64_t Statistics::getDown(size_t idx) const{return (master ? down.uint(idx) : 0);} - void Statistics::setDown(uint64_t _down){down.set(_down, index);} - void Statistics::setDown(uint64_t _down, size_t idx){ - if (!master){return;} - down.set(_down, idx); - } - - uint64_t Statistics::getUp() const{return up.uint(index);} - uint64_t Statistics::getUp(size_t idx) const{return (master ? up.uint(idx) : 0);} - void Statistics::setUp(uint64_t _up){up.set(_up, index);} - void Statistics::setUp(uint64_t _up, size_t idx){ - if (!master){return;} - up.set(_up, idx); - } - - std::string Statistics::getHost() const{return std::string(host.ptr(index), 16);} - std::string Statistics::getHost(size_t idx) const{ - if (!master){return std::string((size_t)16, (char)'\000');} - return std::string(host.ptr(idx), 16); - } - void Statistics::setHost(std::string _host){host.set(_host, index);} - void Statistics::setHost(std::string _host, size_t idx){ - if (!master){return;} - host.set(_host, idx); - } - - std::string Statistics::getStream() const{return stream.string(index);} - std::string Statistics::getStream(size_t idx) const{return (master ? stream.string(idx) : "");} - void Statistics::setStream(std::string _stream){stream.set(_stream, index);} - void Statistics::setStream(std::string _stream, size_t idx){ - if (!master){return;} - stream.set(_stream, idx); - } - - std::string Statistics::getConnector() const{return connector.string(index);} - std::string Statistics::getConnector(size_t idx) const{ - return (master ? connector.string(idx) : ""); - } - void Statistics::setConnector(std::string _connector){connector.set(_connector, index);} - void Statistics::setConnector(std::string _connector, size_t idx){ - if (!master){return;} - connector.set(_connector, idx); - } - - uint32_t Statistics::getCRC() const{return crc.uint(index);} - uint32_t Statistics::getCRC(size_t idx) const{return (master ? crc.uint(idx) : 0);} - void Statistics::setCRC(uint32_t _crc){crc.set(_crc, index);} - void Statistics::setCRC(uint32_t _crc, size_t idx){ - if (!master){return;} - crc.set(_crc, idx); - } - - uint64_t Statistics::getPacketCount() const{return pktcount.uint(index);} - uint64_t Statistics::getPacketCount(size_t idx) const{ - return (master ? pktcount.uint(idx) : 0); - } - void Statistics::setPacketCount(uint64_t _count){pktcount.set(_count, index);} - void Statistics::setPacketCount(uint64_t _count, size_t idx){ - if (!master){return;} - pktcount.set(_count, idx); - } - - uint64_t Statistics::getPacketLostCount() const{return pktloss.uint(index);} - uint64_t Statistics::getPacketLostCount(size_t idx) const{ - return (master ? pktloss.uint(idx) : 0); - } - void Statistics::setPacketLostCount(uint64_t _lost){pktloss.set(_lost, index);} - void Statistics::setPacketLostCount(uint64_t _lost, size_t idx){ - if (!master){return;} - pktloss.set(_lost, idx); - } - - uint64_t Statistics::getPacketRetransmitCount() const{return pktretrans.uint(index);} - uint64_t Statistics::getPacketRetransmitCount(size_t idx) const{ - return (master ? pktretrans.uint(idx) : 0); - } - void Statistics::setPacketRetransmitCount(uint64_t _retrans){pktretrans.set(_retrans, index);} - void Statistics::setPacketRetransmitCount(uint64_t _retrans, size_t idx){ - if (!master){return;} - pktretrans.set(_retrans, idx); - } - - std::string Statistics::getSessId() const{return getSessId(index);} - - std::string Statistics::getSessId(size_t idx) const{ - char res[140]; - memset(res, 0, 140); - std::string tmp = host.string(idx); - memcpy(res, tmp.c_str(), (tmp.size() > 16 ? 16 : tmp.size())); - tmp = stream.string(idx); - memcpy(res + 16, tmp.c_str(), (tmp.size() > 100 ? 100 : tmp.size())); - tmp = connector.string(idx); - memcpy(res + 116, tmp.c_str(), (tmp.size() > 20 ? 20 : tmp.size())); - Bit::htobl(res + 136, crc.uint(idx)); - return Secure::md5(res, 140); + void Sessions::fieldAccess(){ + Connections::fieldAccess(); + sessId = dataAccX.getFieldAccX("sessid"); } Users::Users() : Comms(){} @@ -404,4 +250,282 @@ namespace Comms{ if (!master){return;} keyNum.set(_keyNum, idx); } + + /// \brief Claims a spot on the connections page for the input/output which calls this function + /// Starts the MistSession binary for each session, which handles the statistics + /// and the USER_NEW and USER_END triggers + /// \param streamName: Name of the stream the input is providing or an output is making available to viewers + /// \param ip: IP address of the viewer which wants to access streamName. For inputs this value can be set to any value + /// \param sid: Session ID given by the player or randomly generated + /// \param protocol: Protocol currently in use for this connection + /// \param sessionMode: Determines how a viewer session is defined: + // If set to 0, all connections with the same viewer IP and stream name are bundled. + // If set to 1, all connections with the same viewer IP and player ID are bundled. + // If set to 2, all connections with the same player ID and stream name are bundled. + // If set to 3, all connections with the same viewer IP, player ID and stream name are bundled. + /// \param _master: If True, we are reading from this page. If False, we are writing (to our entry) on this page + /// \param reIssue: If True, claim a new entry on this page + void Connections::reload(std::string streamName, std::string ip, std::string sid, std::string protocol, std::string reqUrl, uint64_t sessionMode, bool _master, bool reIssue){ + if (sessionMode == 0xFFFFFFFFFFFFFFFFull){ + FAIL_MSG("The session mode was not initialised properly. Assuming default behaviour of bundling by viewer IP, stream name and player id"); + sessionMode = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID; + } + // Generate a unique session ID for each viewer, input or output + sessionId = generateSession(streamName, ip, sid, protocol, sessionMode); + if (protocol.size() >= 6 && protocol.substr(0, 6) == "INPUT:"){ + sessionId = "I" + sessionId; + }else if (protocol.size() >= 7 && protocol.substr(0, 7) == "OUTPUT:"){ + sessionId = "O" + sessionId; + } + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, sessionId.c_str()); + // Check if the page exists, if not, spawn new session process + if (!_master){ + dataPage.init(userPageName, 0, false, false); + if (!dataPage){ + pid_t thisPid; + std::deque args; + args.push_back(Util::getMyPath() + "MistSession"); + args.push_back(sessionId); + args.push_back("--sessionmode"); + args.push_back(JSON::Value(sessionMode).asString()); + args.push_back("--streamname"); + args.push_back(streamName); + args.push_back("--ip"); + args.push_back(ip); + args.push_back("--sid"); + args.push_back(sid); + args.push_back("--protocol"); + args.push_back(protocol); + args.push_back("--requrl"); + args.push_back(reqUrl); + int err = fileno(stderr); + thisPid = Util::Procs::StartPiped(args, 0, 0, &err); + Util::Procs::forget(thisPid); + HIGH_MSG("Spawned new session executeable (pid %u) for sessionId '%s', corresponding to host %s and stream %s", thisPid, sessionId.c_str(), ip.c_str(), streamName.c_str()); + } + } + // Open SEM_SESSION + if(!sem){ + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, sessionId.c_str()); + sem.open(semName, O_RDWR, ACCESSPERMS, 1); + } + Comms::reload(userPageName, COMMS_SESSIONS_INITSIZE, _master, reIssue); + VERYHIGH_MSG("Reloading connection. Claimed record %lu", index); + } + + /// \brief Marks the data page as closed, so that we longer write any new data to is + void Connections::setExit(){ + if (!master){return;} + dataAccX.setExit(); + } + + bool Connections::getExit(){ + return dataAccX.isExit(); + } + + void Connections::unload(){ + if (index != INVALID_RECORD_INDEX){ + setStatus(COMM_STATUS_DISCONNECT | getStatus()); + } + index = INVALID_RECORD_INDEX; + } + void Connections::addFields(){ + Comms::addFields(); + dataAccX.addField("now", RAX_64UINT); + dataAccX.addField("time", RAX_64UINT); + dataAccX.addField("lastsecond", RAX_64UINT); + dataAccX.addField("down", RAX_64UINT); + dataAccX.addField("up", RAX_64UINT); + dataAccX.addField("host", RAX_RAW, 16); + dataAccX.addField("stream", RAX_STRING, 100); + dataAccX.addField("connector", RAX_STRING, 20); + dataAccX.addField("tags", RAX_STRING, 512); + dataAccX.addField("pktcount", RAX_64UINT); + dataAccX.addField("pktloss", RAX_64UINT); + dataAccX.addField("pktretrans", RAX_64UINT); + } + + void Connections::nullFields(){ + Comms::nullFields(); + setTags(""); + setConnector(""); + setStream(""); + setHost(""); + setUp(0); + setDown(0); + setLastSecond(0); + setTime(0); + setNow(0); + setPacketCount(0); + setPacketLostCount(0); + setPacketRetransmitCount(0); + } + + void Connections::fieldAccess(){ + Comms::fieldAccess(); + now = dataAccX.getFieldAccX("now"); + time = dataAccX.getFieldAccX("time"); + lastSecond = dataAccX.getFieldAccX("lastsecond"); + down = dataAccX.getFieldAccX("down"); + up = dataAccX.getFieldAccX("up"); + host = dataAccX.getFieldAccX("host"); + stream = dataAccX.getFieldAccX("stream"); + connector = dataAccX.getFieldAccX("connector"); + tags = dataAccX.getFieldAccX("tags"); + pktcount = dataAccX.getFieldAccX("pktcount"); + pktloss = dataAccX.getFieldAccX("pktloss"); + pktretrans = dataAccX.getFieldAccX("pktretrans"); + } + + uint64_t Connections::getNow() const{return now.uint(index);} + uint64_t Connections::getNow(size_t idx) const{return (master ? now.uint(idx) : 0);} + void Connections::setNow(uint64_t _now){now.set(_now, index);} + void Connections::setNow(uint64_t _now, size_t idx){ + if (!master){return;} + now.set(_now, idx); + } + + uint64_t Connections::getTime() const{return time.uint(index);} + uint64_t Connections::getTime(size_t idx) const{return (master ? time.uint(idx) : 0);} + void Connections::setTime(uint64_t _time){time.set(_time, index);} + void Connections::setTime(uint64_t _time, size_t idx){ + if (!master){return;} + time.set(_time, idx); + } + + uint64_t Connections::getLastSecond() const{return lastSecond.uint(index);} + uint64_t Connections::getLastSecond(size_t idx) const{ + return (master ? lastSecond.uint(idx) : 0); + } + void Connections::setLastSecond(uint64_t _lastSecond){lastSecond.set(_lastSecond, index);} + void Connections::setLastSecond(uint64_t _lastSecond, size_t idx){ + if (!master){return;} + lastSecond.set(_lastSecond, idx); + } + + uint64_t Connections::getDown() const{return down.uint(index);} + uint64_t Connections::getDown(size_t idx) const{return (master ? down.uint(idx) : 0);} + void Connections::setDown(uint64_t _down){down.set(_down, index);} + void Connections::setDown(uint64_t _down, size_t idx){ + if (!master){return;} + down.set(_down, idx); + } + + uint64_t Connections::getUp() const{return up.uint(index);} + uint64_t Connections::getUp(size_t idx) const{return (master ? up.uint(idx) : 0);} + void Connections::setUp(uint64_t _up){up.set(_up, index);} + void Connections::setUp(uint64_t _up, size_t idx){ + if (!master){return;} + up.set(_up, idx); + } + + std::string Connections::getHost() const{return std::string(host.ptr(index), 16);} + std::string Connections::getHost(size_t idx) const{ + if (!master){return std::string((size_t)16, (char)'\000');} + return std::string(host.ptr(idx), 16); + } + void Connections::setHost(std::string _host){host.set(_host, index);} + void Connections::setHost(std::string _host, size_t idx){ + if (!master){return;} + host.set(_host, idx); + } + + std::string Connections::getStream() const{return stream.string(index);} + std::string Connections::getStream(size_t idx) const{return (master ? stream.string(idx) : "");} + void Connections::setStream(std::string _stream){stream.set(_stream, index);} + void Connections::setStream(std::string _stream, size_t idx){ + if (!master){return;} + stream.set(_stream, idx); + } + + std::string Connections::getConnector() const{return connector.string(index);} + std::string Connections::getConnector(size_t idx) const{ + return (master ? connector.string(idx) : ""); + } + void Connections::setConnector(std::string _connector){connector.set(_connector, index);} + void Connections::setConnector(std::string _connector, size_t idx){ + if (!master){return;} + connector.set(_connector, idx); + } + + bool Connections::hasConnector(size_t idx, std::string protocol){ + std::stringstream sstream(connector.string(idx)); + std::string _conn; + while (std::getline(sstream, _conn, ',')){ + if (_conn == protocol){ + return true; + } + } + return false; + } + + std::string Connections::getTags() const{return tags.string(index);} + std::string Connections::getTags(size_t idx) const{return (master ? tags.string(idx) : 0);} + void Connections::setTags(std::string _sid){tags.set(_sid, index);} + void Connections::setTags(std::string _sid, size_t idx){ + if (!master){return;} + tags.set(_sid, idx); + } + + uint64_t Connections::getPacketCount() const{return pktcount.uint(index);} + uint64_t Connections::getPacketCount(size_t idx) const{ + return (master ? pktcount.uint(idx) : 0); + } + void Connections::setPacketCount(uint64_t _count){pktcount.set(_count, index);} + void Connections::setPacketCount(uint64_t _count, size_t idx){ + if (!master){return;} + pktcount.set(_count, idx); + } + + uint64_t Connections::getPacketLostCount() const{return pktloss.uint(index);} + uint64_t Connections::getPacketLostCount(size_t idx) const{ + return (master ? pktloss.uint(idx) : 0); + } + void Connections::setPacketLostCount(uint64_t _lost){pktloss.set(_lost, index);} + void Connections::setPacketLostCount(uint64_t _lost, size_t idx){ + if (!master){return;} + pktloss.set(_lost, idx); + } + + uint64_t Connections::getPacketRetransmitCount() const{return pktretrans.uint(index);} + uint64_t Connections::getPacketRetransmitCount(size_t idx) const{ + return (master ? pktretrans.uint(idx) : 0); + } + void Connections::setPacketRetransmitCount(uint64_t _retrans){pktretrans.set(_retrans, index);} + void Connections::setPacketRetransmitCount(uint64_t _retrans, size_t idx){ + if (!master){return;} + pktretrans.set(_retrans, idx); + } + + /// \brief Generates a session ID which is unique per viewer + /// \return generated session ID as string + std::string Connections::generateSession(std::string streamName, std::string ip, std::string sid, std::string connector, uint64_t sessionMode){ + std::string concat; + // First bit defines whether to include stream name + if (sessionMode > 7){ + concat += streamName; + sessionMode -= 8; + } + // Second bit defines whether to include viewer ip + if (sessionMode > 3){ + concat += ip; + sessionMode -= 4; + } + // Third bit defines whether to include player ip + if (sessionMode > 1){ + concat += sid; + sessionMode -= 2; + } + // Fourth bit defines whether to include protocol + if (sessionMode == 1){ + concat += connector; + sessionMode = 0; + } + if (sessionMode > 0){ + WARN_MSG("Could not resolve session mode of value %lu", sessionMode); + } + return Secure::sha256(concat.c_str(), concat.length()); + } }// namespace Comms diff --git a/lib/comms.h b/lib/comms.h index 9f459422..9a5c0ea9 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -64,21 +64,21 @@ namespace Comms{ Util::FieldAccX pid; }; - class Statistics : public Comms{ + class Connections : public Comms{ public: - Statistics(); - operator bool() const{return dataPage.mapped && (master || index != INVALID_RECORD_INDEX);} + void reload(std::string streamName, std::string ip, std::string sid, std::string protocol, std::string reqUrl, uint64_t sessionMode, bool _master = false, bool reIssue = false); void unload(); - void reload(bool _master = false, bool reIssue = false); + operator bool() const{return dataPage.mapped && (master || index != INVALID_RECORD_INDEX);} + std::string generateSession(std::string streamName, std::string ip, std::string sid, std::string connector, uint64_t sessionMode); + std::string sessionId; + + void setExit(); + bool getExit(); + virtual void addFields(); virtual void nullFields(); virtual void fieldAccess(); - uint8_t getSync() const; - uint8_t getSync(size_t idx) const; - void setSync(uint8_t _sync); - void setSync(uint8_t _sync, size_t idx); - uint64_t getNow() const; uint64_t getNow(size_t idx) const; void setNow(uint64_t _now); @@ -118,11 +118,12 @@ namespace Comms{ std::string getConnector(size_t idx) const; void setConnector(std::string _connector); void setConnector(std::string _connector, size_t idx); + bool hasConnector(size_t idx, std::string protocol); - uint32_t getCRC() const; - uint32_t getCRC(size_t idx) const; - void setCRC(uint32_t _crc); - void setCRC(uint32_t _crc, size_t idx); + std::string getTags() const; + std::string getTags(size_t idx) const; + void setTags(std::string _sid); + void setTags(std::string _sid, size_t idx); uint64_t getPacketCount() const; uint64_t getPacketCount(size_t idx) const; @@ -139,11 +140,7 @@ namespace Comms{ void setPacketRetransmitCount(uint64_t _retransmit); void setPacketRetransmitCount(uint64_t _retransmit, size_t idx); - std::string getSessId() const; - std::string getSessId(size_t index) const; - - private: - Util::FieldAccX sync; + protected: Util::FieldAccX now; Util::FieldAccX time; Util::FieldAccX lastSecond; @@ -152,7 +149,8 @@ namespace Comms{ Util::FieldAccX host; Util::FieldAccX stream; Util::FieldAccX connector; - Util::FieldAccX crc; + Util::FieldAccX sessId; + Util::FieldAccX tags; Util::FieldAccX pktcount; Util::FieldAccX pktloss; Util::FieldAccX pktretrans; @@ -186,4 +184,18 @@ namespace Comms{ Util::FieldAccX track; Util::FieldAccX keyNum; }; + + class Sessions : public Connections{ + public: + Sessions(); + void reload(bool _master = false, bool reIssue = false); + std::string getSessId() const; + std::string getSessId(size_t idx) const; + void setSessId(std::string _sid); + void setSessId(std::string _sid, size_t idx); + bool sessIdExists(std::string _sid); + virtual void addFields(); + virtual void nullFields(); + virtual void fieldAccess(); + }; }// namespace Comms diff --git a/lib/defines.h b/lib/defines.h index 10c4b9b1..203b75db 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -196,11 +196,14 @@ static inline void show_stackframe(){} #define TRACK_PAGE_RECORDSIZE 36 #define COMMS_STATISTICS "MstStat" -#define COMMS_STATISTICS_INITSIZE 8 * 1024 * 1024 +#define COMMS_STATISTICS_INITSIZE 16 * 1024 * 1024 #define COMMS_USERS "MstUser%s" //%s stream name #define COMMS_USERS_INITSIZE 512 * 1024 +#define COMMS_SESSIONS "MstSession%s" +#define COMMS_SESSIONS_INITSIZE 8 * 1024 * 1024 + #define SEM_STATISTICS "/MstStat" #define SEM_USERS "/MstUser%s" //%s stream name @@ -226,7 +229,9 @@ static inline void show_stackframe(){} #define SEM_LIVE "/MstLIVE%s" //%s stream name #define SEM_INPUT "/MstInpt%s" //%s stream name #define SEM_TRACKLIST "/MstTRKS%s" //%s stream name +#define SEM_SESSION "MstSess%s" #define SEM_SESSCACHE "/MstSessCacheLock" +#define SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID 14 #define SHM_CAPA "MstCapa" #define SHM_PROTO "MstProt" #define SHM_PROXY "MstProx" diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index a0ff2ee8..9e17ca6b 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -306,7 +306,6 @@ int main_loop(int argc, char **argv){ if (Controller::Storage["config"].isMember("accesslog")){ Controller::conf.getOption("accesslog", true)[0u] = Controller::Storage["config"]["accesslog"]; } - Controller::maxConnsPerIP = Controller::conf.getInteger("maxconnsperip"); Controller::Storage["config"]["prometheus"] = Controller::conf.getString("prometheus"); Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog"); Controller::normalizeTrustedProxies(Controller::Storage["config"]["trustedproxy"]); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 4d94bb11..02491065 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -594,6 +594,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ out["prometheus"] = in["prometheus"]; Controller::prometheus = out["prometheus"].asStringRef(); } + if (in.isMember("sessionMode")){out["sessionMode"] = in["sessionMode"];} if (in.isMember("defaultStream")){out["defaultStream"] = in["defaultStream"];} if (in.isMember("location") && in["location"].isObject()){ out["location"]["lat"] = in["location"]["lat"].asDouble(); diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index c5b7f643..6e51f4ee 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -15,6 +15,7 @@ #include #include //for fstatvfs #include +#include #ifndef KILL_ON_EXIT #define KILL_ON_EXIT false @@ -30,7 +31,6 @@ #define STAT_CLI_UP 64 #define STAT_CLI_BPS_DOWN 128 #define STAT_CLI_BPS_UP 256 -#define STAT_CLI_CRC 512 #define STAT_CLI_SESSID 1024 #define STAT_CLI_PKTCOUNT 2048 #define STAT_CLI_PKTLOST 4096 @@ -46,25 +46,73 @@ #define STAT_TOT_PERCRETRANS 64 #define STAT_TOT_ALL 0xFF -#define COUNTABLE_BYTES 128 * 1024 - -std::map Controller::sessions; ///< list of sessions that have statistics data available -std::map Controller::connToSession; ///< Map of socket IDs to session info. +// Mapping of sessId -> session statistics +std::map sessions; std::map Controller::triggerStats; ///< Holds prometheus stats for trigger executions bool Controller::killOnExit = KILL_ON_EXIT; tthread::mutex Controller::statsMutex; -unsigned int Controller::maxConnsPerIP = 0; uint64_t Controller::statDropoff = 0; static uint64_t cpu_use = 0; char noBWCountMatches[1717]; uint64_t bwLimit = 128 * 1024 * 1024; // gigabit default limit -/// Session cache shared memory page -IPC::sharedPage *shmSessions = 0; -/// Lock for the session cache shared memory page -IPC::semaphore *cacheLock = 0; + +// For server-wide totals. Local to this file only. +struct streamTotals{ + uint64_t upBytes; + uint64_t downBytes; + uint64_t inputs; + uint64_t outputs; + uint64_t viewers; + uint64_t currIns; + uint64_t currOuts; + uint64_t currViews; + uint8_t status; + uint64_t viewSeconds; + uint64_t packSent; + uint64_t packLoss; + uint64_t packRetrans; +}; + +Comms::Sessions statComm; +bool statCommActive = false; +// Global server wide statistics +static uint64_t servUpBytes = 0; +static uint64_t servDownBytes = 0; +static uint64_t servUpOtherBytes = 0; +static uint64_t servDownOtherBytes = 0; +static uint64_t servInputs = 0; +static uint64_t servOutputs = 0; +static uint64_t servViewers = 0; +static uint64_t servSeconds = 0; +static uint64_t servPackSent = 0; +static uint64_t servPackLoss = 0; +static uint64_t servPackRetrans = 0; +// Total time watched for all sessions which are no longer active +static uint64_t viewSecondsTotal = 0; +// Mapping of streamName -> summary of stream-wide statistics +static std::map streamStats; + +// If sessId does not exist yet in streamStats, create and init an entry for it +static void createEmptyStatsIfNeeded(const std::string & sessId){ + if (streamStats.count(sessId)){return;} + streamTotals & sT = streamStats[sessId]; + sT.upBytes = 0; + sT.downBytes = 0; + sT.inputs = 0; + sT.outputs = 0; + sT.viewers = 0; + sT.currIns = 0; + sT.currOuts = 0; + sT.currViews = 0; + sT.status = 0; + sT.viewSeconds = 0; + sT.packSent = 0; + sT.packLoss = 0; + sT.packRetrans = 0; +} /// Convert bandwidth config into memory format void Controller::updateBandwidthConfig(){ @@ -100,106 +148,6 @@ void Controller::updateBandwidthConfig(){ } } -// For server-wide totals. Local to this file only. -struct streamTotals{ - uint64_t upBytes; - uint64_t downBytes; - uint64_t inputs; - uint64_t outputs; - uint64_t viewers; - uint64_t currIns; - uint64_t currOuts; - uint64_t currViews; - uint8_t status; - uint64_t viewSeconds; - uint64_t packSent; - uint64_t packLoss; - uint64_t packRetrans; -}; -static std::map streamStats; - -static void createEmptyStatsIfNeeded(const std::string & strm){ - if (streamStats.count(strm)){return;} - streamTotals & sT = streamStats[strm]; - sT.upBytes = 0; - sT.downBytes = 0; - sT.inputs = 0; - sT.outputs = 0; - sT.viewers = 0; - sT.currIns = 0; - sT.currOuts = 0; - sT.currViews = 0; - sT.status = 0; - sT.viewSeconds = 0; - sT.packSent = 0; - sT.packLoss = 0; - sT.packRetrans = 0; -} - - -static uint64_t servUpBytes = 0; -static uint64_t servDownBytes = 0; -static uint64_t servUpOtherBytes = 0; -static uint64_t servDownOtherBytes = 0; -static uint64_t servInputs = 0; -static uint64_t servOutputs = 0; -static uint64_t servViewers = 0; -static uint64_t servSeconds = 0; -static uint64_t servPackSent = 0; -static uint64_t servPackLoss = 0; -static uint64_t servPackRetrans = 0; - -Controller::sessIndex::sessIndex(){ - crc = 0; -} - -/// Initializes a sessIndex from a statistics object + index, converting binary format IP addresses -/// into strings. This extracts the host, stream name, connector and crc field, ignoring everything -/// else. -Controller::sessIndex::sessIndex(const Comms::Statistics &statComm, size_t id){ - Socket::hostBytesToStr(statComm.getHost(id).data(), 16, host); - streamName = statComm.getStream(id); - connector = statComm.getConnector(id); - crc = statComm.getCRC(id); - ID = statComm.getSessId(id); -} - -std::string Controller::sessIndex::toStr(){ - std::stringstream s; - s << ID << "(" << host << " " << crc << " " << streamName << " " << connector << ")"; - return s.str(); -} - -bool Controller::sessIndex::operator==(const Controller::sessIndex &b) const{ - return (host == b.host && crc == b.crc && streamName == b.streamName && connector == b.connector); -} - -bool Controller::sessIndex::operator!=(const Controller::sessIndex &b) const{ - return !(*this == b); -} - -bool Controller::sessIndex::operator>(const Controller::sessIndex &b) const{ - return host > b.host || - (host == b.host && - (crc > b.crc || (crc == b.crc && (streamName > b.streamName || - (streamName == b.streamName && connector > b.connector))))); -} - -bool Controller::sessIndex::operator<(const Controller::sessIndex &b) const{ - return host < b.host || - (host == b.host && - (crc < b.crc || (crc == b.crc && (streamName < b.streamName || - (streamName == b.streamName && connector < b.connector))))); -} - -bool Controller::sessIndex::operator<=(const Controller::sessIndex &b) const{ - return !(*this > b); -} - -bool Controller::sessIndex::operator>=(const Controller::sessIndex &b) const{ - return !(*this < b); -} - /// This function is ran whenever a stream becomes active. void Controller::streamStarted(std::string stream){ INFO_MSG("Stream %s became active", stream.c_str()); @@ -211,9 +159,6 @@ void Controller::streamStopped(std::string stream){ INFO_MSG("Stream %s became inactive", stream.c_str()); } -Comms::Statistics statComm; -bool statCommActive = false; - /// Invalidates all current sessions for the given streamname /// Updates the session cache, afterwards. void Controller::sessions_invalidate(const std::string &streamname){ @@ -221,18 +166,17 @@ void Controller::sessions_invalidate(const std::string &streamname){ FAIL_MSG("In shutdown procedure - cannot invalidate sessions."); return; } - unsigned int invalidated = 0; unsigned int sessCount = 0; - tthread::lock_guard guard(statsMutex); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if (it->first.streamName == streamname){ + // Find all matching streams in statComm + for (size_t i = 0; i < statComm.recordCount(); i++){ + if (statComm.getStatus(i) == COMM_STATUS_INVALID || (statComm.getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} + if (statComm.getStream(i) == streamname){ sessCount++; - invalidated += it->second.invalidate(); + // Re-trigger USER_NEW trigger for this session + kill(statComm.getPid(i), SIGUSR1); } } - Controller::writeSessionCache(); - INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, - streamname.c_str()); + INFO_MSG("Invalidated %u session(s) for stream %s", sessCount, streamname.c_str()); } /// Shuts down all current sessions for the given streamname @@ -256,18 +200,8 @@ void Controller::sessId_shutdown(const std::string &sessId){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - unsigned int murdered = 0; - unsigned int sessCount = 0; - tthread::lock_guard guard(statsMutex); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if (it->first.ID == sessId){ - sessCount++; - murdered += it->second.kill(); - break; - } - } - Controller::writeSessionCache(); - INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str()); + killConnections(sessId); + INFO_MSG("Shut down session with session ID %s", sessId.c_str()); } /// Tags the given session @@ -277,8 +211,8 @@ void Controller::sessId_tag(const std::string &sessId, const std::string &tag){ return; } tthread::lock_guard guard(statsMutex); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if (it->first.ID == sessId){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->first == sessId){ it->second.tags.insert(tag); return; } @@ -295,17 +229,15 @@ void Controller::tag_shutdown(const std::string &tag){ FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.tags.count(tag)){ sessCount++; - murdered += it->second.kill(); + killConnections(it->first); } } - Controller::writeSessionCache(); - INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str()); + INFO_MSG("Shut down %u session(s) for tag %s", sessCount, tag.c_str()); } /// Shuts down all current sessions for the given streamname @@ -315,48 +247,23 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions."); return; } - unsigned int murdered = 0; unsigned int sessCount = 0; tthread::lock_guard guard(statsMutex); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if ((!streamname.size() || it->first.streamName == streamname) && - (!protocol.size() || it->first.connector == protocol)){ + // Find all matching streams in statComm and get their sessId + for (size_t i = 0; i < statComm.recordCount(); i++){ + if (statComm.getStatus(i) == COMM_STATUS_INVALID || (statComm.getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} + if ((!streamname.size() || statComm.getStream(i) == streamname) && + (!protocol.size() || statComm.hasConnector(i, protocol))){ + uint32_t pid = statComm.getPid(i); sessCount++; - murdered += it->second.kill(); - } - } - Controller::writeSessionCache(); - INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, - streamname.c_str(), protocol.c_str()); -} - -/// Writes the session cache to shared memory. -/// Assumes the config mutex, stats mutex and session cache semaphore are already locked. -/// Does nothing if the session cache could not be initialized on the first try -/// Does no error checking after first open attempt (fails silently)! -void Controller::writeSessionCache(){ - uint32_t shmOffset = 0; - if (shmSessions && shmSessions->mapped){ - if (cacheLock){cacheLock->wait(16);} - if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if (it->second.hasData()){ - // store an entry in the shmSessions page, if it fits - if (it->second.sync > 2 && shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ - *((uint32_t *)(shmSessions->mapped + shmOffset)) = it->first.crc; - strncpy(shmSessions->mapped + shmOffset + 4, it->first.streamName.c_str(), 100); - strncpy(shmSessions->mapped + shmOffset + 104, it->first.connector.c_str(), 20); - strncpy(shmSessions->mapped + shmOffset + 124, it->first.host.c_str(), 40); - shmSessions->mapped[shmOffset + 164] = it->second.sync; - shmOffset += SHM_SESSIONS_ITEM; - } - } + if (pid > 1){ + Util::Procs::Stop(pid); + INFO_MSG("Killing PID %" PRIu32, pid); } } - // set a final shmSessions entry to all zeroes - memset(shmSessions->mapped + shmOffset, 0, SHM_SESSIONS_ITEM); - if (cacheLock){cacheLock->post(16);} } + INFO_MSG("Shut down %u sessions for stream %s/%s", sessCount, + streamname.c_str(), protocol.c_str()); } /// This function runs as a thread and roughly once per second retrieves @@ -366,14 +273,6 @@ void Controller::SharedMemStats(void *config){ HIGH_MSG("Starting stats thread"); statComm.reload(true); statCommActive = true; - shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); - if (!shmSessions || !shmSessions->mapped){ - if (shmSessions){delete shmSessions;} - shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true); - } - cacheLock = new IPC::semaphore(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16); - cacheLock->unlink(); - cacheLock->open(SEM_SESSCACHE, O_CREAT | O_RDWR, ACCESSPERMS, 16); std::set inactiveStreams; Controller::initState(); bool shiftWrites = true; @@ -401,7 +300,6 @@ void Controller::SharedMemStats(void *config){ } } { - tthread::lock_guard guard(Controller::configMutex); tthread::lock_guard guard2(statsMutex); // parse current users @@ -429,29 +327,47 @@ void Controller::SharedMemStats(void *config){ it->second.packRetrans = 0; } } - // wipe old statistics + unsigned int tOut = Util::bootSecs() - STATS_DELAY; + unsigned int tIn = Util::bootSecs() - STATS_INPUT_DELAY; + if (streamStats.size()){ + for (std::map::iterator it = streamStats.begin(); + it != streamStats.end(); ++it){ + it->second.currViews = 0; + it->second.currIns = 0; + it->second.currOuts = 0; + } + } + // wipe old statistics and set session type counters if (sessions.size()){ - std::list mustWipe; + std::list mustWipe; uint64_t cutOffPoint = Util::bootSecs() - STAT_CUTOFF; - uint64_t disconnectPointIn = Util::bootSecs() - STATS_INPUT_DELAY; - uint64_t disconnectPointOut = Util::bootSecs() - STATS_DELAY; - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - uint64_t dPoint = it->second.getSessType() == SESS_INPUT ? disconnectPointIn : disconnectPointOut; - if (it->second.sync == 100){ - // Denied entries are connection-entry-wiped as soon as they become boring - it->second.wipeOld(dPoint); - }else{ - // Normal entries are summarized after STAT_CUTOFF seconds - it->second.wipeOld(cutOffPoint); - } + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ // This part handles ending sessions, keeping them in cache for now - if (it->second.isTracked() && !it->second.isConnected() && it->second.getEnd() < dPoint){ - it->second.dropSession(it->first); - } - // This part handles wiping from the session cache - if (!it->second.hasData()){ - it->second.dropSession(it->first); // End the session, just in case it wasn't yet + if (it->second.getEnd() < cutOffPoint && it->second.newestDataPoint() < cutOffPoint){ + viewSecondsTotal += it->second.getConnTime(); mustWipe.push_back(it->first); + // Don't count this session as a viewer + continue; + } + // Recount input, output and viewer type sessions + switch (it->second.getSessType()){ + case SESS_UNSET: break; + case SESS_VIEWER: + if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ + streamStats[it->first].currViews++; + } + servSeconds += it->second.getConnTime(); + break; + case SESS_INPUT: + if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){ + streamStats[it->first].currIns++; + } + break; + case SESS_OUTPUT: + if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ + streamStats[it->first].currOuts++; + } + break; } } while (mustWipe.size()){ @@ -513,7 +429,6 @@ void Controller::SharedMemStats(void *config){ shiftWrites = true; } /*LTS-START*/ - Controller::writeSessionCache(); Controller::checkServerLimits(); /*LTS-END*/ } @@ -523,7 +438,6 @@ void Controller::SharedMemStats(void *config){ HIGH_MSG("Stopping stats thread"); if (Util::Config::is_restarting){ statComm.setMaster(false); - shmSessions->master = false; }else{/*LTS-START*/ if (Controller::killOnExit){ WARN_MSG("Killing all connected clients to force full shutdown"); @@ -532,10 +446,6 @@ void Controller::SharedMemStats(void *config){ /*LTS-END*/ } Controller::deinitState(Util::Config::is_restarting); - delete shmSessions; - shmSessions = 0; - delete cacheLock; - cacheLock = 0; } /// Gets a complete list of all streams currently in active state, with optional prefix matching @@ -559,97 +469,53 @@ std::set Controller::getActiveStreams(const std::string &prefix){ return ret; } -/// Forces a re-sync of the session +/// Kills all connection of a given session /// Assumes the session cache will be updated separately - may not work correctly if this is forgotten! -uint32_t Controller::statSession::invalidate(){ - uint32_t ret = 0; - sync = 1; - if (curConns.size() && statCommActive){ - for (std::map::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){ - if (statComm.getStatus(jt->first) != COMM_STATUS_INVALID){ - statComm.setSync(2, jt->first); - ret++; - } - } - } - return ret; -} - -/// Kills all active connections, sets the session state to denied (sync=100). -/// Assumes the session cache will be updated separately - may not work correctly if this is forgotten! -uint32_t Controller::statSession::kill(){ - uint32_t ret = 0; - sync = 100; - if (curConns.size() && statCommActive){ - for (std::map::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){ - if (statComm.getStatus(jt->first) != COMM_STATUS_INVALID){ - statComm.setSync(100, jt->first); - uint32_t pid = statComm.getPid(jt->first); +void Controller::killConnections(std::string sessId){ + if (statCommActive){ + // Find a matching stream in statComm with a matching sessID and kill it + for (size_t i = 0; i < statComm.recordCount(); i++){ + if (statComm.getStatus(i) == COMM_STATUS_INVALID || (statComm.getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} + if (statComm.getSessId(i) == sessId){ + uint32_t pid = statComm.getPid(i); if (pid > 1){ Util::Procs::Stop(pid); INFO_MSG("Killing PID %" PRIu32, pid); } - ret++; } } } - return ret; } /// Updates the given active connection with new stats data. -void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm){ - std::string myHost; - Socket::hostBytesToStr(statComm.getHost(index).data(), 16, myHost); - std::string myStream = statComm.getStream(index); - std::string myConnector = statComm.getConnector(index); - // update the sync byte: 0 = requesting fill, 2 = requesting refill, 1 = needs checking, > 2 = - // state known (100=denied, 10=accepted) - if (!statComm.getSync(index)){ - sessIndex tmpidx(statComm, index); - // if we have a maximum connection count per IP, enforce it - if (maxConnsPerIP && !statComm.getSync(index)){ - unsigned int currConns = 1; - long long shortly = Util::bootSecs(); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - - if (&it->second != this && it->first.host == myHost && - (it->second.hasDataFor(shortly - STATS_DELAY) || it->second.hasDataFor(shortly) || - it->second.hasDataFor(shortly - 1) || it->second.hasDataFor(shortly - 2) || - it->second.hasDataFor(shortly - 3) || it->second.hasDataFor(shortly - 4) || - it->second.hasDataFor(shortly - 5)) && - ++currConns > maxConnsPerIP){ - break; - } - } - if (currConns > maxConnsPerIP){ - WARN_MSG("Disconnecting session from %s: exceeds max connection count of %u", myHost.c_str(), maxConnsPerIP); - statComm.setSync(100, index); - } - } - if (statComm.getSync(index) != 100){ - // only set the sync if this is the first connection in the list - // we also catch the case that there are no connections, which is an error-state - if (!sessions[tmpidx].curConns.size() || sessions[tmpidx].curConns.begin()->first == index){ - MEDIUM_MSG("Requesting sync to %u for %s, %s, %s, %" PRIu32, sync, myStream.c_str(), - myConnector.c_str(), myHost.c_str(), statComm.getCRC(index) & 0xFFFFFFFFu); - statComm.setSync(sync, index); - } - // and, always set the sync if it is > 2 - if (sync > 2){ - MEDIUM_MSG("Setting sync to %u for %s, %s, %s, %" PRIu32, sync, myStream.c_str(), - myConnector.c_str(), myHost.c_str(), statComm.getCRC(index) & 0xFFFFFFFFu); - statComm.setSync(sync, index); - } - } - }else{ - if (sync < 2 && statComm.getSync(index) > 2){sync = statComm.getSync(index);} +void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ + if (host == ""){ + Socket::hostBytesToStr(statComm.getHost(index).data(), 16, host); } + if (streamName == ""){ + streamName = statComm.getStream(index); + } + if (curConnector == ""){ + curConnector = statComm.getConnector(index); + } + if (sessId == ""){ + sessId = statComm.getSessId(index); + } + // Export tags to session + if (tags.size()){ + std::stringstream tagStream; + for (std::set::iterator it = tags.begin(); it != tags.end(); ++it){ + tagStream << "[" << *it << "]"; + } + statComm.setTags(tagStream.str(), index); + } + long long prevDown = getDown(); long long prevUp = getUp(); uint64_t prevPktSent = getPktCount(); uint64_t prevPktLost = getPktLost(); uint64_t prevPktRetrans = getPktRetransmit(); - curConns[index].update(statComm, index); + curData.update(statComm, index); // store timestamp of first received data, if older if (firstSec > statComm.getNow(index)){firstSec = statComm.getNow(index);} uint64_t secIncr = 0; @@ -671,7 +537,7 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm uint64_t currPktRetrans = getPktRetransmit(); if (currUp - prevUp < 0 || currDown - prevDown < 0){ INFO_MSG("Negative data usage! %lldu/%lldd (u%lld->%lld) in %s over %s, #%" PRIu64, currUp - prevUp, - currDown - prevDown, prevUp, currUp, myStream.c_str(), myConnector.c_str(), index); + currDown - prevDown, prevUp, currUp, streamName.c_str(), curConnector.c_str(), index); }else{ if (!noBWCount){ size_t bwMatchOffset = 0; @@ -701,56 +567,39 @@ void Controller::statSession::update(uint64_t index, Comms::Statistics &statComm servPackRetrans += currPktRetrans - prevPktRetrans; } } - if (currDown + currUp >= COUNTABLE_BYTES){ - if (sessionType == SESS_UNSET){ - if (myConnector.size() >= 5 && myConnector.substr(0, 5) == "INPUT"){ - ++servInputs; - createEmptyStatsIfNeeded(myStream); - streamStats[myStream].inputs++; - streamStats[myStream].currIns++; - sessionType = SESS_INPUT; - }else if (myConnector.size() >= 6 && myConnector.substr(0, 6) == "OUTPUT"){ - ++servOutputs; - createEmptyStatsIfNeeded(myStream); - streamStats[myStream].outputs++; - streamStats[myStream].currOuts++; - sessionType = SESS_OUTPUT; - }else{ - ++servViewers; - createEmptyStatsIfNeeded(myStream); - streamStats[myStream].viewers++; - streamStats[myStream].currViews++; - sessionType = SESS_VIEWER; - } + if (sessionType == SESS_UNSET){ + if (curConnector.size() >= 5 && curConnector.substr(0, 5) == "INPUT"){ + ++servInputs; + createEmptyStatsIfNeeded(streamName); + streamStats[streamName].inputs++; + streamStats[streamName].currIns++; + sessionType = SESS_INPUT; + }else if (curConnector.size() >= 6 && curConnector.substr(0, 6) == "OUTPUT"){ + ++servOutputs; + createEmptyStatsIfNeeded(streamName); + streamStats[streamName].outputs++; + streamStats[streamName].currOuts++; + sessionType = SESS_OUTPUT; + }else{ + ++servViewers; + createEmptyStatsIfNeeded(streamName); + streamStats[streamName].viewers++; + streamStats[streamName].currViews++; + sessionType = SESS_VIEWER; } - // If previous < COUNTABLE_BYTES, we haven't counted any data so far. - // We need to count all the data in that case, otherwise we only count the difference. - if (noBWCount != 2){ //only count connections that are countable - if (prevUp + prevDown < COUNTABLE_BYTES){ - if (!myStream.size() || myStream[0] == 0){ - if (streamStats.count(myStream)){streamStats.erase(myStream);} - }else{ - createEmptyStatsIfNeeded(myStream); - streamStats[myStream].upBytes += currUp; - streamStats[myStream].downBytes += currDown; - streamStats[myStream].packSent += currPktSent; - streamStats[myStream].packLoss += currPktLost; - streamStats[myStream].packRetrans += currPktRetrans; - if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += lastSec - firstSec;} - } - }else{ - if (!myStream.size() || myStream[0] == 0){ - if (streamStats.count(myStream)){streamStats.erase(myStream);} - }else{ - createEmptyStatsIfNeeded(myStream); - streamStats[myStream].upBytes += currUp - prevUp; - streamStats[myStream].downBytes += currDown - prevDown; - streamStats[myStream].packSent += currPktSent - prevPktSent; - streamStats[myStream].packLoss += currPktLost - prevPktLost; - streamStats[myStream].packRetrans += currPktRetrans - prevPktRetrans; - if (sessionType == SESS_VIEWER){streamStats[myStream].viewSeconds += secIncr;} - } - } + } + // Only count connections that are countable + if (noBWCount != 2){ + if (!streamName.size() || streamName[0] == 0){ + if (streamStats.count(streamName)){streamStats.erase(streamName);} + }else{ + createEmptyStatsIfNeeded(streamName); + streamStats[streamName].upBytes += currUp - prevUp; + streamStats[streamName].downBytes += currDown - prevDown; + streamStats[streamName].packSent += currPktSent - prevPktSent; + streamStats[streamName].packLoss += currPktLost - prevPktLost; + streamStats[streamName].packRetrans += currPktRetrans - prevPktRetrans; + if (sessionType == SESS_VIEWER){streamStats[streamName].viewSeconds += secIncr;} } } } @@ -759,53 +608,19 @@ Controller::sessType Controller::statSession::getSessType(){ return sessionType; } -/// Archives connection log entries older than the given cutOff point. -void Controller::statSession::wipeOld(uint64_t cutOff){ - if (firstSec > cutOff){return;} - firstSec = 0xFFFFFFFFFFFFFFFFull; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - while (it->log.size() && it->log.begin()->first < cutOff){ - if (it->log.size() == 1){ - wipedDown += it->log.begin()->second.down; - wipedUp += it->log.begin()->second.up; - wipedPktCount += it->log.begin()->second.pktCount; - wipedPktLost += it->log.begin()->second.pktLost; - wipedPktRetransmit += it->log.begin()->second.pktRetransmit; - } - it->log.erase(it->log.begin()); - } - if (it->log.size()){ - if (firstSec > it->log.begin()->first){firstSec = it->log.begin()->first;} - } - } - while (oldConns.size() && !oldConns.begin()->log.size()){oldConns.pop_front();} - } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - while (it->second.log.size() > 1 && it->second.log.begin()->first < cutOff){ - it->second.log.erase(it->second.log.begin()); - } - if (it->second.log.size()){ - if (firstSec > it->second.log.begin()->first){firstSec = it->second.log.begin()->first;} - } - } - } -} - -void Controller::statSession::dropSession(const Controller::sessIndex &index){ - if (!tracked || curConns.size()){return;} +Controller::statSession::~statSession(){ + if (!tracked){return;} switch (sessionType){ - case SESS_INPUT: - if (streamStats.count(index.streamName) && streamStats[index.streamName].currIns){streamStats[index.streamName].currIns--;} - break; - case SESS_OUTPUT: - if (streamStats.count(index.streamName) && streamStats[index.streamName].currOuts){streamStats[index.streamName].currOuts--;} - break; - case SESS_VIEWER: - if (streamStats.count(index.streamName) && streamStats[index.streamName].currViews){streamStats[index.streamName].currViews--;} - break; - default: break; + case SESS_INPUT: + if (streamStats.count(streamName) && streamStats[streamName].currIns){streamStats[streamName].currIns--;} + break; + case SESS_OUTPUT: + if (streamStats.count(streamName) && streamStats[streamName].currOuts){streamStats[streamName].currOuts--;} + break; + case SESS_VIEWER: + if (streamStats.count(streamName) && streamStats[streamName].currViews){streamStats[streamName].currViews--;} + break; + default: break; } uint64_t duration = lastSec - firstActive; if (duration < 1){duration = 1;} @@ -815,13 +630,13 @@ void Controller::statSession::dropSession(const Controller::sessIndex &index){ tagStream << "[" << *it << "]"; } } - Controller::logAccess(index.ID, index.streamName, index.connector, index.host, duration, getUp(), + Controller::logAccess(sessId, streamName, curConnector, host, duration, getUp(), getDown(), tagStream.str()); if (Controller::accesslog.size()){ if (Controller::accesslog == "LOG"){ std::stringstream accessStr; - accessStr << "Session <" << index.ID << "> " << index.streamName << " (" << index.connector - << ") from " << index.host << " ended after " << duration << "s, avg " + accessStr << "Session <" << sessId << "> " << streamName << " (" << curConnector + << ") from " << host << " ended after " << duration << "s, avg " << getUp() / duration / 1024 << "KB/s up " << getDown() / duration / 1024 << "KB/s down."; if (tags.size()){accessStr << " Tags: " << tagStream.str();} Controller::Log("ACCS", accessStr.str()); @@ -845,8 +660,8 @@ void Controller::statSession::dropSession(const Controller::sessIndex &index){ time(&rawtime); timeinfo = localtime_r(&rawtime, &tmptime); strftime(buffer, 100, "%F %H:%M:%S", timeinfo); - accLogFile << buffer << ", " << index.ID << ", " << index.streamName << ", " - << index.connector << ", " << index.host << ", " << duration << ", " + accLogFile << buffer << ", " << sessId << ", " << streamName << ", " + << curConnector << ", " << host << ", " << duration << ", " << getUp() / duration / 1024 << ", " << getDown() / duration / 1024 << ", "; if (tags.size()){accLogFile << tagStream.str();} accLogFile << std::endl; @@ -857,77 +672,21 @@ void Controller::statSession::dropSession(const Controller::sessIndex &index){ firstActive = 0; firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; - wipedUp = 0; - wipedDown = 0; - wipedPktCount = 0; - wipedPktLost = 0; - wipedPktRetransmit = 0; - oldConns.clear(); sessionType = SESS_UNSET; } -/// Archives the given connection. -void Controller::statSession::finish(uint64_t index){ - oldConns.push_back(curConns[index]); - curConns.erase(index); -} - /// Constructs an empty session Controller::statSession::statSession(){ firstActive = 0; tracked = false; firstSec = 0xFFFFFFFFFFFFFFFFull; lastSec = 0; - sync = 1; - wipedUp = 0; - wipedDown = 0; - wipedPktCount = 0; - wipedPktLost = 0; - wipedPktRetransmit = 0; sessionType = SESS_UNSET; noBWCount = 0; -} - -/// Moves the given connection to the given session -void Controller::statSession::switchOverTo(statSession &newSess, uint64_t index){ - // add to the given session first - newSess.curConns[index] = curConns[index]; - // if this connection has data, update firstSec/lastSec if needed - if (curConns[index].log.size()){ - if (newSess.firstSec > curConns[index].log.begin()->first){ - newSess.firstSec = curConns[index].log.begin()->first; - } - if (newSess.lastSec < curConns[index].log.rbegin()->first){ - newSess.lastSec = curConns[index].log.rbegin()->first; - } - } - // remove from current session - curConns.erase(index); - // if there was any data, recalculate this session's firstSec and lastSec. - if (newSess.curConns[index].log.size()){ - firstSec = 0xFFFFFFFFFFFFFFFFull; - lastSec = 0; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){ - if (firstSec > it->log.begin()->first){firstSec = it->log.begin()->first;} - if (lastSec < it->log.rbegin()->first){lastSec = it->log.rbegin()->first;} - } - } - } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){ - if (firstSec > it->second.log.begin()->first){ - firstSec = it->second.log.begin()->first; - } - if (lastSec < it->second.log.rbegin()->first){ - lastSec = it->second.log.rbegin()->first; - } - } - } - } - } + streamName = ""; + host = ""; + curConnector = ""; + sessId = ""; } /// Returns the first measured timestamp in this session. @@ -944,39 +703,34 @@ uint64_t Controller::statSession::getEnd(){ bool Controller::statSession::hasDataFor(uint64_t t){ if (lastSec < t){return false;} if (firstSec > t){return false;} - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){return true;} - } - } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){return true;} - } - } - return false; -} - -/// Returns true if there is any data for this session. -bool Controller::statSession::hasData(){ - if (!firstSec && !lastSec){return false;} - if (curConns.size()){return true;} - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){return true;} - } - } + if (curData.hasDataFor(t)){return true;} return false; } /// Returns true if this session should count as a viewer on the given timestamp. bool Controller::statSession::isViewerOn(uint64_t t){ - return getUp(t) + getDown(t) > COUNTABLE_BYTES; + return getUp(t) + getDown(t); +} + +std::string Controller::statSession::getStreamName(){ + return streamName; +} + +std::string Controller::statSession::getHost(){ + return host; +} + +std::string Controller::statSession::getSessId(){ + return sessId; +} + +std::string Controller::statSession::getCurrentProtocols(){ + return curConnector; } /// Returns true if this session should be considered connected -bool Controller::statSession::isConnected(){ - return curConns.size(); +uint64_t Controller::statSession::newestDataPoint(){ + return lastSec; } /// Returns true if this session has started (tracked == true) but not yet ended (log entry written) @@ -986,188 +740,103 @@ bool Controller::statSession::isTracked(){ /// Returns the cumulative connected time for this session at timestamp t. uint64_t Controller::statSession::getConnTime(uint64_t t){ - uint64_t retVal = 0; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).time;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).time; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).time;} - } + return 0; +} + +/// Returns the cumulative connected time for this session. +uint64_t Controller::statSession::getConnTime(){ + if (curData.log.size()){ + return curData.log.rbegin()->second.time; } - return retVal; + return 0; } /// Returns the last requested media timestamp for this session at timestamp t. uint64_t Controller::statSession::getLastSecond(uint64_t t){ - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){return it->second.getDataFor(t).lastSecond;} - } - } - if (oldConns.size()){ - for (std::deque::reverse_iterator it = oldConns.rbegin(); it != oldConns.rend(); ++it){ - if (it->hasDataFor(t)){return it->getDataFor(t).lastSecond;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).lastSecond; } return 0; } /// Returns the cumulative downloaded bytes for this session at timestamp t. uint64_t Controller::statSession::getDown(uint64_t t){ - uint64_t retVal = wipedDown; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).down;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).down; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).down;} - } - } - return retVal; + return 0; } /// Returns the cumulative uploaded bytes for this session at timestamp t. uint64_t Controller::statSession::getUp(uint64_t t){ - uint64_t retVal = wipedUp; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).up;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).up; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).up;} - } - } - return retVal; + return 0; } /// Returns the cumulative downloaded bytes for this session at timestamp t. uint64_t Controller::statSession::getDown(){ - uint64_t retVal = wipedDown; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){retVal += it->log.rbegin()->second.down;} - } + if (curData.log.size()){ + return curData.log.rbegin()->second.down; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){retVal += it->second.log.rbegin()->second.down;} - } - } - return retVal; + return 0; } /// Returns the cumulative uploaded bytes for this session at timestamp t. uint64_t Controller::statSession::getUp(){ - uint64_t retVal = wipedUp; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){retVal += it->log.rbegin()->second.up;} - } + if (curData.log.size()){ + return curData.log.rbegin()->second.up; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){retVal += it->second.log.rbegin()->second.up;} - } - } - return retVal; + return 0; } uint64_t Controller::statSession::getPktCount(uint64_t t){ - uint64_t retVal = wipedPktCount; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktCount;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).pktCount; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktCount;} - } - } - return retVal; + return 0; } /// Returns the cumulative uploaded bytes for this session at timestamp t. uint64_t Controller::statSession::getPktCount(){ - uint64_t retVal = wipedPktCount; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){retVal += it->log.rbegin()->second.pktCount;} - } + if (curData.log.size()){ + return curData.log.rbegin()->second.pktCount; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktCount;} - } - } - return retVal; + return 0; } + uint64_t Controller::statSession::getPktLost(uint64_t t){ - uint64_t retVal = wipedPktLost; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktLost;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).pktLost; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktLost;} - } - } - return retVal; + return 0; } /// Returns the cumulative uploaded bytes for this session at timestamp t. uint64_t Controller::statSession::getPktLost(){ - uint64_t retVal = wipedPktLost; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){retVal += it->log.rbegin()->second.pktLost;} - } + if (curData.log.size()){ + return curData.log.rbegin()->second.pktLost; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktLost;} - } - } - return retVal; + return 0; } + uint64_t Controller::statSession::getPktRetransmit(uint64_t t){ - uint64_t retVal = wipedPktRetransmit; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->hasDataFor(t)){retVal += it->getDataFor(t).pktRetransmit;} - } + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).pktRetransmit; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.hasDataFor(t)){retVal += it->second.getDataFor(t).pktRetransmit;} - } - } - return retVal; + return 0; } /// Returns the cumulative uploaded bytes for this session at timestamp t. uint64_t Controller::statSession::getPktRetransmit(){ - uint64_t retVal = wipedPktRetransmit; - if (oldConns.size()){ - for (std::deque::iterator it = oldConns.begin(); it != oldConns.end(); ++it){ - if (it->log.size()){retVal += it->log.rbegin()->second.pktRetransmit;} - } + if (curData.log.size()){ + return curData.log.rbegin()->second.pktRetransmit; } - if (curConns.size()){ - for (std::map::iterator it = curConns.begin(); it != curConns.end(); ++it){ - if (it->second.log.size()){retVal += it->second.log.rbegin()->second.pktRetransmit;} - } - } - return retVal; + return 0; } /// Returns the cumulative downloaded bytes per second for this session at timestamp t. @@ -1207,6 +876,7 @@ Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){ empty.pktCount = 0; empty.pktLost = 0; empty.pktRetransmit = 0; + empty.connectors = ""; return empty; } std::map::iterator it = log.upper_bound(t); @@ -1216,7 +886,7 @@ Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){ /// This function is called by parseStatistics. /// It updates the internally saved statistics data. -void Controller::statStorage::update(Comms::Statistics &statComm, size_t index){ +void Controller::statStorage::update(Comms::Sessions &statComm, size_t index){ statLog tmp; tmp.time = statComm.getTime(index); tmp.lastSecond = statComm.getLastSecond(index); @@ -1225,56 +895,58 @@ void Controller::statStorage::update(Comms::Statistics &statComm, size_t index){ tmp.pktCount = statComm.getPacketCount(index); tmp.pktLost = statComm.getPacketLostCount(index); tmp.pktRetransmit = statComm.getPacketRetransmitCount(index); + tmp.connectors = statComm.getConnector(index); log[statComm.getNow(index)] = tmp; - // wipe data older than approx. STAT_CUTOFF seconds - /// \todo Remove least interesting data first. - if (log.size() > STAT_CUTOFF){log.erase(log.begin());} + // wipe data older than STAT_CUTOFF seconds + while (log.size() && log.begin()->first < Util::bootSecs() - STAT_CUTOFF){log.erase(log.begin());} } void Controller::statLeadIn(){ statDropoff = Util::bootSecs() - 3; } -void Controller::statOnActive(size_t id){ - // calculate the current session index, store as idx. - sessIndex idx(statComm, id); +void Controller::statOnActive(size_t id){ if (statComm.getNow(id) >= statDropoff){ - // if the connection was already indexed and it has changed, move it - if (connToSession.count(id) && connToSession[id] != idx){ - if (sessions[connToSession[id]].getSessType() != SESS_UNSET){ - INFO_MSG("Switching connection %zu from active session %s over to %s", id, - connToSession[id].toStr().c_str(), idx.toStr().c_str()); - }else{ - INFO_MSG("Switching connection %zu from inactive session %s over to %s", id, - connToSession[id].toStr().c_str(), idx.toStr().c_str()); - } - sessions[connToSession[id]].switchOverTo(sessions[idx], id); - // Destroy this session without calling dropSession, because it was merged into another. What session? We never made it. Stop asking hard questions. Go, shoo. *sprays water* - if (!sessions[connToSession[id]].hasData()){sessions.erase(connToSession[id]);} - } - if (!connToSession.count(id)){ - INSANE_MSG("New connection: %zu as %s", id, idx.toStr().c_str()); - } - // store the index for later comparison - connToSession[id] = idx; // update the session with the latest data - sessions[idx].update(id, statComm); + sessions[statComm.getSessId(id)].update(id, statComm); } } + void Controller::statOnDisconnect(size_t id){ - sessIndex idx(statComm, id); - INSANE_MSG("Ended connection: %zu as %s", id, idx.toStr().c_str()); - sessions[idx].finish(id); - connToSession.erase(id); + // Check to see if cleanup is required (when a Session binary fails) + const std::string thisSessionId = statComm.getSessId(id); + // Try to lock to see if the session crashed during boot + IPC::semaphore sessionLock; + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, thisSessionId.c_str()); + sessionLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!sessionLock.tryWaitOneSecond()){ + // Session likely crashed during boot. Remove the session lock which was created on bootup of the session + sessionLock.unlink(); + }else if (!statComm.sessIdExists(thisSessionId)){ + // There is no running process managing this session, so check if the data page still exists + IPC::sharedPage dataPage; + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, thisSessionId.c_str()); + dataPage.init(userPageName, 1, false, false); + if(dataPage){ + // Session likely crashed while it was running + dataPage.init(userPageName, 1, true); + FAIL_MSG("Session '%s' got canceled unexpectedly. Hoovering up the left overs...", thisSessionId.c_str()); + } + // Finally remove the session lock which was created on bootup of the session + sessionLock.unlink(); + } } + void Controller::statLeadOut(){} /// Returns true if this stream has at least one connected client. bool Controller::hasViewers(std::string streamName){ if (sessions.size()){ long long currTime = Util::bootSecs(); - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - if (it->first.streamName == streamName && + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + if (it->second.getStreamName() == streamName && (it->second.hasDataFor(currTime) || it->second.hasDataFor(currTime - 1))){ return true; } @@ -1377,7 +1049,6 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if (fields & STAT_CLI_UP){rep["fields"].append("up");} if (fields & STAT_CLI_BPS_DOWN){rep["fields"].append("downbps");} if (fields & STAT_CLI_BPS_UP){rep["fields"].append("upbps");} - if (fields & STAT_CLI_CRC){rep["fields"].append("crc");} if (fields & STAT_CLI_SESSID){rep["fields"].append("sessid");} if (fields & STAT_CLI_PKTCOUNT){rep["fields"].append("pktcount");} if (fields & STAT_CLI_PKTLOST){rep["fields"].append("pktlost");} @@ -1386,26 +1057,25 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ rep["data"].null(); // loop over all sessions if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ unsigned long long time = reqTime; if (now && reqTime - it->second.getEnd() < 5){time = it->second.getEnd();} // data present and wanted? insert it! if ((it->second.getEnd() >= time && it->second.getStart() <= time) && - (!streams.size() || streams.count(it->first.streamName)) && - (!protos.size() || protos.count(it->first.connector))){ + (!streams.size() || streams.count(it->second.getStreamName())) && + (!protos.size() || protos.count(it->second.getCurrentProtocols()))){ if (it->second.hasDataFor(time)){ JSON::Value d; - if (fields & STAT_CLI_HOST){d.append(it->first.host);} - if (fields & STAT_CLI_STREAM){d.append(it->first.streamName);} - if (fields & STAT_CLI_PROTO){d.append(it->first.connector);} + if (fields & STAT_CLI_HOST){d.append(it->second.getHost());} + if (fields & STAT_CLI_STREAM){d.append(it->second.getStreamName());} + if (fields & STAT_CLI_PROTO){d.append(it->second.getCurrentProtocols());} if (fields & STAT_CLI_CONNTIME){d.append(it->second.getConnTime(time));} if (fields & STAT_CLI_POSITION){d.append(it->second.getLastSecond(time));} if (fields & STAT_CLI_DOWN){d.append(it->second.getDown(time));} if (fields & STAT_CLI_UP){d.append(it->second.getUp(time));} if (fields & STAT_CLI_BPS_DOWN){d.append(it->second.getBpsDown(time));} if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));} - if (fields & STAT_CLI_CRC){d.append(it->first.crc);} - if (fields & STAT_CLI_SESSID){d.append(it->first.ID);} + if (fields & STAT_CLI_SESSID){d.append(it->second.getSessId());} if (fields & STAT_CLI_PKTCOUNT){d.append(it->second.getPktCount(time));} if (fields & STAT_CLI_PKTLOST){d.append(it->second.getPktLost(time));} if (fields & STAT_CLI_PKTRETRANSMIT){d.append(it->second.getPktRetransmit(time));} @@ -1463,12 +1133,12 @@ void Controller::fillHasStats(JSON::Value &req, JSON::Value &rep){ { tthread::lock_guard guard(statsMutex); if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.getSessType() == SESS_INPUT){ - streams.insert(it->first.streamName); + streams.insert(it->second.getStreamName()); }else{ - streams.insert(it->first.streamName); - if (it->second.getSessType() == SESS_VIEWER){clients[it->first.streamName]++;} + streams.insert(it->second.getStreamName()); + if (it->second.getSessType() == SESS_VIEWER){clients[it->second.getStreamName()]++;} } } } @@ -1742,12 +1412,12 @@ void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){ // loop over all sessions /// \todo Make the interval configurable instead of 1 second if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ + for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ // data present and wanted? insert it! if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && - (!streams.size() || streams.count(it->first.streamName)) && - (!protos.size() || protos.count(it->first.connector))){ + (!streams.size() || streams.count(it->second.getStreamName())) && + (!protos.size() || protos.count(it->second.getCurrentProtocols()))){ for (unsigned long long i = reqStart; i <= reqEnd; ++i){ if (it->second.hasDataFor(i)){ totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType(), it->second.getPktCount(), it->second.getPktLost(), it->second.getPktRetransmit()); @@ -1831,6 +1501,25 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int H.SetHeader("Server", APPIDENT); H.StartResponse("200", "OK", H, conn, true); + // Counters of current active viewers, inputs and outputs of the Session stats cache + std::map outputs; + uint32_t totViewers = 0; + uint32_t totInputs = 0; + uint32_t totOutputs = 0; + for (uint64_t idx = 0; idx < statComm.recordCount(); idx++){ + if (statComm.getStatus(idx) == COMM_STATUS_INVALID || statComm.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} + const std::string thisSessId = statComm.getSessId(idx); + // Count active viewers, inputs, outputs and protocols + if (thisSessId[0] == 'I'){ + totInputs++; + }else if (thisSessId[0] == 'O'){ + totOutputs++; + outputs[statComm.getConnector(idx)]++; + }else{ + totViewers++; + } + } + // Collect core server stats uint64_t mem_total = 0, mem_free = 0, mem_bufcache = 0; uint64_t bw_up_total = 0, bw_down_total = 0; @@ -1904,109 +1593,69 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "# TYPE mist_shm_used gauge\n"; response << "mist_shm_used " << (shm_total - shm_free) << "\n\n"; - if (Controller::triggerStats.size()){ - response << "# HELP mist_trigger_count Total executions for the given trigger\n"; - response << "# HELP mist_trigger_time Total execution time in millis for the given trigger\n"; - response << "# HELP mist_trigger_fails Total failed executions for the given trigger\n"; - for (std::map::iterator it = Controller::triggerStats.begin(); - it != Controller::triggerStats.end(); it++){ - response << "mist_trigger_count{trigger=\"" << it->first << "\"}" << it->second.totalCount << "\n"; - response << "mist_trigger_time{trigger=\"" << it->first << "\"}" << it->second.ms << "\n"; - response << "mist_trigger_fails{trigger=\"" << it->first << "\"}" << it->second.failCount << "\n"; - } - response << "\n"; - } + response << "# HELP mist_viewseconds_total Number of seconds any media was received by a viewer.\n"; + response << "# TYPE mist_viewseconds_total counter\n"; + response << "mist_viewseconds_total " << servSeconds + viewSecondsTotal << "\n"; - {// Scope for shortest possible blocking of statsMutex - tthread::lock_guard guard(statsMutex); - // collect the data first - std::map outputs; - unsigned long totViewers = 0, totInputs = 0, totOutputs = 0; - unsigned int tOut = Util::bootSecs() - STATS_DELAY; - unsigned int tIn = Util::bootSecs() - STATS_INPUT_DELAY; - // check all sessions - if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - switch (it->second.getSessType()){ - case SESS_UNSET: break; - case SESS_VIEWER: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - outputs[it->first.connector]++; - totViewers++; - } - break; - case SESS_INPUT: - if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){totInputs++;} - break; - case SESS_OUTPUT: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){totOutputs++;} - break; - } - } - } + response << "\n# HELP mist_sessions_count Counts of unique sessions by type since server " + "start.\n"; + response << "# TYPE mist_sessions_count counter\n"; + response << "mist_sessions_count{sessType=\"viewers\"}" << servViewers << "\n"; + response << "mist_sessions_count{sessType=\"incoming\"}" << servInputs << "\n"; + response << "mist_sessions_count{sessType=\"outgoing\"}" << servOutputs << "\n\n"; - response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by " - "type.\n"; - response << "# TYPE mist_sessions_total gauge\n"; - response << "mist_sessions_total{sessType=\"viewers\"}" << totViewers << "\n"; - response << "mist_sessions_total{sessType=\"incoming\"}" << totInputs << "\n"; - response << "mist_sessions_total{sessType=\"outgoing\"}" << totOutputs << "\n"; - response << "mist_sessions_total{sessType=\"cached\"}" << sessions.size() << "\n\n"; + response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n"; + response << "# TYPE mist_bw_total counter\n"; + response << "stat_bw_total{direction=\"up\"}" << bw_up_total << "\n"; + response << "stat_bw_total{direction=\"down\"}" << bw_down_total << "\n\n"; + response << "mist_bw_total{direction=\"up\"}" << servUpBytes << "\n"; + response << "mist_bw_total{direction=\"down\"}" << servDownBytes << "\n\n"; + response << "mist_bw_other{direction=\"up\"}" << servUpOtherBytes << "\n"; + response << "mist_bw_other{direction=\"down\"}" << servDownOtherBytes << "\n\n"; + response << "mist_bw_limit " << bwLimit << "\n\n"; - response << "# HELP mist_viewseconds_total Number of seconds any media was received by a viewer.\n"; - response << "# TYPE mist_viewseconds_total counter\n"; - response << "mist_viewseconds_total " << servSeconds << "\n"; + response << "# HELP mist_packets_total Total number of packets sent/received/lost over lossy protocols, server-wide.\n"; + response << "# TYPE mist_packets_total counter\n"; + response << "mist_packets_total{pkttype=\"sent\"}" << servPackSent << "\n"; + response << "mist_packets_total{pkttype=\"lost\"}" << servPackLoss << "\n"; + response << "mist_packets_total{pkttype=\"retrans\"}" << servPackRetrans << "\n"; + if (outputs.size()){ response << "# HELP mist_outputs Number of viewers active right now, server-wide, by output type.\n"; response << "# TYPE mist_outputs gauge\n"; for (std::map::iterator it = outputs.begin(); it != outputs.end(); ++it){ response << "mist_outputs{output=\"" << it->first << "\"}" << it->second << "\n"; } response << "\n"; + } - response << "# HELP mist_sessions_count Counts of unique sessions by type since server " - "start.\n"; - response << "# TYPE mist_sessions_count counter\n"; - response << "mist_sessions_count{sessType=\"viewers\"}" << servViewers << "\n"; - response << "mist_sessions_count{sessType=\"incoming\"}" << servInputs << "\n"; - response << "mist_sessions_count{sessType=\"outgoing\"}" << servOutputs << "\n\n"; + {// Scope for shortest possible blocking of statsMutex + tthread::lock_guard guard(statsMutex); - response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n"; - response << "# TYPE mist_bw_total counter\n"; - response << "stat_bw_total{direction=\"up\"}" << bw_up_total << "\n"; - response << "stat_bw_total{direction=\"down\"}" << bw_down_total << "\n\n"; - response << "mist_bw_total{direction=\"up\"}" << servUpBytes << "\n"; - response << "mist_bw_total{direction=\"down\"}" << servDownBytes << "\n\n"; - response << "mist_bw_other{direction=\"up\"}" << servUpOtherBytes << "\n"; - response << "mist_bw_other{direction=\"down\"}" << servDownOtherBytes << "\n\n"; - response << "mist_bw_limit " << bwLimit << "\n\n"; + response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n"; + response << "# TYPE mist_sessions_total gauge\n"; + response << "mist_sessions_total{sessType=\"viewers\"}" << totViewers << "\n"; + response << "mist_sessions_total{sessType=\"incoming\"}" << totInputs << "\n"; + response << "mist_sessions_total{sessType=\"outgoing\"}" << totOutputs << "\n"; + response << "mist_sessions_total{sessType=\"cached\"}" << sessions.size() << "\n"; - response << "# HELP mist_packets_total Total number of packets sent/received/lost over lossy protocols, server-wide.\n"; - response << "# TYPE mist_packets_total counter\n"; - response << "mist_packets_total{pkttype=\"sent\"}" << servPackSent << "\n"; - response << "mist_packets_total{pkttype=\"lost\"}" << servPackLoss << "\n"; - response << "mist_packets_total{pkttype=\"retrans\"}" << servPackRetrans << "\n"; - - response << "\n# HELP mist_viewers Number of sessions by type and stream active right now.\n"; - response << "# TYPE mist_viewers gauge\n"; - response << "# HELP mist_viewcount Count of unique viewer sessions since stream start, per " + response << "\n# HELP mist_viewcount Count of unique viewer sessions since stream start, per " "stream.\n"; response << "# TYPE mist_viewcount counter\n"; - response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n"; - response << "# TYPE mist_bw counter\n"; response << "# HELP mist_viewseconds Number of seconds any media was received by a viewer.\n"; response << "# TYPE mist_viewseconds counter\n"; + response << "# HELP mist_bw Count of bytes handled since stream start, by direction.\n"; + response << "# TYPE mist_bw counter\n"; response << "# HELP mist_packets Total number of packets sent/received/lost over lossy protocols.\n"; response << "# TYPE mist_packets counter\n"; - response << "mist_viewseconds_total " << servSeconds << "\n"; for (std::map::iterator it = streamStats.begin(); - it != streamStats.end(); ++it){ + it != streamStats.end(); ++it){ response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"viewers\"}" - << it->second.currViews << "\n"; + << it->second.currViews << "\n"; response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"incoming\"}" - << it->second.currIns << "\n"; + << it->second.currIns << "\n"; response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"}" - << it->second.currOuts << "\n"; + << it->second.currOuts << "\n"; response << "mist_viewcount{stream=\"" << it->first << "\"}" << it->second.viewers << "\n"; response << "mist_viewseconds{stream=\"" << it->first << "\"} " << it->second.viewSeconds << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"}" << it->second.upBytes << "\n"; @@ -2015,6 +1664,19 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "mist_packets{stream=\"" << it->first << "\",pkttype=\"lost\"}" << it->second.packLoss << "\n"; response << "mist_packets{stream=\"" << it->first << "\",pkttype=\"retrans\"}" << it->second.packRetrans << "\n"; } + + if (Controller::triggerStats.size()){ + response << "\n# HELP mist_trigger_count Total executions for the given trigger\n"; + response << "# HELP mist_trigger_time Total execution time in millis for the given trigger\n"; + response << "# HELP mist_trigger_fails Total failed executions for the given trigger\n"; + for (std::map::iterator it = Controller::triggerStats.begin(); + it != Controller::triggerStats.end(); it++){ + response << "mist_trigger_count{trigger=\"" << it->first << "\"}" << it->second.totalCount << "\n"; + response << "mist_trigger_time{trigger=\"" << it->first << "\"}" << it->second.ms << "\n"; + response << "mist_trigger_fails{trigger=\"" << it->first << "\"}" << it->second.failCount << "\n"; + } + response << "\n"; + } } H.Chunkify(response.str(), conn); } @@ -2026,58 +1688,33 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int resp["shm_total"] = shm_total; resp["shm_used"] = (shm_total - shm_free); resp["logs"] = Controller::logCounter; - if (Controller::triggerStats.size()){ - for (std::map::iterator it = Controller::triggerStats.begin(); - it != Controller::triggerStats.end(); it++){ - JSON::Value &tVal = resp["triggers"][it->first]; - tVal["count"] = it->second.totalCount; - tVal["ms"] = it->second.ms; - tVal["fails"] = it->second.failCount; - } - } + resp["curr"].append(totViewers); + resp["curr"].append(totInputs); + resp["curr"].append(totOutputs); + resp["tot"].append(servViewers); + resp["tot"].append(servInputs); + resp["tot"].append(servOutputs); + resp["st"].append(bw_up_total); + resp["st"].append(bw_down_total); + resp["bw"].append(servUpBytes); + resp["bw"].append(servDownBytes); + resp["pkts"].append(servPackSent); + resp["pkts"].append(servPackLoss); + resp["pkts"].append(servPackRetrans); + resp["bwlimit"] = bwLimit; {// Scope for shortest possible blocking of statsMutex tthread::lock_guard guard(statsMutex); - // collect the data first - std::map outputs; - uint64_t totViewers = 0, totInputs = 0, totOutputs = 0; - uint64_t tOut = Util::bootSecs() - STATS_DELAY; - uint64_t tIn = Util::bootSecs() - STATS_INPUT_DELAY; - // check all sessions - if (sessions.size()){ - for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ - switch (it->second.getSessType()){ - case SESS_UNSET: break; - case SESS_VIEWER: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - outputs[it->first.connector]++; - totViewers++; - } - break; - case SESS_INPUT: - if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){totInputs++;} - break; - case SESS_OUTPUT: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){totOutputs++;} - break; - } + resp["curr"].append((uint64_t)sessions.size()); + + if (Controller::triggerStats.size()){ + for (std::map::iterator it = Controller::triggerStats.begin(); + it != Controller::triggerStats.end(); it++){ + JSON::Value &tVal = resp["triggers"][it->first]; + tVal["count"] = it->second.totalCount; + tVal["ms"] = it->second.ms; + tVal["fails"] = it->second.failCount; } } - - resp["curr"].append(totViewers); - resp["curr"].append(totInputs); - resp["curr"].append(totOutputs); - resp["curr"].append((uint64_t)sessions.size()); - resp["tot"].append(servViewers); - resp["tot"].append(servInputs); - resp["tot"].append(servOutputs); - resp["st"].append(bw_up_total); - resp["st"].append(bw_down_total); - resp["bw"].append(servUpBytes); - resp["bw"].append(servDownBytes); - resp["pkts"].append(servPackSent); - resp["pkts"].append(servPackLoss); - resp["pkts"].append(servPackRetrans); - resp["bwlimit"] = bwLimit; if (Storage["config"].isMember("location") && Storage["config"]["location"].isMember("lat") && Storage["config"]["location"].isMember("lon")){ resp["loc"]["lat"] = Storage["config"]["location"]["lat"].asDouble(); resp["loc"]["lon"] = Storage["config"]["location"]["lon"].asDouble(); diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 1cc4a82d..f798f811 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -18,8 +18,7 @@ namespace Controller{ extern bool killOnExit; - extern unsigned int maxConnsPerIP; - + /// This function is ran whenever a stream becomes active. void streamStarted(std::string stream); /// This function is ran whenever a stream becomes inactive. @@ -35,34 +34,14 @@ namespace Controller{ uint64_t pktCount; uint64_t pktLost; uint64_t pktRetransmit; + std::string connectors; }; enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER}; - /// This is a comparison and storage class that keeps sessions apart from each other. - /// Whenever two of these objects are not equal, it will create a new session. - class sessIndex{ - public: - sessIndex(); - sessIndex(const Comms::Statistics &statComm, size_t id); - std::string ID; - std::string host; - unsigned int crc; - std::string streamName; - std::string connector; - - bool operator==(const sessIndex &o) const; - bool operator!=(const sessIndex &o) const; - bool operator>(const sessIndex &o) const; - bool operator<=(const sessIndex &o) const; - bool operator<(const sessIndex &o) const; - bool operator>=(const sessIndex &o) const; - std::string toStr(); - }; - class statStorage{ public: - void update(Comms::Statistics &statComm, size_t index); + void update(Comms::Sessions &statComm, size_t index); bool hasDataFor(unsigned long long); statLog &getDataFor(unsigned long long); std::map log; @@ -75,36 +54,33 @@ namespace Controller{ uint64_t firstActive; uint64_t firstSec; uint64_t lastSec; - uint64_t wipedUp; - uint64_t wipedDown; - uint64_t wipedPktCount; - uint64_t wipedPktLost; - uint64_t wipedPktRetransmit; - std::deque oldConns; sessType sessionType; bool tracked; uint8_t noBWCount; ///< Set to 2 when not to count for external bandwidth + std::string streamName; + std::string host; + std::string curConnector; + std::string sessId; + public: statSession(); - uint32_t invalidate(); - uint32_t kill(); - char sync; - std::map curConns; + ~statSession(); + statStorage curData; std::set tags; sessType getSessType(); - void wipeOld(uint64_t); - void finish(uint64_t index); - void switchOverTo(statSession &newSess, uint64_t index); - void update(uint64_t index, Comms::Statistics &data); - void dropSession(const sessIndex &index); + void update(uint64_t index, Comms::Sessions &data); uint64_t getStart(); uint64_t getEnd(); bool isViewerOn(uint64_t time); - bool isConnected(); bool isTracked(); bool hasDataFor(uint64_t time); - bool hasData(); + std::string getStreamName(); + std::string getHost(); + std::string getSessId(); + std::string getCurrentProtocols(); + uint64_t newestDataPoint(); uint64_t getConnTime(uint64_t time); + uint64_t getConnTime(); uint64_t getLastSecond(uint64_t time); uint64_t getDown(uint64_t time); uint64_t getUp(); @@ -122,8 +98,6 @@ namespace Controller{ uint64_t getBpsUp(uint64_t start, uint64_t end); }; - extern std::map sessions; - extern std::map connToSession; extern tthread::mutex statsMutex; extern uint64_t statDropoff; @@ -155,6 +129,7 @@ namespace Controller{ void sessions_shutdown(const std::string &streamname, const std::string &protocol = ""); bool hasViewers(std::string streamName); void writeSessionCache(); /*LTS*/ + void killConnections(std::string sessId); #define PROMETHEUS_TEXT 0 #define PROMETHEUS_JSON 1 diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index 58e7b521..bdd52893 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -91,19 +91,6 @@ namespace Controller{ rlxAccs->setString("tags", tags, newEndPos); rlxAccs->setEndPos(newEndPos + 1); } - if (Triggers::shouldTrigger("USER_END", strm)){ - std::stringstream plgen; - plgen << sessId << "\n" - << strm << "\n" - << conn << "\n" - << host << "\n" - << duration << "\n" - << up << "\n" - << down << "\n" - << tags; - std::string payload = plgen.str(); - Triggers::doTrigger("USER_END", payload, strm); - } } void normalizeTrustedProxies(JSON::Value &tp){ @@ -450,7 +437,8 @@ namespace Controller{ systemBoot = globAccX.getInt("systemBoot"); } if(!globAccX.getFieldAccX("defaultStream") - || !globAccX.getFieldAccX("systemBoot")){ + || !globAccX.getFieldAccX("systemBoot") + || !globAccX.getFieldAccX("sessionMode")){ globAccX.setReload(); globCfg.master = true; globCfg.close(); @@ -461,12 +449,16 @@ namespace Controller{ if (!globAccX.isReady()){ globAccX.addField("defaultStream", RAX_128STRING); globAccX.addField("systemBoot", RAX_64UINT); + globAccX.addField("sessionMode", RAX_64UINT); + if (!Storage["config"]["sessionMode"]){ + Storage["config"]["sessionMode"] = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID; + } globAccX.setRCount(1); globAccX.setEndPos(1); globAccX.setReady(); } globAccX.setString("defaultStream", Storage["config"]["defaultStream"].asStringRef()); - globAccX.setInt("systemBoot", systemBoot); + globAccX.setInt("sessionMode", Storage["config"]["sessionMode"].asInt()); globCfg.master = false; // leave the page after closing } } diff --git a/src/input/input.cpp b/src/input/input.cpp index a86fe3cd..0524f55b 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -794,7 +794,7 @@ namespace Mist{ void Input::streamMainLoop(){ uint64_t statTimer = 0; uint64_t startTime = Util::bootSecs(); - Comms::Statistics statComm; + Comms::Connections statComm; getNext(); if (thisPacket && !userSelect.count(thisIdx)){ userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK); @@ -820,7 +820,7 @@ namespace Mist{ if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection - if (!statComm){statComm.reload();} + if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);} if (statComm){ if (!statComm){ config->is_active = false; @@ -829,7 +829,6 @@ namespace Mist{ } uint64_t now = Util::bootSecs(); statComm.setNow(now); - statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setTime(now - startTime); @@ -842,7 +841,7 @@ namespace Mist{ } } - void Input::connStats(Comms::Statistics &statComm){ + void Input::connStats(Comms::Connections &statComm){ statComm.setUp(0); statComm.setDown(streamByteCount()); statComm.setHost(getConnectedBinHost()); @@ -853,7 +852,7 @@ namespace Mist{ uint64_t statTimer = 0; uint64_t startTime = Util::bootSecs(); size_t idx; - Comms::Statistics statComm; + Comms::Connections statComm; DTSC::Meta liveMeta(config->getString("streamname"), false); @@ -985,7 +984,7 @@ namespace Mist{ if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection - if (!statComm){statComm.reload();} + if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);} if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -994,7 +993,6 @@ namespace Mist{ } uint64_t now = Util::bootSecs(); statComm.setNow(now); - statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setTime(now - startTime); diff --git a/src/input/input.h b/src/input/input.h index 8d7e8891..ce7686c1 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -69,7 +69,7 @@ namespace Mist{ virtual void userOnActive(size_t id); virtual void userOnDisconnect(size_t id); virtual void userLeadOut(); - virtual void connStats(Comms::Statistics & statComm); + virtual void connStats(Comms::Connections & statComm); virtual void parseHeader(); bool bufferFrame(size_t track, uint32_t keyNum); diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index fef613bb..ebb812e1 100644 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -196,7 +196,7 @@ namespace Mist{ } void InputRTSP::streamMainLoop(){ - Comms::Statistics statComm; + Comms::Connections statComm; uint64_t startTime = Util::epoch(); uint64_t lastPing = Util::bootSecs(); uint64_t lastSecs = 0; @@ -210,7 +210,7 @@ namespace Mist{ if (lastSecs != currSecs){ lastSecs = currSecs; // Connect to stats for INPUT detection - statComm.reload(); + statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -219,7 +219,6 @@ namespace Mist{ } uint64_t now = Util::bootSecs(); statComm.setNow(now); - statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setUp(tcpCon.dataUp()); diff --git a/src/input/input_sdp.cpp b/src/input/input_sdp.cpp index 62c02aef..3169a836 100644 --- a/src/input/input_sdp.cpp +++ b/src/input/input_sdp.cpp @@ -193,7 +193,7 @@ namespace Mist{ // Updates stats and quits if parsePacket returns false void InputSDP::streamMainLoop(){ - Comms::Statistics statComm; + Comms::Connections statComm; uint64_t startTime = Util::epoch(); uint64_t lastSecs = 0; // Get RTP packets from UDP socket and stop if this fails @@ -202,7 +202,7 @@ namespace Mist{ if (lastSecs != currSecs){ lastSecs = currSecs; // Connect to stats for INPUT detection - statComm.reload(); + statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); if (statComm){ if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -211,7 +211,6 @@ namespace Mist{ } uint64_t now = Util::bootSecs(); statComm.setNow(now); - statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setDown(bytesRead); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index cfe3f7cd..23311dc0 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -527,7 +527,7 @@ namespace Mist{ void inputTS::streamMainLoop(){ meta.removeTrack(tmpIdx); INFO_MSG("Removed temptrack %zu", tmpIdx); - Comms::Statistics statComm; + Comms::Connections statComm; uint64_t downCounter = 0; uint64_t startTime = Util::bootSecs(); uint64_t noDataSince = Util::bootSecs(); @@ -621,7 +621,7 @@ namespace Mist{ // Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 1){ // Connect to stats for INPUT detection - statComm.reload(); + statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -630,7 +630,6 @@ namespace Mist{ } uint64_t now = Util::bootSecs(); statComm.setNow(now); - statComm.setCRC(getpid()); statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setUp(0); diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index 4219ebbc..8fef6d7d 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -282,7 +282,7 @@ namespace Mist{ void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;} - void inputTSSRT::connStats(Comms::Statistics &statComm){ + void inputTSSRT::connStats(Comms::Connections &statComm){ statComm.setUp(srtConn.dataUp()); statComm.setDown(srtConn.dataDown()); statComm.setHost(getConnectedBinHost()); diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h index 4f337b48..143174cb 100644 --- a/src/input/input_tssrt.h +++ b/src/input/input_tssrt.h @@ -40,7 +40,7 @@ namespace Mist{ Socket::SRTConnection srtConn; bool singularFlag; - virtual void connStats(Comms::Statistics &statComm); + virtual void connStats(Comms::Connections &statComm); Util::ResizeablePointer rawBuffer; size_t rawIdx; diff --git a/src/output/output.cpp b/src/output/output.cpp index 1fa86eee..50d67549 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -7,7 +7,7 @@ #include #include -#include "output.h" +#include "output.h" #include #include #include @@ -92,7 +92,7 @@ namespace Mist{ firstTime = 0; firstPacketTime = 0xFFFFFFFFFFFFFFFFull; lastPacketTime = 0; - crc = getpid(); + sid = ""; parseData = false; wantRequest = true; sought = false; @@ -100,7 +100,7 @@ namespace Mist{ isBlocking = false; needsLookAhead = 0; extraKeepAway = 0; - lastStats = 0; + lastStats = 0xFFFFFFFFFFFFFFFFull; maxSkipAhead = 7500; uaDelay = 10; realTime = 1000; @@ -111,6 +111,7 @@ namespace Mist{ lastPushUpdate = 0; previousFile = ""; currentFile = ""; + sessionMode = 0xFFFFFFFFFFFFFFFFull; lastRecv = Util::bootSecs(); if (myConn){ @@ -211,95 +212,9 @@ namespace Mist{ onFail("Not allowed to play (CONN_PLAY)"); } } - doSync(true); /*LTS-END*/ } - /// If called with force set to true and a USER_NEW trigger enabled, forces a sync immediately. - /// Otherwise, does nothing unless the sync byte is set to 2, in which case it forces a sync as well. - /// May be called recursively because it calls stats() which calls this function. - /// If this happens, the extra calls to the function return instantly. - void Output::doSync(bool force){ - if (!statComm){return;} - if (recursingSync){return;} - recursingSync = true; - if (statComm.getSync() == 2 || force){ - if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){ - // sync byte 0 = no sync yet, wait for sync from controller... - char initialSync = 0; - // attempt to load sync status from session cache in shm - { - IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 16); - if (cacheLock){cacheLock.wait();} - IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false); - if (shmSessions.mapped){ - char shmEmpty[SHM_SESSIONS_ITEM]; - memset(shmEmpty, 0, SHM_SESSIONS_ITEM); - std::string host; - Socket::hostBytesToStr(statComm.getHost().data(), 16, host); - uint32_t shmOffset = 0; - const std::string &cName = capa["name"].asStringRef(); - while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){ - // compare crc - if (*(uint32_t*)(shmSessions.mapped + shmOffset) == crc){ - // compare stream name - if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){ - // compare connector - if (strncmp(shmSessions.mapped + shmOffset + 104, cName.c_str(), 20) == 0){ - // compare host - if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){ - initialSync = shmSessions.mapped[shmOffset + 164]; - HIGH_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync); - break; - } - } - } - } - // stop if we reached the end - if (memcmp(shmSessions.mapped + shmOffset, shmEmpty, SHM_SESSIONS_ITEM) == 0){ - break; - } - shmOffset += SHM_SESSIONS_ITEM; - } - } - if (cacheLock){cacheLock.post();} - } - unsigned int i = 0; - statComm.setSync(initialSync); - // wait max 10 seconds for sync - while ((!statComm.getSync() || statComm.getSync() == 2) && i++ < 100){ - Util::wait(100); - stats(true); - } - HIGH_MSG("USER_NEW sync achieved: %u", statComm.getSync()); - // 1 = check requested (connection is new) - if (statComm.getSync() == 1){ - std::string payload = streamName + "\n" + getConnectedHost() + "\n" + - JSON::Value(crc).asString() + "\n" + capa["name"].asStringRef() + - "\n" + reqUrl + "\n" + statComm.getSessId(); - if (!Triggers::doTrigger("USER_NEW", payload, streamName)){ - onFail("Not allowed to play (USER_NEW)"); - statComm.setSync(100); // 100 = denied - }else{ - statComm.setSync(10); // 10 = accepted - } - } - // 100 = denied - if (statComm.getSync() == 100){onFail("Not allowed to play (USER_NEW cache)");} - if (statComm.getSync() == 0){ - onFail("Not allowed to play (USER_NEW init timeout)", true); - } - if (statComm.getSync() == 2){ - onFail("Not allowed to play (USER_NEW re-init timeout)", true); - } - // anything else = accepted - }else{ - statComm.setSync(10); // auto-accept if no trigger - } - } - recursingSync = false; - } - std::string Output::getConnectedHost(){return myConn.getHost();} std::string Output::getConnectedBinHost(){ @@ -433,10 +348,10 @@ namespace Mist{ //Connect to stats reporting, if not connected already if (!statComm){ - statComm.reload(); + statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode); stats(true); } - + //push inputs do not need to wait for stream to be ready for playback if (isPushing()){return;} @@ -986,7 +901,7 @@ namespace Mist{ INFO_MSG("Will split recording every %lld seconds", atoll(targetParams["split"].c_str())); targetParams["nxt-split"] = JSON::Value((int64_t)(seekPos + endRec)).asString(); } - // Duration to record in seconds. Overrides recstop. + // Duration to record in seconds. Oversides recstop. if (targetParams.count("duration")){ long long endRec = atoll(targetParams["duration"].c_str()) * 1000; targetParams["recstop"] = JSON::Value((int64_t)(seekPos + endRec)).asString(); @@ -1301,6 +1216,7 @@ namespace Mist{ /// request URL (if any) /// ~~~~~~~~~~~~~~~ int Output::run(){ + sessionMode = Util::getGlobalConfig("sessionMode").asInt(); /*LTS-START*/ // Connect to file target, if needed if (isFileTarget()){ @@ -1507,6 +1423,8 @@ namespace Mist{ /*LTS-END*/ disconnect(); + stats(true); + userSelect.clear(); myConn.close(); return 0; } @@ -1822,7 +1740,7 @@ namespace Mist{ // also cancel if it has been less than a second since the last update // unless force is set to true uint64_t now = Util::bootSecs(); - if (now == lastStats && !force){return;} + if (now <= lastStats && !force){return;} if (isRecording()){ if(lastPushUpdate == 0){ @@ -1861,13 +1779,17 @@ namespace Mist{ } } - if (!statComm){statComm.reload();} - if (!statComm){return;} + if (!statComm){statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode);} + if (!statComm){return;} + if (statComm.getExit()){ + onFail("Shutting down since this session is not allowed to view this stream"); + return; + } lastStats = now; - VERYHIGH_MSG("Writing stats: %s, %s, %u, %" PRIu64 ", %" PRIu64, getConnectedHost().c_str(), streamName.c_str(), - crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown()); + VERYHIGH_MSG("Writing stats: %s, %s, %s, %" PRIu64 ", %" PRIu64, getConnectedHost().c_str(), streamName.c_str(), + sid.c_str(), myConn.dataUp(), myConn.dataDown()); /*LTS-START*/ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ onFail("Shutting down on controller request"); @@ -1875,9 +1797,6 @@ namespace Mist{ } /*LTS-END*/ statComm.setNow(now); - statComm.setHost(getConnectedBinHost()); - statComm.setCRC(crc); - statComm.setStream(streamName); statComm.setConnector(getStatsName()); connStats(now, statComm); statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0); @@ -1887,7 +1806,7 @@ namespace Mist{ // Tag the session with the user agent if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){ std::string APIcall = - "{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}"; + "{\"tag_sessid\":{\"" + statComm.sessionId + "\":" + JSON::string_escape("UA:" + UA) + "}}"; Socket::UDPConnection uSock; uSock.SetDestination(UDP_API_HOST, UDP_API_PORT); uSock.SendNow(APIcall); @@ -1895,8 +1814,6 @@ namespace Mist{ } /*LTS-END*/ - doSync(); - if (isPushing()){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (it->second.getStatus() & COMM_STATUS_REQDISCONNECT){ @@ -1912,7 +1829,7 @@ namespace Mist{ } } - void Output::connStats(uint64_t now, Comms::Statistics &statComm){ + void Output::connStats(uint64_t now, Comms::Connections &statComm){ statComm.setUp(myConn.dataUp()); statComm.setDown(myConn.dataDown()); statComm.setTime(now - myConn.connTime()); diff --git a/src/output/output.h b/src/output/output.h index 84d17482..173b3840 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -86,7 +86,6 @@ namespace Mist{ std::string hostLookup(std::string ip); bool onList(std::string ip, std::string list); std::string getCountry(std::string ip); - void doSync(bool force = false); /*LTS-END*/ std::map currentPage; void loadPageForKey(size_t trackId, size_t keyNum); @@ -105,6 +104,7 @@ namespace Mist{ bool firstData; uint64_t lastPushUpdate; bool newUA; + protected: // these are to be messed with by child classes virtual bool inlineRestartCapable() const{ return false; @@ -122,15 +122,16 @@ namespace Mist{ virtual std::string getStatsName(); virtual bool hasSessionIDs(){return false;} - virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Connections &statComm); std::set getSupportedTracks(const std::string &type = "") const; inline virtual bool keepGoing(){return config->is_active && myConn;} - Comms::Statistics statComm; + Comms::Connections statComm; bool isBlocking; ///< If true, indicates that myConn is blocking. - uint32_t crc; ///< Checksum, if any, for usage in the stats. + std::string sid; ///< Random identifier used to split connections into sessions + uint64_t sessionMode; uint64_t nextKeyTime(); // stream delaying variables diff --git a/src/output/output_cmaf.cpp b/src/output/output_cmaf.cpp index 4582a8b6..c2fbc5bc 100644 --- a/src/output/output_cmaf.cpp +++ b/src/output/output_cmaf.cpp @@ -101,7 +101,7 @@ namespace Mist{ } } - void OutCMAF::connStats(uint64_t now, Comms::Statistics &statComm){ + void OutCMAF::connStats(uint64_t now, Comms::Connections &statComm){ // For non-push usage, call usual function. if (!isRecording()){ Output::connStats(now, statComm); diff --git a/src/output/output_cmaf.h b/src/output/output_cmaf.h index efc36511..390549cc 100644 --- a/src/output/output_cmaf.h +++ b/src/output/output_cmaf.h @@ -39,7 +39,7 @@ namespace Mist{ bool isReadyForPlay(); protected: - virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Connections &statComm); void onTrackEnd(size_t idx); bool hasSessionIDs(){return !config->getBool("mergesessions");} @@ -72,6 +72,7 @@ namespace Mist{ void startPushOut(); void pushNext(); + uint32_t crc; HTTP::URL pushUrl; std::map pushTracks; void setupTrackObject(size_t idx); diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index 252f069a..f55b974d 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -55,7 +55,6 @@ namespace Mist{ } void HTTPOutput::onFail(const std::string &msg, bool critical){ - INFO_MSG("Failing '%s': %s", H.url.c_str(), msg.c_str()); if (!webSock && !isRecording() && !responded){ H.Clean(); // make sure no parts of old requests are left in any buffers H.SetHeader("Server", APPIDENT); @@ -238,18 +237,6 @@ namespace Mist{ } /*LTS-END*/ if (H.hasHeader("User-Agent")){UA = H.GetHeader("User-Agent");} - if (hasSessionIDs()){ - if (H.GetVar("sessId").size()){ - std::string ua = H.GetVar("sessId"); - crc = checksum::crc32(0, ua.data(), ua.size()); - }else{ - std::string ua = JSON::Value(getpid()).asString(); - crc = checksum::crc32(0, ua.data(), ua.size()); - } - }else{ - std::string mixed_ua = UA + H.GetHeader("X-Playback-Session-Id"); - crc = checksum::crc32(0, mixed_ua.data(), mixed_ua.size()); - } if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");} if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");} @@ -281,6 +268,21 @@ namespace Mist{ realTime = 0; } } + // Get session ID cookie or generate a random one if it wasn't set + if (!sid.size()){ + std::map storage; + const std::string koekjes = H.GetHeader("Cookie"); + HTTP::parseVars(koekjes, storage); + if (storage.count("sid")){ + // Get sid cookie, which is used to divide connections into sessions + sid = storage.at("sid"); + }else{ + // Else generate one + const std::string newSid = UA + JSON::Value(getpid()).asString(); + sid = JSON::Value(checksum::crc32(0, newSid.data(), newSid.size())).asString(); + H.SetHeader("sid", sid.c_str()); + } + } // Handle upgrade to websocket if the output supports it std::string upgradeHeader = H.GetHeader("Upgrade"); Util::stringToLower(upgradeHeader); diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index db07dc91..dc04b247 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -344,7 +344,7 @@ namespace Mist{ } } - void OutTSSRT::connStats(uint64_t now, Comms::Statistics &statComm){ + void OutTSSRT::connStats(uint64_t now, Comms::Connections &statComm){ if (!srtConn){return;} statComm.setUp(srtConn.dataUp()); statComm.setDown(srtConn.dataDown()); diff --git a/src/output/output_tssrt.h b/src/output/output_tssrt.h index 1423af8d..71c9b72f 100644 --- a/src/output/output_tssrt.h +++ b/src/output/output_tssrt.h @@ -15,7 +15,7 @@ namespace Mist{ bool isReadyForPlay(){return true;} virtual void requestHandler(); protected: - virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Connections &statComm); virtual std::string getConnectedHost(){return srtConn.remotehost;} virtual std::string getConnectedBinHost(){return srtConn.getBinHost();} diff --git a/src/output/output_webrtc.cpp b/src/output/output_webrtc.cpp index b4289b64..058cd737 100644 --- a/src/output/output_webrtc.cpp +++ b/src/output/output_webrtc.cpp @@ -1015,7 +1015,7 @@ namespace Mist{ } } - void OutWebRTC::connStats(uint64_t now, Comms::Statistics &statComm){ + void OutWebRTC::connStats(uint64_t now, Comms::Connections &statComm){ statComm.setUp(myConn.dataUp()); statComm.setDown(myConn.dataDown()); statComm.setPacketCount(totalPkts); diff --git a/src/output/output_webrtc.h b/src/output/output_webrtc.h index 9c3db580..b2b528c5 100644 --- a/src/output/output_webrtc.h +++ b/src/output/output_webrtc.h @@ -144,7 +144,7 @@ namespace Mist{ void onDTSCConverterHasInitData(const size_t trackID, const std::string &initData); void onRTPPacketizerHasRTPPacket(const char *data, size_t nbytes); void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes); - virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Connections &statComm); private: uint64_t lastRecv; diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp index 7b775848..34ac53fd 100644 --- a/src/process/process_exec.cpp +++ b/src/process/process_exec.cpp @@ -72,7 +72,7 @@ namespace Mist{ } bool needsLock(){return false;} bool isSingular(){return false;} - void connStats(Comms::Statistics &statComm){ + void connStats(Comms::Connections &statComm){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (it->second){it->second.setStatus(COMM_STATUS_DONOTTRACK | it->second.getStatus());} } @@ -117,7 +117,7 @@ namespace Mist{ realTime = 0; OutEBML::sendHeader(); }; - void connStats(uint64_t now, Comms::Statistics &statComm){ + void connStats(uint64_t now, Comms::Connections &statComm){ for (std::map::iterator it = userSelect.begin(); it != userSelect.end(); it++){ if (it->second){it->second.setStatus(COMM_STATUS_DONOTTRACK | it->second.getStatus());} } diff --git a/src/session.cpp b/src/session.cpp new file mode 100644 index 00000000..3865e0ec --- /dev/null +++ b/src/session.cpp @@ -0,0 +1,367 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +// Stats of connections which have closed are added to these global counters +uint64_t globalNow = 0; +uint64_t globalTime = 0; +uint64_t globalDown = 0; +uint64_t globalUp = 0; +uint64_t globalPktcount = 0; +uint64_t globalPktloss = 0; +uint64_t globalPktretrans = 0; +// Counts the duration a connector has been active +std::map connectorCount; +std::map connectorLastActive; +// Set to True when a session gets invalidated, so that we know to run a new USER_NEW trigger +bool forceTrigger = false; +void handleSignal(int signum){ + if (signum == SIGUSR1){ + forceTrigger = true; + } +} + +void userOnActive(uint64_t &connections){ + ++connections; +} + +std::string getEnvWithDefault(const std::string variableName, const std::string defaultValue){ + const char* value = getenv(variableName.c_str()); + if (value){ + unsetenv(variableName.c_str()); + return value; + }else{ + return defaultValue; + } +} + +/// \brief Adds stats of closed connections to global counters +void userOnDisconnect(Comms::Connections & connections, size_t idx){ + std::string thisConnector = connections.getConnector(idx); + if (thisConnector != ""){ + connectorCount[thisConnector] += connections.getTime(idx); + } + globalTime += connections.getTime(idx); + globalDown += connections.getDown(idx); + globalUp += connections.getUp(idx); + globalPktcount += connections.getPacketCount(idx); + globalPktloss += connections.getPacketLostCount(idx); + globalPktretrans += connections.getPacketRetransmitCount(idx); +} + +int main(int argc, char **argv){ + Comms::Connections connections; + Comms::Sessions sessions; + uint64_t lastSeen = Util::bootSecs(); + uint64_t currentConnections = 0; + Util::redirectLogsIfNeeded(); + signal(SIGUSR1, handleSignal); + // Init config and parse arguments + Util::Config config = Util::Config("MistSession"); + JSON::Value option; + + option.null(); + option["arg_num"] = 1; + option["arg"] = "string"; + option["help"] = "Session identifier of the entire session"; + option["default"] = ""; + config.addOption("sessionid", option); + + option.null(); + option["long"] = "sessionmode"; + option["short"] = "m"; + option["arg"] = "integer"; + option["default"] = 0; + config.addOption("sessionmode", option); + + option.null(); + option["long"] = "streamname"; + option["short"] = "n"; + option["arg"] = "string"; + option["default"] = ""; + config.addOption("streamname", option); + + option.null(); + option["long"] = "ip"; + option["short"] = "i"; + option["arg"] = "string"; + option["default"] = ""; + config.addOption("ip", option); + + option.null(); + option["long"] = "sid"; + option["short"] = "s"; + option["arg"] = "string"; + option["default"] = ""; + config.addOption("sid", option); + + option.null(); + option["long"] = "protocol"; + option["short"] = "p"; + option["arg"] = "string"; + option["default"] = ""; + config.addOption("protocol", option); + + option.null(); + option["long"] = "requrl"; + option["short"] = "r"; + option["arg"] = "string"; + option["default"] = ""; + config.addOption("requrl", option); + + config.activate(); + if (!(config.parseArgs(argc, argv))){ + FAIL_MSG("Cannot start a new session due to invalid arguments"); + return 1; + } + + const uint64_t bootTime = Util::getMicros(); + // Get session ID, session mode and other variables used as payload for the USER_NEW and USER_END triggers + const std::string thisStreamName = config.getString("streamname"); + const std::string thisHost = config.getString("ip"); + const std::string thisSid = config.getString("sid"); + const std::string thisProtocol = config.getString("protocol"); + const std::string thisReqUrl = config.getString("requrl"); + const std::string thisSessionId = config.getString("sessionid"); + const uint64_t sessionMode = config.getInteger("sessionmode"); + + if (thisSessionId == "" || thisProtocol == "" || thisStreamName == ""){ + FAIL_MSG("Given the following incomplete arguments: SessionId: '%s', protocol: '%s', stream name: '%s'. Aborting opening a new session", + thisSessionId.c_str(), thisProtocol.c_str(), thisStreamName.c_str()); + return 1; + } + + MEDIUM_MSG("Starting a new session for sessionId '%s'", thisSessionId.c_str()); + if (sessionMode < 1 || sessionMode > 15) { + FAIL_MSG("Invalid session mode of value %lu. Should be larger than 0 and smaller than 16", sessionMode); + return 1; + } + + // Try to lock to ensure we are the only process initialising this session + IPC::semaphore sessionLock; + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, thisSessionId.c_str()); + sessionLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + // If the lock fails, the previous Session process must've failed in spectacular fashion + // It's the Controller's task to clean everything up. When the lock fails, this cleanup hasn't happened yet + if (!sessionLock.tryWaitOneSecond()){ + FAIL_MSG("Session '%s' already locked", thisSessionId.c_str()); + return 1; + } + + // Check if a page already exists for this session ID. If so, quit + { + IPC::sharedPage dataPage; + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, thisSessionId.c_str()); + dataPage.init(userPageName, 0, false, false); + if (dataPage){ + INFO_MSG("Session '%s' already has a running process", thisSessionId.c_str()); + sessionLock.post(); + return 0; + } + } + + // Claim a spot in shared memory for this session on the global statistics page + sessions.reload(); + if (!sessions){ + FAIL_MSG("Unable to register entry for session '%s' on the stats page", thisSessionId.c_str()); + sessionLock.post(); + return 1; + } + // Open the shared memory page containing statistics for each individual connection in this session + connections.reload(thisStreamName, thisHost, thisSid, thisProtocol, thisReqUrl, sessionMode, true, false); + // Initialise global session data + sessions.setHost(thisHost); + sessions.setSessId(thisSessionId); + sessions.setStream(thisStreamName); + sessionLock.post(); + + // Determine session type, since triggers only get run for viewer type sessions + uint64_t thisType = 0; + if (thisSessionId[0] == 'I'){ + INFO_MSG("Started new input session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); + thisType = 1; + } + else if (thisSessionId[0] == 'O'){ + INFO_MSG("Started new output session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); + thisType = 2; + } + else{ + INFO_MSG("Started new viewer session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); + } + + // Do a USER_NEW trigger if it is defined for this stream + if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){ + std::string payload = thisStreamName + "\n" + thisHost + "\n" + + thisSid + "\n" + thisProtocol + + "\n" + thisReqUrl + "\n" + thisSessionId; + if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ + // Mark all connections of this session as finished, since this viewer is not allowed to view this stream + connections.setExit(); + connections.finishAll(); + } + } + + uint64_t lastSecond = 0; + uint64_t now = 0; + uint64_t time = 0; + uint64_t down = 0; + uint64_t up = 0; + uint64_t pktcount = 0; + uint64_t pktloss = 0; + uint64_t pktretrans = 0; + std::string connector = ""; + // Stay active until Mist exits or we no longer have an active connection + while (config.is_active && (currentConnections || Util::bootSecs() - lastSeen <= 10)){ + time = 0; + connector = ""; + down = 0; + up = 0; + pktcount = 0; + pktloss = 0; + pktretrans = 0; + currentConnections = 0; + + // Count active connections + COMM_LOOP(connections, userOnActive(currentConnections), userOnDisconnect(connections, id)); + // Loop through all connection entries to get a summary of statistics + for (uint64_t idx = 0; idx < connections.recordCount(); idx++){ + if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} + uint64_t thisLastSecond = connections.getLastSecond(idx); + std::string thisConnector = connections.getConnector(idx); + // Save info on the latest active connection separately + if (thisLastSecond > lastSecond){ + lastSecond = thisLastSecond; + now = connections.getNow(idx); + } + connectorLastActive[thisConnector] = thisLastSecond; + // Sum all other variables + time += connections.getTime(idx); + down += connections.getDown(idx); + up += connections.getUp(idx); + pktcount += connections.getPacketCount(idx); + pktloss += connections.getPacketLostCount(idx); + pktretrans += connections.getPacketRetransmitCount(idx); + } + + // Convert connector duration to string + std::stringstream connectorSummary; + bool addDelimiter = false; + connectorSummary << "{"; + for (std::map::iterator it = connectorLastActive.begin(); + it != connectorLastActive.end(); ++it){ + if (lastSecond - it->second < 10000){ + connectorSummary << (addDelimiter ? "," : "") << it->first; + addDelimiter = true; + } + } + connectorSummary << "}"; + + // Write summary to global statistics + sessions.setTime(time + globalTime); + sessions.setDown(down + globalDown); + sessions.setUp(up + globalUp); + sessions.setPacketCount(pktcount + globalPktcount); + sessions.setPacketLostCount(pktloss + globalPktloss); + sessions.setPacketRetransmitCount(pktretrans + globalPktretrans); + sessions.setLastSecond(lastSecond); + sessions.setConnector(connectorSummary.str()); + sessions.setNow(now); + + // Retrigger USER_NEW if a re-sync was requested + if (!thisType && forceTrigger){ + forceTrigger = false; + if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){ + INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str()); + std::string payload = thisStreamName + "\n" + thisHost + "\n" + + thisSid + "\n" + thisProtocol + + "\n" + thisReqUrl + "\n" + thisSessionId; + if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ + INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str()); + connections.setExit(); + connections.finishAll(); + }else{ + INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str()); + } + } + } + + // Invalidate connections if the session is marked as invalid + if(connections.getExit()){ + connections.finishAll(); + break; + } + // Remember latest activity so we know when this session ends + if (currentConnections){ + lastSeen = Util::bootSecs(); + } + Util::sleep(1000); + } + + // Trigger USER_END + if (!thisType && Triggers::shouldTrigger("USER_END", thisStreamName)){ + lastSecond = 0; + time = 0; + down = 0; + up = 0; + + // Get a final summary of this session + for (uint64_t idx = 0; idx < connections.recordCount(); idx++){ + if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} + uint64_t thisLastSecond = connections.getLastSecond(idx); + // Set last second to the latest entry + if (thisLastSecond > lastSecond){ + lastSecond = thisLastSecond; + } + // Count protocol durations across the entire session + std::string thisConnector = connections.getConnector(idx); + if (thisConnector != ""){ + connectorCount[thisConnector] += connections.getTime(idx); + } + // Sum all other variables + time += connections.getTime(idx); + down += connections.getDown(idx); + up += connections.getUp(idx); + } + + // Convert connector duration to string + std::stringstream connectorSummary; + bool addDelimiter = false; + connectorSummary << "{"; + for (std::map::iterator it = connectorCount.begin(); + it != connectorCount.end(); ++it){ + connectorSummary << (addDelimiter ? "," : "") << it->first << ":" << it->second; + addDelimiter = true; + } + connectorSummary << "}"; + + const uint64_t duration = lastSecond - (bootTime / 1000); + std::stringstream summary; + summary << thisSessionId << "\n" + << thisStreamName << "\n" + << connectorSummary.str() << "\n" + << thisHost << "\n" + << duration << "\n" + << up << "\n" + << down << "\n" + << sessions.getTags(); + Triggers::doTrigger("USER_END", summary.str(), thisStreamName); + } + + if (!thisType && connections.getExit()){ + WARN_MSG("Session %s has been invalidated since it is not allowed to view stream %s", thisSessionId.c_str(), thisStreamName.c_str()); + uint64_t sleepStart = Util::bootSecs(); + // Keep session invalidated for 10 minutes, or until the session stops + while (config.is_active && sleepStart - Util::bootSecs() < 600){ + Util::sleep(1000); + } + } + INFO_MSG("Shutting down session %s", thisSessionId.c_str()); + return 0; +}