From 776cfe1850b844210de0727e63ec0e5d35b714b9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Thu, 11 Oct 2018 17:51:35 +0200 Subject: [PATCH] Backported various Pro edition changes and general code to Free edition --- src/input/input.cpp | 15 +++--- src/input/input_buffer.cpp | 13 +++++ src/output/output.cpp | 48 +++++++++++++++--- src/output/output.h | 1 + src/output/output_progressive_mp4.cpp | 72 +++++++++++++++++++-------- src/output/output_progressive_mp4.h | 1 + src/output/output_rtmp.cpp | 64 +++--------------------- 7 files changed, 123 insertions(+), 91 deletions(-) diff --git a/src/input/input.cpp b/src/input/input.cpp index c44f22d8..5d50cb7f 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -263,6 +263,7 @@ namespace Mist { ///Checks in the server configuration if this stream is set to always on or not. /// Returns true if it is, or if the stream could not be found in the configuration. + /// If the compiled default debug level is < INFO, instead returns false if the stream is not found. bool Input::isAlwaysOn(){ bool ret = true; std::string strName = streamName.substr(0, (streamName.find_first_of("+ "))); @@ -274,6 +275,10 @@ namespace Mist { if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){ ret = false; } + }else{ +#if DEBUG < DLVL_DEVEL + ret = false; +#endif } configLock.post(); return ret; @@ -331,6 +336,10 @@ namespace Mist { // - INPUT_TIMEOUT seconds haven't passed yet, // - this is a live stream and at least two of the biggest fragment haven't passed yet, bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500))); + if (!ret && config->is_active && isAlwaysOn()){ + ret = true; + activityCounter = Util::bootSecs(); + } return ret; } @@ -473,12 +482,6 @@ namespace Mist { if (!it2->second){ bufferRemove(it->first, it2->first); pageCounter[it->first].erase(it2->first); - for (int i = 0; i < 8192; i += 8){ - unsigned int thisKeyNum = ntohl(((((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF); - if (thisKeyNum == it2->first){ - (((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) = 0; - } - } change = true; break; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 95c59f59..027da501 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -351,6 +351,19 @@ namespace Mist { bufferLocations[tid].erase(bufferLocations[tid].begin()); } if (pushLocation.count(it->first)){ + // \todo Debugger says this is null sometimes. It shouldn't be. Figure out why! + // For now, this if will prevent crashes in these cases. + if (pushLocation[it->first]){ + //Reset the userpage, to allow repushing from TS + IPC::userConnection userConn(pushLocation[it->first]); + for (int i = 0; i < SIMUL_TRACKS; i++) { + if (userConn.getTrackId(i) == it->first) { + userConn.setTrackId(i, 0); + userConn.setKeynum(i, 0); + break; + } + } + } pushLocation.erase(it->first); } nProxy.curPageNum.erase(it->first); diff --git a/src/output/output.cpp b/src/output/output.cpp index c260df6f..0bcc0bb0 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -74,6 +74,12 @@ namespace Mist{ DEBUG_MSG(DLVL_WARN, "Warning: MistOut created with closed socket!"); } sentHeader = false; + + //If we have a streamname option, set internal streamname to that option + if (!streamName.size() && config->hasOption("streamname")){ + streamName = config->getString("streamname"); + } + } void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){ @@ -161,7 +167,9 @@ namespace Mist{ } bool Output::isReadyForPlay(){ - if (isPushing()){return true;} + static bool recursing = false; + if (isPushing() || recursing){return true;} + recursing = true; if (!isInitialized){initialize();} if (!myMeta.tracks.size()){updateMeta();} if (myMeta.tracks.size()){ @@ -170,6 +178,7 @@ namespace Mist{ } unsigned int mainTrack = getMainSelectedTrack(); if (mainTrack && myMeta.tracks.count(mainTrack) && (myMeta.tracks[mainTrack].keys.size() >= 2 || myMeta.tracks[mainTrack].lastms - myMeta.tracks[mainTrack].firstms > 5000)){ + recursing = false; return true; }else{ HIGH_MSG("NOT READY YET (%lu tracks, %lu = %lu keys)", myMeta.tracks.size(), getMainSelectedTrack(), myMeta.tracks[getMainSelectedTrack()].keys.size()); @@ -177,6 +186,7 @@ namespace Mist{ }else{ HIGH_MSG("NOT READY YET (%lu tracks)", myMeta.tracks.size()); } + recursing = false; return false; } @@ -572,11 +582,31 @@ namespace Mist{ return start; } - ///Return the end time of the selected tracks, or 0 if unknown or live. + ///Return the end time of the selected tracks, or 0 if unknown. ///Returns the end time of latest track if nothing is selected. ///Returns zero if no tracks exist. uint64_t Output::endTime(){ - if (myMeta.live){return 0;} + if (!myMeta.tracks.size()){return 0;} + uint64_t end = 0; + if (selectedTracks.size()){ + for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ + if (myMeta.tracks.count(*it)){ + if (end < myMeta.tracks[*it].lastms){end = myMeta.tracks[*it].lastms;} + } + } + }else{ + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + if (end < it->second.lastms){end = it->second.lastms;} + } + } + return end; + } + + ///Return the most live time stamp of the selected tracks, or 0 if unknown or non-live. + ///Returns the time stamp of the newest track if nothing is selected. + ///Returns zero if no tracks exist. + uint64_t Output::liveTime(){ + if (!myMeta.live){return 0;} if (!myMeta.tracks.size()){return 0;} uint64_t end = 0; if (selectedTracks.size()){ @@ -944,6 +974,10 @@ namespace Mist{ dropTrack(nxt.tid, "timeless empty packet"); return false; } + //for VoD, check if we've reached the end of the track, if so, drop it + if (myMeta.vod && nxt.time > myMeta.tracks[nxt.tid].lastms){ + dropTrack(nxt.tid, "Reached end of track"); + } //if this is a live stream, we might have just reached the live point. //check where the next key is nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time); @@ -954,8 +988,8 @@ namespace Mist{ if (++emptyCount < 100){ Util::wait(250); //we're waiting for new data to show up - if (emptyCount % 8 == 0){ - reconnect();//reconnect every 2 seconds + if (emptyCount % 64 == 0){ + reconnect();//reconnect every 16 seconds }else{ //updating meta is only useful with live streams if (myMeta.live && emptyCount % 4 == 0){ @@ -1097,7 +1131,7 @@ namespace Mist{ if (now == lastStats && !force){return;} lastStats = now; - EXTREME_MSG("Writing stats: %s, %s, %lu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu); + HIGH_MSG("Writing stats: %s, %s, %lu, %llu, %llu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown()); if (statsPage.getData()){ IPC::statExchange tmpEx(statsPage.getData()); tmpEx.now(now); @@ -1198,6 +1232,7 @@ namespace Mist{ return false; } close(outFile); + sought = false; return true; } @@ -1248,6 +1283,7 @@ namespace Mist{ Util::wait(1000); streamStatus = Util::getStreamStatus(streamName); if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){ + INFO_MSG("Reconnecting to %s buffer... (%u)", streamName.c_str(), streamStatus); reconnect(); streamStatus = Util::getStreamStatus(streamName); } diff --git a/src/output/output.h b/src/output/output.h index d6daab76..009cc6d3 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -49,6 +49,7 @@ namespace Mist { uint64_t currentTime(); uint64_t startTime(); uint64_t endTime(); + uint64_t liveTime(); void setBlocking(bool blocking); void updateMeta(); bool selectDefaultTracks(); diff --git a/src/output/output_progressive_mp4.cpp b/src/output/output_progressive_mp4.cpp index ed690583..fe61fbe6 100644 --- a/src/output/output_progressive_mp4.cpp +++ b/src/output/output_progressive_mp4.cpp @@ -82,7 +82,10 @@ namespace Mist { tmpRes += 16//SMHD Box + 16//STSD + 36//MP4A - + 37 + thisTrack.init.size();//ESDS + + 35; + if (thisTrack.init.size()){ + tmpRes += 2 + thisTrack.init.size();//ESDS + } } //Unfortunately, for our STTS and CTTS boxes, we need to loop through all parts of the track @@ -117,6 +120,7 @@ namespace Mist { } res += 8; //mdat beginning fileSize += res; + MEDIUM_MSG("H size %llu, file: %llu", res, fileSize); return res; } @@ -148,6 +152,7 @@ namespace Mist { //Construct with duration of -1 MP4::MVHD mvhdBox(-1); //Then override it to set the correct duration + uint64_t fms; uint64_t firstms = 0xFFFFFFFFFFFFFFull; uint64_t lastms = 0; for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ @@ -155,6 +160,7 @@ namespace Mist { firstms = std::min(firstms, (uint64_t)myMeta.tracks[*it].firstms); } mvhdBox.setDuration(lastms - firstms); + fms = firstms; //Set the trackid for the first "empty" track within the file. mvhdBox.setTrackID(selectedTracks.size() + 1); moovBox.setContent(mvhdBox, moovOffset++); @@ -176,11 +182,25 @@ namespace Mist { MP4::ELST elstBox; elstBox.setVersion(0); elstBox.setFlags(0); - elstBox.setCount(1); - elstBox.setSegmentDuration(0, tDuration); - elstBox.setMediaTime(0, 0); - elstBox.setMediaRateInteger(0, 1); - elstBox.setMediaRateFraction(0, 0); + if (myMeta.vod && thisTrack.firstms != fms){ + elstBox.setCount(2); + + elstBox.setSegmentDuration(0, thisTrack.firstms - fms); + elstBox.setMediaTime(0, 0xFFFFFFFFull); + elstBox.setMediaRateInteger(0, 0); + elstBox.setMediaRateFraction(0, 0); + + elstBox.setSegmentDuration(1, tDuration); + elstBox.setMediaTime(1, 0); + elstBox.setMediaRateInteger(1, 1); + elstBox.setMediaRateFraction(1, 0); + }else{ + elstBox.setCount(1); + elstBox.setSegmentDuration(0, tDuration); + elstBox.setMediaTime(0, 0); + elstBox.setMediaRateInteger(0, 1); + elstBox.setMediaRateFraction(0, 0); + } edtsBox.setContent(elstBox, 0); trakBox.setContent(edtsBox, trakOffset++); @@ -198,6 +218,9 @@ namespace Mist { MP4::MINF minfBox; size_t minfOffset = 0; + MP4::STBL stblBox; + unsigned int stblOffset = 0; + //Add a track-type specific box to the MINF box if (thisTrack.type == "video") { MP4::VMHD vmhdBox; @@ -214,10 +237,6 @@ namespace Mist { dinfBox.setContent(drefBox, 0); minfBox.setContent(dinfBox, minfOffset++); - - MP4::STBL stblBox; - size_t stblOffset = 0; - //Add STSD box MP4::STSD stsdBox(0); if (thisTrack.type == "video") { @@ -346,10 +365,12 @@ namespace Mist { //Current values are actual byte offset without header-sized offset std::set sortSet;//filling sortset for interleaving parts for (std::set::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) { + DTSC::Track & thisTrack = myMeta.tracks[*subIt]; keyPart temp; temp.trackID = *subIt; - temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame + temp.time = thisTrack.firstms;//timeplace of frame temp.index = 0; + temp.size = thisTrack.parts[0].getDuration(); HIGH_MSG("Header sortSet: tid %lu time %lu", temp.trackID, temp.time); sortSet.insert(temp); } @@ -372,6 +393,7 @@ namespace Mist { if (temp.index + 1< thisTrack.parts.size()) {//Only create new element, when there are new elements to be added temp.time += thisTrack.parts[temp.index].getDuration(); ++temp.index; + temp.size = thisTrack.parts[temp.index].getSize(); sortSet.insert(temp); } } @@ -384,8 +406,9 @@ namespace Mist { if (mdatSize < 0xFFFFFFFF){ Bit::htobl(mdatHeader, mdatSize); } - header << std::string(mdatHeader, 8); + header.write(mdatHeader, 8); size += header.str().size(); + MEDIUM_MSG("Header %llu, file: %llu", header.str().size(), size); return header.str(); } @@ -425,6 +448,7 @@ namespace Mist { if (temp.index + 1 < myMeta.tracks[temp.trackID].parts.size()){ //only insert when there are parts left temp.time += thisTrack.parts[temp.index].getDuration(); ++temp.index; + temp.size = thisTrack.parts[temp.index].getSize(); sortSet.insert(temp); } //Remove just-parsed element @@ -470,10 +494,12 @@ namespace Mist { currPos = 0; sortSet.clear(); for (std::set::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) { + DTSC::Track & thisTrack = myMeta.tracks[*subIt]; keyPart temp; temp.trackID = *subIt; - temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame + temp.time = thisTrack.firstms;//timeplace of frame temp.index = 0; + temp.size = thisTrack.parts[temp.index].getSize(); sortSet.insert(temp); } if (H.GetHeader("Range") != ""){ @@ -517,12 +543,6 @@ namespace Mist { //HTTP_S.StartResponse(HTTP_R, conn); } leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data - if (byteStart < headerSize) { - std::string headerData = DTSCMeta2MP4Header(fileSize); - myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header - leftOver -= std::min(headerSize, byteEnd) - byteStart; - } - currPos += headerSize;//we're now guaranteed to be past the header point, no matter what } void OutProgressiveMP4::sendNext() { @@ -534,8 +554,7 @@ namespace Mist { thisPacket.getString("data", dataPointer, len); keyPart thisPart = *sortSet.begin(); - uint64_t thisSize = myMeta.tracks[thisPart.trackID].parts[thisPart.index].getSize(); - if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisSize){ + if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisPart.size){ if (thisPacket.getTime() > sortSet.begin()->time || thisPacket.getTrackId() > sortSet.begin()->trackID) { if (perfect) { WARN_MSG("Warning: input is inconsistent. Expected %lu:%lu but got %ld:%llu - cancelling playback", thisPart.trackID, thisPart.time, thisPacket.getTrackId(), thisPacket.getTime()); @@ -543,7 +562,7 @@ namespace Mist { myConn.close(); } } else { - WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisSize, thisPacket.getTrackId(), thisPacket.getTime(), len); + WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisPart.size, thisPacket.getTrackId(), thisPacket.getTime(), len); } return; } @@ -571,6 +590,7 @@ namespace Mist { if (temp.index + 1 < thisTrack.parts.size()) { //only insert when there are parts left temp.time += thisTrack.parts[temp.index].getDuration(); ++temp.index; + temp.size = thisTrack.parts[temp.index].getSize(); sortSet.insert(temp); } @@ -584,6 +604,14 @@ namespace Mist { } void OutProgressiveMP4::sendHeader(){ + //Send the header data + uint64_t headerSize = mp4HeaderSize(fileSize); + if (byteStart < headerSize){ + std::string headerData = DTSCMeta2MP4Header(fileSize); + myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header + leftOver -= std::min(headerSize, byteEnd) - byteStart; + } + currPos += headerSize;//we're now guaranteed to be past the header point, no matter what seek(seekPoint); sentHeader = true; } diff --git a/src/output/output_progressive_mp4.h b/src/output/output_progressive_mp4.h index 99f66ac2..fe49368a 100644 --- a/src/output/output_progressive_mp4.h +++ b/src/output/output_progressive_mp4.h @@ -20,6 +20,7 @@ namespace Mist { uint64_t time; uint64_t byteOffset;//Stores relative bpos for fragmented MP4 uint64_t index; + uint32_t size; }; class OutProgressiveMP4 : public HTTPOutput { diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 2cec1183..84ca7251 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -10,6 +10,9 @@ namespace Mist { OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) { + lastOutTime = 0; + rtmpOffset = 0; + bootMsOffset = 0; setBlocking(true); while (!conn.Received().available(1537) && conn.connected() && config->is_active) { conn.spool(); @@ -67,60 +70,6 @@ namespace Mist { return false; } - void OutRTMP::parseVars(std::string data){ - std::string varname; - std::string varval; - bool trackSwitch = false; - // position where a part start (e.g. after &) - size_t pos = 0; - while (pos < data.length()){ - size_t nextpos = data.find('&', pos); - if (nextpos == std::string::npos){ - nextpos = data.length(); - } - size_t eq_pos = data.find('=', pos); - if (eq_pos < nextpos){ - // there is a key and value - varname = data.substr(pos, eq_pos - pos); - varval = data.substr(eq_pos + 1, nextpos - eq_pos - 1); - }else{ - // no value, only a key - varname = data.substr(pos, nextpos - pos); - varval.clear(); - } - - if (varname == "track" || varname == "audio" || varname == "video"){ - long long int selTrack = JSON::Value(varval).asInt(); - if (myMeta){ - if (myMeta.tracks.count(selTrack)){ - std::string & delThis = myMeta.tracks[selTrack].type; - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks[*it].type == delThis){ - selectedTracks.erase(it); - trackSwitch = true; - break; - } - } - selectedTracks.insert(selTrack); - } - }else{ - selectedTracks.insert(selTrack); - } - } - - if (nextpos == std::string::npos){ - // in case the string is gigantic - break; - } - // erase & - pos = nextpos + 1; - } - if (trackSwitch && thisPacket){ - seek(thisPacket.getTime()); - } - } - - void OutRTMP::init(Util::Config * cfg){ Output::init(cfg); capa["name"] = "RTMP"; @@ -275,9 +224,9 @@ namespace Mist { unsigned int timestamp = thisPacket.getTime() - rtmpOffset; //make sure we don't go negative - if (rtmpOffset > thisPacket.getTime()){ + if (rtmpOffset > (int64_t)thisPacket.getTime()){ timestamp = 0; - rtmpOffset = thisPacket.getTime(); + rtmpOffset = (int64_t)thisPacket.getTime(); } bool allow_short = RTMPStream::lastsend.count(4); @@ -638,7 +587,8 @@ namespace Mist { if (streamName.find('?') != std::string::npos){ std::string tmpVars = streamName.substr(streamName.find('?') + 1); streamName = streamName.substr(0, streamName.find('?')); - parseVars(tmpVars); + std::map targetParams; + HTTP::parseVars(tmpVars, targetParams); } size_t colonPos = streamName.find(':');