Removed metadata parsing from controller for better stability and faster stream handling.

This commit is contained in:
Thulinma 2015-06-16 11:18:00 +02:00
parent c04572f685
commit 036178f780

View file

@ -58,26 +58,6 @@ namespace Controller {
}
if (URL.substr(0, 1) != "/"){
//push-style stream
if (hasViewers(name)){
data["meta"].null();
char streamPageName[NAME_BUFFER_SIZE];
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, name.c_str());
IPC::sharedPage streamIndex(streamPageName, DEFAULT_META_PAGE_SIZE, false, false);
if (!streamIndex.mapped){
return;
}
unsigned int i = 0;
JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, i, data["meta"]);
if (data["meta"].isMember("tracks") && data["meta"]["tracks"].size()){
for(JSON::ObjIter trackIt = data["meta"]["tracks"].ObjBegin(); trackIt != data["meta"]["tracks"].ObjEnd(); trackIt++){
trackIt->second.removeMember("fragments");
trackIt->second.removeMember("keys");
trackIt->second.removeMember("keysizes");
trackIt->second.removeMember("parts");
trackIt->second.removeMember("ivecs");/*LTS*/
}
}
}
return;
}
if (URL.substr(0, 1) == "/"){
@ -92,85 +72,6 @@ namespace Controller {
data["online"] = 0;
return;
}
bool getMeta = false;
if ( !data.isMember("l_meta") || fileinfo.st_mtime != data["l_meta"].asInt()){
DEBUG_MSG(DLVL_INSANE, "File for stream %s is newer than metadata - triggering reload", name.c_str());
getMeta = true;
data["l_meta"] = (long long)fileinfo.st_mtime;
}
if (stat((URL+".dtsh").c_str(), &fileinfo) == 0 && !S_ISDIR(fileinfo.st_mode)){
if ( !data.isMember("h_meta") || fileinfo.st_mtime != data["h_meta"].asInt()){
DEBUG_MSG(DLVL_INSANE, "DTSH for stream %s is newer than metadata - triggering reload", name.c_str());
getMeta = true;
data["h_meta"] = (long long)fileinfo.st_mtime;
}
}
if ( !getMeta && data.isMember("meta") && data["meta"].isMember("tracks")){
if (!data["meta"]["tracks"]){
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}else{
if (!getMeta && data["meta"].packedSize() > 10*1024 * data["meta"]["tracks"].size()){
DEBUG_MSG(DLVL_WARN, "Metadata for stream %s is quite big (%u b) - assuming corruption and forcing reload", name.c_str(), data["meta"].packedSize());
getMeta = true;
}
}
}else{
DEBUG_MSG(DLVL_INSANE, "Invalid metadata (no tracks object) for stream %s - triggering reload", name.c_str());
getMeta = true;
}
if (*(URL.rbegin()) == '/'){
getMeta = false;
}
if (getMeta){
// if the file isn't dtsc and there's no dtsh file, run getStream on it
// this guarantees that if the stream is playable, it now has a valid header.
DEBUG_MSG(DLVL_INSANE, "(re)loading metadata for stream %s", name.c_str());
Util::startInput(name);
DEBUG_MSG(DLVL_INSANE, "Waiting for stream %s to open...", name.c_str());
//wait for the stream
{
char streamPageName[NAME_BUFFER_SIZE];
snprintf(streamPageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, name.c_str());
IPC::sharedPage streamIndex(streamPageName, DEFAULT_META_PAGE_SIZE, false, false);
if (!streamIndex.mapped){
DEBUG_MSG(DLVL_INSANE, "Stream %s opening failed! Cancelling and marking as corrupt.", name.c_str());
data["meta"].null();
data["meta"]["tracks"].null();
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}
unsigned int i = 0;
JSON::fromDTMI((const unsigned char*)streamIndex.mapped + 8, streamIndex.len - 8, i, data["meta"]);
if (data["meta"].isMember("tracks") && data["meta"]["tracks"].size()){
for(JSON::ObjIter trackIt = data["meta"]["tracks"].ObjBegin(); trackIt != data["meta"]["tracks"].ObjEnd(); trackIt++){
trackIt->second.removeMember("fragments");
trackIt->second.removeMember("keys");
trackIt->second.removeMember("keysizes");
trackIt->second.removeMember("parts");
trackIt->second.removeMember("ivecs");
}
}
if ( !data["meta"] || !data["meta"].isMember("tracks") || !data["meta"]["tracks"]){
data["error"] = "Stream offline: Corrupt file?";
if (data["error"].asStringRef() != prevState){
Log("WARN", "Source file " + URL + " seems to be corrupt.");
}
data["online"] = 0;
return;
}
DEBUG_MSG(DLVL_INSANE, "Metadata for stream %s (re)loaded", name.c_str());
}
DEBUG_MSG(DLVL_INSANE, "Stream %s opened", name.c_str());
}
if (!hasViewers(name)){
if ( !data.isMember("error")){
data["error"] = "Available";
@ -182,10 +83,6 @@ namespace Controller {
checkServerLimits(); /*LTS*/
return;
}
/// \todo Implement ffmpeg pulling again?
//Util::Procs::Start(name, "ffmpeg -re -async 2 -i " + URL + " -f flv -", Util::getMyPath() + "MistFLV2DTSC", Util::getMyPath() + buffcmd);
//Log("BUFF", "(re)starting stream buffer " + name + " for ffmpeg data: ffmpeg -re -async 2 -i " + URL + " -f flv -");
//not recognized
data["error"] = "Invalid source format";
if (data["error"].asStringRef() != prevState){
@ -218,37 +115,6 @@ namespace Controller {
// assume all is fine
jit->second.removeMember("error");
jit->second["online"] = 1;
// check if source is valid
//if (jit->second.isMember("live") && !jit->second.isMember("meta") || !jit->second["meta"]){
if ( (jit->second.isMember("meta") && !jit->second["meta"].isMember("tracks"))){
jit->second["online"] = 0;
jit->second["error"] = "No (valid) source connected ";
}else{
// for live streams, keep track of activity
if (jit->second.isMember("meta") && jit->second["meta"].isMember("live")){
static std::map<std::string, liveCheck> checker;
//check H264 tracks for optimality
if (jit->second.isMember("meta") && jit->second["meta"].isMember("tracks")){
for (JSON::ObjIter trIt = jit->second["meta"]["tracks"].ObjBegin(); trIt != jit->second["meta"]["tracks"].ObjEnd(); trIt++){
if (trIt->second["lastms"].asInt() > checker[jit->first].lastms){
checker[jit->first].lastms = trIt->second["lastms"].asInt();
checker[jit->first].last_active = currTime;
}
/*LTS-START*/
if (trIt->second["firstms"].asInt() > Storage["streams"][jit->first]["cut"].asInt()){
Storage["streams"][jit->first].removeMember("cut");
}
/*LTS-END*/
}
}
// mark stream as offline if no activity for 5 seconds
//if (jit->second.isMember("last_active") && jit->second["last_active"].asInt() < currTime - 5){
if (checker[jit->first].last_active < currTime - 5){
jit->second["online"] = 2;
jit->second["error"] = "Source not active";
}
}
}
}
}
static JSON::Value strlist;