LTS Commits

This commit is contained in:
Thulinma 2015-04-05 21:38:36 +02:00
parent f24d97b510
commit 4bdbd82f66
72 changed files with 8245 additions and 105 deletions

View file

@ -46,6 +46,11 @@
#include "controller_capabilities.h"
#include "controller_connectors.h"
#include "controller_statistics.h"
/*LTS-START*/
#include "controller_updater.h"
#include "controller_limits.h"
#include "controller_uplink.h"
/*LTS-END*/
#include "controller_api.h"
#ifndef COMPILED_USERNAME
@ -88,7 +93,19 @@ void createAccount (std::string account){
/// Status monitoring thread.
/// Will check outputs, inputs and converters every five seconds
void statusMonitor(void * np){
#ifdef UPDATER
unsigned long updatechecker = Util::epoch(); /*LTS*/
#endif
while (Controller::conf.is_active){
/*LTS-START*/
#ifdef UPDATER
if (Util::epoch() - updatechecker > 3600){
updatechecker = Util::epoch();
Controller::CheckUpdateInfo();
}
#endif
/*LTS-END*/
//this scope prevents the configMutex from being locked constantly
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
@ -127,6 +144,12 @@ int main(int argc, char ** argv){
Controller::conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}"));
Controller::conf.addOption("logfile", JSON::fromString("{\"long\":\"logfile\", \"short\":\"L\", \"arg\":\"string\" \"default\":\"\",\"help\":\"Redirect all standard output to a log file, provided with an argument\"}"));
Controller::conf.addOption("configFile", JSON::fromString("{\"long\":\"config\", \"short\":\"c\", \"arg\":\"string\" \"default\":\"config.json\", \"help\":\"Specify a config file other than default.\"}"));
#ifdef UPDATER
Controller::conf.addOption("update", JSON::fromString("{\"default\":0, \"help\":\"Check for and install updates before starting.\", \"short\":\"D\", \"long\":\"update\"}")); /*LTS*/
#endif
Controller::conf.addOption("uplink", JSON::fromString("{\"default\":\"\", \"arg\":\"string\", \"help\":\"MistSteward uplink host and port.\", \"short\":\"U\", \"long\":\"uplink\"}")); /*LTS*/
Controller::conf.addOption("uplink-name", JSON::fromString("{\"default\":\"" COMPILED_USERNAME "\", \"arg\":\"string\", \"help\":\"MistSteward uplink username.\", \"short\":\"N\", \"long\":\"uplink-name\"}")); /*LTS*/
Controller::conf.addOption("uplink-pass", JSON::fromString("{\"default\":\"" COMPILED_PASSWORD "\", \"arg\":\"string\", \"help\":\"MistSteward uplink password.\", \"short\":\"P\", \"long\":\"uplink-pass\"}")); /*LTS*/
Controller::conf.parseArgs(argc, argv);
if(Controller::conf.getString("logfile")!= ""){
//open logfile, dup stdout to logfile
@ -246,14 +269,29 @@ int main(int argc, char ** argv){
Controller::Log("CONF", "Controller started");
Controller::conf.activate();//activate early, so threads aren't killed.
/*LTS-START*/
#ifdef UPDATER
if (Controller::conf.getBool("update")){
Controller::CheckUpdates();
}
#endif
/*LTS-END*/
//start stats thread
tthread::thread statsThread(Controller::SharedMemStats, &Controller::conf);
//start monitoring thread
tthread::thread monitorThread(statusMonitor, 0);
//start monitoring thread /*LTS*/
tthread::thread uplinkThread(Controller::uplinkConnection, 0);/*LTS*/
//start main loop
Controller::conf.serveThreadedSocket(Controller::handleAPIConnection);
//print shutdown reason
/*LTS-START*/
if (Controller::restarting){
Controller::Log("CONF", "Controller restarting for update");
}
/*LTS-END*/
if (!Controller::conf.is_active){
Controller::Log("CONF", "Controller shutting down because of user request (received shutdown signal)");
}else{
@ -263,6 +301,7 @@ int main(int argc, char ** argv){
//join all joinable threads
statsThread.join();
monitorThread.join();
uplinkThread.join();/*LTS*/
//give everything some time to print messages
Util::wait(100);
//close stderr to make the stderr reading thread exit
@ -282,5 +321,12 @@ int main(int argc, char ** argv){
//stop all child processes
Util::Procs::StopAll();
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
/*LTS-START*/
if (Controller::restarting){
std::string myFile = Util::getMyPath() + "MistController";
execvp(myFile.c_str(), argv);
std::cout << "Error restarting: " << strerror(errno) << std::endl;
}
/*LTS-END*/
return 0;
}

View file

@ -11,6 +11,10 @@
#include "controller_connectors.h"
#include "controller_capabilities.h"
#include "controller_statistics.h"
/*LTS-START*/
#include "controller_updater.h"
#include "controller_limits.h"
/*LTS-END*/
///\brief Check the submitted configuration and handle things accordingly.
///\param in The new configuration.
@ -114,6 +118,10 @@ void Controller::checkConfig(JSON::Value & in, JSON::Value & out){
/// Please note that this is NOT secure. At all. Never use this mechanism over a public network!
/// A status of `"ACC_MADE"` indicates the account was created successfully and can now be used to login as normal.
bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket::Connection & conn){
#ifdef NOAUTH
Response["authorize"]["status"] = "OK";
return true;
#endif
time_t Time = time(0);
tm * TimeInfo = localtime( &Time);
std::stringstream Date;
@ -191,6 +199,148 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
if (Request.isMember("streams")){
Controller::CheckStreams(Request["streams"], Controller::Storage["streams"]);
}
/*LTS-START*/
///
/// \api
/// `"addstream"` requests (LTS-only) take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// "streamname": {
/// //Stream configuration - see the "streams" call for details on this format.
/// }
/// /// Optionally, repeat for more streams.
/// }
/// ~~~~~~~~~~~~~~~
/// These requests will add new streams or update existing streams with the same names, without touching other streams. In other words, this call can be used for incremental updates to the stream list instead of complete updates, like the "streams" call.
///
if (Request.isMember("addstream")){
Controller::AddStreams(Request["addstream"], Controller::Storage["streams"]);
}
///
/// \api
/// `"deletestream"` requests (LTS-only) take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// "streamname": {} //any contents in this object are ignored
/// /// Optionally, repeat for more streams.
/// }
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// "streamname",
/// /// Optionally, repeat for more streams.
/// ]
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// "streamname"
/// ~~~~~~~~~~~~~~~
/// These requests will remove the named stream(s), without touching other streams. In other words, this call can be used for incremental updates to the stream list instead of complete updates, like the "streams" call.
///
if (Request.isMember("deletestream")){
//if array, delete all elements
//if object, delete all entries
//if string, delete just the one
if (Request["deletestream"].isString()){
Controller::Storage["streams"].removeMember(Request["deletestream"].asStringRef());
}
if (Request["deletestream"].isArray()){
for (JSON::ArrIter it = Request["deletestream"].ArrBegin(); it != Request["deletestream"].ArrEnd(); ++it){
Controller::Storage["streams"].removeMember(it->asString());
}
}
if (Request["deletestream"].isObject()){
for (JSON::ObjIter it = Request["deletestream"].ObjBegin(); it != Request["deletestream"].ObjEnd(); ++it){
Controller::Storage["streams"].removeMember(it->first);
}
}
}
///
/// \api
/// `"addprotocol"` requests (LTS-only) take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// "connector": "HTTP" //Name of the connector to enable
/// //any required and/or optional settings may be given here as "name": "value" pairs inside this object.
/// }
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// {
/// "connector": "HTTP" //Name of the connector to enable
/// //any required and/or optional settings may be given here as "name": "value" pairs inside this object.
/// }
/// /// Optionally, repeat for more protocols.
/// ]
/// ~~~~~~~~~~~~~~~
/// These requests will add the given protocol configurations, without touching existing configurations. In other words, this call can be used for incremental updates to the protocols list instead of complete updates, like the "config" call.
/// There is no response to this call.
///
if (Request.isMember("addprotocol")){
if (Request["addprotocol"].isArray()){
for (JSON::ArrIter it = Request["addprotocol"].ArrBegin(); it != Request["addprotocol"].ArrEnd(); ++it){
Controller::Storage["config"]["protocols"].append(*it);
}
}
if (Request["addprotocol"].isObject()){
Controller::Storage["config"]["protocols"].append(Request["addprotocol"]);
}
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
}
///
/// \api
/// `"deleteprotocol"` requests (LTS-only) take the form of:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// "connector": "HTTP" //Name of the connector to enable
/// //any required and/or optional settings may be given here as "name": "value" pairs inside this object.
/// }
/// ~~~~~~~~~~~~~~~
/// OR
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// {
/// "connector": "HTTP" //Name of the connector to enable
/// //any required and/or optional settings may be given here as "name": "value" pairs inside this object.
/// }
/// /// Optionally, repeat for more protocols.
/// ]
/// ~~~~~~~~~~~~~~~
/// These requests will remove the given protocol configurations (exact matches only), without touching other configurations. In other words, this call can be used for incremental updates to the protocols list instead of complete updates, like the "config" call.
/// There is no response to this call.
///
if (Request.isMember("deleteprotocol")){
if (Request["deleteprotocol"].isArray() && Request["deleteprotocol"].size()){
JSON::Value newProtocols;
for (JSON::ArrIter it = Controller::Storage["config"]["protocols"].ArrBegin(); it != Controller::Storage["config"]["protocols"].ArrEnd(); ++it){
bool add = true;
for (JSON::ArrIter pit = Request["deleteprotocol"].ArrBegin(); pit != Request["deleteprotocol"].ArrEnd(); ++pit){
if (*it == *pit){
add = false;
break;
}
}
if (add){
newProtocols.append(*it);
}
}
Controller::Storage["config"]["protocols"] = newProtocols;
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
}
if (Request["deleteprotocol"].isObject()){
JSON::Value newProtocols;
for (JSON::ArrIter it = Controller::Storage["config"]["protocols"].ArrBegin(); it != Controller::Storage["config"]["protocols"].ArrEnd(); ++it){
if (*it != Request["deleteprotocol"]){
newProtocols.append(*it);
}
}
Controller::Storage["config"]["protocols"] = newProtocols;
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
}
}
/*LTS-END*/
if (Request.isMember("capabilities")){
Controller::checkCapable(capabilities);
Response["capabilities"] = capabilities;
@ -248,6 +398,7 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
/// ]
/// ]
/// ~~~~~~~~~~~~~~~
///
if(Request.isMember("browse")){
if(Request["browse"] == ""){
Request["browse"] = ".";
@ -312,6 +463,28 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
}
Response["ui_settings"] = Storage["ui_settings"];
}
/*LTS-START*/
///
/// \api
/// LTS builds will always include an `"LTS"` response, set to 1.
///
Response["LTS"] = 1;
///
/// \api
/// `"autoupdate"` requests (LTS-only) will cause MistServer to apply a rolling update to itself, and are not responded to.
///
#ifdef UPDATER
if (Request.isMember("autoupdate")){
Controller::CheckUpdates();
}
if (Request.isMember("checkupdate")){
Controller::updates = Controller::CheckUpdateInfo();
}
if (Request.isMember("update") || Request.isMember("checkupdate")){
Response["update"] = Controller::updates;
}
#endif
/*LTS-END*/
//sent current configuration, no matter if it was changed or not
Response["config"] = Controller::Storage["config"];
Response["config"]["version"] = PACKAGE_VERSION "/" + Util::Config::libver + "/" RELEASE;
@ -363,6 +536,9 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
Controller::fillTotals(Request["totals"], Response["totals"]);
}
}
if (Request.isMember("active_streams")){
Controller::fillActive(Request["active_streams"], Response["active_streams"]);
}
Controller::writeConfig();
@ -370,6 +546,7 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
Util::sleep(1000);//sleep a second to prevent bruteforcing
logins++;
}
Controller::checkServerLimits(); /*LTS*/
}//config mutex lock
//send the response, either normally or through JSONP callback.
std::string jsonp = "";

View file

@ -0,0 +1,403 @@
#include "controller_limits.h"
#include "controller_statistics.h"
#include "controller_storage.h"
#include <iostream>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
namespace Controller{
void checkStreamLimits(std::string streamName, long long currentKbps, long long connectedUsers){
if( !Storage["streams"].isMember(streamName)){
return;
}
if( !Storage["streams"][streamName].isMember("limits")){
return;
}
if( !Storage["streams"][streamName]["limits"]){
return;
}
Storage["streams"][streamName].removeMember("hardlimit_active");
if (Storage["streams"][streamName]["online"].asInt() != 1){
for (JSON::ArrIter limitIt = Storage["streams"][streamName]["limits"].ArrBegin(); limitIt != Storage["streams"][streamName]["limits"].ArrEnd(); limitIt++){
if ((*limitIt).isMember("triggered")){
if ((*limitIt)["type"].asString() == "soft"){
Log("SLIM", "Softlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " reset - stream unavailable.");
}else{
Log("HLIM", "Hardlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " reset - stream unavailable.");
}
(*limitIt).removeMember("triggered");
}
}
return;
}
//run over all limits.
for (JSON::ArrIter limitIt = Storage["streams"][streamName]["limits"].ArrBegin(); limitIt != Storage["streams"][streamName]["limits"].ArrEnd(); limitIt++){
bool triggerLimit = false;
if ((*limitIt)["name"].asString() == "users" && connectedUsers >= (*limitIt)["value"].asInt()){
triggerLimit = true;
}
if ((*limitIt)["name"].asString() == "kbps_max" && currentKbps >= (*limitIt)["value"].asInt()){
triggerLimit = true;
}
if (triggerLimit){
if ((*limitIt)["type"].asString() == "hard"){
Storage["streams"][streamName]["hardlimit_active"] = true;
}
if ((*limitIt).isMember("triggered")){
continue;
}
if ((*limitIt)["type"].asString() == "soft"){
Log("SLIM", "Softlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " triggered.");
}else{
Log("HLIM", "Hardlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " triggered.");
}
(*limitIt)["triggered"] = true;
}else{
if ( !(*limitIt).isMember("triggered")){
continue;
}
if ((*limitIt)["type"].asString() == "soft"){
Log("SLIM", "Softlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " reset.");
}else{
Log("HLIM", "Hardlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " for stream " + streamName + " reset.");
}
(*limitIt).removeMember("triggered");
}
}
}
void checkServerLimits(){
int currentKbps = 0;
int connectedUsers = 0;
std::map<std::string, long long> strmUsers;
std::map<std::string, long long> strmBandw;
/*
if (curConns.size()){
for (std::map<unsigned long, statStorage>::iterator it = curConns.begin(); it != curConns.end(); it++){
if (it->second.log.size() < 2){continue;}
std::map<unsigned long long, statLog>::reverse_iterator statRef = it->second.log.rbegin();
std::map<unsigned long long, statLog>::reverse_iterator prevRef = --(it->second.log.rbegin());
unsigned int diff = statRef->first - prevRef->first;
strmUsers[it->second.streamName]++;
connectedUsers++;
strmBandw[it->second.streamName] += (((statRef->second.down - prevRef->second.down) + (statRef->second.up - prevRef->second.up)) / diff);
currentKbps += (((statRef->second.down - prevRef->second.down) + (statRef->second.up - prevRef->second.up)) / diff);
}
}
*/
//check stream limits
if (Storage["streams"].size()){
for (JSON::ObjIter strmIt = Storage["streams"].ObjBegin(); strmIt != Storage["streams"].ObjEnd(); strmIt++){
checkStreamLimits(strmIt->first, strmBandw[strmIt->first], strmUsers[strmIt->first]);
}
}
Storage["config"].removeMember("hardlimit_active");
if ( !Storage["config"]["limits"].size()){
return;
}
if ( !Storage["streams"].size()){
return;
}
for (JSON::ArrIter limitIt = Storage["config"]["limits"].ArrBegin(); limitIt != Storage["config"]["limits"].ArrEnd(); limitIt++){
bool triggerLimit = false;
if ((*limitIt)["name"].asString() == "users" && connectedUsers >= (*limitIt)["value"].asInt()){
triggerLimit = true;
}
if ((*limitIt)["name"].asString() == "kbps_max" && currentKbps >= (*limitIt)["value"].asInt()){
triggerLimit = true;
}
if (triggerLimit){
if ((*limitIt)["type"].asString() == "hard"){
Storage["config"]["hardlimit_active"] = true;
}
if ((*limitIt).isMember("triggered")){
continue;
}
if ((*limitIt)["type"].asString() == "soft"){
Log("SLIM", "Serverwide softlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " triggered.");
}else{
Log("HLIM", "Serverwide hardlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " triggered.");
}
(*limitIt)["triggered"] = true;
}else{
if ( !(*limitIt).isMember("triggered")){
continue;
}
if ((*limitIt)["type"].asString() == "soft"){
Log("SLIM", "Serverwide softlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " reset.");
}else{
Log("HLIM", "Serverwide hardlimit " + (*limitIt)["name"].asString() + " <= " + (*limitIt)["value"].asString() + " reset.");
}
(*limitIt).removeMember("triggered");
}
}
}
bool onList(std::string ip, std::string list){
if (list == ""){
return false;
}
std::string entry;
std::string lowerIpv6;//lower-case
std::string upperIpv6;//full-caps
do{
entry = list.substr(0,list.find(" "));//make sure we have a single entry
lowerIpv6 = "::ffff:" + entry;
upperIpv6 = "::FFFF:" + entry;
if (entry == ip || lowerIpv6 == ip || upperIpv6 == ip){
return true;
}
long long unsigned int starPos = entry.find("*");
if (starPos == std::string::npos){
if (ip == entry){
return true;
}
}else{
if (starPos == 0){//beginning of the filter
if (ip.substr(ip.length() - entry.size() - 1) == entry.substr(1)){
return true;
}
}else{
if (starPos == entry.size() - 1){//end of the filter
if (ip.find(entry.substr(0, entry.size() - 1)) == 0 ){
return true;
}
if (ip.find(entry.substr(0, lowerIpv6.size() - 1)) == 0 ){
return true;
}
if (ip.find(entry.substr(0, upperIpv6.size() - 1)) == 0 ){
return true;
}
}else{
Log("CONF","Invalid list entry detected: " + entry);
}
}
}
list.erase(0, entry.size() + 1);
}while (list != "");
return false;
}
std::string hostLookup(std::string ip){
struct sockaddr_in6 sa;
char hostName[1024];
char service[20];
if (inet_pton(AF_INET6, ip.c_str(), &(sa.sin6_addr)) != 1){
return "\n";
}
sa.sin6_family = AF_INET6;
sa.sin6_port = 0;
sa.sin6_flowinfo = 0;
sa.sin6_scope_id = 0;
int tmpRet = getnameinfo((struct sockaddr*)&sa, sizeof sa, hostName, sizeof hostName, service, sizeof service, NI_NAMEREQD );
if ( tmpRet == 0){
return hostName;
}
return "";
}
bool isBlacklisted(std::string host, std::string streamName, int timeConnected){
std::string myHostName = hostLookup(host);
if (myHostName == "\n"){
return false;
}
std::string myCountryName = getCountry(host);
JSON::ArrIter limitIt;
bool hasWhitelist = false;
bool hostOnWhitelist = false;
if (Storage["streams"].isMember(streamName)){
if (Storage["streams"][streamName].isMember("limits") && Storage["streams"][streamName]["limits"].size()){
for (limitIt = Storage["streams"][streamName]["limits"].ArrBegin(); limitIt != Storage["streams"][streamName]["limits"].ArrEnd(); limitIt++){
if ((*limitIt)["name"].asString() == "host"){
if ((*limitIt)["value"].asString()[0] == '+'){
if (!onList(host, (*limitIt)["value"].asString().substr(1))){
if (myHostName == ""){
if (timeConnected > Storage["config"]["limit_timeout"].asInt()){
return true;
}
}else{
if ( !onList(myHostName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " not whitelisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " not whitelisted for stream " + streamName);
}
}
}
}
}else{
if ((*limitIt)["value"].asString()[0] == '-'){
if (onList(host, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " blacklisted for stream " + streamName);
}
}
if (myHostName != "" && onList(myHostName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + myHostName + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + myHostName + " blacklisted for stream " + streamName);
}
}
}
}
}
if ((*limitIt)["name"].asString() == "geo"){
if ((*limitIt)["value"].asString()[0] == '+'){
if (myCountryName == ""){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with unknown location blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with unknown location blacklisted for stream " + streamName);
}
}
if (!onList(myCountryName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with location " + myCountryName + " not whitelisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with location " + myCountryName + " not whitelisted for stream " + streamName);
}
}
}else{
if ((*limitIt)["val"].asString()[0] == '-'){
if (onList(myCountryName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with location " + myCountryName + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with location " + myCountryName + " blacklisted for stream " + streamName);
}
}
}
}
}
}
}
}
if (Storage["config"]["limits"].size()){
for (limitIt = Storage["config"]["limits"].ArrBegin(); limitIt != Storage["config"]["limits"].ArrEnd(); limitIt++){
if ((*limitIt)["name"].asString() == "host"){
if ((*limitIt)["value"].asString()[0] == '+'){
if (!onList(host, (*limitIt)["value"].asString().substr(1))){
if (myHostName == ""){
if (timeConnected > Storage["config"]["limit_timeout"].asInt()){
return true;
}
}else{
if ( !onList(myHostName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " not whitelisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " not whitelisted for stream " + streamName);
}
}
}
}
}else{
if ((*limitIt)["value"].asString()[0] == '-'){
if (onList(host, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " blacklisted for stream " + streamName);
}
}
if (myHostName != "" && onList(myHostName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + myHostName + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + myHostName + " blacklisted for stream " + streamName);
}
}
}
}
}
if ((*limitIt)["name"].asString() == "geo"){
if ((*limitIt)["value"].asString()[0] == '+'){
if (myCountryName == ""){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with unknown location blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with unknown location blacklisted for stream " + streamName);
}
}
if (!onList(myCountryName, (*limitIt)["value"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with location " + myCountryName + " not whitelisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with location " + myCountryName + " not whitelisted for stream " + streamName);
}
}
}else{
if ((*limitIt)["value"].asString()[0] == '-'){
if (onList(myCountryName, (*limitIt)["val"].asString().substr(1))){
if ((*limitIt)["type"].asString() == "hard"){
Log("HLIM", "Host " + host + " with location " + myCountryName + " blacklisted for stream " + streamName);
return true;
}else{
Log("SLIM", "Host " + host + " with location " + myCountryName + " blacklisted for stream " + streamName);
}
}
}
}
}
}
}
if (hasWhitelist){
if (hostOnWhitelist || myHostName == ""){
return false;
}else{
return true;
}
}
return false;
}
std::string getCountry(std::string ip){
char * code = NULL;
#ifdef GEOIP
GeoIP * geoIP;
geoIP = GeoIP_open(GEOIPV4, GEOIP_STANDARD | GEOIP_CHECK_CACHE);
if (!geoIP){
std::cerr << "An error occured loading the IPv4 database" << std::endl;
}else{
code = (char*)GeoIP_country_code_by_addr(geoIP, ip.c_str());
GeoIP_delete(geoIP);
}
if (!code){
geoIP = GeoIP_open(GEOIPV6, GEOIP_STANDARD | GEOIP_CHECK_CACHE);
if (!geoIP){
std::cerr << "An error occured loading the IPv6 database" << std::endl;
}else{
code = (char*)GeoIP_country_code_by_addr_v6(geoIP, ip.c_str());
GeoIP_delete(geoIP);
}
}
#endif
if (!code){
return "";
}
return code;
}
}

View file

@ -0,0 +1,21 @@
#pragma once
#include <mist/json.h>
#include <map>
#include <string>
/*LTS-START*/
#ifdef GEOIP
#include <GeoIP.h>
#define GEOIPV4 "/usr/share/GeoIP/GeoIP.dat"
#define GEOIPV6 "/usr/share/GeoIP/GeoIPv6.dat"
#endif
/*LTS-END*/
namespace Controller{
void checkStreamLimits(std::string streamName, long long currentKbps, long long connectedUsers);
void checkServerLimits();
bool isBlacklisted(std::string host, std::string streamName, int timeConnected);
std::string hostLookup(std::string ip);
bool onList(std::string ip, std::string list);
std::string getCountry(std::string ip);
}

View file

@ -1,6 +1,11 @@
#include <cstdio>
#include <mist/config.h>
#include "controller_statistics.h"
#include "controller_limits.h"
#ifndef KILL_ON_EXIT
#define KILL_ON_EXIT false
#endif
// These are used to store "clients" field requests in a bitfield for speedup.
#define STAT_CLI_HOST 1
@ -23,6 +28,7 @@
std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; ///< list of sessions that have statistics data available
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
bool Controller::killOnExit = KILL_ON_EXIT;
tthread::mutex Controller::statsMutex;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
@ -79,6 +85,10 @@ bool Controller::sessIndex::operator>= (const Controller::sessIndex &b) const{
return !(*this < b);
}
/// Forces a disconnect to all users.
void Controller::killStatistics(char * data, size_t len, unsigned int id){
(*(data - 1)) = 128;//Send disconnect message;
}
/// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes
@ -98,10 +108,19 @@ void Controller::SharedMemStats(void * config){
it->second.wipeOld(cutOffPoint);
}
}
Controller::checkServerLimits(); /*LTS*/
}
Util::sleep(1000);
}
DEBUG_MSG(DLVL_HIGH, "Stopping stats thread");
if (Controller::killOnExit){
DEBUG_MSG(DLVL_WARN, "Killing all connected clients to force full shutdown");
unsigned int c = 0;//to prevent eternal loops
do{
statServer.parseEach(killStatistics);
Util::wait(250);
}while(statServer.amount && c++ < 10);
}
}
/// Updates the given active connection with new stats data.
@ -416,6 +435,11 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
sessions[idx].finish(id);
connToSession.erase(id);
}
/*LTS-START*/
//if (counter < 125 && Controller::isBlacklisted(tmpEx.host(), ID, tmpEx.time())){
// (*(data - 1)) = 128;//Send disconnect message;
//}
/*LTS-END*/
}
/// Returns true if this stream has at least one connected client.
@ -554,6 +578,36 @@ void Controller::fillClients(JSON::Value & req, JSON::Value & rep){
//all done! return is by reference, so no need to return anything here.
}
/// This takes a "active_streams" request, and fills in the response data.
///
/// \api
/// `"active_streams"` requests are always empty (passed data is ignored), and are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// [
/// //Array of stream names
/// "streamA",
/// "streamB",
/// "streamC"
/// ]
/// ~~~~~~~~~~~~~~~
/// All streams that any statistics data is available for are listed, and only those streams.
void Controller::fillActive(JSON::Value & req, JSON::Value & rep){
//collect the data first
std::set<std::string> streams;
//check all sessions
if (sessions.size()){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
streams.insert(it->first.streamName);
}
}
//Good, now output what we found...
rep.null();
for (std::set<std::string>::iterator it = streams.begin(); it != streams.end(); it++){
rep.append(*it);
}
//all done! return is by reference, so no need to return anything here.
}
class totalsData {
public:
totalsData(){

View file

@ -11,6 +11,9 @@
namespace Controller {
extern bool killOnExit;
struct statLog {
long time;
long lastSecond;
@ -80,7 +83,9 @@ namespace Controller {
extern std::map<unsigned long, sessIndex> connToSession;
extern tthread::mutex statsMutex;
void parseStatistics(char * data, size_t len, unsigned int id);
void killStatistics(char * data, size_t len, unsigned int id);
void fillClients(JSON::Value & req, JSON::Value & rep);
void fillActive(JSON::Value & req, JSON::Value & rep);
void fillTotals(JSON::Value & req, JSON::Value & rep);
void SharedMemStats(void * config);
bool hasViewers(std::string streamName);

View file

@ -9,6 +9,7 @@
#include "controller_capabilities.h"
#include "controller_storage.h"
#include "controller_statistics.h"
#include "controller_limits.h" /*LTS*/
#include <sys/stat.h>
#include <map>
@ -73,6 +74,7 @@ namespace Controller {
trackIt->second.removeMember("keys");
trackIt->second.removeMember("keysizes");
trackIt->second.removeMember("parts");
trackIt->second.removeMember("ivecs");/*LTS*/
}
}
}
@ -82,7 +84,7 @@ namespace Controller {
//vod-style stream
data.removeMember("error");
struct stat fileinfo;
if (stat(URL.c_str(), &fileinfo) != 0 || S_ISDIR(fileinfo.st_mode)){
if (stat(URL.c_str(), &fileinfo) != 0){
data["error"] = "Stream offline: Not found: " + URL;
if (data["error"].asStringRef() != prevState){
Log("BUFF", "Warning for VoD stream " + name + "! File not found: " + URL);
@ -121,6 +123,9 @@ namespace Controller {
DEBUG_MSG(DLVL_INSANE, "Invalid metadata (no tracks object) for stream %s - triggering reload", name.c_str());
getMeta = true;
}
if (*(URL.rbegin()) == '/'){
getMeta = false;
}
if (getMeta){
// if the file isn't dtsc and there's no dtsh file, run getStream on it
// this guarantees that if the stream is playable, it now has a valid header.
@ -198,6 +203,7 @@ namespace Controller {
}else{
data["online"] = 1;
}
checkServerLimits(); /*LTS*/
return;
}
/// \todo Implement ffmpeg pulling again?
@ -231,6 +237,7 @@ namespace Controller {
}
jit->second["online"] = 0;
}
checkServerLimits(); /*LTS*/
}else{
// assume all is fine
jit->second.removeMember("error");
@ -242,7 +249,7 @@ namespace Controller {
jit->second["error"] = "No (valid) source connected ";
}else{
// for live streams, keep track of activity
if (jit->second["meta"].isMember("live")){
if (jit->second.isMember("meta") && jit->second["meta"].isMember("live")){
static std::map<std::string, liveCheck> checker;
//check H264 tracks for optimality
if (jit->second.isMember("meta") && jit->second["meta"].isMember("tracks")){
@ -251,7 +258,11 @@ namespace Controller {
checker[jit->first].lastms = trIt->second["lastms"].asInt();
checker[jit->first].last_active = currTime;
}
/*LTS-START*/
if (trIt->second["firstms"].asInt() > Storage["streams"][jit->first]["cut"].asInt()){
Storage["streams"][jit->first].removeMember("cut");
}
/*LTS-END*/
}
}
// mark stream as offline if no activity for 5 seconds
@ -289,14 +300,8 @@ namespace Controller {
Log("STRM", std::string("Updated stream ") + jit->first);
}
}else{
out[jit->first] = jit->second;
out[jit->first]["name"] = jit->first;
out[jit->first]["source"] = jit->second["source"];
if (jit->second.isMember("DVR")){
out[jit->first]["DVR"] = jit->second["DVR"].asInt();
}
if (jit->second.isMember("cut")){
out[jit->first]["cut"] = jit->second["cut"].asInt();
}
Log("STRM", std::string("New stream ") + jit->first);
}
}

View file

@ -0,0 +1,253 @@
/// \file controller_updater.cpp
/// Contains all code for the controller updater.
#include <fstream> //for files
#include <iostream> //for stdio
#include <unistd.h> //for unlink
#include <sys/stat.h> //for chmod
#include <stdlib.h> //for srand, rand
#include <time.h> //for time
#include <signal.h> //for raise
#include <mist/http_parser.h>
#include <mist/socket.h>
#include <mist/auth.h>
#include <mist/timing.h>
#include <mist/config.h>
#include "controller_storage.h"
#include "controller_connectors.h"
#include "controller_updater.h"
namespace Controller {
bool restarting = false;
JSON::Value updates;
std::string uniqId;
std::string readFile(std::string filename){
std::ifstream file(filename.c_str());
if ( !file.good()){
return "";
}
file.seekg(0, std::ios::end);
unsigned int len = file.tellg();
file.seekg(0, std::ios::beg);
std::string out;
out.reserve(len);
unsigned int i = 0;
while (file.good() && i++ < len){
out += file.get();
}
file.close();
return out;
} //readFile
bool writeFile(std::string filename, std::string & contents){
unlink(filename.c_str());
std::ofstream file(filename.c_str(), std::ios_base::trunc | std::ios_base::out);
if ( !file.is_open()){
return false;
}
file << contents;
file.close();
chmod(filename.c_str(), S_IRWXU | S_IRWXG);
return true;
} //writeFile
/// \api
/// `"update"` and `"checkupdate"` requests (LTS-only) are responded to as:
/// ~~~~~~~~~~~~~~~{.js}
/// {
/// "error": "Something went wrong", // 'Optional'
/// "release": "LTS64_99",
/// "version": "1.2 / 6.0.0",
/// "date": "January 5th, 2014",
/// "uptodate": 0,
/// "needs_update": ["MistBuffer", "MistController"], //Controller is guaranteed to be last
/// "MistController": "abcdef1234567890", //md5 sum of latest version
/// //... all other MD5 sums follow
/// }
/// ~~~~~~~~~~~~~~~
/// Note that `"update"` will only list known information, while `"checkupdate"` triggers an information refresh from the update server.
JSON::Value CheckUpdateInfo(){
JSON::Value ret;
if (uniqId == ""){
srand(time(NULL));
do{
char meh = 64 + rand() % 62;
uniqId += meh;
}while(uniqId.size() < 16);
}
//initialize connection
HTTP::Parser http;
JSON::Value updrInfo;
Socket::Connection updrConn("releases.mistserver.org", 80, true);
if ( !updrConn){
Log("UPDR", "Could not connect to releases.mistserver.org to get update information.");
ret["error"] = "Could not connect to releases.mistserver.org to get update information.";
return ret;
}
//retrieve update information
http.url = "/getsums.php?verinfo=1&rel=" RELEASE "&pass=" SHARED_SECRET "&uniqId=" + uniqId;
http.method = "GET";
http.SetHeader("Host", "releases.mistserver.org");
http.SetHeader("X-Version", PACKAGE_VERSION "/" + Util::Config::libver + "/" RELEASE);
updrConn.SendNow(http.BuildRequest());
http.Clean();
unsigned int startTime = Util::epoch();
while ((Util::epoch() - startTime < 10) && (updrConn || updrConn.Received().size())){
if (updrConn.spool() || updrConn.Received().size()){
if ( *(updrConn.Received().get().rbegin()) != '\n'){
std::string tmp = updrConn.Received().get();
updrConn.Received().get().clear();
if (updrConn.Received().size()){
updrConn.Received().get().insert(0, tmp);
}else{
updrConn.Received().append(tmp);
}
continue;
}
if (http.Read(updrConn.Received().get())){
updrInfo = JSON::fromString(http.body);
break; //break out of while loop
}
}
}
updrConn.close();
if (updrInfo){
if (updrInfo.isMember("error")){
Log("UPDR", updrInfo["error"].asStringRef());
ret["error"] = updrInfo["error"];
ret["uptodate"] = 1;
return ret;
}
ret["release"] = RELEASE;
if (updrInfo.isMember("version")){
ret["version"] = updrInfo["version"];
}
if (updrInfo.isMember("date")){
ret["date"] = updrInfo["date"];
}
ret["uptodate"] = 1;
ret["needs_update"].null();
// check if everything is up to date or not
for (JSON::ObjIter it = updrInfo.ObjBegin(); it != updrInfo.ObjEnd(); it++){
if (it->first.substr(0, 4) != "Mist"){
continue;
}
ret[it->first] = it->second;
if (it->second.asString() != Secure::md5(readFile(Util::getMyPath() + it->first))){
ret["uptodate"] = 0;
if (it->first.substr(0, 14) == "MistController"){
ret["needs_update"].append(it->first);
}else{
ret["needs_update"].prepend(it->first);
}
}
}
}else{
Log("UPDR", "Could not retrieve update information from releases server.");
ret["error"] = "Could not retrieve update information from releases server.";
}
return ret;
}
/// Calls CheckUpdateInfo(), uses the resulting JSON::Value to download any needed updates.
/// Will shut down the server if the JSON::Value contained a "shutdown" member.
void CheckUpdates(){
JSON::Value updrInfo = CheckUpdateInfo();
if (updrInfo.isMember("error")){
Log("UPDR", "Error retrieving update information: " + updrInfo["error"].asString());
return;
}
if (updrInfo.isMember("shutdown")){
Log("DDVT", "Shutting down: " + updrInfo["shutdown"].asString());
restarting = false;
raise(SIGINT); //trigger shutdown
return;
}
if (updrInfo["uptodate"]){
//nothing to do
return;
}
//initialize connection
Socket::Connection updrConn("releases.mistserver.org", 80, true);
if ( !updrConn){
Log("UPDR", "Could not connect to releases.mistserver.org.");
return;
}
//loop through the available components, update them
for (JSON::ArrIter it = updrInfo["needs_update"].ArrBegin(); it != updrInfo["needs_update"].ArrEnd(); it++){
updateComponent(it->asStringRef(), updrInfo[it->asStringRef()].asStringRef(), updrConn);
}
updrConn.close();
} //CheckUpdates
/// Attempts to download an update for the listed component.
/// \param component Filename of the component being checked.
/// \param md5sum The MD5 sum of the latest version of this file.
/// \param updrConn An connection to releases.mistserver.org to (re)use. Will be (re)opened if closed.
void updateComponent(const std::string & component, const std::string & md5sum, Socket::Connection & updrConn){
Log("UPDR", "Downloading update for " + component);
std::string new_file;
HTTP::Parser http;
http.url = "/getfile.php?rel=" RELEASE "&pass=" SHARED_SECRET "&file=" + component;
http.method = "GET";
http.SetHeader("Host", "releases.mistserver.org");
if ( !updrConn){
updrConn = Socket::Connection("releases.mistserver.org", 80, true);
if ( !updrConn){
Log("UPDR", "Could not connect to releases.mistserver.org for file download.");
return;
}
}
http.SendRequest(updrConn);
http.Clean();
unsigned int startTime = Util::epoch();
while ((Util::epoch() - startTime < 90) && (updrConn || updrConn.Received().size())){
if (updrConn.spool() || updrConn.Received().size()){
if ( *(updrConn.Received().get().rbegin()) != '\n'){
std::string tmp = updrConn.Received().get();
updrConn.Received().get().clear();
if (updrConn.Received().size()){
updrConn.Received().get().insert(0, tmp);
}else{
updrConn.Received().append(tmp);
}
}
if (http.Read(updrConn.Received().get())){
new_file = http.body;
break; //break out of while loop
}
}
}
http.Clean();
if (new_file == ""){
Log("UPDR", "Could not retrieve new version of " + component + " - retrying next time.");
return;
}
if (Secure::md5(new_file) != md5sum){
Log("UPDR", "Checksum "+Secure::md5(new_file)+" of " + component + " does not match "+md5sum+" - retrying next time.");
return;
}
if (writeFile(Util::getMyPath() + component, new_file)){
Controller::UpdateProtocol(component);
if (component == "MistController"){
restarting = true;
raise(SIGINT); //trigger restart
}
Log("UPDR", "New version of " + component + " installed.");
}else{
Log("UPDR", component + " could not be updated! (No write access to file?)");
}
}
} //Controller namespace

View file

@ -0,0 +1,21 @@
/// \file controller_updater.cpp
/// Contains all code for the controller updater.
#include <string>
#ifndef SHARED_SECRET
#define SHARED_SECRET "empty"
#endif
namespace Controller {
extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
extern JSON::Value updates;
extern std::string uniqId;
std::string readFile(std::string filename);
bool writeFile(std::string filename, std::string & contents);
JSON::Value CheckUpdateInfo();
void CheckUpdates();
void updateComponent(const std::string & component, const std::string & md5sum, Socket::Connection & updrConn);
} //Controller namespace

View file

@ -0,0 +1,138 @@
#include <stdlib.h>
#include <mist/auth.h>
#include <mist/dtsc.h>
#include <mist/config.h>
#include <mist/defines.h>
#include <mist/timing.h>
#include "controller_uplink.h"
#include "controller_storage.h"
#include "controller_streams.h"
#include "controller_connectors.h"
#include "controller_capabilities.h"
#include "controller_statistics.h"
#include "controller_updater.h"
#include "controller_limits.h"
#include "controller_api.h"
void Controller::uplinkConnection(void * np) {
std::string uplink_name = Controller::conf.getString("uplink-name");
std::string uplink_pass = Controller::conf.getString("uplink-pass");
std::string uplink_addr = Controller::conf.getString("uplink");
std::string uplink_host = "";
std::string uplink_chal = "";
int uplink_port = 0;
if (uplink_addr.size() > 0) {
size_t colon = uplink_addr.find(':');
if (colon != std::string::npos && colon != 0 && colon != uplink_addr.size()) {
uplink_host = uplink_addr.substr(0, colon);
uplink_port = atoi(uplink_addr.substr(colon + 1, std::string::npos).c_str());
Controller::Log("CONF", "Connection to uplink enabled on host " + uplink_host + " and port " + uplink_addr.substr(colon + 1, std::string::npos));
}
}
//cancel the whole thread if no uplink is set
if (!uplink_port) {
return;
}
if (uniqId == ""){
srand(time(NULL));
do{
char meh = 64 + rand() % 62;
uniqId += meh;
}while(uniqId.size() < 16);
}
unsigned long long lastSend = Util::epoch() - 5;
Socket::Connection uplink;
while (Controller::conf.is_active) {
if (!uplink) {
INFO_MSG("Connecting to uplink at %s:%u", uplink_host.c_str(), uplink_port);
uplink = Socket::Connection(uplink_host, uplink_port, true);
}
if (uplink) {
if (uplink.spool()) {
if (uplink.Received().available(9)) {
std::string data = uplink.Received().copy(8);
if (data.substr(0, 4) != "DTSC") {
uplink.Received().clear();
continue;
}
unsigned int size = ntohl(*(const unsigned int *)(data.data() + 4));
if (uplink.Received().available(8 + size)) {
std::string packet = uplink.Received().remove(8 + size);
DTSC::Scan inScan = DTSC::Packet(packet.data(), packet.size()).getScan();
if (!inScan){continue;}
JSON::Value curVal;
//Parse config and streams from the request.
if (inScan.hasMember("authorize") && inScan.getMember("authorize").hasMember("challenge")){
uplink_chal = inScan.getMember("authorize").getMember("challenge").asString();
}
if (inScan.hasMember("config")) {
curVal = inScan.getMember("config").asJSON();
Controller::checkConfig(curVal, Controller::Storage["config"]);
Controller::CheckProtocols(Controller::Storage["config"]["protocols"], capabilities);
}
if (inScan.hasMember("streams")) {
curVal = inScan.getMember("streams").asJSON();
Controller::CheckStreams(curVal, Controller::Storage["streams"]);
}
if (inScan.hasMember("addstream")) {
curVal = inScan.getMember("addstream").asJSON();
Controller::AddStreams(curVal, Controller::Storage["streams"]);
Controller::CheckAllStreams(Controller::Storage["streams"]);
}
if (inScan.hasMember("deletestream")) {
curVal = inScan.getMember("deletestream").asJSON();
//if array, delete all elements
//if object, delete all entries
//if string, delete just the one
if (curVal.isString()) {
Controller::Storage["streams"].removeMember(curVal.asStringRef());
}
if (curVal.isArray()) {
for (JSON::ArrIter it = curVal.ArrBegin(); it != curVal.ArrEnd(); ++it) {
Controller::Storage["streams"].removeMember(it->asString());
}
}
if (curVal.isObject()) {
for (JSON::ObjIter it = curVal.ObjBegin(); it != curVal.ObjEnd(); ++it) {
Controller::Storage["streams"].removeMember(it->first);
}
}
Controller::CheckAllStreams(Controller::Storage["streams"]);
}
}
}
}
if (Util::epoch() - lastSend >= 2) {
JSON::Value data;
data["tracks"].null();//make sure the data is encoded as DTSC
if (uplink_chal.size()){
data["authorize"]["username"] = uplink_name;
data["authorize"]["password"] = Secure::md5( Secure::md5(uplink_pass) + uplink_chal);
}
JSON::Value totalsRequest;
Controller::fillClients(totalsRequest, data["clients"]);
totalsRequest["start"] = (long long)lastSend;
Controller::fillTotals(totalsRequest, data["totals"]);
data["streams"] = Controller::Storage["streams"];
for (JSON::ObjIter it = data["streams"].ObjBegin(); it != data["streams"].ObjEnd(); it++){
it->second.removeMember("meta");
it->second.removeMember("l_meta");
it->second.removeMember("name");
}
data["config"] = Controller::Storage["config"];
data["config"]["uniq"] = uniqId;
data["config"]["version"] = PACKAGE_VERSION "/" + Util::Config::libver + "/" RELEASE;
Controller::checkCapable(capabilities);
data["capabilities"] = capabilities;
data["capabilities"].removeMember("connectors");
data.sendTo(uplink);
lastSend = Util::epoch();
}
} else {
Controller::Log("UPLK", "Could not connect to uplink.");
}
Util::wait(2000);//wait for 2.5 seconds
}
}

View file

@ -0,0 +1,3 @@
namespace Controller {
void uplinkConnection(void * np);
}