Library code style update + some minor backports from Pro edition
This commit is contained in:
parent
593b291e85
commit
2607113727
68 changed files with 4538 additions and 4665 deletions
|
@ -53,7 +53,7 @@ bool AnalyserDTSC::parsePacket(){
|
|||
}
|
||||
if (detail >= 8){
|
||||
char * payDat;
|
||||
unsigned int payLen;
|
||||
size_t payLen;
|
||||
P.getString("data", payDat, payLen);
|
||||
for (uint64_t i = 0; i < payLen; ++i){
|
||||
if ((i % 32) == 0){std::cout << std::endl;}
|
||||
|
|
|
@ -321,7 +321,7 @@ int main_loop(int argc, char **argv){
|
|||
|
||||
// Check if we have a usable server, if not, print messages with helpful hints
|
||||
{
|
||||
std::string web_port = JSON::Value((long long)Controller::conf.getInteger("port")).asString();
|
||||
std::string web_port = JSON::Value(Controller::conf.getInteger("port")).asString();
|
||||
// check for username
|
||||
if (!Controller::Storage.isMember("account") || Controller::Storage["account"].size() < 1){
|
||||
Controller::Log("CONF",
|
||||
|
@ -406,7 +406,6 @@ int main(int argc, char **argv){
|
|||
Util::Procs::setHandler(); // set child handler
|
||||
{
|
||||
struct sigaction new_action;
|
||||
struct sigaction cur_action;
|
||||
new_action.sa_sigaction = handleUSR1;
|
||||
sigemptyset(&new_action.sa_mask);
|
||||
new_action.sa_flags = 0;
|
||||
|
@ -423,7 +422,6 @@ int main(int argc, char **argv){
|
|||
Util::Procs::reaper_thread = 0;
|
||||
{
|
||||
struct sigaction new_action;
|
||||
struct sigaction cur_action;
|
||||
new_action.sa_sigaction = handleUSR1;
|
||||
sigemptyset(&new_action.sa_mask);
|
||||
new_action.sa_flags = 0;
|
||||
|
@ -460,7 +458,7 @@ int main(int argc, char **argv){
|
|||
execvp(myFile.c_str(), argv);
|
||||
FAIL_MSG("Error restarting: %s", strerror(errno));
|
||||
}
|
||||
INFO_MSG("Controller uncleanly shut down! Restarting in %llu...", reTimer);
|
||||
INFO_MSG("Controller uncleanly shut down! Restarting in %" PRIu64 "...", reTimer);
|
||||
Util::wait(reTimer);
|
||||
reTimer += 1000;
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
|
|||
sent = true;
|
||||
JSON::Value tmp;
|
||||
tmp[0u] = "log";
|
||||
tmp[1u].append((long long)rlxLog.getInt("time", logPos));
|
||||
tmp[1u].append(rlxLog.getInt("time", logPos));
|
||||
tmp[1u].append(rlxLog.getPointer("kind", logPos));
|
||||
tmp[1u].append(rlxLog.getPointer("msg", logPos));
|
||||
W.sendFrame(tmp.toString());
|
||||
|
@ -192,14 +192,14 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
|
|||
sent = true;
|
||||
JSON::Value tmp;
|
||||
tmp[0u] = "access";
|
||||
tmp[1u].append((long long)rlxAccs.getInt("time", accsPos));
|
||||
tmp[1u].append(rlxAccs.getInt("time", accsPos));
|
||||
tmp[1u].append(rlxAccs.getPointer("session", accsPos));
|
||||
tmp[1u].append(rlxAccs.getPointer("stream", accsPos));
|
||||
tmp[1u].append(rlxAccs.getPointer("connector", accsPos));
|
||||
tmp[1u].append(rlxAccs.getPointer("host", accsPos));
|
||||
tmp[1u].append((long long)rlxAccs.getInt("duration", accsPos));
|
||||
tmp[1u].append((long long)rlxAccs.getInt("up", accsPos));
|
||||
tmp[1u].append((long long)rlxAccs.getInt("down", accsPos));
|
||||
tmp[1u].append(rlxAccs.getInt("duration", accsPos));
|
||||
tmp[1u].append(rlxAccs.getInt("up", accsPos));
|
||||
tmp[1u].append(rlxAccs.getInt("down", accsPos));
|
||||
tmp[1u].append(rlxAccs.getPointer("tags", accsPos));
|
||||
W.sendFrame(tmp.toString());
|
||||
accsPos++;
|
||||
|
@ -220,10 +220,10 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
|
|||
JSON::Value tmp;
|
||||
tmp[0u] = "stream";
|
||||
tmp[1u].append(strm);
|
||||
tmp[1u].append((long long)tmpStat.status);
|
||||
tmp[1u].append((long long)tmpStat.viewers);
|
||||
tmp[1u].append((long long)tmpStat.inputs);
|
||||
tmp[1u].append((long long)tmpStat.outputs);
|
||||
tmp[1u].append(tmpStat.status);
|
||||
tmp[1u].append(tmpStat.viewers);
|
||||
tmp[1u].append(tmpStat.inputs);
|
||||
tmp[1u].append(tmpStat.outputs);
|
||||
W.sendFrame(tmp.toString());
|
||||
}
|
||||
}
|
||||
|
@ -233,10 +233,10 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
|
|||
JSON::Value tmp;
|
||||
tmp[0u] = "stream";
|
||||
tmp[1u].append(strm);
|
||||
tmp[1u].append((long long)0);
|
||||
tmp[1u].append((long long)0);
|
||||
tmp[1u].append((long long)0);
|
||||
tmp[1u].append((long long)0);
|
||||
tmp[1u].append(0u);
|
||||
tmp[1u].append(0u);
|
||||
tmp[1u].append(0u);
|
||||
tmp[1u].append(0u);
|
||||
W.sendFrame(tmp.toString());
|
||||
strmRemove.erase(strm);
|
||||
lastStrmStat.erase(strm);
|
||||
|
@ -534,7 +534,7 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response
|
|||
}
|
||||
if (!Request.isMember("minimal") || Request.isMember("streams") || Request.isMember("addstream") || Request.isMember("deletestream")){
|
||||
if (!Request.isMember("streams") && (Request.isMember("addstream") || Request.isMember("deletestream"))){
|
||||
Response["streams"]["incomplete list"] = 1ll;
|
||||
Response["streams"]["incomplete list"] = 1u;
|
||||
if (Request.isMember("addstream")){
|
||||
jsonForEach(Request["addstream"], jit){
|
||||
if (Controller::Storage["streams"].isMember(jit.key())){
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include <string.h>
|
||||
#include <fstream>
|
||||
#include <set>
|
||||
#include <mist/defines.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/procs.h>
|
||||
#include "controller_capabilities.h"
|
||||
|
@ -233,23 +234,23 @@ namespace Controller {
|
|||
}
|
||||
continue;
|
||||
}
|
||||
long long int i;
|
||||
if (sscanf(line, "MemTotal : %lli kB", &i) == 1){
|
||||
uint64_t i;
|
||||
if (sscanf(line, "MemTotal : %" PRIu64 " kB", &i) == 1){
|
||||
capa["mem"]["total"] = i / 1024;
|
||||
}
|
||||
if (sscanf(line, "MemFree : %lli kB", &i) == 1){
|
||||
if (sscanf(line, "MemFree : %" PRIu64 " kB", &i) == 1){
|
||||
capa["mem"]["free"] = i / 1024;
|
||||
}
|
||||
if (sscanf(line, "SwapTotal : %lli kB", &i) == 1){
|
||||
if (sscanf(line, "SwapTotal : %" PRIu64 " kB", &i) == 1){
|
||||
capa["mem"]["swaptotal"] = i / 1024;
|
||||
}
|
||||
if (sscanf(line, "SwapFree : %lli kB", &i) == 1){
|
||||
if (sscanf(line, "SwapFree : %" PRIu64 " kB", &i) == 1){
|
||||
capa["mem"]["swapfree"] = i / 1024;
|
||||
}
|
||||
if (sscanf(line, "Buffers : %lli kB", &i) == 1){
|
||||
if (sscanf(line, "Buffers : %" PRIu64 " kB", &i) == 1){
|
||||
bufcache += i / 1024;
|
||||
}
|
||||
if (sscanf(line, "Cached : %lli kB", &i) == 1){
|
||||
if (sscanf(line, "Cached : %" PRIu64 " kB", &i) == 1){
|
||||
bufcache += i / 1024;
|
||||
}
|
||||
}
|
||||
|
@ -265,23 +266,23 @@ namespace Controller {
|
|||
//parse lines here
|
||||
float onemin, fivemin, fifteenmin;
|
||||
if (sscanf(line, "%f %f %f", &onemin, &fivemin, &fifteenmin) == 3){
|
||||
capa["load"]["one"] = (long long int)(onemin * 100);
|
||||
capa["load"]["five"] = (long long int)(fivemin * 100);
|
||||
capa["load"]["fifteen"] = (long long int)(fifteenmin * 100);
|
||||
capa["load"]["one"] = uint64_t(onemin * 100);
|
||||
capa["load"]["five"] = uint64_t(fivemin * 100);
|
||||
capa["load"]["fifteen"] = uint64_t(fifteenmin * 100);
|
||||
}
|
||||
}
|
||||
std::ifstream cpustat("/proc/stat");
|
||||
if (cpustat){
|
||||
char line[300];
|
||||
while (cpustat.getline(line, 300)){
|
||||
static unsigned long long cl_total = 0, cl_idle = 0;
|
||||
unsigned long long c_user, c_nice, c_syst, c_idle, c_total;
|
||||
if (sscanf(line, "cpu %Lu %Lu %Lu %Lu", &c_user, &c_nice, &c_syst, &c_idle) == 4){
|
||||
static uint64_t cl_total = 0, cl_idle = 0;
|
||||
uint64_t c_user, c_nice, c_syst, c_idle, c_total;
|
||||
if (sscanf(line, "cpu %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, &c_user, &c_nice, &c_syst, &c_idle) == 4){
|
||||
c_total = c_user + c_nice + c_syst + c_idle;
|
||||
if (c_total - cl_total > 0){
|
||||
capa["cpu_use"] = (long long int)(1000 - ((c_idle - cl_idle) * 1000) / (c_total - cl_total));
|
||||
capa["cpu_use"] = (1000 - ((c_idle - cl_idle) * 1000) / (c_total - cl_total));
|
||||
}else{
|
||||
capa["cpu_use"] = 0ll;
|
||||
capa["cpu_use"] = 0u;
|
||||
}
|
||||
cl_total = c_total;
|
||||
cl_idle = c_idle;
|
||||
|
|
|
@ -63,7 +63,10 @@ namespace Controller {
|
|||
/// Deletes the shared memory page with connector information
|
||||
/// in preparation of shutdown.
|
||||
void prepareActiveConnectorsForShutdown(){
|
||||
IPC::sharedPage f("MstCnns", 4096, true, false);
|
||||
IPC::sharedPage f("MstCnns", 4096, false, false);
|
||||
if (f){
|
||||
f.master = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// Forgets all active connectors, preventing them from being killed,
|
||||
|
@ -103,7 +106,7 @@ namespace Controller {
|
|||
}else{
|
||||
if (it.key() == "debug"){
|
||||
static std::string debugLvlStr;
|
||||
debugLvlStr = JSON::Value((long long)Util::Config::printDebugLevel).asString();
|
||||
debugLvlStr = JSON::Value(Util::Config::printDebugLevel).asString();
|
||||
argarr[argnum++] = (char*)((*it)["option"].asStringRef().c_str());
|
||||
argarr[argnum++] = (char*)debugLvlStr.c_str();
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
#include <mist/shared_memory.h>
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/bitfields.h>
|
||||
#include "controller_statistics.h"
|
||||
#include "controller_storage.h"
|
||||
|
||||
|
@ -36,14 +37,14 @@ tthread::mutex Controller::statsMutex;
|
|||
|
||||
//For server-wide totals. Local to this file only.
|
||||
struct streamTotals {
|
||||
unsigned long long upBytes;
|
||||
unsigned long long downBytes;
|
||||
unsigned long long inputs;
|
||||
unsigned long long outputs;
|
||||
unsigned long long viewers;
|
||||
unsigned long long currIns;
|
||||
unsigned long long currOuts;
|
||||
unsigned long long currViews;
|
||||
uint64_t upBytes;
|
||||
uint64_t downBytes;
|
||||
uint64_t inputs;
|
||||
uint64_t outputs;
|
||||
uint64_t viewers;
|
||||
uint64_t currIns;
|
||||
uint64_t currOuts;
|
||||
uint64_t currViews;
|
||||
uint8_t status;
|
||||
};
|
||||
static std::map<std::string, struct streamTotals> streamStats;
|
||||
|
@ -107,7 +108,7 @@ IPC::sharedServer * statPointer = 0;
|
|||
/// statistics from all connected clients, as well as wipes
|
||||
/// old statistics that have disconnected over 10 minutes ago.
|
||||
void Controller::SharedMemStats(void * config){
|
||||
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
|
||||
HIGH_MSG("Starting stats thread");
|
||||
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
|
||||
statPointer = &statServer;
|
||||
std::set<std::string> inactiveStreams;
|
||||
|
@ -288,7 +289,7 @@ Controller::sessType Controller::statSession::getSessType(){
|
|||
}
|
||||
|
||||
/// Archives the given connection.
|
||||
void Controller::statSession::wipeOld(unsigned long long cutOff){
|
||||
void Controller::statSession::wipeOld(uint64_t cutOff){
|
||||
if (firstSec > cutOff){
|
||||
return;
|
||||
}
|
||||
|
@ -326,7 +327,7 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
|
|||
}
|
||||
}
|
||||
|
||||
void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){
|
||||
void Controller::statSession::ping(const Controller::sessIndex & index, uint64_t disconnectPoint){
|
||||
if (!tracked){return;}
|
||||
if (lastSec < disconnectPoint){
|
||||
switch (sessionType){
|
||||
|
@ -339,6 +340,8 @@ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned
|
|||
case SESS_VIEWER:
|
||||
if (streamStats[index.streamName].currViews){streamStats[index.streamName].currViews--;}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
uint64_t duration = lastSec - firstActive;
|
||||
if (duration < 1){duration = 1;}
|
||||
|
@ -418,17 +421,17 @@ void Controller::statSession::switchOverTo(statSession & newSess, unsigned long
|
|||
}
|
||||
|
||||
/// Returns the first measured timestamp in this session.
|
||||
unsigned long long Controller::statSession::getStart(){
|
||||
uint64_t Controller::statSession::getStart(){
|
||||
return firstSec;
|
||||
}
|
||||
|
||||
/// Returns the last measured timestamp in this session.
|
||||
unsigned long long Controller::statSession::getEnd(){
|
||||
uint64_t Controller::statSession::getEnd(){
|
||||
return lastSec;
|
||||
}
|
||||
|
||||
/// Returns true if there is data for this session at timestamp t.
|
||||
bool Controller::statSession::hasDataFor(unsigned long long t){
|
||||
bool Controller::statSession::hasDataFor(uint64_t t){
|
||||
if (lastSec < t){return false;}
|
||||
if (firstSec > t){return false;}
|
||||
if (oldConns.size()){
|
||||
|
@ -437,7 +440,7 @@ bool Controller::statSession::hasDataFor(unsigned long long t){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.hasDataFor(t)){return true;}
|
||||
}
|
||||
}
|
||||
|
@ -453,7 +456,7 @@ bool Controller::statSession::hasData(){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){return true;}
|
||||
}
|
||||
}
|
||||
|
@ -461,7 +464,7 @@ bool Controller::statSession::hasData(){
|
|||
}
|
||||
|
||||
/// Returns true if this session should count as a viewer on the given timestamp.
|
||||
bool Controller::statSession::isViewerOn(unsigned long long t){
|
||||
bool Controller::statSession::isViewerOn(uint64_t t){
|
||||
return getUp(t) + getDown(t) > COUNTABLE_BYTES;
|
||||
}
|
||||
|
||||
|
@ -488,8 +491,8 @@ bool Controller::statSession::isViewer(){
|
|||
}
|
||||
|
||||
/// Returns the cumulative connected time for this session at timestamp t.
|
||||
long long Controller::statSession::getConnTime(unsigned long long t){
|
||||
long long retVal = 0;
|
||||
uint64_t Controller::statSession::getConnTime(uint64_t t){
|
||||
uint64_t retVal = 0;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->hasDataFor(t)){
|
||||
|
@ -498,7 +501,7 @@ long long Controller::statSession::getConnTime(unsigned long long t){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.hasDataFor(t)){
|
||||
retVal += it->second.getDataFor(t).time;
|
||||
}
|
||||
|
@ -508,9 +511,9 @@ long long Controller::statSession::getConnTime(unsigned long long t){
|
|||
}
|
||||
|
||||
/// Returns the last requested media timestamp for this session at timestamp t.
|
||||
long long Controller::statSession::getLastSecond(unsigned long long t){
|
||||
uint64_t Controller::statSession::getLastSecond(uint64_t t){
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.hasDataFor(t)){
|
||||
return it->second.getDataFor(t).lastSecond;
|
||||
}
|
||||
|
@ -527,8 +530,8 @@ long long Controller::statSession::getLastSecond(unsigned long long t){
|
|||
}
|
||||
|
||||
/// Returns the cumulative downloaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getDown(unsigned long long t){
|
||||
long long retVal = wipedDown;
|
||||
uint64_t Controller::statSession::getDown(uint64_t t){
|
||||
uint64_t retVal = wipedDown;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->hasDataFor(t)){
|
||||
|
@ -537,7 +540,7 @@ long long Controller::statSession::getDown(unsigned long long t){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.hasDataFor(t)){
|
||||
retVal += it->second.getDataFor(t).down;
|
||||
}
|
||||
|
@ -547,8 +550,8 @@ long long Controller::statSession::getDown(unsigned long long t){
|
|||
}
|
||||
|
||||
/// Returns the cumulative uploaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getUp(unsigned long long t){
|
||||
long long retVal = wipedUp;
|
||||
uint64_t Controller::statSession::getUp(uint64_t t){
|
||||
uint64_t retVal = wipedUp;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->hasDataFor(t)){
|
||||
|
@ -557,7 +560,7 @@ long long Controller::statSession::getUp(unsigned long long t){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.hasDataFor(t)){
|
||||
retVal += it->second.getDataFor(t).up;
|
||||
}
|
||||
|
@ -567,8 +570,8 @@ long long Controller::statSession::getUp(unsigned long long t){
|
|||
}
|
||||
|
||||
/// Returns the cumulative downloaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getDown(){
|
||||
long long retVal = wipedDown;
|
||||
uint64_t Controller::statSession::getDown(){
|
||||
uint64_t retVal = wipedDown;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->log.size()){
|
||||
|
@ -577,7 +580,7 @@ long long Controller::statSession::getDown(){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){
|
||||
retVal += it->second.log.rbegin()->second.down;
|
||||
}
|
||||
|
@ -587,8 +590,8 @@ long long Controller::statSession::getDown(){
|
|||
}
|
||||
|
||||
/// Returns the cumulative uploaded bytes for this session at timestamp t.
|
||||
long long Controller::statSession::getUp(){
|
||||
long long retVal = wipedUp;
|
||||
uint64_t Controller::statSession::getUp(){
|
||||
uint64_t retVal = wipedUp;
|
||||
if (oldConns.size()){
|
||||
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
|
||||
if (it->log.size()){
|
||||
|
@ -597,7 +600,7 @@ long long Controller::statSession::getUp(){
|
|||
}
|
||||
}
|
||||
if (curConns.size()){
|
||||
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
|
||||
if (it->second.log.size()){
|
||||
retVal += it->second.log.rbegin()->second.up;
|
||||
}
|
||||
|
@ -607,35 +610,31 @@ long long Controller::statSession::getUp(){
|
|||
}
|
||||
|
||||
/// Returns the cumulative downloaded bytes per second for this session at timestamp t.
|
||||
long long Controller::statSession::getBpsDown(unsigned long long t){
|
||||
unsigned long long aTime = t - 5;
|
||||
uint64_t Controller::statSession::getBpsDown(uint64_t t){
|
||||
uint64_t aTime = t - 5;
|
||||
if (aTime < firstSec){
|
||||
aTime = firstSec;
|
||||
}
|
||||
long long valA = getDown(aTime);
|
||||
long long valB = getDown(t);
|
||||
if (t > aTime){
|
||||
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, (valB - valA) / (t - aTime));
|
||||
return (valB - valA) / (t - aTime);
|
||||
}else{
|
||||
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, 0);
|
||||
if (t <= aTime){
|
||||
return 0;
|
||||
}
|
||||
uint64_t valA = getDown(aTime);
|
||||
uint64_t valB = getDown(t);
|
||||
return (valB - valA) / (t - aTime);
|
||||
}
|
||||
|
||||
/// Returns the cumulative uploaded bytes per second for this session at timestamp t.
|
||||
long long Controller::statSession::getBpsUp(unsigned long long t){
|
||||
unsigned long long aTime = t - 5;
|
||||
uint64_t Controller::statSession::getBpsUp(uint64_t t){
|
||||
uint64_t aTime = t - 5;
|
||||
if (aTime < firstSec){
|
||||
aTime = firstSec;
|
||||
}
|
||||
long long valA = getUp(aTime);
|
||||
long long valB = getUp(t);
|
||||
if (t > aTime){
|
||||
return (valB - valA) / (t - aTime);
|
||||
}else{
|
||||
if (t <= aTime){
|
||||
return 0;
|
||||
}
|
||||
uint64_t valA = getUp(aTime);
|
||||
uint64_t valB = getUp(t);
|
||||
return (valB - valA) / (t - aTime);
|
||||
}
|
||||
|
||||
/// Returns true if there is data available for timestamp t.
|
||||
|
@ -679,20 +678,25 @@ void Controller::statStorage::update(IPC::statExchange & data) {
|
|||
|
||||
/// This function is called by the shared memory page that holds statistics.
|
||||
/// It updates the internally saved statistics data, moving across sessions or archiving when necessary.
|
||||
void Controller::parseStatistics(char * data, size_t len, unsigned int id){
|
||||
void Controller::parseStatistics(char * data, size_t len, uint32_t id){
|
||||
//retrieve stats data
|
||||
IPC::statExchange tmpEx(data);
|
||||
//calculate the current session index, store as idx.
|
||||
sessIndex idx(tmpEx);
|
||||
//if the connection was already indexed and it has changed, move it
|
||||
if (connToSession.count(id) && connToSession[id] != idx){
|
||||
if (sessions[connToSession[id]].getSessType() != SESS_UNSET){
|
||||
INFO_MSG("Switching connection %" PRIu32 " from active session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
|
||||
}else{
|
||||
INFO_MSG("Switching connection %" PRIu32 " from inactive session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
|
||||
}
|
||||
sessions[connToSession[id]].switchOverTo(sessions[idx], id);
|
||||
if (!sessions[connToSession[id]].hasData()){
|
||||
sessions.erase(connToSession[id]);
|
||||
}
|
||||
}
|
||||
if (!connToSession.count(id)){
|
||||
INSANE_MSG("New connection: %lu as %s", id, idx.toStr().c_str());
|
||||
INSANE_MSG("New connection: %" PRIu32 " as %s", id, idx.toStr().c_str());
|
||||
}
|
||||
//store the index for later comparison
|
||||
connToSession[id] = idx;
|
||||
|
@ -702,7 +706,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
|
|||
char counter = (*(data - 1)) & 0x7F;
|
||||
if (counter == 126 || counter == 127){
|
||||
//the data is no longer valid - connection has gone away, store for later
|
||||
INSANE_MSG("Ended connection: %lu as %s", id, idx.toStr().c_str());
|
||||
INSANE_MSG("Ended connection: %" PRIu32 " as %s", id, idx.toStr().c_str());
|
||||
sessions[idx].finish(id);
|
||||
connToSession.erase(id);
|
||||
}else{
|
||||
|
@ -763,7 +767,7 @@ bool Controller::hasViewers(std::string streamName){
|
|||
void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
|
||||
tthread::lock_guard<tthread::mutex> guard(statsMutex);
|
||||
//first, figure out the timestamp wanted
|
||||
long long int reqTime = 0;
|
||||
uint64_t reqTime = 0;
|
||||
if (req.isMember("time")){
|
||||
reqTime = req["time"].asInt();
|
||||
}
|
||||
|
@ -839,7 +843,7 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
|
|||
if (fields & STAT_CLI_UP){d.append(it->second.getUp(time));}
|
||||
if (fields & STAT_CLI_BPS_DOWN){d.append(it->second.getBpsDown(time));}
|
||||
if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));}
|
||||
if (fields & STAT_CLI_CRC){d.append((long long)it->first.crc);}
|
||||
if (fields & STAT_CLI_CRC){d.append(it->first.crc);}
|
||||
rep["data"].append(d);
|
||||
}
|
||||
}
|
||||
|
@ -917,7 +921,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
|
|||
rep[*it].null();
|
||||
jsonForEach(req, j){
|
||||
if (j->asStringRef() == "clients"){
|
||||
rep[*it].append((long long)clients[*it]);
|
||||
rep[*it].append(clients[*it]);
|
||||
}
|
||||
if (j->asStringRef() == "lastms"){
|
||||
char pageId[NAME_BUFFER_SIZE];
|
||||
|
@ -941,7 +945,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
|
|||
rep[*it].append(lms);
|
||||
metaLocker.post();
|
||||
}else{
|
||||
rep[*it].append(-1ll);
|
||||
rep[*it].append(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -961,20 +965,21 @@ class totalsData {
|
|||
downbps = 0;
|
||||
upbps = 0;
|
||||
}
|
||||
void add(unsigned int down, unsigned int up, Controller::sessType sT){
|
||||
void add(uint64_t down, uint64_t up, Controller::sessType sT){
|
||||
switch (sT){
|
||||
case Controller::SESS_VIEWER: clients++; break;
|
||||
case Controller::SESS_INPUT: inputs++; break;
|
||||
case Controller::SESS_OUTPUT: outputs++; break;
|
||||
default: break;
|
||||
}
|
||||
downbps += down;
|
||||
upbps += up;
|
||||
}
|
||||
long long clients;
|
||||
long long inputs;
|
||||
long long outputs;
|
||||
long long downbps;
|
||||
long long upbps;
|
||||
uint64_t clients;
|
||||
uint64_t inputs;
|
||||
uint64_t outputs;
|
||||
uint64_t downbps;
|
||||
uint64_t upbps;
|
||||
};
|
||||
|
||||
/// This takes a "totals" request, and fills in the response data.
|
||||
|
@ -1036,7 +1041,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
|
|||
if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");}
|
||||
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
|
||||
//start data collection
|
||||
std::map<long long unsigned int, totalsData> totalsCount;
|
||||
std::map<uint64_t, totalsData> totalsCount;
|
||||
//loop over all sessions
|
||||
/// \todo Make the interval configurable instead of 1 second
|
||||
if (sessions.size()){
|
||||
|
@ -1061,13 +1066,13 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
|
|||
return;
|
||||
}
|
||||
//yay! We have data!
|
||||
rep["start"] = (long long)totalsCount.begin()->first;
|
||||
rep["end"] = (long long)totalsCount.rbegin()->first;
|
||||
rep["start"] = totalsCount.begin()->first;
|
||||
rep["end"] = totalsCount.rbegin()->first;
|
||||
rep["data"].null();
|
||||
rep["interval"].null();
|
||||
long long prevT = 0;
|
||||
uint64_t prevT = 0;
|
||||
JSON::Value i;
|
||||
for (std::map<long long unsigned int, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
|
||||
for (std::map<uint64_t, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
|
||||
JSON::Value d;
|
||||
if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);}
|
||||
if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);}
|
||||
|
@ -1077,13 +1082,13 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
|
|||
rep["data"].append(d);
|
||||
if (prevT){
|
||||
if (i.size() < 2){
|
||||
i.append(1ll);
|
||||
i.append((long long)(it->first - prevT));
|
||||
i.append(1u);
|
||||
i.append(it->first - prevT);
|
||||
}else{
|
||||
if (i[1u].asInt() != (long long)(it->first - prevT)){
|
||||
if (i[1u].asInt() != it->first - prevT){
|
||||
rep["interval"].append(i);
|
||||
i[0u] = 1ll;
|
||||
i[1u] = (long long)(it->first - prevT);
|
||||
i[0u] = 1u;
|
||||
i[1u] = it->first - prevT;
|
||||
}else{
|
||||
i[0u] = i[0u].asInt() + 1;
|
||||
}
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
|
||||
namespace Controller {
|
||||
struct statLog {
|
||||
long time;
|
||||
long lastSecond;
|
||||
long long down;
|
||||
long long up;
|
||||
uint64_t time;
|
||||
uint64_t lastSecond;
|
||||
uint64_t down;
|
||||
uint64_t up;
|
||||
};
|
||||
|
||||
enum sessType {
|
||||
|
@ -61,38 +61,38 @@ namespace Controller {
|
|||
class statSession {
|
||||
private:
|
||||
uint64_t firstActive;
|
||||
unsigned long long firstSec;
|
||||
unsigned long long lastSec;
|
||||
unsigned long long wipedUp;
|
||||
unsigned long long wipedDown;
|
||||
uint64_t firstSec;
|
||||
uint64_t lastSec;
|
||||
uint64_t wipedUp;
|
||||
uint64_t wipedDown;
|
||||
std::deque<statStorage> oldConns;
|
||||
sessType sessionType;
|
||||
bool tracked;
|
||||
public:
|
||||
statSession();
|
||||
std::map<unsigned long, statStorage> curConns;
|
||||
std::map<uint64_t, statStorage> curConns;
|
||||
sessType getSessType();
|
||||
void wipeOld(unsigned long long);
|
||||
void finish(unsigned long index);
|
||||
void switchOverTo(statSession & newSess, unsigned long index);
|
||||
void update(unsigned long index, IPC::statExchange & data);
|
||||
void ping(const sessIndex & index, unsigned long long disconnectPoint);
|
||||
unsigned long long getStart();
|
||||
unsigned long long getEnd();
|
||||
bool isViewerOn(unsigned long long time);
|
||||
void wipeOld(uint64_t);
|
||||
void finish(uint64_t index);
|
||||
void switchOverTo(statSession & newSess, uint64_t index);
|
||||
void update(uint64_t index, IPC::statExchange & data);
|
||||
void ping(const sessIndex & index, uint64_t disconnectPoint);
|
||||
uint64_t getStart();
|
||||
uint64_t getEnd();
|
||||
bool isViewerOn(uint64_t time);
|
||||
bool isViewer();
|
||||
bool hasDataFor(unsigned long long time);
|
||||
bool hasDataFor(uint64_t time);
|
||||
bool hasData();
|
||||
long long getConnTime(unsigned long long time);
|
||||
long long getLastSecond(unsigned long long time);
|
||||
long long getDown(unsigned long long time);
|
||||
long long getUp();
|
||||
long long getDown();
|
||||
long long getUp(unsigned long long time);
|
||||
long long getBpsDown(unsigned long long time);
|
||||
long long getBpsUp(unsigned long long time);
|
||||
long long getBpsDown(unsigned long long start, unsigned long long end);
|
||||
long long getBpsUp(unsigned long long start, unsigned long long end);
|
||||
uint64_t getConnTime(uint64_t time);
|
||||
uint64_t getLastSecond(uint64_t time);
|
||||
uint64_t getDown(uint64_t time);
|
||||
uint64_t getUp();
|
||||
uint64_t getDown();
|
||||
uint64_t getUp(uint64_t time);
|
||||
uint64_t getBpsDown(uint64_t time);
|
||||
uint64_t getBpsUp(uint64_t time);
|
||||
uint64_t getBpsDown(uint64_t start, uint64_t end);
|
||||
uint64_t getBpsUp(uint64_t start, uint64_t end);
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ namespace Controller {
|
|||
JSON::Value Storage; ///< Global storage of data.
|
||||
tthread::mutex configMutex;
|
||||
tthread::mutex logMutex;
|
||||
unsigned long long logCounter = 0;
|
||||
uint64_t logCounter = 0;
|
||||
bool configChanged = false;
|
||||
bool restarting = false;
|
||||
bool isTerminal = false;
|
||||
|
@ -53,7 +53,7 @@ namespace Controller {
|
|||
tthread::lock_guard<tthread::mutex> guard(logMutex);
|
||||
JSON::Value m;
|
||||
uint64_t logTime = Util::epoch();
|
||||
m.append((long long)logTime);
|
||||
m.append(logTime);
|
||||
m.append(kind);
|
||||
m.append(message);
|
||||
Storage["log"].append(m);
|
||||
|
|
|
@ -14,7 +14,7 @@ namespace Controller {
|
|||
extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
|
||||
extern bool isTerminal;///< True if connected to a terminal and not a log file.
|
||||
extern bool isColorized;///< True if we colorize the output
|
||||
extern unsigned long long logCounter; ///<Count of logged messages since boot
|
||||
extern uint64_t logCounter; ///<Count of logged messages since boot
|
||||
|
||||
Util::RelAccX * logAccessor();
|
||||
Util::RelAccX * accesslogAccessor();
|
||||
|
|
|
@ -107,7 +107,6 @@ namespace Controller {
|
|||
///\param data The stream configuration for the server.
|
||||
///\returns True if the server status changed
|
||||
bool CheckAllStreams(JSON::Value & data){
|
||||
long long int currTime = Util::epoch();
|
||||
jsonForEach(data, jit) {
|
||||
checkStream(jit.key(), (*jit));
|
||||
}
|
||||
|
|
|
@ -497,7 +497,7 @@ namespace Mist {
|
|||
//Read in the metadata through a temporary JSON object
|
||||
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
|
||||
JSON::Value tempJSONForMeta;
|
||||
JSON::fromDTMI((const unsigned char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
|
||||
JSON::fromDTMI((const char *)tMeta.mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
|
||||
|
||||
tMeta.master = true;
|
||||
|
||||
|
@ -563,7 +563,7 @@ namespace Mist {
|
|||
//Read in the metadata through a temporary JSON object
|
||||
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
|
||||
JSON::Value tempJSONForMeta;
|
||||
JSON::fromDTMI((const unsigned char *)nProxy.metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
|
||||
JSON::fromDTMI((const char *)nProxy.metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
|
||||
//Construct a metadata object for the current track
|
||||
DTSC::Meta trackMeta(tempJSONForMeta);
|
||||
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
|
||||
|
|
|
@ -140,7 +140,7 @@ namespace Mist {
|
|||
DTSC::Track & trk = myMeta.tracks[tmpTag.getTrackID()];
|
||||
if (trk.codec == "PCM" && trk.size == 16){
|
||||
char * ptr = 0;
|
||||
uint32_t ptrSize = 0;
|
||||
size_t ptrSize = 0;
|
||||
thisPacket.getString("data", ptr, ptrSize);
|
||||
for (uint32_t i = 0; i < ptrSize; i+=2){
|
||||
char tmpchar = ptr[i];
|
||||
|
|
|
@ -361,30 +361,6 @@ namespace Mist{
|
|||
uint64_t byteEnd = totalSize-1;
|
||||
uint64_t byteStart = 0;
|
||||
|
||||
/*LTS-START*/
|
||||
// allow setting of max lead time through buffer variable.
|
||||
// max lead time is set in MS, but the variable is in integer seconds for simplicity.
|
||||
if (H.GetVar("buffer") != ""){maxSkipAhead = JSON::Value(H.GetVar("buffer")).asInt() * 1000;}
|
||||
//allow setting of play back rate through buffer variable.
|
||||
//play back rate is set in MS per second, but the variable is a simple multiplier.
|
||||
if (H.GetVar("rate") != ""){
|
||||
long long int multiplier = JSON::Value(H.GetVar("rate")).asInt();
|
||||
if (multiplier){
|
||||
realTime = 1000 / multiplier;
|
||||
}else{
|
||||
realTime = 0;
|
||||
}
|
||||
}
|
||||
if (H.GetHeader("X-Mist-Rate") != ""){
|
||||
long long int multiplier = JSON::Value(H.GetHeader("X-Mist-Rate")).asInt();
|
||||
if (multiplier){
|
||||
realTime = 1000 / multiplier;
|
||||
}else{
|
||||
realTime = 0;
|
||||
}
|
||||
}
|
||||
/*LTS-END*/
|
||||
|
||||
char rangeType = ' ';
|
||||
if (!myMeta.live){
|
||||
if (H.GetHeader("Range") != ""){
|
||||
|
|
|
@ -68,7 +68,7 @@ namespace Mist {
|
|||
return;
|
||||
}
|
||||
char * dataPointer = 0;
|
||||
unsigned int len = 0;
|
||||
size_t len = 0;
|
||||
thisPacket.getString("data", dataPointer, len);
|
||||
H.Chunkify(dataPointer, len, myConn);
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ namespace Mist {
|
|||
JSON::Value jPack;
|
||||
if (myMeta.tracks[thisPacket.getTrackId()].codec == "JSON"){
|
||||
char * dPtr;
|
||||
unsigned int dLen;
|
||||
size_t dLen;
|
||||
thisPacket.getString("data", dPtr, dLen);
|
||||
jPack["data"] = JSON::fromString(dPtr, dLen);
|
||||
jPack["time"] = (long long)thisPacket.getTime();
|
||||
|
|
|
@ -17,7 +17,7 @@ namespace Mist {
|
|||
|
||||
void OutProgressiveMP3::sendNext(){
|
||||
char * dataPointer = 0;
|
||||
unsigned int len = 0;
|
||||
size_t len = 0;
|
||||
thisPacket.getString("data", dataPointer, len);
|
||||
myConn.SendNow(dataPointer, len);
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ namespace Mist {
|
|||
+ 32 //MDHD Box
|
||||
+ 33 + thisTrack.getIdentifier().size() // HDLR Box
|
||||
+ 8 //MINF Box
|
||||
+ 36 //DINF Box
|
||||
+ 44 //DINF Box
|
||||
+ 8; // STBL Box
|
||||
if (thisTrack.firstms != firstms){
|
||||
tmpRes += 12;// EDTS entry extra
|
||||
|
@ -75,6 +75,7 @@ namespace Mist {
|
|||
tmpRes += 20//VMHD Box
|
||||
+ 16 //STSD
|
||||
+ 86 //AVC1
|
||||
+ 16 //PASP
|
||||
+ 8 + thisTrack.init.size();//avcC
|
||||
tmpRes += 16 + (thisTrack.keys.size() * 4);
|
||||
}
|
||||
|
@ -550,7 +551,7 @@ namespace Mist {
|
|||
|
||||
//Obtain a pointer to the data of this packet
|
||||
char * dataPointer = 0;
|
||||
unsigned int len = 0;
|
||||
size_t len = 0;
|
||||
thisPacket.getString("data", dataPointer, len);
|
||||
|
||||
keyPart thisPart = *sortSet.begin();
|
||||
|
|
|
@ -129,7 +129,7 @@ namespace Mist {
|
|||
unsigned int dheader_len = 1;
|
||||
static Util::ResizeablePointer swappy;
|
||||
char * tmpData = 0;//pointer to raw media data
|
||||
unsigned int data_len = 0;//length of processed media data
|
||||
size_t data_len = 0;//length of processed media data
|
||||
thisPacket.getString("data", tmpData, data_len);
|
||||
DTSC::Track & track = myMeta.tracks[thisPacket.getTrackId()];
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace Mist {
|
|||
|
||||
void OutProgressiveSRT::sendNext(){
|
||||
char * dataPointer = 0;
|
||||
unsigned int len = 0;
|
||||
size_t len = 0;
|
||||
thisPacket.getString("data", dataPointer, len);
|
||||
//ignore empty subs
|
||||
if (len == 0 || (len == 1 && dataPointer[0] == ' ')){
|
||||
|
|
|
@ -67,7 +67,7 @@ namespace Mist {
|
|||
firstPack = true;
|
||||
|
||||
char * dataPointer = 0;
|
||||
unsigned int tmpDataLen = 0;
|
||||
size_t tmpDataLen = 0;
|
||||
thisPacket.getString("data", dataPointer, tmpDataLen); //data
|
||||
uint64_t dataLen = tmpDataLen;
|
||||
//apple compatibility timestamp correction
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue