WS/MP4 reliability edits:

- Stream selecting now obeys new "maxdelay" capa entry
- Output::liveSeek now takes an optional bool argument to indicate only rate control should be applied (no seeking)
- dataWaitTimeout is now configurable per-output, defaults to the old 25s
- WS/MP4 uses the new liveSeek with rate-control only
- WS/MP4 uses the new dataWaitTimeout and sets it to 4.5s
- WS/MP4 uses the new maxdelay capa, sets it to 5s
- WS/MP4 will now auto-reselect tracks if a track is dropped for data wait timeout or disappeared from metadata reasons
- Added support for jitter information in WS/MP4 protocol
- Robustify sendWebsocketCodecData being ran when sendHeader is ran
- Fix race condition when switching video tracks before previous video track has sent a single frame
This commit is contained in:
Thulinma 2021-07-08 14:02:50 +02:00
parent a0eb42f1dd
commit 200e1e4a1c
6 changed files with 113 additions and 73 deletions

View file

@ -2294,7 +2294,7 @@ namespace DTSC{
MEDIUM_MSG("Jitter lowered from %" PRIu64 " to %" PRIu64 " ms", maxJitter, curJitter); MEDIUM_MSG("Jitter lowered from %" PRIu64 " to %" PRIu64 " ms", maxJitter, curJitter);
maxJitter = curJitter; maxJitter = curJitter;
} }
curJitter = maxJitter*0.75; curJitter = maxJitter*0.90;
} }
++x; ++x;
trueTime[x % 8] = curMs; trueTime[x % 8] = curMs;

View file

@ -1139,7 +1139,7 @@ std::set<size_t> Util::wouldSelect(const DTSC::Meta &M, const std::string &track
std::set<size_t> Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value &capa, std::set<size_t> Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value &capa,
const std::string &type, const std::string &UA){ const std::string &type, const std::string &UA){
std::set<size_t> validTracks = M.getValidTracks(); std::set<size_t> validTracks = M.getValidTracks();
uint64_t maxLastMs = 0;
std::set<size_t> toRemove; std::set<size_t> toRemove;
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){ for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
// Remove unrequested tracks // Remove unrequested tracks
@ -1208,10 +1208,27 @@ std::set<size_t> Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value
toRemove.insert(*it); toRemove.insert(*it);
} }
} }
//not removing this track? Keep track of highest lastMs
if (capa.isMember("maxdelay")){
uint64_t lMs = M.getLastms(*it);
if (lMs > maxLastMs){maxLastMs = lMs;}
}
} }
for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){ for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
validTracks.erase(*it); validTracks.erase(*it);
} }
//if there is a max delay configured, remove tracks that are further behind than this
if (capa.isMember("maxdelay")){
uint64_t maxDelay = capa["maxdelay"].asInt();
if (maxDelay > maxLastMs){maxDelay = maxLastMs;}
toRemove.clear();
for (std::set<size_t>::iterator it = validTracks.begin(); it != validTracks.end(); it++){
if (M.getLastms(*it) < maxLastMs - maxDelay){toRemove.insert(*it);}
}
for (std::set<size_t>::iterator it = toRemove.begin(); it != toRemove.end(); it++){
validTracks.erase(*it);
}
}
return validTracks; return validTracks;
} }

View file

