Backported various Pro edition changes and general code to Free edition

This commit is contained in:
Thulinma 2018-10-11 17:51:35 +02:00
parent 2aa86ccf01
commit 776cfe1850
7 changed files with 123 additions and 91 deletions

View file

@ -263,6 +263,7 @@ namespace Mist {
///Checks in the server configuration if this stream is set to always on or not.
/// Returns true if it is, or if the stream could not be found in the configuration.
/// If the compiled default debug level is < INFO, instead returns false if the stream is not found.
bool Input::isAlwaysOn(){
bool ret = true;
std::string strName = streamName.substr(0, (streamName.find_first_of("+ ")));
@ -274,6 +275,10 @@ namespace Mist {
if (!streamCfg.getMember("always_on") || !streamCfg.getMember("always_on").asBool()){
ret = false;
}
}else{
#if DEBUG < DLVL_DEVEL
ret = false;
#endif
}
configLock.post();
return ret;
@ -331,6 +336,10 @@ namespace Mist {
// - INPUT_TIMEOUT seconds haven't passed yet,
// - this is a live stream and at least two of the biggest fragment haven't passed yet,
bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)));
if (!ret && config->is_active && isAlwaysOn()){
ret = true;
activityCounter = Util::bootSecs();
}
return ret;
}
@ -473,12 +482,6 @@ namespace Mist {
if (!it2->second){
bufferRemove(it->first, it2->first);
pageCounter[it->first].erase(it2->first);
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ntohl(((((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum == it2->first){
(((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) = 0;
}
}
change = true;
break;
}

View file

@ -351,6 +351,19 @@ namespace Mist {
bufferLocations[tid].erase(bufferLocations[tid].begin());
}
if (pushLocation.count(it->first)){
// \todo Debugger says this is null sometimes. It shouldn't be. Figure out why!
// For now, this if will prevent crashes in these cases.
if (pushLocation[it->first]){
//Reset the userpage, to allow repushing from TS
IPC::userConnection userConn(pushLocation[it->first]);
for (int i = 0; i < SIMUL_TRACKS; i++) {
if (userConn.getTrackId(i) == it->first) {
userConn.setTrackId(i, 0);
userConn.setKeynum(i, 0);
break;
}
}
}
pushLocation.erase(it->first);
}
nProxy.curPageNum.erase(it->first);

View file

@ -74,6 +74,12 @@ namespace Mist{
DEBUG_MSG(DLVL_WARN, "Warning: MistOut created with closed socket!");
}
sentHeader = false;
//If we have a streamname option, set internal streamname to that option
if (!streamName.size() && config->hasOption("streamname")){
streamName = config->getString("streamname");
}
}
void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){
@ -161,7 +167,9 @@ namespace Mist{
}
bool Output::isReadyForPlay(){
if (isPushing()){return true;}
static bool recursing = false;
if (isPushing() || recursing){return true;}
recursing = true;
if (!isInitialized){initialize();}
if (!myMeta.tracks.size()){updateMeta();}
if (myMeta.tracks.size()){
@ -170,6 +178,7 @@ namespace Mist{
}
unsigned int mainTrack = getMainSelectedTrack();
if (mainTrack && myMeta.tracks.count(mainTrack) && (myMeta.tracks[mainTrack].keys.size() >= 2 || myMeta.tracks[mainTrack].lastms - myMeta.tracks[mainTrack].firstms > 5000)){
recursing = false;
return true;
}else{
HIGH_MSG("NOT READY YET (%lu tracks, %lu = %lu keys)", myMeta.tracks.size(), getMainSelectedTrack(), myMeta.tracks[getMainSelectedTrack()].keys.size());
@ -177,6 +186,7 @@ namespace Mist{
}else{
HIGH_MSG("NOT READY YET (%lu tracks)", myMeta.tracks.size());
}
recursing = false;
return false;
}
@ -572,11 +582,31 @@ namespace Mist{
return start;
}
///Return the end time of the selected tracks, or 0 if unknown or live.
///Return the end time of the selected tracks, or 0 if unknown.
///Returns the end time of latest track if nothing is selected.
///Returns zero if no tracks exist.
uint64_t Output::endTime(){
if (myMeta.live){return 0;}
if (!myMeta.tracks.size()){return 0;}
uint64_t end = 0;
if (selectedTracks.size()){
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks.count(*it)){
if (end < myMeta.tracks[*it].lastms){end = myMeta.tracks[*it].lastms;}
}
}
}else{
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (end < it->second.lastms){end = it->second.lastms;}
}
}
return end;
}
///Return the most live time stamp of the selected tracks, or 0 if unknown or non-live.
///Returns the time stamp of the newest track if nothing is selected.
///Returns zero if no tracks exist.
uint64_t Output::liveTime(){
if (!myMeta.live){return 0;}
if (!myMeta.tracks.size()){return 0;}
uint64_t end = 0;
if (selectedTracks.size()){
@ -944,6 +974,10 @@ namespace Mist{
dropTrack(nxt.tid, "timeless empty packet");
return false;
}
//for VoD, check if we've reached the end of the track, if so, drop it
if (myMeta.vod && nxt.time > myMeta.tracks[nxt.tid].lastms){
dropTrack(nxt.tid, "Reached end of track");
}
//if this is a live stream, we might have just reached the live point.
//check where the next key is
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time);
@ -954,8 +988,8 @@ namespace Mist{
if (++emptyCount < 100){
Util::wait(250);
//we're waiting for new data to show up
if (emptyCount % 8 == 0){
reconnect();//reconnect every 2 seconds
if (emptyCount % 64 == 0){
reconnect();//reconnect every 16 seconds
}else{
//updating meta is only useful with live streams
if (myMeta.live && emptyCount % 4 == 0){
@ -1097,7 +1131,7 @@ namespace Mist{
if (now == lastStats && !force){return;}
lastStats = now;
EXTREME_MSG("Writing stats: %s, %s, %lu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu);
HIGH_MSG("Writing stats: %s, %s, %lu, %llu, %llu", getConnectedHost().c_str(), streamName.c_str(), crc & 0xFFFFFFFFu, myConn.dataUp(), myConn.dataDown());
if (statsPage.getData()){
IPC::statExchange tmpEx(statsPage.getData());
tmpEx.now(now);
@ -1198,6 +1232,7 @@ namespace Mist{
return false;
}
close(outFile);
sought = false;
return true;
}
@ -1248,6 +1283,7 @@ namespace Mist{
Util::wait(1000);
streamStatus = Util::getStreamStatus(streamName);
if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){
INFO_MSG("Reconnecting to %s buffer... (%u)", streamName.c_str(), streamStatus);
reconnect();
streamStatus = Util::getStreamStatus(streamName);
}

View file

@ -49,6 +49,7 @@ namespace Mist {
uint64_t currentTime();
uint64_t startTime();
uint64_t endTime();
uint64_t liveTime();
void setBlocking(bool blocking);
void updateMeta();
bool selectDefaultTracks();

View file

@ -82,7 +82,10 @@ namespace Mist {
tmpRes += 16//SMHD Box
+ 16//STSD
+ 36//MP4A
+ 37 + thisTrack.init.size();//ESDS
+ 35;
if (thisTrack.init.size()){
tmpRes += 2 + thisTrack.init.size();//ESDS
}
}
//Unfortunately, for our STTS and CTTS boxes, we need to loop through all parts of the track
@ -117,6 +120,7 @@ namespace Mist {
}
res += 8; //mdat beginning
fileSize += res;
MEDIUM_MSG("H size %llu, file: %llu", res, fileSize);
return res;
}
@ -148,6 +152,7 @@ namespace Mist {
//Construct with duration of -1
MP4::MVHD mvhdBox(-1);
//Then override it to set the correct duration
uint64_t fms;
uint64_t firstms = 0xFFFFFFFFFFFFFFull;
uint64_t lastms = 0;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
@ -155,6 +160,7 @@ namespace Mist {
firstms = std::min(firstms, (uint64_t)myMeta.tracks[*it].firstms);
}
mvhdBox.setDuration(lastms - firstms);
fms = firstms;
//Set the trackid for the first "empty" track within the file.
mvhdBox.setTrackID(selectedTracks.size() + 1);
moovBox.setContent(mvhdBox, moovOffset++);
@ -176,11 +182,25 @@ namespace Mist {
MP4::ELST elstBox;
elstBox.setVersion(0);
elstBox.setFlags(0);
elstBox.setCount(1);
elstBox.setSegmentDuration(0, tDuration);
elstBox.setMediaTime(0, 0);
elstBox.setMediaRateInteger(0, 1);
elstBox.setMediaRateFraction(0, 0);
if (myMeta.vod && thisTrack.firstms != fms){
elstBox.setCount(2);
elstBox.setSegmentDuration(0, thisTrack.firstms - fms);
elstBox.setMediaTime(0, 0xFFFFFFFFull);
elstBox.setMediaRateInteger(0, 0);
elstBox.setMediaRateFraction(0, 0);
elstBox.setSegmentDuration(1, tDuration);
elstBox.setMediaTime(1, 0);
elstBox.setMediaRateInteger(1, 1);
elstBox.setMediaRateFraction(1, 0);
}else{
elstBox.setCount(1);
elstBox.setSegmentDuration(0, tDuration);
elstBox.setMediaTime(0, 0);
elstBox.setMediaRateInteger(0, 1);
elstBox.setMediaRateFraction(0, 0);
}
edtsBox.setContent(elstBox, 0);
trakBox.setContent(edtsBox, trakOffset++);
@ -198,6 +218,9 @@ namespace Mist {
MP4::MINF minfBox;
size_t minfOffset = 0;
MP4::STBL stblBox;
unsigned int stblOffset = 0;
//Add a track-type specific box to the MINF box
if (thisTrack.type == "video") {
MP4::VMHD vmhdBox;
@ -214,10 +237,6 @@ namespace Mist {
dinfBox.setContent(drefBox, 0);
minfBox.setContent(dinfBox, minfOffset++);
MP4::STBL stblBox;
size_t stblOffset = 0;
//Add STSD box
MP4::STSD stsdBox(0);
if (thisTrack.type == "video") {
@ -346,10 +365,12 @@ namespace Mist {
//Current values are actual byte offset without header-sized offset
std::set <keyPart> sortSet;//filling sortset for interleaving parts
for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
DTSC::Track & thisTrack = myMeta.tracks[*subIt];
keyPart temp;
temp.trackID = *subIt;
temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
temp.time = thisTrack.firstms;//timeplace of frame
temp.index = 0;
temp.size = thisTrack.parts[0].getDuration();
HIGH_MSG("Header sortSet: tid %lu time %lu", temp.trackID, temp.time);
sortSet.insert(temp);
}
@ -372,6 +393,7 @@ namespace Mist {
if (temp.index + 1< thisTrack.parts.size()) {//Only create new element, when there are new elements to be added
temp.time += thisTrack.parts[temp.index].getDuration();
++temp.index;
temp.size = thisTrack.parts[temp.index].getSize();
sortSet.insert(temp);
}
}
@ -384,8 +406,9 @@ namespace Mist {
if (mdatSize < 0xFFFFFFFF){
Bit::htobl(mdatHeader, mdatSize);
}
header << std::string(mdatHeader, 8);
header.write(mdatHeader, 8);
size += header.str().size();
MEDIUM_MSG("Header %llu, file: %llu", header.str().size(), size);
return header.str();
}
@ -425,6 +448,7 @@ namespace Mist {
if (temp.index + 1 < myMeta.tracks[temp.trackID].parts.size()){ //only insert when there are parts left
temp.time += thisTrack.parts[temp.index].getDuration();
++temp.index;
temp.size = thisTrack.parts[temp.index].getSize();
sortSet.insert(temp);
}
//Remove just-parsed element
@ -470,10 +494,12 @@ namespace Mist {
currPos = 0;
sortSet.clear();
for (std::set<long unsigned int>::iterator subIt = selectedTracks.begin(); subIt != selectedTracks.end(); subIt++) {
DTSC::Track & thisTrack = myMeta.tracks[*subIt];
keyPart temp;
temp.trackID = *subIt;
temp.time = myMeta.tracks[*subIt].firstms;//timeplace of frame
temp.time = thisTrack.firstms;//timeplace of frame
temp.index = 0;
temp.size = thisTrack.parts[temp.index].getSize();
sortSet.insert(temp);
}
if (H.GetHeader("Range") != ""){
@ -517,12 +543,6 @@ namespace Mist {
//HTTP_S.StartResponse(HTTP_R, conn);
}
leftOver = byteEnd - byteStart + 1;//add one byte, because range "0-0" = 1 byte of data
if (byteStart < headerSize) {
std::string headerData = DTSCMeta2MP4Header(fileSize);
myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header
leftOver -= std::min(headerSize, byteEnd) - byteStart;
}
currPos += headerSize;//we're now guaranteed to be past the header point, no matter what
}
void OutProgressiveMP4::sendNext() {
@ -534,8 +554,7 @@ namespace Mist {
thisPacket.getString("data", dataPointer, len);
keyPart thisPart = *sortSet.begin();
uint64_t thisSize = myMeta.tracks[thisPart.trackID].parts[thisPart.index].getSize();
if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisSize){
if ((unsigned long)thisPacket.getTrackId() != thisPart.trackID || thisPacket.getTime() != thisPart.time || len != thisPart.size){
if (thisPacket.getTime() > sortSet.begin()->time || thisPacket.getTrackId() > sortSet.begin()->trackID) {
if (perfect) {
WARN_MSG("Warning: input is inconsistent. Expected %lu:%lu but got %ld:%llu - cancelling playback", thisPart.trackID, thisPart.time, thisPacket.getTrackId(), thisPacket.getTime());
@ -543,7 +562,7 @@ namespace Mist {
myConn.close();
}
} else {
WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisSize, thisPacket.getTrackId(), thisPacket.getTime(), len);
WARN_MSG("Did not receive expected %lu:%lu (%lub) but got %ld:%llu (%ub) - throwing it away", thisPart.trackID, thisPart.time, thisPart.size, thisPacket.getTrackId(), thisPacket.getTime(), len);
}
return;
}
@ -571,6 +590,7 @@ namespace Mist {
if (temp.index + 1 < thisTrack.parts.size()) { //only insert when there are parts left
temp.time += thisTrack.parts[temp.index].getDuration();
++temp.index;
temp.size = thisTrack.parts[temp.index].getSize();
sortSet.insert(temp);
}
@ -584,6 +604,14 @@ namespace Mist {
}
void OutProgressiveMP4::sendHeader(){
//Send the header data
uint64_t headerSize = mp4HeaderSize(fileSize);
if (byteStart < headerSize){
std::string headerData = DTSCMeta2MP4Header(fileSize);
myConn.SendNow(headerData.data() + byteStart, std::min(headerSize, byteEnd) - byteStart); //send MP4 header
leftOver -= std::min(headerSize, byteEnd) - byteStart;
}
currPos += headerSize;//we're now guaranteed to be past the header point, no matter what
seek(seekPoint);
sentHeader = true;
}

View file

@ -20,6 +20,7 @@ namespace Mist {
uint64_t time;
uint64_t byteOffset;//Stores relative bpos for fragmented MP4
uint64_t index;
uint32_t size;
};
class OutProgressiveMP4 : public HTTPOutput {

View file

@ -10,6 +10,9 @@
namespace Mist {
OutRTMP::OutRTMP(Socket::Connection & conn) : Output(conn) {
lastOutTime = 0;
rtmpOffset = 0;
bootMsOffset = 0;
setBlocking(true);
while (!conn.Received().available(1537) && conn.connected() && config->is_active) {
conn.spool();
@ -67,60 +70,6 @@ namespace Mist {
return false;
}
void OutRTMP::parseVars(std::string data){
std::string varname;
std::string varval;
bool trackSwitch = false;
// position where a part start (e.g. after &)
size_t pos = 0;
while (pos < data.length()){
size_t nextpos = data.find('&', pos);
if (nextpos == std::string::npos){
nextpos = data.length();
}
size_t eq_pos = data.find('=', pos);
if (eq_pos < nextpos){
// there is a key and value
varname = data.substr(pos, eq_pos - pos);
varval = data.substr(eq_pos + 1, nextpos - eq_pos - 1);
}else{
// no value, only a key
varname = data.substr(pos, nextpos - pos);
varval.clear();
}
if (varname == "track" || varname == "audio" || varname == "video"){
long long int selTrack = JSON::Value(varval).asInt();
if (myMeta){
if (myMeta.tracks.count(selTrack)){
std::string & delThis = myMeta.tracks[selTrack].type;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].type == delThis){
selectedTracks.erase(it);
trackSwitch = true;
break;
}
}
selectedTracks.insert(selTrack);
}
}else{
selectedTracks.insert(selTrack);
}
}
if (nextpos == std::string::npos){
// in case the string is gigantic
break;
}
// erase &
pos = nextpos + 1;
}
if (trackSwitch && thisPacket){
seek(thisPacket.getTime());
}
}
void OutRTMP::init(Util::Config * cfg){
Output::init(cfg);
capa["name"] = "RTMP";
@ -275,9 +224,9 @@ namespace Mist {
unsigned int timestamp = thisPacket.getTime() - rtmpOffset;
//make sure we don't go negative
if (rtmpOffset > thisPacket.getTime()){
if (rtmpOffset > (int64_t)thisPacket.getTime()){
timestamp = 0;
rtmpOffset = thisPacket.getTime();
rtmpOffset = (int64_t)thisPacket.getTime();
}
bool allow_short = RTMPStream::lastsend.count(4);
@ -638,7 +587,8 @@ namespace Mist {
if (streamName.find('?') != std::string::npos){
std::string tmpVars = streamName.substr(streamName.find('?') + 1);
streamName = streamName.substr(0, streamName.find('?'));
parseVars(tmpVars);
std::map<std::string, std::string> targetParams;
HTTP::parseVars(tmpVars, targetParams);
}
size_t colonPos = streamName.find(':');