Added push-related API calls to controller, made outputs able to wait for playable streams.

This commit is contained in:
Thulinma 2016-05-10 00:18:30 +02:00
parent 861b62d47b
commit 263dee7b25
17 changed files with 329 additions and 457 deletions

View file

@ -14,6 +14,7 @@
/*LTS-START*/
#include "controller_updater.h"
#include "controller_limits.h"
#include "controller_push.h"
/*LTS-END*/
///\brief Check the submitted configuration and handle things accordingly.
@ -568,7 +569,53 @@ int Controller::handleAPIConnection(Socket::Connection & conn){
if (Request.isMember("stats_streams")){
Controller::fillActive(Request["stats_streams"], Response["stats_streams"]);
}
if (Request.isMember("push_start")){
std::string stream;
std::string target;
if (Request["push_start"].isArray()){
stream = Request["push_start"][0u].asStringRef();
target = Request["push_start"][1u].asStringRef();
}else{
stream = Request["push_start"]["stream"].asStringRef();
target = Request["push_start"]["target"].asStringRef();
}
Controller::startPush(stream, target);
}
if (Request.isMember("push_list")){
Controller::listPush(Response["push_list"]);
}
if (Request.isMember("push_stop")){
if (Request["push_stop"].isArray()){
jsonForEach(Request["push_stop"], it){
Controller::stopPush(it->asInt());
}
}else{
Controller::stopPush(Request["push_stop"].asInt());
}
}
if (Request.isMember("push_auto_add")){
Controller::addPush(Request["push_auto_add"]);
}
if (Request.isMember("push_auto_remove")){
if (Request["push_auto_remove"].isArray()){
jsonForEach(Request["push_auto_remove"], it){
Controller::removePush(*it);
}
}else{
Controller::removePush(Request["push_auto_remove"]);
}
}
if (Request.isMember("push_auto_list")){
Response["push_auto_list"] = Controller::Storage["autopushes"];
}
Controller::configChanged = true;
}else{//unauthorized

View file

@ -0,0 +1,120 @@
#include <string>
#include <mist/json.h>
#include <mist/config.h>
#include <mist/tinythread.h>
#include <mist/procs.h>
#include <mist/stream.h>
#include "controller_storage.h"
#include "controller_statistics.h"
namespace Controller {
/// Internal list of currently active pushes
std::map<pid_t, JSON::Value> activePushes;
/// Immediately starts a push for the given stream to the given target.
/// Simply calls Util::startPush and stores the resulting PID in the local activePushes map.
void startPush(std::string & stream, std::string & target){
pid_t ret = Util::startPush(stream, target);
if (ret){
JSON::Value push;
push.append((long long)ret);
push.append(stream);
push.append(target);
activePushes[ret] = push;
}
}
/// Immediately stops a push with the given ID
void stopPush(unsigned int ID){
Util::Procs::Stop(ID);
}
/// Gives a list of all currently active pushes
void listPush(JSON::Value & output){
output.null();
std::set<pid_t> toWipe;
for (std::map<pid_t, JSON::Value>::iterator it = activePushes.begin(); it != activePushes.end(); ++it){
if (Util::Procs::isActive(it->first)){
output.append(it->second);
}else{
toWipe.insert(it->first);
}
}
while (toWipe.size()){
activePushes.erase(*toWipe.begin());
toWipe.erase(toWipe.begin());
}
}
/// Adds a push to the list of auto-pushes.
/// Auto-starts currently active matches immediately.
void addPush(JSON::Value & request){
JSON::Value newPush;
if (request.isArray()){
newPush = request;
}else{
newPush.append(request["stream"]);
newPush.append(request["target"]);
}
Controller::Storage["autopushes"].append(newPush);
if (activeStreams.size()){
const std::string & pStr = newPush[0u].asStringRef();
std::string target = newPush[1u].asStringRef();
for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
std::string streamname = it->first;
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
startPush(streamname, target);
}
}
}
}
/// Removes a push from the list of auto-pushes.
/// Does not stop currently active matching pushes.
void removePush(const JSON::Value & request){
JSON::Value delPush;
if (request.isString()){
return removePush(request.asStringRef());
}
if (request.isArray()){
delPush = request;
}else{
delPush.append(request["stream"]);
delPush.append(request["target"]);
}
JSON::Value newautopushes;
jsonForEach(Controller::Storage["autopushes"], it){
if ((*it) != delPush){
newautopushes.append(*it);
}
}
Controller::Storage["autopushes"] = newautopushes;
}
/// Removes a push from the list of auto-pushes.
/// Does not stop currently active matching pushes.
void removePush(const std::string & streamname){
JSON::Value newautopushes;
jsonForEach(Controller::Storage["autopushes"], it){
if ((*it)[0u] != streamname){
newautopushes.append(*it);
}
}
Controller::Storage["autopushes"] = newautopushes;
}
/// Starts all configured auto pushes for the given stream.
void doAutoPush(std::string & streamname){
jsonForEach(Controller::Storage["autopushes"], it){
const std::string & pStr = (*it)[0u].asStringRef();
if (pStr == streamname || (*pStr.rbegin() == '+' && streamname.substr(0, pStr.size()) == pStr)){
std::string stream = streamname;
std::string target = (*it)[1u];
startPush(stream, target);
}
}
}
}