@ -86,6 +86,7 @@ namespace Mist{
} }
Output::Output(Socket::Connection &conn) : myConn(conn){ Output::Output(Socket::Connection &conn) : myConn(conn){
dataWaitTimeout = 2500;
pushing = false; pushing = false;
recursingSync = false; recursingSync = false;
firstTime = 0; firstTime = 0;
@ -1097,7 +1098,7 @@ namespace Mist{
/// This function attempts to forward playback in live streams to a more live point. /// This function attempts to forward playback in live streams to a more live point.
/// It seeks to the last sync'ed keyframe of the main track, no closer than needsLookAhead+minKeepAway ms from the end. /// It seeks to the last sync'ed keyframe of the main track, no closer than needsLookAhead+minKeepAway ms from the end.
/// Aborts if not live, there is no main track or it has no keyframes. /// Aborts if not live, there is no main track or it has no keyframes.
bool Output::liveSeek(){ bool Output::liveSeek(bool rateOnly){
if (!realTime){return false;}//Makes no sense when playing in turbo mode if (!realTime){return false;}//Makes no sense when playing in turbo mode
uint64_t seekPos = 0; uint64_t seekPos = 0;
if (!meta.getLive()){return false;} if (!meta.getLive()){return false;}
@ -1113,7 +1114,7 @@ namespace Mist{
if (lMs - mKa - needsLookAhead - extraKeepAway > cTime + 50){ if (lMs - mKa - needsLookAhead - extraKeepAway > cTime + 50){
// We need to speed up! // We need to speed up!
uint64_t diff = (lMs - mKa - needsLookAhead - extraKeepAway) - cTime; uint64_t diff = (lMs - mKa - needsLookAhead - extraKeepAway) - cTime;
if (diff > 3000){ if (!rateOnly && diff > 3000){
noReturn = true; noReturn = true;
newSpeed = 1000; newSpeed = 1000;
}else if (diff > 1000){ }else if (diff > 1000){
@ -1687,7 +1688,7 @@ namespace Mist{
//Okay, there's no next page yet, and no next packet on this page either. //Okay, there's no next page yet, and no next packet on this page either.
//That means we're waiting for data to show up, somewhere. //That means we're waiting for data to show up, somewhere.
// after ~25 seconds, give up and drop the track. // after ~25 seconds, give up and drop the track.
if (++emptyCount >= 2500){ if (++emptyCount >= dataWaitTimeout){
dropTrack(nxt.tid, "EOP: data wait timeout"); dropTrack(nxt.tid, "EOP: data wait timeout");
return false; return false;
} }

View file

@ -62,7 +62,7 @@ namespace Mist{
static void listener(Util::Config &conf, int (*callback)(Socket::Connection &S)); static void listener(Util::Config &conf, int (*callback)(Socket::Connection &S));
virtual void initialSeek(); virtual void initialSeek();
uint64_t getMinKeepAway(); uint64_t getMinKeepAway();
virtual bool liveSeek(); virtual bool liveSeek(bool rateOnly = false);
virtual bool onFinish(){return false;} virtual bool onFinish(){return false;}
void reconnect(); void reconnect();
void disconnect(); void disconnect();
@ -115,6 +115,7 @@ namespace Mist{
uint64_t uaDelay; ///< Seconds to wait before setting the UA. uint64_t uaDelay; ///< Seconds to wait before setting the UA.
uint64_t lastRecv; uint64_t lastRecv;
uint64_t extraKeepAway; uint64_t extraKeepAway;
uint64_t dataWaitTimeout; ///< How long to wait for new packets before dropping a track, in tens of milliseconds.
uint64_t firstTime; ///< Time of first packet after last seek. Used for real-time sending. uint64_t firstTime; ///< Time of first packet after last seek. Used for real-time sending.
virtual std::string getConnectedHost(); virtual std::string getConnectedHost();
virtual std::string getConnectedBinHost(); virtual std::string getConnectedBinHost();

View file

@ -1243,7 +1243,6 @@ namespace Mist{
dropTrack(prevVidTrack, "Smoothly switching to new video track", false); dropTrack(prevVidTrack, "Smoothly switching to new video track", false);
prevVidTrack = INVALID_TRACK_ID; prevVidTrack = INVALID_TRACK_ID;
onIdle(); onIdle();
sendWebsocketCodecData("tracks");
sendHeader(); sendHeader();
/* /*
@ -1281,7 +1280,7 @@ namespace Mist{
webBuf.append(dataPointer, len); webBuf.append(dataPointer, len);
webSock->sendFrame(webBuf, webBuf.size(), 2); webSock->sendFrame(webBuf, webBuf.size(), 2);
if (stayLive && thisPacket.getFlag("keyframe")){liveSeek();} if (stayLive && thisPacket.getFlag("keyframe")){liveSeek(true);}
// We must return here, the rest of this function won't work for websockets. // We must return here, the rest of this function won't work for websockets.
return; return;
} }
@ -1395,7 +1394,11 @@ namespace Mist{
void OutMP4::sendHeader(){ void OutMP4::sendHeader(){
if (webSock) { if (webSock) {
if (!sentHeader){
sendWebsocketCodecData("codec_data");
}else{
sendWebsocketCodecData("tracks");
}
JSON::Value r; JSON::Value r;
r["type"] = "info"; r["type"] = "info";
r["data"]["msg"] = "Sending header"; r["data"]["msg"] = "Sending header";
@ -1445,9 +1448,11 @@ namespace Mist{
void OutMP4::onWebsocketConnect() { void OutMP4::onWebsocketConnect() {
capa["name"] = "MP4/WS"; capa["name"] = "MP4/WS";
capa["maxdelay"] = 5000;
fragSeqNum = 0; fragSeqNum = 0;
idleInterval = 1000; idleInterval = 1000;
maxSkipAhead = 0; maxSkipAhead = 0;
dataWaitTimeout = 450;
} }
void OutMP4::onWebsocketFrame() { void OutMP4::onWebsocketFrame() {
@ -1482,7 +1487,7 @@ namespace Mist{
} }
} }
selectDefaultTracks(); selectDefaultTracks();
sendWebsocketCodecData("codec_data"); sendHeader();
}else if (command["type"] == "seek") { }else if (command["type"] == "seek") {
handleWebsocketSeek(command); handleWebsocketSeek(command);
}else if (command["type"] == "pause") { }else if (command["type"] == "pause") {
@ -1513,6 +1518,24 @@ namespace Mist{
targetParams.erase("video"); targetParams.erase("video");
} }
} }
if (command.isMember("seek_time")){
possiblyReselectTracks(command["seek_time"].asInt());
}else{
possiblyReselectTracks(currentTime());
}
return;
}else if (command["type"] == "set_speed") {
handleWebsocketSetSpeed(command);
}else if (command["type"] == "stop") {
Util::logExitReason("User requested stop");
myConn.close();
}else if (command["type"] == "play") {
parseData = true;
if (command.isMember("seek_time")){handleWebsocketSeek(command);}
}
}
bool OutMP4::possiblyReselectTracks(uint64_t seekTarget){
// Remember the previous video track, if any. // Remember the previous video track, if any.
std::set<size_t> prevSelTracks; std::set<size_t> prevSelTracks;
prevVidTrack = INVALID_TRACK_ID; prevVidTrack = INVALID_TRACK_ID;
@ -1522,12 +1545,12 @@ namespace Mist{
prevVidTrack = it->first; prevVidTrack = it->first;
} }
} }
if (selectDefaultTracks()) { if (!selectDefaultTracks()) {
uint64_t seekTarget = currentTime();
if (command.isMember("seek_time")){
seekTarget = command["seek_time"].asInt();
prevVidTrack = INVALID_TRACK_ID; prevVidTrack = INVALID_TRACK_ID;
onIdle();
return false;
} }
if (seekTarget != currentTime()){prevVidTrack = INVALID_TRACK_ID;}
// Add the previous video track back, if we had one. // Add the previous video track back, if we had one.
if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack)){ if (prevVidTrack != INVALID_TRACK_ID && !userSelect.count(prevVidTrack)){
userSelect[prevVidTrack].reload(streamName, prevVidTrack); userSelect[prevVidTrack].reload(streamName, prevVidTrack);
@ -1543,7 +1566,6 @@ namespace Mist{
seek(seekTarget, true); seek(seekTarget, true);
realTime = 0; realTime = 0;
forwardTo = seekTarget; forwardTo = seekTarget;
sendWebsocketCodecData(command["type"]);
sendHeader(); sendHeader();
JSON::Value r; JSON::Value r;
r["type"] = "set_speed"; r["type"] = "set_speed";
@ -1560,7 +1582,6 @@ namespace Mist{
seek(seekTarget, true); seek(seekTarget, true);
realTime = 0; realTime = 0;
forwardTo = seekTarget; forwardTo = seekTarget;
sendWebsocketCodecData(command["type"]);
sendHeader(); sendHeader();
JSON::Value r; JSON::Value r;
r["type"] = "set_speed"; r["type"] = "set_speed";
@ -1573,21 +1594,7 @@ namespace Mist{
webSock->sendFrame(r.toString()); webSock->sendFrame(r.toString());
} }
onIdle(); onIdle();
return; return true;
}else{
prevVidTrack = INVALID_TRACK_ID;
}
onIdle();
return;
}else if (command["type"] == "set_speed") {
handleWebsocketSetSpeed(command);
}else if (command["type"] == "stop") {
Util::logExitReason("User requested stop");
myConn.close();
}else if (command["type"] == "play") {
parseData = true;
if (command.isMember("seek_time")){handleWebsocketSeek(command);}
}
} }
void OutMP4::sendWebsocketCodecData(const std::string& type) { void OutMP4::sendWebsocketCodecData(const std::string& type) {
@ -1717,9 +1724,14 @@ namespace Mist{
r["data"]["play_rate_curr"] = target_rate; r["data"]["play_rate_curr"] = target_rate;
} }
} }
uint64_t jitter = 0;
for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){ for (std::map<size_t, Comms::Users>::iterator it = userSelect.begin(); it != userSelect.end(); it++){
r["data"]["tracks"].append(it->first); r["data"]["tracks"].append(it->first);
if (jitter < M.getMinKeepAway(it->first)){jitter = M.getMinKeepAway(it->first);}
} }
r["data"]["jitter"] = jitter;
if (dataWaitTimeout < jitter*1.5){dataWaitTimeout = jitter*1.5;}
if (capa["maxdelay"].asInt() < jitter*1.5){capa["maxdelay"] = jitter*1.5;}
webSock->sendFrame(r.toString()); webSock->sendFrame(r.toString());
} }
@ -1739,5 +1751,12 @@ namespace Mist{
return false; return false;
} }
void OutMP4::dropTrack(size_t trackId, const std::string &reason, bool probablyBad){
if (webSock && (reason == "EOP: data wait timeout" || reason == "disappeared from metadata") && possiblyReselectTracks(currentTime())){
return;
}
return Output::dropTrack(trackId, reason, probablyBad);
}
}// namespace Mist }// namespace Mist

View file

@ -108,7 +108,9 @@ namespace Mist{
void onWebsocketFrame(); void onWebsocketFrame();
void onIdle(); void onIdle();
virtual bool onFinish(); virtual bool onFinish();
virtual void dropTrack(size_t trackId, const std::string &reason, bool probablyBad = true);
protected: protected:
bool possiblyReselectTracks(uint64_t seekTarget);
void sendWebsocketCodecData(const std::string& type); void sendWebsocketCodecData(const std::string& type);
bool handleWebsocketSeek(JSON::Value& command); bool handleWebsocketSeek(JSON::Value& command);
bool handleWebsocketSetSpeed(JSON::Value& command); bool handleWebsocketSetSpeed(JSON::Value& command);