Various stability and verbosity tweaks for DTSC, RTMP and in general

This commit is contained in:
Thulinma 2016-07-23 13:11:25 +02:00
parent 4eb0a7c56c
commit b7c3d7dc44
5 changed files with 33 additions and 25 deletions

View file

@ -230,7 +230,7 @@ namespace Mist {
return false; return false;
} }
} }
DEBUG_MSG(DLVL_HIGH, "Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber()); HIGH_MSG("Erasing key %d:%lu", tid, myMeta.tracks[tid].keys[0].getNumber());
//remove all parts of this key //remove all parts of this key
for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) { for (int i = 0; i < myMeta.tracks[tid].keys[0].getParts(); i++) {
myMeta.tracks[tid].parts.pop_front(); myMeta.tracks[tid].parts.pop_front();
@ -248,8 +248,8 @@ namespace Mist {
//if there is more than one page buffered for this track... //if there is more than one page buffered for this track...
if (bufferLocations[tid].size() > 1) { if (bufferLocations[tid].size() > 1) {
//Check if the first key starts on the second page or higher //Check if the first key starts on the second page or higher
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active) { if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active){
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); HIGH_MSG("Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
bufferRemove(tid, bufferLocations[tid].begin()->first); bufferRemove(tid, bufferLocations[tid].begin()->first);
nProxy.curPageNum.erase(tid); nProxy.curPageNum.erase(tid);
@ -261,7 +261,7 @@ namespace Mist {
bufferLocations[tid].erase(bufferLocations[tid].begin()); bufferLocations[tid].erase(bufferLocations[tid].begin());
} else { } else {
DEBUG_MSG(DLVL_HIGH, "%lu still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1); VERYHIGH_MSG("%lu still on first page (%lu - %lu)", myMeta.tracks[tid].keys[0].getNumber(), bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
} }
} }
return true; return true;

View file

@ -255,6 +255,10 @@ namespace Mist {
void negotiationProxy::bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta) { void negotiationProxy::bufferNext(DTSC::Packet & pack, DTSC::Meta & myMeta) {
//Save the trackid of the track for easier access //Save the trackid of the track for easier access
unsigned long tid = pack.getTrackId(); unsigned long tid = pack.getTrackId();
if (pack.getTime() < myMeta.tracks[tid].lastms){
INFO_MSG("Wrong order packet ignored: %lu < %lu", pack.getTime(), myMeta.tracks[tid].lastms);
return;
}
unsigned long mapTid = trackMap[tid]; unsigned long mapTid = trackMap[tid];
//Do nothing if no page is opened for this track //Do nothing if no page is opened for this track
if (!curPage.count(tid)) { if (!curPage.count(tid)) {
@ -459,7 +463,7 @@ namespace Mist {
if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE || packet.getTime() - tmpIt->second.firstTime > FLIP_TARGET_DURATION) { if (tmpIt->second.curOffset > FLIP_DATA_PAGE_SIZE || packet.getTime() - tmpIt->second.firstTime > FLIP_TARGET_DURATION) {
//Create the book keeping data for the new page //Create the book keeping data for the new page
nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum; nextPageNum = tmpIt->second.pageNum + tmpIt->second.keyNum;
INFO_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum); HIGH_MSG("We should go to next page now, transition from %lu to %d", tmpIt->second.pageNum, nextPageNum);
pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE; pagesByTrack[tid][nextPageNum].dataSize = DEFAULT_DATA_PAGE_SIZE;
pagesByTrack[tid][nextPageNum].pageNum = nextPageNum; pagesByTrack[tid][nextPageNum].pageNum = nextPageNum;
pagesByTrack[tid][nextPageNum].firstTime = packet.getTime(); pagesByTrack[tid][nextPageNum].firstTime = packet.getTime();

View file

@ -13,6 +13,10 @@
#include <mist/timing.h> #include <mist/timing.h>
#include "output.h" #include "output.h"
#ifndef MIN_DELAY
#define MIN_DELAY 2500
#endif
namespace Mist { namespace Mist {
JSON::Value Output::capa = JSON::Value(); JSON::Value Output::capa = JSON::Value();
@ -451,6 +455,12 @@ namespace Mist {
currKeyOpen[trackId] = pageNum; currKeyOpen[trackId] = pageNum;
VERYHIGH_MSG("Page %s loaded for %s", id, streamName.c_str()); VERYHIGH_MSG("Page %s loaded for %s", id, streamName.c_str());
} }
///Return the current time of the media buffer, or 0 if no buffer available.
uint64_t Output::currentTime(){
if (!buffer.size()){return 0;}
return buffer.begin()->time;
}
/// Prepares all tracks from selectedTracks for seeking to the specified ms position. /// Prepares all tracks from selectedTracks for seeking to the specified ms position.
void Output::seek(unsigned long long pos){ void Output::seek(unsigned long long pos){
@ -531,7 +541,7 @@ namespace Mist {
/// This function decides where in the stream initial playback starts. /// This function decides where in the stream initial playback starts.
/// The default implementation calls seek(0) for VoD. /// The default implementation calls seek(0) for VoD.
/// For live, it seeks to the last sync'ed keyframe of the main track, no closer than 2.5s from the end. /// For live, it seeks to the last sync'ed keyframe of the main track, no closer than MIN_DELAY ms from the end.
/// Unless lastms < 5000, then it seeks to the first keyframe of the main track. /// Unless lastms < 5000, then it seeks to the first keyframe of the main track.
/// Aborts if there is no main track or it has no keyframes. /// Aborts if there is no main track or it has no keyframes.
void Output::initialSeek(){ void Output::initialSeek(){
@ -547,7 +557,7 @@ namespace Mist {
bool good = true; bool good = true;
//check if all tracks have data for this point in time //check if all tracks have data for this point in time
for (std::set<unsigned long>::iterator ti = selectedTracks.begin(); ti != selectedTracks.end(); ++ti){ for (std::set<unsigned long>::iterator ti = selectedTracks.begin(); ti != selectedTracks.end(); ++ti){
if (myMeta.tracks[*ti].lastms < seekPos+2500){good = false; break;} if (myMeta.tracks[*ti].lastms < seekPos+MIN_DELAY){good = false; break;}
if (mainTrack == *ti){continue;}//skip self if (mainTrack == *ti){continue;}//skip self
if (!myMeta.tracks.count(*ti)){ if (!myMeta.tracks.count(*ti)){
HIGH_MSG("Skipping track %lu, not in tracks", *ti); HIGH_MSG("Skipping track %lu, not in tracks", *ti);
@ -658,8 +668,8 @@ namespace Mist {
} }
stats(); stats();
} }
onFinish();
MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data"); MEDIUM_MSG("MistOut client handler shutting down: %s, %s, %s", myConn.connected() ? "conn_active" : "conn_closed", wantRequest ? "want_request" : "no_want_request", parseData ? "parsing_data" : "not_parsing_data");
onFinish();
stats(true); stats(true);
nProxy.userClient.finish(); nProxy.userClient.finish();
@ -845,12 +855,12 @@ namespace Mist {
if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){ if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){
static int warned = 0; static int warned = 0;
if (warned < 5){ if (warned < 5){
WARN_MSG("Loaded %s track %ld@%llu in stead of %u@%llu (%dms, %s)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str()); WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
if (++warned == 5){ if (++warned == 5){
WARN_MSG("Further warnings about time mismatches printed on HIGH level."); WARN_MSG("Further warnings about time mismatches printed on HIGH level.");
} }
}else{ }else{
HIGH_MSG("Loaded %s track %ld@%llu in stead of %u@%llu (%dms, %s)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str()); HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
} }
} }
@ -869,7 +879,9 @@ namespace Mist {
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime()); nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
} }
if (myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime() != thisPacket.getTime()){ if (myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime() != thisPacket.getTime()){
WARN_MSG("Keyframe value is not correct - state will now be inconsistent."); WARN_MSG("Keyframe value is not correct (%llu != %llu) - state will now be inconsistent; resetting", myMeta.tracks[nxt.tid].getKey(nxtKeyNum[nxt.tid]).getTime(), thisPacket.getTime());
initialSeek();
return false;
} }
EXTREME_MSG("Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]); EXTREME_MSG("Track %u @ %llums = key %lu", nxt.tid, thisPacket.getTime(), nxtKeyNum[nxt.tid]);
} }

View file

@ -46,6 +46,7 @@ namespace Mist {
void seek(unsigned long long pos); void seek(unsigned long long pos);
bool seek(unsigned int tid, unsigned long long pos, bool getNextKey = false); bool seek(unsigned int tid, unsigned long long pos, bool getNextKey = false);
void stop(); void stop();
uint64_t currentTime();
void setBlocking(bool blocking); void setBlocking(bool blocking);
long unsigned int getMainSelectedTrack(); long unsigned int getMainSelectedTrack();
void updateMeta(); void updateMeta();

View file

@ -488,6 +488,7 @@ namespace Mist {
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
sendCommand(amfreply, 20, 1); sendCommand(amfreply, 20, 1);
stop();
return; return;
} }
if (amfData.getContentP(0)->StrValue() == "deleteStream") { if (amfData.getContentP(0)->StrValue() == "deleteStream") {
@ -679,13 +680,8 @@ namespace Mist {
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
uint64_t earliestTime = 0xffffffffffffffff; initialSeek();
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ rtmpOffset = currentTime();
if (myMeta.tracks.count(*it) && myMeta.tracks[*it].firstms < earliestTime){
earliestTime = myMeta.tracks[*it].firstms;
}
}
rtmpOffset = earliestTime;
amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset)); amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset));
sendCommand(amfreply, playMessageType, playStreamId); sendCommand(amfreply, playMessageType, playStreamId);
RTMPStream::chunk_snd_max = 10240000; //10000KiB RTMPStream::chunk_snd_max = 10240000; //10000KiB
@ -744,13 +740,8 @@ namespace Mist {
amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!")); amfreply.getContentP(3)->addContent(AMF::Object("description", "Playing!"));
amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV")); amfreply.getContentP(3)->addContent(AMF::Object("details", "DDV"));
amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337)); amfreply.getContentP(3)->addContent(AMF::Object("clientid", (double)1337));
uint64_t earliestTime = 0xffffffffffffffff; initialSeek();
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ rtmpOffset = currentTime();
if (myMeta.tracks.count(*it) && myMeta.tracks[*it].firstms < earliestTime){
earliestTime = myMeta.tracks[*it].firstms;
}
}
rtmpOffset = earliestTime;
amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset)); amfreply.getContentP(3)->addContent(AMF::Object("timecodeOffset", (double)rtmpOffset));
sendCommand(amfreply, playMessageType, playStreamId); sendCommand(amfreply, playMessageType, playStreamId);
RTMPStream::chunk_snd_max = 10240000; //10000KiB RTMPStream::chunk_snd_max = 10240000; //10000KiB