libmist backport from new metadata branch with improved typing and styling

This commit is contained in:
Erik Zandvliet 2018-11-28 12:16:00 +01:00 committed by Thulinma
parent 7a03d3e96c
commit 10fa4b7e7b
88 changed files with 5957 additions and 5757 deletions

View file

@ -385,7 +385,7 @@ int main_loop(int argc, char **argv){
// Check if we have a usable server, if not, print messages with helpful hints
{
std::string web_port = JSON::Value((long long)Controller::conf.getInteger("port")).asString();
std::string web_port = JSON::Value(Controller::conf.getInteger("port")).asString();
// check for username
if (!Controller::Storage.isMember("account") || Controller::Storage["account"].size() < 1){
Controller::Log("CONF",
@ -530,7 +530,6 @@ int main(int argc, char **argv){
Util::Procs::setHandler(); // set child handler
{
struct sigaction new_action;
struct sigaction cur_action;
new_action.sa_sigaction = handleUSR1;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
@ -547,7 +546,6 @@ int main(int argc, char **argv){
Util::Procs::reaper_thread = 0;
{
struct sigaction new_action;
struct sigaction cur_action;
new_action.sa_sigaction = handleUSR1;
sigemptyset(&new_action.sa_mask);
new_action.sa_flags = 0;
@ -584,7 +582,7 @@ int main(int argc, char **argv){
execvp(myFile.c_str(), argv);
FAIL_MSG("Error restarting: %s", strerror(errno));
}
INFO_MSG("Controller uncleanly shut down! Restarting in %llu...", reTimer);
INFO_MSG("Controller uncleanly shut down! Restarting in %" PRIu64 "...", reTimer);
Util::wait(reTimer);
reTimer += 1000;
}

View file

@ -7,6 +7,7 @@
#include <mist/defines.h>
#include <mist/timing.h>
#include <mist/procs.h>
#include <mist/bitfields.h>
#include "controller_api.h"
#include "controller_storage.h"
#include "controller_streams.h"
@ -190,7 +191,7 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
sent = true;
JSON::Value tmp;
tmp[0u] = "log";
tmp[1u].append((long long)rlxLog.getInt("time", logPos));
tmp[1u].append(rlxLog.getInt("time", logPos));
tmp[1u].append(rlxLog.getPointer("kind", logPos));
tmp[1u].append(rlxLog.getPointer("msg", logPos));
W.sendFrame(tmp.toString());
@ -200,14 +201,14 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
sent = true;
JSON::Value tmp;
tmp[0u] = "access";
tmp[1u].append((long long)rlxAccs.getInt("time", accsPos));
tmp[1u].append(rlxAccs.getInt("time", accsPos));
tmp[1u].append(rlxAccs.getPointer("session", accsPos));
tmp[1u].append(rlxAccs.getPointer("stream", accsPos));
tmp[1u].append(rlxAccs.getPointer("connector", accsPos));
tmp[1u].append(rlxAccs.getPointer("host", accsPos));
tmp[1u].append((long long)rlxAccs.getInt("duration", accsPos));
tmp[1u].append((long long)rlxAccs.getInt("up", accsPos));
tmp[1u].append((long long)rlxAccs.getInt("down", accsPos));
tmp[1u].append(rlxAccs.getInt("duration", accsPos));
tmp[1u].append(rlxAccs.getInt("up", accsPos));
tmp[1u].append(rlxAccs.getInt("down", accsPos));
tmp[1u].append(rlxAccs.getPointer("tags", accsPos));
W.sendFrame(tmp.toString());
accsPos++;
@ -228,10 +229,10 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
JSON::Value tmp;
tmp[0u] = "stream";
tmp[1u].append(strm);
tmp[1u].append((long long)tmpStat.status);
tmp[1u].append((long long)tmpStat.viewers);
tmp[1u].append((long long)tmpStat.inputs);
tmp[1u].append((long long)tmpStat.outputs);
tmp[1u].append(tmpStat.status);
tmp[1u].append(tmpStat.viewers);
tmp[1u].append(tmpStat.inputs);
tmp[1u].append(tmpStat.outputs);
W.sendFrame(tmp.toString());
}
}
@ -241,10 +242,10 @@ void Controller::handleWebSocket(HTTP::Parser & H, Socket::Connection & C){
JSON::Value tmp;
tmp[0u] = "stream";
tmp[1u].append(strm);
tmp[1u].append((long long)0);
tmp[1u].append((long long)0);
tmp[1u].append((long long)0);
tmp[1u].append((long long)0);
tmp[1u].append(0u);
tmp[1u].append(0u);
tmp[1u].append(0u);
tmp[1u].append(0u);
W.sendFrame(tmp.toString());
strmRemove.erase(strm);
lastStrmStat.erase(strm);
@ -710,7 +711,7 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response
/*LTS-END*/
if (!Request.isMember("minimal") || Request.isMember("streams") || Request.isMember("addstream") || Request.isMember("deletestream")){
if (!Request.isMember("streams") && (Request.isMember("addstream") || Request.isMember("deletestream"))){
Response["streams"]["incomplete list"] = 1ll;
Response["streams"]["incomplete list"] = 1u;
if (Request.isMember("addstream")){
jsonForEach(Request["addstream"], jit){
if (Controller::Storage["streams"].isMember(jit.key())){

View file

@ -2,6 +2,7 @@
#include <string.h>
#include <fstream>
#include <set>
#include <mist/defines.h>
#include <mist/config.h>
#include <mist/procs.h>
#include "controller_capabilities.h"
@ -381,23 +382,23 @@ namespace Controller {
}
continue;
}
long long int i;
if (sscanf(line, "MemTotal : %lli kB", &i) == 1){
uint64_t i;
if (sscanf(line, "MemTotal : %" PRIu64 " kB", &i) == 1){
capa["mem"]["total"] = i / 1024;
}
if (sscanf(line, "MemFree : %lli kB", &i) == 1){
if (sscanf(line, "MemFree : %" PRIu64 " kB", &i) == 1){
capa["mem"]["free"] = i / 1024;
}
if (sscanf(line, "SwapTotal : %lli kB", &i) == 1){
if (sscanf(line, "SwapTotal : %" PRIu64 " kB", &i) == 1){
capa["mem"]["swaptotal"] = i / 1024;
}
if (sscanf(line, "SwapFree : %lli kB", &i) == 1){
if (sscanf(line, "SwapFree : %" PRIu64 " kB", &i) == 1){
capa["mem"]["swapfree"] = i / 1024;
}
if (sscanf(line, "Buffers : %lli kB", &i) == 1){
if (sscanf(line, "Buffers : %" PRIu64 " kB", &i) == 1){
bufcache += i / 1024;
}
if (sscanf(line, "Cached : %lli kB", &i) == 1){
if (sscanf(line, "Cached : %" PRIu64 " kB", &i) == 1){
bufcache += i / 1024;
}
}
@ -413,23 +414,23 @@ namespace Controller {
//parse lines here
float onemin, fivemin, fifteenmin;
if (sscanf(line, "%f %f %f", &onemin, &fivemin, &fifteenmin) == 3){
capa["load"]["one"] = (long long int)(onemin * 100);
capa["load"]["five"] = (long long int)(fivemin * 100);
capa["load"]["fifteen"] = (long long int)(fifteenmin * 100);
capa["load"]["one"] = uint64_t(onemin * 100);
capa["load"]["five"] = uint64_t(fivemin * 100);
capa["load"]["fifteen"] = uint64_t(fifteenmin * 100);
}
}
std::ifstream cpustat("/proc/stat");
if (cpustat){
char line[300];
while (cpustat.getline(line, 300)){
static unsigned long long cl_total = 0, cl_idle = 0;
unsigned long long c_user, c_nice, c_syst, c_idle, c_total;
if (sscanf(line, "cpu %Lu %Lu %Lu %Lu", &c_user, &c_nice, &c_syst, &c_idle) == 4){
static uint64_t cl_total = 0, cl_idle = 0;
uint64_t c_user, c_nice, c_syst, c_idle, c_total;
if (sscanf(line, "cpu %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, &c_user, &c_nice, &c_syst, &c_idle) == 4){
c_total = c_user + c_nice + c_syst + c_idle;
if (c_total - cl_total > 0){
capa["cpu_use"] = (long long int)(1000 - ((c_idle - cl_idle) * 1000) / (c_total - cl_total));
capa["cpu_use"] = (1000 - ((c_idle - cl_idle) * 1000) / (c_total - cl_total));
}else{
capa["cpu_use"] = 0ll;
capa["cpu_use"] = 0u;
}
cl_total = c_total;
cl_idle = c_idle;

View file

@ -64,7 +64,10 @@ namespace Controller {
/// Deletes the shared memory page with connector information
/// in preparation of shutdown.
void prepareActiveConnectorsForShutdown(){
IPC::sharedPage f("MstCnns", 4096, true, false);
IPC::sharedPage f("MstCnns", 4096, false, false);
if (f){
f.master = true;
}
}
/// Forgets all active connectors, preventing them from being killed,
@ -104,7 +107,7 @@ namespace Controller {
}else{
if (it.key() == "debug"){
static std::string debugLvlStr;
debugLvlStr = JSON::Value((long long)Util::Config::printDebugLevel).asString();
debugLvlStr = JSON::Value(Util::Config::printDebugLevel).asString();
argarr[argnum++] = (char*)((*it)["option"].asStringRef().c_str());
argarr[argnum++] = (char*)debugLvlStr.c_str();
}

View file

@ -35,7 +35,7 @@ namespace Controller{
updateLicense("&boot=1");
checkLicense();
}else{
lastCheck = std::min(Util::epoch(), currentLicense["valid_from"].asInt());
lastCheck = std::min(Util::epoch(), (uint64_t)currentLicense["valid_from"].asInt());
}
}else{
updateLicense("&boot=1");
@ -46,7 +46,7 @@ namespace Controller{
bool isLicensed(){
uint64_t now = Util::epoch() + timeOffset;
#if DEBUG >= DLVL_DEVEL
INFO_MSG("Verifying license against %llu: %s", now, currentLicense.toString().c_str());
INFO_MSG("Verifying license against %" PRIu64 ": %s", now, currentLicense.toString().c_str());
#endif
//Print messages for user, if any
if (currentLicense.isMember("user_msg") && currentLicense["user_msg"].asStringRef().size()){
@ -104,7 +104,7 @@ namespace Controller{
if (currID){
char aesKey[16];
if (strlen(SUPER_SECRET) >= 32){
parseKey(SUPER_SECRET SUPER_SECRET + 7,aesKey,16);
parseKey((SUPER_SECRET SUPER_SECRET)+7,aesKey,16);
}else{
parseKey("4E56721C67306E1F473156F755FF5570",aesKey,16);
}
@ -128,7 +128,7 @@ namespace Controller{
void readLicense(uint64_t licID, const std::string & input, bool fromServer){
char aesKey[16];
if (strlen(SUPER_SECRET) >= 32){
parseKey(SUPER_SECRET SUPER_SECRET + 7,aesKey,16);
parseKey((SUPER_SECRET SUPER_SECRET)+ 7,aesKey,16);
}else{
parseKey("4E56721C67306E1F473156F755FF5570",aesKey,16);
}
@ -158,10 +158,10 @@ namespace Controller{
uint64_t localTime = Util::epoch();
uint64_t remoteTime = newLicense["time"].asInt();
if (localTime > remoteTime + 60){
WARN_MSG("Your computer clock is %u seconds ahead! Please ensure your computer clock is set correctly.", localTime - remoteTime);
WARN_MSG("Your computer clock is %" PRIu64 " seconds ahead! Please ensure your computer clock is set correctly.", localTime - remoteTime);
}
if (localTime < remoteTime - 60){
WARN_MSG("Your computer clock is %u seconds late! Please ensure your computer clock is set correctly.", remoteTime - localTime);
WARN_MSG("Your computer clock is %" PRIu64 " seconds late! Please ensure your computer clock is set correctly.", remoteTime - localTime);
}
timeOffset = remoteTime - localTime;
@ -177,7 +177,7 @@ namespace Controller{
if (currentLicense["store"].asBool()){
if (Storage["license"].asStringRef() != input){
Storage["license"] = input;
Storage["license_id"] = (long long)licID;
Storage["license_id"] = licID;
INFO_MSG("Stored license for offline use");
}
}

View file

@ -29,7 +29,7 @@ namespace Controller{
pid_t ret = Util::startPush(stream, target);
if (ret){
JSON::Value push;
push.append((long long)ret);
push.append(ret);
push.append(stream);
push.append(originalTarget);
push.append(target);
@ -114,10 +114,10 @@ namespace Controller{
static void readPushList(char * pwo){
activePushes.clear();
pid_t p = Bit::btohl(pwo);
HIGH_MSG("Recovering pushes: %lu", (uint32_t)p);
HIGH_MSG("Recovering pushes: %" PRIu32, (uint32_t)p);
while (p > 1){
JSON::Value push;
push.append((long long)p);
push.append(p);
pwo += 4;
for (uint8_t i = 0; i < 3; ++i){
uint16_t l = Bit::btohs(pwo);
@ -244,13 +244,13 @@ namespace Controller{
startTime = true;
}
if (request.isMember("completetime") && request["completetime"].isInt()){
if (!startTime){newPush.append(0ll);}
if (!startTime){newPush.append(0u);}
newPush.append(request["completetime"]);
}
}
long long epo = Util::epoch();
if (newPush.size() > 3 && newPush[3u].asInt() <= epo){
WARN_MSG("Automatic push not added: removal time is in the past! (%lld <= %lld)", newPush[3u].asInt(), Util::epoch());
WARN_MSG("Automatic push not added: removal time is in the past! (%" PRId64 " <= %" PRIu64 ")", newPush[3u].asInt(), Util::epoch());
return;
}
bool edited = false;

View file

@ -7,6 +7,7 @@
#include <mist/dtsc.h>
#include <mist/procs.h>
#include <mist/stream.h>
#include <mist/bitfields.h>
#include "controller_statistics.h"
#include "controller_limits.h"
#include "controller_push.h"
@ -76,24 +77,24 @@ void Controller::updateBandwidthConfig(){
//For server-wide totals. Local to this file only.
struct streamTotals {
unsigned long long upBytes;
unsigned long long downBytes;
unsigned long long inputs;
unsigned long long outputs;
unsigned long long viewers;
unsigned long long currIns;
unsigned long long currOuts;
unsigned long long currViews;
uint64_t upBytes;
uint64_t downBytes;
uint64_t inputs;
uint64_t outputs;
uint64_t viewers;
uint64_t currIns;
uint64_t currOuts;
uint64_t currViews;
uint8_t status;
};
static std::map<std::string, struct streamTotals> streamStats;
static unsigned long long servUpBytes = 0;
static unsigned long long servDownBytes = 0;
static unsigned long long servUpOtherBytes = 0;
static unsigned long long servDownOtherBytes = 0;
static unsigned long long servInputs = 0;
static unsigned long long servOutputs = 0;
static unsigned long long servViewers = 0;
static uint64_t servUpBytes = 0;
static uint64_t servDownBytes = 0;
static uint64_t servUpOtherBytes = 0;
static uint64_t servDownOtherBytes = 0;
static uint64_t servInputs = 0;
static uint64_t servOutputs = 0;
static uint64_t servViewers = 0;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
ID = "UNSET";
@ -317,7 +318,7 @@ void Controller::writeSessionCache(){
/// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
HIGH_MSG("Starting stats thread");
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
statPointer = &statServer;
shmSessions = new IPC::sharedPage(SHM_SESSIONS, SHM_SESSIONS_SIZE, true);
@ -435,7 +436,7 @@ void Controller::SharedMemStats(void * config){
shmSessions->master = false;
}else{/*LTS-START*/
if (Controller::killOnExit){
DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown");
WARN_MSG("Killing all connected clients to force full shutdown");
statServer.finishEach();
}
/*LTS-END*/
@ -637,7 +638,7 @@ Controller::sessType Controller::statSession::getSessType(){
}
/// Archives the given connection.
void Controller::statSession::wipeOld(unsigned long long cutOff){
void Controller::statSession::wipeOld(uint64_t cutOff){
if (firstSec > cutOff){
return;
}
@ -675,7 +676,7 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
}
}
void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){
void Controller::statSession::ping(const Controller::sessIndex & index, uint64_t disconnectPoint){
if (!tracked){return;}
if (lastSec < disconnectPoint){
switch (sessionType){
@ -688,6 +689,8 @@ void Controller::statSession::ping(const Controller::sessIndex & index, unsigned
case SESS_VIEWER:
if (streamStats[index.streamName].currViews){streamStats[index.streamName].currViews--;}
break;
default:
break;
}
uint64_t duration = lastSec - firstActive;
if (duration < 1){duration = 1;}
@ -807,17 +810,17 @@ void Controller::statSession::switchOverTo(statSession & newSess, unsigned long
}
/// Returns the first measured timestamp in this session.
unsigned long long Controller::statSession::getStart(){
uint64_t Controller::statSession::getStart(){
return firstSec;
}
/// Returns the last measured timestamp in this session.
unsigned long long Controller::statSession::getEnd(){
uint64_t Controller::statSession::getEnd(){
return lastSec;
}
/// Returns true if there is data for this session at timestamp t.
bool Controller::statSession::hasDataFor(unsigned long long t){
bool Controller::statSession::hasDataFor(uint64_t t){
if (lastSec < t){return false;}
if (firstSec > t){return false;}
if (oldConns.size()){
@ -826,7 +829,7 @@ bool Controller::statSession::hasDataFor(unsigned long long t){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){return true;}
}
}
@ -842,7 +845,7 @@ bool Controller::statSession::hasData(){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){return true;}
}
}
@ -850,7 +853,7 @@ bool Controller::statSession::hasData(){
}
/// Returns true if this session should count as a viewer on the given timestamp.
bool Controller::statSession::isViewerOn(unsigned long long t){
bool Controller::statSession::isViewerOn(uint64_t t){
return getUp(t) + getDown(t) > COUNTABLE_BYTES;
}
@ -877,8 +880,8 @@ bool Controller::statSession::isViewer(){
}
/// Returns the cumulative connected time for this session at timestamp t.
long long Controller::statSession::getConnTime(unsigned long long t){
long long retVal = 0;
uint64_t Controller::statSession::getConnTime(uint64_t t){
uint64_t retVal = 0;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
@ -887,7 +890,7 @@ long long Controller::statSession::getConnTime(unsigned long long t){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).time;
}
@ -897,9 +900,9 @@ long long Controller::statSession::getConnTime(unsigned long long t){
}
/// Returns the last requested media timestamp for this session at timestamp t.
long long Controller::statSession::getLastSecond(unsigned long long t){
uint64_t Controller::statSession::getLastSecond(uint64_t t){
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
return it->second.getDataFor(t).lastSecond;
}
@ -916,8 +919,8 @@ long long Controller::statSession::getLastSecond(unsigned long long t){
}
/// Returns the cumulative downloaded bytes for this session at timestamp t.
long long Controller::statSession::getDown(unsigned long long t){
long long retVal = wipedDown;
uint64_t Controller::statSession::getDown(uint64_t t){
uint64_t retVal = wipedDown;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
@ -926,7 +929,7 @@ long long Controller::statSession::getDown(unsigned long long t){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).down;
}
@ -936,8 +939,8 @@ long long Controller::statSession::getDown(unsigned long long t){
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
long long Controller::statSession::getUp(unsigned long long t){
long long retVal = wipedUp;
uint64_t Controller::statSession::getUp(uint64_t t){
uint64_t retVal = wipedUp;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->hasDataFor(t)){
@ -946,7 +949,7 @@ long long Controller::statSession::getUp(unsigned long long t){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.hasDataFor(t)){
retVal += it->second.getDataFor(t).up;
}
@ -956,8 +959,8 @@ long long Controller::statSession::getUp(unsigned long long t){
}
/// Returns the cumulative downloaded bytes for this session at timestamp t.
long long Controller::statSession::getDown(){
long long retVal = wipedDown;
uint64_t Controller::statSession::getDown(){
uint64_t retVal = wipedDown;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
@ -966,7 +969,7 @@ long long Controller::statSession::getDown(){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
retVal += it->second.log.rbegin()->second.down;
}
@ -976,8 +979,8 @@ long long Controller::statSession::getDown(){
}
/// Returns the cumulative uploaded bytes for this session at timestamp t.
long long Controller::statSession::getUp(){
long long retVal = wipedUp;
uint64_t Controller::statSession::getUp(){
uint64_t retVal = wipedUp;
if (oldConns.size()){
for (std::deque<statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); ++it){
if (it->log.size()){
@ -986,7 +989,7 @@ long long Controller::statSession::getUp(){
}
}
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
for (std::map<uint64_t, statStorage>::iterator it = curConns.begin(); it != curConns.end(); ++it){
if (it->second.log.size()){
retVal += it->second.log.rbegin()->second.up;
}
@ -996,35 +999,31 @@ long long Controller::statSession::getUp(){
}
/// Returns the cumulative downloaded bytes per second for this session at timestamp t.
long long Controller::statSession::getBpsDown(unsigned long long t){
unsigned long long aTime = t - 5;
uint64_t Controller::statSession::getBpsDown(uint64_t t){
uint64_t aTime = t - 5;
if (aTime < firstSec){
aTime = firstSec;
}
long long valA = getDown(aTime);
long long valB = getDown(t);
if (t > aTime){
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, (valB - valA) / (t - aTime));
return (valB - valA) / (t - aTime);
}else{
//INFO_MSG("Saying the speed from time %lli to %lli (being %lli - %lli) is %lli.", aTime, t, valB, valA, 0);
if (t <= aTime){
return 0;
}
uint64_t valA = getDown(aTime);
uint64_t valB = getDown(t);
return (valB - valA) / (t - aTime);
}
/// Returns the cumulative uploaded bytes per second for this session at timestamp t.
long long Controller::statSession::getBpsUp(unsigned long long t){
unsigned long long aTime = t - 5;
uint64_t Controller::statSession::getBpsUp(uint64_t t){
uint64_t aTime = t - 5;
if (aTime < firstSec){
aTime = firstSec;
}
long long valA = getUp(aTime);
long long valB = getUp(t);
if (t > aTime){
return (valB - valA) / (t - aTime);
}else{
if (t <= aTime){
return 0;
}
uint64_t valA = getUp(aTime);
uint64_t valB = getUp(t);
return (valB - valA) / (t - aTime);
}
/// Returns true if there is data available for timestamp t.
@ -1068,7 +1067,7 @@ void Controller::statStorage::update(IPC::statExchange & data) {
/// This function is called by the shared memory page that holds statistics.
/// It updates the internally saved statistics data, moving across sessions or archiving when necessary.
void Controller::parseStatistics(char * data, size_t len, unsigned int id){
void Controller::parseStatistics(char * data, size_t len, uint32_t id){
//retrieve stats data
IPC::statExchange tmpEx(data);
//calculate the current session index, store as idx.
@ -1076,9 +1075,9 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
//if the connection was already indexed and it has changed, move it
if (connToSession.count(id) && connToSession[id] != idx){
if (sessions[connToSession[id]].getSessType() != SESS_UNSET){
INFO_MSG("Switching connection %lu from active session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
INFO_MSG("Switching connection %" PRIu32 " from active session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
}else{
INFO_MSG("Switching connection %lu from inactive session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
INFO_MSG("Switching connection %" PRIu32 " from inactive session %s over to %s", id, connToSession[id].toStr().c_str(), idx.toStr().c_str());
}
sessions[connToSession[id]].switchOverTo(sessions[idx], id);
if (!sessions[connToSession[id]].hasData()){
@ -1086,7 +1085,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
}
}
if (!connToSession.count(id)){
INSANE_MSG("New connection: %lu as %s", id, idx.toStr().c_str());
INSANE_MSG("New connection: %" PRIu32 " as %s", id, idx.toStr().c_str());
}
//store the index for later comparison
connToSession[id] = idx;
@ -1096,7 +1095,7 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
char counter = (*(data - 1)) & 0x7F;
if (counter == 126 || counter == 127){
//the data is no longer valid - connection has gone away, store for later
INSANE_MSG("Ended connection: %lu as %s", id, idx.toStr().c_str());
INSANE_MSG("Ended connection: %" PRIu32 " as %s", id, idx.toStr().c_str());
sessions[idx].finish(id);
connToSession.erase(id);
}else{
@ -1157,7 +1156,7 @@ bool Controller::hasViewers(std::string streamName){
void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//first, figure out the timestamp wanted
long long int reqTime = 0;
uint64_t reqTime = 0;
if (req.isMember("time")){
reqTime = req["time"].asInt();
}
@ -1233,7 +1232,7 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
if (fields & STAT_CLI_UP){d.append(it->second.getUp(time));}
if (fields & STAT_CLI_BPS_DOWN){d.append(it->second.getBpsDown(time));}
if (fields & STAT_CLI_BPS_UP){d.append(it->second.getBpsUp(time));}
if (fields & STAT_CLI_CRC){d.append((long long)it->first.crc);}
if (fields & STAT_CLI_CRC){d.append(it->first.crc);}
rep["data"].append(d);
}
}
@ -1311,7 +1310,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
rep[*it].null();
jsonForEach(req, j){
if (j->asStringRef() == "clients"){
rep[*it].append((long long)clients[*it]);
rep[*it].append(clients[*it]);
}
if (j->asStringRef() == "lastms"){
char pageId[NAME_BUFFER_SIZE];
@ -1335,7 +1334,7 @@ void Controller::fillActive(JSON::Value & req, JSON::Value & rep, bool onlyNow){
rep[*it].append(lms);
metaLocker.post();
}else{
rep[*it].append(-1ll);
rep[*it].append(-1);
}
}
}
@ -1355,20 +1354,21 @@ class totalsData {
downbps = 0;
upbps = 0;
}
void add(unsigned int down, unsigned int up, Controller::sessType sT){
void add(uint64_t down, uint64_t up, Controller::sessType sT){
switch (sT){
case Controller::SESS_VIEWER: clients++; break;
case Controller::SESS_INPUT: inputs++; break;
case Controller::SESS_OUTPUT: outputs++; break;
default: break;
}
downbps += down;
upbps += up;
}
long long clients;
long long inputs;
long long outputs;
long long downbps;
long long upbps;
uint64_t clients;
uint64_t inputs;
uint64_t outputs;
uint64_t downbps;
uint64_t upbps;
};
/// This takes a "totals" request, and fills in the response data.
@ -1430,7 +1430,7 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");}
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
//start data collection
std::map<long long unsigned int, totalsData> totalsCount;
std::map<uint64_t, totalsData> totalsCount;
//loop over all sessions
/// \todo Make the interval configurable instead of 1 second
if (sessions.size()){
@ -1455,13 +1455,13 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
return;
}
//yay! We have data!
rep["start"] = (long long)totalsCount.begin()->first;
rep["end"] = (long long)totalsCount.rbegin()->first;
rep["start"] = totalsCount.begin()->first;
rep["end"] = totalsCount.rbegin()->first;
rep["data"].null();
rep["interval"].null();
long long prevT = 0;
uint64_t prevT = 0;
JSON::Value i;
for (std::map<long long unsigned int, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
for (std::map<uint64_t, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
JSON::Value d;
if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);}
if (fields & STAT_TOT_INPUTS){d.append(it->second.inputs);}
@ -1471,13 +1471,13 @@ void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
rep["data"].append(d);
if (prevT){
if (i.size() < 2){
i.append(1ll);
i.append((long long)(it->first - prevT));
i.append(1u);
i.append(it->first - prevT);
}else{
if (i[1u].asInt() != (long long)(it->first - prevT)){
if (i[1u].asInt() != it->first - prevT){
rep["interval"].append(i);
i[0u] = 1ll;
i[1u] = (long long)(it->first - prevT);
i[0u] = 1u;
i[1u] = it->first - prevT;
}else{
i[0u] = i[0u].asInt() + 1;
}
@ -1510,9 +1510,9 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
//Collect core server stats
long long int cpu_use = 0;
long long int mem_total = 0, mem_free = 0, mem_bufcache = 0;
long long int bw_up_total = 0, bw_down_total = 0;
uint64_t cpu_use = 0;
uint64_t mem_total = 0, mem_free = 0, mem_bufcache = 0;
uint64_t bw_up_total = 0, bw_down_total = 0;
{
std::ifstream cpustat("/proc/stat");
if (cpustat){
@ -1576,7 +1576,7 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
}
}
}
long long shm_total = 0, shm_free = 0;
uint64_t shm_total = 0, shm_free = 0;
#if !defined(__CYGWIN__) && !defined(_WIN32)
{
struct statvfs shmd;
@ -1698,7 +1698,7 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
resp["mem_used"] = (mem_total - mem_free - mem_bufcache);
resp["shm_total"] = shm_total;
resp["shm_used"] = (shm_total - shm_free);
resp["logs"] = (long long)Controller::logCounter;
resp["logs"] = Controller::logCounter;
{//Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
//collect the data first
@ -1732,34 +1732,34 @@ void Controller::handlePrometheus(HTTP::Parser & H, Socket::Connection & conn, i
}
}
resp["curr"].append((long long)totViewers);
resp["curr"].append((long long)totInputs);
resp["curr"].append((long long)totOutputs);
resp["curr"].append((long long)sessions.size());
resp["tot"].append((long long)servViewers);
resp["tot"].append((long long)servInputs);
resp["tot"].append((long long)servOutputs);
resp["st"].append((long long)bw_up_total);
resp["st"].append((long long)bw_down_total);
resp["bw"].append((long long)servUpBytes);
resp["bw"].append((long long)servDownBytes);
resp["bwlimit"] = (long long)bwLimit;
resp["obw"].append((long long)servUpOtherBytes);
resp["obw"].append((long long)servDownOtherBytes);
resp["curr"].append(totViewers);
resp["curr"].append(totInputs);
resp["curr"].append(totOutputs);
resp["curr"].append(sessions.size());
resp["tot"].append(servViewers);
resp["tot"].append(servInputs);
resp["tot"].append(servOutputs);
resp["st"].append(bw_up_total);
resp["st"].append(bw_down_total);
resp["bw"].append(servUpBytes);
resp["bw"].append(servDownBytes);
resp["bwlimit"] = bwLimit;
resp["obw"].append(servUpOtherBytes);
resp["obw"].append(servDownOtherBytes);
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
resp["streams"][it->first]["tot"].append((long long)it->second.viewers);
resp["streams"][it->first]["tot"].append((long long)it->second.inputs);
resp["streams"][it->first]["tot"].append((long long)it->second.outputs);
resp["streams"][it->first]["bw"].append((long long)it->second.upBytes);
resp["streams"][it->first]["bw"].append((long long)it->second.downBytes);
resp["streams"][it->first]["curr"].append((long long)it->second.currViews);
resp["streams"][it->first]["curr"].append((long long)it->second.currIns);
resp["streams"][it->first]["curr"].append((long long)it->second.currOuts);
resp["streams"][it->first]["tot"].append(it->second.viewers);
resp["streams"][it->first]["tot"].append(it->second.inputs);
resp["streams"][it->first]["tot"].append(it->second.outputs);
resp["streams"][it->first]["bw"].append(it->second.upBytes);
resp["streams"][it->first]["bw"].append(it->second.downBytes);
resp["streams"][it->first]["curr"].append(it->second.currViews);
resp["streams"][it->first]["curr"].append(it->second.currIns);
resp["streams"][it->first]["curr"].append(it->second.currOuts);
}
for (std::map<std::string, uint32_t>::iterator it = outputs.begin(); it != outputs.end(); ++it){
resp["outputs"][it->first] = (long long)it->second;
resp["outputs"][it->first] = it->second;
}
}

View file

@ -26,10 +26,10 @@ namespace Controller {
void updateBandwidthConfig();
struct statLog {
long time;
long lastSecond;
long long down;
long long up;
uint64_t time;
uint64_t lastSecond;
uint64_t down;
uint64_t up;
};
enum sessType {
@ -75,10 +75,10 @@ namespace Controller {
class statSession {
private:
uint64_t firstActive;
unsigned long long firstSec;
unsigned long long lastSec;
unsigned long long wipedUp;
unsigned long long wipedDown;
uint64_t firstSec;
uint64_t lastSec;
uint64_t wipedUp;
uint64_t wipedDown;
std::deque<statStorage> oldConns;
sessType sessionType;
bool tracked;
@ -88,30 +88,30 @@ namespace Controller {
uint32_t invalidate();
uint32_t kill();
char sync;
std::map<unsigned long, statStorage> curConns;
std::map<uint64_t, statStorage> curConns;
std::set<std::string> tags;
sessType getSessType();
void wipeOld(unsigned long long);
void finish(unsigned long index);
void switchOverTo(statSession & newSess, unsigned long index);
void update(unsigned long index, IPC::statExchange & data);
void ping(const sessIndex & index, unsigned long long disconnectPoint);
unsigned long long getStart();
unsigned long long getEnd();
bool isViewerOn(unsigned long long time);
void wipeOld(uint64_t);
void finish(uint64_t index);
void switchOverTo(statSession & newSess, uint64_t index);
void update(uint64_t index, IPC::statExchange & data);
void ping(const sessIndex & index, uint64_t disconnectPoint);
uint64_t getStart();
uint64_t getEnd();
bool isViewerOn(uint64_t time);
bool isViewer();
bool hasDataFor(unsigned long long time);
bool hasDataFor(uint64_t time);
bool hasData();
long long getConnTime(unsigned long long time);
long long getLastSecond(unsigned long long time);
long long getDown(unsigned long long time);
long long getUp();
long long getDown();
long long getUp(unsigned long long time);
long long getBpsDown(unsigned long long time);
long long getBpsUp(unsigned long long time);
long long getBpsDown(unsigned long long start, unsigned long long end);
long long getBpsUp(unsigned long long start, unsigned long long end);
uint64_t getConnTime(uint64_t time);
uint64_t getLastSecond(uint64_t time);
uint64_t getDown(uint64_t time);
uint64_t getUp();
uint64_t getDown();
uint64_t getUp(uint64_t time);
uint64_t getBpsDown(uint64_t time);
uint64_t getBpsUp(uint64_t time);
uint64_t getBpsDown(uint64_t start, uint64_t end);
uint64_t getBpsUp(uint64_t start, uint64_t end);
};

View file

@ -19,7 +19,7 @@ namespace Controller{
JSON::Value Storage; ///< Global storage of data.
tthread::mutex configMutex;
tthread::mutex logMutex;
unsigned long long logCounter = 0;
uint64_t logCounter = 0;
bool configChanged = false;
bool restarting = false;
bool isTerminal = false;
@ -54,7 +54,7 @@ namespace Controller{
tthread::lock_guard<tthread::mutex> guard(logMutex);
JSON::Value m;
uint64_t logTime = Util::epoch();
m.append((long long)logTime);
m.append(logTime);
m.append(kind);
m.append(message);
Storage["log"].append(m);

View file

@ -16,7 +16,7 @@ namespace Controller {
extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
extern bool isTerminal;///< True if connected to a terminal and not a log file.
extern bool isColorized;///< True if we colorize the output
extern unsigned long long logCounter; ///<Count of logged messages since boot
extern uint64_t logCounter; ///<Count of logged messages since boot
Util::RelAccX * logAccessor();
Util::RelAccX * accesslogAccessor();

View file

@ -138,7 +138,6 @@ namespace Controller {
///\param data The stream configuration for the server.
///\returns True if the server status changed
bool CheckAllStreams(JSON::Value & data){
long long int currTime = Util::epoch();
jsonForEach(data, jit) {
checkStream(jit.key(), (*jit));
}

View file

@ -87,7 +87,7 @@ namespace Controller{
void insertUpdateInfo(JSON::Value &ret){
tthread::lock_guard<tthread::mutex> guard(updaterMutex);
ret = updates;
if (updatePerc){ret["progress"] = (long long)updatePerc;}
if (updatePerc){ret["progress"] = (uint16_t)updatePerc;}
}
/// Downloads the latest details on updates

View file

@ -34,7 +34,7 @@ void Controller::uplinkConnection(void * np) {
return;
}
unsigned long long lastSend = Util::epoch() - 5;
uint64_t lastSend = Util::epoch() - 5;
Socket::Connection uplink;
while (Controller::conf.is_active) {
if (!uplink) {
@ -98,7 +98,7 @@ void Controller::uplinkConnection(void * np) {
}
JSON::Value totalsRequest;
Controller::fillClients(totalsRequest, data["clients"]);
totalsRequest["start"] = (long long)lastSend;
totalsRequest["start"] = lastSend;
Controller::fillTotals(totalsRequest, data["totals"]);
data["streams"] = Controller::Storage["streams"];
jsonForEach(data["streams"], it){