Access logs + made prometheus runtime changeable

This commit is contained in:
Thulinma 2016-07-20 11:01:07 +02:00
parent 7629b00a4c
commit 3980b8c8c1
9 changed files with 229 additions and 51 deletions

View file

@ -14,6 +14,7 @@
#include "procs.h"
#include "bitfields.h"
#include "timing.h"
#include "auth.h"
#if defined(__CYGWIN__) || defined(_WIN32)
#include <windows.h>
@ -619,6 +620,11 @@ namespace IPC {
htobl(data + 8, time);
}
/// Calculates session ID from CRC, stream name, connector and host.
std::string statExchange::getSessId(){
return Secure::md5(data+32, 140);
}
///\brief Gets time currently connected
long statExchange::time() {
long result;

View file

@ -41,6 +41,7 @@ namespace IPC {
void setSync(char s);
unsigned int crc();
uint32_t getPID();
std::string getSessId();
private:
///\brief The payload for the stat exchange
/// - 8 byte - now (timestamp of last statistics)

View file

@ -181,6 +181,7 @@ int main_loop(int argc, char ** argv){
Controller::conf.addOption("maxconnsperip", JSON::fromString("{\"long\":\"maxconnsperip\", \"short\":\"M\", \"arg\":\"integer\" \"default\":0, \"help\":\"Max simultaneous sessions per unique IP address. Only enforced if the USER_NEW trigger is in use.\"}"));
Controller::conf.addOption("account", JSON::fromString("{\"long\":\"account\", \"short\":\"a\", \"arg\":\"string\" \"default\":\"\", \"help\":\"A username:password string to create a new account with.\"}"));
Controller::conf.addOption("logfile", JSON::fromString("{\"long\":\"logfile\", \"short\":\"L\", \"arg\":\"string\" \"default\":\"\",\"help\":\"Redirect all standard output to a log file, provided with an argument\"}"));
Controller::conf.addOption("accesslog", JSON::fromString("{\"long\":\"accesslog\", \"short\":\"A\", \"arg\":\"string\" \"default\":\"LOG\",\"help\":\"Where to write the access log. If set to 'LOG' (the default), writes to wherever the log is written to. If empty, access logging is turned off. Otherwise, writes to the given filename.\"}"));
Controller::conf.addOption("configFile", JSON::fromString("{\"long\":\"config\", \"short\":\"c\", \"arg\":\"string\" \"default\":\"config.json\", \"help\":\"Specify a config file other than default.\"}"));
#ifdef UPDATER
Controller::conf.addOption("update", JSON::fromString("{\"default\":0, \"help\":\"Check for and install updates before starting.\", \"short\":\"D\", \"long\":\"update\"}")); /*LTS*/
@ -244,8 +245,14 @@ int main_loop(int argc, char ** argv){
if (Controller::Storage["config"]["controller"]["prometheus"]){
Controller::conf.getOption("prometheus", true)[0u] = Controller::Storage["config"]["controller"]["prometheus"];
}
if (Controller::Storage["config"].isMember("accesslog")){
Controller::conf.getOption("accesslog", true)[0u] = Controller::Storage["config"]["accesslog"];
}
Controller::maxConnsPerIP = Controller::conf.getInteger("maxconnsperip");
Controller::Storage["config"]["controller"]["prometheus"] = Controller::conf.getString("prometheus");
Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog");
Controller::prometheus = Controller::Storage["config"]["controller"]["prometheus"].asStringRef();
Controller::accesslog = Controller::Storage["config"]["accesslog"].asStringRef();
{
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.unlink();

View file

@ -71,6 +71,12 @@ void Controller::checkConfig(JSON::Value & in, JSON::Value & out){
INFO_MSG("Debug level set to %u", Util::Config::printDebugLevel);
}
}
if (out.isMember("controller") && out["controller"].isMember("prometheus")){
Controller::prometheus = out["controller"]["prometheus"].asStringRef();
}
if (out.isMember("accesslog")){
Controller::accesslog = out["accesslog"].asStringRef();
}
}
///\brief Checks an authorization request for a given user.
@ -168,12 +174,12 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
while (conn && logins < 4){
if ((conn.spool() || conn.Received().size()) && H.Read(conn)){
//Catch prometheus requests
if (conf.getString("prometheus").size()){
if (H.url == "/"+Controller::conf.getString("prometheus")){
if (Controller::prometheus.size()){
if (H.url == "/"+Controller::prometheus){
handlePrometheus(H, conn, PROMETHEUS_TEXT);
continue;
}
if (H.url == "/"+Controller::conf.getString("prometheus")+".json"){
if (H.url == "/"+Controller::prometheus+".json"){
handlePrometheus(H, conn, PROMETHEUS_JSON);
continue;
}
@ -602,6 +608,34 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
}
}
if (Request.isMember("stop_sessid")){
if (Request["stop_sessid"].isArray() || Request["stop_sessid"].isObject()){
jsonForEach(Request["stop_sessid"], it){
Controller::sessId_shutdown(it->asStringRef());
}
}else{
Controller::sessId_shutdown(Request["stop_sessid"].asStringRef());
}
}
if (Request.isMember("stop_tag")){
if (Request["stop_tag"].isArray() || Request["stop_tag"].isObject()){
jsonForEach(Request["stop_tag"], it){
Controller::tag_shutdown(it->asStringRef());
}
}else{
Controller::tag_shutdown(Request["stop_tag"].asStringRef());
}
}
if (Request.isMember("tag_sessid")){
if (Request["tag_sessid"].isObject()){
jsonForEach(Request["tag_sessid"], it){
Controller::sessId_tag(it.key(), it->asStringRef());
}
}
}
if (Request.isMember("push_start")){
std::string stream;
@ -665,6 +699,7 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
Controller::pushSettings(Request["push_settings"], Response["push_settings"]);
}
Controller::configChanged = true;
}else{//unauthorized

View file

@ -60,6 +60,7 @@ static unsigned long long servOutputs = 0;
static unsigned long long servViewers = 0;
Controller::sessIndex::sessIndex(std::string dhost, unsigned int dcrc, std::string dstreamName, std::string dconnector){
ID = "UNSET";
host = dhost;
crc = dcrc;
streamName = dstreamName;
@ -72,7 +73,7 @@ Controller::sessIndex::sessIndex(){
std::string Controller::sessIndex::toStr(){
std::stringstream s;
s << host << " " << crc << " " << streamName << " " << connector;
s << ID << "(" << host << " " << crc << " " << streamName << " " << connector << ")";
return s.str();
}
@ -83,6 +84,7 @@ Controller::sessIndex::sessIndex(IPC::statExchange & data){
streamName = data.streamName();
connector = data.connector();
crc = data.crc();
ID = data.getSessId();
}
@ -136,17 +138,7 @@ void Controller::sessions_invalidate(const std::string & streamname){
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->first.streamName == streamname){
sessCount++;
it->second.sync = 1;
if (it->second.curConns.size()){
for (std::map<unsigned long, statStorage>::iterator jt = it->second.curConns.begin(); jt != it->second.curConns.end(); ++jt){
char * data = statPointer->getIndex(jt->first);
if (data){
IPC::statExchange tmpEx(data);
tmpEx.setSync(2);
invalidated++;
}
}
}
invalidated += it->second.invalidate();
}
}
INFO_MSG("Invalidated %u connections in %u sessions for stream %s", invalidated, sessCount, streamname.c_str());
@ -168,6 +160,59 @@ void Controller::sessions_shutdown(JSON::Iter & i){
//not handled, ignore
}
///Shuts down the given session
void Controller::sessId_shutdown(const std::string & sessId){
if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return;
}
unsigned int murdered = 0;
unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex);
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->first.ID == sessId){
sessCount++;
murdered += it->second.kill();
break;
}
}
INFO_MSG("Shut down %u connections in %u session(s) for ID %s", murdered, sessCount, sessId.c_str());
}
///Tags the given session
void Controller::sessId_tag(const std::string & sessId, const std::string & tag){
if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot tag sessions.");
return;
}
tthread::lock_guard<tthread::mutex> guard(statsMutex);
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->first.ID == sessId){
it->second.tags.insert(tag);
return;
}
}
WARN_MSG("Session %s not found - cannot tag with %s", sessId.c_str(), tag.c_str());
}
///Shuts down sessions with the given tag set
void Controller::tag_shutdown(const std::string & tag){
if (!statPointer){
FAIL_MSG("In controller shutdown procedure - cannot shutdown sessions.");
return;
}
unsigned int murdered = 0;
unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex);
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.tags.count(tag)){
sessCount++;
murdered += it->second.kill();
}
}
INFO_MSG("Shut down %u connections in %u session(s) for tag %s", murdered, sessCount, tag.c_str());
}
///Shuts down all current sessions for the given streamname
void Controller::sessions_shutdown(const std::string & streamname, const std::string & protocol){
if (!statPointer){
@ -178,20 +223,9 @@ void Controller::sessions_shutdown(const std::string & streamname, const std::st
unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex);
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if ((!streamname.size() || it->first.streamName == streamname) && (!protocol.size() || it->first.connector == protocol) && it->second.curConns.size()){
if ((!streamname.size() || it->first.streamName == streamname) && (!protocol.size() || it->first.connector == protocol)){
sessCount++;
for (std::map<unsigned long, statStorage>::iterator jt = it->second.curConns.begin(); jt != it->second.curConns.end(); ++jt){
char * data = statPointer->getIndex(jt->first);
if (data){
IPC::statExchange tmpEx(data);
uint32_t pid = tmpEx.getPID();
if (pid > 1){
Util::Procs::Stop(pid);
INFO_MSG("Killing PID %lu", pid);
murdered++;
}
}
}
murdered += it->second.kill();
}
}
INFO_MSG("Shut down %u connections in %u sessions for stream %s/%s", murdered, sessCount, streamname.c_str(), protocol.c_str());
@ -217,8 +251,10 @@ void Controller::SharedMemStats(void * config){
if (sessions.size()){
std::list<sessIndex> mustWipe;
unsigned long long cutOffPoint = Util::epoch() - STAT_CUTOFF;
unsigned long long disconnectPoint = Util::epoch() - STATS_DELAY;
for (std::map<sessIndex, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
it->second.wipeOld(cutOffPoint);
it->second.ping(it->first, disconnectPoint);
if (!it->second.hasData()){
mustWipe.push_back(it->first);
}else{
@ -273,6 +309,45 @@ void Controller::SharedMemStats(void * config){
}
}
/// Forces a re-sync of the session
uint32_t Controller::statSession::invalidate(){
uint32_t ret = 0;
sync = 1;
if (curConns.size() && statPointer){
for (std::map<unsigned long, statStorage>::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){
char * data = statPointer->getIndex(jt->first);
if (data){
IPC::statExchange tmpEx(data);
tmpEx.setSync(2);
ret++;
}
}
}
return ret;
}
/// Kills all active connections, sets the session state to denied (sync=100).
uint32_t Controller::statSession::kill(){
uint32_t ret = 0;
sync = 100;
if (curConns.size() && statPointer){
for (std::map<unsigned long, statStorage>::iterator jt = curConns.begin(); jt != curConns.end(); ++jt){
char * data = statPointer->getIndex(jt->first);
if (data){
IPC::statExchange tmpEx(data);
tmpEx.setSync(100);
uint32_t pid = tmpEx.getPID();
if (pid > 1){
Util::Procs::Stop(pid);
INFO_MSG("Killing PID %lu", pid);
}
ret++;
}
}
}
return ret;
}
/// Updates the given active connection with new stats data.
void Controller::statSession::update(unsigned long index, IPC::statExchange & data){
//update the sync byte: 0 = requesting fill, 2 = requesting refill, 1 = needs checking, > 1 = state known (100=denied, 10=accepted)
@ -313,14 +388,18 @@ void Controller::statSession::update(unsigned long index, IPC::statExchange & da
long long prevDown = getDown();
long long prevUp = getUp();
curConns[index].update(data);
//store timestamp of last received data, if newer
if (data.now() > lastSec){
lastSec = data.now();
}
//store timestamp of first received data, if older
if (firstSec > data.now()){
firstSec = data.now();
}
//store timestamp of last received data, if newer
if (data.now() > lastSec){
lastSec = data.now();
if (!tracked){
tracked = true;
firstActive = firstSec;
}
}
long long currDown = getDown();
long long currUp = getUp();
if (currUp - prevUp < 0 || currDown-prevDown < 0){
@ -414,6 +493,56 @@ void Controller::statSession::wipeOld(unsigned long long cutOff){
}
}
void Controller::statSession::ping(const Controller::sessIndex & index, unsigned long long disconnectPoint){
if (!tracked){return;}
if (lastSec < disconnectPoint){
if (Controller::accesslog.size()){
uint64_t duration = lastSec - firstActive;
if (duration < 1){duration = 1;}
if (Controller::accesslog == "LOG"){
std::stringstream accessStr;
accessStr << "Session <" << index.ID << "> " << index.streamName << " (" << index.connector << ") from " << index.host << " ended after " << duration << "s, avg " << getUp()/duration/1024 << "KB/s up " << getDown()/duration/1024 << "KB/s down.";
if (tags.size()){
accessStr << " Tags: ";
for (std::set<std::string>::iterator it = tags.begin(); it != tags.end(); ++it){
accessStr << "[" << *it << "]";
}
}
accessStr << std::endl;
Controller::Log("ACCS", accessStr.str());
}else{
static std::ofstream accLogFile;
static std::string accLogFileName;
if (accLogFileName != Controller::accesslog || !accLogFile.good()){
accLogFile.close();
accLogFile.open(Controller::accesslog, std::ios_base::app);
if (!accLogFile.good()){
FAIL_MSG("Could not open access log file '%s': %s", Controller::accesslog.c_str(), strerror(errno));
}else{
accLogFileName = Controller::accesslog;
}
}
if (accLogFile.good()){
time_t rawtime;
struct tm *timeinfo;
char buffer[100];
time(&rawtime);
timeinfo = localtime(&rawtime);
strftime(buffer, 100, "%F %H:%M:%S", timeinfo);
accLogFile << buffer << ", " << index.ID << ", " << index.streamName << ", " << index.connector << ", " << index.host << ", " << duration << ", " << getUp()/duration/1024 << ", " << getDown()/duration/1024 << ", ";
if (tags.size()){
for (std::set<std::string>::iterator it = tags.begin(); it != tags.end(); ++it){
accLogFile << "[" << *it << "]";
}
}
accLogFile << std::endl;
}
}
}
tracked = false;
}
}
/// Archives the given connection.
void Controller::statSession::finish(unsigned long index){
oldConns.push_back(curConns[index]);
@ -422,6 +551,8 @@ void Controller::statSession::finish(unsigned long index){
/// Constructs an empty session
Controller::statSession::statSession(){
firstActive = 0;
tracked = false;
firstSec = 0xFFFFFFFFFFFFFFFFull;
lastSec = 0;
sync = 1;

View file

@ -46,6 +46,7 @@ namespace Controller {
sessIndex(std::string host, unsigned int crc, std::string streamName, std::string connector);
sessIndex(IPC::statExchange & data);
sessIndex();
std::string ID;
std::string host;
unsigned int crc;
std::string streamName;
@ -73,21 +74,27 @@ namespace Controller {
/// Allows for moving of connections to another session.
class statSession {
private:
uint64_t firstActive;
unsigned long long firstSec;
unsigned long long lastSec;
unsigned long long wipedUp;
unsigned long long wipedDown;
std::deque<statStorage> oldConns;
sessType sessionType;
bool tracked;
public:
statSession();
uint32_t invalidate();
uint32_t kill();
char sync;
std::map<unsigned long, statStorage> curConns;
std::set<std::string> tags;
sessType getSessType();
statSession();
void wipeOld(unsigned long long);
void finish(unsigned long index);
void switchOverTo(statSession & newSess, unsigned long index);
void update(unsigned long index, IPC::statExchange & data);
void ping(const sessIndex & index, unsigned long long disconnectPoint);
unsigned long long getStart();
unsigned long long getEnd();
bool isViewerOn(unsigned long long time);
@ -118,6 +125,9 @@ namespace Controller {
void SharedMemStats(void * config);
void sessions_invalidate(const std::string & streamname);
void sessions_shutdown(JSON::Iter & i);
void sessId_shutdown(const std::string & sessId);
void tag_shutdown(const std::string & tag);
void sessId_tag(const std::string & sessId, const std::string & tag);
void sessions_shutdown(const std::string & streamname, const std::string & protocol = "");
bool hasViewers(std::string streamName);

View file

@ -13,6 +13,8 @@
///\brief Holds everything unique to the controller.
namespace Controller{
std::string instanceId; /// instanceId (previously uniqId) is first set in controller.cpp before licensing or update calls.
std::string prometheus;
std::string accesslog;
Util::Config conf;
JSON::Value Storage; ///< Global storage of data.
tthread::mutex configMutex;

View file

@ -4,7 +4,9 @@
#include <mist/tinythread.h>
namespace Controller {
extern std::string instanceId; ///global storage of instanceId (previously uniqID) for updater
extern std::string instanceId; ///<global storage of instanceId (previously uniqID) for updater
extern std::string prometheus; ///< Prometheus access string
extern std::string accesslog; ///< Where to write the access log
extern Util::Config conf;///< Global storage of configuration.
extern JSON::Value Storage; ///< Global storage of data.
extern tthread::mutex logMutex;///< Mutex for log thread.

View file

@ -133,22 +133,6 @@ namespace Mist{
myConn.close();
}
/// \triggers
/// The `"CONN_PLAY"` trigger is stream-specific, and is ran when an active connection first opens a stream. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// connected client host
/// output handler name
/// request URL (if any)
/// ~~~~~~~~~~~~~~~
/// The `"USER_NEW"` trigger is stream-specific, and is ran when a new user first opens a stream. Segmented protcols are unduplicated over the duration of the statistics log (~10 minutes), true streaming protocols (RTMP, RTSP) are not deduplicated as no duplication ever takes place. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// connected client host
/// User agent checksum (CRC32 of User-agent string)
/// output handler name
/// request URL (if any)
/// ~~~~~~~~~~~~~~~
void Output::initialize(){
if (isInitialized){
return;
@ -245,7 +229,7 @@ namespace Mist{
HIGH_MSG("USER_NEW sync achieved: %u", (unsigned int)tmpEx.getSync());
//1 = check requested (connection is new)
if (tmpEx.getSync() == 1){
std::string payload = streamName+"\n" + getConnectedHost() +"\n" + JSON::Value((long long)crc).asString() + "\n"+capa["name"].asStringRef()+"\n"+reqUrl;
std::string payload = streamName+"\n" + getConnectedHost() +"\n" + JSON::Value((long long)crc).asString() + "\n"+capa["name"].asStringRef()+"\n"+reqUrl+"\n"+tmpEx.getSessId();
if (!Triggers::doTrigger("USER_NEW", payload, streamName)){
MEDIUM_MSG("Closing connection because denied by USER_NEW trigger");
onFinish();