diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 7da3e177..54355a07 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -112,10 +112,10 @@ namespace Mist{ static std::map playlistMapping; /// Local RAM buffer for recently accessed segments - std::map segBufs; + std::map segBufs; /// Order of adding/accessing for local RAM buffer of segments - std::deque segBufAccs; + std::deque segBufAccs; /// Order of adding/accessing sizes for local RAM buffer of segments std::deque segBufSize; @@ -264,6 +264,7 @@ namespace Mist{ bool SegmentDownloader::atEnd() const{ if (!isOpen || !currBuf){return true;} if (buffered){return currBuf->size() <= offset + 188;} + if (stopAtByte && (stopAtByte - startAtByte) <= offset + 188){return true;} return !segDL && currBuf->size() <= offset + 188; // return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); } @@ -334,6 +335,7 @@ namespace Mist{ }else{ if (!currBuf){return false;} size_t retries = 0; + if (stopAtByte && (stopAtByte - startAtByte) <= currBuf->size()){return false;} while (segDL && currBuf->size() < offset + 188 + 188){ size_t preSize = currBuf->size(); segDL.readSome(offset + 188 + 188 - currBuf->size(), *this); @@ -348,7 +350,7 @@ namespace Mist{ segDL.close(); return false; } - segDL.seek(currBuf->size()); + segDL.seek(startAtByte+currBuf->size()); } } if (currBuf->size() <= preSize){ @@ -387,7 +389,7 @@ namespace Mist{ segBufTotalSize += segBufSize.front(); } - size_t SegmentDownloader::getDataCallbackPos() const{return currBuf->size();} + size_t SegmentDownloader::getDataCallbackPos() const{return startAtByte+currBuf->size();} /// Attempts to read a single TS packet from the current segment, setting packetPtr on success void SegmentDownloader::close(){ @@ -401,33 +403,41 @@ namespace Mist{ std::string hexKey = printhex(entry.keyAES, 16); std::string hexIvec = printhex(entry.ivec, 16); - MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), + MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.shortName().c_str(), hexKey.c_str(), hexIvec.c_str()); + startAtByte = entry.startAtByte; + stopAtByte = entry.stopAtByte; offset = 0; firstPacket = true; - buffered = segBufs.count(entry.filename); + buffered = segBufs.count(entry); if (!buffered){ - HIGH_MSG("Reading non-cache: %s", entry.filename.c_str()); + HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str()); if (!segDL.open(entry.filename)){ - FAIL_MSG("Could not open %s", entry.filename.c_str()); + FAIL_MSG("Could not open %s", entry.shortName().c_str()); return false; } if (!segDL){return false;} //Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times) while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){ - HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().c_str()); + HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str()); segBufs.erase(segBufAccs.back()); segBufTotalSize -= segBufSize.back(); segBufAccs.pop_back(); segBufSize.pop_back(); } - segBufAccs.push_front(entry.filename); + segBufAccs.push_front(entry); segBufSize.push_front(0); - currBuf = &(segBufs[entry.filename]); + currBuf = &(segBufs[entry]); + // Non-seekable case is handled further down + if (segDL.isSeekable() && startAtByte){ + //Seek to startAtByte position, since it's not the beginning of the file + MEDIUM_MSG("Seeking to %zu", startAtByte); + segDL.seek(startAtByte); + } }else{ - HIGH_MSG("Reading from segment cache: %s", entry.filename.c_str()); - currBuf = &(segBufs[entry.filename]); + HIGH_MSG("Reading from segment cache: %s", entry.shortName().c_str()); + currBuf = &(segBufs[entry]); if (currBuf->rsize() != currBuf->size()){ MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize()); buffered = false; @@ -436,23 +446,31 @@ namespace Mist{ HTTP::URL B = HTTP::localURIResolver().link(entry.filename); if (A != B){ if (!segDL.open(entry.filename)){ - FAIL_MSG("Could not open %s", entry.filename.c_str()); + FAIL_MSG("Could not open %s", entry.shortName().c_str()); return false; } if (!segDL){return false;} //Seek to current position in segment for resuming - currBuf->truncate(currBuf->size() / 188 * 188); - MEDIUM_MSG("Seeking to %zu", currBuf->size()); - segDL.seek(currBuf->size()); + MEDIUM_MSG("Seeking to %zu", currBuf->size()+startAtByte); + segDL.seek(currBuf->size()+startAtByte); } } } if (!buffered){ // Allocate full size if known - if (segDL.getSize() != std::string::npos){currBuf->allocate(segDL.getSize());} + if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());} // Download full segment if not seekable, pretend it was cached all along if (!segDL.isSeekable()){ segDL.readAll(*this); + if (startAtByte || stopAtByte){ + WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it"); + if (startAtByte){currBuf->shift(startAtByte);} + if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);} + //Overwrite the current segment size + segBufTotalSize -= segBufSize.front(); + segBufSize.front() = currBuf->size(); + segBufTotalSize += segBufSize.front(); + } buffered = true; } } @@ -487,6 +505,8 @@ namespace Mist{ std::string line; std::string key; std::string val; + float segDur = 0.0; + uint64_t startByte = std::string::npos, lenByte = 0; std::string keyMethod; std::string keyUri; @@ -527,6 +547,10 @@ namespace Mist{ cleanLine(line); if (line.empty()){continue;}// skip empty lines + if (line.compare(0, 7, "#EXTINF") == 0){ + segDur = atof(line.c_str() + 8); + continue; + } if (line.compare(0, 7, "#EXT-X-") == 0){ size_t pos = line.find(":"); key = line.substr(7, pos - 7); @@ -560,11 +584,26 @@ namespace Mist{ } keys.insert(std::pair(keyUri, std::string(keyPtr, keyLen))); } + continue; + } + + if (key == "BYTERANGE"){ + size_t atSign = val.find('@'); + if (atSign != std::string::npos){ + std::string len = val.substr(0, atSign); + std::string pos = val.substr(atSign+1); + lenByte = atoll(len.c_str()); + startByte = atoll(pos.c_str()); + }else{ + lenByte = atoll(val.c_str()); + } + continue; } if (key == "TARGETDURATION"){ waitTime = atoi(val.c_str()) / 2; if (waitTime < 2){waitTime = 2;} + continue; } // Assuming this always comes before any segment @@ -572,47 +611,45 @@ namespace Mist{ // Reinit the segment counter firstIndex = atoll(val.c_str()); bposCounter = firstIndex + 1; + continue; } - if (key == "PROGRAM-DATE-TIME"){nextUTC = ISO8601toUnixmillis(val);} + if (key == "PROGRAM-DATE-TIME"){ + nextUTC = ISO8601toUnixmillis(val); + continue; + } if (key == "PLAYLIST-TYPE"){ if (val == "VOD"){ streamIsVOD = true; streamIsLive = false; - INFO_MSG("SIL=F"); }else if (val == "LIVE"){ streamIsVOD = false; streamIsLive = true; - INFO_MSG("SIL=T"); }else if (val == "EVENT"){ streamIsVOD = true; streamIsLive = true; - INFO_MSG("SIL=T"); } + continue; } // Once we see this tag, the entire playlist becomes VOD if (key == "ENDLIST"){ streamIsLive = false; - INFO_MSG("SIL=F"); streamIsVOD = true; + continue; } + VERYHIGH_MSG("ignoring line: %s.", line.c_str()); continue; } - if (line.compare(0, 7, "#EXTINF") != 0){ + if (line[0] == '#'){ VERYHIGH_MSG("ignoring line: %s.", line.c_str()); continue; } - float f = atof(line.c_str() + 8); - std::string filename; - std::getline(input, filename); - // check for already added segments DONTEVEN_MSG("Current segment #%" PRIu64 ", last segment was #%" PRIu64 "", bposCounter, lastSegment); if (bposCounter > lastSegment){ - cleanLine(filename); char ivec[16]; if (keyIV.size()){ parseKey(keyIV, ivec, 16); @@ -620,11 +657,14 @@ namespace Mist{ memset(ivec, 0, 16); Bit::htobll(ivec + 8, bposCounter); } - addEntry(root.link(filename).getUrl(), filename, f, bposCounter, keys[keyUri], std::string(ivec, 16)); + addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), startByte, lenByte); lastSegment = bposCounter; ++count; } nextUTC = 0; + segDur = 0.0; + startByte = std::string::npos; + lenByte = 0; ++bposCounter; } @@ -639,30 +679,9 @@ namespace Mist{ return (count > 0); } - bool Playlist::isSupportedFile(const std::string filename){ - // only ts files - if (filename.find_last_of(".") != std::string::npos){ - std::string ext = filename.substr(filename.find_last_of(".") + 1); - - if (ext.compare(0, 2, "ts") == 0){ - return true; - }else{ - DEBUG_MSG(DLVL_HIGH, "Not supported extension: %s", ext.c_str()); - return false; - } - } - // No extension. We assume it's fine. - return true; - } - /// Adds playlist segments to be processed void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos, - const std::string &key, const std::string &iv){ - // if (!isSupportedFile(filename)){ - // WARN_MSG("Ignoring unsupported file: %s", filename.c_str()); - // return; - //} - + const std::string &key, const std::string &iv, uint64_t startByte, uint64_t lenByte){ playListEntries entry; entry.filename = absolute_filename; entry.relative_filename = filename; @@ -687,8 +706,24 @@ namespace Mist{ if (!nextUTC && prev.mUTC){ nextUTC = prev.mUTC + (uint64_t)(prev.duration * 1000); } + // If startByte unknown and we have a length, calculate it from previous entry + if (startByte == std::string::npos && lenByte){ + if (filename == prev.relative_filename){startByte = prev.stopAtByte;} + } + }else{ + // If startByte unknown and we have a length, set to zero + if (startByte == std::string::npos && lenByte){startByte = 0;} } } + if ((lenByte && startByte == std::string::npos) || (!lenByte && startByte != std::string::npos)){ + WARN_MSG("Invalid byte range entry for segment: %s", filename.c_str()); + lenByte = 0; + startByte = std::string::npos; + } + if (lenByte){ + entry.startAtByte = startByte; + entry.stopAtByte = startByte + lenByte; + } entry.mUTC = nextUTC; if (nextUTC && !oUTC){ @@ -716,7 +751,11 @@ namespace Mist{ // Note: This method requires never removing playlists, only adding. // The mutex assures we have a unique count/number. if (!id){id = listEntries.size() + 1;} - HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id); + if (entry.startAtByte){ + HIGH_MSG("Adding entry '%s' (%" PRIu64 "-%" PRIu64 ") to ID %u", filename.c_str(), entry.startAtByte, entry.stopAtByte, id); + }else{ + HIGH_MSG("Adding entry '%s' to ID %u", filename.c_str(), id); + } playlist_urls[JSON::Value(id).asString()] = relurl; listEntries[id].push_back(entry); } @@ -867,6 +906,10 @@ namespace Mist{ memset(newEntry.ivec, 0, 16); memset(newEntry.keyAES, 0, 16); } + if (thisEntry.size() >= 11){ + newEntry.startAtByte = thisEntry[9u].asInt(); + newEntry.stopAtByte = thisEntry[10u].asInt(); + } newList.push_back(newEntry); } listEntries[plNum] = newList; @@ -1059,6 +1102,10 @@ namespace Mist{ thisEntries.append(entryIt->wait); thisEntries.append(entryIt->ivec); thisEntries.append(entryIt->keyAES); + if (entryIt->startAtByte || entryIt->stopAtByte){ + thisEntries.append(entryIt->startAtByte); + thisEntries.append(entryIt->stopAtByte); + } thisPlaylist.append(thisEntries); } allEntries[JSON::Value(pListIt->first).asString()] = thisPlaylist; @@ -1119,7 +1166,7 @@ namespace Mist{ if (!hasOffset && curList.at(segmentIndex).mUTC){ hasOffset = true; DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; - INFO_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); + MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; } if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 277118b6..92f024e8 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -21,6 +21,8 @@ namespace Mist{ struct playListEntries{ std::string filename; std::string relative_filename; + uint64_t startAtByte; ///< Byte position inside filename where to start reading + uint64_t stopAtByte; ///< Byte position inside filename where to stop sending uint64_t bytePos; uint64_t mUTC; ///< UTC unix millis timestamp of first packet, if known float duration; ///< Duration of entry in seconds @@ -36,13 +38,29 @@ namespace Mist{ timestamp = 0; timeOffset = 0; wait = 0; + startAtByte = 0; + stopAtByte = 0; for (size_t i = 0; i < 16; ++i){ ivec[i] = 0; keyAES[i] = 0; } } + std::string shortName() const{ + if (!startAtByte && !stopAtByte){return filename;} + std::string ret = filename; + ret += " ("; + ret += JSON::Value(startAtByte).asString(); + ret += "-"; + ret += JSON::Value(stopAtByte).asString(); + ret += ")"; + return ret; + } }; + inline bool operator< (const playListEntries a, const playListEntries b){ + return a.filename < b.filename || (a.filename == b.filename && a.startAtByte < b.startAtByte); + } + /// Keeps the segment entry list by playlist ID extern std::map > listEntries; @@ -59,6 +77,8 @@ namespace Mist{ bool atEnd() const; private: + uint64_t startAtByte; + uint64_t stopAtByte; bool encrypted; bool buffered; size_t offset; @@ -79,8 +99,7 @@ namespace Mist{ bool isUrl() const; bool reload(); void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos, - const std::string &key, const std::string &keyIV); - bool isSupportedFile(const std::string filename); + const std::string &key, const std::string &keyIV, uint64_t startByte, uint64_t lenByte); std::string uri; // link to the current playlistfile HTTP::URL root;