Sessions rework

This commit is contained in:
Ramkoemar 2021-10-18 14:29:13 +02:00 committed by Thulinma
parent 3e85da2afd
commit 074e757028
27 changed files with 1222 additions and 1183 deletions

View file

@ -306,7 +306,6 @@ int main_loop(int argc, char **argv){
if (Controller::Storage["config"].isMember("accesslog")){
Controller::conf.getOption("accesslog", true)[0u] = Controller::Storage["config"]["accesslog"];
}
Controller::maxConnsPerIP = Controller::conf.getInteger("maxconnsperip");
Controller::Storage["config"]["prometheus"] = Controller::conf.getString("prometheus");
Controller::Storage["config"]["accesslog"] = Controller::conf.getString("accesslog");
Controller::normalizeTrustedProxies(Controller::Storage["config"]["trustedproxy"]);

View file

@ -594,6 +594,7 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
out["prometheus"] = in["prometheus"];
Controller::prometheus = out["prometheus"].asStringRef();
}
if (in.isMember("sessionMode")){out["sessionMode"] = in["sessionMode"];}
if (in.isMember("defaultStream")){out["defaultStream"] = in["defaultStream"];}
if (in.isMember("location") && in["location"].isObject()){
out["location"]["lat"] = in["location"]["lat"].asDouble();

File diff suppressed because it is too large Load diff

View file

@ -18,8 +18,7 @@
namespace Controller{
extern bool killOnExit;
extern unsigned int maxConnsPerIP;
/// This function is ran whenever a stream becomes active.
void streamStarted(std::string stream);
/// This function is ran whenever a stream becomes inactive.
@ -35,34 +34,14 @@ namespace Controller{
uint64_t pktCount;
uint64_t pktLost;
uint64_t pktRetransmit;
std::string connectors;
};
enum sessType{SESS_UNSET = 0, SESS_INPUT, SESS_OUTPUT, SESS_VIEWER};
/// This is a comparison and storage class that keeps sessions apart from each other.
/// Whenever two of these objects are not equal, it will create a new session.
class sessIndex{
public:
sessIndex();
sessIndex(const Comms::Statistics &statComm, size_t id);
std::string ID;
std::string host;
unsigned int crc;
std::string streamName;
std::string connector;
bool operator==(const sessIndex &o) const;
bool operator!=(const sessIndex &o) const;
bool operator>(const sessIndex &o) const;
bool operator<=(const sessIndex &o) const;
bool operator<(const sessIndex &o) const;
bool operator>=(const sessIndex &o) const;
std::string toStr();
};
class statStorage{
public:
void update(Comms::Statistics &statComm, size_t index);
void update(Comms::Sessions &statComm, size_t index);
bool hasDataFor(unsigned long long);
statLog &getDataFor(unsigned long long);
std::map<unsigned long long, statLog> log;
@ -75,36 +54,33 @@ namespace Controller{
uint64_t firstActive;
uint64_t firstSec;
uint64_t lastSec;
uint64_t wipedUp;
uint64_t wipedDown;
uint64_t wipedPktCount;
uint64_t wipedPktLost;
uint64_t wipedPktRetransmit;
std::deque<statStorage> oldConns;
sessType sessionType;
bool tracked;
uint8_t noBWCount; ///< Set to 2 when not to count for external bandwidth
std::string streamName;
std::string host;
std::string curConnector;
std::string sessId;
public:
statSession();
uint32_t invalidate();
uint32_t kill();
char sync;
std::map<uint64_t, statStorage> curConns;
~statSession();
statStorage curData;
std::set<std::string> tags;
sessType getSessType();
void wipeOld(uint64_t);
void finish(uint64_t index);
void switchOverTo(statSession &newSess, uint64_t index);
void update(uint64_t index, Comms::Statistics &data);
void dropSession(const sessIndex &index);
void update(uint64_t index, Comms::Sessions &data);
uint64_t getStart();
uint64_t getEnd();
bool isViewerOn(uint64_t time);
bool isConnected();
bool isTracked();
bool hasDataFor(uint64_t time);
bool hasData();
std::string getStreamName();
std::string getHost();
std::string getSessId();
std::string getCurrentProtocols();
uint64_t newestDataPoint();
uint64_t getConnTime(uint64_t time);
uint64_t getConnTime();
uint64_t getLastSecond(uint64_t time);
uint64_t getDown(uint64_t time);
uint64_t getUp();
@ -122,8 +98,6 @@ namespace Controller{
uint64_t getBpsUp(uint64_t start, uint64_t end);
};
extern std::map<sessIndex, statSession> sessions;
extern std::map<unsigned long, sessIndex> connToSession;
extern tthread::mutex statsMutex;
extern uint64_t statDropoff;
@ -155,6 +129,7 @@ namespace Controller{
void sessions_shutdown(const std::string &streamname, const std::string &protocol = "");
bool hasViewers(std::string streamName);
void writeSessionCache(); /*LTS*/
void killConnections(std::string sessId);
#define PROMETHEUS_TEXT 0
#define PROMETHEUS_JSON 1

View file

@ -91,19 +91,6 @@ namespace Controller{
rlxAccs->setString("tags", tags, newEndPos);
rlxAccs->setEndPos(newEndPos + 1);
}
if (Triggers::shouldTrigger("USER_END", strm)){
std::stringstream plgen;
plgen << sessId << "\n"
<< strm << "\n"
<< conn << "\n"
<< host << "\n"
<< duration << "\n"
<< up << "\n"
<< down << "\n"
<< tags;
std::string payload = plgen.str();
Triggers::doTrigger("USER_END", payload, strm);
}
}
void normalizeTrustedProxies(JSON::Value &tp){
@ -450,7 +437,8 @@ namespace Controller{
systemBoot = globAccX.getInt("systemBoot");
}
if(!globAccX.getFieldAccX("defaultStream")
|| !globAccX.getFieldAccX("systemBoot")){
|| !globAccX.getFieldAccX("systemBoot")
|| !globAccX.getFieldAccX("sessionMode")){
globAccX.setReload();
globCfg.master = true;
globCfg.close();
@ -461,12 +449,16 @@ namespace Controller{
if (!globAccX.isReady()){
globAccX.addField("defaultStream", RAX_128STRING);
globAccX.addField("systemBoot", RAX_64UINT);
globAccX.addField("sessionMode", RAX_64UINT);
if (!Storage["config"]["sessionMode"]){
Storage["config"]["sessionMode"] = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID;
}
globAccX.setRCount(1);
globAccX.setEndPos(1);
globAccX.setReady();
}
globAccX.setString("defaultStream", Storage["config"]["defaultStream"].asStringRef());
globAccX.setInt("systemBoot", systemBoot);
globAccX.setInt("sessionMode", Storage["config"]["sessionMode"].asInt());
globCfg.master = false; // leave the page after closing
}
}

View file

@ -794,7 +794,7 @@ namespace Mist{
void Input::streamMainLoop(){
uint64_t statTimer = 0;
uint64_t startTime = Util::bootSecs();
Comms::Statistics statComm;
Comms::Connections statComm;
getNext();
if (thisPacket && !userSelect.count(thisIdx)){
userSelect[thisIdx].reload(streamName, thisIdx, COMM_STATUS_ACTIVE | COMM_STATUS_SOURCE | COMM_STATUS_DONOTTRACK);
@ -820,7 +820,7 @@ namespace Mist{
if (Util::bootSecs() - statTimer > 1){
// Connect to stats for INPUT detection
if (!statComm){statComm.reload();}
if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);}
if (statComm){
if (!statComm){
config->is_active = false;
@ -829,7 +829,6 @@ namespace Mist{
}
uint64_t now = Util::bootSecs();
statComm.setNow(now);
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setTime(now - startTime);
@ -842,7 +841,7 @@ namespace Mist{
}
}
void Input::connStats(Comms::Statistics &statComm){
void Input::connStats(Comms::Connections &statComm){
statComm.setUp(0);
statComm.setDown(streamByteCount());
statComm.setHost(getConnectedBinHost());
@ -853,7 +852,7 @@ namespace Mist{
uint64_t statTimer = 0;
uint64_t startTime = Util::bootSecs();
size_t idx;
Comms::Statistics statComm;
Comms::Connections statComm;
DTSC::Meta liveMeta(config->getString("streamname"), false);
@ -985,7 +984,7 @@ namespace Mist{
if (Util::bootSecs() - statTimer > 1){
// Connect to stats for INPUT detection
if (!statComm){statComm.reload();}
if (!statComm){statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);}
if (statComm){
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false;
@ -994,7 +993,6 @@ namespace Mist{
}
uint64_t now = Util::bootSecs();
statComm.setNow(now);
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setTime(now - startTime);

View file

@ -69,7 +69,7 @@ namespace Mist{
virtual void userOnActive(size_t id);
virtual void userOnDisconnect(size_t id);
virtual void userLeadOut();
virtual void connStats(Comms::Statistics & statComm);
virtual void connStats(Comms::Connections & statComm);
virtual void parseHeader();
bool bufferFrame(size_t track, uint32_t keyNum);

View file

@ -196,7 +196,7 @@ namespace Mist{
}
void InputRTSP::streamMainLoop(){
Comms::Statistics statComm;
Comms::Connections statComm;
uint64_t startTime = Util::epoch();
uint64_t lastPing = Util::bootSecs();
uint64_t lastSecs = 0;
@ -210,7 +210,7 @@ namespace Mist{
if (lastSecs != currSecs){
lastSecs = currSecs;
// Connect to stats for INPUT detection
statComm.reload();
statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);
if (statComm){
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false;
@ -219,7 +219,6 @@ namespace Mist{
}
uint64_t now = Util::bootSecs();
statComm.setNow(now);
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setUp(tcpCon.dataUp());

View file

@ -193,7 +193,7 @@ namespace Mist{
// Updates stats and quits if parsePacket returns false
void InputSDP::streamMainLoop(){
Comms::Statistics statComm;
Comms::Connections statComm;
uint64_t startTime = Util::epoch();
uint64_t lastSecs = 0;
// Get RTP packets from UDP socket and stop if this fails
@ -202,7 +202,7 @@ namespace Mist{
if (lastSecs != currSecs){
lastSecs = currSecs;
// Connect to stats for INPUT detection
statComm.reload();
statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);
if (statComm){
if (statComm.getStatus() == COMM_STATUS_REQDISCONNECT){
config->is_active = false;
@ -211,7 +211,6 @@ namespace Mist{
}
uint64_t now = Util::bootSecs();
statComm.setNow(now);
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setDown(bytesRead);

View file

@ -527,7 +527,7 @@ namespace Mist{
void inputTS::streamMainLoop(){
meta.removeTrack(tmpIdx);
INFO_MSG("Removed temptrack %zu", tmpIdx);
Comms::Statistics statComm;
Comms::Connections statComm;
uint64_t downCounter = 0;
uint64_t startTime = Util::bootSecs();
uint64_t noDataSince = Util::bootSecs();
@ -621,7 +621,7 @@ namespace Mist{
// Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 1){
// Connect to stats for INPUT detection
statComm.reload();
statComm.reload(streamName, "", JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "", SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID);
if (statComm){
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false;
@ -630,7 +630,6 @@ namespace Mist{
}
uint64_t now = Util::bootSecs();
statComm.setNow(now);
statComm.setCRC(getpid());
statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setUp(0);

View file

@ -282,7 +282,7 @@ namespace Mist{
void inputTSSRT::setSingular(bool newSingular){singularFlag = newSingular;}
void inputTSSRT::connStats(Comms::Statistics &statComm){
void inputTSSRT::connStats(Comms::Connections &statComm){
statComm.setUp(srtConn.dataUp());
statComm.setDown(srtConn.dataDown());
statComm.setHost(getConnectedBinHost());

View file

@ -40,7 +40,7 @@ namespace Mist{
Socket::SRTConnection srtConn;
bool singularFlag;
virtual void connStats(Comms::Statistics &statComm);
virtual void connStats(Comms::Connections &statComm);
Util::ResizeablePointer rawBuffer;
size_t rawIdx;

View file

@ -7,7 +7,7 @@
#include <sys/wait.h>
#include <unistd.h>
#include "output.h"
#include "output.h"
#include <mist/bitfields.h>
#include <mist/defines.h>
#include <mist/h264.h>
@ -92,7 +92,7 @@ namespace Mist{
firstTime = 0;
firstPacketTime = 0xFFFFFFFFFFFFFFFFull;
lastPacketTime = 0;
crc = getpid();
sid = "";
parseData = false;
wantRequest = true;
sought = false;
@ -100,7 +100,7 @@ namespace Mist{
isBlocking = false;
needsLookAhead = 0;
extraKeepAway = 0;
lastStats = 0;
lastStats = 0xFFFFFFFFFFFFFFFFull;
maxSkipAhead = 7500;
uaDelay = 10;
realTime = 1000;
@ -111,6 +111,7 @@ namespace Mist{
lastPushUpdate = 0;
previousFile = "";
currentFile = "";
sessionMode = 0xFFFFFFFFFFFFFFFFull;
lastRecv = Util::bootSecs();
if (myConn){
@ -211,95 +212,9 @@ namespace Mist{
onFail("Not allowed to play (CONN_PLAY)");
}
}
doSync(true);
/*LTS-END*/
}
/// If called with force set to true and a USER_NEW trigger enabled, forces a sync immediately.
/// Otherwise, does nothing unless the sync byte is set to 2, in which case it forces a sync as well.
/// May be called recursively because it calls stats() which calls this function.
/// If this happens, the extra calls to the function return instantly.
void Output::doSync(bool force){
if (!statComm){return;}
if (recursingSync){return;}
recursingSync = true;
if (statComm.getSync() == 2 || force){
if (getStatsName() == capa["name"].asStringRef() && Triggers::shouldTrigger("USER_NEW", streamName)){
// sync byte 0 = no sync yet, wait for sync from controller...
char initialSync = 0;
// attempt to load sync status from session cache in shm
{
IPC::semaphore cacheLock(SEM_SESSCACHE, O_RDWR, ACCESSPERMS, 16);
if (cacheLock){cacheLock.wait();}
IPC::sharedPage shmSessions(SHM_SESSIONS, SHM_SESSIONS_SIZE, false, false);
if (shmSessions.mapped){
char shmEmpty[SHM_SESSIONS_ITEM];
memset(shmEmpty, 0, SHM_SESSIONS_ITEM);
std::string host;
Socket::hostBytesToStr(statComm.getHost().data(), 16, host);
uint32_t shmOffset = 0;
const std::string &cName = capa["name"].asStringRef();
while (shmOffset + SHM_SESSIONS_ITEM < SHM_SESSIONS_SIZE){
// compare crc
if (*(uint32_t*)(shmSessions.mapped + shmOffset) == crc){
// compare stream name
if (strncmp(shmSessions.mapped + shmOffset + 4, streamName.c_str(), 100) == 0){
// compare connector
if (strncmp(shmSessions.mapped + shmOffset + 104, cName.c_str(), 20) == 0){
// compare host
if (strncmp(shmSessions.mapped + shmOffset + 124, host.c_str(), 40) == 0){
initialSync = shmSessions.mapped[shmOffset + 164];
HIGH_MSG("Instant-sync from session cache to %u", (unsigned int)initialSync);
break;
}
}
}
}
// stop if we reached the end
if (memcmp(shmSessions.mapped + shmOffset, shmEmpty, SHM_SESSIONS_ITEM) == 0){
break;
}
shmOffset += SHM_SESSIONS_ITEM;
}
}
if (cacheLock){cacheLock.post();}
}
unsigned int i = 0;
statComm.setSync(initialSync);
// wait max 10 seconds for sync
while ((!statComm.getSync() || statComm.getSync() == 2) && i++ < 100){
Util::wait(100);
stats(true);
}
HIGH_MSG("USER_NEW sync achieved: %u", statComm.getSync());
// 1 = check requested (connection is new)
if (statComm.getSync() == 1){
std::string payload = streamName + "\n" + getConnectedHost() + "\n" +
JSON::Value(crc).asString() + "\n" + capa["name"].asStringRef() +
"\n" + reqUrl + "\n" + statComm.getSessId();
if (!Triggers::doTrigger("USER_NEW", payload, streamName)){
onFail("Not allowed to play (USER_NEW)");
statComm.setSync(100); // 100 = denied
}else{
statComm.setSync(10); // 10 = accepted
}
}
// 100 = denied
if (statComm.getSync() == 100){onFail("Not allowed to play (USER_NEW cache)");}
if (statComm.getSync() == 0){
onFail("Not allowed to play (USER_NEW init timeout)", true);
}
if (statComm.getSync() == 2){
onFail("Not allowed to play (USER_NEW re-init timeout)", true);
}
// anything else = accepted
}else{
statComm.setSync(10); // auto-accept if no trigger
}
}
recursingSync = false;
}
std::string Output::getConnectedHost(){return myConn.getHost();}
std::string Output::getConnectedBinHost(){
@ -433,10 +348,10 @@ namespace Mist{
//Connect to stats reporting, if not connected already
if (!statComm){
statComm.reload();
statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode);
stats(true);
}
//push inputs do not need to wait for stream to be ready for playback
if (isPushing()){return;}
@ -986,7 +901,7 @@ namespace Mist{
INFO_MSG("Will split recording every %lld seconds", atoll(targetParams["split"].c_str()));
targetParams["nxt-split"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
}
// Duration to record in seconds. Overrides recstop.
// Duration to record in seconds. Oversides recstop.
if (targetParams.count("duration")){
long long endRec = atoll(targetParams["duration"].c_str()) * 1000;
targetParams["recstop"] = JSON::Value((int64_t)(seekPos + endRec)).asString();
@ -1301,6 +1216,7 @@ namespace Mist{
/// request URL (if any)
/// ~~~~~~~~~~~~~~~
int Output::run(){
sessionMode = Util::getGlobalConfig("sessionMode").asInt();
/*LTS-START*/
// Connect to file target, if needed
if (isFileTarget()){
@ -1507,6 +1423,8 @@ namespace Mist{
/*LTS-END*/
disconnect();
stats(true);
userSelect.clear();
myConn.close();
return 0;
}
@ -1822,7 +1740,7 @@ namespace Mist{
// also cancel if it has been less than a second since the last update
// unless force is set to true
uint64_t now = Util::bootSecs();
if (now == lastStats && !force){return;}
if (now <= lastStats && !force){return;}
if (isRecording()){
if(lastPushUpdate == 0){
@ -1861,13 +1779,17 @@ namespace Mist{
}
}
if (!statComm){statComm.reload();}
if (!statComm){return;}
if (!statComm){statComm.reload(streamName, getConnectedHost(), sid, capa["name"].asStringRef(), reqUrl, sessionMode);}
if (!statComm){return;}
if (statComm.getExit()){
onFail("Shutting down since this session is not allowed to view this stream");
return;
}
lastStats = now;
VERYHIGH_MSG("Writing stats: %s, %s, %u, %" PRIu64 ", %" PRIu64, getConnectedHost().c_str(), streamName.c_str(),
crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
VERYHIGH_MSG("Writing stats: %s, %s, %s, %" PRIu64 ", %" PRIu64, getConnectedHost().c_str(), streamName.c_str(),
sid.c_str(), myConn.dataUp(), myConn.dataDown());
/*LTS-START*/
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
onFail("Shutting down on controller request");
@ -1875,9 +1797,6 @@ namespace Mist{
}
/*LTS-END*/
statComm.setNow(now);
statComm.setHost(getConnectedBinHost());
statComm.setCRC(crc);
statComm.setStream(streamName);
statComm.setConnector(getStatsName());
connStats(now, statComm);
statComm.setLastSecond(thisPacket ? thisPacket.getTime() : 0);
@ -1887,7 +1806,7 @@ namespace Mist{
// Tag the session with the user agent
if (newUA && ((now - myConn.connTime()) >= uaDelay || !myConn) && UA.size()){
std::string APIcall =
"{\"tag_sessid\":{\"" + statComm.getSessId() + "\":" + JSON::string_escape("UA:" + UA) + "}}";
"{\"tag_sessid\":{\"" + statComm.sessionId + "\":" + JSON::string_escape("UA:" + UA) + "}}";
Socket::UDPConnection uSock;
uSock.SetDestination(UDP_API_HOST, UDP_API_PORT);
uSock.SendNow(APIcall);
@ -1895,8 +1814,6 @@ namespace Mist{
}
/*LTS-END*/
doSync();
if (isPushing()){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second.getStatus() & COMM_STATUS_REQDISCONNECT){
@ -1912,7 +1829,7 @@ namespace Mist{
}
}
void Output::connStats(uint64_t now, Comms::Statistics &statComm){
void Output::connStats(uint64_t now, Comms::Connections &statComm){
statComm.setUp(myConn.dataUp());
statComm.setDown(myConn.dataDown());
statComm.setTime(now - myConn.connTime());

View file

@ -86,7 +86,6 @@ namespace Mist{
std::string hostLookup(std::string ip);
bool onList(std::string ip, std::string list);
std::string getCountry(std::string ip);
void doSync(bool force = false);
/*LTS-END*/
std::map<size_t, uint32_t> currentPage;
void loadPageForKey(size_t trackId, size_t keyNum);
@ -105,6 +104,7 @@ namespace Mist{
bool firstData;
uint64_t lastPushUpdate;
bool newUA;
protected: // these are to be messed with by child classes
virtual bool inlineRestartCapable() const{
return false;
@ -122,15 +122,16 @@ namespace Mist{
virtual std::string getStatsName();
virtual bool hasSessionIDs(){return false;}
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
virtual void connStats(uint64_t now, Comms::Connections &statComm);
std::set<size_t> getSupportedTracks(const std::string &type = "") const;
inline virtual bool keepGoing(){return config->is_active && myConn;}
Comms::Statistics statComm;
Comms::Connections statComm;
bool isBlocking; ///< If true, indicates that myConn is blocking.
uint32_t crc; ///< Checksum, if any, for usage in the stats.
std::string sid; ///< Random identifier used to split connections into sessions
uint64_t sessionMode;
uint64_t nextKeyTime();
// stream delaying variables

View file

@ -101,7 +101,7 @@ namespace Mist{
}
}
void OutCMAF::connStats(uint64_t now, Comms::Statistics &statComm){
void OutCMAF::connStats(uint64_t now, Comms::Connections &statComm){
// For non-push usage, call usual function.
if (!isRecording()){
Output::connStats(now, statComm);

View file

@ -39,7 +39,7 @@ namespace Mist{
bool isReadyForPlay();
protected:
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
virtual void connStats(uint64_t now, Comms::Connections &statComm);
void onTrackEnd(size_t idx);
bool hasSessionIDs(){return !config->getBool("mergesessions");}
@ -72,6 +72,7 @@ namespace Mist{
void startPushOut();
void pushNext();
uint32_t crc;
HTTP::URL pushUrl;
std::map<size_t, CMAFPushTrack> pushTracks;
void setupTrackObject(size_t idx);

View file

@ -55,7 +55,6 @@ namespace Mist{
}
void HTTPOutput::onFail(const std::string &msg, bool critical){
INFO_MSG("Failing '%s': %s", H.url.c_str(), msg.c_str());
if (!webSock && !isRecording() && !responded){
H.Clean(); // make sure no parts of old requests are left in any buffers
H.SetHeader("Server", APPIDENT);
@ -238,18 +237,6 @@ namespace Mist{
}
/*LTS-END*/
if (H.hasHeader("User-Agent")){UA = H.GetHeader("User-Agent");}
if (hasSessionIDs()){
if (H.GetVar("sessId").size()){
std::string ua = H.GetVar("sessId");
crc = checksum::crc32(0, ua.data(), ua.size());
}else{
std::string ua = JSON::Value(getpid()).asString();
crc = checksum::crc32(0, ua.data(), ua.size());
}
}else{
std::string mixed_ua = UA + H.GetHeader("X-Playback-Session-Id");
crc = checksum::crc32(0, mixed_ua.data(), mixed_ua.size());
}
if (H.GetVar("audio") != ""){targetParams["audio"] = H.GetVar("audio");}
if (H.GetVar("video") != ""){targetParams["video"] = H.GetVar("video");}
@ -281,6 +268,21 @@ namespace Mist{
realTime = 0;
}
}
// Get session ID cookie or generate a random one if it wasn't set
if (!sid.size()){
std::map<std::string, std::string> storage;
const std::string koekjes = H.GetHeader("Cookie");
HTTP::parseVars(koekjes, storage);
if (storage.count("sid")){
// Get sid cookie, which is used to divide connections into sessions
sid = storage.at("sid");
}else{
// Else generate one
const std::string newSid = UA + JSON::Value(getpid()).asString();
sid = JSON::Value(checksum::crc32(0, newSid.data(), newSid.size())).asString();
H.SetHeader("sid", sid.c_str());
}
}
// Handle upgrade to websocket if the output supports it
std::string upgradeHeader = H.GetHeader("Upgrade");
Util::stringToLower(upgradeHeader);

View file

@ -344,7 +344,7 @@ namespace Mist{
}
}
void OutTSSRT::connStats(uint64_t now, Comms::Statistics &statComm){
void OutTSSRT::connStats(uint64_t now, Comms::Connections &statComm){
if (!srtConn){return;}
statComm.setUp(srtConn.dataUp());
statComm.setDown(srtConn.dataDown());

View file

@ -15,7 +15,7 @@ namespace Mist{
bool isReadyForPlay(){return true;}
virtual void requestHandler();
protected:
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
virtual void connStats(uint64_t now, Comms::Connections &statComm);
virtual std::string getConnectedHost(){return srtConn.remotehost;}
virtual std::string getConnectedBinHost(){return srtConn.getBinHost();}

View file

@ -1015,7 +1015,7 @@ namespace Mist{
}
}
void OutWebRTC::connStats(uint64_t now, Comms::Statistics &statComm){
void OutWebRTC::connStats(uint64_t now, Comms::Connections &statComm){
statComm.setUp(myConn.dataUp());
statComm.setDown(myConn.dataDown());
statComm.setPacketCount(totalPkts);

View file

@ -144,7 +144,7 @@ namespace Mist{
void onDTSCConverterHasInitData(const size_t trackID, const std::string &initData);
void onRTPPacketizerHasRTPPacket(const char *data, size_t nbytes);
void onRTPPacketizerHasRTCPPacket(const char *data, uint32_t nbytes);
virtual void connStats(uint64_t now, Comms::Statistics &statComm);
virtual void connStats(uint64_t now, Comms::Connections &statComm);
private:
uint64_t lastRecv;

View file

@ -72,7 +72,7 @@ namespace Mist{
}
bool needsLock(){return false;}
bool isSingular(){return false;}
void connStats(Comms::Statistics &statComm){
void connStats(Comms::Connections &statComm){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second){it->second.setStatus(COMM_STATUS_DONOTTRACK | it->second.getStatus());}
}
@ -117,7 +117,7 @@ namespace Mist{
realTime = 0;
OutEBML::sendHeader();
};
void connStats(uint64_t now, Comms::Statistics &statComm){
void connStats(uint64_t now, Comms::Connections &statComm){
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
if (it->second){it->second.setStatus(COMM_STATUS_DONOTTRACK | it->second.getStatus());}
}

367
src/session.cpp Normal file
View file

@ -0,0 +1,367 @@
#include <mist/defines.h>
#include <mist/stream.h>
#include <mist/util.h>
#include <mist/config.h>
#include <mist/auth.h>
#include <mist/comms.h>
#include <mist/triggers.h>
#include <signal.h>
#include <stdio.h>
// Stats of connections which have closed are added to these global counters
uint64_t globalNow = 0;
uint64_t globalTime = 0;
uint64_t globalDown = 0;
uint64_t globalUp = 0;
uint64_t globalPktcount = 0;
uint64_t globalPktloss = 0;
uint64_t globalPktretrans = 0;
// Counts the duration a connector has been active
std::map<std::string, uint64_t> connectorCount;
std::map<std::string, uint64_t> connectorLastActive;
// Set to True when a session gets invalidated, so that we know to run a new USER_NEW trigger
bool forceTrigger = false;
void handleSignal(int signum){
if (signum == SIGUSR1){
forceTrigger = true;
}
}
void userOnActive(uint64_t &connections){
++connections;
}
std::string getEnvWithDefault(const std::string variableName, const std::string defaultValue){
const char* value = getenv(variableName.c_str());
if (value){
unsetenv(variableName.c_str());
return value;
}else{
return defaultValue;
}
}
/// \brief Adds stats of closed connections to global counters
void userOnDisconnect(Comms::Connections & connections, size_t idx){
std::string thisConnector = connections.getConnector(idx);
if (thisConnector != ""){
connectorCount[thisConnector] += connections.getTime(idx);
}
globalTime += connections.getTime(idx);
globalDown += connections.getDown(idx);
globalUp += connections.getUp(idx);
globalPktcount += connections.getPacketCount(idx);
globalPktloss += connections.getPacketLostCount(idx);
globalPktretrans += connections.getPacketRetransmitCount(idx);
}
int main(int argc, char **argv){
Comms::Connections connections;
Comms::Sessions sessions;
uint64_t lastSeen = Util::bootSecs();
uint64_t currentConnections = 0;
Util::redirectLogsIfNeeded();
signal(SIGUSR1, handleSignal);
// Init config and parse arguments
Util::Config config = Util::Config("MistSession");
JSON::Value option;
option.null();
option["arg_num"] = 1;
option["arg"] = "string";
option["help"] = "Session identifier of the entire session";
option["default"] = "";
config.addOption("sessionid", option);
option.null();
option["long"] = "sessionmode";
option["short"] = "m";
option["arg"] = "integer";
option["default"] = 0;
config.addOption("sessionmode", option);
option.null();
option["long"] = "streamname";
option["short"] = "n";
option["arg"] = "string";
option["default"] = "";
config.addOption("streamname", option);
option.null();
option["long"] = "ip";
option["short"] = "i";
option["arg"] = "string";
option["default"] = "";
config.addOption("ip", option);
option.null();
option["long"] = "sid";
option["short"] = "s";
option["arg"] = "string";
option["default"] = "";
config.addOption("sid", option);
option.null();
option["long"] = "protocol";
option["short"] = "p";
option["arg"] = "string";
option["default"] = "";
config.addOption("protocol", option);
option.null();
option["long"] = "requrl";
option["short"] = "r";
option["arg"] = "string";
option["default"] = "";
config.addOption("requrl", option);
config.activate();
if (!(config.parseArgs(argc, argv))){
FAIL_MSG("Cannot start a new session due to invalid arguments");
return 1;
}
const uint64_t bootTime = Util::getMicros();
// Get session ID, session mode and other variables used as payload for the USER_NEW and USER_END triggers
const std::string thisStreamName = config.getString("streamname");
const std::string thisHost = config.getString("ip");
const std::string thisSid = config.getString("sid");
const std::string thisProtocol = config.getString("protocol");
const std::string thisReqUrl = config.getString("requrl");
const std::string thisSessionId = config.getString("sessionid");
const uint64_t sessionMode = config.getInteger("sessionmode");
if (thisSessionId == "" || thisProtocol == "" || thisStreamName == ""){
FAIL_MSG("Given the following incomplete arguments: SessionId: '%s', protocol: '%s', stream name: '%s'. Aborting opening a new session",
thisSessionId.c_str(), thisProtocol.c_str(), thisStreamName.c_str());
return 1;
}
MEDIUM_MSG("Starting a new session for sessionId '%s'", thisSessionId.c_str());
if (sessionMode < 1 || sessionMode > 15) {
FAIL_MSG("Invalid session mode of value %lu. Should be larger than 0 and smaller than 16", sessionMode);
return 1;
}
// Try to lock to ensure we are the only process initialising this session
IPC::semaphore sessionLock;
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, thisSessionId.c_str());
sessionLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
// If the lock fails, the previous Session process must've failed in spectacular fashion
// It's the Controller's task to clean everything up. When the lock fails, this cleanup hasn't happened yet
if (!sessionLock.tryWaitOneSecond()){
FAIL_MSG("Session '%s' already locked", thisSessionId.c_str());
return 1;
}
// Check if a page already exists for this session ID. If so, quit
{
IPC::sharedPage dataPage;
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, thisSessionId.c_str());
dataPage.init(userPageName, 0, false, false);
if (dataPage){
INFO_MSG("Session '%s' already has a running process", thisSessionId.c_str());
sessionLock.post();
return 0;
}
}
// Claim a spot in shared memory for this session on the global statistics page
sessions.reload();
if (!sessions){
FAIL_MSG("Unable to register entry for session '%s' on the stats page", thisSessionId.c_str());
sessionLock.post();
return 1;
}
// Open the shared memory page containing statistics for each individual connection in this session
connections.reload(thisStreamName, thisHost, thisSid, thisProtocol, thisReqUrl, sessionMode, true, false);
// Initialise global session data
sessions.setHost(thisHost);
sessions.setSessId(thisSessionId);
sessions.setStream(thisStreamName);
sessionLock.post();
// Determine session type, since triggers only get run for viewer type sessions
uint64_t thisType = 0;
if (thisSessionId[0] == 'I'){
INFO_MSG("Started new input session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime));
thisType = 1;
}
else if (thisSessionId[0] == 'O'){
INFO_MSG("Started new output session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime));
thisType = 2;
}
else{
INFO_MSG("Started new viewer session %s in %lu microseconds", thisSessionId.c_str(), Util::getMicros(bootTime));
}
// Do a USER_NEW trigger if it is defined for this stream
if (!thisType && Triggers::shouldTrigger("USER_NEW", thisStreamName)){
std::string payload = thisStreamName + "\n" + thisHost + "\n" +
thisSid + "\n" + thisProtocol +
"\n" + thisReqUrl + "\n" + thisSessionId;
if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){
// Mark all connections of this session as finished, since this viewer is not allowed to view this stream
connections.setExit();
connections.finishAll();
}
}
uint64_t lastSecond = 0;
uint64_t now = 0;
uint64_t time = 0;
uint64_t down = 0;
uint64_t up = 0;
uint64_t pktcount = 0;
uint64_t pktloss = 0;
uint64_t pktretrans = 0;
std::string connector = "";
// Stay active until Mist exits or we no longer have an active connection
while (config.is_active && (currentConnections || Util::bootSecs() - lastSeen <= 10)){
time = 0;
connector = "";
down = 0;
up = 0;
pktcount = 0;
pktloss = 0;
pktretrans = 0;
currentConnections = 0;
// Count active connections
COMM_LOOP(connections, userOnActive(currentConnections), userOnDisconnect(connections, id));
// Loop through all connection entries to get a summary of statistics
for (uint64_t idx = 0; idx < connections.recordCount(); idx++){
if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;}
uint64_t thisLastSecond = connections.getLastSecond(idx);
std::string thisConnector = connections.getConnector(idx);
// Save info on the latest active connection separately
if (thisLastSecond > lastSecond){
lastSecond = thisLastSecond;
now = connections.getNow(idx);
}
connectorLastActive[thisConnector] = thisLastSecond;
// Sum all other variables
time += connections.getTime(idx);
down += connections.getDown(idx);
up += connections.getUp(idx);
pktcount += connections.getPacketCount(idx);
pktloss += connections.getPacketLostCount(idx);
pktretrans += connections.getPacketRetransmitCount(idx);
}
// Convert connector duration to string
std::stringstream connectorSummary;
bool addDelimiter = false;
connectorSummary << "{";
for (std::map<std::string, uint64_t>::iterator it = connectorLastActive.begin();
it != connectorLastActive.end(); ++it){
if (lastSecond - it->second < 10000){
connectorSummary << (addDelimiter ? "," : "") << it->first;
addDelimiter = true;
}
}
connectorSummary << "}";
// Write summary to global statistics
sessions.setTime(time + globalTime);
sessions.setDown(down + globalDown);
sessions.setUp(up + globalUp);
sessions.setPacketCount(pktcount + globalPktcount);
sessions.setPacketLostCount(pktloss + globalPktloss);
sessions.setPacketRetransmitCount(pktretrans + globalPktretrans);
sessions.setLastSecond(lastSecond);
sessions.setConnector(connectorSummary.str());
sessions.setNow(now);
// Retrigger USER_NEW if a re-sync was requested
if (!thisType && forceTrigger){
forceTrigger = false;
if (Triggers::shouldTrigger("USER_NEW", thisStreamName)){
INFO_MSG("Triggering USER_NEW for stream %s", thisStreamName.c_str());
std::string payload = thisStreamName + "\n" + thisHost + "\n" +
thisSid + "\n" + thisProtocol +
"\n" + thisReqUrl + "\n" + thisSessionId;
if (!Triggers::doTrigger("USER_NEW", payload, thisStreamName)){
INFO_MSG("USER_NEW rejected stream %s", thisStreamName.c_str());
connections.setExit();
connections.finishAll();
}else{
INFO_MSG("USER_NEW accepted stream %s", thisStreamName.c_str());
}
}
}
// Invalidate connections if the session is marked as invalid
if(connections.getExit()){
connections.finishAll();
break;
}
// Remember latest activity so we know when this session ends
if (currentConnections){
lastSeen = Util::bootSecs();
}
Util::sleep(1000);
}
// Trigger USER_END
if (!thisType && Triggers::shouldTrigger("USER_END", thisStreamName)){
lastSecond = 0;
time = 0;
down = 0;
up = 0;
// Get a final summary of this session
for (uint64_t idx = 0; idx < connections.recordCount(); idx++){
if (connections.getStatus(idx) == COMM_STATUS_INVALID || connections.getStatus(idx) & COMM_STATUS_DISCONNECT){continue;}
uint64_t thisLastSecond = connections.getLastSecond(idx);
// Set last second to the latest entry
if (thisLastSecond > lastSecond){
lastSecond = thisLastSecond;
}
// Count protocol durations across the entire session
std::string thisConnector = connections.getConnector(idx);
if (thisConnector != ""){
connectorCount[thisConnector] += connections.getTime(idx);
}
// Sum all other variables
time += connections.getTime(idx);
down += connections.getDown(idx);
up += connections.getUp(idx);
}
// Convert connector duration to string
std::stringstream connectorSummary;
bool addDelimiter = false;
connectorSummary << "{";
for (std::map<std::string, uint64_t>::iterator it = connectorCount.begin();
it != connectorCount.end(); ++it){
connectorSummary << (addDelimiter ? "," : "") << it->first << ":" << it->second;
addDelimiter = true;
}
connectorSummary << "}";
const uint64_t duration = lastSecond - (bootTime / 1000);
std::stringstream summary;
summary << thisSessionId << "\n"
<< thisStreamName << "\n"
<< connectorSummary.str() << "\n"
<< thisHost << "\n"
<< duration << "\n"
<< up << "\n"
<< down << "\n"
<< sessions.getTags();
Triggers::doTrigger("USER_END", summary.str(), thisStreamName);
}
if (!thisType && connections.getExit()){
WARN_MSG("Session %s has been invalidated since it is not allowed to view stream %s", thisSessionId.c_str(), thisStreamName.c_str());
uint64_t sleepStart = Util::bootSecs();
// Keep session invalidated for 10 minutes, or until the session stops
while (config.is_active && sleepStart - Util::bootSecs() < 600){
Util::sleep(1000);
}
}
INFO_MSG("Shutting down session %s", thisSessionId.c_str());
return 0;
}