View file

@ -0,0 +1,19 @@
#include <string>
#include <mist/json.h>
#include <mist/config.h>
#include <mist/tinythread.h>
namespace Controller {
//Functions for current pushes, start/stop/list
void startPush(std::string & streamname, std::string & target);
void stopPush(unsigned int ID);
void listPush(JSON::Value & output);
//Functions for automated pushes, add/remove
void addPush(JSON::Value & request);
void removePush(const JSON::Value & request);
void removePush(const std::string & streamname);
void doAutoPush(std::string & streamname);
}

View file

@ -6,6 +6,7 @@
#include <mist/dtsc.h>
#include "controller_statistics.h"
#include "controller_limits.h"
#include "controller_push.h"
#ifndef KILL_ON_EXIT
#define KILL_ON_EXIT false
@ -40,6 +41,7 @@ std::map<Controller::sessIndex, Controller::statSession> Controller::sessions; /
std::map<unsigned long, Controller::sessIndex> Controller::connToSession; ///< Map of socket IDs to session info.
bool Controller::killOnExit = KILL_ON_EXIT;
tthread::mutex Controller::statsMutex;
std::map<std::string, unsigned int> Controller::activeStreams;
//For server-wide totals. Local to this file only.
struct streamTotals {
@ -118,12 +120,26 @@ void Controller::killStatistics(char * data, size_t len, unsigned int id){
(*(data - 1)) = 128;//Send disconnect message;
}
///This function is ran whenever a stream becomes active.
void Controller::streamStarted(std::string stream){
INFO_MSG("Stream %s became active", stream.c_str());
Controller::doAutoPush(stream);
}
///This function is ran whenever a stream becomes active.
void Controller::streamStopped(std::string stream){
INFO_MSG("Stream %s became inactive", stream.c_str());
}
/// This function runs as a thread and roughly once per second retrieves
/// statistics from all connected clients, as well as wipes
/// old statistics that have disconnected over 10 minutes ago.
void Controller::SharedMemStats(void * config){
DEBUG_MSG(DLVL_HIGH, "Starting stats thread");
IPC::sharedServer statServer(SHM_STATISTICS, STAT_EX_SIZE, true);
std::set<std::string> inactiveStreams;
while(((Util::Config*)config)->is_active){
{
tthread::lock_guard<tthread::mutex> guard(statsMutex);
@ -144,6 +160,18 @@ void Controller::SharedMemStats(void * config){
mustWipe.pop_front();
}
}
if (activeStreams.size()){
for (std::map<std::string, unsigned int>::iterator it = activeStreams.begin(); it != activeStreams.end(); ++it){
if (++it->second > 1){
streamStopped(it->first);
inactiveStreams.insert(it->first);
}
}
while (inactiveStreams.size()){
activeStreams.erase(*inactiveStreams.begin());
inactiveStreams.erase(inactiveStreams.begin());
}
}
Controller::checkServerLimits(); /*LTS*/
}
Util::sleep(1000);
@ -571,6 +599,14 @@ void Controller::parseStatistics(char * data, size_t len, unsigned int id){
//the data is no longer valid - connection has gone away, store for later
sessions[idx].finish(id);
connToSession.erase(id);
}else{
std::string strmName = tmpEx.streamName();
if (strmName.size()){
if (!activeStreams.count(strmName)){
streamStarted(strmName);
}
activeStreams[strmName] = 0;
}
}
/*LTS-START*/
//if (counter < 125 && Controller::isBlacklisted(tmpEx.host(), ID, tmpEx.time())){

View file

@ -1,3 +1,4 @@
#pragma once
#include <mist/shared_memory.h>
#include <mist/timing.h>
#include <mist/defines.h>
@ -15,6 +16,13 @@
namespace Controller {
extern bool killOnExit;
//These functions keep track of which streams are currently active.
extern std::map<std::string, unsigned int> activeStreams;
///This function is ran whenever a stream becomes active.
void streamStarted(std::string stream);
///This function is ran whenever a stream becomes active.
void streamStopped(std::string stream);
struct statLog {
long time;