WebSocket API in controller
This commit is contained in:
		
							parent
							
								
									5ffd51e958
								
							
						
					
					
						commit
						0dd602d5ca
					
				
					 7 changed files with 523 additions and 14 deletions
				
			
		|  | @ -135,6 +135,9 @@ static inline void show_stackframe(){} | |||
| #define SEM_INPUT "/MstInpt%s" //%s stream name
 | ||||
| #define SEM_CONF "/MstConfLock" | ||||
| #define SHM_CONF "MstConf" | ||||
| #define SHM_STATE_LOGS "MstStateLogs" | ||||
| #define SHM_STATE_ACCS "MstStateAccs" | ||||
| #define SHM_STATE_STREAMS "MstStateStreams" | ||||
| #define NAME_BUFFER_SIZE 200    //char buffer size for snprintf'ing shm filenames
 | ||||
| 
 | ||||
| #define SIMUL_TRACKS 20 | ||||
|  |  | |||
|  | @ -97,6 +97,156 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket | |||
|   return false; | ||||
| }//Authorize
 | ||||
| 
 | ||||
| class streamStat{ | ||||
|   public: | ||||
|     streamStat(){ | ||||
|       status = 0; | ||||
|       viewers = 0; | ||||
|       inputs = 0; | ||||
|       outputs = 0; | ||||
|     } | ||||
|     streamStat(const Util::RelAccX & rlx, uint64_t entry){ | ||||
|       status = rlx.getInt("status", entry); | ||||
|       viewers = rlx.getInt("viewers", entry); | ||||
|       inputs = rlx.getInt("inputs", entry); | ||||
|       outputs = rlx.getInt("outputs", entry); | ||||
|     } | ||||
|     bool operator ==(const streamStat &b) const{ | ||||
|       return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs); | ||||
|     } | ||||
|     bool operator !=(const streamStat &b) const{ | ||||
|       return !(*this == b); | ||||
|     } | ||||
|     uint8_t status; | ||||
|     uint64_t viewers; | ||||
|     uint64_t inputs; | ||||
|     uint64_t outputs; | ||||
| }; | ||||
| 
 | ||||
