Removed metadata parsing from controller for better stability and faster stream handling.

# Conflicts:
#	src/controller/controller_streams.cpp
This commit is contained in:
Thulinma 2015-06-16 13:18:16 +02:00
parent ef29ce2ca6
commit a3354aeee6

View file

@ -57,25 +57,6 @@ namespace Controller {
} }
if (URL.substr(0, 1) != "/"){ if (URL.substr(0, 1) != "/"){
//push-style stream //push-style stream
if (hasViewers(name)){
data["meta"].null();
char streamPageName[NAME_BUFFER_SIZE];
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, name.c_str());
IPC::sharedPage streamIndex(streamPageName, DEFAULT_META_PAGE_SIZE, false, false);
if (!streamIndex.mapped){
return;
}
unsigned int i = 0;
JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, i, data["meta"]);
if (data["meta"].isMember("tracks") && data["meta"]["tracks"].size()){
for(JSON::ObjIter trackIt = data["meta"]["tracks"].ObjBegin(); trackIt != data["meta"]["tracks"].ObjEnd(); trackIt++){
trackIt->second.removeMember("fragments");
trackIt->second.removeMember("keys");
trackIt->second.removeMember("keysizes");
trackIt->second.removeMember("parts");
}
}
}
return; return;
} }
if (URL.substr(0, 1) == "/"){ if (URL.substr(0, 1) == "/"){
@ -90,82 +71,6 @@ namespace Controller {
data["online"] = 0; data["online"] = 0;
return; return;
} }
bool getMeta = false;
if ( !data.isMember("l_meta") || fileinfo.st_mtime != data["l_meta"].asInt()){
DEBUG_MSG(DLVL_INSANE, "File for stream %s is newer than metadata - triggering reload", name.c_str());
getMeta = true;
data["l_meta"] = (long long)fileinfo.st_mtime;
}
if (stat((URL+".dtsh").c_str(), &fileinfo) == 0 && !S_ISDIR(fileinfo.st_mode)){
if ( !data.isMember("h_meta") || fileinfo.st_mtime != data["h_meta"].asInt()){
DEBUG_MSG(DLVL_INSANE, "DTSH for stream %s is newer than metadata - triggering reload", name.c_str());
getMeta = true;
data["h_meta"] = (long long)fileinfo.st_mtime;
}
}
if ( !getMeta && data.isMember("meta") && data["meta"].isMember("tracks")){
if (!data["meta"]["tracks"]){
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}else{
if (!getMeta && data["meta"].packedSize() > 10*1024 * data["meta"]["tracks"].size()){
DEBUG_MSG(DLVL_WARN, "Metadata for stream %s is quite big (%u b) - assuming corruption and forcing reload", name.c_str(), data["meta"].packedSize());
getMeta = true;
}
}
}else{
DEBUG_MSG(DLVL_INSANE, "Invalid metadata (no tracks object) for stream %s - triggering reload", name.c_str());
getMeta = true;
}
if (getMeta){
// if the file isn't dtsc and there's no dtsh file, run getStream on it
// this guarantees that if the stream is playable, it now has a valid header.
DEBUG_MSG(DLVL_INSANE, "(re)loading metadata for stream %s", name.c_str());
Util::startInput(name);
DEBUG_MSG(DLVL_INSANE, "Waiting for stream %s to open...", name.c_str());
//wait for the stream
{
char streamPageName[NAME_BUFFER_SIZE];
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, name.c_str());
IPC::sharedPage streamIndex(streamPageName, DEFAULT_META_PAGE_SIZE, false, false);
if (!streamIndex.mapped){
DEBUG_MSG(DLVL_INSANE, "Stream %s opening failed! Cancelling and marking as corrupt.", name.c_str());
data["meta"].null();
data["meta"]["tracks"].null();
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}
unsigned int i = 0;
JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, i, data["meta"]);
if (data["meta"].isMember("tracks") && data["meta"]["tracks"].size()){
for(JSON::ObjIter trackIt = data["meta"]["tracks"].ObjBegin(); trackIt != data["meta"]["tracks"].ObjEnd(); trackIt++){
trackIt->second.removeMember("fragments");
trackIt->second.removeMember("keys");
trackIt->second.removeMember("keysizes");
trackIt->second.removeMember("parts");
trackIt->second.removeMember("ivecs");
}
}
if ( !data["meta"] || !data["meta"].isMember("tracks") || !data["meta"]["tracks"]){
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}
DEBUG_MSG(DLVL_INSANE, "Metadata for stream %s (re)loaded", name.c_str());
}
DEBUG_MSG(DLVL_INSANE, "Stream %s opened", name.c_str());
}
if (!hasViewers(name)){ if (!hasViewers(name)){
if ( !data.isMember("error")){ if ( !data.isMember("error")){
data["error"] = "Available"; data["error"] = "Available";
@ -176,10 +81,6 @@ namespace Controller {
} }
return; return;
} }
/// \todo Implement ffmpeg pulling again?
//Util::Procs::Start(name, "ffmpeg -re -async 2 -i " + URL + " -f flv -", Util::getMyPath() + "MistFLV2DTSC", Util::getMyPath() + buffcmd);
//Log("BUFF", "(re)starting stream buffer " + name + " for ffmpeg data: ffmpeg -re -async 2 -i " + URL + " -f flv -");
//not recognized //not recognized
data["error"] = "Invalid source format"; data["error"] = "Invalid source format";
if (data["error"].asStringRef() != prevState){ if (data["error"].asStringRef() != prevState){
@ -211,33 +112,6 @@ namespace Controller {
// assume all is fine // assume all is fine
jit->second.removeMember("error"); jit->second.removeMember("error");
jit->second["online"] = 1; jit->second["online"] = 1;
// check if source is valid
//if (jit->second.isMember("live") && !jit->second.isMember("meta") || !jit->second["meta"]){
if ( (jit->second.isMember("meta") && !jit->second["meta"].isMember("tracks"))){
jit->second["online"] = 0;
jit->second["error"] = "No (valid) source connected ";
}else{
// for live streams, keep track of activity
if (jit->second["meta"].isMember("live")){
static std::map<std::string, liveCheck> checker;
//check H264 tracks for optimality
if (jit->second.isMember("meta") && jit->second["meta"].isMember("tracks")){
for (JSON::ObjIter trIt = jit->second["meta"]["tracks"].ObjBegin(); trIt != jit->second["meta"]["tracks"].ObjEnd(); trIt++){
if (trIt->second["lastms"].asInt() > checker[jit->first].lastms){
checker[jit->first].lastms = trIt->second["lastms"].asInt();
checker[jit->first].last_active = currTime;
}
}
}
// mark stream as offline if no activity for 5 seconds
//if (jit->second.isMember("last_active") && jit->second["last_active"].asInt() < currTime - 5){
if (checker[jit->first].last_active < currTime - 5){
jit->second["online"] = 2;
jit->second["error"] = "Source not active";
}
}
}
} }
} }
static JSON::Value strlist; static JSON::Value strlist;
@ -265,14 +139,8 @@ namespace Controller {
Log("STRM", std::string("Updated stream ") + jit->first); Log("STRM", std::string("Updated stream ") + jit->first);
} }
}else{ }else{
out[jit->first] = jit->second;
out[jit->first]["name"] = jit->first; out[jit->first]["name"] = jit->first;
out[jit->first]["source"] = jit->second["source"];
if (jit->second.isMember("DVR")){
out[jit->first]["DVR"] = jit->second["DVR"].asInt();
}
if (jit->second.isMember("cut")){
out[jit->first]["cut"] = jit->second["cut"].asInt();
}
Log("STRM", std::string("New stream ") + jit->first); Log("STRM", std::string("New stream ") + jit->first);
} }
} }