#include "controller_push.h"
#include "controller_statistics.h"
#include "controller_storage.h"
// NOTE(review): the seven header names below were missing from this file (bare `#include`
// directives). They are reconstructed from the symbols used in this file (Bit::, Util::Config,
// JSON::, Util::Procs, Util::startPush, tthread::, std::string) — confirm against the build.
#include <mist/bitfields.h>
#include <mist/config.h>
#include <mist/json.h>
#include <mist/procs.h>
#include <mist/stream.h>
#include <mist/tinythread.h>
#include <string>

namespace Controller{

/// Internal list of currently active pushes, keyed by the PID of the push process.
/// Each value is a JSON array holding, in order: PID, stream name, original target, target.
std::map<pid_t, JSON::Value> activePushes;

/// Internal list of waiting pushes: stream name -> target -> counter.
/// NOTE(review): the counter type is assumed to be unsigned int; it is only ever
/// post-incremented and compared against the configured wait value — confirm.
std::map<std::string, std::map<std::string, unsigned int> > waitingPushes;

/// Set whenever activePushes changed and the persisted push list must be rewritten.
static bool mustWritePushList = false;
/// Set once the persisted push list has been read back.
static bool pushListRead = false;

/// Immediately starts a push for the given stream to the given target.
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
/// \param stream Name of the stream to push.
/// \param target Push target; handed to Util::startPush by non-const reference, so it may come
///               back changed. The value as passed in is stored separately as the original target.
void startPush(const std::string &stream, std::string &target){
  // Cancel if already active
  if (isPushActive(stream, target)){return;}
  // Keep the caller-supplied target: isPushActive compares against this value later on
  std::string originalTarget = target;
  pid_t ret = Util::startPush(stream, target);
  if (ret){
    JSON::Value push;
    push.append(ret);
    push.append(stream);
    push.append(originalTarget);
    push.append(target);
    activePushes[ret] = push;
    mustWritePushList = true;
  }
}

/// Returns true if the push is currently active, false otherwise.
bool isPushActive(const std::string &streamname, const std::string &target){ while (Controller::conf.is_active && !pushListRead){ Util::sleep(100); } std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ if (Util::Procs::isActive(it->first)){ if (it->second[1u].asStringRef() == streamname && it->second[2u].asStringRef() == target){return true;} }else{ toWipe.insert(it->first); } } while (toWipe.size()){ activePushes.erase(*toWipe.begin()); mustWritePushList = true; toWipe.erase(toWipe.begin()); } return false; } /// Stops any pushes matching the stream name (pattern) and target void stopActivePushes(const std::string &streamname, const std::string &target){ while (Controller::conf.is_active && !pushListRead){ Util::sleep(100); } std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ if (Util::Procs::isActive(it->first)){ if (it->second[2u].asStringRef() == target && (it->second[1u].asStringRef() == streamname || (*streamname.rbegin() == '+' && it->second[1u].asStringRef().substr(0, streamname.size()) == streamname))){ Util::Procs::Stop(it->first); } }else{ toWipe.insert(it->first); } } while (toWipe.size()){ activePushes.erase(*toWipe.begin()); mustWritePushList = true; toWipe.erase(toWipe.begin()); } } /// Immediately stops a push with the given ID void stopPush(unsigned int ID){ if (ID > 1 && activePushes.count(ID)){Util::Procs::Stop(ID);} } /// Compactly writes the list of pushes to a pointer, assumed to be 8MiB in size static void writePushList(char * pwo){ char * max = pwo + 8*1024*1024 - 4; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ //check if the whole entry will fit unsigned int entrylen = 4+2+it->second[1u].asStringRef().size()+2+it->second[2u].asStringRef().size()+2+it->second[3u].asStringRef().size(); if (pwo+entrylen >= max){return;} //write the pid as a 32 bits unsigned integer Bit::htobl(pwo, it->first); pwo += 4; 
//write the streamname, original target and target, 2-byte-size-prepended for (unsigned int i = 1; i < 4; ++i){ const std::string &itm = it->second[i].asStringRef(); Bit::htobs(pwo, itm.size()); memcpy(pwo+2, itm.data(), itm.size()); pwo += 2+itm.size(); } } //if it fits, write an ending zero to indicate end of page if (pwo <= max){ Bit::htobl(pwo, 0); } } ///Reads the list of pushes from a pointer, assumed to end in four zeroes static void readPushList(char * pwo){ activePushes.clear(); pid_t p = Bit::btohl(pwo); HIGH_MSG("Recovering pushes: %" PRIu32, (uint32_t)p); while (p > 1){ JSON::Value push; push.append(p); pwo += 4; for (uint8_t i = 0; i < 3; ++i){ uint16_t l = Bit::btohs(pwo); push.append(std::string(pwo+2, l)); pwo += 2+l; } INFO_MSG("Recovered push: %s", push.toString().c_str()); Util::Procs::remember(p); mustWritePushList = true; activePushes[p] = push; p = Bit::btohl(pwo); } } /// Loops, checking every second if any pushes need restarting. void pushCheckLoop(void *np){ { IPC::sharedPage pushReadPage("MstPush", 8*1024*1024, false, false); if (pushReadPage.mapped){readPushList(pushReadPage.mapped);} } pushListRead = true; IPC::sharedPage pushPage("MstPush", 8*1024*1024, true, false); while (Controller::conf.is_active){ // this scope prevents the configMutex from being locked constantly { tthread::lock_guard guard(Controller::configMutex); long long maxspeed = Controller::Storage["push_settings"]["maxspeed"].asInt(); long long waittime = Controller::Storage["push_settings"]["wait"].asInt(); long long curCount = 0; jsonForEach(Controller::Storage["autopushes"], it){ if (it->size() > 3 && (*it)[3u].asInt() < Util::epoch()){ INFO_MSG("Deleting autopush from %s to %s because end time passed", (*it)[0u].asStringRef().c_str(), (*it)[1u].asStringRef().c_str()); stopActivePushes((*it)[0u], (*it)[1u]); removePush(*it); break; } if (it->size() > 2 && *((*it)[0u].asStringRef().rbegin()) != '+'){ if ((*it)[2u].asInt() <= Util::epoch()){ std::string streamname = 
(*it)[0u]; std::string target = (*it)[1u]; if (!isPushActive(streamname, target)){ if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ waitingPushes[streamname].erase(target); if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} startPush(streamname, target); curCount++; } } } continue; } if (waittime || it->size() > 2){ const std::string &pStr = (*it)[0u].asStringRef(); std::set activeStreams = Controller::getActiveStreams(pStr); if (activeStreams.size()){ for (std::set::iterator jt = activeStreams.begin(); jt != activeStreams.end(); ++jt){ std::string streamname = *jt; std::string target = (*it)[1u]; if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ if (!isPushActive(streamname, target)){ if (waitingPushes[streamname][target]++ >= waittime && (curCount < maxspeed || !maxspeed)){ waitingPushes[streamname].erase(target); if (!waitingPushes[streamname].size()){waitingPushes.erase(streamname);} startPush(streamname, target); curCount++; } } } } } } if (it->size() == 3){ removePush(*it); break; } } if (mustWritePushList && pushPage.mapped){ writePushList(pushPage.mapped); mustWritePushList = false; } } Util::wait(1000); // wait at least a second } //keep the pushPage if we are restarting, so we can restore state from it if (Util::Config::is_restarting){ pushPage.master = false; //forget about all pushes, so they keep running for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ Util::Procs::forget(it->first); } } } /// Gives a list of all currently active pushes void listPush(JSON::Value &output){ output.null(); std::set toWipe; for (std::map::iterator it = activePushes.begin(); it != activePushes.end(); ++it){ if (Util::Procs::isActive(it->first)){ output.append(it->second); }else{ toWipe.insert(it->first); } } while (toWipe.size()){ activePushes.erase(*toWipe.begin()); mustWritePushList = true; toWipe.erase(toWipe.begin()); } } /// 
Adds a push to the list of auto-pushes. /// Auto-starts currently active matches immediately. void addPush(JSON::Value &request){ JSON::Value newPush; if (request.isArray()){ newPush = request; }else{ newPush.append(request["stream"]); newPush.append(request["target"]); bool startTime = false; if (request.isMember("scheduletime") && request["scheduletime"].isInt()){ newPush.append(request["scheduletime"]); startTime = true; } if (request.isMember("completetime") && request["completetime"].isInt()){ if (!startTime){newPush.append(0u);} newPush.append(request["completetime"]); } } long long epo = Util::epoch(); if (newPush.size() > 3 && newPush[3u].asInt() <= epo){ WARN_MSG("Automatic push not added: removal time is in the past! (%" PRId64 " <= %" PRIu64 ")", newPush[3u].asInt(), Util::epoch()); return; } bool edited = false; jsonForEach(Controller::Storage["autopushes"], it){ if ((*it)[0u] == newPush[0u] && (*it)[1u] == newPush[1u]){ (*it) = newPush; edited = true; } } if (!edited && (newPush.size() != 3 || newPush[2u].asInt() > epo)){ Controller::Storage["autopushes"].append(newPush); } if (newPush.size() < 3 || newPush[2u].asInt() <= epo){ if (newPush.size() > 2 && *(newPush[0u].asStringRef().rbegin()) != '+'){ std::string streamname = newPush[0u].asStringRef(); std::string target = newPush[1u].asStringRef(); startPush(streamname, target); return; } const std::string &pStr = newPush[0u].asStringRef(); std::set activeStreams = Controller::getActiveStreams(pStr); if (activeStreams.size()){ std::string target = newPush[1u].asStringRef(); for (std::set::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){ std::string streamname = *it; if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string tmpName = streamname; std::string tmpTarget = target; startPush(tmpName, tmpTarget); } } } } } /// Removes a push from the list of auto-pushes. /// Does not stop currently active matching pushes. 
void removePush(const JSON::Value &request){ JSON::Value delPush; if (request.isString()){ removeAllPush(request.asStringRef()); return; } if (request.isArray()){ delPush = request; }else{ delPush.append(request["stream"]); delPush.append(request["target"]); } JSON::Value newautopushes; jsonForEach(Controller::Storage["autopushes"], it){ if ((*it) != delPush){newautopushes.append(*it);} } Controller::Storage["autopushes"] = newautopushes; } /// Removes a push from the list of auto-pushes. /// Does not stop currently active matching pushes. void removeAllPush(const std::string &streamname){ JSON::Value newautopushes; jsonForEach(Controller::Storage["autopushes"], it){ if ((*it)[0u] != streamname){newautopushes.append(*it);} } Controller::Storage["autopushes"] = newautopushes; } /// Starts all configured auto pushes for the given stream. void doAutoPush(std::string &streamname){ jsonForEach(Controller::Storage["autopushes"], it){ if (it->size() > 2 && (*it)[2u].asInt() < Util::epoch()){continue;} const std::string &pStr = (*it)[0u].asStringRef(); if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){ std::string stream = streamname; Util::sanitizeName(stream); std::string target = (*it)[1u]; startPush(stream, target); } } } void pushSettings(const JSON::Value &request, JSON::Value &response){ if (request.isObject()){ if (request.isMember("wait")){Controller::Storage["push_settings"]["wait"] = request["wait"].asInt();} if (request.isMember("maxspeed")){Controller::Storage["push_settings"]["maxspeed"] = request["maxspeed"].asInt();} } response = Controller::Storage["push_settings"]; } }