Shared memory rewrite

This commit is contained in:
Thulinma 2014-04-04 19:50:40 +02:00
parent afcddbfca6
commit cd2fe225c5
81 changed files with 7775 additions and 5411 deletions

View file

@ -20,8 +20,14 @@
#include "controller_connectors.h"
#include "controller_streams.h"
#include "controller_capabilities.h"
#include "controller_statistics.h"
#include "server.html.h"
#include <mist/tinythread.h>
#include <mist/shared_memory.h>
#define UPLINK_INTERVAL 30
#ifndef COMPILED_USERNAME
@ -31,7 +37,8 @@
///\brief Holds everything unique to the controller.
namespace Controller {
Util::Config conf;
Secure::Auth keychecker; ///< Checks key authorization.
///\brief A class storing information about a connected user.
@ -133,42 +140,23 @@ namespace Controller {
out = in;
}
///\brief Parse received statistics.
///\param stats The statistics to be parsed.
void CheckStats(JSON::Value & stats){
long long int currTime = Util::epoch();
for (JSON::ObjIter jit = stats.ObjBegin(); jit != stats.ObjEnd(); jit++){
if (currTime - lastBuffer[jit->first] > 120){
stats.removeMember(jit->first);
return;
}else{
if (jit->second.isMember("curr") && jit->second["curr"].size() > 0){
for (JSON::ObjIter u_it = jit->second["curr"].ObjBegin(); u_it != jit->second["curr"].ObjEnd(); ++u_it){
if (u_it->second.isMember("now") && u_it->second["now"].asInt() < currTime - 3){
jit->second["log"].append(u_it->second);
jit->second["curr"].removeMember(u_it->first);
if ( !jit->second["curr"].size()){
break;
}
u_it = jit->second["curr"].ObjBegin();
}
}
}
}
}
}
} //Controller namespace
/// the following function is a simple check if the user wants to proceed to fix (y), ignore (n) or abort on (a) a question
char yna(std::string user_input){
if(user_input == "y" || user_input == "Y"){
return 'y';
}else if(user_input == "n" || user_input == "N"){
return 'n';
}else if(user_input == "a" || user_input == "A"){
return 'a';
}else{
return 'x';//when no valid option is found, yna returns x
char yna(std::string & user_input){
switch (user_input[0]){
case 'y': case 'Y':
return 'y';
break;
case 'n': case 'N':
return 'n';
break;
case 'a': case 'A':
return 'a';
break;
default:
return 'x';
break;
}
}
@ -210,37 +198,37 @@ int main(int argc, char ** argv){
if ( !stored_user["default"]){
stored_user["default"] = "root";
}
Util::Config conf = Util::Config(argv[0], PACKAGE_VERSION " / " RELEASE);
conf.addOption("listen_port", stored_port);
conf.addOption("listen_interface", stored_interface);
conf.addOption("username", stored_user);
conf.addOption("daemonize",
Controller::conf = Util::Config(argv[0], PACKAGE_VERSION " / " RELEASE);
Controller::conf.addOption("listen_port", stored_port);
Controller::conf.addOption("listen_interface", stored_interface);
Controller::conf.addOption("username", stored_user);
Controller::conf.addOption("daemonize",
JSON::fromString(
"{\"long\":\"daemon\", \"short\":\"d\", \"default\":0, \"long_off\":\"nodaemon\", \"short_off\":\"n\", \"help\":\"Turns deamon mode on (-d) or off (-n). -d runs quietly in background, -n (default) enables verbose in foreground.\"}"));
conf.addOption("account",
Controller::conf.addOption("account",
JSON::fromString(
"{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}"));
conf.addOption("logfile",
Controller::conf.addOption("logfile",
JSON::fromString(
"{\"long\":\"logfile\", \"short\":\"L\", \"arg\":\"string\" \"default\":\"\",\"help\":\"Redirect all standard output to a log file, provided with an argument\"}"));
conf.addOption("configFile",
Controller::conf.addOption("configFile",
JSON::fromString(
"{\"long\":\"config\", \"short\":\"c\", \"arg\":\"string\" \"default\":\"config.json\", \"help\":\"Specify a config file other than default.\"}"));
conf.addOption("uplink",
Controller::conf.addOption("uplink",
JSON::fromString(
"{\"default\":\"\", \"arg\":\"string\", \"help\":\"MistSteward uplink host and port.\", \"short\":\"U\", \"long\":\"uplink\"}"));
conf.addOption("uplink-name",
Controller::conf.addOption("uplink-name",
JSON::fromString(
"{\"default\":\"" COMPILED_USERNAME "\", \"arg\":\"string\", \"help\":\"MistSteward uplink username.\", \"short\":\"N\", \"long\":\"uplink-name\"}"));
conf.addOption("uplink-pass",
Controller::conf.addOption("uplink-pass",
JSON::fromString(
"{\"default\":\"" COMPILED_PASSWORD "\", \"arg\":\"string\", \"help\":\"MistSteward uplink password.\", \"short\":\"P\", \"long\":\"uplink-pass\"}"));
conf.parseArgs(argc, argv);
if(conf.getString("logfile")!= ""){
Controller::conf.parseArgs(argc, argv);
if(Controller::conf.getString("logfile")!= ""){
//open logfile, dup stdout to logfile
int output = open(conf.getString("logfile").c_str(),O_APPEND|O_CREAT|O_WRONLY,S_IRWXU);
int output = open(Controller::conf.getString("logfile").c_str(),O_APPEND|O_CREAT|O_WRONLY,S_IRWXU);
if(output < 0){
DEBUG_MSG(DLVL_ERROR, "Could not redirect output to %s: %s",conf.getString("logfile").c_str(),strerror(errno));
DEBUG_MSG(DLVL_ERROR, "Could not redirect output to %s: %s",Controller::conf.getString("logfile").c_str(),strerror(errno));
return 7;
}else{
dup2(output,STDOUT_FILENO);
@ -255,27 +243,25 @@ int main(int argc, char ** argv){
}
}
//Input custom config here
Controller::Storage = JSON::fromFile(conf.getString("configFile"));
Controller::Storage = JSON::fromFile(Controller::conf.getString("configFile"));
//check for port, interface and username in arguments
//if they are not there, take them from config file, if there
if (conf.getOption("listen_port", true).size() <= 1){
if (Controller::conf.getOption("listen_port", true).size() <= 1){
if (Controller::Storage["config"]["controller"]["port"]){
conf.getOption("listen_port") = Controller::Storage["config"]["controller"]["port"];
Controller::conf.getOption("listen_port") = Controller::Storage["config"]["controller"]["port"];
}
}
if (conf.getOption("listen_interface", true).size() <= 1){
if (Controller::conf.getOption("listen_interface", true).size() <= 1){
if (Controller::Storage["config"]["controller"]["interface"]){
conf.getOption("listen_interface") = Controller::Storage["config"]["controller"]["interface"];
Controller::conf.getOption("listen_interface") = Controller::Storage["config"]["controller"]["interface"];
}
}
if (conf.getOption("username", true).size() <= 1){
if (Controller::conf.getOption("username", true).size() <= 1){
if (Controller::Storage["config"]["controller"]["username"]){
conf.getOption("username") = Controller::Storage["config"]["controller"]["username"];
Controller::conf.getOption("username") = Controller::Storage["config"]["controller"]["username"];
}
}
JSON::Value capabilities;
//list available protocols and report about them
std::deque<std::string> execs;
@ -284,6 +270,10 @@ int main(int argc, char ** argv){
char const * conn_args[] = {0, "-j", 0};
for (std::deque<std::string>::iterator it = execs.begin(); it != execs.end(); it++){
if ((*it).substr(0, 8) == "MistConn"){
//skip if an MistOut already existed - MistOut takes precedence!
if (capabilities["connectors"].isMember((*it).substr(8))){
continue;
}
arg_one = Util::getMyPath() + (*it);
conn_args[0] = arg_one.c_str();
capabilities["connectors"][(*it).substr(8)] = JSON::fromString(Util::Procs::getOutputOf((char**)conn_args));
@ -291,9 +281,17 @@ int main(int argc, char ** argv){
capabilities["connectors"].removeMember((*it).substr(8));
}
}
if ((*it).substr(0, 7) == "MistOut"){
arg_one = Util::getMyPath() + (*it);
conn_args[0] = arg_one.c_str();
capabilities["connectors"][(*it).substr(7)] = JSON::fromString(Util::Procs::getOutputOf((char**)conn_args));
if (capabilities["connectors"][(*it).substr(7)].size() < 1){
capabilities["connectors"].removeMember((*it).substr(7));
}
}
}
createAccount(conf.getString("account"));
createAccount(Controller::conf.getString("account"));
/// User friendliness input added at this line
if (isatty(fileno(stdin))){
@ -340,11 +338,11 @@ int main(int argc, char ** argv){
}
//check for streams
if ( !Controller::Storage.isMember("streams") || Controller::Storage["streams"].size() < 1){
std::cerr << "No streams configured, remember to set up streams through local settings page on port " << conf.getInteger("listen_port") << " or using the API." << std::endl;
std::cerr << "No streams configured, remember to set up streams through local settings page on port " << Controller::conf.getInteger("listen_port") << " or using the API." << std::endl;
}
}
std::string uplink_addr = conf.getString("uplink");
std::string uplink_addr = Controller::conf.getString("uplink");
std::string uplink_host = "";
int uplink_port = 0;
if (uplink_addr.size() > 0){
@ -359,7 +357,7 @@ int main(int argc, char ** argv){
time_t lastuplink = 0;
time_t processchecker = 0;
Socket::Server API_Socket = Socket::Server(conf.getInteger("listen_port"), conf.getString("listen_interface"), true);
Socket::Server API_Socket = Socket::Server(Controller::conf.getInteger("listen_port"), Controller::conf.getString("listen_interface"), true);
Socket::Server Stats_Socket = Socket::Server(Util::getTmpFolder() + "statistics", true);
Socket::Connection Incoming;
std::vector<Controller::ConnectedUser> users;
@ -369,19 +367,21 @@ int main(int argc, char ** argv){
std::string jsonp;
Controller::ConnectedUser * uplink = 0;
Controller::Log("CONF", "Controller started");
conf.activate();
Controller::conf.activate();
//Create a converter class and automatically load in all encoders.
Converter::Converter myConverter;
while (API_Socket.connected() && conf.is_active){
tthread::thread statsThread(Controller::SharedMemStats, &Controller::conf);
while (API_Socket.connected() && Controller::conf.is_active){
Util::sleep(10);//sleep for 10 ms - prevents 100% CPU time
if (Util::epoch() - processchecker > 10){
if (Util::epoch() - processchecker > 5){
processchecker = Util::epoch();
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
Controller::CheckAllStreams(Controller::Storage["streams"]);
Controller::CheckStats(Controller::Storage["statistics"]);
myConverter.updateStatus();
}
if (uplink_port && Util::epoch() - lastuplink > UPLINK_INTERVAL){
@ -414,7 +414,8 @@ int main(int argc, char ** argv){
Response["config"] = Controller::Storage["config"];
Response["streams"] = Controller::Storage["streams"];
Response["log"] = Controller::Storage["log"];
Response["statistics"] = Controller::Storage["statistics"];
/// \todo Put this back in, someway, somehow...
//Response["statistics"] = Controller::Storage["statistics"];
Response["now"] = (unsigned int)lastuplink;
uplink->H.Clean();
uplink->H.SetBody("command=" + HTTP::Parser::urlencode(Response.toString()));
@ -431,93 +432,6 @@ int main(int argc, char ** argv){
if (Incoming.connected()){
users.push_back((Controller::ConnectedUser)Incoming);
}
Incoming = Stats_Socket.accept(true);
if (Incoming.connected()){
buffers.push_back(Incoming);
}
if (buffers.size() > 0){
for (std::vector<Socket::Connection>::iterator it = buffers.begin(); it != buffers.end(); it++){
if ( !it->connected()){
it->close();
buffers.erase(it);
break;
}
if (it->spool()){
while (it->Received().size()){
it->Received().get().resize(it->Received().get().size() - 1);
Request = JSON::fromString(it->Received().get());
it->Received().get().clear();
if (Request.isMember("buffer")){
std::string thisbuffer = Request["buffer"];
Controller::lastBuffer[thisbuffer] = Util::epoch();
//if metadata is available, store it
if (Request.isMember("meta")){
Controller::Storage["streams"][thisbuffer]["meta"] = Request["meta"];
}
if (Controller::Storage["streams"][thisbuffer].isMember("updated")){
Controller::Storage["streams"][thisbuffer].removeMember("updated");
if (Controller::Storage["streams"][thisbuffer].isMember("cut")){
it->SendNow("c"+Controller::Storage["streams"][thisbuffer]["cut"].asString()+"\n");
}else{
it->SendNow("c0\n");
}
if (Controller::Storage["streams"][thisbuffer].isMember("DVR")){
it->SendNow("d"+Controller::Storage["streams"][thisbuffer]["DVR"].asString()+"\n");
}else{
it->SendNow("d20000\n");
}
if (Controller::Storage["streams"][thisbuffer].isMember("source") && Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(0, 7) == "push://"){
it->SendNow("s"+Controller::Storage["streams"][thisbuffer]["source"].asStringRef().substr(7)+"\n");
}else{
it->SendNow("s127.0.01\n");
}
}
if (Request.isMember("totals")){
Controller::Storage["statistics"][thisbuffer]["curr"] = Request["curr"];
std::string nowstr = Request["totals"]["now"].asString();
Controller::Storage["statistics"][thisbuffer]["totals"][nowstr] = Request["totals"];
Controller::Storage["statistics"][thisbuffer]["totals"][nowstr].removeMember("now");
Controller::Storage["statistics"][thisbuffer]["totals"].shrink(600); //limit to 10 minutes of data
for (JSON::ObjIter jit = Request["log"].ObjBegin(); jit != Request["log"].ObjEnd(); jit++){
Controller::Storage["statistics"][thisbuffer]["log"].append(jit->second);
Controller::Storage["statistics"][thisbuffer]["log"].shrink(1000); //limit to 1000 users per buffer
}
}
}
if (Request.isMember("vod")){
std::string thisfile = Request["vod"]["filename"];
for (JSON::ObjIter oit = Controller::Storage["streams"].ObjBegin(); oit != Controller::Storage["streams"].ObjEnd(); ++oit){
if ((oit->second.isMember("source") && oit->second["source"].asString() == thisfile)
|| (oit->second.isMember("channel") && oit->second["channel"]["URL"].asString() == thisfile)){
Controller::lastBuffer[oit->first] = Util::epoch();
if (Request["vod"].isMember("meta")){
Controller::Storage["streams"][oit->first]["meta"] = Request["vod"]["meta"];
}
JSON::Value sockit = (long long int)it->getSocket();
std::string nowstr = Request["vod"]["now"].asString();
Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()] = Request["vod"];
Controller::Storage["statistics"][oit->first]["curr"][sockit.asString()].removeMember("meta");
JSON::Value nowtotal;
for (JSON::ObjIter u_it = Controller::Storage["statistics"][oit->first]["curr"].ObjBegin();
u_it != Controller::Storage["statistics"][oit->first]["curr"].ObjEnd(); ++u_it){
nowtotal["up"] = nowtotal["up"].asInt() + u_it->second["up"].asInt();
nowtotal["down"] = nowtotal["down"].asInt() + u_it->second["down"].asInt();
nowtotal["count"] = nowtotal["count"].asInt() + 1;
}
Controller::Storage["statistics"][oit->first]["totals"][nowstr] = nowtotal;
Controller::Storage["statistics"][oit->first]["totals"].shrink(600);
}
}
}
if (Request.isMember("ctrl_log") && Request["ctrl_log"].size() > 0){
for (JSON::ArrIter it = Request["ctrl_log"].ArrBegin(); it != Request["ctrl_log"].ArrEnd(); it++){
Controller::Log((*it)[0u], (*it)[1u]);
}
}
}
}
}
}
if (users.size() > 0){
for (std::vector<Controller::ConnectedUser>::iterator it = users.begin(); it != users.end(); it++){
if ( !it->C.connected() || it->logins > 3){
@ -543,12 +457,13 @@ int main(int argc, char ** argv){
Response["config"] = Controller::Storage["config"];
Response["streams"] = Controller::Storage["streams"];
Response["log"] = Controller::Storage["log"];
Response["statistics"] = Controller::Storage["statistics"];
Response["authorize"]["username"] = conf.getString("uplink-name");
/// \todo Put this back in, someway, somehow...
//Response["statistics"] = Controller::Storage["statistics"];
Response["authorize"]["username"] = Controller::conf.getString("uplink-name");
Controller::checkCapable(capabilities);
Response["capabilities"] = capabilities;
Controller::Log("UPLK", "Responding to login challenge: " + Request["authorize"]["challenge"].asString());
Response["authorize"]["password"] = Secure::md5(conf.getString("uplink-pass") + Request["authorize"]["challenge"].asString());
Response["authorize"]["password"] = Secure::md5(Controller::conf.getString("uplink-pass") + Request["authorize"]["challenge"].asString());
it->H.Clean();
it->H.SetBody("command=" + HTTP::Parser::urlencode(Response.toString()));
it->H.BuildRequest();
@ -568,7 +483,6 @@ int main(int argc, char ** argv){
}
if (Request.isMember("clearstatlogs")){
Controller::Storage["log"].null();
Controller::Storage["statistics"].null();
}
}
}else{
@ -578,8 +492,9 @@ int main(int argc, char ** argv){
it->H.SetHeader("Content-Type", "text/html");
it->H.SetHeader("X-Info", "To force an API response, request the file /api");
it->H.SetHeader("Server", "mistserver/" PACKAGE_VERSION "/" + Util::Config::libver + "/" RELEASE);
it->H.SetBody(std::string((char*)server_html, (size_t)server_html_len));
it->C.Send(it->H.BuildResponse("200", "OK"));
it->H.SetHeader("Content-Length", server_html_len);
it->H.SendResponse("200", "OK", it->C);
it->C.SendNow(server_html, server_html_len);
it->H.Clean();
}else{
Authorize(Request, Response, ( *it));
@ -622,10 +537,10 @@ int main(int argc, char ** argv){
}
}
if (Request.isMember("save")){
if( Controller::WriteFile(conf.getString("configFile"), Controller::Storage.toString())){
if( Controller::WriteFile(Controller::conf.getString("configFile"), Controller::Storage.toString())){
Controller::Log("CONF", "Config written to file on request through API");
}else{
Controller::Log("ERROR", "Config " + conf.getString("configFile") + " could not be written");
Controller::Log("ERROR", "Config " + Controller::conf.getString("configFile") + " could not be written");
}
}
//sent current configuration, no matter if it was changed or not
@ -640,11 +555,15 @@ int main(int argc, char ** argv){
}
//sent any available logs and statistics
Response["log"] = Controller::Storage["log"];
Response["statistics"] = Controller::Storage["statistics"];
//clear log and statistics if requested
if (Request.isMember("clearstatlogs")){
Controller::Storage["log"].null();
Controller::Storage["statistics"].null();
}
if (Request.isMember("clients")){
Controller::fillClients(Request["clients"], Response["clients"]);
}
if (Request.isMember("totals")){
Controller::fillTotals(Request["totals"], Response["totals"]);
}
}
@ -657,6 +576,11 @@ int main(int argc, char ** argv){
}
it->H.Clean();
it->H.SetHeader("Content-Type", "text/javascript");
it->H.SetHeader("Access-Control-Allow-Origin", "*");
it->H.SetHeader("Access-Control-Allow-Methods", "GET, POST");
it->H.SetHeader("Access-Control-Allow-Headers", "Content-Type, X-Requested-With");
it->H.SetHeader("Access-Control-Allow-Credentials", "true");
if (jsonp == ""){
it->H.SetBody(Response.toString() + "\n\n");
}else{
@ -671,15 +595,17 @@ int main(int argc, char ** argv){
}
}
}
if (!conf.is_active){
if (!Controller::conf.is_active){
Controller::Log("CONF", "Controller shutting down because of user request (received shutdown signal)");
}
if (!API_Socket.connected()){
Controller::Log("CONF", "Controller shutting down because of socket problem (API port closed)");
}
Controller::conf.is_active = false;
API_Socket.close();
if ( !Controller::WriteFile(conf.getString("configFile"), Controller::Storage.toString())){
std::cerr << "Error writing config " << conf.getString("configFile") << std::endl;
statsThread.join();
if ( !Controller::WriteFile(Controller::conf.getString("configFile"), Controller::Storage.toString())){
std::cerr << "Error writing config " << Controller::conf.getString("configFile") << std::endl;
Controller::Storage.removeMember("log");
for (JSON::ObjIter it = Controller::Storage["streams"].ObjBegin(); it != Controller::Storage["streams"].ObjEnd(); it++){
it->second.removeMember("meta");

View file

@ -1,6 +1,7 @@
#include <stdio.h> // cout, cerr
#include <string>
#include <cstring> // strcpy
#include <sys/stat.h> //stat
#include <mist/json.h>
#include <mist/config.h>
#include <mist/procs.h>
@ -55,7 +56,14 @@ namespace Controller {
static inline void buildPipedArguments(JSON::Value & p, char * argarr[], JSON::Value & capabilities){
int argnum = 0;
static std::string tmparg;
tmparg = Util::getMyPath() + std::string("MistConn") + p["connector"].asStringRef();
tmparg = Util::getMyPath() + std::string("MistOut") + p["connector"].asStringRef();
struct stat buf;
if (::stat(tmparg.c_str(), &buf) != 0){
tmparg = Util::getMyPath() + std::string("MistConn") + p["connector"].asStringRef();
}
if (::stat(tmparg.c_str(), &buf) != 0){
return;
}
argarr[argnum++] = (char*)tmparg.c_str();
argarr[argnum++] = (char*)"-n";
JSON::Value & pipedCapa = capabilities["connectors"][p["connector"].asStringRef()];

View file

@ -0,0 +1,451 @@
#include <cstdio>
#include <mist/config.h>
#include "controller_statistics.h"
/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
#define STAT_CUTOFF 600
// These are used to store "clients" field requests in a bitfield for speedup.
#define STAT_CLI_HOST 1
#define STAT_CLI_STREAM 2
#define STAT_CLI_PROTO 4
#define STAT_CLI_CONNTIME 8
#define STAT_CLI_POSITION 16
#define STAT_CLI_DOWN 32
#define STAT_CLI_UP 64
#define STAT_CLI_BPS_DOWN 128
#define STAT_CLI_BPS_UP 256
#define STAT_CLI_ALL 0xFFFF
// These are used to store "totals" field requests in a bitfield for speedup.
#define STAT_TOT_CLIENTS 1
#define STAT_TOT_BPS_DOWN 2
#define STAT_TOT_BPS_UP 4
#define STAT_TOT_ALL 0xFF
std::multimap<unsigned long long int, Controller::statStorage> Controller::oldConns;///<Old connections, sorted on disconnect timestamp
std::map<unsigned long, Controller::statStorage> Controller::curConns;///<Connection storage, sorted on page location.
/// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer("statistics", 88, true);
while(((Util::Config*)config)->is_active){
//parse current users
statServer.parseEach(parseStatistics);
//wipe old statistics
while (oldConns.size() && oldConns.begin()->first < (unsigned long long)(Util::epoch() - STAT_CUTOFF)){
oldConns.erase(oldConns.begin());
}
Util::sleep(1000);
}
DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
}
/// This function is called by parseStatistics.
/// It updates the internally saved statistics data.
void Controller::statStorage::update(IPC::statExchange & data) {
if (streamName == ""){
host = data.host();
streamName = data.streamName();
connector = data.connector();
}
statLog tmp;
tmp.time = data.time();
tmp.lastSecond = data.lastSecond();
tmp.down = data.down();
tmp.up = data.up();
log[data.now()] = tmp;
//wipe data older than approx. STAT_CUTOFF seconds
if (log.size() > STAT_CUTOFF){
log.erase(log.begin());
}
}
/// This function is called by the shared memory page that holds statistics.
/// It updates the internally saved statistics data, archiving if neccessary.
void Controller::parseStatistics(char * data, size_t len, unsigned int id){
IPC::statExchange tmpEx(data);
curConns[id].update(tmpEx);
char counter = (*(data - 1));
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
oldConns.insert(std::pair<unsigned long long int, statStorage>(Util::epoch(), curConns[id]));
curConns.erase(id);
}
}
/// Returns true if this stream has at least one connected client.
bool Controller::hasViewers(std::string streamName){
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
if (it->second.streamName == streamName){
return true;
}
}
}
return false;
}
/// This takes a "clients" request, and fills in the response data.
///
/// \api
/// `"client"` requests take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //array of streamnames to accumulate. Empty means all.
/// "streams": ["streama", "streamb", "streamc"],
/// //array of protocols to accumulate. Empty means all.
/// "protocols": ["HLS", "HSS"],
/// //list of requested data fields. Empty means all.
/// "fields": ["host", "stream", "protocol", "conntime", "position", "down", "up", "downbps", "upbps"],
/// //unix timestamp of measuring moment. Negative means X seconds ago. Empty means now.
/// "time": 1234567
/// }
/// ~~~~~~~~~~~~~~~
/// and are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //unix timestamp of data. Always present, always absolute.
/// "time": 1234567,
/// //array of actually represented data fields.
/// "fields": [...]
/// //for all clients, the data in the order they appear in the "fields" field.
/// "data": [[x, y, z], [x, y, z], [x, y, z]]
/// }
/// ~~~~~~~~~~~~~~~
void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
//first, figure out the timestamp wanted
long long int reqTime = 0;
if (req.isMember("time")){
reqTime = req["time"].asInt();
}
//to make sure no nasty timing business takes place, we store the case "now" as a bool.
bool now = (reqTime == 0);
//add the current time, if negative or zero.
if (reqTime <= 0){
reqTime += Util::epoch();
}
//at this point, reqTime is the absolute timestamp.
rep["time"] = reqTime; //fill the absolute timestamp
unsigned int fields = 0;
//next, figure out the fields wanted
if (req.isMember("fields") && req["fields"].size()){
for (JSON::ArrIter it = req["fields"].ArrBegin(); it != req["fields"].ArrEnd(); it++){
if ((*it).asStringRef() == "host"){fields |= STAT_CLI_HOST;}
if ((*it).asStringRef() == "stream"){fields |= STAT_CLI_STREAM;}
if ((*it).asStringRef() == "protocol"){fields |= STAT_CLI_PROTO;}
if ((*it).asStringRef() == "conntime"){fields |= STAT_CLI_CONNTIME;}
if ((*it).asStringRef() == "position"){fields |= STAT_CLI_POSITION;}
if ((*it).asStringRef() == "down"){fields |= STAT_CLI_DOWN;}
if ((*it).asStringRef() == "up"){fields |= STAT_CLI_UP;}
if ((*it).asStringRef() == "downbps"){fields |= STAT_CLI_BPS_DOWN;}
if ((*it).asStringRef() == "upbps"){fields |= STAT_CLI_BPS_UP;}
}
}
//select all, if none selected
if (!fields){fields = STAT_CLI_ALL;}
//figure out what streams are wanted
std::set<std::string> streams;
if (req.isMember("streams") && req["streams"].size()){
for (JSON::ArrIter it = req["streams"].ArrBegin(); it != req["streams"].ArrEnd(); it++){
streams.insert((*it).asStringRef());
}
}
//figure out what protocols are wanted
std::set<std::string> protos;
if (req.isMember("protocols") && req["protocols"].size()){
for (JSON::ArrIter it = req["protocols"].ArrBegin(); it != req["protocols"].ArrEnd(); it++){
protos.insert((*it).asStringRef());
}
}
//output the selected fields
rep["fields"].null();
if (fields & STAT_CLI_HOST){rep["fields"].append("host");}
if (fields & STAT_CLI_STREAM){rep["fields"].append("stream");}
if (fields & STAT_CLI_PROTO){rep["fields"].append("protocol");}
if (fields & STAT_CLI_CONNTIME){rep["fields"].append("conntime");}
if (fields & STAT_CLI_POSITION){rep["fields"].append("position");}
if (fields & STAT_CLI_DOWN){rep["fields"].append("down");}
if (fields & STAT_CLI_UP){rep["fields"].append("up");}
if (fields & STAT_CLI_BPS_DOWN){rep["fields"].append("downbps");}
if (fields & STAT_CLI_BPS_UP){rep["fields"].append("upbps");}
//output the data itself
rep["data"].null();
//start with current connections
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
unsigned long long time = reqTime;
if (now){time = it->second.log.rbegin()->first;}
//data present and wanted? insert it!
if ((it->second.log.rbegin()->first >= time && it->second.log.begin()->first <= time) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
JSON::Value d;
std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(time);
std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(time));
if (fields & STAT_CLI_HOST){d.append(it->second.host);}
if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
if (fields & STAT_CLI_BPS_DOWN){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.down - prevRef->second.down) / diff);
}else{
d.append(statRef->second.down);
}
}
if (fields & STAT_CLI_BPS_UP){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.up - prevRef->second.up) / diff);
}else{
d.append(statRef->second.up);
}
}
rep["data"].append(d);
}
}
}
//if we're only interested in current, don't even bother looking at history
if (now){
return;
}
//look at history
if (oldConns.size()){
for (std::map<unsigned long long int, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
//data present and wanted? insert it!
if ((it->second.log.rbegin()->first >= (unsigned long long)reqTime && it->second.log.begin()->first <= (unsigned long long)reqTime) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
JSON::Value d;
std::map<unsigned long long, statLog>::iterator statRef = it->second.log.lower_bound(reqTime);
std::map<unsigned long long, statLog>::iterator prevRef = --(it->second.log.lower_bound(reqTime));
if (fields & STAT_CLI_HOST){d.append(it->second.host);}
if (fields & STAT_CLI_STREAM){d.append(it->second.streamName);}
if (fields & STAT_CLI_PROTO){d.append(it->second.connector);}
if (fields & STAT_CLI_CONNTIME){d.append((long long)statRef->second.time);}
if (fields & STAT_CLI_POSITION){d.append((long long)statRef->second.lastSecond);}
if (fields & STAT_CLI_DOWN){d.append(statRef->second.down);}
if (fields & STAT_CLI_UP){d.append(statRef->second.up);}
if (fields & STAT_CLI_BPS_DOWN){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.down - prevRef->second.down) / diff);
}else{
d.append(statRef->second.down);
}
}
if (fields & STAT_CLI_BPS_UP){
if (statRef != it->second.log.begin()){
unsigned int diff = statRef->first - prevRef->first;
d.append((statRef->second.up - prevRef->second.up) / diff);
}else{
d.append(statRef->second.up);
}
}
rep["data"].append(d);
}
}
}
//all done! return is by reference, so no need to return anything here.
}
class totalsData {
public:
totalsData(){
clients = 0;
downbps = 0;
upbps = 0;
}
void add(unsigned int down, unsigned int up){
clients++;
downbps += down;
upbps += up;
}
long long clients;
long long downbps;
long long upbps;
};
/// This takes a "totals" request, and fills in the response data.
///
/// \api
/// `"totals"` requests take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //array of streamnames to accumulate. Empty means all.
/// "streams": ["streama", "streamb", "streamc"],
/// //array of protocols to accumulate. Empty means all.
/// "protocols": ["HLS", "HSS"],
/// //list of requested data fields. Empty means all.
/// "fields": ["clients", "downbps", "upbps"],
/// //unix timestamp of data start. Negative means X seconds ago. Empty means earliest available.
/// "start": 1234567
/// //unix timestamp of data end. Negative means X seconds ago. Empty means latest available (usually 'now').
/// "end": 1234567
/// }
/// ~~~~~~~~~~~~~~~
/// and are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// //unix timestamp of start of data. Always present, always absolute.
/// "start": 1234567,
/// //unix timestamp of end of data. Always present, always absolute.
/// "end": 1234567,
/// //array of actually represented data fields.
/// "fields": [...]
/// // Time between datapoints. Here: 10 points with each 5 seconds afterwards, followed by 10 points with each 1 second afterwards.
/// "interval": [[10, 5], [10, 1]],
/// //the data for the times as mentioned in the "interval" field, in the order they appear in the "fields" field.
/// "data": [[x, y, z], [x, y, z], [x, y, z]]
/// }
/// ~~~~~~~~~~~~~~~
void Controller::fillTotals(JSON::Value & req, JSON::Value & rep){
//first, figure out the timestamps wanted
long long int reqStart = 0;
long long int reqEnd = 0;
if (req.isMember("start")){
reqStart = req["start"].asInt();
}
if (req.isMember("end")){
reqEnd = req["end"].asInt();
}
//add the current time, if negative or zero.
if (reqStart < 0){
reqStart += Util::epoch();
}
if (reqStart == 0){
reqStart = Util::epoch() - STAT_CUTOFF;
}
if (reqEnd <= 0){
reqEnd += Util::epoch();
}
//at this point, reqStart and reqEnd are the absolute timestamp.
unsigned int fields = 0;
//next, figure out the fields wanted
if (req.isMember("fields") && req["fields"].size()){
for (JSON::ArrIter it = req["fields"].ArrBegin(); it != req["fields"].ArrEnd(); it++){
if ((*it).asStringRef() == "clients"){fields |= STAT_TOT_CLIENTS;}
if ((*it).asStringRef() == "downbps"){fields |= STAT_TOT_BPS_DOWN;}
if ((*it).asStringRef() == "upbps"){fields |= STAT_TOT_BPS_UP;}
}
}
//select all, if none selected
if (!fields){fields = STAT_TOT_ALL;}
//figure out what streams are wanted
std::set<std::string> streams;
if (req.isMember("streams") && req["streams"].size()){
for (JSON::ArrIter it = req["streams"].ArrBegin(); it != req["streams"].ArrEnd(); it++){
streams.insert((*it).asStringRef());
}
}
//figure out what protocols are wanted
std::set<std::string> protos;
if (req.isMember("protocols") && req["protocols"].size()){
for (JSON::ArrIter it = req["protocols"].ArrBegin(); it != req["protocols"].ArrEnd(); it++){
protos.insert((*it).asStringRef());
}
}
//output the selected fields
rep["fields"].null();
if (fields & STAT_TOT_CLIENTS){rep["fields"].append("clients");}
if (fields & STAT_TOT_BPS_DOWN){rep["fields"].append("downbps");}
if (fields & STAT_TOT_BPS_UP){rep["fields"].append("upbps");}
//start data collection
std::map<long long unsigned int, totalsData> totalsCount;
//start with current connections
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
//data present and wanted? insert it!
if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
//keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
continue;
}
unsigned int diff = li->first - pi->first;
unsigned int ddown = (li->second.down - pi->second.down) / diff;
unsigned int dup = (li->second.up - pi->second.up) / diff;
for (long long unsigned int t = pi->first; t < li->first; t++){
if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
totalsCount[t].add(ddown, dup);
}
}
pi = li;//set previous iterator to log iterator
}
}
}
}
//look at history
if (oldConns.size()){
for (std::map<unsigned long long int, statStorage>::iterator it = oldConns.begin(); it != oldConns.end(); it++){
//data present and wanted? insert it!
if (it->second.log.size() > 1 && (it->second.log.rbegin()->first >= (unsigned long long)reqStart || it->second.log.begin()->first <= (unsigned long long)reqEnd) && (!streams.size() || streams.count(it->second.streamName)) && (!protos.size() || protos.count(it->second.connector))){
//keep track of the previous and current, starting at position 2 so there's always a delta down/up value.
std::map<unsigned long long, statLog>::iterator pi = it->second.log.begin();
for (std::map<unsigned long long, statLog>::iterator li = ++(it->second.log.begin()); li != it->second.log.end(); li++){
if (li->first < (unsigned long long)reqStart || pi->first > (unsigned long long)reqEnd){
continue;
}
unsigned int diff = li->first - pi->first;
unsigned int ddown = (li->second.down - pi->second.down) / diff;
unsigned int dup = (li->second.up - pi->second.up) / diff;
for (long long unsigned int t = pi->first; t < li->first; t++){
if (t >= (unsigned long long)reqStart && t <= (unsigned long long)reqEnd){
totalsCount[t].add(ddown, dup);
}
}
pi = li;//set previous iterator to log iterator
}
}
}
}
//output the data itself
if (!totalsCount.size()){
//Oh noes! No data. We'll just reply with a bunch of nulls.
rep["start"].null();
rep["end"].null();
rep["data"].null();
rep["interval"].null();
return;
}
//yay! We have data!
rep["start"] = (long long)totalsCount.begin()->first;
rep["end"] = (long long)totalsCount.rbegin()->first;
rep["data"].null();
rep["interval"].null();
long long prevT = 0;
JSON::Value i;
for (std::map<long long unsigned int, totalsData>::iterator it = totalsCount.begin(); it != totalsCount.end(); it++){
JSON::Value d;
if (fields & STAT_TOT_CLIENTS){d.append(it->second.clients);}
if (fields & STAT_TOT_BPS_DOWN){d.append(it->second.downbps);}
if (fields & STAT_TOT_BPS_UP){d.append(it->second.upbps);}
rep["data"].append(d);
if (prevT){
if (i.size() < 2){
i.append(1ll);
i.append((long long)(it->first - prevT));
}else{
if (i[1u].asInt() != (long long)(it->first - prevT)){
rep["interval"].append(i);
i[0u] = 1ll;
i[1u] = (long long)(it->first - prevT);
}else{
i[0u] = i[0u].asInt() + 1;
}
}
}
prevT = it->first;
}
if (i.size() > 1){
rep["interval"].append(i);
i.null();
}
//all done! return is by reference, so no need to return anything here.
}

View file

@ -0,0 +1,35 @@
#include <mist/shared_memory.h>
#include <mist/timing.h>
#include <mist/defines.h>
#include <mist/json.h>
#include <string>
#include <map>
namespace Controller {
struct statLog {
long time;
long lastSecond;
long long down;
long long up;
};
class statStorage {
public:
void update(IPC::statExchange & data);
std::string host;
std::string streamName;
std::string connector;
std::map<unsigned long long, statLog> log;
};
extern std::multimap<unsigned long long int, statStorage> oldConns;
extern std::map<unsigned long, statStorage> curConns;
void parseStatistics(char * data, size_t len, unsigned int id);
void fillClients(JSON::Value & req, JSON::Value & rep);
void fillTotals(JSON::Value & req, JSON::Value & rep);
void SharedMemStats(void * config);
bool hasViewers(std::string streamName);
}

View file

@ -3,16 +3,17 @@
#include <mist/timing.h>
#include <mist/stream.h>
#include <mist/dtsc.h>
#include <mist/defines.h>
#include <mist/shared_memory.h>
#include "controller_streams.h"
#include "controller_storage.h"
#include "controller_statistics.h"
#include <sys/stat.h>
#include <map>
///\brief Holds everything unique to the controller.
namespace Controller {
std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
///\brief Checks whether two streams are equal.
///\param one The first stream for the comparison.
///\param two The second stream for the comparison.
@ -43,22 +44,28 @@ namespace Controller {
if (data.isMember("source")){
URL = data["source"].asString();
}
std::string buffcmd;
if (URL == ""){
Log("STRM", "Error for stream " + name + "! Source parameter missing.");
data["error"] = "Stream offline: Missing source parameter!";
return;
}
buffcmd = "MistBuffer";
if (data.isMember("DVR") && data["DVR"].asInt() > 0){
data["DVR"] = data["DVR"].asInt();
buffcmd += " -t " + data["DVR"].asString();
}
buffcmd += " -s " + name;
if (URL.substr(0, 4) == "push"){
std::string pusher = URL.substr(7);
Util::Procs::Start(name, Util::getMyPath() + buffcmd + " " + pusher);
Log("BUFF", "(re)starting stream buffer " + name + " for push data from " + pusher);
if (hasViewers(name)){
data["meta"].null();
IPC::sharedPage streamIndex(name,0,false,false);
if (!streamIndex.mapped){
return;
}
unsigned int i = 0;
JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, i, data["meta"]);
if (data["meta"].isMember("tracks") && data["meta"]["tracks"].size()){
for(JSON::ObjIter trackIt = data["meta"]["tracks"].ObjBegin(); trackIt != data["meta"]["tracks"].ObjEnd(); trackIt++){
trackIt->second.removeMember("fragments");
trackIt->second.removeMember("keys");
trackIt->second.removeMember("parts");
}
}
}
}else{
if (URL.substr(0, 1) == "/"){
data.removeMember("error");
@ -74,6 +81,12 @@ namespace Controller {
getMeta = true;
data["l_meta"] = (long long)fileinfo.st_mtime;
}
if (stat((URL+".dtsh").c_str(), &fileinfo) == 0 && !S_ISDIR(fileinfo.st_mode)){
if ( !data.isMember("h_meta") || fileinfo.st_mtime != data["h_meta"].asInt()){
getMeta = true;
data["h_meta"] = (long long)fileinfo.st_mtime;
}
}
if ( !getMeta && data.isMember("meta") && data["meta"].isMember("tracks")){
for (JSON::ObjIter trIt = data["meta"]["tracks"].ObjBegin(); trIt != data["meta"]["tracks"].ObjEnd(); trIt++){
if (trIt->second["codec"] == "H264"){
@ -107,6 +120,9 @@ namespace Controller {
getMeta = true;
}
if (getMeta){
if ((URL.substr(URL.size() - 5) != ".dtsc") && (stat((URL+".dtsh").c_str(), &fileinfo) != 0)){
Util::Stream::getStream(name);
}
char * tmp_cmd[3] = {0, 0, 0};
std::string mistinfo = Util::getMyPath() + "MistInfo";
tmp_cmd[0] = (char*)mistinfo.c_str();
@ -127,7 +143,7 @@ namespace Controller {
Util::Procs::getOutputOf(tmp_cmd);
data.removeMember("meta");
}
if (Util::epoch() - lastBuffer[name] > 5){
if (!hasViewers(name)){
if ( !data.isMember("error")){
data["error"] = "Available";
}
@ -136,9 +152,11 @@ namespace Controller {
data["online"] = 1;
}
return; //MistPlayer handles VoD
}else{
/// \todo Implement ffmpeg pulling again?
//Util::Procs::Start(name, "ffmpeg -re -async 2 -i " + URL + " -f flv -", Util::getMyPath() + "MistFLV2DTSC", Util::getMyPath() + buffcmd);
//Log("BUFF", "(re)starting stream buffer " + name + " for ffmpeg data: ffmpeg -re -async 2 -i " + URL + " -f flv -");
}
Util::Procs::Start(name, "ffmpeg -re -async 2 -i " + URL + " -f flv -", Util::getMyPath() + "MistFLV2DTSC", Util::getMyPath() + buffcmd);
Log("BUFF", "(re)starting stream buffer " + name + " for ffmpeg data: ffmpeg -re -async 2 -i " + URL + " -f flv -");
}
}
@ -153,7 +171,7 @@ namespace Controller {
if (!jit->second.isMember("name")){
jit->second["name"] = jit->first;
}
if (currTime - lastBuffer[jit->first] > 5){
if (!hasViewers(jit->first)){
if (jit->second.isMember("source") && jit->second["source"].asString().substr(0, 1) == "/" && jit->second.isMember("error")
&& jit->second["error"].asString().substr(0,15) != "Stream offline:"){
jit->second["online"] = 2;
@ -229,11 +247,8 @@ namespace Controller {
WriteFile(Util::getTmpFolder() + "streamlist", strlist.toString());
}
}
///\brief Parse a given stream configuration.
///\param in The requested configuration.
///\param out The new configuration after parsing.
void CheckStreams(JSON::Value & in, JSON::Value & out){
void AddStreams(JSON::Value & in, JSON::Value & out){
//check for new streams and updates
for (JSON::ObjIter jit = in.ObjBegin(); jit != in.ObjEnd(); jit++){
if (out.isMember(jit->first)){
@ -263,6 +278,14 @@ namespace Controller {
startStream(jit->first, out[jit->first]);
}
}
}
///\brief Parse a given stream configuration.
///\param in The requested configuration.
///\param out The new configuration after parsing.
void CheckStreams(JSON::Value & in, JSON::Value & out){
//check for new streams and updates
AddStreams(in, out);
//check for deleted streams
std::set<std::string> toDelete;

View file

@ -1,13 +1,12 @@
#include <mist/json.h>
namespace Controller {
extern std::map<std::string, int> lastBuffer; ///< Last moment of contact with all buffers.
bool streamsEqual(JSON::Value & one, JSON::Value & two);
void startStream(std::string name, JSON::Value & data);
void CheckAllStreams(JSON::Value & data);
void CheckStreams(JSON::Value & in, JSON::Value & out);
void AddStreams(JSON::Value & in, JSON::Value & out);
struct liveCheck {
long long int lastms;
long long int last_active;