Completed new sessions system

Co-authored-by: Thulinma <jaron@vietors.com>
This commit is contained in:
Marco van Dijk 2022-03-16 13:46:14 +01:00 committed by Thulinma
parent 074e757028
commit 8ac486b815
36 changed files with 991 additions and 620 deletions

View file

@ -3,6 +3,7 @@
#include "comms.h"
#include "defines.h"
#include "encode.h"
#include "stream.h"
#include "procs.h"
#include "timing.h"
#include <fcntl.h>
@ -10,6 +11,34 @@
#include "config.h"
namespace Comms{
uint8_t sessionViewerMode = SESS_BUNDLE_DEFAULT_VIEWER;
uint8_t sessionInputMode = SESS_BUNDLE_DEFAULT_OTHER;
uint8_t sessionOutputMode = SESS_BUNDLE_DEFAULT_OTHER;
uint8_t sessionUnspecifiedMode = 0;
uint8_t sessionStreamInfoMode = SESS_DEFAULT_STREAM_INFO_MODE;
uint8_t tknMode = SESS_TKN_DEFAULT_MODE;
/// \brief Refreshes the session configuration if the last update was more than 5 seconds ago
void sessionConfigCache(){
static uint64_t lastUpdate = 0;
if (Util::bootSecs() > lastUpdate + 5){
VERYHIGH_MSG("Updating session config");
JSON::Value tmpVal = Util::getGlobalConfig("sessionViewerMode");
if (!tmpVal.isNull()){ sessionViewerMode = tmpVal.asInt(); }
tmpVal = Util::getGlobalConfig("sessionInputMode");
if (!tmpVal.isNull()){ sessionInputMode = tmpVal.asInt(); }
tmpVal = Util::getGlobalConfig("sessionOutputMode");
if (!tmpVal.isNull()){ sessionOutputMode = tmpVal.asInt(); }
tmpVal = Util::getGlobalConfig("sessionUnspecifiedMode");
if (!tmpVal.isNull()){ sessionUnspecifiedMode = tmpVal.asInt(); }
tmpVal = Util::getGlobalConfig("sessionStreamInfoMode");
if (!tmpVal.isNull()){ sessionStreamInfoMode = tmpVal.asInt(); }
tmpVal = Util::getGlobalConfig("tknMode");
if (!tmpVal.isNull()){ tknMode = tmpVal.asInt(); }
lastUpdate = Util::bootSecs();
}
}
Comms::Comms(){
index = INVALID_RECORD_INDEX;
currentSize = 0;
@ -17,7 +46,7 @@ namespace Comms{
}
Comms::~Comms(){
if (index != INVALID_RECORD_INDEX){
if (index != INVALID_RECORD_INDEX && status){
setStatus(COMM_STATUS_DISCONNECT | getStatus());
}
if (master){
@ -123,6 +152,10 @@ namespace Comms{
return;
}
dataAccX = Util::RelAccX(dataPage.mapped);
if (dataAccX.isExit()){
dataPage.close();
return;
}
fieldAccess();
if (index == INVALID_RECORD_INDEX || reIssue){
size_t reqCount = dataAccX.getRCount();
@ -170,19 +203,30 @@ namespace Comms{
void Sessions::addFields(){
Connections::addFields();
dataAccX.addField("tags", RAX_STRING, 512);
dataAccX.addField("sessid", RAX_STRING, 80);
}
void Sessions::nullFields(){
Connections::nullFields();
setSessId("");
setTags("");
}
void Sessions::fieldAccess(){
Connections::fieldAccess();
tags = dataAccX.getFieldAccX("tags");
sessId = dataAccX.getFieldAccX("sessid");
}
std::string Sessions::getTags() const{return tags.string(index);}
std::string Sessions::getTags(size_t idx) const{return (master ? tags.string(idx) : 0);}
void Sessions::setTags(std::string _sid){tags.set(_sid, index);}
void Sessions::setTags(std::string _sid, size_t idx){
if (!master){return;}
tags.set(_sid, idx);
}
Users::Users() : Comms(){}
Users::Users(const Users &rhs) : Comms(){
@ -251,31 +295,60 @@ namespace Comms{
keyNum.set(_keyNum, idx);
}
void Connections::reload(const std::string & sessId, bool _master, bool reIssue){
// Open SEM_SESSION
if(!sem){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, sessId.c_str());
sem.open(semName, O_RDWR, ACCESSPERMS, 1);
if (!sem){return;}
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, sessId.c_str());
Comms::reload(userPageName, COMMS_SESSIONS_INITSIZE, _master, reIssue);
}
/// \brief Claims a spot on the connections page for the input/output which calls this function
/// Starts the MistSession binary for each session, which handles the statistics
/// and the USER_NEW and USER_END triggers
/// \param streamName: Name of the stream the input is providing or an output is making available to viewers
/// \param ip: IP address of the viewer which wants to access streamName. For inputs this value can be set to any value
/// \param sid: Session ID given by the player or randomly generated
/// \param tkn: Session token given by the player or randomly generated
/// \param protocol: Protocol currently in use for this connection
/// \param sessionMode: Determines how a viewer session is defined:
// If set to 0, all connections with the same viewer IP and stream name are bundled.
// If set to 1, all connections with the same viewer IP and player ID are bundled.
// If set to 2, all connections with the same player ID and stream name are bundled.
// If set to 3, all connections with the same viewer IP, player ID and stream name are bundled.
/// \param _master: If True, we are reading from this page. If False, we are writing (to our entry) on this page
/// \param reIssue: If True, claim a new entry on this page
void Connections::reload(std::string streamName, std::string ip, std::string sid, std::string protocol, std::string reqUrl, uint64_t sessionMode, bool _master, bool reIssue){
if (sessionMode == 0xFFFFFFFFFFFFFFFFull){
FAIL_MSG("The session mode was not initialised properly. Assuming default behaviour of bundling by viewer IP, stream name and player id");
sessionMode = SESS_BUNDLE_STREAMNAME_HOSTNAME_SESSIONID;
}
void Connections::reload(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & protocol, const std::string & reqUrl, bool _master, bool reIssue){
initialTkn = tkn;
uint8_t sessMode = sessionViewerMode;
// Generate a unique session ID for each viewer, input or output
sessionId = generateSession(streamName, ip, sid, protocol, sessionMode);
if (protocol.size() >= 6 && protocol.substr(0, 6) == "INPUT:"){
sessionId = "I" + sessionId;
sessMode = sessionInputMode;
sessionId = "I" + generateSession(streamName, ip, tkn, protocol, sessMode);
}else if (protocol.size() >= 7 && protocol.substr(0, 7) == "OUTPUT:"){
sessionId = "O" + sessionId;
sessMode = sessionOutputMode;
sessionId = "O" + generateSession(streamName, ip, tkn, protocol, sessMode);
}else{
// If the session only contains the HTTP connector, check sessionStreamInfoMode
if (protocol.size() == 4 && protocol == "HTTP"){
if (sessionStreamInfoMode == SESS_HTTP_AS_VIEWER){
sessionId = generateSession(streamName, ip, tkn, protocol, sessMode);
}else if (sessionStreamInfoMode == SESS_HTTP_AS_OUTPUT){
sessMode = sessionOutputMode;
sessionId = "O" + generateSession(streamName, ip, tkn, protocol, sessMode);
}else if (sessionStreamInfoMode == SESS_HTTP_DISABLED){
return;
}else if (sessionStreamInfoMode == SESS_HTTP_AS_UNSPECIFIED){
// Set sessMode to include all variables when determining the session ID
sessMode = sessionUnspecifiedMode;
sessionId = "U" + generateSession(streamName, ip, tkn, protocol, sessMode);
}else{
sessionId = generateSession(streamName, ip, tkn, protocol, sessMode);
}
}else{
sessionId = generateSession(streamName, ip, tkn, protocol, sessMode);
}
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, COMMS_SESSIONS, sessionId.c_str());
@ -283,36 +356,59 @@ namespace Comms{
if (!_master){
dataPage.init(userPageName, 0, false, false);
if (!dataPage){
std::string host;
Socket::hostBytesToStr(ip.data(), 16, host);
pid_t thisPid;
std::deque<std::string> args;
args.push_back(Util::getMyPath() + "MistSession");
args.push_back(sessionId);
args.push_back("--sessionmode");
args.push_back(JSON::Value(sessionMode).asString());
args.push_back("--streamname");
args.push_back(streamName);
args.push_back("--ip");
args.push_back(ip);
args.push_back("--sid");
args.push_back(sid);
args.push_back("--protocol");
args.push_back(protocol);
args.push_back("--requrl");
args.push_back(reqUrl);
// First bit defines whether to include stream name
if (sessMode & 0x08){
args.push_back("--streamname");
args.push_back(streamName);
}else{
setenv("SESSION_STREAM", streamName.c_str(), 1);
}
// Second bit defines whether to include viewer ip
if (sessMode & 0x04){
args.push_back("--ip");
args.push_back(host);
}else{
setenv("SESSION_IP", host.c_str(), 1);
}
// Third bit defines whether to include tkn
if (sessMode & 0x02){
args.push_back("--tkn");
args.push_back(tkn);
}else{
setenv("SESSION_TKN", tkn.c_str(), 1);
}
// Fourth bit defines whether to include protocol
if (sessMode & 0x01){
args.push_back("--protocol");
args.push_back(protocol);
}else{
setenv("SESSION_PROTOCOL", protocol.c_str(), 1);
}
setenv("SESSION_REQURL", reqUrl.c_str(), 1);
int err = fileno(stderr);
thisPid = Util::Procs::StartPiped(args, 0, 0, &err);
Util::Procs::forget(thisPid);
HIGH_MSG("Spawned new session executeable (pid %u) for sessionId '%s', corresponding to host %s and stream %s", thisPid, sessionId.c_str(), ip.c_str(), streamName.c_str());
unsetenv("SESSION_STREAM");
unsetenv("SESSION_IP");
unsetenv("SESSION_TKN");
unsetenv("SESSION_PROTOCOL");
unsetenv("SESSION_REQURL");
}
}
// Open SEM_SESSION
if(!sem){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_SESSION, sessionId.c_str());
sem.open(semName, O_RDWR, ACCESSPERMS, 1);
reload(sessionId, _master, reIssue);
if (index != INVALID_RECORD_INDEX){
setConnector(protocol);
setHost(ip);
setStream(streamName);
VERYHIGH_MSG("Reloading connection. Claimed record %lu", index);
}
Comms::reload(userPageName, COMMS_SESSIONS_INITSIZE, _master, reIssue);
VERYHIGH_MSG("Reloading connection. Claimed record %lu", index);
}
/// \brief Marks the data page as closed, so that we longer write any new data to is
@ -341,7 +437,6 @@ namespace Comms{
dataAccX.addField("host", RAX_RAW, 16);
dataAccX.addField("stream", RAX_STRING, 100);
dataAccX.addField("connector", RAX_STRING, 20);
dataAccX.addField("tags", RAX_STRING, 512);
dataAccX.addField("pktcount", RAX_64UINT);
dataAccX.addField("pktloss", RAX_64UINT);
dataAccX.addField("pktretrans", RAX_64UINT);
@ -349,7 +444,6 @@ namespace Comms{
void Connections::nullFields(){
Comms::nullFields();
setTags("");
setConnector("");
setStream("");
setHost("");
@ -373,7 +467,6 @@ namespace Comms{
host = dataAccX.getFieldAccX("host");
stream = dataAccX.getFieldAccX("stream");
connector = dataAccX.getFieldAccX("connector");
tags = dataAccX.getFieldAccX("tags");
pktcount = dataAccX.getFieldAccX("pktcount");
pktloss = dataAccX.getFieldAccX("pktloss");
pktretrans = dataAccX.getFieldAccX("pktretrans");
@ -461,14 +554,6 @@ namespace Comms{
return false;
}
std::string Connections::getTags() const{return tags.string(index);}
std::string Connections::getTags(size_t idx) const{return (master ? tags.string(idx) : 0);}
void Connections::setTags(std::string _sid){tags.set(_sid, index);}
void Connections::setTags(std::string _sid, size_t idx){
if (!master){return;}
tags.set(_sid, idx);
}
uint64_t Connections::getPacketCount() const{return pktcount.uint(index);}
uint64_t Connections::getPacketCount(size_t idx) const{
return (master ? pktcount.uint(idx) : 0);
@ -501,31 +586,32 @@ namespace Comms{
/// \brief Generates a session ID which is unique per viewer
/// \return generated session ID as string
std::string Connections::generateSession(std::string streamName, std::string ip, std::string sid, std::string connector, uint64_t sessionMode){
std::string Connections::generateSession(const std::string & streamName, const std::string & ip, const std::string & tkn, const std::string & connector, uint64_t sessionMode){
std::string concat;
std::string debugMsg = "Generating session id based on";
// First bit defines whether to include stream name
if (sessionMode > 7){
if (sessionMode & 0x08){
concat += streamName;
sessionMode -= 8;
debugMsg += " stream name '" + streamName + "'";
}
// Second bit defines whether to include viewer ip
if (sessionMode > 3){
if (sessionMode & 0x04){
concat += ip;
sessionMode -= 4;
std::string ipHex;
Socket::hostBytesToStr(ip.c_str(), ip.size(), ipHex);
debugMsg += " IP '" + ipHex + "'";
}
// Third bit defines whether to include player ip
if (sessionMode > 1){
concat += sid;
sessionMode -= 2;
// Third bit defines whether to include client-side session token
if (sessionMode & 0x02){
concat += tkn;
debugMsg += " session token '" + tkn + "'";
}
// Fourth bit defines whether to include protocol
if (sessionMode == 1){
if (sessionMode & 0x01){
concat += connector;
sessionMode = 0;
}
if (sessionMode > 0){
WARN_MSG("Could not resolve session mode of value %lu", sessionMode);
debugMsg += " protocol '" + connector + "'";
}
VERYHIGH_MSG("%s", debugMsg.c_str());
return Secure::sha256(concat.c_str(), concat.length());
}
}// namespace Comms