From 8ac486b815cac711c434422a1b0955486c28bb68 Mon Sep 17 00:00:00 2001 From: Marco van Dijk Date: Wed, 16 Mar 2022 13:46:14 +0100 Subject: [PATCH] Completed new sessions system Co-authored-by: Thulinma --- lib/comms.cpp | 206 ++++++++---- lib/comms.h | 36 +- lib/defines.h | 2 +- lib/hls_support.cpp | 12 +- lib/http_parser.cpp | 6 +- lib/http_parser.h | 2 +- lib/socket.cpp | 10 + lib/socket.h | 1 + lib/websocket.cpp | 22 +- lib/websocket.h | 2 +- src/controller/controller.cpp | 18 + src/controller/controller_api.cpp | 11 +- src/controller/controller_statistics.cpp | 393 +++++++++++++--------- src/controller/controller_statistics.h | 31 +- src/controller/controller_storage.cpp | 28 +- src/controller/controller_storage.h | 1 + src/input/input.cpp | 21 +- src/input/input_rtsp.cpp | 3 +- src/input/input_sdp.cpp | 3 +- src/input/input_ts.cpp | 3 +- src/input/input_tsrist.cpp | 2 +- src/input/input_tsrist.h | 2 +- src/output/output.cpp | 42 ++- src/output/output.h | 3 +- src/output/output_cmaf.cpp | 27 +- src/output/output_hls.cpp | 28 +- src/output/output_http.cpp | 51 ++- src/output/output_http_internal.cpp | 179 +++++----- src/output/output_http_internal.h | 8 +- src/output/output_sdp.cpp | 12 + src/output/output_sdp.h | 2 + src/output/output_ts.cpp | 12 + src/output/output_ts.h | 2 + src/output/output_tsrist.cpp | 16 +- src/output/output_tsrist.h | 4 +- src/session.cpp | 410 +++++++++++++---------- 36 files changed, 991 insertions(+), 620 deletions(-) diff --git a/lib/comms.cpp b/lib/comms.cpp index 85f1e6d7..14faa980 100644 --- a/lib/comms.cpp +++ b/lib/comms.cpp @@ -3,6 +3,7 @@ #include "comms.h" #include "defines.h" #include "encode.h" +#include "stream.h" #include "procs.h" #include "timing.h" #include @@ -10,6 +11,34 @@ #include "config.h" namespace Comms{ + uint8_t sessionViewerMode = SESS_BUNDLE_DEFAULT_VIEWER; + uint8_t sessionInputMode = SESS_BUNDLE_DEFAULT_OTHER; + uint8_t sessionOutputMode = SESS_BUNDLE_DEFAULT_OTHER; + uint8_t sessionUnspecifiedMode = 0; + uint8_t sessionStreamInfoMode = SESS_DEFAULT_STREAM_INFO_MODE; + uint8_t tknMode = SESS_TKN_DEFAULT_MODE; + + /// \brief Refreshes the session configuration if the last update was more than 5 seconds ago + void sessionConfigCache(){ + static uint64_t lastUpdate = 0; + if (Util::bootSecs() > lastUpdate + 5){ + VERYHIGH_MSG("Updating session config"); + JSON::Value tmpVal = Util::getGlobalConfig("sessionViewerMode"); + if (!tmpVal.isNull()){ sessionViewerMode = tmpVal.asInt(); } + tmpVal = Util::getGlobalConfig("sessionInputMode"); + if (!tmpVal.isNull()){ sessionInputMode = tmpVal.asInt(); } + tmpVal = Util::getGlobalConfig("sessionOutputMode"); + if (!tmpVal.isNull()){ sessionOutputMode = tmpVal.asInt(); } + tmpVal = Util::getGlobalConfig("sessionUnspecifiedMode"); + if (!tmpVal.isNull()){ sessionUnspecifiedMode = tmpVal.asInt(); } + tmpVal = Util::getGlobalConfig("sessionStreamInfoMode"); + if (!tmpVal.isNull()){ sessionStreamInfoMode = tmpVal.asInt(); } + tmpVal = Util::getGlobalConfig("tknMode"); + if (!tmpVal.isNull()){ tknMode = tmpVal.asInt(); } + lastUpdate = Util::bootSecs(); + } + } + Comms::Comms(){ index = INVALID_RECORD_INDEX; currentSize = 0; @@ -17,7 +46,7 @@ namespace Comms{ } Comms::~Comms(){ - if (index != INVALID_RECORD_INDEX){ + if (index != INVALID_RECORD_INDEX && status){ setStatus(COMM_STATUS_DISCONNECT | getStatus()); } if (master){ @@ -123,6 +152,10 @@ namespace Comms{ return; } dataAccX = Util::RelAccX(dataPage.mapped); + if (dataAccX.isExit()){ + dataPage.close(); + return; + } fieldAccess(); if (index == INVALID_RECORD_INDEX || reIssue){ size_t reqCount = dataAccX.getRCount(); @@ -170,19 +203,30 @@ namespace Comms{ void Sessions::addFields(){ Connections::addFields(); + dataAccX.addField("tags", RAX_STRING, 512); dataAccX.addField("sessid", RAX_STRING, 80); } void Sessions::nullFields(){ Connections::nullFields(); setSessId(""); + setTags(""); } void Sessions::fieldAccess(){ Connections::fieldAccess(); + tags = dataAccX.getFieldAccX("tags"); sessId = dataAccX.getFieldAccX("sessid"); } + std::string Sessions::getTags() const{return tags.string(index);} + std::string Sessions::getTags(size_t idx) const{return (master ? tags.string(idx) : 0);} + void Sessions::setTags(std::string _sid){tags.set(_sid, index);} + void Sessions::setTags(std::string _sid, size_t idx){ + if (!master){return;} + tags.set(_sid, idx); + } + Users::Users() : Comms(){} Users::Users(const Users &rhs) : Comms(){ @@ -251,31 +295,60 @@ namespace Comms{ keyNum.set(_keyNum, idx); } + + + void Connections::reload(const std::string & sessId, bool _master, bool reIssue){ + // Open SEM_SESSION + if(!sem){ + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, sessId.c_str()); + sem.open(semName, O_RDWR, ACCESSPERMS, 1); + if (!sem){return;} + } + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, sessId.c_str()); + Comms::reload(userPageName, COMMS_SESSIONS_INITSIZE, _master, reIssue); + } + /// \brief Claims a spot on the connections page for the input/output which calls this function /// Starts the MistSession binary for each session, which handles the statistics /// and the USER_NEW and USER_END triggers /// \param streamName: Name of the stream the input is providing or an output is making available to viewers /// \param ip: IP address of the viewer which wants to access streamName. For inputs this value can be set to any value - /// \param sid: Session ID given by the player or randomly generated + /// \param tkn: Session token given by the player or randomly generated /// \param protocol: Protocol currently in use for this connection - /// \param sessionMode: Determines how a viewer session is defined: - // If set to 0, all connections with the same viewer IP and stream name are bundled. - // If set to 1, all connections with the same viewer IP and player ID are bundled. - // If set to 2, all connections with the same player ID and stream name are bundled. - // If set to 3, all connections with the same viewer IP, player ID and stream name are bundled. /// \param _master: If True, we are reading from this page. If False, we are writing (to our entry) on this page /// \param reIssue: If True, claim a new entry on this page - void Connections::reload(std::string streamName, std::string ip, std::string sid, std::string protocol, std::string reqUrl, uint64_t sessionMode, bool _master, bool reIssue){ - if (sessionMode == 0xFFFFFFFFFFFFFFFFull){ - FAIL_MSG("The session mode was not initialised properly. Assuming default behaviour of bundling by viewer IP, stream name and player id"); - sessionMode = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID; - } + void Connections::reload(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & protocol, const std::string & reqUrl, bool _master, bool reIssue){ + initialTkn = tkn; + uint8_t sessMode = sessionViewerMode; // Generate a unique session ID for each viewer, input or output - sessionId = generateSession(streamName, ip, sid, protocol, sessionMode); if (protocol.size() >= 6 && protocol.substr(0, 6) == "INPUT:"){ - sessionId = "I" + sessionId; + sessMode = sessionInputMode; + sessionId = "I" + generateSession(streamName, ip, tkn, protocol, sessMode); }else if (protocol.size() >= 7 && protocol.substr(0, 7) == "OUTPUT:"){ - sessionId = "O" + sessionId; + sessMode = sessionOutputMode; + sessionId = "O" + generateSession(streamName, ip, tkn, protocol, sessMode); + }else{ + // If the session only contains the HTTP connector, check sessionStreamInfoMode + if (protocol.size() == 4 && protocol == "HTTP"){ + if (sessionStreamInfoMode == SESS_HTTP_AS_VIEWER){ + sessionId = generateSession(streamName, ip, tkn, protocol, sessMode); + }else if (sessionStreamInfoMode == SESS_HTTP_AS_OUTPUT){ + sessMode = sessionOutputMode; + sessionId = "O" + generateSession(streamName, ip, tkn, protocol, sessMode); + }else if (sessionStreamInfoMode == SESS_HTTP_DISABLED){ + return; + }else if (sessionStreamInfoMode == SESS_HTTP_AS_UNSPECIFIED){ + // Set sessMode to include all variables when determining the session ID + sessMode = sessionUnspecifiedMode; + sessionId = "U" + generateSession(streamName, ip, tkn, protocol, sessMode); + }else{ + sessionId = generateSession(streamName, ip, tkn, protocol, sessMode); + } + }else{ + sessionId = generateSession(streamName, ip, tkn, protocol, sessMode); + } } char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, sessionId.c_str()); @@ -283,36 +356,59 @@ namespace Comms{ if (!_master){ dataPage.init(userPageName, 0, false, false); if (!dataPage){ + std::string host; + Socket::hostBytesToStr(ip.data(), 16, host); pid_t thisPid; std::deque args; args.push_back(Util::getMyPath() + "MistSession"); args.push_back(sessionId); - args.push_back("--sessionmode"); - args.push_back(JSON::Value(sessionMode).asString()); - args.push_back("--streamname"); - args.push_back(streamName); - args.push_back("--ip"); - args.push_back(ip); - args.push_back("--sid"); - args.push_back(sid); - args.push_back("--protocol"); - args.push_back(protocol); - args.push_back("--requrl"); - args.push_back(reqUrl); + + // First bit defines whether to include stream name + if (sessMode & 0x08){ + args.push_back("--streamname"); + args.push_back(streamName); + }else{ + setenv("SESSION_STREAM", streamName.c_str(), 1); + } + // Second bit defines whether to include viewer ip + if (sessMode & 0x04){ + args.push_back("--ip"); + args.push_back(host); + }else{ + setenv("SESSION_IP", host.c_str(), 1); + } + // Third bit defines whether to include tkn + if (sessMode & 0x02){ + args.push_back("--tkn"); + args.push_back(tkn); + }else{ + setenv("SESSION_TKN", tkn.c_str(), 1); + } + // Fourth bit defines whether to include protocol + if (sessMode & 0x01){ + args.push_back("--protocol"); + args.push_back(protocol); + }else{ + setenv("SESSION_PROTOCOL", protocol.c_str(), 1); + } + setenv("SESSION_REQURL", reqUrl.c_str(), 1); int err = fileno(stderr); thisPid = Util::Procs::StartPiped(args, 0, 0, &err); Util::Procs::forget(thisPid); - HIGH_MSG("Spawned new session executeable (pid %u) for sessionId '%s', corresponding to host %s and stream %s", thisPid, sessionId.c_str(), ip.c_str(), streamName.c_str()); + unsetenv("SESSION_STREAM"); + unsetenv("SESSION_IP"); + unsetenv("SESSION_TKN"); + unsetenv("SESSION_PROTOCOL"); + unsetenv("SESSION_REQURL"); } } - // Open SEM_SESSION - if(!sem){ - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, sessionId.c_str()); - sem.open(semName, O_RDWR, ACCESSPERMS, 1); + reload(sessionId, _master, reIssue); + if (index != INVALID_RECORD_INDEX){ + setConnector(protocol); + setHost(ip); + setStream(streamName); + VERYHIGH_MSG("Reloading connection. Claimed record %lu", index); } - Comms::reload(userPageName, COMMS_SESSIONS_INITSIZE, _master, reIssue); - VERYHIGH_MSG("Reloading connection. Claimed record %lu", index); } /// \brief Marks the data page as closed, so that we longer write any new data to is @@ -341,7 +437,6 @@ namespace Comms{ dataAccX.addField("host", RAX_RAW, 16); dataAccX.addField("stream", RAX_STRING, 100); dataAccX.addField("connector", RAX_STRING, 20); - dataAccX.addField("tags", RAX_STRING, 512); dataAccX.addField("pktcount", RAX_64UINT); dataAccX.addField("pktloss", RAX_64UINT); dataAccX.addField("pktretrans", RAX_64UINT); @@ -349,7 +444,6 @@ namespace Comms{ void Connections::nullFields(){ Comms::nullFields(); - setTags(""); setConnector(""); setStream(""); setHost(""); @@ -373,7 +467,6 @@ namespace Comms{ host = dataAccX.getFieldAccX("host"); stream = dataAccX.getFieldAccX("stream"); connector = dataAccX.getFieldAccX("connector"); - tags = dataAccX.getFieldAccX("tags"); pktcount = dataAccX.getFieldAccX("pktcount"); pktloss = dataAccX.getFieldAccX("pktloss"); pktretrans = dataAccX.getFieldAccX("pktretrans"); @@ -461,14 +554,6 @@ namespace Comms{ return false; } - std::string Connections::getTags() const{return tags.string(index);} - std::string Connections::getTags(size_t idx) const{return (master ? tags.string(idx) : 0);} - void Connections::setTags(std::string _sid){tags.set(_sid, index);} - void Connections::setTags(std::string _sid, size_t idx){ - if (!master){return;} - tags.set(_sid, idx); - } - uint64_t Connections::getPacketCount() const{return pktcount.uint(index);} uint64_t Connections::getPacketCount(size_t idx) const{ return (master ? pktcount.uint(idx) : 0); @@ -501,31 +586,32 @@ namespace Comms{ /// \brief Generates a session ID which is unique per viewer /// \return generated session ID as string - std::string Connections::generateSession(std::string streamName, std::string ip, std::string sid, std::string connector, uint64_t sessionMode){ + std::string Connections::generateSession(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & connector, uint64_t sessionMode){ std::string concat; + std::string debugMsg = "Generating session id based on"; // First bit defines whether to include stream name - if (sessionMode > 7){ + if (sessionMode & 0x08){ concat += streamName; - sessionMode -= 8; + debugMsg += " stream name '" + streamName + "'"; } // Second bit defines whether to include viewer ip - if (sessionMode > 3){ + if (sessionMode & 0x04){ concat += ip; - sessionMode -= 4; + std::string ipHex; + Socket::hostBytesToStr(ip.c_str(), ip.size(), ipHex); + debugMsg += " IP '" + ipHex + "'"; } - // Third bit defines whether to include player ip - if (sessionMode > 1){ - concat += sid; - sessionMode -= 2; + // Third bit defines whether to include client-side session token + if (sessionMode & 0x02){ + concat += tkn; + debugMsg += " session token '" + tkn + "'"; } // Fourth bit defines whether to include protocol - if (sessionMode == 1){ + if (sessionMode & 0x01){ concat += connector; - sessionMode = 0; - } - if (sessionMode > 0){ - WARN_MSG("Could not resolve session mode of value %lu", sessionMode); + debugMsg += " protocol '" + connector + "'"; } + VERYHIGH_MSG("%s", debugMsg.c_str()); return Secure::sha256(concat.c_str(), concat.length()); } }// namespace Comms diff --git a/lib/comms.h b/lib/comms.h index 9a5c0ea9..ec36dcb0 100644 --- a/lib/comms.h +++ b/lib/comms.h @@ -9,13 +9,21 @@ #define COMM_STATUS_REQDISCONNECT 0x10 #define COMM_STATUS_ACTIVE 0x1 #define COMM_STATUS_INVALID 0x0 +#define SESS_BUNDLE_DEFAULT_VIEWER 14 +#define SESS_BUNDLE_DEFAULT_OTHER 15 +#define SESS_DEFAULT_STREAM_INFO_MODE 1 +#define SESS_HTTP_AS_VIEWER 1 +#define SESS_HTTP_AS_OUTPUT 2 +#define SESS_HTTP_DISABLED 3 +#define SESS_HTTP_AS_UNSPECIFIED 4 +#define SESS_TKN_DEFAULT_MODE 15 #define COMM_LOOP(comm, onActive, onDisconnect) \ {\ for (size_t id = 0; id < comm.recordCount(); id++){\ if (comm.getStatus(id) == COMM_STATUS_INVALID){continue;}\ - if (!Util::Procs::isRunning(comm.getPid(id))){\ + if (!(comm.getStatus(id) & COMM_STATUS_DISCONNECT) && comm.getPid(id) && !Util::Procs::isRunning(comm.getPid(id))){\ comm.setStatus(COMM_STATUS_DISCONNECT | comm.getStatus(id), id);\ }\ onActive;\ @@ -27,6 +35,14 @@ } namespace Comms{ + extern uint8_t sessionViewerMode; + extern uint8_t sessionInputMode; + extern uint8_t sessionOutputMode; + extern uint8_t sessionUnspecifiedMode; + extern uint8_t sessionStreamInfoMode; + extern uint8_t tknMode; + void sessionConfigCache(); + class Comms{ public: Comms(); @@ -66,11 +82,13 @@ namespace Comms{ class Connections : public Comms{ public: - void reload(std::string streamName, std::string ip, std::string sid, std::string protocol, std::string reqUrl, uint64_t sessionMode, bool _master = false, bool reIssue = false); + void reload(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & protocol, const std::string & reqUrl, bool _master = false, bool reIssue = false); + void reload(const std::string & sessId, bool _master = false, bool reIssue = false); void unload(); operator bool() const{return dataPage.mapped && (master || index != INVALID_RECORD_INDEX);} - std::string generateSession(std::string streamName, std::string ip, std::string sid, std::string connector, uint64_t sessionMode); + std::string generateSession(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & connector, uint64_t sessionMode); std::string sessionId; + std::string initialTkn; void setExit(); bool getExit(); @@ -79,6 +97,8 @@ namespace Comms{ virtual void nullFields(); virtual void fieldAccess(); + const std::string & getTkn() const{return initialTkn;} + uint64_t getNow() const; uint64_t getNow(size_t idx) const; void setNow(uint64_t _now); @@ -120,11 +140,6 @@ namespace Comms{ void setConnector(std::string _connector, size_t idx); bool hasConnector(size_t idx, std::string protocol); - std::string getTags() const; - std::string getTags(size_t idx) const; - void setTags(std::string _sid); - void setTags(std::string _sid, size_t idx); - uint64_t getPacketCount() const; uint64_t getPacketCount(size_t idx) const; void setPacketCount(uint64_t _count); @@ -197,5 +212,10 @@ namespace Comms{ virtual void addFields(); virtual void nullFields(); virtual void fieldAccess(); + + std::string getTags() const; + std::string getTags(size_t idx) const; + void setTags(std::string _sid); + void setTags(std::string _sid, size_t idx); }; }// namespace Comms diff --git a/lib/defines.h b/lib/defines.h index 203b75db..d08d1d04 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -231,7 +231,7 @@ static inline void show_stackframe(){} #define SEM_TRACKLIST "/MstTRKS%s" //%s stream name #define SEM_SESSION "MstSess%s" #define SEM_SESSCACHE "/MstSessCacheLock" -#define SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID 14 +#define SESS_TIMEOUT 600 // Session timeout in seconds #define SHM_CAPA "MstCapa" #define SHM_PROTO "MstProt" #define SHM_PROXY "MstProx" diff --git a/lib/hls_support.cpp b/lib/hls_support.cpp index c311a7ac..130e691d 100644 --- a/lib/hls_support.cpp +++ b/lib/hls_support.cpp @@ -274,7 +274,7 @@ namespace HLS{ if (trackData.mediaFormat == ".ts"){return;} result << "#EXT-X-MAP:URI=\"" << trackData.urlPrefix << "init" << trackData.mediaFormat; - if (trackData.sessionId.size()){result << "?sessId=" << trackData.sessionId;} + if (trackData.sessionId.size()){result << "?tkn=" << trackData.sessionId;} result << "\"\r\n"; } @@ -327,7 +327,7 @@ namespace HLS{ result << "?msn=" << fragData.currentFrag; result << "&mTrack=" << trackData.timingTrackId; result << "&dur=" << fragData.duration; - if (trackData.sessionId.size()){result << "&sessId=" << trackData.sessionId;} + if (trackData.sessionId.size()){result << "&tkn=" << trackData.sessionId;} result << "\r\n"; } @@ -341,7 +341,7 @@ namespace HLS{ result << "?msn=" << fragData.currentFrag; result << "&mTrack=" << trackData.timingTrackId; result << "&dur=" << duration; - if (trackData.sessionId.size()){result << "&sessId=" << trackData.sessionId;} + if (trackData.sessionId.size()){result << "&tkn=" << trackData.sessionId;} result << "\""; // NOTE: INDEPENDENT tags, specified ONLY for VIDEO tracks, indicate the first partial fragment @@ -448,7 +448,7 @@ namespace HLS{ result << "?msn=" << fragData.currentFrag - 1; result << "&mTrack=" << trackData.timingTrackId; result << "&dur=" << partDurationMaxMs; - if (trackData.sessionId.size()){result << "&sessId=" << trackData.sessionId;} + if (trackData.sessionId.size()){result << "&tkn=" << trackData.sessionId;} result << "\"\r\n"; } @@ -509,7 +509,7 @@ namespace HLS{ result << ",NAME=\"" << name << "\",URI=\"" << trackId << "/index.m3u8"; result << "?mTrack=" << masterData.mainTrack; result << "&iMsn=" << iFrag; - if (masterData.hasSessId){result << "&sessId=" << masterData.sessId;} + if (masterData.sessId.size()){result << "&tkn=" << masterData.sessId;} if (masterData.noLLHLS){result << "&llhls=0";} result << "\"\r\n"; } @@ -529,7 +529,7 @@ namespace HLS{ result << "/index.m3u8"; result << "?mTrack=" << masterData.mainTrack; result << "&iMsn=" << iFrag; - if (masterData.hasSessId){result << "&sessId=" << masterData.sessId;} + if (masterData.sessId.size()){result << "&tkn=" << masterData.sessId;} if (masterData.noLLHLS){result << "&llhls=0";} result << "\r\n"; } diff --git a/lib/http_parser.cpp b/lib/http_parser.cpp index c08e3d56..113fdb51 100644 --- a/lib/http_parser.cpp +++ b/lib/http_parser.cpp @@ -742,13 +742,13 @@ bool HTTP::Parser::parse(std::string &HTTPbuffer, Util::DataCallback &cb){ /// HTTP variable parser to std::map structure. /// Reads variables from data, decodes and stores them to storage. -void HTTP::parseVars(const std::string &data, std::map &storage){ +void HTTP::parseVars(const std::string &data, std::map &storage, const std::string & separator){ std::string varname; std::string varval; // position where a part starts (e.g. after &) size_t pos = 0; while (pos < data.length()){ - size_t nextpos = data.find('&', pos); + size_t nextpos = data.find(separator, pos); if (nextpos == std::string::npos){nextpos = data.length();} size_t eq_pos = data.find('=', pos); if (eq_pos < nextpos){ @@ -769,7 +769,7 @@ void HTTP::parseVars(const std::string &data, std::map break; } // erase & - pos = nextpos + 1; + pos = nextpos + separator.size(); } } diff --git a/lib/http_parser.h b/lib/http_parser.h index 9c843f20..a5528df3 100644 --- a/lib/http_parser.h +++ b/lib/http_parser.h @@ -14,7 +14,7 @@ namespace HTTP{ /// HTTP variable parser to std::map structure. /// Reads variables from data, decodes and stores them to storage. - void parseVars(const std::string &data, std::map &storage); + void parseVars(const std::string &data, std::map &storage, const std::string & separator = "&"); /// Simple class for reading and writing HTTP 1.0 and 1.1. class Parser : public Util::DataCallback{ diff --git a/lib/socket.cpp b/lib/socket.cpp index 6d63af00..47b2b6bd 100644 --- a/lib/socket.cpp +++ b/lib/socket.cpp @@ -166,6 +166,8 @@ bool Socket::isBinAddress(const std::string &binAddr, std::string addr){ /// Converts the given address with optional subnet to binary IPv6 form. /// Returns 16 bytes of address, followed by 1 byte of subnet bits, zero or more times. std::string Socket::getBinForms(std::string addr){ + // Check for empty address + if (!addr.size()){return std::string(17, (char)0);} // Check if we need to do prefix matching uint8_t prefixLen = 128; if (addr.find('/') != std::string::npos){ @@ -1796,6 +1798,14 @@ void Socket::UDPConnection::GetDestination(std::string &destIp, uint32_t &port){ FAIL_MSG("Could not get destination for UDP socket"); }// Socket::UDPConnection GetDestination +/// Gets the properties of the receiving end of this UDP socket. +/// This will be the receiving end for all SendNow calls. +std::string Socket::UDPConnection::getBinDestination(){ + std::string binList = getIPv6BinAddr(*(sockaddr_in6*)destAddr); + if (binList.size() < 16){ return std::string("\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000", 16); } + return binList.substr(0, 16); +}// Socket::UDPConnection GetDestination + /// Returns the port number of the receiving end of this socket. /// Returns 0 on error. uint32_t Socket::UDPConnection::getDestPort() const{ diff --git a/lib/socket.h b/lib/socket.h index a85fbf39..b369ac84 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -215,6 +215,7 @@ namespace Socket{ void setBlocking(bool blocking); void SetDestination(std::string hostname, uint32_t port); void GetDestination(std::string &hostname, uint32_t &port); + std::string getBinDestination(); const void * getDestAddr(){return destAddr;} size_t getDestAddrLen(){return destAddr_size;} std::string getBoundAddress(); diff --git a/lib/websocket.cpp b/lib/websocket.cpp index 05703b55..c82b4cba 100644 --- a/lib/websocket.cpp +++ b/lib/websocket.cpp @@ -73,31 +73,31 @@ namespace HTTP{ } /// Takes an incoming HTTP::Parser request for a Websocket, and turns it into one. - Websocket::Websocket(Socket::Connection &c, HTTP::Parser &h) : C(c){ + Websocket::Websocket(Socket::Connection &c, const HTTP::Parser &req, HTTP::Parser &resp) : C(c){ frameType = 0; maskOut = false; - std::string connHeader = h.GetHeader("Connection"); + std::string connHeader = req.GetHeader("Connection"); Util::stringToLower(connHeader); if (connHeader.find("upgrade") == std::string::npos){ FAIL_MSG("Could not negotiate websocket, connection header incorrect (%s).", connHeader.c_str()); C.close(); return; } - std::string upgradeHeader = h.GetHeader("Upgrade"); + std::string upgradeHeader = req.GetHeader("Upgrade"); Util::stringToLower(upgradeHeader); if (upgradeHeader != "websocket"){ FAIL_MSG("Could not negotiate websocket, upgrade header incorrect (%s).", upgradeHeader.c_str()); C.close(); return; } - if (h.GetHeader("Sec-WebSocket-Version") != "13"){ + if (req.GetHeader("Sec-WebSocket-Version") != "13"){ FAIL_MSG("Could not negotiate websocket, version incorrect (%s).", - h.GetHeader("Sec-WebSocket-Version").c_str()); + req.GetHeader("Sec-WebSocket-Version").c_str()); C.close(); return; } #ifdef SSL - std::string client_key = h.GetHeader("Sec-WebSocket-Key"); + std::string client_key = req.GetHeader("Sec-WebSocket-Key"); if (!client_key.size()){ FAIL_MSG("Could not negotiate websocket, missing key!"); C.close(); @@ -105,15 +105,13 @@ namespace HTTP{ } #endif - h.Clean(); - h.setCORSHeaders(); - h.SetHeader("Upgrade", "websocket"); - h.SetHeader("Connection", "Upgrade"); + resp.SetHeader("Upgrade", "websocket"); + resp.SetHeader("Connection", "Upgrade"); #ifdef SSL - h.SetHeader("Sec-WebSocket-Accept", calculateKeyAccept(client_key)); + resp.SetHeader("Sec-WebSocket-Accept", calculateKeyAccept(client_key)); #endif // H.SetHeader("Sec-WebSocket-Protocol", "json"); - h.SendResponse("101", "Websocket away!", C); + resp.SendResponse("101", "Websocket away!", C); } /// Loops calling readFrame until the connection is closed, sleeping in between reads if needed. diff --git a/lib/websocket.h b/lib/websocket.h index 819f18c9..07861a19 100644 --- a/lib/websocket.h +++ b/lib/websocket.h @@ -7,7 +7,7 @@ namespace HTTP{ class Websocket{ public: - Websocket(Socket::Connection &c, HTTP::Parser &h); + Websocket(Socket::Connection &c, const HTTP::Parser &req, HTTP::Parser &resp); Websocket(Socket::Connection &c, const HTTP::URL & url, std::map * headers = 0); Websocket(Socket::Connection &c, bool client); operator bool() const; diff --git a/src/controller/controller.cpp b/src/controller/controller.cpp index 9e17ca6b..972c7ed7 100644 --- a/src/controller/controller.cpp +++ b/src/controller/controller.cpp @@ -309,6 +309,24 @@ int main_loop(int argc, char **argv){ Controller::Storage["config"]["prometheus"] = Controller::conf.getString("prometheus"); Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog"); Controller::normalizeTrustedProxies(Controller::Storage["config"]["trustedproxy"]); + if (!Controller::Storage["config"]["sessionViewerMode"]){ + Controller::Storage["config"]["sessionViewerMode"] = SESS_BUNDLE_DEFAULT_VIEWER; + } + if (!Controller::Storage["config"]["sessionInputMode"]){ + Controller::Storage["config"]["sessionInputMode"] = SESS_BUNDLE_DEFAULT_OTHER; + } + if (!Controller::Storage["config"]["sessionOutputMode"]){ + Controller::Storage["config"]["sessionOutputMode"] = SESS_BUNDLE_DEFAULT_OTHER; + } + if (!Controller::Storage["config"]["sessionUnspecifiedMode"]){ + Controller::Storage["config"]["sessionUnspecifiedMode"] = 0; + } + if (!Controller::Storage["config"]["sessionStreamInfoMode"]){ + Controller::Storage["config"]["sessionStreamInfoMode"] = SESS_DEFAULT_STREAM_INFO_MODE; + } + if (!Controller::Storage["config"].isMember("tknMode")){ + Controller::Storage["config"]["tknMode"] = SESS_TKN_DEFAULT_MODE; + } Controller::prometheus = Controller::Storage["config"]["prometheus"].asStringRef(); Controller::accesslog = Controller::Storage["config"]["accesslog"].asStringRef(); Controller::writeConfig(); diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 02491065..4e935aba 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -188,7 +188,9 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){ std::string logs = H.GetVar("logs"); std::string accs = H.GetVar("accs"); bool doStreams = H.GetVar("streams").size(); - HTTP::Websocket W(C, H); + HTTP::Parser req = H; + H.Clean(); + HTTP::Websocket W(C, req, H); if (!W){return;} IPC::sharedPage shmLogs(SHM_STATE_LOGS, 1024 * 1024); @@ -594,7 +596,12 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ out["prometheus"] = in["prometheus"]; Controller::prometheus = out["prometheus"].asStringRef(); } - if (in.isMember("sessionMode")){out["sessionMode"] = in["sessionMode"];} + if (in.isMember("sessionViewerMode")){out["sessionViewerMode"] = in["sessionViewerMode"];} + if (in.isMember("sessionInputMode")){out["sessionInputMode"] = in["sessionInputMode"];} + if (in.isMember("sessionOutputMode")){out["sessionOutputMode"] = in["sessionOutputMode"];} + if (in.isMember("sessionUnspecifiedMode")){out["sessionUnspecifiedMode"] = in["sessionUnspecifiedMode"];} + if (in.isMember("sessionStreamInfoMode")){out["sessionStreamInfoMode"] = in["sessionStreamInfoMode"];} + if (in.isMember("tknMode")){out["tknMode"] = in["tknMode"];} if (in.isMember("defaultStream")){out["defaultStream"] = in["defaultStream"];} if (in.isMember("location") && in["location"].isObject()){ out["location"]["lat"] = in["location"]["lat"].asDouble(); diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index 6e51f4ee..ffb89897 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -58,6 +58,11 @@ static uint64_t cpu_use = 0; char noBWCountMatches[1717]; uint64_t bwLimit = 128 * 1024 * 1024; // gigabit default limit +const char nullAddress[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; +static Controller::statLog emptyLogEntry = {0, 0, 0, 0, 0, 0 ,0 ,0, "", nullAddress, ""}; +bool notEmpty(const Controller::statLog & dta){ + return dta.time || dta.firstActive || dta.lastSecond || dta.down || dta.up || dta.streamName.size() || dta.connectors.size(); +} // For server-wide totals. Local to this file only. struct streamTotals{ @@ -66,9 +71,11 @@ struct streamTotals{ uint64_t inputs; uint64_t outputs; uint64_t viewers; + uint64_t unspecified; uint64_t currIns; uint64_t currOuts; uint64_t currViews; + uint64_t currUnspecified; uint8_t status; uint64_t viewSeconds; uint64_t packSent; @@ -84,6 +91,7 @@ static uint64_t servDownBytes = 0; static uint64_t servUpOtherBytes = 0; static uint64_t servDownOtherBytes = 0; static uint64_t servInputs = 0; +static uint64_t servUnspecified = 0; static uint64_t servOutputs = 0; static uint64_t servViewers = 0; static uint64_t servSeconds = 0; @@ -95,17 +103,19 @@ static uint64_t viewSecondsTotal = 0; // Mapping of streamName -> summary of stream-wide statistics static std::map streamStats; -// If sessId does not exist yet in streamStats, create and init an entry for it -static void createEmptyStatsIfNeeded(const std::string & sessId){ - if (streamStats.count(sessId)){return;} - streamTotals & sT = streamStats[sessId]; +// If streamName does not exist yet in streamStats, create and init an entry for it +static void createEmptyStatsIfNeeded(const std::string & streamName){ + if (streamStats.count(streamName)){return;} + streamTotals & sT = streamStats[streamName]; sT.upBytes = 0; sT.downBytes = 0; sT.inputs = 0; sT.outputs = 0; sT.viewers = 0; + sT.unspecified = 0; sT.currIns = 0; sT.currOuts = 0; + sT.currUnspecified = 0; sT.currViews = 0; sT.status = 0; sT.viewSeconds = 0; @@ -335,15 +345,23 @@ void Controller::SharedMemStats(void *config){ it->second.currViews = 0; it->second.currIns = 0; it->second.currOuts = 0; + it->second.currUnspecified = 0; } } // wipe old statistics and set session type counters if (sessions.size()){ std::list mustWipe; - uint64_t cutOffPoint = Util::bootSecs() - STAT_CUTOFF; + // Ensure cutOffPoint is either time of boot or 10 minutes ago, whichever is closer. + // Prevents wrapping around to high values close to system boot time. + uint64_t cutOffPoint = Util::bootSecs(); + if (cutOffPoint > STAT_CUTOFF){ + cutOffPoint -= STAT_CUTOFF; + }else{ + cutOffPoint = 0; + } for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ // This part handles ending sessions, keeping them in cache for now - if (it->second.getEnd() < cutOffPoint && it->second.newestDataPoint() < cutOffPoint){ + if (it->second.getEnd() < cutOffPoint){ viewSecondsTotal += it->second.getConnTime(); mustWipe.push_back(it->first); // Don't count this session as a viewer @@ -353,19 +371,24 @@ void Controller::SharedMemStats(void *config){ switch (it->second.getSessType()){ case SESS_UNSET: break; case SESS_VIEWER: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streamStats[it->first].currViews++; + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currViews++; } servSeconds += it->second.getConnTime(); break; case SESS_INPUT: - if (it->second.hasDataFor(tIn) && it->second.isViewerOn(tIn)){ - streamStats[it->first].currIns++; + if (it->second.hasDataFor(tIn)){ + streamStats[it->second.getStreamName()].currIns++; } break; case SESS_OUTPUT: - if (it->second.hasDataFor(tOut) && it->second.isViewerOn(tOut)){ - streamStats[it->first].currOuts++; + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currOuts++; + } + break; + case SESS_UNSPECIFIED: + if (it->second.hasDataFor(tOut)){ + streamStats[it->second.getStreamName()].currUnspecified++; } break; } @@ -406,6 +429,7 @@ void Controller::SharedMemStats(void *config){ strmStats->setInt("viewers", it->second.currViews, strmPos); strmStats->setInt("inputs", it->second.currIns, strmPos); strmStats->setInt("outputs", it->second.currOuts, strmPos); + strmStats->setInt("unspecified", it->second.currUnspecified, strmPos); ++strmPos; } } @@ -489,18 +513,34 @@ void Controller::killConnections(std::string sessId){ /// Updates the given active connection with new stats data. void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ - if (host == ""){ - Socket::hostBytesToStr(statComm.getHost(index).data(), 16, host); - } - if (streamName == ""){ - streamName = statComm.getStream(index); - } - if (curConnector == ""){ - curConnector = statComm.getConnector(index); - } if (sessId == ""){ sessId = statComm.getSessId(index); } + + if (sessionType == SESS_UNSET){ + if (sessId[0] == 'I'){ + sessionType = SESS_INPUT; + }else if (sessId[0] == 'O'){ + sessionType = SESS_OUTPUT; + }else if (sessId[0] == 'U'){ + sessionType = SESS_UNSPECIFIED; + }else{ + sessionType = SESS_VIEWER; + } + } + + uint64_t prevNow = curData.log.size() ? curData.log.rbegin()->first : 0; + // only parse last received data, if newer + if (prevNow > statComm.getNow(index)){return;}; + long long prevDown = getDown(); + long long prevUp = getUp(); + uint64_t prevPktSent = getPktCount(); + uint64_t prevPktLost = getPktLost(); + uint64_t prevPktRetrans = getPktRetransmit(); + uint64_t prevFirstActive = getFirstActive(); + + curData.update(statComm, index); + const std::string& streamName = getStreamName(); // Export tags to session if (tags.size()){ std::stringstream tagStream; @@ -508,28 +548,11 @@ void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ tagStream << "[" << *it << "]"; } statComm.setTags(tagStream.str(), index); + } else { + statComm.setTags("", index); } - - long long prevDown = getDown(); - long long prevUp = getUp(); - uint64_t prevPktSent = getPktCount(); - uint64_t prevPktLost = getPktLost(); - uint64_t prevPktRetrans = getPktRetransmit(); - curData.update(statComm, index); - // store timestamp of first received data, if older - if (firstSec > statComm.getNow(index)){firstSec = statComm.getNow(index);} - uint64_t secIncr = 0; - // store timestamp of last received data, if newer - if (statComm.getNow(index) > lastSec){ - lastSec = statComm.getNow(index); - if (!tracked){ - tracked = true; - firstActive = firstSec; - }else{ - secIncr = (statComm.getNow(index) - lastSec); - } - lastSec = statComm.getNow(index); - } + + uint64_t secIncr = prevFirstActive ? (statComm.getNow(index) - prevNow) : 0; long long currDown = getDown(); long long currUp = getUp(); uint64_t currPktSent = getPktCount(); @@ -537,7 +560,7 @@ void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ uint64_t currPktRetrans = getPktRetransmit(); if (currUp - prevUp < 0 || currDown - prevDown < 0){ INFO_MSG("Negative data usage! %lldu/%lldd (u%lld->%lld) in %s over %s, #%" PRIu64, currUp - prevUp, - currDown - prevDown, prevUp, currUp, streamName.c_str(), curConnector.c_str(), index); + currDown - prevDown, prevUp, currUp, streamName.c_str(), curData.log.rbegin()->second.connectors.c_str(), index); }else{ if (!noBWCount){ size_t bwMatchOffset = 0; @@ -567,40 +590,38 @@ void Controller::statSession::update(uint64_t index, Comms::Sessions &statComm){ servPackRetrans += currPktRetrans - prevPktRetrans; } } - if (sessionType == SESS_UNSET){ - if (curConnector.size() >= 5 && curConnector.substr(0, 5) == "INPUT"){ - ++servInputs; - createEmptyStatsIfNeeded(streamName); - streamStats[streamName].inputs++; - streamStats[streamName].currIns++; - sessionType = SESS_INPUT; - }else if (curConnector.size() >= 6 && curConnector.substr(0, 6) == "OUTPUT"){ - ++servOutputs; - createEmptyStatsIfNeeded(streamName); - streamStats[streamName].outputs++; - streamStats[streamName].currOuts++; - sessionType = SESS_OUTPUT; - }else{ - ++servViewers; - createEmptyStatsIfNeeded(streamName); - streamStats[streamName].viewers++; - streamStats[streamName].currViews++; - sessionType = SESS_VIEWER; + if (!prevFirstActive && streamName.size()){ + createEmptyStatsIfNeeded(streamName); + switch(sessionType){ + case SESS_INPUT: + ++servInputs; + streamStats[streamName].inputs++; + break; + case SESS_OUTPUT: + ++servOutputs; + streamStats[streamName].outputs++; + break; + case SESS_VIEWER: + ++servViewers; + streamStats[streamName].viewers++; + break; + case SESS_UNSPECIFIED: + ++servUnspecified; + streamStats[streamName].unspecified++; + break; + case SESS_UNSET: + break; } } // Only count connections that are countable if (noBWCount != 2){ - if (!streamName.size() || streamName[0] == 0){ - if (streamStats.count(streamName)){streamStats.erase(streamName);} - }else{ - createEmptyStatsIfNeeded(streamName); - streamStats[streamName].upBytes += currUp - prevUp; - streamStats[streamName].downBytes += currDown - prevDown; - streamStats[streamName].packSent += currPktSent - prevPktSent; - streamStats[streamName].packLoss += currPktLost - prevPktLost; - streamStats[streamName].packRetrans += currPktRetrans - prevPktRetrans; - if (sessionType == SESS_VIEWER){streamStats[streamName].viewSeconds += secIncr;} - } + createEmptyStatsIfNeeded(streamName); + streamStats[streamName].upBytes += currUp - prevUp; + streamStats[streamName].downBytes += currDown - prevDown; + streamStats[streamName].packSent += currPktSent - prevPktSent; + streamStats[streamName].packLoss += currPktLost - prevPktLost; + streamStats[streamName].packRetrans += currPktRetrans - prevPktRetrans; + if (sessionType == SESS_VIEWER){streamStats[streamName].viewSeconds += secIncr;} } } @@ -608,21 +629,10 @@ Controller::sessType Controller::statSession::getSessType(){ return sessionType; } -Controller::statSession::~statSession(){ - if (!tracked){return;} - switch (sessionType){ - case SESS_INPUT: - if (streamStats.count(streamName) && streamStats[streamName].currIns){streamStats[streamName].currIns--;} - break; - case SESS_OUTPUT: - if (streamStats.count(streamName) && streamStats[streamName].currOuts){streamStats[streamName].currOuts--;} - break; - case SESS_VIEWER: - if (streamStats.count(streamName) && streamStats[streamName].currViews){streamStats[streamName].currViews--;} - break; - default: break; - } - uint64_t duration = lastSec - firstActive; +/// Ends the currently active session by inserting a null datapoint one second after the last datapoint +void Controller::statSession::finish(){ + if (!getFirstActive()){return;} + uint64_t duration = getEnd() - getFirstActive(); if (duration < 1){duration = 1;} std::stringstream tagStream; if (tags.size()){ @@ -630,6 +640,9 @@ Controller::statSession::~statSession(){ tagStream << "[" << *it << "]"; } } + const std::string& streamName = getStreamName(); + const std::string& curConnector = getConnectors(); + const std::string& host = getStrHost(); Controller::logAccess(sessId, streamName, curConnector, host, duration, getUp(), getDown(), tagStream.str()); if (Controller::accesslog.size()){ @@ -668,74 +681,99 @@ Controller::statSession::~statSession(){ } } } - tracked = false; - firstActive = 0; - firstSec = 0xFFFFFFFFFFFFFFFFull; - lastSec = 0; - sessionType = SESS_UNSET; + tags.clear(); + // Insert null datapoint + curData.log[curData.log.rbegin()->first + 1] = emptyLogEntry; } /// Constructs an empty session Controller::statSession::statSession(){ - firstActive = 0; - tracked = false; - firstSec = 0xFFFFFFFFFFFFFFFFull; - lastSec = 0; sessionType = SESS_UNSET; noBWCount = 0; - streamName = ""; - host = ""; - curConnector = ""; sessId = ""; } /// Returns the first measured timestamp in this session. uint64_t Controller::statSession::getStart(){ - return firstSec; + if (!curData.log.size()){return 0;} + return curData.log.begin()->first; } /// Returns the last measured timestamp in this session. uint64_t Controller::statSession::getEnd(){ - return lastSec; + if (!curData.log.size()){return 0;} + return curData.log.rbegin()->first; } /// Returns true if there is data for this session at timestamp t. bool Controller::statSession::hasDataFor(uint64_t t){ - if (lastSec < t){return false;} - if (firstSec > t){return false;} if (curData.hasDataFor(t)){return true;} return false; } -/// Returns true if this session should count as a viewer on the given timestamp. -bool Controller::statSession::isViewerOn(uint64_t t){ - return getUp(t) + getDown(t); -} - -std::string Controller::statSession::getStreamName(){ - return streamName; -} - -std::string Controller::statSession::getHost(){ - return host; -} - -std::string Controller::statSession::getSessId(){ +const std::string& Controller::statSession::getSessId(){ return sessId; } -std::string Controller::statSession::getCurrentProtocols(){ - return curConnector; +uint64_t Controller::statSession::getFirstActive(){ + if (curData.log.size()){ + return curData.log.rbegin()->second.firstActive; + } + return 0; } -/// Returns true if this session should be considered connected -uint64_t Controller::statSession::newestDataPoint(){ - return lastSec; +const std::string& Controller::statSession::getStreamName(uint64_t t){ + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).streamName; + } + return emptyLogEntry.streamName; } -/// Returns true if this session has started (tracked == true) but not yet ended (log entry written) -bool Controller::statSession::isTracked(){ - return tracked; +const std::string& Controller::statSession::getStreamName(){ + if (curData.log.size()){ + return curData.log.rbegin()->second.streamName; + } + return emptyLogEntry.streamName; +} + +std::string Controller::statSession::getStrHost(uint64_t t){ + std::string host; + Socket::hostBytesToStr(getHost(t).data(), 16, host); + return host; +} + +std::string Controller::statSession::getStrHost(){ + std::string host; + Socket::hostBytesToStr(getHost().data(), 16, host); + return host; +} + +const std::string& Controller::statSession::getHost(uint64_t t){ + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).host; + } + return emptyLogEntry.host; +} + +const std::string& Controller::statSession::getHost(){ + if (curData.log.size()){ + return curData.log.rbegin()->second.host; + } + return emptyLogEntry.host; +} + +const std::string& Controller::statSession::getConnectors(uint64_t t){ + if (curData.hasDataFor(t)){ + return curData.getDataFor(t).connectors; + } + return emptyLogEntry.connectors; +} + +const std::string& Controller::statSession::getConnectors(){ + if (curData.log.size()){ + return curData.log.rbegin()->second.connectors; + } + return emptyLogEntry.connectors; } /// Returns the cumulative connected time for this session at timestamp t. @@ -842,7 +880,7 @@ uint64_t Controller::statSession::getPktRetransmit(){ /// Returns the cumulative downloaded bytes per second for this session at timestamp t. uint64_t Controller::statSession::getBpsDown(uint64_t t){ uint64_t aTime = t - 5; - if (aTime < firstSec){aTime = firstSec;} + if (aTime < curData.log.begin()->first){aTime = curData.log.begin()->first;} if (t <= aTime){return 0;} uint64_t valA = getDown(aTime); uint64_t valB = getDown(t); @@ -852,7 +890,7 @@ uint64_t Controller::statSession::getBpsDown(uint64_t t){ /// Returns the cumulative uploaded bytes per second for this session at timestamp t. uint64_t Controller::statSession::getBpsUp(uint64_t t){ uint64_t aTime = t - 5; - if (aTime < firstSec){aTime = firstSec;} + if (aTime < curData.log.begin()->first){aTime = curData.log.begin()->first;} if (t <= aTime){return 0;} uint64_t valA = getUp(aTime); uint64_t valB = getUp(t); @@ -867,17 +905,8 @@ bool Controller::statStorage::hasDataFor(unsigned long long t){ /// Returns a reference to the most current data available at timestamp t. Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){ - static statLog empty; if (!log.size()){ - empty.time = 0; - empty.lastSecond = 0; - empty.down = 0; - empty.up = 0; - empty.pktCount = 0; - empty.pktLost = 0; - empty.pktRetransmit = 0; - empty.connectors = ""; - return empty; + return emptyLogEntry; } std::map::iterator it = log.upper_bound(t); if (it != log.begin()){it--;} @@ -889,6 +918,11 @@ Controller::statLog &Controller::statStorage::getDataFor(unsigned long long t){ void Controller::statStorage::update(Comms::Sessions &statComm, size_t index){ statLog tmp; tmp.time = statComm.getTime(index); + if (!log.size() || !log.rbegin()->second.firstActive){ + tmp.firstActive = statComm.getNow(index); + } else{ + tmp.firstActive = log.rbegin()->second.firstActive; + } tmp.lastSecond = statComm.getLastSecond(index); tmp.down = statComm.getDown(index); tmp.up = statComm.getUp(index); @@ -896,9 +930,19 @@ void Controller::statStorage::update(Comms::Sessions &statComm, size_t index){ tmp.pktLost = statComm.getPacketLostCount(index); tmp.pktRetransmit = statComm.getPacketRetransmitCount(index); tmp.connectors = statComm.getConnector(index); + tmp.streamName = statComm.getStream(index); + tmp.host = statComm.getHost(index); log[statComm.getNow(index)] = tmp; // wipe data older than STAT_CUTOFF seconds - while (log.size() && log.begin()->first < Util::bootSecs() - STAT_CUTOFF){log.erase(log.begin());} + // Ensure cutOffPoint is either time of boot or 10 minutes ago, whichever is closer. + // Prevents wrapping around to high values close to system boot time. + uint64_t cutOffPoint = Util::bootSecs(); + if (cutOffPoint > STAT_CUTOFF){ + cutOffPoint -= STAT_CUTOFF; + }else{ + cutOffPoint = 0; + } + while (log.size() && log.begin()->first < cutOffPoint){log.erase(log.begin());} } void Controller::statLeadIn(){ @@ -915,6 +959,7 @@ void Controller::statOnActive(size_t id){ void Controller::statOnDisconnect(size_t id){ // Check to see if cleanup is required (when a Session binary fails) const std::string thisSessionId = statComm.getSessId(id); + sessions[thisSessionId].finish(); // Try to lock to see if the session crashed during boot IPC::semaphore sessionLock; char semName[NAME_BUFFER_SIZE]; @@ -932,7 +977,7 @@ void Controller::statOnDisconnect(size_t id){ if(dataPage){ // Session likely crashed while it was running dataPage.init(userPageName, 1, true); - FAIL_MSG("Session '%s' got canceled unexpectedly. Hoovering up the left overs...", thisSessionId.c_str()); + FAIL_MSG("Session '%s' got cancelled unexpectedly. Cleaning up the leftovers...", thisSessionId.c_str()); } // Finally remove the session lock which was created on bootup of the session sessionLock.unlink(); @@ -999,13 +1044,23 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if (req.isMember("time")){reqTime = req["time"].asInt();} // to make sure no nasty timing business takes place, we store the case "now" as a bool. bool now = (reqTime == 0); - //if greater than current bootsecs, assume unix time and subtract epoch from it - if (reqTime > (int64_t)epoch - STAT_CUTOFF){reqTime -= (epoch-bSecs);} + //if in the last 600 seconds of unix time (or higher), assume unix time and subtract epoch from it + if (reqTime > (int64_t)epoch - STAT_CUTOFF){reqTime -= Controller::systemBoot/1000;} // add the current time, if negative or zero. if (reqTime < 0){reqTime += bSecs;} - if (reqTime == 0){reqTime = bSecs - STAT_CUTOFF;} + if (reqTime == 0){ + // Ensure cutOffPoint is either time of boot or 10 minutes ago, whichever is closer. + // Prevents wrapping around to high values close to system boot time. + uint64_t cutOffPoint = bSecs; + if (cutOffPoint > STAT_CUTOFF){ + cutOffPoint -= STAT_CUTOFF; + }else{ + cutOffPoint = 0; + } + reqTime = cutOffPoint; + } // at this point, we have the absolute timestamp in bootsecs. - rep["time"] = reqTime + (epoch-bSecs); // fill the absolute timestamp + rep["time"] = reqTime + (Controller::systemBoot/1000); // fill the absolute timestamp unsigned int fields = 0; // next, figure out the fields wanted @@ -1062,13 +1117,14 @@ void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ if (now && reqTime - it->second.getEnd() < 5){time = it->second.getEnd();} // data present and wanted? insert it! if ((it->second.getEnd() >= time && it->second.getStart() <= time) && - (!streams.size() || streams.count(it->second.getStreamName())) && - (!protos.size() || protos.count(it->second.getCurrentProtocols()))){ - if (it->second.hasDataFor(time)){ + (!streams.size() || streams.count(it->second.getStreamName(time))) && + (!protos.size() || protos.count(it->second.getConnectors(time)))){ + const statLog & dta = it->second.curData.getDataFor(time); + if (notEmpty(dta)){ JSON::Value d; - if (fields & STAT_CLI_HOST){d.append(it->second.getHost());} - if (fields & STAT_CLI_STREAM){d.append(it->second.getStreamName());} - if (fields & STAT_CLI_PROTO){d.append(it->second.getCurrentProtocols());} + if (fields & STAT_CLI_HOST){d.append(it->second.getStrHost(time));} + if (fields & STAT_CLI_STREAM){d.append(it->second.getStreamName(time));} + if (fields & STAT_CLI_PROTO){d.append(it->second.getConnectors(time));} if (fields & STAT_CLI_CONNTIME){d.append(it->second.getConnTime(time));} if (fields & STAT_CLI_POSITION){d.append(it->second.getLastSecond(time));} if (fields & STAT_CLI_DOWN){d.append(it->second.getDown(time));} @@ -1252,6 +1308,8 @@ void Controller::fillActive(JSON::Value &req, JSON::Value &rep){ F = it->second.currIns; }else if (j->asStringRef() == "outputs"){ F = it->second.currOuts; + }else if (j->asStringRef() == "unspecified"){ + F = it->second.currUnspecified; }else if (j->asStringRef() == "views"){ F = it->second.viewers; }else if (j->asStringRef() == "viewseconds"){ @@ -1323,6 +1381,7 @@ public: clients = 0; inputs = 0; outputs = 0; + unspecified = 0; downbps = 0; upbps = 0; pktCount = 0; @@ -1334,6 +1393,7 @@ public: case Controller::SESS_VIEWER: clients++; break; case Controller::SESS_INPUT: inputs++; break; case Controller::SESS_OUTPUT: outputs++; break; + case Controller::SESS_UNSPECIFIED: unspecified++; break; default: break; } downbps += down; @@ -1345,6 +1405,7 @@ public: uint64_t clients; uint64_t inputs; uint64_t outputs; + uint64_t unspecified; uint64_t downbps; uint64_t upbps; uint64_t pktCount; @@ -1363,11 +1424,21 @@ void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){ if (req.isMember("start")){reqStart = req["start"].asInt();} if (req.isMember("end")){reqEnd = req["end"].asInt();} //if the reqStart or reqEnd is greater than current bootsecs, assume unix time and subtract epoch from it - if (reqStart > (int64_t)epoch - STAT_CUTOFF){reqStart -= (epoch-bSecs);} - if (reqEnd > (int64_t)epoch - STAT_CUTOFF){reqEnd -= (epoch-bSecs);} + if (reqStart > (int64_t)epoch - STAT_CUTOFF){reqStart -= Controller::systemBoot/1000;} + if (reqEnd > (int64_t)epoch - STAT_CUTOFF){reqEnd -= Controller::systemBoot/1000;} // add the current time, if negative or zero. if (reqStart < 0){reqStart += bSecs;} - if (reqStart == 0){reqStart = bSecs - STAT_CUTOFF;} + if (reqStart == 0){ + // Ensure cutOffPoint is either time of boot or 10 minutes ago, whichever is closer. + // Prevents wrapping around to high values close to system boot time. + uint64_t cutOffPoint = bSecs; + if (cutOffPoint > STAT_CUTOFF){ + cutOffPoint -= STAT_CUTOFF; + }else{ + cutOffPoint = 0; + } + reqStart = cutOffPoint; + } if (reqEnd <= 0){reqEnd += bSecs;} // at this point, reqStart and reqEnd are the absolute timestamp in bootsecs. if (reqEnd < reqStart){reqEnd = reqStart;} @@ -1417,7 +1488,7 @@ void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){ if ((it->second.getEnd() >= (unsigned long long)reqStart || it->second.getStart() <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.getStreamName())) && - (!protos.size() || protos.count(it->second.getCurrentProtocols()))){ + (!protos.size() || protos.count(it->second.getConnectors()))){ for (unsigned long long i = reqStart; i <= reqEnd; ++i){ if (it->second.hasDataFor(i)){ totalsCount[i].add(it->second.getBpsDown(i), it->second.getBpsUp(i), it->second.getSessType(), it->second.getPktCount(), it->second.getPktLost(), it->second.getPktRetransmit()); @@ -1436,8 +1507,8 @@ void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){ return; } // yay! We have data! - rep["start"] = totalsCount.begin()->first + (epoch-bSecs); - rep["end"] = totalsCount.rbegin()->first + (epoch-bSecs); + rep["start"] = totalsCount.begin()->first + (Controller::systemBoot/1000); + rep["end"] = totalsCount.rbegin()->first + (Controller::systemBoot/1000); rep["data"].null(); rep["interval"].null(); uint64_t prevT = 0; @@ -1506,12 +1577,15 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int uint32_t totViewers = 0; uint32_t totInputs = 0; uint32_t totOutputs = 0; + uint32_t totUnspecified = 0; for (uint64_t idx = 0; idx < statComm.recordCount(); idx++){ if (statComm.getStatus(idx) == COMM_STATUS_INVALID || statComm.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} const std::string thisSessId = statComm.getSessId(idx); // Count active viewers, inputs, outputs and protocols if (thisSessId[0] == 'I'){ totInputs++; + }else if (thisSessId[0] == 'U'){ + totUnspecified++; }else if (thisSessId[0] == 'O'){ totOutputs++; outputs[statComm.getConnector(idx)]++; @@ -1602,6 +1676,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "# TYPE mist_sessions_count counter\n"; response << "mist_sessions_count{sessType=\"viewers\"}" << servViewers << "\n"; response << "mist_sessions_count{sessType=\"incoming\"}" << servInputs << "\n"; + response << "mist_sessions_count{sessType=\"unspecified\"}" << servUnspecified << "\n"; response << "mist_sessions_count{sessType=\"outgoing\"}" << servOutputs << "\n\n"; response << "# HELP mist_bw_total Count of bytes handled since server start, by direction.\n"; @@ -1637,6 +1712,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int response << "mist_sessions_total{sessType=\"viewers\"}" << totViewers << "\n"; response << "mist_sessions_total{sessType=\"incoming\"}" << totInputs << "\n"; response << "mist_sessions_total{sessType=\"outgoing\"}" << totOutputs << "\n"; + response << "mist_sessions_total{sessType=\"unspecified\"}" << totUnspecified << "\n"; response << "mist_sessions_total{sessType=\"cached\"}" << sessions.size() << "\n"; response << "\n# HELP mist_viewcount Count of unique viewer sessions since stream start, per " @@ -1656,6 +1732,8 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int << it->second.currIns << "\n"; response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"outgoing\"}" << it->second.currOuts << "\n"; + response << "mist_sessions{stream=\"" << it->first << "\",sessType=\"unspecified\"}" + << it->second.currUnspecified << "\n"; response << "mist_viewcount{stream=\"" << it->first << "\"}" << it->second.viewers << "\n"; response << "mist_viewseconds{stream=\"" << it->first << "\"} " << it->second.viewSeconds << "\n"; response << "mist_bw{stream=\"" << it->first << "\",direction=\"up\"}" << it->second.upBytes << "\n"; @@ -1691,9 +1769,11 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int resp["curr"].append(totViewers); resp["curr"].append(totInputs); resp["curr"].append(totOutputs); + resp["curr"].append(totUnspecified); resp["tot"].append(servViewers); resp["tot"].append(servInputs); resp["tot"].append(servOutputs); + resp["tot"].append(servUnspecified); resp["st"].append(bw_up_total); resp["st"].append(bw_down_total); resp["bw"].append(servUpBytes); @@ -1735,6 +1815,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int resp["streams"][it->first]["curr"].append(it->second.currViews); resp["streams"][it->first]["curr"].append(it->second.currIns); resp["streams"][it->first]["curr"].append(it->second.currOuts); + resp["streams"][it->first]["curr"].append(it->second.currUnspecified); resp["streams"][it->first]["pkts"].append(it->second.packSent); resp["streams"][it->first]["pkts"].append(it->second.packLoss); resp["streams"][it->first]["pkts"].append(it->second.packRetrans); diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index f798f811..69605942 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -28,16 +28,19 @@ namespace Controller{ struct statLog{ uint64_t time; + uint64_t firstActive; uint64_t lastSecond; uint64_t down; uint64_t up; uint64_t pktCount; uint64_t pktLost; uint64_t pktRetransmit; + std::string streamName; + std::string host; std::string connectors; }; - enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER}; + enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER, SESS_UNSPECIFIED}; class statStorage{ public: @@ -51,34 +54,30 @@ namespace Controller{ /// Allows for moving of connections to another session. class statSession{ private: - uint64_t firstActive; - uint64_t firstSec; - uint64_t lastSec; sessType sessionType; - bool tracked; uint8_t noBWCount; ///< Set to 2 when not to count for external bandwidth - std::string streamName; - std::string host; - std::string curConnector; std::string sessId; public: statSession(); - ~statSession(); + void finish(); statStorage curData; std::set tags; sessType getSessType(); void update(uint64_t index, Comms::Sessions &data); uint64_t getStart(); uint64_t getEnd(); - bool isViewerOn(uint64_t time); - bool isTracked(); bool hasDataFor(uint64_t time); - std::string getStreamName(); - std::string getHost(); - std::string getSessId(); - std::string getCurrentProtocols(); - uint64_t newestDataPoint(); + const std::string& getSessId(); + const std::string& getStreamName(uint64_t t); + const std::string& getStreamName(); + std::string getStrHost(uint64_t t); + std::string getStrHost(); + const std::string& getHost(uint64_t t); + const std::string& getHost(); + const std::string& getConnectors(uint64_t t); + const std::string& getConnectors(); + uint64_t getFirstActive(); uint64_t getConnTime(uint64_t time); uint64_t getConnTime(); uint64_t getLastSecond(uint64_t time); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index bdd52893..532ee4fe 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -196,6 +196,7 @@ namespace Controller{ rlxStrm->addField("viewers", RAX_64UINT); rlxStrm->addField("inputs", RAX_64UINT); rlxStrm->addField("outputs", RAX_64UINT); + rlxStrm->addField("unspecified", RAX_64UINT); rlxStrm->setReady(); } rlxStrm->setRCount((1024 * 1024 - rlxStrm->getOffset()) / rlxStrm->getRSize()); @@ -433,12 +434,17 @@ namespace Controller{ // if fields missing, recreate the page if (globAccX.isReady()){ - if(globAccX.getFieldAccX("systemBoot")){ + if(globAccX.getFieldAccX("systemBoot") && globAccX.getInt("systemBoot")){ systemBoot = globAccX.getInt("systemBoot"); } if(!globAccX.getFieldAccX("defaultStream") || !globAccX.getFieldAccX("systemBoot") - || !globAccX.getFieldAccX("sessionMode")){ + || !globAccX.getFieldAccX("sessionViewerMode") + || !globAccX.getFieldAccX("sessionInputMode") + || !globAccX.getFieldAccX("sessionOutputMode") + || !globAccX.getFieldAccX("sessionUnspecifiedMode") + || !globAccX.getFieldAccX("sessionStreamInfoMode") + || !globAccX.getFieldAccX("tknMode")){ globAccX.setReload(); globCfg.master = true; globCfg.close(); @@ -449,16 +455,24 @@ namespace Controller{ if (!globAccX.isReady()){ globAccX.addField("defaultStream", RAX_128STRING); globAccX.addField("systemBoot", RAX_64UINT); - globAccX.addField("sessionMode", RAX_64UINT); - if (!Storage["config"]["sessionMode"]){ - Storage["config"]["sessionMode"] = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID; - } + globAccX.addField("sessionViewerMode", RAX_64UINT); + globAccX.addField("sessionInputMode", RAX_64UINT); + globAccX.addField("sessionOutputMode", RAX_64UINT); + globAccX.addField("sessionUnspecifiedMode", RAX_64UINT); + globAccX.addField("sessionStreamInfoMode", RAX_64UINT); + globAccX.addField("tknMode", RAX_64UINT); globAccX.setRCount(1); globAccX.setEndPos(1); globAccX.setReady(); } globAccX.setString("defaultStream", Storage["config"]["defaultStream"].asStringRef()); - globAccX.setInt("sessionMode", Storage["config"]["sessionMode"].asInt()); + globAccX.setInt("sessionViewerMode", Storage["config"]["sessionViewerMode"].asInt()); + globAccX.setInt("sessionInputMode", Storage["config"]["sessionInputMode"].asInt()); + globAccX.setInt("sessionOutputMode", Storage["config"]["sessionOutputMode"].asInt()); + globAccX.setInt("sessionUnspecifiedMode", Storage["config"]["sessionUnspecifiedMode"].asInt()); + globAccX.setInt("sessionStreamInfoMode", Storage["config"]["sessionStreamInfoMode"].asInt()); + globAccX.setInt("tknMode", Storage["config"]["tknMode"].asInt()); + globAccX.setInt("systemBoot", systemBoot); globCfg.master = false; // leave the page after closing } } diff --git a/src/controller/controller_storage.h b/src/controller/controller_storage.h index 6339cfd0..974d989b 100644 --- a/src/controller/controller_storage.h +++ b/src/controller/controller_storage.h @@ -16,6 +16,7 @@ namespace Controller{ extern bool isTerminal; ///< True if connected to a terminal and not a log file. extern bool isColorized; ///< True if we colorize the output extern uint64_t logCounter; ///< Count of logged messages since boot + extern uint64_t systemBoot; ///< Unix time in milliseconds of system boot Util::RelAccX *logAccessor(); Util::RelAccX *accesslogAccessor(); diff --git a/src/input/input.cpp b/src/input/input.cpp index 0524f55b..782dc4ef 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -502,6 +502,7 @@ namespace Mist{ } int Input::run(){ + Comms::sessionConfigCache(); if (streamStatus){streamStatus.mapped[0] = STRMSTAT_BOOT;} checkHeaderTimes(config->getString("input")); if (needHeader()){ @@ -623,6 +624,8 @@ namespace Mist{ /// ~~~~~~~~~~~~~~~ void Input::serve(){ users.reload(streamName, true); + Comms::Connections statComm; + uint64_t startTime = Util::bootSecs(); if (!M){ // Initialize meta page @@ -636,6 +639,7 @@ namespace Mist{ meta.setSource(config->getString("input")); bool internalOnly = (config->getString("input").find("INTERNAL_ONLY") != std::string::npos); + bool isBuffer = (capa["name"].asStringRef() == "Buffer"); /*LTS-START*/ if (Triggers::shouldTrigger("STREAM_READY", config->getString("streamname"))){ @@ -666,6 +670,18 @@ namespace Mist{ }else{ if (connectedUsers && M.getValidTracks().size()){activityCounter = Util::bootSecs();} } + // Connect to stats for INPUT detection + if (!internalOnly && !isBuffer){ + if (!statComm){statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");} + if (statComm){ + uint64_t now = Util::bootSecs(); + statComm.setNow(now); + statComm.setStream(streamName); + statComm.setTime(now - startTime); + statComm.setLastSecond(0); + connStats(statComm); + } + } // if not shutting down, wait 1 second before looping if (config->is_active){Util::wait(INPUT_USER_INTERVAL);} } @@ -820,7 +836,7 @@ namespace Mist{ if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection - if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);} + if (!statComm){statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");} if (statComm){ if (!statComm){ config->is_active = false; @@ -830,7 +846,6 @@ namespace Mist{ uint64_t now = Util::bootSecs(); statComm.setNow(now); statComm.setStream(streamName); - statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setTime(now - startTime); statComm.setLastSecond(0); connStats(statComm); @@ -984,7 +999,7 @@ namespace Mist{ if (Util::bootSecs() - statTimer > 1){ // Connect to stats for INPUT detection - if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);} + if (!statComm){statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");} if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; diff --git a/src/input/input_rtsp.cpp b/src/input/input_rtsp.cpp index ebb812e1..25be6e23 100644 --- a/src/input/input_rtsp.cpp +++ b/src/input/input_rtsp.cpp @@ -210,7 +210,7 @@ namespace Mist{ if (lastSecs != currSecs){ lastSecs = currSecs; // Connect to stats for INPUT detection - statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); + statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -225,7 +225,6 @@ namespace Mist{ statComm.setDown(tcpCon.dataDown()); statComm.setTime(now - startTime); statComm.setLastSecond(0); - statComm.setHost(getConnectedBinHost()); } } } diff --git a/src/input/input_sdp.cpp b/src/input/input_sdp.cpp index 3169a836..0b8ddeb5 100644 --- a/src/input/input_sdp.cpp +++ b/src/input/input_sdp.cpp @@ -202,7 +202,7 @@ namespace Mist{ if (lastSecs != currSecs){ lastSecs = currSecs; // Connect to stats for INPUT detection - statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); + statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); if (statComm){ if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -217,7 +217,6 @@ namespace Mist{ statComm.setUp(bytesUp); statComm.setTime(now - startTime); statComm.setLastSecond(0); - statComm.setHost(getConnectedBinHost()); } } // If the error flag is raised or we are lacking data, try to recover diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 23311dc0..e405e3b6 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -621,7 +621,7 @@ namespace Mist{ // Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 1){ // Connect to stats for INPUT detection - statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID); + statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -636,7 +636,6 @@ namespace Mist{ statComm.setDown(downCounter + tcpCon.dataDown()); statComm.setTime(now - startTime); statComm.setLastSecond(0); - statComm.setHost(getConnectedBinHost()); } std::set activeTracks = liveStream.getActiveTracks(); diff --git a/src/input/input_tsrist.cpp b/src/input/input_tsrist.cpp index 5b95aa9f..df658da1 100644 --- a/src/input/input_tsrist.cpp +++ b/src/input/input_tsrist.cpp @@ -284,7 +284,7 @@ namespace Mist{ } - void inputTSRIST::connStats(Comms::Statistics &statComm){ + void inputTSRIST::connStats(Comms::Connections &statComm){ statComm.setUp(0); statComm.setDown(downBytes); statComm.setHost(getConnectedBinHost()); diff --git a/src/input/input_tsrist.h b/src/input/input_tsrist.h index 731f9b04..a75f6601 100644 --- a/src/input/input_tsrist.h +++ b/src/input/input_tsrist.h @@ -30,7 +30,7 @@ namespace Mist{ int64_t timeStampOffset; uint64_t lastTimeStamp; - virtual void connStats(Comms::Statistics &statComm); + virtual void connStats(Comms::Connections &statComm); struct rist_ctx *receiver_ctx; diff --git a/src/output/output.cpp b/src/output/output.cpp index 50d67549..d525b8ea 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -92,7 +92,7 @@ namespace Mist{ firstTime = 0; firstPacketTime = 0xFFFFFFFFFFFFFFFFull; lastPacketTime = 0; - sid = ""; + tkn = ""; parseData = false; wantRequest = true; sought = false; @@ -111,7 +111,6 @@ namespace Mist{ lastPushUpdate = 0; previousFile = ""; currentFile = ""; - sessionMode = 0xFFFFFFFFFFFFFFFFull; lastRecv = Util::bootSecs(); if (myConn){ @@ -230,7 +229,7 @@ namespace Mist{ bool Output::isReadyForPlay(){ // If a protocol does not support any codecs, we assume you know what you're doing if (!capa.isMember("codecs")){return true;} - if (!isInitialized){initialize();} + if (!isInitialized){return false;} meta.reloadReplacedPagesIfNeeded(); if (getSupportedTracks().size()){ size_t minTracks = 2; @@ -277,6 +276,7 @@ namespace Mist{ /// Assumes streamName class member has been set already. /// Will start input if not currently active, calls onFail() if this does not succeed. void Output::reconnect(){ + Comms::sessionConfigCache(); thisPacket.null(); if (config->hasOption("noinput") && config->getBool("noinput")){ Util::sanitizeName(streamName); @@ -347,11 +347,10 @@ namespace Mist{ isInitialized = true; //Connect to stats reporting, if not connected already - if (!statComm){ - statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode); - stats(true); - } - + stats(true); + //Abort if the stats code shut us down just now + if (!isInitialized){return;} + //push inputs do not need to wait for stream to be ready for playback if (isPushing()){return;} @@ -1216,7 +1215,7 @@ namespace Mist{ /// request URL (if any) /// ~~~~~~~~~~~~~~~ int Output::run(){ - sessionMode = Util::getGlobalConfig("sessionMode").asInt(); + Comms::sessionConfigCache(); /*LTS-START*/ // Connect to file target, if needed if (isFileTarget()){ @@ -1257,6 +1256,7 @@ namespace Mist{ /*LTS-END*/ DONTEVEN_MSG("MistOut client handler started"); while (keepGoing() && (wantRequest || parseData)){ + Comms::sessionConfigCache(); if (wantRequest){requestHandler();} if (parseData){ if (!isInitialized){ @@ -1779,27 +1779,35 @@ namespace Mist{ } } - if (!statComm){statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode);} - if (!statComm){return;} - if (statComm.getExit()){ - onFail("Shutting down since this session is not allowed to view this stream"); - return; + // Disable stats for HTTP internal output + if (Comms::sessionStreamInfoMode == SESS_HTTP_DISABLED && capa["name"].asStringRef() == "HTTP"){return;} + + // Set the token to the pid for outputs which do not generate it in the requestHandler + if (!tkn.size()){ tkn = JSON::Value(getpid()).asString(); } + + if (!statComm){ + statComm.reload(streamName, getConnectedBinHost(), tkn, getStatsName(), reqUrl); } + if (!statComm || statComm.getExit()){ + onFail("Shutting down since this session is not allowed to view this stream"); + statComm.unload(); + return; + } lastStats = now; VERYHIGH_MSG("Writing stats: %s, %s, %s, %" PRIu64 ", %" PRIu64, getConnectedHost().c_str(), streamName.c_str(), - sid.c_str(), myConn.dataUp(), myConn.dataDown()); + tkn.c_str(), myConn.dataUp(), myConn.dataDown()); /*LTS-START*/ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ onFail("Shutting down on controller request"); + statComm.unload(); return; } /*LTS-END*/ statComm.setNow(now); - statComm.setConnector(getStatsName()); connStats(now, statComm); - statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0); + statComm.setLastSecond(thisPacket ? thisPacket.getTime()/1000 : 0); statComm.setPid(getpid()); /*LTS-START*/ diff --git a/src/output/output.h b/src/output/output.h index 173b3840..441c06ab 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -130,8 +130,7 @@ namespace Mist{ Comms::Connections statComm; bool isBlocking; ///< If true, indicates that myConn is blocking. - std::string sid; ///< Random identifier used to split connections into sessions - uint64_t sessionMode; + std::string tkn; ///< Random identifier used to split connections into sessions uint64_t nextKeyTime(); // stream delaying variables diff --git a/src/output/output_cmaf.cpp b/src/output/output_cmaf.cpp index c2fbc5bc..eb7c87da 100644 --- a/src/output/output_cmaf.cpp +++ b/src/output/output_cmaf.cpp @@ -222,24 +222,17 @@ namespace Mist{ void OutCMAF::sendHlsMasterManifest(){ selectDefaultTracks(); - std::string sessId = ""; - if (hasSessionIDs()){ - std::string ua = UA + JSON::Value(getpid()).asString(); - crc = checksum::crc32(0, ua.data(), ua.size()); - sessId = JSON::Value(crc).asString(); - } - // check for forced "no low latency" parameter bool noLLHLS = H.GetVar("llhls").size() ? H.GetVar("llhls") == "0" : false; // Populate the struct that will help generate the master playlist const HLS::MasterData masterData ={ - hasSessionIDs(), + false,//hasSessionIDs, unused noLLHLS, hlsMediaFormat == ".ts", getMainSelectedTrack(), H.GetHeader("User-Agent"), - sessId, + (Comms::tknMode & 0x04)?tkn:"", systemBoot, bootMsOffset, }; @@ -261,11 +254,8 @@ namespace Mist{ // Chunkpath & Session ID logic std::string urlPrefix = ""; - std::string sessId = ""; if (config->getString("chunkpath").size()){ urlPrefix = HTTP::URL(config->getString("chunkpath")).link("./" + H.url).link("./").getUrl(); - }else{ - sessId = H.GetVar("sessId"); } // check for forced "no low latency" parameter @@ -279,7 +269,7 @@ namespace Mist{ noLLHLS, hlsMediaFormat, M.getEncryption(requestTid), - sessId, + (Comms::tknMode & 0x04)?tkn:"", timingTid, requestTid, M.biggestFragment(timingTid) / 1000, @@ -346,6 +336,16 @@ namespace Mist{ std::string url = H.url.substr(H.url.find('/', 6) + 1); HTTP::URL req(reqUrl); + + if (tkn.size()){ + if (Comms::tknMode & 0x08){ + const std::string koekjes = H.GetHeader("Cookie"); + std::stringstream cookieHeader; + cookieHeader << "tkn=" << tkn << "; Max-Age=" << SESS_TIMEOUT; + H.SetHeader("Set-Cookie", cookieHeader.str()); + } + } + // Send a dash manifest for any URL with .mpd in the path if (req.getExt() == "mpd"){ sendDashManifest(); @@ -438,6 +438,7 @@ namespace Mist{ H.SendResponse("400", "Bad Request: Could not parse the url", myConn); return; } + std::string headerData = CMAF::keyHeader(M, idx, startTime, targetTime, fragmentIndex, false, false); diff --git a/src/output/output_hls.cpp b/src/output/output_hls.cpp index ad810aa7..112e7ee2 100644 --- a/src/output/output_hls.cpp +++ b/src/output/output_hls.cpp @@ -11,7 +11,7 @@ const std::string hlsMediaFormat = ".ts"; namespace Mist{ bool OutHLS::isReadyForPlay(){ - if (!isInitialized){initialize();} + if (!isInitialized){return false;} meta.reloadReplacedPagesIfNeeded(); if (!M.getValidTracks().size()){return false;} uint32_t mainTrack = M.mainTrack(); @@ -110,25 +110,17 @@ namespace Mist{ ///\return The master playlist file for (LL)HLS. void OutHLS::sendHlsMasterManifest(){ selectDefaultTracks(); - - std::string sessId = ""; - if (hasSessionIDs()){ - std::string ua = UA + JSON::Value(getpid()).asString(); - crc = checksum::crc32(0, ua.data(), ua.size()); - sessId = JSON::Value(crc).asString(); - } - // check for forced "no low latency" parameter bool noLLHLS = H.GetVar("llhls").size() ? H.GetVar("llhls") == "0" : false; // Populate the struct that will help generate the master playlist const HLS::MasterData masterData ={ - hasSessionIDs(), + false,//hasSessionIDs, unused noLLHLS, hlsMediaFormat == ".ts", getMainSelectedTrack(), H.GetHeader("User-Agent"), - sessId, + (Comms::tknMode & 0x04)?tkn:"", systemBoot, bootMsOffset, }; @@ -150,11 +142,8 @@ namespace Mist{ // Chunkpath & Session ID logic std::string urlPrefix = ""; - std::string sessId = ""; if (config->getString("chunkpath").size()){ urlPrefix = HTTP::URL(config->getString("chunkpath")).link("./" + H.url).link("./").getUrl(); - }else{ - sessId = H.GetVar("sessId"); } // check for forced "no low latency" parameter @@ -168,7 +157,7 @@ namespace Mist{ noLLHLS, hlsMediaFormat, M.getEncryption(requestTid), - sessId, + (Comms::tknMode & 0x04)?tkn:"", timingTid, requestTid, M.biggestFragment(timingTid) / 1000, @@ -226,6 +215,15 @@ namespace Mist{ bootMsOffset = 0; if (M.getLive()){bootMsOffset = M.getBootMsOffset();} + if (tkn.size()){ + if (Comms::tknMode & 0x08){ + const std::string koekjes = H.GetHeader("Cookie"); + std::stringstream cookieHeader; + cookieHeader << "tkn=" << tkn << "; Max-Age=" << SESS_TIMEOUT; + H.SetHeader("Set-Cookie", cookieHeader.str()); + } + } + if (H.url == "/crossdomain.xml"){ H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", APPIDENT); diff --git a/src/output/output_http.cpp b/src/output/output_http.cpp index f55b974d..ed8fa33b 100644 --- a/src/output/output_http.cpp +++ b/src/output/output_http.cpp @@ -217,7 +217,9 @@ namespace Mist{ myConn.close(); return; } - if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName){ + + //Check if we need to change binary and/or reconnect + if (handler != capa["name"].asStringRef() || H.GetVar("stream") != streamName || (statComm && (statComm.getHost() != getConnectedBinHost() || statComm.getTkn() != tkn))){ MEDIUM_MSG("Switching from %s (%s) to %s (%s)", capa["name"].asStringRef().c_str(), streamName.c_str(), handler.c_str(), H.GetVar("stream").c_str()); streamName = H.GetVar("stream"); @@ -268,21 +270,32 @@ namespace Mist{ realTime = 0; } } - // Get session ID cookie or generate a random one if it wasn't set - if (!sid.size()){ + // Read the session token + if (Comms::tknMode & 0x01){ + // Get session token from the request url + if (H.GetVar("tkn") != ""){ + tkn = H.GetVar("tkn"); + } else if (H.GetVar("sid") != ""){ + tkn = H.GetVar("sid"); + } else if (H.GetVar("sessId") != ""){ + tkn = H.GetVar("sessId"); + } + } + if ((Comms::tknMode & 0x02) && !tkn.size()){ + // Get session token from the request cookie std::map storage; const std::string koekjes = H.GetHeader("Cookie"); - HTTP::parseVars(koekjes, storage); - if (storage.count("sid")){ - // Get sid cookie, which is used to divide connections into sessions - sid = storage.at("sid"); - }else{ - // Else generate one - const std::string newSid = UA + JSON::Value(getpid()).asString(); - sid = JSON::Value(checksum::crc32(0, newSid.data(), newSid.size())).asString(); - H.SetHeader("sid", sid.c_str()); + HTTP::parseVars(koekjes, storage, "; "); + if (storage.count("tkn")){ + tkn = storage.at("tkn"); } } + // Generate a session token if it is being sent as a cookie or url parameter and we couldn't read one + if (!tkn.size() && Comms::tknMode > 3){ + const std::string newTkn = UA + JSON::Value(getpid()).asString(); + tkn = JSON::Value(checksum::crc32(0, newTkn.data(), newTkn.size())).asString(); + HIGH_MSG("Generated tkn '%s'", tkn.c_str()); + } // Handle upgrade to websocket if the output supports it std::string upgradeHeader = H.GetHeader("Upgrade"); Util::stringToLower(upgradeHeader); @@ -290,7 +303,9 @@ namespace Mist{ INFO_MSG("Switching to Websocket mode"); setBlocking(false); preWebsocketConnect(); - webSock = new HTTP::Websocket(myConn, H); + HTTP::Parser req = H; + H.Clean(); + webSock = new HTTP::Websocket(myConn, req, H); if (!(*webSock)){ delete webSock; webSock = 0; @@ -333,6 +348,14 @@ namespace Mist{ void HTTPOutput::respondHTTP(const HTTP::Parser & req, bool headersOnly){ //We generally want the CORS headers to be set for all responses H.setCORSHeaders(); + H.SetHeader("Server", APPIDENT); + if (tkn.size()){ + if (Comms::tknMode & 0x08){ + std::stringstream cookieHeader; + cookieHeader << "tkn=" << tkn << "; Max-Age=" << SESS_TIMEOUT; + H.SetHeader("Set-Cookie", cookieHeader.str()); + } + } //Set attachment header to force download, if applicable if (req.GetVar("dl").size()){ //If we want to download, and the string contains a dot, use as-is. @@ -395,6 +418,8 @@ namespace Mist{ ///\brief Handles requests by starting a corresponding output process. ///\param connector The type of connector to be invoked. void HTTPOutput::reConnector(std::string &connector){ + // Clear tkn in order to deal with reverse proxies + tkn = ""; // taken from CheckProtocols (controller_connectors.cpp) char *argarr[32]; for (int i = 0; i < 32; i++){argarr[i] = 0;} diff --git a/src/output/output_http_internal.cpp b/src/output/output_http_internal.cpp index 97ca455f..be955196 100644 --- a/src/output/output_http_internal.cpp +++ b/src/output/output_http_internal.cpp @@ -76,11 +76,11 @@ namespace Mist{ std::string method = H.method; // send logo icon if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){ - sendIcon(); + sendIcon(false); return; } if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".html"){ - HTMLResponse(); + HTMLResponse(H, false); return; } if (H.url.size() >= 3 && H.url.substr(H.url.size() - 3) == ".js"){ @@ -337,9 +337,9 @@ namespace Mist{ } } - void OutHTTP::HTMLResponse(){ - std::string method = H.method; - HTTP::URL fullURL(H.GetHeader("Host")); + void OutHTTP::HTMLResponse(const HTTP::Parser & req, bool headersOnly){ + HTTPOutput::respondHTTP(req, headersOnly); + HTTP::URL fullURL(req.GetHeader("Host")); if (!fullURL.protocol.size()){fullURL.protocol = getProtocolForPort(fullURL.getPort());} if (config->getString("pubaddr") != ""){ HTTP::URL altURL(config->getString("pubaddr")); @@ -349,24 +349,22 @@ namespace Mist{ fullURL.path = altURL.path; } if (mistPath.size()){fullURL = mistPath;} - std::string uAgent = H.GetHeader("User-Agent"); + std::string uAgent = req.GetHeader("User-Agent"); std::string forceType = ""; if (H.GetVar("forcetype").size()){ - forceType = ",forceType:\"" + H.GetVar("forcetype") + "\""; + forceType = ",forceType:\"" + req.GetVar("forcetype") + "\""; } std::string devSkin = ""; - if (H.GetVar("dev").size()){devSkin = ",skin:\"dev\"";} - H.SetVar("stream", ""); - H.SetVar("dev", ""); + if (req.GetVar("dev").size()){devSkin = ",skin:\"dev\"";} devSkin += ",urlappend:\"" + H.allVars() + "\""; H.SetVar("stream", streamName); std::string seekTo = ""; - if (H.GetVar("t").size()){ + if (req.GetVar("t").size()){ uint64_t autoSeekTime = 0; - std::string sTime = H.GetVar("t"); + std::string sTime = req.GetVar("t"); unsigned long long h = 0, m = 0, s = 0; autoSeekTime = JSON::Value(sTime).asInt(); if (sscanf(sTime.c_str(), "%llum%llus", &m, &s) == 2){autoSeekTime = m * 60 + s;} @@ -385,13 +383,10 @@ namespace Mist{ streamName + "\").addEventListener(\"initialized\",f);"; } } - - H.Clean(); + H.SetHeader("Content-Type", "text/html"); H.SetHeader("X-UA-Compatible", "IE=edge"); - H.SetHeader("Server", APPIDENT); - H.setCORSHeaders(); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -427,6 +422,7 @@ namespace Mist{ } H.SendResponse("200", "OK", myConn); responded = true; + H.Clean(); } JSON::Value OutHTTP::getStatusJSON(std::string &reqHost, const std::string &useragent){ @@ -634,23 +630,31 @@ namespace Mist{ // loop over the added sources, add them to json_resp["sources"] for (std::set::iterator it = sources.begin(); it != sources.end(); it++){ - if ((*it)["simul_tracks"].asInt() > 0){json_resp["source"].append(*it);} + if ((*it)["simul_tracks"].asInt() > 0){ + if (Comms::tknMode & 0x04){ + JSON::Value tmp; + tmp = (*it); + tmp["url"] = tmp["url"].asStringRef() + "?tkn=" + tkn; + tmp["relurl"] = tmp["relurl"].asStringRef() + "?tkn=" + tkn; + json_resp["source"].append(tmp); + }else{ + json_resp["source"].append(*it); + } + } } return json_resp; } - void OutHTTP::onHTTP(){ + void OutHTTP::respondHTTP(const HTTP::Parser & req, bool headersOnly){ origStreamName = streamName; - std::string method = H.method; - if (H.GetHeader("X-Mst-Path").size()){mistPath = H.GetHeader("X-Mst-Path");} + if (req.GetHeader("X-Mst-Path").size()){mistPath = req.GetHeader("X-Mst-Path");} // Handle certbot validations - if (H.url.substr(0, 28) == "/.well-known/acme-challenge/"){ + if (req.url.substr(0, 28) == "/.well-known/acme-challenge/"){ std::string cbToken = H.url.substr(28); jsonForEach(config->getOption("certbot", true), it){ if (it->asStringRef().substr(0, cbToken.size() + 1) == cbToken + ":"){ - H.Clean(); H.SetHeader("Content-Type", "text/plain"); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); @@ -661,9 +665,7 @@ namespace Mist{ return; } } - H.Clean(); H.SetHeader("Content-Type", "text/plain"); - H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetBody("No matching validation found for token '" + cbToken + "'"); H.SendResponse("404", "Not found", myConn); @@ -672,12 +674,11 @@ namespace Mist{ return; } - if (H.url == "/crossdomain.xml"){ - H.Clean(); + if (req.url == "/crossdomain.xml"){ H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -693,12 +694,11 @@ namespace Mist{ return; }// crossdomain.xml - if (H.url == "/clientaccesspolicy.xml"){ - H.Clean(); + if (req.url == "/clientaccesspolicy.xml"){ H.SetHeader("Content-Type", "text/xml"); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -716,8 +716,7 @@ namespace Mist{ return; }// clientaccesspolicy.xml - if (H.url == "/flashplayer.swf"){ - H.Clean(); + if (req.url == "/flashplayer.swf"){ H.SetHeader("Content-Type", "application/x-shockwave-flash"); H.SetHeader("Server", APPIDENT); H.SetBody((const char *)FlashMediaPlayback_101_swf, FlashMediaPlayback_101_swf_len); @@ -725,8 +724,7 @@ namespace Mist{ responded = true; return; } - if (H.url == "/oldflashplayer.swf"){ - H.Clean(); + if (req.url == "/oldflashplayer.swf"){ H.SetHeader("Content-Type", "application/x-shockwave-flash"); H.SetHeader("Server", APPIDENT); H.SetBody((const char *)FlashMediaPlayback_swf, FlashMediaPlayback_swf_len); @@ -735,20 +733,21 @@ namespace Mist{ return; } // send logo icon - if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){ - sendIcon(); + if (req.url.length() > 4 && req.url.substr(req.url.length() - 4, 4) == ".ico"){ + sendIcon(headersOnly); return; } // send generic HTML page - if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".html"){ - HTMLResponse(); + if (req.url.length() > 6 && req.url.substr(req.url.length() - 5, 5) == ".html"){ + HTMLResponse(req, headersOnly); return; } // send smil MBR index - if (H.url.length() > 6 && H.url.substr(H.url.length() - 5, 5) == ".smil"){ - std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; + if (req.url.length() > 6 && req.url.substr(req.url.length() - 5, 5) == ".smil"){ + HTTPOutput::respondHTTP(req, headersOnly); + std::string reqHost = HTTP::URL(req.GetHeader("Host")).host; std::string port, url_rel; std::string trackSources; // this string contains all track sources for MBR smil { @@ -782,11 +781,8 @@ namespace Mist{ } } - H.Clean(); H.SetHeader("Content-Type", "application/smil"); - H.SetHeader("Server", APPIDENT); - H.setCORSHeaders(); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -800,24 +796,22 @@ namespace Mist{ return; } - if ((H.url.length() > 9 && H.url.substr(0, 6) == "/info_" && H.url.substr(H.url.length() - 3, 3) == ".js") || - (H.url.length() > 9 && H.url.substr(0, 6) == "/json_" && H.url.substr(H.url.length() - 3, 3) == ".js")){ - if (websocketHandler()){return;} - std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; - std::string useragent = H.GetVar("ua"); - if (!useragent.size()){useragent = H.GetHeader("User-Agent");} + if ((req.url.length() > 9 && req.url.substr(0, 6) == "/info_" && req.url.substr(req.url.length() - 3, 3) == ".js") || + (req.url.length() > 9 && req.url.substr(0, 6) == "/json_" && req.url.substr(req.url.length() - 3, 3) == ".js")){ + HTTPOutput::respondHTTP(req, headersOnly); + if (websocketHandler(req, headersOnly)){return;} + std::string reqHost = HTTP::URL(req.GetHeader("Host")).host; + std::string useragent = req.GetVar("ua"); + if (!useragent.size()){useragent = req.GetHeader("User-Agent");} std::string response; - std::string rURL = H.url; - if (method != "OPTIONS" && method != "HEAD"){initialize();} - H.Clean(); - H.SetHeader("Server", APPIDENT); - H.setCORSHeaders(); + std::string rURL = req.url; + if (headersOnly){initialize();} if (rURL.substr(0, 6) != "/json_"){ H.SetHeader("Content-Type", "application/javascript"); }else{ H.SetHeader("Content-Type", "application/json"); } - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -837,9 +831,9 @@ namespace Mist{ return; }// embed code generator - if ((H.url == "/player.js") || ((H.url.substr(0, 7) == "/embed_") && (H.url.length() > 10) && - (H.url.substr(H.url.length() - 3, 3) == ".js"))){ - HTTP::URL fullURL(H.GetHeader("Host")); + if ((req.url == "/player.js") || ((req.url.substr(0, 7) == "/embed_") && (req.url.length() > 10) && + (req.url.substr(H.url.length() - 3, 3) == ".js"))){ + HTTP::URL fullURL(req.GetHeader("Host")); if (!fullURL.protocol.size()){fullURL.protocol = getProtocolForPort(fullURL.getPort());} if (config->getString("pubaddr") != ""){ HTTP::URL altURL(config->getString("pubaddr")); @@ -850,12 +844,17 @@ namespace Mist{ } if (mistPath.size()){fullURL = mistPath;} std::string response; - std::string rURL = H.url; - H.Clean(); + std::string rURL = req.url; + + if ((rURL.substr(0, 7) == "/embed_") && (rURL.length() > 10) && + (rURL.substr(rURL.length() - 3, 3) == ".js")){ + HTTPOutput::respondHTTP(req, headersOnly); + } + H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript; charset=utf-8"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -933,14 +932,13 @@ namespace Mist{ return; } - if (H.url.substr(0, 7) == "/skins/"){ + if (req.url.substr(0, 7) == "/skins/"){ std::string response; - std::string url = H.url; - H.Clean(); + std::string url = req.url; H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetHeader("Content-Type", "text/css"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -970,13 +968,12 @@ namespace Mist{ H.Clean(); return; } - if (H.url == "/videojs.js"){ + if (req.url == "/videojs.js"){ std::string response; - H.Clean(); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -992,13 +989,12 @@ namespace Mist{ H.Clean(); return; } - if (H.url == "/dashjs.js"){ + if (req.url == "/dashjs.js"){ std::string response; - H.Clean(); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -1016,13 +1012,12 @@ namespace Mist{ H.Clean(); return; } - if (H.url == "/webrtc.js"){ + if (req.url == "/webrtc.js"){ std::string response; - H.Clean(); H.SetHeader("Server", APPIDENT); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -1038,13 +1033,12 @@ namespace Mist{ H.Clean(); return; } - if (H.url == "/flv.js"){ + if (req.url == "/flv.js"){ std::string response; - H.Clean(); H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); H.Clean(); return; @@ -1058,13 +1052,12 @@ namespace Mist{ H.Clean(); return; } - if (H.url == "/hlsjs.js"){ + if (req.url == "/hlsjs.js"){ std::string response; - H.Clean(); H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); H.Clean(); return; @@ -1084,7 +1077,7 @@ namespace Mist{ H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); H.setCORSHeaders(); H.SetHeader("Content-Type", "application/javascript"); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); H.Clean(); return; @@ -1100,15 +1093,13 @@ namespace Mist{ } } - void OutHTTP::sendIcon(){ - std::string method = H.method; - H.Clean(); + void OutHTTP::sendIcon(bool headersOnly){ #include "../icon.h" H.SetHeader("Content-Type", "image/x-icon"); H.SetHeader("Server", APPIDENT); H.SetHeader("Content-Length", icon_len); H.setCORSHeaders(); - if (method == "OPTIONS" || method == "HEAD"){ + if (headersOnly){ H.SendResponse("200", "OK", myConn); responded = true; H.Clean(); @@ -1120,16 +1111,16 @@ namespace Mist{ H.Clean(); } - bool OutHTTP::websocketHandler(){ + bool OutHTTP::websocketHandler(const HTTP::Parser & req, bool headersOnly){ stayConnected = true; - std::string reqHost = HTTP::URL(H.GetHeader("Host")).host; - if (H.GetHeader("X-Mst-Path").size()){mistPath = H.GetHeader("X-Mst-Path");} - std::string useragent = H.GetVar("ua"); - if (!useragent.size()){useragent = H.GetHeader("User-Agent");} - std::string upgradeHeader = H.GetHeader("Upgrade"); + std::string reqHost = HTTP::URL(req.GetHeader("Host")).host; + if (req.GetHeader("X-Mst-Path").size()){mistPath = req.GetHeader("X-Mst-Path");} + std::string useragent = req.GetVar("ua"); + if (!useragent.size()){useragent = req.GetHeader("User-Agent");} + std::string upgradeHeader = req.GetHeader("Upgrade"); Util::stringToLower(upgradeHeader); if (upgradeHeader != "websocket"){return false;} - HTTP::Websocket ws(myConn, H); + HTTP::Websocket ws(myConn, req, H); if (!ws){return false;} setBlocking(false); // start the stream, if needed diff --git a/src/output/output_http_internal.h b/src/output/output_http_internal.h index 774eb186..8e610145 100644 --- a/src/output/output_http_internal.h +++ b/src/output/output_http_internal.h @@ -10,10 +10,10 @@ namespace Mist{ virtual void onFail(const std::string &msg, bool critical = false); /// preHTTP is disabled in the internal HTTP output, since most don't need the stream alive to work virtual void preHTTP(){}; - void HTMLResponse(); - void onHTTP(); - void sendIcon(); - bool websocketHandler(); + void HTMLResponse(const HTTP::Parser & req, bool headersOnly); + void respondHTTP(const HTTP::Parser & req, bool headersOnly); + void sendIcon(bool headersOnly); + bool websocketHandler(const HTTP::Parser & req, bool headersOnly); JSON::Value getStatusJSON(std::string &reqHost, const std::string &useragent = ""); bool stayConnected; virtual bool onFinish(){return stayConnected;} diff --git a/src/output/output_sdp.cpp b/src/output/output_sdp.cpp index 6baf9067..50d491e8 100644 --- a/src/output/output_sdp.cpp +++ b/src/output/output_sdp.cpp @@ -141,6 +141,18 @@ namespace Mist{ } } + std::string OutSDP::getConnectedHost(){ + if (!sdpState.tracks.size()) { return Output::getConnectedHost(); } + std::string hostname; + uint32_t port; + sdpState.tracks[0].data.GetDestination(hostname, port); + return hostname; + } + std::string OutSDP::getConnectedBinHost(){ + if (!sdpState.tracks.size()) { return Output::getConnectedBinHost(); } + return sdpState.tracks[0].data.getBinDestination(); + } + void OutSDP::sendNext(){ char *dataPointer = 0; size_t dataLen = 0; diff --git a/src/output/output_sdp.h b/src/output/output_sdp.h index 2d4b3368..6c466753 100644 --- a/src/output/output_sdp.h +++ b/src/output/output_sdp.h @@ -16,6 +16,8 @@ namespace Mist{ void sendNext(); void sendHeader(); bool onFinish(); + std::string getConnectedHost(); + std::string getConnectedBinHost(); private: void initTracks(uint32_t & port, std::string targetIP); diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index 561bf9aa..bf46663b 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -239,6 +239,18 @@ namespace Mist{ } } + std::string OutTS::getConnectedHost(){ + if (!pushOut) { return Output::getConnectedHost(); } + std::string hostname; + uint32_t port; + pushSock.GetDestination(hostname, port); + return hostname; + } + std::string OutTS::getConnectedBinHost(){ + if (!pushOut) { return Output::getConnectedBinHost(); } + return pushSock.getBinDestination(); + } + bool OutTS::listenMode(){return !(config->getString("target").size());} void OutTS::onRequest(){ diff --git a/src/output/output_ts.h b/src/output/output_ts.h index 32aa6958..0c38cc70 100644 --- a/src/output/output_ts.h +++ b/src/output/output_ts.h @@ -12,6 +12,8 @@ namespace Mist{ virtual void initialSeek(); bool isReadyForPlay(); void onRequest(); + std::string getConnectedHost(); + std::string getConnectedBinHost(); private: size_t udpSize; diff --git a/src/output/output_tsrist.cpp b/src/output/output_tsrist.cpp index c4e03f80..10b29673 100644 --- a/src/output/output_tsrist.cpp +++ b/src/output/output_tsrist.cpp @@ -178,6 +178,20 @@ namespace Mist{ } OutTSRIST::~OutTSRIST(){} + + std::string OutTSRIST::getConnectedHost(){ + if (!pushOut) { return Output::getConnectedHost(); } + return target.host; + } + std::string OutTSRIST::getConnectedBinHost(){ + if (!pushOut) { return Output::getConnectedBinHost(); } + std::string binHost = Socket::getBinForms(target.host); + if (binHost.size() > 16){ binHost = binHost.substr(0, 16); } + if (binHost.size() < 16){ + binHost = std::string("\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001", 16); + } + return binHost; + } void OutTSRIST::init(Util::Config *cfg){ Output::init(cfg); @@ -319,7 +333,7 @@ namespace Mist{ } } - void OutTSRIST::connStats(uint64_t now, Comms::Statistics &statComm){ + void OutTSRIST::connStats(uint64_t now, Comms::Connections &statComm){ if (!myConn){return;} statComm.setUp(upBytes); statComm.setDown(0); diff --git a/src/output/output_tsrist.h b/src/output/output_tsrist.h index c9a5db39..80ee2376 100644 --- a/src/output/output_tsrist.h +++ b/src/output/output_tsrist.h @@ -16,9 +16,11 @@ namespace Mist{ bool isReadyForPlay(){return true;} virtual void requestHandler(); static void listener(Util::Config &conf, int (*callback)(Socket::Connection &S)); + std::string getConnectedHost(); + std::string getConnectedBinHost(); protected: - virtual void connStats(uint64_t now, Comms::Statistics &statComm); + virtual void connStats(uint64_t now, Comms::Connections &statComm); //virtual std::string getConnectedHost(){ // return srtConn.remotehost; //} diff --git a/src/session.cpp b/src/session.cpp index 3865e0ec..d2331c53 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -7,17 +7,31 @@ #include #include #include -// Stats of connections which have closed are added to these global counters -uint64_t globalNow = 0; + +// Global counters +uint64_t now = Util::bootSecs(); +uint64_t currentConnections = 0; +uint64_t lastSecond = 0; uint64_t globalTime = 0; uint64_t globalDown = 0; uint64_t globalUp = 0; uint64_t globalPktcount = 0; uint64_t globalPktloss = 0; uint64_t globalPktretrans = 0; +// Stores last values of each connection +std::map connTime; +std::map connDown; +std::map connUp; +std::map connPktcount; +std::map connPktloss; +std::map connPktretrans; // Counts the duration a connector has been active std::map connectorCount; std::map connectorLastActive; +std::map hostCount; +std::map hostLastActive; +std::map streamCount; +std::map streamLastActive; // Set to True when a session gets invalidated, so that we know to run a new USER_NEW trigger bool forceTrigger = false; void handleSignal(int signum){ @@ -26,96 +40,141 @@ void handleSignal(int signum){ } } -void userOnActive(uint64_t &connections){ - ++connections; -} +const char nullAddress[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; -std::string getEnvWithDefault(const std::string variableName, const std::string defaultValue){ - const char* value = getenv(variableName.c_str()); - if (value){ - unsetenv(variableName.c_str()); - return value; - }else{ - return defaultValue; - } -} - -/// \brief Adds stats of closed connections to global counters -void userOnDisconnect(Comms::Connections & connections, size_t idx){ +void userOnActive(Comms::Connections &connections, size_t idx){ + uint64_t lastUpdate = connections.getNow(idx); + if (lastUpdate < now - 10){return;} + ++currentConnections; std::string thisConnector = connections.getConnector(idx); - if (thisConnector != ""){ - connectorCount[thisConnector] += connections.getTime(idx); + std::string thisStreamName = connections.getStream(idx); + const std::string& thisHost = connections.getHost(idx); + + if (connections.getLastSecond(idx) > lastSecond){lastSecond = connections.getLastSecond(idx);} + // Save info on the latest active stream, protocol and host separately + if (thisConnector.size() && thisConnector != "HTTP"){ + connectorCount[thisConnector]++; + if (connectorLastActive[thisConnector] < lastUpdate){connectorLastActive[thisConnector] = lastUpdate;} } - globalTime += connections.getTime(idx); - globalDown += connections.getDown(idx); - globalUp += connections.getUp(idx); - globalPktcount += connections.getPacketCount(idx); - globalPktloss += connections.getPacketLostCount(idx); - globalPktretrans += connections.getPacketRetransmitCount(idx); + if (thisStreamName.size()){ + streamCount[thisStreamName]++; + if (streamLastActive[thisStreamName] < lastUpdate){streamLastActive[thisStreamName] = lastUpdate;} + } + if (memcmp(thisHost.data(), nullAddress, 16)){ + hostCount[thisHost]++; + if (!hostLastActive.count(thisHost) || hostLastActive[thisHost] < lastUpdate){hostLastActive[thisHost] = lastUpdate;} + } + // Sanity checks + if (connections.getDown(idx) < connDown[idx]){ + WARN_MSG("Connection downloaded bytes should be a counter, but has decreased in value"); + connDown[idx] = connections.getDown(idx); + } + if (connections.getUp(idx) < connUp[idx]){ + WARN_MSG("Connection uploaded bytes should be a counter, but has decreased in value"); + connUp[idx] = connections.getUp(idx); + } + if (connections.getPacketCount(idx) < connPktcount[idx]){ + WARN_MSG("Connection packet count should be a counter, but has decreased in value"); + connPktcount[idx] = connections.getPacketCount(idx); + } + if (connections.getPacketLostCount(idx) < connPktloss[idx]){ + WARN_MSG("Connection packet loss count should be a counter, but has decreased in value"); + connPktloss[idx] = connections.getPacketLostCount(idx); + } + if (connections.getPacketRetransmitCount(idx) < connPktretrans[idx]){ + WARN_MSG("Connection packets retransmitted should be a counter, but has decreased in value"); + connPktretrans[idx] = connections.getPacketRetransmitCount(idx); + } + // Add increase in stats to global stats + globalDown += connections.getDown(idx) - connDown[idx]; + globalUp += connections.getUp(idx) - connUp[idx]; + globalPktcount += connections.getPacketCount(idx) - connPktcount[idx]; + globalPktloss += connections.getPacketLostCount(idx) - connPktloss[idx]; + globalPktretrans += connections.getPacketRetransmitCount(idx) - connPktretrans[idx]; + // Set last values of this connection + connTime[idx]++; + connDown[idx] = connections.getDown(idx); + connUp[idx] = connections.getUp(idx); + connPktcount[idx] = connections.getPacketCount(idx); + connPktloss[idx] = connections.getPacketLostCount(idx); + connPktretrans[idx] = connections.getPacketRetransmitCount(idx); +} + +/// \brief Remove mappings of inactive connections +void userOnDisconnect(Comms::Connections & connections, size_t idx){ + connTime.erase(idx); + connDown.erase(idx); + connUp.erase(idx); + connPktcount.erase(idx); + connPktloss.erase(idx); + connPktretrans.erase(idx); } int main(int argc, char **argv){ Comms::Connections connections; Comms::Sessions sessions; uint64_t lastSeen = Util::bootSecs(); - uint64_t currentConnections = 0; Util::redirectLogsIfNeeded(); signal(SIGUSR1, handleSignal); // Init config and parse arguments Util::Config config = Util::Config("MistSession"); JSON::Value option; + char * tmpStr = 0; option.null(); option["arg_num"] = 1; option["arg"] = "string"; option["help"] = "Session identifier of the entire session"; - option["default"] = ""; config.addOption("sessionid", option); - option.null(); - option["long"] = "sessionmode"; - option["short"] = "m"; - option["arg"] = "integer"; - option["default"] = 0; - config.addOption("sessionmode", option); - option.null(); option["long"] = "streamname"; - option["short"] = "n"; + option["short"] = "s"; option["arg"] = "string"; - option["default"] = ""; + option["help"] = "Stream name initial value. May also be passed as SESSION_STREAM"; + tmpStr = getenv("SESSION_STREAM"); + option["default"] = tmpStr?tmpStr:""; config.addOption("streamname", option); option.null(); option["long"] = "ip"; option["short"] = "i"; option["arg"] = "string"; - option["default"] = ""; + option["help"] = "IP address initial value. May also be passed as SESSION_IP"; + tmpStr = getenv("SESSION_IP"); + option["default"] = tmpStr?tmpStr:""; config.addOption("ip", option); option.null(); - option["long"] = "sid"; - option["short"] = "s"; + option["long"] = "tkn"; + option["short"] = "t"; option["arg"] = "string"; - option["default"] = ""; - config.addOption("sid", option); + option["help"] = "Client-side session ID initial value. May also be passed as SESSION_TKN"; + tmpStr = getenv("SESSION_TKN"); + option["default"] = tmpStr?tmpStr:""; + config.addOption("tkn", option); option.null(); option["long"] = "protocol"; option["short"] = "p"; option["arg"] = "string"; - option["default"] = ""; + option["help"] = "Protocol initial value. May also be passed as SESSION_PROTOCOL"; + tmpStr = getenv("SESSION_PROTOCOL"); + option["default"] = tmpStr?tmpStr:""; config.addOption("protocol", option); option.null(); option["long"] = "requrl"; option["short"] = "r"; option["arg"] = "string"; - option["default"] = ""; + option["help"] = "Request URL initial value. May also be passed as SESSION_REQURL"; + tmpStr = getenv("SESSION_REQURL"); + option["default"] = tmpStr?tmpStr:""; config.addOption("requrl", option); config.activate(); if (!(config.parseArgs(argc, argv))){ + config.printHelp(std::cout); FAIL_MSG("Cannot start a new session due to invalid arguments"); return 1; } @@ -123,24 +182,17 @@ int main(int argc, char **argv){ const uint64_t bootTime = Util::getMicros(); // Get session ID, session mode and other variables used as payload for the USER_NEW and USER_END triggers const std::string thisStreamName = config.getString("streamname"); - const std::string thisHost = config.getString("ip"); - const std::string thisSid = config.getString("sid"); + const std::string thisToken = config.getString("tkn"); const std::string thisProtocol = config.getString("protocol"); const std::string thisReqUrl = config.getString("requrl"); const std::string thisSessionId = config.getString("sessionid"); - const uint64_t sessionMode = config.getInteger("sessionmode"); + std::string thisHost = Socket::getBinForms(config.getString("ip")); + if (thisHost.size() > 16){thisHost = thisHost.substr(0, 16);} - if (thisSessionId == "" || thisProtocol == "" || thisStreamName == ""){ - FAIL_MSG("Given the following incomplete arguments: SessionId: '%s', protocol: '%s', stream name: '%s'. Aborting opening a new session", - thisSessionId.c_str(), thisProtocol.c_str(), thisStreamName.c_str()); - return 1; - } - - MEDIUM_MSG("Starting a new session for sessionId '%s'", thisSessionId.c_str()); - if (sessionMode < 1 || sessionMode > 15) { - FAIL_MSG("Invalid session mode of value %lu. Should be larger than 0 and smaller than 16", sessionMode); - return 1; - } + std::string ipHex; + Socket::hostBytesToStr(thisHost.c_str(), thisHost.size(), ipHex); + VERYHIGH_MSG("Starting a new session. Passed variables are stream name '%s', session token '%s', protocol '%s', requested URL '%s', IP '%s' and session id '%s'", + thisStreamName.c_str(), thisToken.c_str(), thisProtocol.c_str(), thisReqUrl.c_str(), ipHex.c_str(), thisSessionId.c_str()); // Try to lock to ensure we are the only process initialising this session IPC::semaphore sessionLock; @@ -174,194 +226,200 @@ int main(int argc, char **argv){ sessionLock.post(); return 1; } - // Open the shared memory page containing statistics for each individual connection in this session - connections.reload(thisStreamName, thisHost, thisSid, thisProtocol, thisReqUrl, sessionMode, true, false); + // Initialise global session data sessions.setHost(thisHost); sessions.setSessId(thisSessionId); sessions.setStream(thisStreamName); - sessionLock.post(); + if (thisProtocol.size() && thisProtocol != "HTTP"){connectorLastActive[thisProtocol] = now;} + if (thisStreamName.size()){streamLastActive[thisStreamName] = now;} + if (memcmp(thisHost.data(), nullAddress, 16)){hostLastActive[thisHost] = now;} + + // Open the shared memory page containing statistics for each individual connection in this session + connections.reload(thisSessionId, true); // Determine session type, since triggers only get run for viewer type sessions uint64_t thisType = 0; if (thisSessionId[0] == 'I'){ - INFO_MSG("Started new input session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); thisType = 1; - } - else if (thisSessionId[0] == 'O'){ - INFO_MSG("Started new output session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); + } else if (thisSessionId[0] == 'O'){ thisType = 2; - } - else{ - INFO_MSG("Started new viewer session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime)); + } else if (thisSessionId[0] == 'U'){ + thisType = 3; } // Do a USER_NEW trigger if it is defined for this stream if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){ - std::string payload = thisStreamName + "\n" + thisHost + "\n" + - thisSid + "\n" + thisProtocol + + std::string payload = thisStreamName + "\n" + config.getString("ip") + "\n" + + thisToken + "\n" + thisProtocol + "\n" + thisReqUrl + "\n" + thisSessionId; if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ // Mark all connections of this session as finished, since this viewer is not allowed to view this stream + Util::logExitReason("Session rejected by USER_NEW"); connections.setExit(); connections.finishAll(); } } - uint64_t lastSecond = 0; - uint64_t now = 0; - uint64_t time = 0; - uint64_t down = 0; - uint64_t up = 0; - uint64_t pktcount = 0; - uint64_t pktloss = 0; - uint64_t pktretrans = 0; - std::string connector = ""; + //start allowing viewers + sessionLock.post(); + + INFO_MSG("Started new session %s in %.3f ms", thisSessionId.c_str(), (double)Util::getMicros(bootTime)/1000.0); + // Stay active until Mist exits or we no longer have an active connection - while (config.is_active && (currentConnections || Util::bootSecs() - lastSeen <= 10)){ - time = 0; - connector = ""; - down = 0; - up = 0; - pktcount = 0; - pktloss = 0; - pktretrans = 0; + while (config.is_active && (currentConnections || now - lastSeen <= STATS_DELAY) && !connections.getExit()){ currentConnections = 0; + lastSecond = 0; + now = Util::bootSecs(); - // Count active connections - COMM_LOOP(connections, userOnActive(currentConnections), userOnDisconnect(connections, id)); // Loop through all connection entries to get a summary of statistics - for (uint64_t idx = 0; idx < connections.recordCount(); idx++){ - if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} - uint64_t thisLastSecond = connections.getLastSecond(idx); - std::string thisConnector = connections.getConnector(idx); - // Save info on the latest active connection separately - if (thisLastSecond > lastSecond){ - lastSecond = thisLastSecond; - now = connections.getNow(idx); - } - connectorLastActive[thisConnector] = thisLastSecond; - // Sum all other variables - time += connections.getTime(idx); - down += connections.getDown(idx); - up += connections.getUp(idx); - pktcount += connections.getPacketCount(idx); - pktloss += connections.getPacketLostCount(idx); - pktretrans += connections.getPacketRetransmitCount(idx); + COMM_LOOP(connections, userOnActive(connections, id), userOnDisconnect(connections, id)); + if (currentConnections){ + globalTime++; + lastSeen = now; } - // Convert connector duration to string - std::stringstream connectorSummary; - bool addDelimiter = false; - connectorSummary << "{"; - for (std::map::iterator it = connectorLastActive.begin(); - it != connectorLastActive.end(); ++it){ - if (lastSecond - it->second < 10000){ - connectorSummary << (addDelimiter ? "," : "") << it->first; - addDelimiter = true; - } - } - connectorSummary << "}"; - // Write summary to global statistics - sessions.setTime(time + globalTime); - sessions.setDown(down + globalDown); - sessions.setUp(up + globalUp); - sessions.setPacketCount(pktcount + globalPktcount); - sessions.setPacketLostCount(pktloss + globalPktloss); - sessions.setPacketRetransmitCount(pktretrans + globalPktretrans); + sessions.setTime(globalTime); + sessions.setDown(globalDown); + sessions.setUp(globalUp); + sessions.setPacketCount(globalPktcount); + sessions.setPacketLostCount(globalPktloss); + sessions.setPacketRetransmitCount(globalPktretrans); sessions.setLastSecond(lastSecond); - sessions.setConnector(connectorSummary.str()); sessions.setNow(now); + if (currentConnections){ + { + // Convert active protocols to string + std::stringstream connectorSummary; + for (std::map::iterator it = connectorLastActive.begin(); + it != connectorLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first; + } + } + sessions.setConnector(connectorSummary.str()); + } + + { + // Set active host to last active or 0 if there were various hosts active recently + std::string thisHost; + for (std::map::iterator it = hostLastActive.begin(); + it != hostLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + if (!thisHost.size()){ + thisHost = it->first; + }else if (thisHost != it->first){ + thisHost = nullAddress; + break; + } + } + } + if (!thisHost.size()){ + thisHost = nullAddress; + } + sessions.setHost(thisHost); + } + + { + // Set active stream name to last active or "" if there were multiple streams active recently + std::string thisStream = ""; + for (std::map::iterator it = streamLastActive.begin(); + it != streamLastActive.end(); ++it){ + if (now - it->second < STATS_DELAY){ + if (!thisStream.size()){ + thisStream = it->first; + }else if (thisStream != it->first){ + thisStream = ""; + break; + } + } + } + sessions.setStream(thisStream); + } + } + // Retrigger USER_NEW if a re-sync was requested if (!thisType && forceTrigger){ forceTrigger = false; + std::string host; + Socket::hostBytesToStr(thisHost.data(), 16, host); if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){ INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str()); - std::string payload = thisStreamName + "\n" + thisHost + "\n" + - thisSid + "\n" + thisProtocol + + std::string payload = thisStreamName + "\n" + host + "\n" + + thisToken + "\n" + thisProtocol + "\n" + thisReqUrl + "\n" + thisSessionId; if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){ INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str()); + Util::logExitReason("Session rejected by USER_NEW"); connections.setExit(); connections.finishAll(); + break; }else{ INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str()); } } } - // Invalidate connections if the session is marked as invalid - if(connections.getExit()){ - connections.finishAll(); - break; - } // Remember latest activity so we know when this session ends if (currentConnections){ - lastSeen = Util::bootSecs(); } - Util::sleep(1000); + Util::wait(1000); + } + if (Util::bootSecs() - lastSeen > STATS_DELAY){ + Util::logExitReason("Session inactive for %d seconds", STATS_DELAY); } // Trigger USER_END if (!thisType && Triggers::shouldTrigger("USER_END", thisStreamName)){ - lastSecond = 0; - time = 0; - down = 0; - up = 0; - // Get a final summary of this session - for (uint64_t idx = 0; idx < connections.recordCount(); idx++){ - if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;} - uint64_t thisLastSecond = connections.getLastSecond(idx); - // Set last second to the latest entry - if (thisLastSecond > lastSecond){ - lastSecond = thisLastSecond; - } - // Count protocol durations across the entire session - std::string thisConnector = connections.getConnector(idx); - if (thisConnector != ""){ - connectorCount[thisConnector] += connections.getTime(idx); - } - // Sum all other variables - time += connections.getTime(idx); - down += connections.getDown(idx); - up += connections.getUp(idx); - } - - // Convert connector duration to string + // Convert connector, host and stream into lists and counts std::stringstream connectorSummary; - bool addDelimiter = false; - connectorSummary << "{"; - for (std::map::iterator it = connectorCount.begin(); - it != connectorCount.end(); ++it){ - connectorSummary << (addDelimiter ? "," : "") << it->first << ":" << it->second; - addDelimiter = true; + std::stringstream connectorTimes; + for (std::map::iterator it = connectorCount.begin(); it != connectorCount.end(); ++it){ + connectorSummary << (connectorSummary.str().size() ? "," : "") << it->first; + connectorTimes << (connectorTimes.str().size() ? "," : "") << it->second; + } + std::stringstream hostSummary; + std::stringstream hostTimes; + for (std::map::iterator it = hostCount.begin(); it != hostCount.end(); ++it){ + std::string host; + Socket::hostBytesToStr(it->first.data(), 16, host); + hostSummary << (hostSummary.str().size() ? "," : "") << host; + hostTimes << (hostTimes.str().size() ? "," : "") << it->second; + } + std::stringstream streamSummary; + std::stringstream streamTimes; + for (std::map::iterator it = streamCount.begin(); it != streamCount.end(); ++it){ + streamSummary << (streamSummary.str().size() ? "," : "") << it->first; + streamTimes << (streamTimes.str().size() ? "," : "") << it->second; } - connectorSummary << "}"; - const uint64_t duration = lastSecond - (bootTime / 1000); std::stringstream summary; - summary << thisSessionId << "\n" - << thisStreamName << "\n" + summary << thisToken << "\n" + << streamSummary.str() << "\n" << connectorSummary.str() << "\n" - << thisHost << "\n" - << duration << "\n" - << up << "\n" - << down << "\n" - << sessions.getTags(); + << hostSummary.str() << "\n" + << globalTime << "\n" + << globalUp << "\n" + << globalDown << "\n" + << sessions.getTags() << "\n" + << hostTimes.str() << "\n" + << connectorTimes.str() << "\n" + << streamTimes.str() << "\n" + << thisSessionId; Triggers::doTrigger("USER_END", summary.str(), thisStreamName); } if (!thisType && connections.getExit()){ - WARN_MSG("Session %s has been invalidated since it is not allowed to view stream %s", thisSessionId.c_str(), thisStreamName.c_str()); uint64_t sleepStart = Util::bootSecs(); // Keep session invalidated for 10 minutes, or until the session stops - while (config.is_active && sleepStart - Util::bootSecs() < 600){ + while (config.is_active && Util::bootSecs() - sleepStart < SESS_TIMEOUT){ Util::sleep(1000); + if (forceTrigger){break;} } } - INFO_MSG("Shutting down session %s", thisSessionId.c_str()); + INFO_MSG("Shutting down session %s: %s", thisSessionId.c_str(), Util::exitReason); return 0; }