Added stream tagging support

This commit is contained in:
Thulinma 2021-06-18 00:44:32 +02:00
parent 4d50364703
commit 6bec4066a9
8 changed files with 293 additions and 35 deletions

View file

@ -471,6 +471,43 @@ bool Util::streamAlive(std::string &streamname){
}
}
/// Returns active tags for an exact-matching (already sanitized) streamname
std::set<std::string> Util::streamTags(const std::string &streamname){
std::set<std::string> ret;
IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 0, false, false);
// Abort silently if page cannot be loaded
if (!shmStreams){return ret;}
Util::RelAccX rlxStreams(shmStreams.mapped);
// Abort silently if page cannot be loaded
if (!rlxStreams.isReady()){return ret;}
uint64_t startPos = rlxStreams.getDeleted();
uint64_t endPos = rlxStreams.getEndPos();
for (uint64_t cPos = startPos; cPos < endPos; ++cPos){
const std::string & strm = rlxStreams.getPointer("stream", cPos);
if (strm != streamname){continue;}
// Found it! Fill and break, since only one match can exist.
std::string tags = rlxStreams.getPointer("tags", cPos);
while (tags.size()){
size_t endPos = tags.find(' ');
if (!endPos){
//extra space, ignore
tags.erase(0, 1);
continue;
}
if (endPos == std::string::npos){endPos = tags.size();}
ret.insert(tags.substr(0, endPos));
if (endPos == tags.size()){break;}
tags.erase(0, endPos+1);
}
break;
}
return ret;
}
/// Assures the input for the given stream name is active.
/// Does stream name sanitation first, followed by a stream name length check (<= 100 chars).
/// Then, checks if an input is already active by running streamAlive(). If yes, return true.

View file

