JSON-based input selection.

This commit is contained in:
Thulinma 2014-10-02 15:41:50 +02:00
parent d85fe140ca
commit 8542281ac2
13 changed files with 279 additions and 215 deletions

View file

@ -31,7 +31,7 @@ namespace Mist {
JSON::Value option;
option["long"] = "json";
option["short"] = "j";
option["help"] = "Output MistIn info in JSON format, then exit.";
option["help"] = "Output MistIn info in JSON format, then exit";
option["value"].append(0ll);
config->addOption("json", option);
option.null();
@ -50,13 +50,13 @@ namespace Mist {
option["arg"] = "string";
option["short"] = "s";
option["long"] = "stream";
option["help"] = "The name of the stream that this connector will transmit.";
option["help"] = "The name of the stream that this connector will provide in player mode";
config->addOption("streamname", option);
option.null();
option["short"] = "p";
option["long"] = "player";
option["help"] = "Makes this connector into a player";
config->addOption("player", option);
capa["optional"]["debug"]["name"] = "debug";
capa["optional"]["debug"]["help"] = "The debug level at which messages need to be printed.";
capa["optional"]["debug"]["option"] = "--debug";
capa["optional"]["debug"]["type"] = "uint";
packTime = 0;
lastActive = Util::epoch();
@ -99,7 +99,7 @@ namespace Mist {
int Input::run() {
if (config->getBool("json")) {
std::cerr << capa.toString() << std::endl;
std::cout << capa.toString() << std::endl;
return 0;
}
if (!setup()) {
@ -113,7 +113,7 @@ namespace Mist {
}
parseHeader();
if (!config->getBool("player")){
if (!config->getString("streamname").size()){
//check filename for no -
if (config->getString("output") != "-"){
std::string filename = config->getString("output");

View file

@ -14,26 +14,25 @@
namespace Mist {
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) {
capa["name"] = "Buffer";
JSON::Value option;
option["arg"] = "integer";
option["long"] = "buffer";
option["short"] = "b";
option["help"] = "Buffertime for this stream.";
option["help"] = "DVR buffer time in ms";
option["value"].append(30000LL);
config->addOption("bufferTime", option);
capa["desc"] = "Enables buffered live input";
capa["optional"]["DVR"]["name"] = "Buffer time (ms)";
capa["optional"]["DVR"]["help"] = "The target available buffer time for this live stream, in milliseconds. This is the time available to seek around in, and will automatically be extended to fit whole keyframes.";
capa["optional"]["DVR"]["option"] = "--buffer";
capa["optional"]["DVR"]["type"] = "uint";
capa["optional"]["DVR"]["default"] = 30000LL;
capa["source_match"] = "push://*";
capa["priority"] = 9ll;
capa["desc"] = "Provides buffered live input";
capa["codecs"][0u][0u].append("*");
capa["codecs"][0u][1u].append("*");
capa["codecs"][0u][2u].append("*");
capa["codecs"][0u][3u].append("*");
capa["codecs"][0u][4u].append("*");
capa["codecs"][0u][5u].append("*");
capa["codecs"][0u][6u].append("*");
capa["codecs"][0u][7u].append("*");
capa["codecs"][0u][8u].append("*");
capa["codecs"][0u][9u].append("*");
DEBUG_MSG(DLVL_DEVEL, "Started MistInBuffer");
isBuffer = true;
singleton = this;
bufferTime = 0;
@ -41,6 +40,15 @@ namespace Mist {
}
inputBuffer::~inputBuffer(){
if (myMeta.tracks.size()){
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for(std::map<int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
while (removeKey(it->first)){}
}
}
}
void inputBuffer::updateMeta(){
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0;
@ -55,15 +63,18 @@ namespace Mist {
myMeta.bufferWindow = lastms - firstms;
myMeta.vod = false;
myMeta.live = true;
myMeta.writeTo(metaPage.mapped);
IPC::semaphore liveMeta(std::string("liveMeta@" + config->getString("streamname")).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait();
myMeta.writeTo(metaPage.mapped);
memset(metaPage.mapped+myMeta.getSendLen(), 0, metaPage.len > myMeta.getSendLen() ? std::min(metaPage.len-myMeta.getSendLen(), 4ll) : 0);
liveMeta.post();
}
bool inputBuffer::removeKey(unsigned int tid){
if (myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2){
if ((myMeta.tracks[tid].keys.size() < 2 || myMeta.tracks[tid].fragments.size() < 2) && config->is_active){
return false;
}
if (!myMeta.tracks[tid].keys.size()){
return false;
}
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%d", tid, myMeta.tracks[tid].keys[0].getNumber());
@ -144,16 +155,19 @@ namespace Mist {
//Skip value 0 as this indicates an empty track
continue;
}
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
if (negotiateTracks.count(value)){
negotiateTracks.erase(value);
metaPages.erase(value);
if (pushedLoc[value] == thisData){
if (counter == 126 || counter == 127 || counter == 254 || counter == 255){
pushedLoc.erase(value);
if (negotiateTracks.count(value)){
negotiateTracks.erase(value);
metaPages.erase(value);
}
if (data[4] == 0xFF && data[5] == 0xFF && givenTracks.count(value)){
givenTracks.erase(value);
inputLoc.erase(value);
}
continue;
}
if (data[4] == 0xFF && data[5] == 0xFF && givenTracks.count(value)){
givenTracks.erase(value);
inputLoc.erase(value);
}
continue;
}
if (value & 0x80000000){
//Track is set to "New track request", assign new track id and create shared memory page
@ -226,6 +240,7 @@ namespace Mist {
}
}
givenTracks.insert(finalMap);
pushedLoc[finalMap] = thisData;
if (!myMeta.tracks.count(finalMap)){
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = tmpMeta.tracks.begin()->second;
@ -249,7 +264,7 @@ namespace Mist {
}
}
}
if (givenTracks.count(value)){
if (givenTracks.count(value) && pushedLoc[value] == thisData){
//First check if the previous page has been finished:
if (!inputLoc[value].count(dataPages[value].rbegin()->first) || !inputLoc[value][dataPages[value].rbegin()->first].curOffset){
if (dataPages[value].size() > 1){
@ -307,16 +322,21 @@ namespace Mist {
if (!bufferTime){
bufferTime = config->getInteger("bufferTime");
}
JSON::Value servConf = JSON::fromFile(Util::getTmpFolder() + "streamlist");
if (servConf.isMember("streams") && servConf["streams"].isMember(config->getString("streamname"))){
JSON::Value & streamConfig = servConf["streams"][config->getString("streamname")];
if (streamConfig.isMember("DVR") && streamConfig["DVR"].asInt()){
if (bufferTime != streamConfig["DVR"].asInt()){
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, streamConfig["DVR"].asInt());
bufferTime = streamConfig["DVR"].asInt();
}
IPC::sharedPage serverCfg("!mistConfig", 4*1024*1024); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(config->getString("streamname"));
if (streamCfg && streamCfg.getMember("DVR")){
long long bufTime = streamCfg.getMember("DVR").asInt();
if (bufferTime != bufTime){
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, bufTime);
bufferTime = bufTime;
}
}
configLock.post();
configLock.close();
return true;
}

View file

@ -6,6 +6,7 @@ namespace Mist {
class inputBuffer : public Input {
public:
inputBuffer(Util::Config * cfg);
~inputBuffer();
private:
unsigned int bufferTime;
unsigned int cutTime;
@ -26,6 +27,7 @@ namespace Mist {
std::map<unsigned long, IPC::sharedPage> metaPages;
///Maps trackid to a pagenum->pageData map
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > inputLoc;
std::map<unsigned long, char *> pushedLoc;
inputBuffer * singleton;
};
}

View file

@ -11,7 +11,10 @@
namespace Mist {
inputDTSC::inputDTSC(Util::Config * cfg) : Input(cfg) {
capa["name"] = "DTSC";
capa["decs"] = "Enables DTSC Input";
capa["priority"] = 9ll;
capa["source_match"] = "/*.dtsc";
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -26,7 +29,7 @@ namespace Mist {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
}
if (!config->getBool("player")){
if (!config->getString("streamname").size()){
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not yet supported" << std::endl;
return false;

View file

@ -13,7 +13,10 @@
namespace Mist {
inputFLV::inputFLV(Util::Config * cfg) : Input(cfg) {
capa["name"] = "FLV";
capa["decs"] = "Enables FLV Input";
capa["source_match"] = "/*.flv";
capa["priority"] = 9ll;
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -26,7 +29,7 @@ namespace Mist {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
}
if (!config->getBool("player")){
if (!config->getString("streamname").size()){
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not yet supported" << std::endl;
return false;

View file

@ -14,6 +14,7 @@
namespace Mist {
inputOGG::inputOGG(Util::Config * cfg) : Input(cfg) {
capa["name"] = "OGG";
capa["decs"] = "Enables OGG Input";
capa["codecs"][0u][0u].append("theora");
capa["codecs"][0u][1u].append("vorbis");
@ -24,7 +25,7 @@ namespace Mist {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
}
if (!config->getBool("player")){
if (!config->getString("streamname").size()){
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not yet supported" << std::endl;
return false;