diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index ebd7a185..6873a2a9 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -100,12 +100,21 @@ static uint64_t ISO8601toUnixmillis(const std::string &ts){ } namespace Mist{ - // Save playlist objects for manual reloading + /// Save playlist objects for manual reloading static std::map playlistMapping; - // Track which segment numbers have been parsed + + /// Local RAM buffer for recently accessed segments + std::map segBufs; + + /// Order of adding/accessing for local RAM buffer of segments + std::deque segBufAccs; + + /// Track which segment numbers have been parsed std::map parsedSegments; + /// Mutex for accesses to listEntries tthread::mutex entryMutex; + JSON::Value playlist_urls; ///< Relative URLs to the various playlists static unsigned int plsTotalCount = 0; /// Total playlists active @@ -232,11 +241,14 @@ namespace Mist{ isOpen = false; segDL.onProgress(callbackFunc); encrypted = false; + currBuf = 0; + packetPtr = 0; } /// Returns true if packetPtr is at the end of the current segment. bool SegmentDownloader::atEnd() const{ - if (!isOpen){return true;} + if (!isOpen || !currBuf){return true;} + if (buffered){return currBuf->size() <= offset + 188;} return segDL.isEOF(); // return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); } @@ -302,16 +314,30 @@ namespace Mist{ return false; }else{ // Plaintext - size_t len = 0; - segDL.readSome(packetPtr, len, 188); - if (len != 188 || packetPtr[0] != 0x47){ - FAIL_MSG("Not a valid TS packet: len %zu, first byte %" PRIu8, len, (uint8_t)packetPtr[0]); + if (buffered){ + if (atEnd()){return false;} + }else{ + if (!currBuf){return false;} + while (segDL && currBuf->size() < offset + 188 + 188){ + segDL.readSome(188, *this); + if (currBuf->size() < offset + 188 + 188){Util::sleep(50);} + } + if (currBuf->size() < offset + 188 + 188){return false;} + } + offset += 188; + packetPtr = *currBuf + offset; + if (!packetPtr || packetPtr[0] != 0x47){ + FAIL_MSG("Not a valid TS packet: first byte %" PRIu8, packetPtr?(uint8_t)packetPtr[0]:0); return false; } return true; } } + void SegmentDownloader::dataCallback(const char *ptr, size_t size){ + currBuf->append(ptr, size); + } + /// Attempts to read a single TS packet from the current segment, setting packetPtr on success void SegmentDownloader::close(){ packetPtr = 0; @@ -326,12 +352,24 @@ namespace Mist{ MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.filename.c_str(), hexKey.c_str(), hexIvec.c_str()); - if (!segDL.open(entry.filename)){ - FAIL_MSG("Could not open %s", entry.filename.c_str()); - return false; - } - if (!segDL){return false;} + offset = 0; + buffered = segBufs.count(entry.filename); + if (!buffered){ + if (!segDL.open(entry.filename)){ + FAIL_MSG("Could not open %s", entry.filename.c_str()); + return false; + } + if (!segDL){return false;} + if (segBufs.size() > 60){ + segBufs.erase(segBufAccs.back()); + segBufAccs.pop_back(); + } + segBufAccs.push_front(entry.filename); + }else{ + INFO_MSG("Reading from segment cache: %s", entry.filename.c_str()); + } + currBuf = &(segBufs[entry.filename]); encrypted = false; outData.truncate(0); diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 9829f7a4..897514aa 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -50,19 +50,23 @@ namespace Mist{ /// Keeps the segment entry list by playlist ID extern std::map > listEntries; - class SegmentDownloader{ + class SegmentDownloader: public Util::DataCallback{ public: SegmentDownloader(); HTTP::URIReader segDL; char *packetPtr; bool loadSegment(const playListEntries &entry); bool readNext(); + virtual void dataCallback(const char *ptr, size_t size); void close(); bool atEnd() const; private: bool encrypted; + bool buffered; + size_t offset; Util::ResizeablePointer outData; + Util::ResizeablePointer * currBuf; size_t encOffset; unsigned char tmpIvec[16]; mbedtls_aes_context aes;