| void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){ | ||||
|   std::string logs = H.GetVar("logs"); | ||||
|   std::string accs = H.GetVar("accs"); | ||||
|   bool doStreams = H.GetVar("streams").size(); | ||||
|   HTTP::Websocket W(C, H); | ||||
|   if (!W){return;} | ||||
| 
 | ||||
|   IPC::sharedPage shmLogs(SHM_STATE_LOGS, 1024*1024); | ||||
|   IPC::sharedPage shmAccs(SHM_STATE_ACCS, 1024*1024); | ||||
|   IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 1024*1024); | ||||
|   Util::RelAccX rlxStreams(shmStreams.mapped); | ||||
|   Util::RelAccX rlxLog(shmLogs.mapped); | ||||
|   Util::RelAccX rlxAccs(shmAccs.mapped); | ||||
|   if (!rlxStreams.isReady()){doStreams = false;} | ||||
|   uint64_t logPos = 0; | ||||
|   bool doLog = false; | ||||
|   uint64_t accsPos = 0; | ||||
|   bool doAccs = false; | ||||
|   if (logs.size() && rlxLog.isReady()){ | ||||
|     doLog = true; | ||||
|     logPos = rlxLog.getEndPos(); | ||||
|     if (logs.substr(0, 6) == "since:"){ | ||||
|       uint64_t startLogs = JSON::Value(logs.substr(6)).asInt(); | ||||
|       logPos = rlxLog.getDeleted(); | ||||
|       while (logPos < rlxLog.getEndPos() && rlxLog.getInt("time", logPos) < startLogs){++logPos;} | ||||
|     }else{ | ||||
|       uint64_t numLogs = JSON::Value(logs).asInt(); | ||||
|       if (logPos <= numLogs){ | ||||
|         logPos = rlxLog.getDeleted(); | ||||
|       }else{ | ||||
|         logPos -= numLogs; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   if (accs.size() && rlxAccs.isReady()){ | ||||
|     doAccs = true; | ||||
|     accsPos = rlxAccs.getEndPos(); | ||||
|     if (accs.substr(0, 6) == "since:"){ | ||||
|       uint64_t startAccs = JSON::Value(accs.substr(6)).asInt(); | ||||
|       accsPos = rlxAccs.getDeleted(); | ||||
|       while (accsPos < rlxAccs.getEndPos() && rlxAccs.getInt("time", accsPos) < startAccs){++accsPos;} | ||||
|     }else{ | ||||
|       uint64_t numAccs = JSON::Value(accs).asInt(); | ||||
|       if (accsPos <= numAccs){ | ||||
|         accsPos = rlxAccs.getDeleted(); | ||||
|       }else{ | ||||
|         accsPos -= numAccs; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   std::map<std::string, streamStat> lastStrmStat; | ||||
|   std::set<std::string> strmRemove; | ||||
|   while (W){ | ||||
|     bool sent = false; | ||||
|     while (doLog && rlxLog.getEndPos() > logPos){ | ||||
|       sent = true; | ||||
|       JSON::Value tmp; | ||||
|       tmp[0u] = "log"; | ||||
|       tmp[1u].append((long long)rlxLog.getInt("time", logPos)); | ||||
|       tmp[1u].append(rlxLog.getPointer("kind", logPos)); | ||||
|       tmp[1u].append(rlxLog.getPointer("msg", logPos)); | ||||
|       W.sendFrame(tmp.toString()); | ||||
|       logPos++; | ||||
|     } | ||||
|     while (doAccs && rlxAccs.getEndPos() > accsPos){ | ||||
|       sent = true; | ||||
|       JSON::Value tmp; | ||||
|       tmp[0u] = "access"; | ||||
|       tmp[1u].append((long long)rlxAccs.getInt("time", accsPos)); | ||||
|       tmp[1u].append(rlxAccs.getPointer("session", accsPos)); | ||||
|       tmp[1u].append(rlxAccs.getPointer("stream", accsPos)); | ||||
|       tmp[1u].append(rlxAccs.getPointer("connector", accsPos)); | ||||
|       tmp[1u].append(rlxAccs.getPointer("host", accsPos)); | ||||
|       tmp[1u].append((long long)rlxAccs.getInt("duration", accsPos)); | ||||
|       tmp[1u].append((long long)rlxAccs.getInt("up", accsPos)); | ||||
|       tmp[1u].append((long long)rlxAccs.getInt("down", accsPos)); | ||||
|       tmp[1u].append(rlxAccs.getPointer("tags", accsPos)); | ||||
|       W.sendFrame(tmp.toString()); | ||||
|       accsPos++; | ||||
|     } | ||||
|     if (doStreams){ | ||||
|       for (std::map<std::string, streamStat>::iterator it = lastStrmStat.begin(); it != lastStrmStat.end(); ++it){ | ||||
|         strmRemove.insert(it->first); | ||||
|       } | ||||
|       uint64_t startPos = rlxStreams.getDeleted(); | ||||
|       uint64_t endPos = rlxStreams.getEndPos(); | ||||
|       for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ | ||||
|         std::string strm = rlxStreams.getPointer("stream", cPos); | ||||
|         strmRemove.erase(strm); | ||||
|         streamStat tmpStat(rlxStreams, cPos); | ||||
|         if (lastStrmStat[strm] != tmpStat){ | ||||
|           lastStrmStat[strm] = tmpStat; | ||||
|           sent = true; | ||||
|           JSON::Value tmp; | ||||
|           tmp[0u] = "stream"; | ||||
|           tmp[1u].append(strm); | ||||
|           tmp[1u].append((long long)tmpStat.status); | ||||
|           tmp[1u].append((long long)tmpStat.viewers); | ||||
|           tmp[1u].append((long long)tmpStat.inputs); | ||||
|           tmp[1u].append((long long)tmpStat.outputs); | ||||
|           W.sendFrame(tmp.toString()); | ||||
|         } | ||||
|       } | ||||
|       while (strmRemove.size()){ | ||||
|         std::string strm = *strmRemove.begin(); | ||||
|         sent = true; | ||||
|         JSON::Value tmp; | ||||
|         tmp[0u] = "stream"; | ||||
|         tmp[1u].append(strm); | ||||
|         tmp[1u].append((long long)0); | ||||
|         tmp[1u].append((long long)0); | ||||
|         tmp[1u].append((long long)0); | ||||
|         tmp[1u].append((long long)0); | ||||
|         W.sendFrame(tmp.toString()); | ||||
|         strmRemove.erase(strm); | ||||
|         lastStrmStat.erase(strm); | ||||
|       } | ||||
|     } | ||||
|     if (!sent){ | ||||
|       Util::sleep(500); | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| /// Handles a single incoming API connection.
 | ||||
| /// Assumes the connection is unauthorized and will allow for 4 requests without authorization before disconnecting.
 | ||||
| int Controller::handleAPIConnection(Socket::Connection & conn){ | ||||
|  | @ -137,6 +287,20 @@ int Controller::handleAPIConnection(Socket::Connection & conn){ | |||
|           } | ||||
|         } | ||||
|       } | ||||
|       //Catch websocket requests
 | ||||
|       if (H.url == "/ws"){ | ||||
|         if (!authorized){ | ||||
|           H.Clean(); | ||||
|           H.body = "Please login first or provide a valid token authentication."; | ||||
|           H.SetHeader("Server", "MistServer/" PACKAGE_VERSION); | ||||
|           H.SendResponse("403", "Not authorized", conn); | ||||
|           H.Clean(); | ||||
|           continue; | ||||
|         } | ||||
|         handleWebSocket(H, conn); | ||||
|         H.Clean(); | ||||
|         continue; | ||||
|       } | ||||
|       JSON::Value Response; | ||||
|       JSON::Value Request = JSON::fromString(H.GetVar("command")); | ||||
|       //invalid request? send the web interface, unless requested as "/api"
 | ||||
|  |  | |||
|  | @ -1,8 +1,11 @@ | |||
| #include <mist/socket.h> | ||||
| #include <mist/json.h> | ||||
| #include <mist/websocket.h> | ||||
| #include <mist/http_parser.h> | ||||
| 
 | ||||
| namespace Controller { | ||||
|   bool authorize(JSON::Value & Request, JSON::Value & Response, Socket::Connection & conn); | ||||
|   int handleAPIConnection(Socket::Connection & conn); | ||||
|   void handleAPICommands(JSON::Value & Request, JSON::Value & Response); | ||||
|   void handleWebSocket(HTTP::Parser & H, Socket::Connection & C); | ||||
| } | ||||
|  |  | |||
|  | @ -1,6 +1,7 @@ | |||
| #include <cstdio> | ||||
| #include <list> | ||||
| #include <mist/config.h> | ||||
| #include <mist/stream.h> | ||||
| #include "controller_statistics.h" | ||||
| #include "controller_storage.h" | ||||
| 
 | ||||
|  | @ -29,6 +30,20 @@ std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; / | |||
| std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
 | ||||
| tthread::mutex Controller::statsMutex; | ||||
| 
 | ||||
| //For server-wide totals. Local to this file only.
 | ||||
| struct streamTotals { | ||||
|   unsigned long long upBytes; | ||||
|   unsigned long long downBytes; | ||||
|   unsigned long long inputs; | ||||
|   unsigned long long outputs; | ||||
|   unsigned long long viewers; | ||||
|   unsigned long long currIns; | ||||
|   unsigned long long currOuts; | ||||
|   unsigned long long currViews; | ||||
|   uint8_t status; | ||||
| }; | ||||
| static std::map<std::string, struct streamTotals> streamStats; | ||||
| 
 | ||||
| Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){ | ||||
|   host = dhost; | ||||
|   crc = dcrc; | ||||
|  | @ -92,6 +107,8 @@ void Controller::SharedMemStats(void * config){ | |||
|   IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true); | ||||
|   statPointer = &statServer; | ||||
|   std::set<std::string> inactiveStreams; | ||||
|   Controller::initState(); | ||||
|   bool shiftWrites = true; | ||||
|   while(((Util::Config*)config)->is_active){ | ||||
|     { | ||||
|       tthread::lock_guard<tthread::mutex> guard(Controller::configMutex); | ||||
|  | @ -113,6 +130,50 @@ void Controller::SharedMemStats(void * config){ | |||
|           mustWipe.pop_front(); | ||||
|         } | ||||
|       } | ||||
|       Util::RelAccX * strmStats = streamsAccessor(); | ||||
|       if (!strmStats || !strmStats->isReady()){strmStats = 0;} | ||||
|       uint64_t strmPos = 0; | ||||
|       if (strmStats){ | ||||
|         if (shiftWrites || (strmStats->getEndPos() - strmStats->getDeleted() != streamStats.size())){ | ||||
|           shiftWrites = true; | ||||
|           strmPos = strmStats->getEndPos(); | ||||
|         }else{ | ||||
|           strmPos = strmStats->getDeleted(); | ||||
|         } | ||||
|       } | ||||
|       if (streamStats.size()){ | ||||
|         for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ | ||||
|           uint8_t newState = Util::getStreamStatus(it->first); | ||||
|           uint8_t oldState = it->second.status; | ||||
|           if (newState != oldState){ | ||||
|             it->second.status = newState; | ||||
|           } | ||||
|           if (newState == STRMSTAT_OFF){ | ||||
|             inactiveStreams.insert(it->first); | ||||
|           } | ||||
|           if (strmStats){ | ||||
|             if (shiftWrites){ | ||||
|               strmStats->setString("stream", it->first, strmPos); | ||||
|             } | ||||
|             strmStats->setInt("status", it->second.status, strmPos); | ||||
|             strmStats->setInt("viewers", it->second.currViews, strmPos); | ||||
|             strmStats->setInt("inputs", it->second.currIns, strmPos); | ||||
|             strmStats->setInt("outputs", it->second.currOuts, strmPos); | ||||
|             ++strmPos; | ||||
|           } | ||||
|         } | ||||
|       } | ||||
|       if (strmStats && shiftWrites){ | ||||
|         shiftWrites = false; | ||||
|         uint64_t prevEnd = strmStats->getEndPos(); | ||||
|         strmStats->setEndPos(strmPos); | ||||
|         strmStats->setDeleted(prevEnd); | ||||
|       } | ||||
|       while (inactiveStreams.size()){ | ||||
|         streamStats.erase(*inactiveStreams.begin()); | ||||
|         inactiveStreams.erase(inactiveStreams.begin()); | ||||
|         shiftWrites = true; | ||||
|       } | ||||
|     } | ||||
|     Util::wait(1000); | ||||
|   } | ||||
|  | @ -121,18 +182,89 @@ void Controller::SharedMemStats(void * config){ | |||
|   if (Controller::restarting){ | ||||
|     statServer.abandon(); | ||||
|   } | ||||
|   Controller::deinitState(Controller::restarting); | ||||
| } | ||||
| 
 | ||||
| /// Gets a complete list of all streams currently in active state, with optional prefix matching
 | ||||
| std::set<std::string> Controller::getActiveStreams(const std::string & prefix){ | ||||
|   std::set<std::string> ret; | ||||
|   Util::RelAccX * strmStats = streamsAccessor(); | ||||
|   if (!strmStats || !strmStats->isReady()){return ret;} | ||||
|   uint64_t endPos = strmStats->getEndPos(); | ||||
|   if (prefix.size()){ | ||||
|     for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ | ||||
|       if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} | ||||
|       const char * S = strmStats->getPointer("stream", i); | ||||
|       if (!strncmp(S, prefix.data(), prefix.size())){ | ||||
|         ret.insert(S); | ||||
|       } | ||||
|     } | ||||
|   }else{ | ||||
|     for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ | ||||
|       if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} | ||||
|       ret.insert(strmStats->getPointer("stream", i)); | ||||
|     } | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
| 
 | ||||
| /// Updates the given active connection with new stats data.
 | ||||
| void Controller::statSession::update(unsigned long index, IPC::statExchange & data){ | ||||
|   long long prevDown = getDown(); | ||||
|   long long prevUp = getUp(); | ||||
|   curConns[index].update(data); | ||||
|   //store timestamp of first received data, if older
 | ||||
|   if (firstSec > data.now()){ | ||||
|     firstSec = data.now(); | ||||
|   } | ||||
|   //store timestamp of last received data, if newer
 | ||||
|   if (data.now() > lastSec){ | ||||
|     lastSec = data.now(); | ||||
|   } | ||||
|   //store timestamp of first received data, if older
 | ||||
|   if (firstSec > data.now()){ | ||||
|     firstSec = data.now(); | ||||
|   long long currDown = getDown(); | ||||
|   long long currUp = getUp(); | ||||
|   if (currUp - prevUp < 0 || currDown-prevDown < 0){ | ||||
|     INFO_MSG("Negative data usage! %lldu/%lldd (u%lld->%lld) in %s over %s, #%lu", currUp-prevUp, currDown-prevDown, prevUp, currUp, data.streamName().c_str(), data.connector().c_str(), index); | ||||
|   } | ||||
|   if (currDown + currUp > COUNTABLE_BYTES){ | ||||
|     std::string streamName = data.streamName(); | ||||
|     if (prevUp + prevDown < COUNTABLE_BYTES){ | ||||
|       if (data.connector() == "INPUT"){ | ||||
|         streamStats[streamName].inputs++; | ||||
|         streamStats[streamName].currIns++; | ||||
|         sessionType = SESS_INPUT; | ||||
|       }else if (data.connector() == "OUTPUT"){ | ||||
|         streamStats[streamName].outputs++; | ||||
|         streamStats[streamName].currOuts++; | ||||
|         sessionType = SESS_OUTPUT; | ||||
|       }else{ | ||||
|         streamStats[streamName].viewers++; | ||||
|         streamStats[streamName].currViews++; | ||||
|         sessionType = SESS_VIEWER; | ||||
|       } | ||||
|       if (!streamName.size() || streamName[0] == 0){ | ||||
|         if (streamStats.count(streamName)){streamStats.erase(streamName);} | ||||
|       }else{ | ||||
|         streamStats[streamName].upBytes += currUp; | ||||
|         streamStats[streamName].downBytes += currDown; | ||||
|       } | ||||
|     }else{ | ||||
|       if (!streamName.size() || streamName[0] == 0){ | ||||
|         if (streamStats.count(streamName)){streamStats.erase(streamName);} | ||||
|       }else{ | ||||
|         streamStats[streamName].upBytes += currUp - prevUp; | ||||
|         streamStats[streamName].downBytes += currDown - prevDown; | ||||
|       } | ||||
|       if (sessionType == SESS_UNSET){ | ||||
|         if (data.connector() == "INPUT"){ | ||||
|           sessionType = SESS_INPUT; | ||||
|         }else if (data.connector() == "OUTPUT"){ | ||||
|           sessionType = SESS_OUTPUT; | ||||
|         }else{ | ||||
|           sessionType = SESS_VIEWER; | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
|  | @ -171,6 +303,25 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){ | |||
|   } | ||||
| } | ||||
| 
 | ||||
| void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){ | ||||
|   if (lastSec < disconnectPoint){ | ||||
|     switch (sessionType){ | ||||
|       case SESS_INPUT: | ||||
|         streamStats[index.streamName].currIns--; | ||||
|         break; | ||||
|       case SESS_OUTPUT: | ||||
|         streamStats[index.streamName].currOuts--; | ||||
|         break; | ||||
|       case SESS_VIEWER: | ||||
|         streamStats[index.streamName].currViews--; | ||||
|         break; | ||||
|     } | ||||
|     uint64_t duration = lastSec - firstSec; | ||||
|     if (duration < 1){duration = 1;} | ||||
|     Controller::logAccess("", index.streamName, index.connector, index.host, duration, getUp(), getDown(), ""); | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| /// Archives the given connection.
 | ||||
| void Controller::statSession::finish(unsigned long index){ | ||||
|   oldConns.push_back(curConns[index]); | ||||
|  | @ -181,6 +332,7 @@ void Controller::statSession::finish(unsigned long index){ | |||
| Controller::statSession::statSession(){ | ||||
|   firstSec = 0xFFFFFFFFFFFFFFFFull; | ||||
|   lastSec = 0; | ||||
|   sessionType = SESS_UNSET; | ||||
| } | ||||
| 
 | ||||
| /// Moves the given connection to the given session
 | ||||
|  | @ -351,6 +503,32 @@ long long Controller::statSession::getUp(unsigned long long t){ | |||
|   return retVal; | ||||
| } | ||||
| 
 | ||||
| /// Returns the cumulative downloaded bytes for this session at timestamp t.
 | ||||
| long long Controller::statSession::getDown(){ | ||||
|   long long retVal = 0; | ||||
|   if (curConns.size()){ | ||||
|     for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){ | ||||
|       if (it->second.log.size()){ | ||||
|         retVal += it->second.log.rbegin()->second.down; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   return retVal; | ||||
| } | ||||
| 
 | ||||
| /// Returns the cumulative uploaded bytes for this session at timestamp t.
 | ||||
| long long Controller::statSession::getUp(){ | ||||
|   long long retVal = 0; | ||||
|   if (curConns.size()){ | ||||
|     for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){ | ||||
|       if (it->second.log.size()){ | ||||
|         retVal += it->second.log.rbegin()->second.up; | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|   return retVal; | ||||
| } | ||||
| 
 | ||||
| /// Returns the cumulative downloaded bytes per second for this session at timestamp t.
 | ||||
| long long Controller::statSession::getBpsDown(unsigned long long t){ | ||||
|   unsigned long long aTime = t - 5; | ||||
|  |  | |||
|  | @ -19,6 +19,13 @@ namespace Controller { | |||
|     long long up; | ||||
|   }; | ||||
| 
 | ||||
|   enum sessType { | ||||
|     SESS_UNSET = 0, | ||||
|     SESS_INPUT, | ||||
|     SESS_OUTPUT, | ||||
|     SESS_VIEWER | ||||
|   }; | ||||
| 
 | ||||
|   /// This is a comparison and storage class that keeps sessions apart from each other.
 | ||||
|   /// Whenever two of these objects are not equal, it will create a new session.
 | ||||
|   class sessIndex { | ||||
|  | @ -60,13 +67,15 @@ namespace Controller { | |||
|       unsigned long long firstSec; | ||||
|       unsigned long long lastSec; | ||||
|       std::deque<statStorage> oldConns; | ||||
|       std::map<unsigned long, statStorage> curConns; | ||||
|       sessType sessionType; | ||||
|     public: | ||||
|       statSession(); | ||||
|       std::map<unsigned long, statStorage> curConns; | ||||
|       void wipeOld(unsigned long long); | ||||
|       void finish(unsigned long index); | ||||
|       void switchOverTo(statSession & newSess, unsigned long index); | ||||
|       void update(unsigned long index, IPC::statExchange & data); | ||||
|       void ping(const sessIndex & index, unsigned long long disconnectPoint); | ||||
|       unsigned long long getStart(); | ||||
|       unsigned long long getEnd(); | ||||
|       bool hasDataFor(unsigned long long time); | ||||
|  | @ -74,6 +83,8 @@ namespace Controller { | |||
|       long long getConnTime(unsigned long long time); | ||||
|       long long getLastSecond(unsigned long long time); | ||||
|       long long getDown(unsigned long long time); | ||||
|       long long getUp(); | ||||
|       long long getDown(); | ||||
|       long long getUp(unsigned long long time); | ||||
|       long long getBpsDown(unsigned long long time); | ||||
|       long long getBpsUp(unsigned long long time); | ||||
|  | @ -85,6 +96,8 @@ namespace Controller { | |||
|   extern std::map<sessIndex, statSession> sessions; | ||||
|   extern std::map<unsigned long, sessIndex> connToSession; | ||||
|   extern tthread::mutex statsMutex; | ||||
| 
 | ||||
|   std::set<std::string> getActiveStreams(const std::string & prefix = ""); | ||||
|   void parseStatistics(char * data, size_t len, unsigned int id); | ||||
|   void fillClients(JSON::Value & req, JSON::Value & rep); | ||||
|   void fillTotals(JSON::Value & req, JSON::Value & rep); | ||||
|  |  | |||
|  | @ -18,27 +18,81 @@ namespace Controller { | |||
|   JSON::Value Storage; ///< Global storage of data.
 | ||||
|   tthread::mutex configMutex; | ||||
|   tthread::mutex logMutex; | ||||
|   unsigned long long logCounter = 0; | ||||
|   bool configChanged = false; | ||||
|   bool restarting = false; | ||||
|   bool isTerminal = false; | ||||
|   bool isColorized = false; | ||||
|   uint32_t maxLogsRecs = 0; | ||||
|   uint32_t maxAccsRecs = 0; | ||||
|   uint64_t firstLog = 0; | ||||
|   IPC::sharedPage * shmLogs = 0; | ||||
|   Util::RelAccX * rlxLogs = 0; | ||||
|   IPC::sharedPage * shmAccs = 0; | ||||
|   Util::RelAccX * rlxAccs = 0; | ||||
|   IPC::sharedPage * shmStrm = 0; | ||||
|   Util::RelAccX * rlxStrm = 0; | ||||
| 
 | ||||
|   Util::RelAccX * logAccessor(){ | ||||
|     return rlxLogs; | ||||
|   } | ||||
| 
 | ||||
|   Util::RelAccX * accesslogAccessor(){ | ||||
|     return rlxAccs; | ||||
|   } | ||||
| 
 | ||||
|   Util::RelAccX * streamsAccessor(){ | ||||
|     return rlxStrm; | ||||
|   } | ||||
| 
 | ||||
|   ///\brief Store and print a log message.
 | ||||
|   ///\param kind The type of message.
 | ||||
|   ///\param message The message to be logged.
 | ||||
|   void Log(std::string kind, std::string message, bool noWriteToLog){ | ||||
|     tthread::lock_guard<tthread::mutex> guard(logMutex); | ||||
|     JSON::Value m; | ||||
|     m.append(Util::epoch()); | ||||
|     m.append(kind); | ||||
|     m.append(message); | ||||
|     Storage["log"].append(m); | ||||
|     Storage["log"].shrink(100); // limit to 100 log messages
 | ||||
|     if (!noWriteToLog){ | ||||
|     if (noWriteToLog){ | ||||
|       tthread::lock_guard<tthread::mutex> guard(logMutex); | ||||
|       JSON::Value m; | ||||
|       uint64_t logTime = Util::epoch(); | ||||
|       m.append((long long)logTime); | ||||
|       m.append(kind); | ||||
|       m.append(message); | ||||
|       Storage["log"].append(m); | ||||
|       Storage["log"].shrink(100); // limit to 100 log messages
 | ||||
|       logCounter++; | ||||
|       if (rlxLogs && rlxLogs->isReady()){ | ||||
|         if (!firstLog){ | ||||
|           firstLog = logCounter; | ||||
|         } | ||||
|         rlxLogs->setRCount(logCounter > maxLogsRecs ? maxLogsRecs : logCounter); | ||||
|         rlxLogs->setDeleted(logCounter > rlxLogs->getRCount() ? logCounter - rlxLogs->getRCount() : firstLog); | ||||
|         rlxLogs->setInt("time", logTime, logCounter-1); | ||||
|         rlxLogs->setString("kind", kind, logCounter-1); | ||||
|         rlxLogs->setString("msg", message, logCounter-1); | ||||
|         rlxLogs->setEndPos(logCounter); | ||||
|       } | ||||
|     }else{ | ||||
|       std::cerr << kind << "|MistController|" << getpid() << "||" << message << "\n"; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   void logAccess(const std::string & sessId, const std::string & strm, const std::string & conn, const std::string & host, uint64_t duration, uint64_t up, uint64_t down, const std::string & tags){ | ||||
|     if (rlxAccs && rlxAccs->isReady()){ | ||||
|       uint64_t newEndPos = rlxAccs->getEndPos(); | ||||
|       rlxAccs->setRCount(newEndPos+1 > maxLogsRecs ? maxAccsRecs : newEndPos+1); | ||||
|       rlxAccs->setDeleted(newEndPos + 1 > maxAccsRecs ? newEndPos + 1 - maxAccsRecs : 0); | ||||
|       rlxAccs->setInt("time", Util::epoch(), newEndPos); | ||||
|       rlxAccs->setString("session", sessId, newEndPos); | ||||
|       rlxAccs->setString("stream", strm, newEndPos); | ||||
|       rlxAccs->setString("connector", conn, newEndPos); | ||||
|       rlxAccs->setString("host", host, newEndPos); | ||||
|       rlxAccs->setInt("duration", duration, newEndPos); | ||||
|       rlxAccs->setInt("up", up, newEndPos); | ||||
|       rlxAccs->setInt("down", down, newEndPos); | ||||
|       rlxAccs->setString("tags", tags, newEndPos); | ||||
|       rlxAccs->setEndPos(newEndPos + 1); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   ///\brief Write contents to Filename
 | ||||
|   ///\param Filename The full path of the file to write to.
 | ||||
|   ///\param contents The data to be written to the file.
 | ||||
|  | @ -49,7 +103,93 @@ namespace Controller { | |||
|     File.close(); | ||||
|     return File.good(); | ||||
|   } | ||||
|    | ||||
| 
 | ||||
|   void initState(){ | ||||
|     tthread::lock_guard<tthread::mutex> guard(logMutex); | ||||
|     shmLogs = new IPC::sharedPage(SHM_STATE_LOGS, 1024*1024, true);//max 1M of logs cached
 | ||||
|     if (!shmLogs->mapped){ | ||||
|       FAIL_MSG("Could not open memory page for logs buffer"); | ||||
|       return; | ||||
|     } | ||||
|     rlxLogs = new Util::RelAccX(shmLogs->mapped, false); | ||||
|     if (rlxLogs->isReady()){ | ||||
|       logCounter = rlxLogs->getEndPos(); | ||||
|     }else{ | ||||
|       rlxLogs->addField("time", RAX_64UINT); | ||||
|       rlxLogs->addField("kind", RAX_32STRING); | ||||
|       rlxLogs->addField("msg", RAX_512STRING); | ||||
|       rlxLogs->setReady(); | ||||
|     } | ||||
|     maxLogsRecs = (1024*1024 - rlxLogs->getOffset()) / rlxLogs->getRSize(); | ||||
| 
 | ||||
|     shmAccs = new IPC::sharedPage(SHM_STATE_ACCS, 1024*1024, true);//max 1M of accesslogs cached
 | ||||
|     if (!shmAccs->mapped){ | ||||
|       FAIL_MSG("Could not open memory page for access logs buffer"); | ||||
|       return; | ||||
|     } | ||||
|     rlxAccs = new Util::RelAccX(shmAccs->mapped, false); | ||||
|     if (!rlxAccs->isReady()){ | ||||
|       rlxAccs->addField("time", RAX_64UINT); | ||||
|       rlxAccs->addField("session", RAX_32STRING); | ||||
|       rlxAccs->addField("stream", RAX_128STRING); | ||||
|       rlxAccs->addField("connector", RAX_32STRING); | ||||
|       rlxAccs->addField("host", RAX_64STRING); | ||||
|       rlxAccs->addField("duration", RAX_32UINT); | ||||
|       rlxAccs->addField("up", RAX_64UINT); | ||||
|       rlxAccs->addField("down", RAX_64UINT); | ||||
|       rlxAccs->addField("tags", RAX_256STRING); | ||||
|       rlxAccs->setReady(); | ||||
|     } | ||||
|     maxAccsRecs = (1024*1024 - rlxAccs->getOffset()) / rlxAccs->getRSize(); | ||||
| 
 | ||||
|     shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024*1024, true);//max 1M of stream data
 | ||||
|     if (!shmStrm->mapped){ | ||||
|       FAIL_MSG("Could not open memory page for stream data"); | ||||
|       return; | ||||
|     } | ||||
|     rlxStrm = new Util::RelAccX(shmStrm->mapped, false); | ||||
|     if (!rlxStrm->isReady()){ | ||||
|       rlxStrm->addField("stream", RAX_128STRING); | ||||
|       rlxStrm->addField("status", RAX_UINT, 1); | ||||
|       rlxStrm->addField("viewers", RAX_64UINT); | ||||
|       rlxStrm->addField("inputs", RAX_64UINT); | ||||
|       rlxStrm->addField("outputs", RAX_64UINT); | ||||
|       rlxStrm->setReady(); | ||||
|     } | ||||
|     rlxStrm->setRCount((1024*1024 - rlxStrm->getOffset()) / rlxStrm->getRSize()); | ||||
|   } | ||||
| 
 | ||||
|   void deinitState(bool leaveBehind){ | ||||
|     tthread::lock_guard<tthread::mutex> guard(logMutex); | ||||
|     if (!leaveBehind){ | ||||
|       rlxLogs->setExit(); | ||||
|       shmLogs->master = true; | ||||
|       rlxAccs->setExit(); | ||||
|       shmAccs->master = true; | ||||
|       rlxStrm->setExit(); | ||||
|       shmStrm->master = true; | ||||
|     }else{ | ||||
|       shmLogs->master = false; | ||||
|       shmAccs->master = false; | ||||
|       shmStrm->master = false; | ||||
|     } | ||||
|     Util::RelAccX * tmp = rlxLogs; | ||||
|     rlxLogs = 0; | ||||
|     delete tmp; | ||||
|     delete shmLogs; | ||||
|     shmLogs = 0; | ||||
|     tmp = rlxAccs; | ||||
|     rlxAccs = 0; | ||||
|     delete tmp; | ||||
|     delete shmAccs; | ||||
|     shmAccs = 0; | ||||
|     tmp = rlxStrm; | ||||
|     rlxStrm = 0; | ||||
|     delete tmp; | ||||
|     delete shmStrm; | ||||
|     shmStrm = 0; | ||||
|   } | ||||
| 
 | ||||
|   void handleMsg(void *err){ | ||||
|     Util::logParser((long long)err, fileno(stdout), Controller::isColorized, &Log); | ||||
|   } | ||||
|  |  | |||
|  | @ -2,6 +2,7 @@ | |||
| #include <mist/json.h> | ||||
| #include <mist/config.h> | ||||
| #include <mist/tinythread.h> | ||||
| #include <mist/util.h> | ||||
| 
 | ||||
| namespace Controller { | ||||
|   extern std::string instanceId; ///<global storage of instanceId (previously uniqID) is set in controller.cpp
 | ||||
|  | @ -13,16 +14,23 @@ namespace Controller { | |||
|   extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
 | ||||
|   extern bool isTerminal;///< True if connected to a terminal and not a log file.
 | ||||
|   extern bool isColorized;///< True if we colorize the output
 | ||||
|   extern unsigned long long logCounter; ///<Count of logged messages since boot
 | ||||
|    | ||||
|   Util::RelAccX * logAccessor(); | ||||
|   Util::RelAccX * accesslogAccessor(); | ||||
|   Util::RelAccX * streamsAccessor(); | ||||
| 
 | ||||
|   /// Store and print a log message.
 | ||||
|   void Log(std::string kind, std::string message, bool noWriteToLog = false); | ||||
|   void logAccess(const std::string & sessId, const std::string & strm, const std::string & conn, const std::string & host, uint64_t duration, uint64_t up, uint64_t down, const std::string & tags); | ||||
| 
 | ||||
|   /// Write contents to Filename.
 | ||||
|   bool WriteFile(std::string Filename, std::string contents); | ||||
|   void writeConfigToDisk(); | ||||
|    | ||||
|   void handleMsg(void * err); | ||||
| 
 | ||||
|   void initState(); | ||||
|   void deinitState(bool leaveBehind); | ||||
|   void writeConfig(); | ||||
| 
 | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Thulinma
						Thulinma