@ -18,6 +18,7 @@ namespace Util{
std::string getTmpFolder();
void sanitizeName(std::string &streamname);
bool streamAlive(std::string &streamname);
std::set<std::string> streamTags(const std::string &streamname);
bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true,
bool isProvider = false,
const std::map<std::string, std::string> &overrides = std::map<std::string, std::string>(),

View file

@ -203,6 +203,15 @@ namespace Triggers{
if ((streamName.size() == stringLen || splitter == stringLen) &&
strncmp(strPtr + bPos + 4, streamName.data(), stringLen) == 0){
isHandled = true;
break;
}
// Tag-based? Check tags for this stream
if (strPtr[bPos + 4] == '#'){
std::set<std::string> tags = Util::streamTags(streamName);
if (tags.count(std::string(strPtr + bPos + 5, stringLen - 1))){
isHandled = true;
break;
}
}
bPos += stringLen + 4;
}

View file

@ -175,15 +175,17 @@ public:
viewers = rlx.getInt("viewers", entry);
inputs = rlx.getInt("inputs", entry);
outputs = rlx.getInt("outputs", entry);
tags = rlx.getPointer("tags", entry);
}
bool operator==(const streamStat &b) const{
return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs);
return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs && tags == b.tags);
}
bool operator!=(const streamStat &b) const{return !(*this == b);}
uint8_t status;
uint64_t viewers;
uint64_t inputs;
uint64_t outputs;
std::string tags;
};
void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
@ -292,6 +294,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
tmp[1u].append(tmpStat.viewers);
tmp[1u].append(tmpStat.inputs);
tmp[1u].append(tmpStat.outputs);
tmp[1u].append(tmpStat.tags);
W.sendFrame(tmp.toString());
}
}
@ -305,6 +308,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){
tmp[1u].append(0u);
tmp[1u].append(0u);
tmp[1u].append(0u);
tmp[1u].append("");
W.sendFrame(tmp.toString());
strmRemove.erase(strm);
lastStrmStat.erase(strm);
@ -1164,6 +1168,69 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){
}
}
if (Request.isMember("tag_stream")){
if (Request["tag_stream"].isObject()){
jsonForEach(Request["tag_stream"], it){
if (it->isString()){
Controller::stream_tag(it.key(), it->asStringRef());
}else if (it->isArray()){
jsonForEach(*it, jt){
if (jt->isString()){
Controller::stream_tag(it.key(), jt->asStringRef());
}
}
}
}
}
}
if (Request.isMember("untag_stream")){
if (Request["untag_stream"].isObject()){
jsonForEach(Request["untag_stream"], it){
if (it->isString()){
Controller::stream_untag(it.key(), it->asStringRef());
}else if (it->isArray()){
jsonForEach(*it, jt){
if (jt->isString()){
Controller::stream_untag(it.key(), jt->asStringRef());
}
}
}
}
}
}
if (Request.isMember("stream_tags")){
JSON::Value & rT = Response["stream_tags"];
if (Request["stream_tags"].isArray()){
jsonForEach(Request["stream_tags"], it){
if (it->isString()){
std::set<std::string> tags = Controller::stream_tags(it->asStringRef());
JSON::Value & tRef = rT[it->asStringRef()];
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
}
}
}else if (Request["stream_tags"].isObject()){
jsonForEach(Request["stream_tags"], it){
std::set<std::string> tags = Controller::stream_tags(it.key());
JSON::Value & tRef = rT[it.key()];
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
}
}else if (Request["stream_tags"].isString() && Request["stream_tags"].asStringRef().size()){
std::set<std::string> tags = Controller::stream_tags(Request["stream_tags"].asStringRef());
JSON::Value & tRef = rT[Request["stream_tags"].asStringRef()];
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
}else{
JSON::Value nullPkt, resp;
Controller::fillActive(nullPkt, resp);
jsonForEach(resp, it){
std::set<std::string> tags = Controller::stream_tags(it->asStringRef());
JSON::Value & tRef = rT[it->asStringRef()];
for (std::set<std::string>::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);}
}
}
}
if (Request.isMember("push_start")){
std::string stream;
std::string target;

View file

@ -333,7 +333,6 @@ namespace Controller{
for (std::set<std::string>::iterator jt = activeStreams.begin();
jt != activeStreams.end(); ++jt){
std::string streamname = *jt;
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
if (!isPushActive(streamname, target)){
if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){
waitingPushes[streamname].erase(target);
@ -352,7 +351,6 @@ namespace Controller{
}
}
}
}
//Check if any pushes have ended, clean them up
std::set<pid_t> toWipe;
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
@ -537,11 +535,9 @@ namespace Controller{
for (std::set<std::string>::iterator jt = activeStreams.begin();
jt != activeStreams.end(); ++jt){
std::string streamname = *jt;
if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){
startPush(streamname, target);
}
}
}
// Return push list
response["push_auto_list"] = Controller::Storage["autopushes"];
}
@ -588,7 +584,7 @@ namespace Controller{
jsonForEach(Controller::Storage["autopushes"], it){
if ((*it)[2u].asInt() && (*it)[2u].asInt() < Util::epoch()){continue;}
const std::string &pStr = (*it)[0u].asStringRef();
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
if (Controller::streamMatches(streamname, pStr)){
std::string stream = streamname;
Util::sanitizeName(stream);
// Check variable condition if it exists

View file

@ -51,13 +51,37 @@ std::map<std::string, Controller::statSession> sessions;
std::map<std::string, Controller::triggerLog> Controller::triggerStats; ///< Holds prometheus stats for trigger executions
bool Controller::killOnExit = KILL_ON_EXIT;
tthread::mutex Controller::statsMutex;
tthread::recursive_mutex statsMutex;
uint64_t Controller::statDropoff = 0;
static uint64_t cpu_use = 0;
char noBWCountMatches[1717];
uint64_t bwLimit = 128 * 1024 * 1024; // gigabit default limit
class tagQueueItem{
public:
uint64_t lastChange;
std::set<std::string> tags;
tagQueueItem(){
lastChange = Util::bootSecs();
}
tagQueueItem(const std::string & initialTag){
lastChange = Util::bootSecs();
tags.insert(initialTag);
}
void add(const std::string & newTag){
lastChange = Util::bootSecs();
tags.insert(newTag);
}
void remove(const std::string & delTag){
lastChange = Util::bootSecs();
tags.erase(delTag);
if (!tags.size()){lastChange = 0;}
}
};
std::map<std::string, tagQueueItem> tagQueue;
const char nullAddress[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
static Controller::statLog emptyLogEntry = {0, 0, 0, 0, 0, 0 ,0 ,0, "", nullAddress, ""};
bool notEmpty(const Controller::statLog & dta){
@ -81,6 +105,7 @@ struct streamTotals{
uint64_t packSent;
uint64_t packLoss;
uint64_t packRetrans;
std::set<std::string> tags;
};
Comms::Sessions statComm;
@ -161,6 +186,14 @@ void Controller::updateBandwidthConfig(){
/// This function is ran whenever a stream becomes active.
void Controller::streamStarted(std::string stream){
INFO_MSG("Stream %s became active", stream.c_str());
if (tagQueue.count(stream)){
tagQueueItem & q = tagQueue[stream];
for (std::set<std::string>::iterator it = q.tags.begin(); it != q.tags.end(); ++it){
streamStats[stream].tags.insert(*it);
}
INFO_MSG("Applied %zu tags to stream %s retroactively",q.tags.size() , stream.c_str());
tagQueue.erase(stream);
}
Controller::doAutoPush(stream);
}
@ -214,13 +247,73 @@ void Controller::sessId_shutdown(const std::string &sessId){
INFO_MSG("Shut down session with session ID %s", sessId.c_str());
}
///Checks if the given stream is matched by the given matchString.
///Currently checks exact matches, wildcard matches (when ending in '+') and tag matches (when starting with '#')
bool Controller::streamMatches(const std::string &stream, const std::string &matchString){
//Exact match check
if (stream == matchString){return true;}
//Wildcard match, when ending in '+'
if (*matchString.rbegin() == '+' && stream.substr(0, matchString.size()) == matchString){return true;}
if (matchString[0] == '#'){
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
return streamStats.at(stream).tags.count(matchString.substr(1));//true if tag set, false otherwise
}
return false;//fallback response
}
/// Retrieves a copy of the stream's tags
std::set<std::string> Controller::stream_tags(const std::string &stream){
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
if (!streamStats.count(stream)){return std::set<std::string>();}
return streamStats[stream].tags;
}
/// Tags the given stream
bool Controller::stream_tag(const std::string &stream, const std::string &tag){
if (!statCommActive){
FAIL_MSG("In controller shutdown procedure - cannot tag streams.");
return false;
}
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
if (!streamStats.count(stream)){
FAIL_MSG("Cannot tag stream '%s' with '%s': stream is not currently active -> adding to tag queue", stream.c_str(), tag.c_str());
tagQueue[stream].add(tag);
return false;
}
streamStats[stream].tags.insert(tag);
return true;
}
/// Untags the given stream
bool Controller::stream_untag(const std::string &stream, const std::string &tag){
if (!statCommActive){
FAIL_MSG("In controller shutdown procedure - cannot tag streams.");
return false;
}
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
if (!streamStats.count(stream)){
// If a tag was queued, remove matching tag (if any)
if (tagQueue.count(stream)){
tagQueue[stream].remove(tag);
// And also clean up the entry if it is now empty
if (!tagQueue[stream].lastChange){tagQueue.erase(stream);}
return true;
}
FAIL_MSG("Cannot untag stream '%s' with '%s': stream is not currently active", stream.c_str(), tag.c_str());
return false;
}
streamStats[stream].tags.erase(tag);
return true;
}
/// Tags the given session
void Controller::sessId_tag(const std::string &sessId, const std::string &tag){
if (!statCommActive){
FAIL_MSG("In controller shutdown procedure - cannot tag sessions.");
return;
}
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
for (std::map<std::string, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->first == sessId){
it->second.tags.insert(tag);
@ -240,7 +333,7 @@ void Controller::tag_shutdown(const std::string &tag){
return;
}
unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
for (std::map<std::string, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.tags.count(tag)){
sessCount++;
@ -258,7 +351,7 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str
return;
}
unsigned int sessCount = 0;
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
// Find all matching streams in statComm and get their sessId
for (size_t i = 0; i < statComm.recordCount(); i++){
if (statComm.getStatus(i) == COMM_STATUS_INVALID || (statComm.getStatus(i) & COMM_STATUS_DISCONNECT)){continue;}
@ -311,7 +404,7 @@ void Controller::SharedMemStats(void *config){
}
{
tthread::lock_guard<tthread::mutex> guard(Controller::configMutex);
tthread::lock_guard<tthread::mutex> guard2(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard2(statsMutex);
// parse current users
statLeadIn();
COMM_LOOP(statComm, statOnActive(id), statOnDisconnect(id));
@ -336,6 +429,32 @@ void Controller::SharedMemStats(void *config){
it->second.packLoss = 0;
it->second.packRetrans = 0;
}
Util::RelAccX *strmStats = streamsAccessor();
if (!strmStats || !strmStats->isReady()){strmStats = 0;}
if (strmStats){
uint64_t startPos = strmStats->getDeleted();
uint64_t endPos = strmStats->getEndPos();
for (uint64_t cPos = startPos; cPos < endPos; ++cPos){
std::string strm = strmStats->getPointer("stream", cPos);
std::string tags = strmStats->getPointer("tags", cPos);
if (tags.size() && streamStats.count(strm)){
INFO_MSG("Restoring stream tags: %s -> %s", strm.c_str(), tags.c_str());
streamTotals & st = streamStats[strm];
while (tags.size()){
size_t endPos = tags.find(' ');
if (!endPos){
//extra space, ignore
tags.erase(0, 1);
continue;
}
if (endPos == std::string::npos){endPos = tags.size();}
st.tags.insert(tags.substr(0, endPos));
if (endPos == tags.size()){break;}
tags.erase(0, endPos+1);
}
}
}
}
}
unsigned int tOut = Util::bootSecs() - STATS_DELAY;
unsigned int tIn = Util::bootSecs() - STATS_INPUT_DELAY;
@ -430,10 +549,34 @@ void Controller::SharedMemStats(void *config){
strmStats->setInt("inputs", it->second.currIns, strmPos);
strmStats->setInt("outputs", it->second.currOuts, strmPos);
strmStats->setInt("unspecified", it->second.currUnspecified, strmPos);
if (it->second.tags.size()){
std::string tags;
for (std::set<std::string>::iterator jt = it->second.tags.begin(); jt != it->second.tags.end(); ++jt){
if (tags.size()){tags += " ";}
tags += *jt;
}
strmStats->setString("tags", tags, strmPos);
}else{
strmStats->setString("tags", "", strmPos);
}
++strmPos;
}
}
}
if (tagQueue.size()){
bool updatedTagQueue = true;
while (updatedTagQueue){
updatedTagQueue = false;
for (std::map<std::string, tagQueueItem>::iterator it = tagQueue.begin(); it != tagQueue.end(); ++it){
if (it->second.lastChange + 60 < Util::bootSecs()){
WARN_MSG("Erasing %zu not-applied tags for offline stream %s since it did not show up for a minute", it->second.tags.size(), it->first.c_str());
tagQueue.erase(it);
updatedTagQueue = true;
break;
}
}
}
}
if (strmStats && shiftWrites){
shiftWrites = false;
uint64_t prevEnd = strmStats->getEndPos();
@ -472,7 +615,8 @@ void Controller::SharedMemStats(void *config){
Controller::deinitState(Util::Config::is_restarting);
}
/// Gets a complete list of all streams currently in active state, with optional prefix matching
/// Gets a complete list of all streams currently in active state, with optional stream matching
/// Stream matching uses Controller::streamMatches internally, thus providing the same matching features.
std::set<std::string> Controller::getActiveStreams(const std::string &prefix){
std::set<std::string> ret;
Util::RelAccX *strmStats = streamsAccessor();
@ -482,7 +626,7 @@ std::set<std::string> Controller::getActiveStreams(const std::string &prefix){
for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){
if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;}
const char *S = strmStats->getPointer("stream", i);
if (!strncmp(S, prefix.data(), prefix.size())){ret.insert(S);}
if (streamMatches(S, prefix)){ret.insert(S);}
}
}else{
for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){
@ -1036,7 +1180,7 @@ bool Controller::hasViewers(std::string streamName){
/// ~~~~~~~~~~~~~~~
/// In case of the second method, the response is an array in the same order as the requests.
void Controller::fillClients(JSON::Value &req, JSON::Value &rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
// first, figure out the timestamp wanted
int64_t reqTime = 0;
uint64_t epoch = Util::epoch();
@ -1187,7 +1331,7 @@ void Controller::fillHasStats(JSON::Value &req, JSON::Value &rep){
std::map<std::string, uint64_t> clients;
// check all sessions
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
if (sessions.size()){
for (std::map<std::string, statSession>::iterator it = sessions.begin(); it != sessions.end(); it++){
if (it->second.getSessType() == SESS_INPUT){
@ -1278,7 +1422,7 @@ void Controller::fillActive(JSON::Value &req, JSON::Value &rep){
}
DTSC::Meta M;
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
for (std::map<std::string, struct streamTotals>::iterator it = streamStats.begin(); it != streamStats.end(); ++it){
//If specific streams were requested, match and skip non-matching
if (streams.size()){
@ -1415,7 +1559,7 @@ public:
/// This takes a "totals" request, and fills in the response data.
void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
// first, figure out the timestamps wanted
int64_t reqStart = 0;
int64_t reqEnd = 0;
@ -1705,7 +1849,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
}
{// Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n";
response << "# TYPE mist_sessions_total gauge\n";
@ -1783,7 +1927,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int
resp["pkts"].append(servPackRetrans);
resp["bwlimit"] = bwLimit;
{// Scope for shortest possible blocking of statsMutex
tthread::lock_guard<tthread::mutex> guard(statsMutex);
tthread::lock_guard<tthread::recursive_mutex> guard(statsMutex);
resp["curr"].append((uint64_t)sessions.size());
if (Controller::triggerStats.size()){

View file

@ -7,7 +7,6 @@
#include <mist/shared_memory.h>
#include <mist/socket.h>
#include <mist/timing.h>
#include <mist/tinythread.h>
#include <string>
/// The STAT_CUTOFF define sets how many seconds of statistics history is kept.
@ -97,7 +96,6 @@ namespace Controller{
uint64_t getBpsUp(uint64_t start, uint64_t end);
};
extern tthread::mutex statsMutex;
extern uint64_t statDropoff;
struct triggerLog{
@ -125,11 +123,16 @@ namespace Controller{
void sessId_shutdown(const std::string &sessId);
void tag_shutdown(const std::string &tag);
void sessId_tag(const std::string &sessId, const std::string &tag);
bool stream_tag(const std::string &stream, const std::string &tag);
std::set<std::string> stream_tags(const std::string &stream);
bool stream_untag(const std::string &stream, const std::string &tag);
void sessions_shutdown(const std::string &streamname, const std::string &protocol = "");
bool hasViewers(std::string streamName);
void writeSessionCache(); /*LTS*/
void killConnections(std::string sessId);
bool streamMatches(const std::string &stream, const std::string &matchString);
#define PROMETHEUS_TEXT 0
#define PROMETHEUS_JSON 1
void handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int mode);

View file

@ -188,10 +188,10 @@ namespace Controller{
}
maxAccsRecs = (1024 * 1024 - rlxAccs->getOffset()) / rlxAccs->getRSize();
shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024 * 1024, false, false); // max 1M of stream data
shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 5*1024 * 1024, false, false); // max 5M of stream data
if (!shmStrm || !shmStrm->mapped){
if (shmStrm){delete shmStrm;}
shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024 * 1024, true); // max 1M of stream data
shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 5*1024 * 1024, true); // max 5M of stream data
}
if (!shmStrm->mapped){
FAIL_MSG("Could not open memory page for stream data");
@ -205,6 +205,7 @@ namespace Controller{
rlxStrm->addField("inputs", RAX_64UINT);
rlxStrm->addField("outputs", RAX_64UINT);
rlxStrm->addField("unspecified", RAX_64UINT);
rlxStrm->addField("tags", RAX_512STRING);
rlxStrm->setReady();
}
rlxStrm->setRCount((1024 * 1024 - rlxStrm->getOffset()) / rlxStrm->getRSize());