diff --git a/lib/stream.cpp b/lib/stream.cpp index 1a3afe98..48332c6c 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -471,6 +471,43 @@ bool Util::streamAlive(std::string &streamname){ } } +/// Returns active tags for an exact-matching (already sanitized) streamname +std::set Util::streamTags(const std::string &streamname){ + std::set ret; + + IPC::sharedPage shmStreams(SHM_STATE_STREAMS, 0, false, false); + // Abort silently if page cannot be loaded + if (!shmStreams){return ret;} + + Util::RelAccX rlxStreams(shmStreams.mapped); + // Abort silently if page cannot be loaded + if (!rlxStreams.isReady()){return ret;} + + uint64_t startPos = rlxStreams.getDeleted(); + uint64_t endPos = rlxStreams.getEndPos(); + for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ + const std::string & strm = rlxStreams.getPointer("stream", cPos); + if (strm != streamname){continue;} + + // Found it! Fill and break, since only one match can exist. + std::string tags = rlxStreams.getPointer("tags", cPos); + while (tags.size()){ + size_t endPos = tags.find(' '); + if (!endPos){ + //extra space, ignore + tags.erase(0, 1); + continue; + } + if (endPos == std::string::npos){endPos = tags.size();} + ret.insert(tags.substr(0, endPos)); + if (endPos == tags.size()){break;} + tags.erase(0, endPos+1); + } + break; + } + return ret; +} + /// Assures the input for the given stream name is active. /// Does stream name sanitation first, followed by a stream name length check (<= 100 chars). /// Then, checks if an input is already active by running streamAlive(). If yes, return true. diff --git a/lib/stream.h b/lib/stream.h index 0c6cd3dd..10d95f52 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -18,6 +18,7 @@ namespace Util{ std::string getTmpFolder(); void sanitizeName(std::string &streamname); bool streamAlive(std::string &streamname); + std::set streamTags(const std::string &streamname); bool startInput(std::string streamname, std::string filename = "", bool forkFirst = true, bool isProvider = false, const std::map &overrides = std::map(), diff --git a/lib/triggers.cpp b/lib/triggers.cpp index 46f2a87b..2fb0075a 100644 --- a/lib/triggers.cpp +++ b/lib/triggers.cpp @@ -203,6 +203,15 @@ namespace Triggers{ if ((streamName.size() == stringLen || splitter == stringLen) && strncmp(strPtr + bPos + 4, streamName.data(), stringLen) == 0){ isHandled = true; + break; + } + // Tag-based? Check tags for this stream + if (strPtr[bPos + 4] == '#'){ + std::set tags = Util::streamTags(streamName); + if (tags.count(std::string(strPtr + bPos + 5, stringLen - 1))){ + isHandled = true; + break; + } } bPos += stringLen + 4; } diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 8670ed7d..bc11c613 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -175,15 +175,17 @@ public: viewers = rlx.getInt("viewers", entry); inputs = rlx.getInt("inputs", entry); outputs = rlx.getInt("outputs", entry); + tags = rlx.getPointer("tags", entry); } bool operator==(const streamStat &b) const{ - return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs); + return (status == b.status && viewers == b.viewers && inputs == b.inputs && outputs == b.outputs && tags == b.tags); } bool operator!=(const streamStat &b) const{return !(*this == b);} uint8_t status; uint64_t viewers; uint64_t inputs; uint64_t outputs; + std::string tags; }; void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){ @@ -292,6 +294,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){ tmp[1u].append(tmpStat.viewers); tmp[1u].append(tmpStat.inputs); tmp[1u].append(tmpStat.outputs); + tmp[1u].append(tmpStat.tags); W.sendFrame(tmp.toString()); } } @@ -305,6 +308,7 @@ void Controller::handleWebSocket(HTTP::Parser &H, Socket::Connection &C){ tmp[1u].append(0u); tmp[1u].append(0u); tmp[1u].append(0u); + tmp[1u].append(""); W.sendFrame(tmp.toString()); strmRemove.erase(strm); lastStrmStat.erase(strm); @@ -1164,6 +1168,69 @@ void Controller::handleAPICommands(JSON::Value &Request, JSON::Value &Response){ } } + if (Request.isMember("tag_stream")){ + if (Request["tag_stream"].isObject()){ + jsonForEach(Request["tag_stream"], it){ + if (it->isString()){ + Controller::stream_tag(it.key(), it->asStringRef()); + }else if (it->isArray()){ + jsonForEach(*it, jt){ + if (jt->isString()){ + Controller::stream_tag(it.key(), jt->asStringRef()); + } + } + } + } + } + } + + if (Request.isMember("untag_stream")){ + if (Request["untag_stream"].isObject()){ + jsonForEach(Request["untag_stream"], it){ + if (it->isString()){ + Controller::stream_untag(it.key(), it->asStringRef()); + }else if (it->isArray()){ + jsonForEach(*it, jt){ + if (jt->isString()){ + Controller::stream_untag(it.key(), jt->asStringRef()); + } + } + } + } + } + } + + if (Request.isMember("stream_tags")){ + JSON::Value & rT = Response["stream_tags"]; + if (Request["stream_tags"].isArray()){ + jsonForEach(Request["stream_tags"], it){ + if (it->isString()){ + std::set tags = Controller::stream_tags(it->asStringRef()); + JSON::Value & tRef = rT[it->asStringRef()]; + for (std::set::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);} + } + } + }else if (Request["stream_tags"].isObject()){ + jsonForEach(Request["stream_tags"], it){ + std::set tags = Controller::stream_tags(it.key()); + JSON::Value & tRef = rT[it.key()]; + for (std::set::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);} + } + }else if (Request["stream_tags"].isString() && Request["stream_tags"].asStringRef().size()){ + std::set tags = Controller::stream_tags(Request["stream_tags"].asStringRef()); + JSON::Value & tRef = rT[Request["stream_tags"].asStringRef()]; + for (std::set::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);} + }else{ + JSON::Value nullPkt, resp; + Controller::fillActive(nullPkt, resp); + jsonForEach(resp, it){ + std::set tags = Controller::stream_tags(it->asStringRef()); + JSON::Value & tRef = rT[it->asStringRef()]; + for (std::set::iterator ti = tags.begin(); ti != tags.end(); ++ti){tRef.append(*ti);} + } + } + } + if (Request.isMember("push_start")){ std::string stream; std::string target; diff --git a/src/controller/controller_push.cpp b/src/controller/controller_push.cpp index cdd2c580..bae78246 100644 --- a/src/controller/controller_push.cpp +++ b/src/controller/controller_push.cpp @@ -333,19 +333,17 @@ namespace Controller{ for (std::set::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ std::string streamname = *jt; - if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){ - if (!isPushActive(streamname, target)){ - if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ - waitingPushes[streamname].erase(target); - if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} - MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str()); - startPush(streamname, target); - curCount++; - // If no end time is given but there is a start time, remove the push after starting it - if (startTime && !endTime){ - removePush(*it); - break; - } + if (!isPushActive(streamname, target)){ + if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ + waitingPushes[streamname].erase(target); + if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} + MEDIUM_MSG("Conditions of push `%s->%s` evaluate to true. Starting push...", stream.c_str(), target.c_str()); + startPush(streamname, target); + curCount++; + // If no end time is given but there is a start time, remove the push after starting it + if (startTime && !endTime){ + removePush(*it); + break; } } } @@ -537,9 +535,7 @@ namespace Controller{ for (std::set::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ std::string streamname = *jt; - if (stream == streamname || (*stream.rbegin() == '+' && streamname.substr(0, stream.size()) == stream)){ - startPush(streamname, target); - } + startPush(streamname, target); } } // Return push list @@ -588,7 +584,7 @@ namespace Controller{ jsonForEach(Controller::Storage["autopushes"], it){ if ((*it)[2u].asInt() && (*it)[2u].asInt() < Util::epoch()){continue;} const std::string &pStr = (*it)[0u].asStringRef(); - if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ + if (Controller::streamMatches(streamname, pStr)){ std::string stream = streamname; Util::sanitizeName(stream); // Check variable condition if it exists diff --git a/src/controller/controller_statistics.cpp b/src/controller/controller_statistics.cpp index d38248b2..a0310b0c 100644 --- a/src/controller/controller_statistics.cpp +++ b/src/controller/controller_statistics.cpp @@ -51,13 +51,37 @@ std::map sessions; std::map Controller::triggerStats; ///< Holds prometheus stats for trigger executions bool Controller::killOnExit = KILL_ON_EXIT; -tthread::mutex Controller::statsMutex; +tthread::recursive_mutex statsMutex; uint64_t Controller::statDropoff = 0; static uint64_t cpu_use = 0; char noBWCountMatches[1717]; uint64_t bwLimit = 128 * 1024 * 1024; // gigabit default limit +class tagQueueItem{ + public: + uint64_t lastChange; + std::set tags; + tagQueueItem(){ + lastChange = Util::bootSecs(); + } + tagQueueItem(const std::string & initialTag){ + lastChange = Util::bootSecs(); + tags.insert(initialTag); + } + void add(const std::string & newTag){ + lastChange = Util::bootSecs(); + tags.insert(newTag); + } + void remove(const std::string & delTag){ + lastChange = Util::bootSecs(); + tags.erase(delTag); + if (!tags.size()){lastChange = 0;} + } +}; + +std::map tagQueue; + const char nullAddress[16] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; static Controller::statLog emptyLogEntry = {0, 0, 0, 0, 0, 0 ,0 ,0, "", nullAddress, ""}; bool notEmpty(const Controller::statLog & dta){ @@ -81,6 +105,7 @@ struct streamTotals{ uint64_t packSent; uint64_t packLoss; uint64_t packRetrans; + std::set tags; }; Comms::Sessions statComm; @@ -161,6 +186,14 @@ void Controller::updateBandwidthConfig(){ /// This function is ran whenever a stream becomes active. void Controller::streamStarted(std::string stream){ INFO_MSG("Stream %s became active", stream.c_str()); + if (tagQueue.count(stream)){ + tagQueueItem & q = tagQueue[stream]; + for (std::set::iterator it = q.tags.begin(); it != q.tags.end(); ++it){ + streamStats[stream].tags.insert(*it); + } + INFO_MSG("Applied %zu tags to stream %s retroactively",q.tags.size() , stream.c_str()); + tagQueue.erase(stream); + } Controller::doAutoPush(stream); } @@ -214,13 +247,73 @@ void Controller::sessId_shutdown(const std::string &sessId){ INFO_MSG("Shut down session with session ID %s", sessId.c_str()); } +///Checks if the given stream is matched by the given matchString. +///Currently checks exact matches, wildcard matches (when ending in '+') and tag matches (when starting with '#') +bool Controller::streamMatches(const std::string &stream, const std::string &matchString){ + //Exact match check + if (stream == matchString){return true;} + //Wildcard match, when ending in '+' + if (*matchString.rbegin() == '+' && stream.substr(0, matchString.size()) == matchString){return true;} + if (matchString[0] == '#'){ + tthread::lock_guard guard(statsMutex); + return streamStats.at(stream).tags.count(matchString.substr(1));//true if tag set, false otherwise + } + return false;//fallback response +} + +/// Retrieves a copy of the stream's tags +std::set Controller::stream_tags(const std::string &stream){ + tthread::lock_guard guard(statsMutex); + if (!streamStats.count(stream)){return std::set();} + return streamStats[stream].tags; +} + +/// Tags the given stream +bool Controller::stream_tag(const std::string &stream, const std::string &tag){ + if (!statCommActive){ + FAIL_MSG("In controller shutdown procedure - cannot tag streams."); + return false; + } + tthread::lock_guard guard(statsMutex); + if (!streamStats.count(stream)){ + FAIL_MSG("Cannot tag stream '%s' with '%s': stream is not currently active -> adding to tag queue", stream.c_str(), tag.c_str()); + + tagQueue[stream].add(tag); + return false; + } + streamStats[stream].tags.insert(tag); + return true; +} + +/// Untags the given stream +bool Controller::stream_untag(const std::string &stream, const std::string &tag){ + if (!statCommActive){ + FAIL_MSG("In controller shutdown procedure - cannot tag streams."); + return false; + } + tthread::lock_guard guard(statsMutex); + if (!streamStats.count(stream)){ + // If a tag was queued, remove matching tag (if any) + if (tagQueue.count(stream)){ + tagQueue[stream].remove(tag); + // And also clean up the entry if it is now empty + if (!tagQueue[stream].lastChange){tagQueue.erase(stream);} + return true; + } + FAIL_MSG("Cannot untag stream '%s' with '%s': stream is not currently active", stream.c_str(), tag.c_str()); + return false; + } + streamStats[stream].tags.erase(tag); + return true; +} + /// Tags the given session void Controller::sessId_tag(const std::string &sessId, const std::string &tag){ if (!statCommActive){ FAIL_MSG("In controller shutdown procedure - cannot tag sessions."); return; } - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->first == sessId){ it->second.tags.insert(tag); @@ -240,7 +333,7 @@ void Controller::tag_shutdown(const std::string &tag){ return; } unsigned int sessCount = 0; - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.tags.count(tag)){ sessCount++; @@ -258,7 +351,7 @@ void Controller::sessions_shutdown(const std::string &streamname, const std::str return; } unsigned int sessCount = 0; - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); // Find all matching streams in statComm and get their sessId for (size_t i = 0; i < statComm.recordCount(); i++){ if (statComm.getStatus(i) == COMM_STATUS_INVALID || (statComm.getStatus(i) & COMM_STATUS_DISCONNECT)){continue;} @@ -311,7 +404,7 @@ void Controller::SharedMemStats(void *config){ } { tthread::lock_guard guard(Controller::configMutex); - tthread::lock_guard guard2(statsMutex); + tthread::lock_guard guard2(statsMutex); // parse current users statLeadIn(); COMM_LOOP(statComm, statOnActive(id), statOnDisconnect(id)); @@ -336,6 +429,32 @@ void Controller::SharedMemStats(void *config){ it->second.packLoss = 0; it->second.packRetrans = 0; } + Util::RelAccX *strmStats = streamsAccessor(); + if (!strmStats || !strmStats->isReady()){strmStats = 0;} + if (strmStats){ + uint64_t startPos = strmStats->getDeleted(); + uint64_t endPos = strmStats->getEndPos(); + for (uint64_t cPos = startPos; cPos < endPos; ++cPos){ + std::string strm = strmStats->getPointer("stream", cPos); + std::string tags = strmStats->getPointer("tags", cPos); + if (tags.size() && streamStats.count(strm)){ + INFO_MSG("Restoring stream tags: %s -> %s", strm.c_str(), tags.c_str()); + streamTotals & st = streamStats[strm]; + while (tags.size()){ + size_t endPos = tags.find(' '); + if (!endPos){ + //extra space, ignore + tags.erase(0, 1); + continue; + } + if (endPos == std::string::npos){endPos = tags.size();} + st.tags.insert(tags.substr(0, endPos)); + if (endPos == tags.size()){break;} + tags.erase(0, endPos+1); + } + } + } + } } unsigned int tOut = Util::bootSecs() - STATS_DELAY; unsigned int tIn = Util::bootSecs() - STATS_INPUT_DELAY; @@ -430,10 +549,34 @@ void Controller::SharedMemStats(void *config){ strmStats->setInt("inputs", it->second.currIns, strmPos); strmStats->setInt("outputs", it->second.currOuts, strmPos); strmStats->setInt("unspecified", it->second.currUnspecified, strmPos); + if (it->second.tags.size()){ + std::string tags; + for (std::set::iterator jt = it->second.tags.begin(); jt != it->second.tags.end(); ++jt){ + if (tags.size()){tags += " ";} + tags += *jt; + } + strmStats->setString("tags", tags, strmPos); + }else{ + strmStats->setString("tags", "", strmPos); + } ++strmPos; } } } + if (tagQueue.size()){ + bool updatedTagQueue = true; + while (updatedTagQueue){ + updatedTagQueue = false; + for (std::map::iterator it = tagQueue.begin(); it != tagQueue.end(); ++it){ + if (it->second.lastChange + 60 < Util::bootSecs()){ + WARN_MSG("Erasing %zu not-applied tags for offline stream %s since it did not show up for a minute", it->second.tags.size(), it->first.c_str()); + tagQueue.erase(it); + updatedTagQueue = true; + break; + } + } + } + } if (strmStats && shiftWrites){ shiftWrites = false; uint64_t prevEnd = strmStats->getEndPos(); @@ -472,7 +615,8 @@ void Controller::SharedMemStats(void *config){ Controller::deinitState(Util::Config::is_restarting); } -/// Gets a complete list of all streams currently in active state, with optional prefix matching +/// Gets a complete list of all streams currently in active state, with optional stream matching +/// Stream matching uses Controller::streamMatches internally, thus providing the same matching features. std::set Controller::getActiveStreams(const std::string &prefix){ std::set ret; Util::RelAccX *strmStats = streamsAccessor(); @@ -482,7 +626,7 @@ std::set Controller::getActiveStreams(const std::string &prefix){ for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ if (strmStats->getInt("status", i) != STRMSTAT_READY){continue;} const char *S = strmStats->getPointer("stream", i); - if (!strncmp(S, prefix.data(), prefix.size())){ret.insert(S);} + if (streamMatches(S, prefix)){ret.insert(S);} } }else{ for (uint64_t i = strmStats->getDeleted(); i < endPos; ++i){ @@ -1036,7 +1180,7 @@ bool Controller::hasViewers(std::string streamName){ /// ~~~~~~~~~~~~~~~ /// In case of the second method, the response is an array in the same order as the requests. void Controller::fillClients(JSON::Value &req, JSON::Value &rep){ - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); // first, figure out the timestamp wanted int64_t reqTime = 0; uint64_t epoch = Util::epoch(); @@ -1187,7 +1331,7 @@ void Controller::fillHasStats(JSON::Value &req, JSON::Value &rep){ std::map clients; // check all sessions { - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); if (sessions.size()){ for (std::map::iterator it = sessions.begin(); it != sessions.end(); it++){ if (it->second.getSessType() == SESS_INPUT){ @@ -1278,7 +1422,7 @@ void Controller::fillActive(JSON::Value &req, JSON::Value &rep){ } DTSC::Meta M; { - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); for (std::map::iterator it = streamStats.begin(); it != streamStats.end(); ++it){ //If specific streams were requested, match and skip non-matching if (streams.size()){ @@ -1415,7 +1559,7 @@ public: /// This takes a "totals" request, and fills in the response data. void Controller::fillTotals(JSON::Value &req, JSON::Value &rep){ - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); // first, figure out the timestamps wanted int64_t reqStart = 0; int64_t reqEnd = 0; @@ -1705,7 +1849,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int } {// Scope for shortest possible blocking of statsMutex - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); response << "# HELP mist_sessions_total Number of sessions active right now, server-wide, by type.\n"; response << "# TYPE mist_sessions_total gauge\n"; @@ -1783,7 +1927,7 @@ void Controller::handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int resp["pkts"].append(servPackRetrans); resp["bwlimit"] = bwLimit; {// Scope for shortest possible blocking of statsMutex - tthread::lock_guard guard(statsMutex); + tthread::lock_guard guard(statsMutex); resp["curr"].append((uint64_t)sessions.size()); if (Controller::triggerStats.size()){ diff --git a/src/controller/controller_statistics.h b/src/controller/controller_statistics.h index 69605942..b4094b82 100644 --- a/src/controller/controller_statistics.h +++ b/src/controller/controller_statistics.h @@ -7,7 +7,6 @@ #include #include #include -#include #include /// The STAT_CUTOFF define sets how many seconds of statistics history is kept. @@ -97,7 +96,6 @@ namespace Controller{ uint64_t getBpsUp(uint64_t start, uint64_t end); }; - extern tthread::mutex statsMutex; extern uint64_t statDropoff; struct triggerLog{ @@ -125,11 +123,16 @@ namespace Controller{ void sessId_shutdown(const std::string &sessId); void tag_shutdown(const std::string &tag); void sessId_tag(const std::string &sessId, const std::string &tag); + bool stream_tag(const std::string &stream, const std::string &tag); + std::set stream_tags(const std::string &stream); + bool stream_untag(const std::string &stream, const std::string &tag); void sessions_shutdown(const std::string &streamname, const std::string &protocol = ""); bool hasViewers(std::string streamName); void writeSessionCache(); /*LTS*/ void killConnections(std::string sessId); + bool streamMatches(const std::string &stream, const std::string &matchString); + #define PROMETHEUS_TEXT 0 #define PROMETHEUS_JSON 1 void handlePrometheus(HTTP::Parser &H, Socket::Connection &conn, int mode); diff --git a/src/controller/controller_storage.cpp b/src/controller/controller_storage.cpp index e7623015..425ee4ff 100644 --- a/src/controller/controller_storage.cpp +++ b/src/controller/controller_storage.cpp @@ -188,10 +188,10 @@ namespace Controller{ } maxAccsRecs = (1024 * 1024 - rlxAccs->getOffset()) / rlxAccs->getRSize(); - shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024 * 1024, false, false); // max 1M of stream data + shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 5*1024 * 1024, false, false); // max 5M of stream data if (!shmStrm || !shmStrm->mapped){ if (shmStrm){delete shmStrm;} - shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 1024 * 1024, true); // max 1M of stream data + shmStrm = new IPC::sharedPage(SHM_STATE_STREAMS, 5*1024 * 1024, true); // max 5M of stream data } if (!shmStrm->mapped){ FAIL_MSG("Could not open memory page for stream data"); @@ -205,6 +205,7 @@ namespace Controller{ rlxStrm->addField("inputs", RAX_64UINT); rlxStrm->addField("outputs", RAX_64UINT); rlxStrm->addField("unspecified", RAX_64UINT); + rlxStrm->addField("tags", RAX_512STRING); rlxStrm->setReady(); } rlxStrm->setRCount((1024 * 1024 - rlxStrm->getOffset()) / rlxStrm->getRSize());