diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 77748515..084eddb2 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -230,7 +230,7 @@ namespace Mist { return false; } } - DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); + HIGH_MSG("Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); //remove all parts of this key for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { myMeta.tracks[tid].parts.pop_front(); @@ -248,8 +248,8 @@ namespace Mist { //if there is more than one page buffered for this track... if (bufferLocations[tid].size() > 1) { //Check if the first key starts on the second page or higher - if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active) { - DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); + if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){ + HIGH_MSG("Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); bufferRemove(tid, bufferLocations[tid].begin()->first); nProxy.curPageNum.erase(tid); @@ -261,7 +261,7 @@ namespace Mist { bufferLocations[tid].erase(bufferLocations[tid].begin()); } else { - DEBUG_MSG(DLVL_HIGH, "%lu still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); + VERYHIGH_MSG("%lu still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); } } return true; diff --git a/src/io.cpp b/src/io.cpp index 28772762..d669de59 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -255,6 +255,10 @@ namespace Mist { void negotiationProxy::bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta) { //Save the trackid of the track for easier access unsigned long tid = pack.getTrackId(); + if (pack.getTime() < myMeta.tracks[tid].lastms){ + INFO_MSG("Wrong order packet ignored: %lu < %lu", pack.getTime(), myMeta.tracks[tid].lastms); + return; + } unsigned long mapTid = trackMap[tid]; //Do nothing if no page is opened for this track if (!curPage.count(tid)) { @@ -459,7 +463,7 @@ namespace Mist { if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE || packet.getTime() - tmpIt->second.firstTime > FLIP_TARGET_DURATION) { //Create the book keeping data for the new page nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum; - INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum); + HIGH_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum); pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE; pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; pagesByTrack[tid][nextPageNum].firstTime = packet.getTime(); diff --git a/src/output/output.cpp b/src/output/output.cpp index 139e1cdd..ae7d6ec7 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -13,6 +13,10 @@ #include #include "output.h" +#ifndef MIN_DELAY +#define MIN_DELAY 2500 +#endif + namespace Mist { JSON::Value Output::capa = JSON::Value(); @@ -451,6 +455,12 @@ namespace Mist { currKeyOpen[trackId] = pageNum; VERYHIGH_MSG("Page %s loaded for %s", id, streamName.c_str()); } + + ///Return the current time of the media buffer, or 0 if no buffer available. + uint64_t Output::currentTime(){ + if (!buffer.size()){return 0;} + return buffer.begin()->time; + } /// Prepares all tracks from selectedTracks for seeking to the specified ms position. void Output::seek(unsigned long long pos){ @@ -531,7 +541,7 @@ namespace Mist { /// This function decides where in the stream initial playback starts. /// The default implementation calls seek(0) for VoD. - /// For live, it seeks to the last sync'ed keyframe of the main track, no closer than 2.5s from the end. + /// For live, it seeks to the last sync'ed keyframe of the main track, no closer than MIN_DELAY ms from the end. /// Unless lastms < 5000, then it seeks to the first keyframe of the main track. /// Aborts if there is no main track or it has no keyframes. void Output::initialSeek(){ @@ -547,7 +557,7 @@ namespace Mist { bool good = true; //check if all tracks have data for this point in time for (std::set::iterator ti = selectedTracks.begin(); ti != selectedTracks.end(); ++ti){ - if (myMeta.tracks[*ti].lastms < seekPos+2500){good = false; break;} + if (myMeta.tracks[*ti].lastms < seekPos+MIN_DELAY){good = false; break;} if (mainTrack == *ti){continue;}//skip self if (!myMeta.tracks.count(*ti)){ HIGH_MSG("Skipping track %lu, not in tracks", *ti); @@ -658,8 +668,8 @@ namespace Mist { } stats(); } - onFinish(); MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); + onFinish(); stats(true); nProxy.userClient.finish(); @@ -845,12 +855,12 @@ namespace Mist { if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){ static int warned = 0; if (warned < 5){ - WARN_MSG("Loaded %s track %ld@%llu in stead of %u@%llu (%dms, %s)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str()); + WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); if (++warned == 5){ WARN_MSG("Further warnings about time mismatches printed on HIGH level."); } }else{ - HIGH_MSG("Loaded %s track %ld@%llu in stead of %u@%llu (%dms, %s)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str()); + HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset); } } @@ -869,7 +879,9 @@ namespace Mist { nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); } if (myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime() != thisPacket.getTime()){ - WARN_MSG("Keyframe value is not correct - state will now be inconsistent."); + WARN_MSG("Keyframe value is not correct (%llu != %llu) - state will now be inconsistent; resetting", myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime(), thisPacket.getTime()); + initialSeek(); + return false; } EXTREME_MSG("Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]); } diff --git a/src/output/output.h b/src/output/output.h index bb7f734e..3cbd0c5c 100644 --- a/src/output/output.h +++ b/src/output/output.h @@ -46,6 +46,7 @@ namespace Mist { void seek(unsigned long long pos); bool seek(unsigned int tid, unsigned long long pos, bool getNextKey = false); void stop(); + uint64_t currentTime(); void setBlocking(bool blocking); long unsigned int getMainSelectedTrack(); void updateMeta(); diff --git a/src/output/output_rtmp.cpp b/src/output/output_rtmp.cpp index d00fa204..549189f6 100644 --- a/src/output/output_rtmp.cpp +++ b/src/output/output_rtmp.cpp @@ -488,6 +488,7 @@ namespace Mist { amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); sendCommand(amfreply, 20, 1); + stop(); return; } if (amfData.getContentP(0)->StrValue() == "deleteStream") { @@ -679,13 +680,8 @@ namespace Mist { amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - uint64_t earliestTime = 0xffffffffffffffff; - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.count(*it) && myMeta.tracks[*it].firstms < earliestTime){ - earliestTime = myMeta.tracks[*it].firstms; - } - } - rtmpOffset = earliestTime; + initialSeek(); + rtmpOffset = currentTime(); amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset)); sendCommand(amfreply, playMessageType, playStreamId); RTMPStream::chunk_snd_max = 10240000; //10000KiB @@ -744,13 +740,8 @@ namespace Mist { amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); - uint64_t earliestTime = 0xffffffffffffffff; - for (std::set::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ - if (myMeta.tracks.count(*it) && myMeta.tracks[*it].firstms < earliestTime){ - earliestTime = myMeta.tracks[*it].firstms; - } - } - rtmpOffset = earliestTime; + initialSeek(); + rtmpOffset = currentTime(); amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset)); sendCommand(amfreply, playMessageType, playStreamId); RTMPStream::chunk_snd_max = 10240000; //10000KiB