diff --git a/lib/dtsc.cpp b/lib/dtsc.cpp index 496f7683..9c07bdf2 100644 --- a/lib/dtsc.cpp +++ b/lib/dtsc.cpp @@ -1036,6 +1036,9 @@ namespace DTSC{ setID(tIdx, trak.getMember("trackid").asInt()); setFirstms(tIdx, trak.getMember("firstms").asInt()); setLastms(tIdx, trak.getMember("lastms").asInt()); + if (trak.hasMember("nowms")){ + setNowms(tIdx, trak.getMember("nowms").asInt()); + } setBps(tIdx, trak.getMember("bps").asInt()); setMaxBps(tIdx, trak.getMember("maxbps").asInt()); setSourceTrack(tIdx, INVALID_TRACK_ID); @@ -1774,6 +1777,7 @@ namespace DTSC{ t.track.addField("codec", RAX_STRING, 8); t.track.addField("firstms", RAX_64UINT); t.track.addField("lastms", RAX_64UINT); + t.track.addField("nowms", RAX_64UINT); t.track.addField("bps", RAX_32UINT); t.track.addField("maxbps", RAX_32UINT); t.track.addField("lang", RAX_STRING, 4); @@ -2014,6 +2018,9 @@ namespace DTSC{ void Meta::setLastms(size_t trackIdx, uint64_t lastms){ DTSC::Track &t = tracks.at(trackIdx); t.track.setInt(t.trackLastmsField, lastms); + if (t.trackNowmsField && t.track.getInt(t.trackNowmsField) < lastms){ + t.track.setInt(t.trackNowmsField, lastms); + } } uint64_t Meta::getLastms(size_t trackIdx) const{ const DTSC::Track &t = tracks.find(trackIdx)->second; @@ -2539,6 +2546,7 @@ namespace DTSC{ t.fragments.setInt(t.fragmentSizeField, t.fragments.getInt(t.fragmentSizeField, lastFragNum) + packDataSize, lastFragNum); t.track.setInt(t.trackLastmsField, packTime); + t.track.setInt(t.trackNowmsField, packTime); markUpdated(tNumber); } @@ -3202,7 +3210,7 @@ namespace DTSC{ const Util::RelAccX &keys = trk.keys; const Util::RelAccX &parts = trk.parts; if (!keys.getEndPos()){return INVALID_KEY_NUM;} - size_t res = keys.getStartPos(); + size_t res = keys.getDeleted(); for (size_t i = res; i < keys.getEndPos(); i++){ if (keys.getInt(trk.keyTimeField, i) > time){ //It's possible we overshot our timestamp, but the previous key does not contain it. diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index c91be57c..cda04feb 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -192,6 +192,7 @@ namespace Mist{ Playlist::Playlist(const std::string &uriSource){ nextUTC = 0; + oUTC = 0; id = 0; // to be set later //If this is the copy constructor, just be silent. std::string uriSrc; @@ -521,7 +522,6 @@ namespace Mist{ DONTEVEN_MSG("Reloading playlist '%s'", uri.c_str()); while (std::getline(input, line)){ - if (input.eof()){break;} // Skip last line, might be incomplete DONTEVEN_MSG("Parsing line '%s'", line.c_str()); cleanLine(line); if (line.empty()){continue;}// skip empty lines @@ -579,19 +579,23 @@ namespace Mist{ if (val == "VOD"){ streamIsVOD = true; streamIsLive = false; + INFO_MSG("SIL=F"); }else if (val == "LIVE"){ streamIsVOD = false; streamIsLive = true; + INFO_MSG("SIL=T"); }else if (val == "EVENT"){ streamIsVOD = true; streamIsLive = true; + INFO_MSG("SIL=T"); } } // Once we see this tag, the entire playlist becomes VOD if (key == "ENDLIST"){ - streamIsVOD = true; streamIsLive = false; + INFO_MSG("SIL=F"); + streamIsVOD = true; } continue; } @@ -667,7 +671,28 @@ namespace Mist{ if (entry.duration * 1000 > DTSC::veryUglyJitterOverride){ DTSC::veryUglyJitterOverride = entry.duration * 1000; } + + { + tthread::lock_guard guard(entryMutex); + if (id && listEntries[id].size()){ + // If the UTC has gone backwards, shift forward. + playListEntries & prev = listEntries[id].back(); + if (nextUTC && prev.mUTC && nextUTC < prev.mUTC + prev.duration * 1000){ + WARN_MSG("UTC time went from %s to %s; adjusting!", Util::getUTCStringMillis(prev.mUTC + prev.duration * 1000).c_str(), Util::getUTCStringMillis(nextUTC).c_str()); + // Reset UTC time, this will cause the next check to set it correctly + nextUTC = 0; + } + // If we ever had a UTC time, ensure it's set for all segments going forward + if (!nextUTC && prev.mUTC){ + nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000); + } + } + } + entry.mUTC = nextUTC; + if (nextUTC && !oUTC){ + oUTC = nextUTC - (lastTimestamp + startTime); + } if (key.size() && iv.size()){ memcpy(entry.ivec, iv.data(), 16); @@ -677,15 +702,13 @@ namespace Mist{ memset(entry.keyAES, 0, 16); } - if (!isUrl()){ - std::ifstream fileSource; - std::string test = root.link(entry.filename).getFilePath(); - fileSource.open(test.c_str(), std::ios::ate | std::ios::binary); - if (!fileSource.good()){WARN_MSG("file: %s, error: %s", test.c_str(), strerror(errno));} + // Base timestamp of entry on UTC time if we have it, otherwise on a simple addition of the previous duration + if (nextUTC && oUTC){ + entry.timestamp = nextUTC - oUTC; + }else{ + entry.timestamp = lastTimestamp + startTime; } - - entry.timestamp = lastTimestamp + startTime; - lastTimestamp += duration; + lastTimestamp = entry.timestamp - startTime + duration; { tthread::lock_guard guard(entryMutex); // Set a playlist ID if we haven't assigned one yet. @@ -773,15 +796,11 @@ namespace Mist{ Util::logExitReason(ER_UNKNOWN, "Failed to load HLS playlist, aborting"); return false; } - + // Segments can be added (and removed if VOD is false) - if (streamIsLive){ - meta.setLive(true); - } + meta.setLive(streamIsLive); // Segments can not be removed - if (streamIsVOD){ - meta.setVod(true); - } + meta.setVod(streamIsVOD); return true; } @@ -906,7 +925,7 @@ namespace Mist{ tsStream.clear(); uint32_t entId = 0; bool foundAtLeastOnePacket = false; - INFO_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); + VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end() && config->is_active; entryIt++){ @@ -934,13 +953,15 @@ namespace Mist{ tsStream.initializeMetadata(meta, tmpTrackId, packetId); idx = M.trackIDToIndex(packetId, getpid()); } - headerPack.getString("data", data, dataLen); + if (!streamIsLive){ + headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + } tsStream.getEarliestPacket(headerPack); foundAtLeastOnePacket = true; } @@ -965,12 +986,14 @@ namespace Mist{ idx = M.trackIDToIndex(packetId, getpid()); } - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + if (!streamIsLive){ + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + } tsStream.getEarliestPacket(headerPack); } // Finally save the offset as part of the TS segment. This is required for bufferframe @@ -988,11 +1011,18 @@ namespace Mist{ streamStatus.mapped[1] = (255 * currentSegment) / totalSegments; } - // Init segment counters to what was set to MEDIA-SEQUENCE - parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment; + // If live, don't actually parse anything. + // If non-live, we read all the segments + if (streamIsLive){ + parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex; + }else{ + parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment; + } - // For non-vod, only parse the first segment for each playlist - if (!streamIsVOD && foundAtLeastOnePacket){break;} + // For still-appending streams, only parse the first segment for each playlist + if (streamIsLive && foundAtLeastOnePacket){ + break; + } } } if (!config->is_active){return false;} @@ -1088,7 +1118,7 @@ namespace Mist{ if (!hasOffset && curList.at(segmentIndex).mUTC){ hasOffset = true; DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; - MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); + INFO_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; } if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ @@ -1642,7 +1672,7 @@ namespace Mist{ if (codecSupported){ - ret = readPlaylist(playlistRootPath.link(line), line, fullInit); + ret |= readPlaylist(playlistRootPath.link(line), line, fullInit); }else{ INFO_MSG("skipping variant playlist %s, none of the codecs are supported", playlistRootPath.link(line).getUrl().c_str()); @@ -1658,7 +1688,7 @@ namespace Mist{ int pos = line.find("URI"); if (pos != std::string::npos){ mediafile = line.substr(pos + 5, line.length() - pos - 6); - ret = readPlaylist(playlistRootPath.link(mediafile), mediafile, fullInit); + ret |= readPlaylist(playlistRootPath.link(mediafile), mediafile, fullInit); } } @@ -1686,7 +1716,7 @@ namespace Mist{ } if (isRegularPls){ - ret = readPlaylist(playlistRootPath.getUrl(), "", fullInit); + ret |= readPlaylist(playlistRootPath.getUrl(), "", fullInit); } if (!isUrl){fileSource.close();} @@ -1719,7 +1749,7 @@ namespace Mist{ else{ urlBuffer = (fullInit ? "" : ";") + uri.getUrl() + "\n" + relurl; } - INFO_MSG("Adding playlist(s): %s", urlBuffer.c_str()); + VERYHIGH_MSG("Adding playlist(s): %s", urlBuffer.c_str()); tthread::thread runList(playlistRunner, (void *)urlBuffer.data()); runList.detach(); // Abandon the thread, it's now running independently uint32_t timeout = 0; @@ -1740,7 +1770,7 @@ namespace Mist{ currentIndex++; tthread::lock_guard guard(entryMutex); std::deque &curList = listEntries[currentPlaylist]; - INFO_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); + HIGH_MSG("Current playlist contains %zu entries. Current index is %zu in playlist %" PRIu64, curList.size(), currentIndex, currentPlaylist); if (curList.size() <= currentIndex){ if (streamIsLive){ INFO_MSG("Reached last entry in playlist %" PRIu64 "; waiting for more segments", currentPlaylist); @@ -1801,7 +1831,7 @@ namespace Mist{ } void inputHLS::finish(){ - if (!streamIsVOD){ //< Already generated from readHeader + if (streamIsLive){ //< Already generated from readHeader INFO_MSG("Writing updated header to disk"); injectLocalVars(); M.toFile(HTTP::localURIResolver().link(config->getString("input") + ".dtsh").getUrl()); diff --git a/src/input/input_hls.h b/src/input/input_hls.h index c805e108..b296f329 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -26,9 +26,9 @@ namespace Mist{ std::string relative_filename; uint64_t bytePos; uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known - float duration; - uint64_t timestamp; - int64_t timeOffset; + float duration; ///< Duration of entry in seconds + uint64_t timestamp; ///< zUTC-based timestamp for this entry + int64_t timeOffset; ///< Value timestamps in the media are shifted by to get zUTC-based timestamps uint64_t wait; char ivec[16]; char keyAES[16]; @@ -96,6 +96,7 @@ namespace Mist{ uint64_t waitTime; uint64_t lastTimestamp; uint64_t startTime; + int64_t oUTC; uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist char keyAES[16]; std::map keys; diff --git a/src/output/output.cpp b/src/output/output.cpp index fa3f2193..eaac40ce 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -896,6 +896,23 @@ namespace Mist{ while (tmp.time < pos && tmpPack){ tmp.offset += tmpPack.getDataLen(); tmpPack.reInit(mpd + tmp.offset, 0, true); + if (!tmpPack){ + nowMs = M.getNowms(tid); + if (M.getLastms(tid) <= tmp.time && nowMs > tmp.time){ + // Okay, we're awaiting more data, let's insert a ghost packet instead. + break; + } + uint64_t timeOut = Util::bootMS() + 10000; + while (Util::bootMS() < timeOut && !tmpPack){ + Util::sleep(50); + tmpPack.reInit(mpd + tmp.offset, 0, true); + } + if (!tmpPack){ + WARN_MSG("Aborting seek to %" PRIu64 "ms in track %zu: timeout", pos, tid); + userSelect.erase(tid); + return false; + } + } tmp.time = tmpPack.getTime(); } if (tmpPack){