Merge branch 'development' into LTS_development
This commit is contained in:
commit
7f6b919e4f
11 changed files with 211 additions and 228 deletions
|
@ -329,48 +329,50 @@ namespace Mist {
|
|||
liveMeta.post();
|
||||
}
|
||||
|
||||
///Checks if removing a key from this track is allowed/safe, and if so, removes it.
|
||||
///Returns true if a key was actually removed, false otherwise
|
||||
///Aborts if any of the following conditions are true (while active):
|
||||
/// * no keys present
|
||||
/// * not at least 4 whole fragments present
|
||||
/// * first fragment hasn't been at least lastms-firstms ms in buffer
|
||||
/// * less than 8 times the biggest fragment duration is buffered
|
||||
/// If a key was deleted and the first buffered data page is no longer used, it is deleted also.
|
||||
bool inputBuffer::removeKey(unsigned int tid) {
|
||||
DTSC::Track & Trk = myMeta.tracks[tid];
|
||||
//Make sure we have at least 3 whole fragments at all times,
|
||||
//unless we're shutting down the whole buffer right now
|
||||
if (Trk.fragments.size() < 5 && config->is_active) {
|
||||
//If this track is empty, abort
|
||||
if (!Trk.keys.size()) {
|
||||
return false;
|
||||
}
|
||||
//If we're shutting down, and this track is empty, abort
|
||||
if (!myMeta.tracks[tid].keys.size()) {
|
||||
return false;
|
||||
}
|
||||
if (config->is_active && Trk.fragments.size() > 2){
|
||||
///Make sure we have at least 8X the target duration.
|
||||
//The target duration is the biggest fragment, rounded up to whole seconds.
|
||||
uint32_t targetDuration = (Trk.biggestFragment() / 1000 + 1) * 1000;
|
||||
//The start is the third fragment's begin
|
||||
uint32_t fragStart = Trk.getKey((++(++Trk.fragments.begin()))->getNumber()).getTime();
|
||||
//The end is the last fragment's begin
|
||||
uint32_t fragEnd = Trk.getKey(Trk.fragments.rbegin()->getNumber()).getTime();
|
||||
if ((fragEnd - fragStart) < targetDuration * 8){
|
||||
//the following checks only run if we're not shutting down
|
||||
if (config->is_active){
|
||||
//Make sure we have at least 4 whole fragments at all times,
|
||||
if (Trk.fragments.size() < 5) {
|
||||
return false;
|
||||
}
|
||||
//ensure we have each fragment buffered for at least the whole bufferTime
|
||||
if (!Trk.secsSinceFirstFragmentInsert() || (Trk.lastms - Trk.firstms) < bufferTime){
|
||||
return false;
|
||||
}
|
||||
if (Trk.fragments.size() > 2){
|
||||
///Make sure we have at least 8X the target duration.
|
||||
//The target duration is the biggest fragment, rounded up to whole seconds.
|
||||
uint32_t targetDuration = (Trk.biggestFragment() / 1000 + 1) * 1000;
|
||||
//The start is the third fragment's begin
|
||||
uint32_t fragStart = Trk.getKey((++(++Trk.fragments.begin()))->getNumber()).getTime();
|
||||
//The end is the last fragment's begin
|
||||
uint32_t fragEnd = Trk.getKey(Trk.fragments.rbegin()->getNumber()).getTime();
|
||||
if ((fragEnd - fragStart) < targetDuration * 8){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
HIGH_MSG("Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber());
|
||||
//remove all parts of this key
|
||||
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) {
|
||||
myMeta.tracks[tid].parts.pop_front();
|
||||
}
|
||||
//remove the key itself
|
||||
myMeta.tracks[tid].keys.pop_front();
|
||||
myMeta.tracks[tid].keySizes.pop_front();
|
||||
//re-calculate firstms
|
||||
myMeta.tracks[tid].firstms = myMeta.tracks[tid].keys[0].getTime();
|
||||
//delete the fragment if it's no longer fully buffered
|
||||
if (myMeta.tracks[tid].fragments[0].getNumber() < myMeta.tracks[tid].keys[0].getNumber()) {
|
||||
myMeta.tracks[tid].fragments.pop_front();
|
||||
myMeta.tracks[tid].missedFrags ++;
|
||||
}
|
||||
//Alright, everything looks good, let's delete the key and possibly also fragment
|
||||
Trk.removeFirstKey();
|
||||
//if there is more than one page buffered for this track...
|
||||
if (bufferLocations[tid].size() > 1) {
|
||||
//Check if the first key starts on the second page or higher
|
||||
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
|
||||
if (Trk.keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
|
||||
//If so, we can delete the first page entirely
|
||||
HIGH_MSG("Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
|
||||
bufferRemove(tid, bufferLocations[tid].begin()->first);
|
||||
|
||||
|
@ -537,6 +539,7 @@ namespace Mist {
|
|||
}
|
||||
}
|
||||
//Buffer size management
|
||||
/// \TODO Make sure data has been in the buffer for at least bufferTime after it goes in
|
||||
while (it->second.keys.size() > 1 && (it->second.lastms - it->second.keys[1].getTime()) > bufferTime) {
|
||||
if (!removeKey(it->first)) {
|
||||
break;
|
||||
|
@ -817,11 +820,11 @@ namespace Mist {
|
|||
}
|
||||
|
||||
void inputBuffer::updateTrackMeta(unsigned long tNum) {
|
||||
VERYHIGH_MSG("Updating meta for track %d", tNum);
|
||||
//Store a reference for easier access
|
||||
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
|
||||
char * mappedPointer = nProxy.metaPages[tNum].mapped;
|
||||
if (!mappedPointer){return;}
|
||||
VERYHIGH_MSG("Updating meta for track %lu, %lu pages", tNum, locations.size());
|
||||
|
||||
//First detect all entries on metaPage
|
||||
for (int i = 0; i < 8192; i += 8) {
|
||||
|
@ -830,11 +833,11 @@ namespace Mist {
|
|||
continue;
|
||||
}
|
||||
unsigned long keyNum = ntohl(tmpOffset[0]);
|
||||
INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
|
||||
|
||||
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
|
||||
if (!locations.count(keyNum)) {
|
||||
locations[keyNum].curOffset = 0;
|
||||
VERYHIGH_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
|
||||
}
|
||||
locations[keyNum].pageNum = keyNum;
|
||||
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#include <cstdlib>
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
#include <mist/util.h>
|
||||
#include <mist/stream.h>
|
||||
#include <mist/defines.h>
|
||||
|
||||
|
@ -53,33 +54,35 @@ namespace Mist {
|
|||
//See whether a separate header file exists.
|
||||
if (readExistingHeader()){return true;}
|
||||
//Create header file from FLV data
|
||||
fseek(inFile, 13, SEEK_SET);
|
||||
Util::fseek(inFile, 13, SEEK_SET);
|
||||
AMF::Object amf_storage;
|
||||
JSON::Value lastPack;
|
||||
long long int lastBytePos = 13;
|
||||
uint64_t bench = Util::getMicros();
|
||||
while (!feof(inFile) && !FLV::Parse_Error){
|
||||
if (tmpTag.FileLoader(inFile)){
|
||||
lastPack.null();
|
||||
lastPack = tmpTag.toJSON(myMeta, amf_storage);
|
||||
lastPack["bpos"] = lastBytePos;
|
||||
myMeta.update(lastPack);
|
||||
lastBytePos = ftell(inFile);
|
||||
tmpTag.toMeta(myMeta, amf_storage);
|
||||
if (!tmpTag.getDataLen()){continue;}
|
||||
if (tmpTag.needsInitData() && tmpTag.isInitData()){continue;}
|
||||
myMeta.update(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe);
|
||||
lastBytePos = Util::ftell(inFile);
|
||||
}
|
||||
}
|
||||
bench = Util::getMicros(bench);
|
||||
INFO_MSG("Header generated in %llu ms: @%lld, %s, %s", bench/1000, lastBytePos, myMeta.vod?"VoD":"NOVoD", myMeta.live?"Live":"NOLive");
|
||||
if (FLV::Parse_Error){
|
||||
std::cerr << FLV::Error_Str << std::endl;
|
||||
return false;
|
||||
FLV::Parse_Error = false;
|
||||
ERROR_MSG("Stopping at FLV parse error @%lld: %s", lastBytePos, FLV::Error_Str.c_str());
|
||||
}
|
||||
myMeta.toFile(config->getString("input") + ".dtsh");
|
||||
return true;
|
||||
}
|
||||
|
||||
void inputFLV::getNext(bool smart) {
|
||||
long long int lastBytePos = ftell(inFile);
|
||||
long long int lastBytePos = Util::ftell(inFile);
|
||||
while (!feof(inFile) && !FLV::Parse_Error){
|
||||
if (tmpTag.FileLoader(inFile)){
|
||||
if ( !selectedTracks.count(tmpTag.getTrackID())){
|
||||
lastBytePos = ftell(inFile);
|
||||
lastBytePos = Util::ftell(inFile);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
|
@ -90,11 +93,12 @@ namespace Mist {
|
|||
return;
|
||||
}
|
||||
if (FLV::Parse_Error){
|
||||
FAIL_MSG("FLV error: %s", FLV::Error_Str.c_str());
|
||||
FLV::Parse_Error = false;
|
||||
FAIL_MSG("FLV error @ %lld: %s", lastBytePos, FLV::Error_Str.c_str());
|
||||
thisPacket.null();
|
||||
return;
|
||||
}
|
||||
if (!tmpTag.getDataLen()){
|
||||
if (!tmpTag.getDataLen() || (tmpTag.needsInitData() && tmpTag.isInitData())){
|
||||
return getNext();
|
||||
}
|
||||
thisPacket.genericFill(tmpTag.tagTime(), tmpTag.offset(), tmpTag.getTrackID(), tmpTag.getData(), tmpTag.getDataLen(), lastBytePos, tmpTag.isKeyframe); //init packet from tmpTags data
|
||||
|
@ -104,14 +108,14 @@ namespace Mist {
|
|||
//We will seek to the corresponding keyframe of the video track if selected, otherwise audio keyframe.
|
||||
//Flv files are never multi-track, so track 1 is video, track 2 is audio.
|
||||
int trackSeek = (selectedTracks.count(1) ? 1 : 2);
|
||||
size_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos();
|
||||
uint64_t seekPos = myMeta.tracks[trackSeek].keys[0].getBpos();
|
||||
for (unsigned int i = 0; i < myMeta.tracks[trackSeek].keys.size(); i++){
|
||||
if (myMeta.tracks[trackSeek].keys[i].getTime() > seekTime){
|
||||
break;
|
||||
}
|
||||
seekPos = myMeta.tracks[trackSeek].keys[i].getBpos();
|
||||
}
|
||||
fseek(inFile, seekPos, SEEK_SET);
|
||||
Util::fseek(inFile, seekPos, SEEK_SET);
|
||||
}
|
||||
|
||||
void inputFLV::trackSelect(std::string trackSpec) {
|
||||
|
|
25
src/io.cpp
25
src/io.cpp
|
@ -482,16 +482,14 @@ namespace Mist {
|
|||
unsigned long tid = packet.getTrackId();
|
||||
//Do nothing if the trackid is invalid
|
||||
if (!tid) {
|
||||
INFO_MSG("Packet without trackid");
|
||||
WARN_MSG("Packet without trackid!");
|
||||
return;
|
||||
}
|
||||
//If the track is not negotiated yet, start the negotiation
|
||||
if (!trackState.count(tid)) {
|
||||
continueNegotiate(tid, myMeta);
|
||||
}
|
||||
//negotiate track ID if needed
|
||||
continueNegotiate(tid, myMeta);
|
||||
//If the track is declined, stop here
|
||||
if (trackState[tid] == FILL_DEC) {
|
||||
INFO_MSG("Track %lu Declined", tid);
|
||||
INFO_MSG("Track %lu declined", tid);
|
||||
preBuffer[tid].clear();
|
||||
return;
|
||||
}
|
||||
|
@ -499,9 +497,12 @@ namespace Mist {
|
|||
if (trackState[tid] != FILL_ACC) {
|
||||
preBuffer[tid].push_back(packet);
|
||||
}else{
|
||||
while (preBuffer[tid].size()){
|
||||
bufferSinglePacket(preBuffer[tid].front(), myMeta);
|
||||
preBuffer[tid].pop_front();
|
||||
if (preBuffer[tid].size()){
|
||||
INFO_MSG("Track %lu accepted", tid);
|
||||
while (preBuffer[tid].size()){
|
||||
bufferSinglePacket(preBuffer[tid].front(), myMeta);
|
||||
preBuffer[tid].pop_front();
|
||||
}
|
||||
}
|
||||
bufferSinglePacket(packet, myMeta);
|
||||
}
|
||||
|
@ -513,9 +514,7 @@ namespace Mist {
|
|||
//This update needs to happen whether the track is accepted or not.
|
||||
bool isKeyframe = false;
|
||||
if (myMeta.tracks[tid].type == "video") {
|
||||
if (packet.hasMember("keyframe") && packet.getFlag("keyframe")) {
|
||||
isKeyframe = true;
|
||||
}
|
||||
isKeyframe = packet.getFlag("keyframe");
|
||||
} else {
|
||||
if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) {
|
||||
//Assume this is the first packet on the track
|
||||
|
@ -532,6 +531,7 @@ namespace Mist {
|
|||
//This also happens in bufferNext, with the same rules
|
||||
if (myMeta.live){
|
||||
if (packet.getTime() > 0xFFFF0000 && !myMeta.tracks[tid].lastms){
|
||||
INFO_MSG("Ignoring packet with unexpected timestamp");
|
||||
return;//ignore bullshit timestamps
|
||||
}
|
||||
if (packet.getTime() < myMeta.tracks[tid].lastms){
|
||||
|
@ -574,6 +574,7 @@ namespace Mist {
|
|||
}
|
||||
//If we have no pages by track, we have not received a starting keyframe yet. Drop this packet.
|
||||
if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0){
|
||||
INFO_MSG("Track %lu not starting with a keyframe!", tid);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -556,6 +556,7 @@ namespace Mist {
|
|||
nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
|
||||
if (!(nProxy.curPage[trackId].mapped)){
|
||||
FAIL_MSG("Initializing page %s failed", nProxy.curPage[trackId].name.c_str());
|
||||
currKeyOpen.erase(trackId);
|
||||
return;
|
||||
}
|
||||
currKeyOpen[trackId] = pageNum;
|
||||
|
@ -978,7 +979,7 @@ namespace Mist {
|
|||
}
|
||||
//if this is a live stream, we might have just reached the live point.
|
||||
//check where the next key is
|
||||
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
|
||||
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time);
|
||||
int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1);
|
||||
//if the next key hasn't shown up on another page, then we're waiting.
|
||||
//VoD might be slow, so we check VoD case also, just in case
|
||||
|
|
|
@ -426,7 +426,11 @@ namespace Mist {
|
|||
data_len += dheader_len;
|
||||
|
||||
unsigned int timestamp = thisPacket.getTime() - rtmpOffset;
|
||||
if (rtmpOffset > thisPacket.getTime()){timestamp = 0;}//make sure we don't go negative
|
||||
//make sure we don't go negative
|
||||
if (rtmpOffset > thisPacket.getTime()){
|
||||
timestamp = 0;
|
||||
rtmpOffset = thisPacket.getTime();
|
||||
}
|
||||
|
||||
bool allow_short = RTMPStream::lastsend.count(4);
|
||||
RTMPStream::Chunk & prev = RTMPStream::lastsend[4];
|
||||
|
@ -1153,16 +1157,18 @@ namespace Mist {
|
|||
}else{
|
||||
amf_storage = &(pushMeta.begin()->second);
|
||||
}
|
||||
JSON::Value pack_out = F.toJSON(myMeta, *amf_storage, next.cs_id*3 + (F.data[0] == 0x09 ? 0 : (F.data[0] == 0x08 ? 1 : 2) ));
|
||||
if ( !pack_out.isNull()){
|
||||
|
||||
unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3));
|
||||
F.toMeta(myMeta, *amf_storage, reTrack);
|
||||
if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){
|
||||
thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
|
||||
if (!nProxy.userClient.getData()){
|
||||
char userPageName[NAME_BUFFER_SIZE];
|
||||
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
|
||||
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
|
||||
}
|
||||
continueNegotiate(pack_out["trackid"].asInt());
|
||||
nProxy.streamName = streamName;
|
||||
bufferLivePacket(pack_out);
|
||||
bufferLivePacket(thisPacket);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue