From 92e73cb6db92fe1ab69d76dc0fe8d1f4ab64d0d9 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 30 Sep 2016 17:18:11 +0200 Subject: [PATCH 1/6] Fixed bug when tracks pause for 16 seconds. --- src/output/output.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/output/output.cpp b/src/output/output.cpp index eaa68dab..1daf8655 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -470,6 +470,7 @@ namespace Mist { nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE); if (!(nProxy.curPage[trackId].mapped)){ FAIL_MSG("Initializing page %s failed", nProxy.curPage[trackId].name.c_str()); + currKeyOpen.erase(trackId); return; } currKeyOpen[trackId] = pageNum; @@ -856,7 +857,7 @@ namespace Mist { } //if this is a live stream, we might have just reached the live point. //check where the next key is - nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); + nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time); int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1); //if the next key hasn't shown up on another page, then we're waiting. //VoD might be slow, so we check VoD case also, just in case From 748960bb4403b18f7cb8ed99047770551efee2df Mon Sep 17 00:00:00 2001 From: Thulinma Date: Fri, 30 Sep 2016 23:20:55 +0200 Subject: [PATCH 2/6] Removed JSON dependency from FLV, sped up FLV input significantly, minor RTMP input speedup --- lib/flv_tag.cpp | 215 +++++++++++-------------------------- lib/flv_tag.h | 7 +- lib/util.cpp | 18 ++++ lib/util.h | 2 + src/input/input_buffer.cpp | 4 +- src/input/input_flv.cpp | 34 +++--- src/io.cpp | 25 ++--- src/output/output_rtmp.cpp | 10 +- 8 files changed, 124 insertions(+), 191 deletions(-) diff --git a/lib/flv_tag.cpp b/lib/flv_tag.cpp index 520929e5..70636616 100644 --- a/lib/flv_tag.cpp +++ b/lib/flv_tag.cpp @@ -518,7 +518,7 @@ void FLV::Tag::setLen() { data[ --i] = (len4) & 0xFF; } -/// FLV Video init data loader function from JSON. +/// FLV Video init data loader function from metadata. bool FLV::Tag::DTSCVideoInit(DTSC::Track & video) { //Unknown? Assume H264. len = 0; @@ -549,7 +549,7 @@ bool FLV::Tag::DTSCVideoInit(DTSC::Track & video) { return true; } -/// FLV Audio init data loader function from JSON. +/// FLV Audio init data loader function from metadata. bool FLV::Tag::DTSCAudioInit(DTSC::Track & audio) { len = 0; //Unknown? Assume AAC. @@ -709,6 +709,7 @@ bool FLV::Tag::ChunkLoader(const RTMPStream::Chunk & O) { data[2] = (O.len >> 8) & 0xFF; data[1] = (O.len >> 16) & 0xFF; tagTime(O.timestamp); + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); return true; } @@ -794,11 +795,7 @@ bool FLV::Tag::MemLoader(char * D, unsigned int S, unsigned int & P) { //read tag body if (MemReadUntil(data, len, sofar, D, S, P)) { //calculate keyframeness, next time read header again, return true - if ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)) { - isKeyframe = true; - } else { - isKeyframe = false; - } + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); done = true; sofar = 0; return true; @@ -892,11 +889,7 @@ bool FLV::Tag::FileLoader(FILE * f) { //read tag body if (FileReadUntil(data, len, sofar, f)) { //calculate keyframeness, next time read header again, return true - if ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)) { - isKeyframe = true; - } else { - isKeyframe = false; - } + isKeyframe = ((data[0] == 0x09) && (((data[11] & 0xf0) >> 4) == 1)); done = true; sofar = 0; fcntl(fileno(f), F_SETFL, preflags); @@ -944,9 +937,7 @@ unsigned int FLV::Tag::getDataLen(){ return len - 16; } -JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack) { - JSON::Value pack_out; // Storage for outgoing metadata. - +void FLV::Tag::toMeta(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack){ if (!reTrack){ switch (data[0]){ case 0x09: reTrack = 1; break;//video @@ -954,7 +945,6 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u case 0x12: reTrack = 3; break;//meta } } - pack_out["trackid"] = reTrack; if (data[0] == 0x12) { AMF::Object meta_in = AMF::parse((unsigned char *)data + 11, len - 15); @@ -968,78 +958,56 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u } if (tmp) { amf_storage = *tmp; - bool empty = true; - for (int i = 0; i < tmp->hasContent(); ++i) { - if (tmp->getContentP(i)->Indice() == "videocodecid" || tmp->getContentP(i)->Indice() == "audiocodecid" || tmp->getContentP(i)->Indice() == "width" || tmp->getContentP(i)->Indice() == "height" || tmp->getContentP(i)->Indice() == "videodatarate" || tmp->getContentP(i)->Indice() == "videoframerate" || tmp->getContentP(i)->Indice() == "audiodatarate" || tmp->getContentP(i)->Indice() == "audiosamplerate" || tmp->getContentP(i)->Indice() == "audiosamplesize" || tmp->getContentP(i)->Indice() == "audiochannels") { - continue; - } - if (tmp->getContentP(i)->NumValue()) { - pack_out["data"][tmp->getContentP(i)->Indice()] = (long long)tmp->getContentP(i)->NumValue(); - empty = false; - } else { - if (tmp->getContentP(i)->StrValue() != "") { - pack_out["data"][tmp->getContentP(i)->Indice()] = tmp->getContentP(i)->StrValue(); - empty = false; - } - } - } - if (!empty) { - pack_out["datatype"] = "meta"; - pack_out["time"] = tagTime(); - }else{ - pack_out.null(); - } } - return pack_out; //empty + return; } - if (data[0] == 0x08) { + if (data[0] == 0x08 && (metadata.tracks[reTrack].codec == "" || metadata.tracks[reTrack].codec != getAudioCodec() || (needsInitData() && isInitData()))) { char audiodata = data[11]; metadata.tracks[reTrack].trackID = reTrack; metadata.tracks[reTrack].type = "audio"; - if (metadata.tracks[reTrack].codec == "" || metadata.tracks[reTrack].codec != getAudioCodec()) { - metadata.tracks[reTrack].codec = getAudioCodec(); - switch (audiodata & 0x0C) { - case 0x0: - metadata.tracks[reTrack].rate = 5512; - break; - case 0x4: - metadata.tracks[reTrack].rate = 11025; - break; - case 0x8: - metadata.tracks[reTrack].rate = 22050; - break; - case 0xC: - metadata.tracks[reTrack].rate = 44100; - break; - } - if (amf_storage.getContentP("audiosamplerate")) { - metadata.tracks[reTrack].rate = (long long int)amf_storage.getContentP("audiosamplerate")->NumValue(); - } - switch (audiodata & 0x02) { - case 0x0: - metadata.tracks[reTrack].size = 8; - break; - case 0x2: - metadata.tracks[reTrack].size = 16; - break; - } - if (amf_storage.getContentP("audiosamplesize")) { - metadata.tracks[reTrack].size = (long long int)amf_storage.getContentP("audiosamplesize")->NumValue(); - } - switch (audiodata & 0x01) { - case 0x0: - metadata.tracks[reTrack].channels = 1; - break; - case 0x1: - metadata.tracks[reTrack].channels = 2; - break; - } - if (amf_storage.getContentP("stereo")) { - if (amf_storage.getContentP("stereo")->NumValue() == 1) { - metadata.tracks[reTrack].channels = 2; - } else { - metadata.tracks[reTrack].channels = 1; - } + metadata.tracks[reTrack].codec = getAudioCodec(); + + switch (audiodata & 0x0C) { + case 0x0: + metadata.tracks[reTrack].rate = 5512; + break; + case 0x4: + metadata.tracks[reTrack].rate = 11025; + break; + case 0x8: + metadata.tracks[reTrack].rate = 22050; + break; + case 0xC: + metadata.tracks[reTrack].rate = 44100; + break; + } + if (amf_storage.getContentP("audiosamplerate")) { + metadata.tracks[reTrack].rate = (long long int)amf_storage.getContentP("audiosamplerate")->NumValue(); + } + switch (audiodata & 0x02) { + case 0x0: + metadata.tracks[reTrack].size = 8; + break; + case 0x2: + metadata.tracks[reTrack].size = 16; + break; + } + if (amf_storage.getContentP("audiosamplesize")) { + metadata.tracks[reTrack].size = (long long int)amf_storage.getContentP("audiosamplesize")->NumValue(); + } + switch (audiodata & 0x01) { + case 0x0: + metadata.tracks[reTrack].channels = 1; + break; + case 0x1: + metadata.tracks[reTrack].channels = 2; + break; + } + if (amf_storage.getContentP("stereo")) { + if (amf_storage.getContentP("stereo")->NumValue() == 1) { + metadata.tracks[reTrack].channels = 2; + } else { + metadata.tracks[reTrack].channels = 1; } } if (needsInitData() && isInitData()) { @@ -1048,54 +1016,36 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u } else { metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } - pack_out.null(); - return pack_out; //skip rest of parsing, get next tag. } - pack_out["time"] = tagTime(); - if ((audiodata & 0xF0) == 0xA0) { - if (len < 18) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 13, (size_t)len - 17); - } else { - if (len < 17) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 12, (size_t)len - 16); - } - return pack_out; } - if (data[0] == 0x09) { + + if (data[0] == 0x09 && ((needsInitData() && isInitData()) || !metadata.tracks[reTrack].codec.size())){ char videodata = data[11]; - if (metadata.tracks[reTrack].codec == "") { - metadata.tracks[reTrack].codec = getVideoCodec(); - } + metadata.tracks[reTrack].codec = getVideoCodec(); metadata.tracks[reTrack].type = "video"; metadata.tracks[reTrack].trackID = reTrack; - if (!metadata.tracks[reTrack].width || !metadata.tracks[reTrack].height){ - if (amf_storage.getContentP("width")) { - metadata.tracks[reTrack].width = (long long int)amf_storage.getContentP("width")->NumValue(); - } - if (amf_storage.getContentP("height")) { - metadata.tracks[reTrack].height = (long long int)amf_storage.getContentP("height")->NumValue(); - } + if (amf_storage.getContentP("width")) { + metadata.tracks[reTrack].width = (long long int)amf_storage.getContentP("width")->NumValue(); + } + if (amf_storage.getContentP("height")) { + metadata.tracks[reTrack].height = (long long int)amf_storage.getContentP("height")->NumValue(); } if (!metadata.tracks[reTrack].fpks && amf_storage.getContentP("videoframerate")) { if (amf_storage.getContentP("videoframerate")->NumValue()){ metadata.tracks[reTrack].fpks = (long long int)(amf_storage.getContentP("videoframerate")->NumValue() * 1000.0); }else{ - metadata.tracks[reTrack].fpks = JSON::Value(amf_storage.getContentP("videoframerate")->StrValue()).asInt() * 1000.0; + metadata.tracks[reTrack].fpks = atoi(amf_storage.getContentP("videoframerate")->StrValue().c_str()) * 1000.0; } } if (needsInitData() && isInitData()) { if ((videodata & 0x0F) == 7) { if (len < 21) { - return JSON::Value(); + return; } metadata.tracks[reTrack].init = std::string((char *)data + 16, (size_t)len - 20); } else { if (len < 17) { - return JSON::Value(); + return; } metadata.tracks[reTrack].init = std::string((char *)data + 12, (size_t)len - 16); } @@ -1108,48 +1058,9 @@ JSON::Value FLV::Tag::toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, u metadata.tracks[reTrack].height = spsChar.height; metadata.tracks[reTrack].fpks = spsChar.fps * 1000; } - pack_out.null(); - return pack_out; //skip rest of parsing, get next tag. } - switch (videodata & 0xF0) { - case 0x10: - case 0x40: - pack_out["keyframe"] = 1; - break; - case 0x50: - return JSON::Value(); - break; //the video info byte we just throw away - useless to us... - } - pack_out["time"] = tagTime(); - if (!getDataLen()){ - //empty packet - pack_out["data"] = ""; - return pack_out; - } - if ((videodata & 0x0F) == 7) { - switch (data[12]) { - case 1: - pack_out["nalu"] = 1; - break; - case 2: - pack_out["nalu_end"] = 1; - break; - } - pack_out["offset"] = offset(); - if (len < 21) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 16, (size_t)len - 20); - } else { - if (len < 17) { - return JSON::Value(); - } - pack_out["data"] = std::string((char *)data + 12, (size_t)len - 16); - } - return pack_out; } - return pack_out; //should never get here -} //FLV::Tag::toJSON +} /// Checks if buf is large enough to contain len. /// Attempts to resize data buffer if not/ diff --git a/lib/flv_tag.h b/lib/flv_tag.h index b91fe53e..4e430830 100644 --- a/lib/flv_tag.h +++ b/lib/flv_tag.h @@ -4,7 +4,6 @@ #pragma once #include "socket.h" #include "dtsc.h" -#include "json.h" #include "amf.h" #include @@ -51,7 +50,7 @@ namespace FLV { bool DTSCVideoInit(DTSC::Track & video); bool DTSCAudioInit(DTSC::Track & audio); bool DTSCMetaInit(DTSC::Meta & M, std::set & selTracks); - JSON::Value toJSON(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack = 0); + void toMeta(DTSC::Meta & metadata, AMF::Object & amf_storage, unsigned int reTrack = 0); bool MemLoader(char * D, unsigned int S, unsigned int & P); bool FileLoader(FILE * f); unsigned int getTrackID(); @@ -66,10 +65,6 @@ namespace FLV { //loader helper functions bool MemReadUntil(char * buffer, unsigned int count, unsigned int & sofar, char * D, unsigned int S, unsigned int & P); bool FileReadUntil(char * buffer, unsigned int count, unsigned int & sofar, FILE * f); - //JSON writer helpers - void Meta_Put(JSON::Value & meta, std::string cat, std::string elem, std::string val); - void Meta_Put(JSON::Value & meta, std::string cat, std::string elem, uint64_t val); - bool Meta_Has(JSON::Value & meta, std::string cat, std::string elem); }; //Tag diff --git a/lib/util.cpp b/lib/util.cpp index 2c44cf47..0e1d39fb 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -1,4 +1,8 @@ +//This line will make ftello/fseeko work with 64 bits numbers +#define _FILE_OFFSET_BITS 64 + #include "util.h" +#include #include namespace Util { @@ -36,5 +40,19 @@ namespace Util { } return result.size() == positions.size(); } + + /// 64-bits version of ftell + uint64_t ftell(FILE * stream){ + /// \TODO Windows implementation (e.g. _ftelli64 ?) + return ftello(stream); + } + + /// 64-bits version of fseek + uint64_t fseek(FILE * stream, uint64_t offset, int whence){ + /// \TODO Windows implementation (e.g. _fseeki64 ?) + return fseeko(stream, offset, whence); + } + + } diff --git a/lib/util.h b/lib/util.h index df0a27b9..2bb6cd9a 100644 --- a/lib/util.h +++ b/lib/util.h @@ -3,4 +3,6 @@ namespace Util { bool stringScan(const std::string & src, const std::string & pattern, std::deque & result); + uint64_t ftell(FILE * stream); + uint64_t fseek(FILE * stream, uint64_t offset, int whence); } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 16795a58..574198e1 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -624,11 +624,11 @@ namespace Mist { } void inputBuffer::updateTrackMeta(unsigned long tNum) { - VERYHIGH_MSG("Updating meta for track %d", tNum); //Store a reference for easier access std::map & locations = bufferLocations[tNum]; char * mappedPointer = nProxy.metaPages[tNum].mapped; if (!mappedPointer){return;} + VERYHIGH_MSG("Updating meta for track %lu, %lu pages", tNum, locations.size()); //First detect all entries on metaPage for (int i = 0; i < 8192; i += 8) { @@ -637,11 +637,11 @@ namespace Mist { continue; } unsigned long keyNum = ntohl(tmpOffset[0]); - INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); //Add an entry into bufferLocations[tNum] for the pages we haven't handled yet. if (!locations.count(keyNum)) { locations[keyNum].curOffset = 0; + VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1])); } locations[keyNum].pageNum = keyNum; locations[keyNum].keyNum = ntohl(tmpOffset[1]); diff --git a/src/input/input_flv.cpp b/src/input/input_flv.cpp index 53412c9c..1065f6ef 100644 --- a/src/input/input_flv.cpp +++ b/src/input/input_flv.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -53,33 +54,35 @@ namespace Mist { //See whether a separate header file exists. if (readExistingHeader()){return true;} //Create header file from FLV data - fseek(inFile, 13, SEEK_SET); + Util::fseek(inFile, 13, SEEK_SET); AMF::Object amf_storage; - JSON::Value lastPack; long long int lastBytePos = 13; + uint64_t bench = Util::getMicros(); while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ - lastPack.null(); - lastPack = tmpTag.toJSON(myMeta, amf_storage); - lastPack["bpos"] = lastBytePos; - myMeta.update(lastPack); - lastBytePos = ftell(inFile); + tmpTag.toMeta(myMeta, amf_storage); + if (!tmpTag.getDataLen()){continue;} + if (tmpTag.needsInitData() && tmpTag.isInitData()){continue;} + myMeta.update(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); + lastBytePos = Util::ftell(inFile); } } + bench = Util::getMicros(bench); + INFO_MSG("Header generated in %llu ms: @%lld, %s, %s", bench/1000, lastBytePos, myMeta.vod?"VoD":"NOVoD", myMeta.live?"Live":"NOLive"); if (FLV::Parse_Error){ - std::cerr << FLV::Error_Str << std::endl; - return false; + FLV::Parse_Error = false; + ERROR_MSG("Stopping at FLV parse error @%lld: %s", lastBytePos, FLV::Error_Str.c_str()); } myMeta.toFile(config->getString("input") + ".dtsh"); return true; } void inputFLV::getNext(bool smart) { - long long int lastBytePos = ftell(inFile); + long long int lastBytePos = Util::ftell(inFile); while (!feof(inFile) && !FLV::Parse_Error){ if (tmpTag.FileLoader(inFile)){ if ( !selectedTracks.count(tmpTag.getTrackID())){ - lastBytePos = ftell(inFile); + lastBytePos = Util::ftell(inFile); continue; } break; @@ -90,11 +93,12 @@ namespace Mist { return; } if (FLV::Parse_Error){ - FAIL_MSG("FLV error: %s", FLV::Error_Str.c_str()); + FLV::Parse_Error = false; + FAIL_MSG("FLV error @ %lld: %s", lastBytePos, FLV::Error_Str.c_str()); thisPacket.null(); return; } - if (!tmpTag.getDataLen()){ + if (!tmpTag.getDataLen() || (tmpTag.needsInitData() && tmpTag.isInitData())){ return getNext(); } thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data @@ -104,14 +108,14 @@ namespace Mist { //We will seek to the corresponding keyframe of the video track if selected, otherwise audio keyframe. //Flv files are never multi-track, so track 1 is video, track 2 is audio. int trackSeek = (selectedTracks.count(1) ? 1 : 2); - size_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos(); + uint64_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos(); for (unsigned int i = 0; i < myMeta.tracks[trackSeek].keys.size(); i++){ if (myMeta.tracks[trackSeek].keys[i].getTime() > seekTime){ break; } seekPos = myMeta.tracks[trackSeek].keys[i].getBpos(); } - fseek(inFile, seekPos, SEEK_SET); + Util::fseek(inFile, seekPos, SEEK_SET); } void inputFLV::trackSelect(std::string trackSpec) { diff --git a/src/io.cpp b/src/io.cpp index a38f0d6c..3530d2c6 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -417,16 +417,14 @@ namespace Mist { unsigned long tid = packet.getTrackId(); //Do nothing if the trackid is invalid if (!tid) { - INFO_MSG("Packet without trackid"); + WARN_MSG("Packet without trackid!"); return; } - //If the track is not negotiated yet, start the negotiation - if (!trackState.count(tid)) { - continueNegotiate(tid, myMeta); - } + //negotiate track ID if needed + continueNegotiate(tid, myMeta); //If the track is declined, stop here if (trackState[tid] == FILL_DEC) { - INFO_MSG("Track %lu Declined", tid); + INFO_MSG("Track %lu declined", tid); preBuffer[tid].clear(); return; } @@ -434,9 +432,12 @@ namespace Mist { if (trackState[tid] != FILL_ACC) { preBuffer[tid].push_back(packet); }else{ - while (preBuffer[tid].size()){ - bufferSinglePacket(preBuffer[tid].front(), myMeta); - preBuffer[tid].pop_front(); + if (preBuffer[tid].size()){ + INFO_MSG("Track %lu accepted", tid); + while (preBuffer[tid].size()){ + bufferSinglePacket(preBuffer[tid].front(), myMeta); + preBuffer[tid].pop_front(); + } } bufferSinglePacket(packet, myMeta); } @@ -448,9 +449,7 @@ namespace Mist { //This update needs to happen whether the track is accepted or not. bool isKeyframe = false; if (myMeta.tracks[tid].type == "video") { - if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) { - isKeyframe = true; - } + isKeyframe = packet.getFlag("keyframe"); } else { if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) { //Assume this is the first packet on the track @@ -467,6 +466,7 @@ namespace Mist { //This also happens in bufferNext, with the same rules if (myMeta.live){ if (packet.getTime() > 0xFFFF0000 && !myMeta.tracks[tid].lastms){ + INFO_MSG("Ignoring packet with unexpected timestamp"); return;//ignore bullshit timestamps } if (packet.getTime() < myMeta.tracks[tid].lastms){ @@ -509,6 +509,7 @@ namespace Mist { } //If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0){ + INFO_MSG("Track %lu not starting with a keyframe!", tid); return; } diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index 73dce935..a69fc9a1 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -936,16 +936,18 @@ namespace Mist { }else{ amf_storage = &(pushMeta.begin()->second); } - JSON::Value pack_out = F.toJSON(myMeta, *amf_storage, next.cs_id*3 + (F.data[0] == 0x09 ? 0 : (F.data[0] == 0x08 ? 1 : 2) )); - if ( !pack_out.isNull()){ + + unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3)); + F.toMeta(myMeta, *amf_storage, reTrack); + if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){ + thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe); if (!nProxy.userClient.getData()){ char userPageName[NAME_BUFFER_SIZE]; snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); } - continueNegotiate(pack_out["trackid"].asInt()); nProxy.streamName = streamName; - bufferLivePacket(pack_out); + bufferLivePacket(thisPacket); } break; } From c5870b02f1f9df1883327b94145b6d97d2345e92 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Sat, 1 Oct 2016 14:13:03 +0200 Subject: [PATCH 3/6] Improved buffer behaviour for streams that are faster than real-time. Improved documentation for those areas as well. --- lib/dtsc.h | 4 +++ lib/dtscmeta.cpp | 44 +++++++++++++++++++++++-- src/input/input_buffer.cpp | 67 ++++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 35 deletions(-) diff --git a/lib/dtsc.h b/lib/dtsc.h index e09d44ee..7f464545 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -271,6 +271,7 @@ namespace DTSC { std::deque keySizes; std::deque parts; Key & getKey(unsigned int keyNum); + Fragment & getFrag(unsigned int fragNum); unsigned int timeToKeynum(unsigned int timestamp); unsigned int timeToFragnum(unsigned int timestamp); void reset(); @@ -297,8 +298,11 @@ namespace DTSC { int width; int height; int fpks; + void removeFirstKey(); + uint32_t secsSinceFirstFragmentInsert(); private: std::string cachedIdent; + std::deque fragInsertTime; }; ///\brief Class for storage of meta data diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index d2c1b371..4071cf9b 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -1179,6 +1179,8 @@ namespace DTSC { newFrag.setDuration(0); newFrag.setSize(0); fragments.push_back(newFrag); + //We set the insert time lastms-firstms in the future, to prevent unstable playback + fragInsertTime.push_back(Util::bootSecs() + ((lastms - firstms)/1000)); } else { Fragment & lastFrag = fragments[fragments.size() - 1]; lastFrag.setLength(lastFrag.getLength() + 1); @@ -1188,6 +1190,41 @@ namespace DTSC { (*keySizes.rbegin()) += packSendSize; fragments.rbegin()->setSize(fragments.rbegin()->getSize() + packDataSize); } + + /// Removes the first buffered key, including any fragments it was part of + void Track::removeFirstKey(){ + HIGH_MSG("Erasing key %d:%lu", trackID, keys[0].getNumber()); + //remove all parts of this key + for (int i = 0; i < keys[0].getParts(); i++) { + parts.pop_front(); + } + //remove the key itself + keys.pop_front(); + keySizes.pop_front(); + //update firstms + firstms = keys[0].getTime(); + //delete any fragments no longer fully buffered + while (fragments[0].getNumber() < keys[0].getNumber()) { + fragments.pop_front(); + fragInsertTime.pop_front(); + //and update the missed fragment counter + ++missedFrags; + } + } + + /// Returns the amount of whole seconds since the first fragment was inserted into the buffer. + /// This assumes playback from the start of the buffer at time of insert, meaning that + /// the time is offset by that difference. E.g.: if a buffer is 50s long, the newest fragment + /// will have a value of 0 until 50s have passed, after which it will increase at a rate of + /// 1 per second. + uint32_t Track::secsSinceFirstFragmentInsert(){ + uint32_t bs = Util::bootSecs(); + if (bs > fragInsertTime.front()){ + return bs - fragInsertTime.front(); + }else{ + return 0; + } + } void Track::finalize(){ keys.rbegin()->setLength(lastms - keys.rbegin()->getTime() + parts.rbegin()->getDuration()); @@ -1216,6 +1253,7 @@ namespace DTSC { return keys[keyNum - keys[0].getNumber()]; } + /// Returns the number of the key containing timestamp, or last key if nowhere. unsigned int Track::timeToKeynum(unsigned int timestamp){ unsigned int result = 0; for (std::deque::iterator it = keys.begin(); it != keys.end(); it++){ @@ -1227,13 +1265,12 @@ namespace DTSC { return result; } + /// Gets indice of the fragment containing timestamp, or last fragment if nowhere. unsigned int Track::timeToFragnum(unsigned int timestamp){ - unsigned long long int totalTime = firstms; for (unsigned int i = 0; iis_active) { + //If this track is empty, abort + if (!Trk.keys.size()) { return false; } - //If we're shutting down, and this track is empty, abort - if (!myMeta.tracks[tid].keys.size()) { - return false; - } - if (config->is_active && Trk.fragments.size() > 2){ - ///Make sure we have at least 8X the target duration. - //The target duration is the biggest fragment, rounded up to whole seconds. - uint32_t targetDuration = (Trk.biggestFragment() / 1000 + 1) * 1000; - //The start is the third fragment's begin - uint32_t fragStart = Trk.getKey((++(++Trk.fragments.begin()))->getNumber()).getTime(); - //The end is the last fragment's begin - uint32_t fragEnd = Trk.getKey(Trk.fragments.rbegin()->getNumber()).getTime(); - if ((fragEnd - fragStart) < targetDuration * 8){ + //the following checks only run if we're not shutting down + if (config->is_active){ + //Make sure we have at least 4 whole fragments at all times, + if (Trk.fragments.size() < 5) { return false; } + //ensure we have each fragment buffered for at least the whole bufferTime + if (!Trk.secsSinceFirstFragmentInsert() || (Trk.lastms - Trk.firstms) < bufferTime){ + return false; + } + if (Trk.fragments.size() > 2){ + ///Make sure we have at least 8X the target duration. + //The target duration is the biggest fragment, rounded up to whole seconds. + uint32_t targetDuration = (Trk.biggestFragment() / 1000 + 1) * 1000; + //The start is the third fragment's begin + uint32_t fragStart = Trk.getKey((++(++Trk.fragments.begin()))->getNumber()).getTime(); + //The end is the last fragment's begin + uint32_t fragEnd = Trk.getKey(Trk.fragments.rbegin()->getNumber()).getTime(); + if ((fragEnd - fragStart) < targetDuration * 8){ + return false; + } + } } - HIGH_MSG("Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); - //remove all parts of this key - for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { - myMeta.tracks[tid].parts.pop_front(); - } - //remove the key itself - myMeta.tracks[tid].keys.pop_front(); - myMeta.tracks[tid].keySizes.pop_front(); - //re-calculate firstms - myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime(); - //delete the fragment if it's no longer fully buffered - if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) { - myMeta.tracks[tid].fragments.pop_front(); - myMeta.tracks[tid].missedFrags ++; - } + //Alright, everything looks good, let's delete the key and possibly also fragment + Trk.removeFirstKey(); //if there is more than one page buffered for this track... if (bufferLocations[tid].size() > 1) { //Check if the first key starts on the second page or higher - if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){ + if (Trk.keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){ + //If so, we can delete the first page entirely HIGH_MSG("Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); bufferRemove(tid, bufferLocations[tid].begin()->first); @@ -382,6 +384,7 @@ namespace Mist { } } //Buffer size management + /// \TODO Make sure data has been in the buffer for at least bufferTime after it goes in while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) { if (!removeKey(it->first)) { break; From 61feff7ba135f1fe34442bf1186700b47076f870 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 11 Oct 2016 15:15:26 +0200 Subject: [PATCH 4/6] Faster and less spammy sharedServer user counter --- lib/shared_memory.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 4ceae195..4caa538f 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -781,6 +781,7 @@ namespace IPC { } mySemaphore.post(); semGuard tmpGuard(&mySemaphore); + amount = 0; newPage(); } @@ -811,6 +812,7 @@ namespace IPC { myPages.push_back(tmp); tmp.master = false; DEBUG_MSG(DLVL_VERYHIGH, "Created a new page: %s", tmp.name.c_str()); + amount += tmp.len / (payLen + (hasCounter ? 1 : 0)) - 1; } ///\brief Deletes the highest allocated page @@ -861,7 +863,7 @@ namespace IPC { do{ parseEach(killStatistics); Util::wait(250); - }while(amount && c++ < 10); + }while(amount>1 && c++ < 10); } ///Returns a pointer to the data for the given index. @@ -914,6 +916,7 @@ namespace IPC { unsigned int id = 0; unsigned int userCount = 0; unsigned int emptyCount = 0; + unsigned int lastFilled = 0; connectedUsers = 0; for (std::deque::iterator it = myPages.begin(); it != myPages.end(); it++) { if (!it->mapped || !it->len) { @@ -932,6 +935,7 @@ namespace IPC { connectedUsers++; } char countNum = (*counter) & 0x7F; + lastFilled = id; if (id >= amount) { amount = id + 1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); @@ -986,10 +990,10 @@ namespace IPC { } } else { //stop if we're past the amount counted and we're empty - if (id >= amount - 1) { + if (id >= amount) { //bring the counter down if this was the last element - if (id == amount - 1) { - amount = id; + if (lastFilled+1 < amount) { + amount = lastFilled+1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point @@ -1000,6 +1004,7 @@ namespace IPC { if (memcmp(empty, it->mapped + offset, payLen)) { ++userCount; //increase the count if needed + lastFilled = id; if (id >= amount) { amount = id + 1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); @@ -1007,10 +1012,10 @@ namespace IPC { callback(it->mapped + offset, payLen, id); } else { //stop if we're past the amount counted and we're empty - if (id >= amount - 1) { + if (id >= amount) { //bring the counter down if this was the last element - if (id == amount - 1) { - amount = id; + if (lastFilled+1 < amount) { + amount = lastFilled+1; VERYHIGH_MSG("Shared memory %s is now at count %u", baseName.c_str(), amount); } //stop, we're guaranteed no more pages are full at this point From 419686f50cbd63c3b75438f3d2131df60f7d8c13 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 4 Oct 2016 12:56:40 +0200 Subject: [PATCH 5/6] Fixed zero-timestamp RTMP push output --- src/output/output_rtmp.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index a69fc9a1..66737e6e 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -286,7 +286,11 @@ namespace Mist { data_len += dheader_len; unsigned int timestamp = thisPacket.getTime() - rtmpOffset; - if (rtmpOffset > thisPacket.getTime()){timestamp = 0;}//make sure we don't go negative + //make sure we don't go negative + if (rtmpOffset > thisPacket.getTime()){ + timestamp = 0; + rtmpOffset = thisPacket.getTime(); + } bool allow_short = RTMPStream::lastsend.count(4); RTMPStream::Chunk & prev = RTMPStream::lastsend[4]; From bf8ddcb300c5f627e8960d446a7b377d5d7f93be Mon Sep 17 00:00:00 2001 From: Thulinma Date: Tue, 11 Oct 2016 15:20:13 +0200 Subject: [PATCH 6/6] Removed dead code --- lib/shared_memory.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/shared_memory.cpp b/lib/shared_memory.cpp index 4caa538f..554715c7 100644 --- a/lib/shared_memory.cpp +++ b/lib/shared_memory.cpp @@ -757,7 +757,6 @@ namespace IPC { ///\param len The lenght of the payload ///\param withCounter Whether the content should have a counter void sharedServer::init(std::string name, int len, bool withCounter) { - amount = 0; if (mySemaphore) { mySemaphore.close(); }