diff --git a/lib/meson.build b/lib/meson.build index 81f99e51..50eaf9c7 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -35,6 +35,7 @@ headers = [ 'rtmpchunks.h', 'rtp_fec.h', 'rtp.h', + 'segmentreader.h', 'sdp.h', 'sdp_media.h', 'shared_memory.h', @@ -105,6 +106,7 @@ libmist = library('mist', 'rtmpchunks.cpp', 'rtp_fec.cpp', 'rtp.cpp', + 'segmentreader.cpp', 'sdp.cpp', 'sdp_media.cpp', 'shared_memory.cpp', diff --git a/lib/segmentreader.cpp b/lib/segmentreader.cpp new file mode 100644 index 00000000..8f0e95fa --- /dev/null +++ b/lib/segmentreader.cpp @@ -0,0 +1,304 @@ +#include "segmentreader.h" +#include "timing.h" + +#ifdef SSL +#include "mbedtls/aes.h" +#endif + +/// Helper function for printing encryption keys in hex format +static std::string printhex(const char *data, size_t len){ + static const char *const lut = "0123456789ABCDEF"; + std::string output; + output.reserve(2 * len); + for (size_t i = 0; i < len; ++i){ + const unsigned char c = data[i]; + output.push_back(lut[c >> 4]); + output.push_back(lut[c & 15]); + } + return output; +} + + + +namespace Mist{ + SegmentReader::SegmentReader(){ + progressCallback = 0; + isOpen = false; +#ifdef SSL + encrypted = false; +#endif + currBuf = 0; + packetPtr = 0; + } + + void SegmentReader::onProgress(bool (*callback)(uint8_t)){ + progressCallback = callback; + segDL.onProgress(callback); + } + + void SegmentReader::reset(){ + tsStream.clear(); + } + + /// Reads the segment at least up to position _offset. + /// Returns true if the position is available, false otherwise. + bool SegmentReader::readTo(size_t _offset){ + // Have it? Return true right away + if (currBuf->size() >= _offset){return true;} + + // Buffered? Just return false - we can't download more. + if (buffered){return false;} + +#ifdef SSL + // Encrypted? Round up to nearest multiple of 16 + if (encrypted && _offset % 16){ + _offset = ((size_t)(_offset / 16) + 1) * 16; + // Clip to size of file + if (_offset > currBuf->rsize()){_offset = currBuf->rsize();} + } +#endif + + // Attempt to download what we need + size_t retries = 0; + while (currBuf->size() < _offset){ + size_t preSize = getDataCallbackPos(); + if (!segDL){ + if (!segDL.isSeekable()){return false;} + // Only retry/resume if seekable and allocated size greater than current size + if (currBuf->rsize() > currBuf->size()){ + // Seek to current position to resume + if (retries++ > 5){ + segDL.close(); + return false; + } + segDL.seek(getDataCallbackPos()); + } + } + segDL.readSome(_offset - currBuf->size(), *this); + + // Sleep if we made no progress + if (getDataCallbackPos() == preSize){ + Util::sleep(5); + if (progressCallback && !progressCallback(0)){return false;} + } + } + return true; + } + + void SegmentReader::initializeMetadata(DTSC::Meta &meta, size_t tid, size_t mappingId){ + tsStream.initializeMetadata(meta, tid, mappingId); + } + + /// Attempts to read a single TS packet from the current segment, setting packetPtr on success + bool SegmentReader::readNext(DTSC::Packet & thisPacket, uint64_t bytePos){ + while (*this){ + if (parser == STRM_UNKN){ + if (!readTo(189)){ + WARN_MSG("File format detection failed: could not read at least 189 bytes!"); + return false; + } + if ((*currBuf)[0] == 0x47 && (*currBuf)[188] == 0x47){ + parser = STRM_TS; + continue; + } + if (!memcmp(*currBuf + 4, "ftyp", 4) || !memcmp(*currBuf + 4, "moof", 4) || !memcmp(*currBuf + 4, "moov", 4)){ + parser = STRM_MP4; + continue; + } + WARN_MSG("File format detection failed: unable to recognize file format!"); + return false; + } + + if (parser == STRM_TS){ + if (currBuf->size() == currBuf->rsize()){tsStream.finish();} + if (tsStream.hasPacketOnEachTrack() || currBuf->size() == currBuf->rsize()){ + if (!tsStream.hasPacket()){return false;} + tsStream.getEarliestPacket(thisPacket); + return true; + } + if (!readTo(offset + 188)){return false;} + tsStream.parse(*currBuf + offset, bytePos); + offset += 188; + } + + if (parser == STRM_MP4){ + /// \TODO Implement parsing MP4 data + } + } + return false; + } + + void SegmentReader::setInit(const std::string & data){ + /// \TODO Implement detecting/parsing MP4 init data + /* + std::string boxType = std::string(readBuffer+4, 4); + uint64_t boxSize = MP4::calcBoxSize(readBuffer); + if (boxType == "moov"){ + while (readBuffer.size() < boxSize && inFile && keepRunning()){inFile.readSome(boxSize-readBuffer.size(), *this);} + if (readBuffer.size() < boxSize){ + Util::logExitReason(ER_FORMAT_SPECIFIC, "Could not read entire MOOV box into memory"); + break; + } + MP4::Box moovBox(readBuffer, false); + + // for all box in moov + std::deque trak = ((MP4::MOOV*)&moovBox)->getChildren(); + for (std::deque::iterator trakIt = trak.begin(); trakIt != trak.end(); trakIt++){ + trackHeaders.push_back(MP4::TrackHeader()); + trackHeaders.rbegin()->read(*trakIt); + } + hasMoov = true; + } + */ + } + + /// Stores data in currBuf, decodes if/as necessary, in whole 16-byte blocks + void SegmentReader::dataCallback(const char *ptr, size_t size){ +#ifdef SSL + if (encrypted){ + // Try to complete a 16-byte remainder + if (decBuffer.size()){ + size_t toAppend = 16 - decBuffer.size(); + decBuffer.append(ptr, toAppend); + if (decBuffer.size() != 16){ + //Not enough data yet + return; + } + // Decode 16 bytes + currBuf->allocate(currBuf->size() + 16); + mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, 16, tmpIvec, (const unsigned char *)(char*)decBuffer, + ((unsigned char *)(char *)*currBuf) + currBuf->size()); + currBuf->append(0, 16); + // Clear remainder + decBuffer.truncate(0); + // Shift buffers + ptr += toAppend; + size -= toAppend; + } + // Decode any multiple of 16 bytes + size_t toDecode = ((size_t)(size / 16)) * 16; + if (toDecode){ + currBuf->allocate(currBuf->size() + toDecode); + mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, toDecode, tmpIvec, (const unsigned char *)ptr, + ((unsigned char *)(char *)*currBuf) + currBuf->size()); + currBuf->append(0, toDecode); + // Shift buffers + ptr += toDecode; + size -= toDecode; + } + // Store remainder, if needed + if (size){decBuffer.append(ptr, size);} + return; + } +#endif + currBuf->append(ptr, size); + } + + size_t SegmentReader::getDataCallbackPos() const{ +#ifdef SSL + return startAtByte+currBuf->size()+decBuffer.size(); +#else + return startAtByte+currBuf->size(); +#endif + } + + /// Attempts to read a single TS packet from the current segment, setting packetPtr on success + void SegmentReader::close(){ + packetPtr = 0; + isOpen = false; + segDL.close(); + } + + /// Loads the given segment URL into the segment buffer. + bool SegmentReader::load(const std::string &path, uint64_t startAt, uint64_t stopAt, const char * ivec, const char * keyAES, Util::ResizeablePointer * bufPtr){ + tsStream.partialClear(); + isOpen = false; + parser = STRM_UNKN; + if (ivec && keyAES && memcmp(keyAES, "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000", 16)){ +#ifdef SSL + encrypted = true; + std::string hexKey = printhex(keyAES, 16); + std::string hexIvec = printhex(ivec, 16); + MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", path.c_str(), hexKey.c_str(), hexIvec.c_str()); +#else + FAIL_MSG("Cannot read encrypted segment: %s", path.c_str()); + return false; +#endif + }else{ + encrypted = false; + MEDIUM_MSG("Loading segment: %s", path.c_str()); + } + + startAtByte = startAt; + stopAtByte = stopAt; + offset = 0; + currBuf = bufPtr; + + // Is there at least one byte? Check if we need to resume or have a whole buffer + // If reserved and total size match, assume we have the whole thing + if (currBuf->size() && (currBuf->rsize() == currBuf->size())){ + buffered = true; + }else{ + buffered = false; + + if (currBuf->size()){ + MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize()); + } + + // We only re-open and seek if the opened URL doesn't match what we want already + HTTP::URL A = segDL.getURI(); + HTTP::URL B = HTTP::localURIResolver().link(path); + if (A != B){ + if (!segDL.open(path) || !segDL){ + FAIL_MSG("Could not open %s", path.c_str()); + return false; + } + if (!segDL){return false;} + } + + // Non-seekable case is handled further down + if (segDL.isSeekable() && startAtByte + currBuf->size()){ + //Seek to startAtByte position, since it's not the beginning of the file + MEDIUM_MSG("Seeking to %zu", startAtByte + currBuf->size()); + segDL.seek(startAtByte + currBuf->size()); + } + } + + if (!buffered){ + if (!currBuf->size() || !currBuf->rsize()){ + // Allocate full size if known + if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());} + } + // Download full segment if not seekable, pretend it was cached all along + if (!segDL.isSeekable()){ + currBuf->truncate(0); + segDL.readAll(*this); + if (startAtByte || stopAtByte){ + WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it"); + if (startAtByte){currBuf->shift(startAtByte);} + if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);} + } + buffered = true; + segDL.close(); + } + } + +#ifdef SSL + decBuffer.truncate(0); + // If we have a non-null key, decrypt + if (encrypted){ + // Load key + mbedtls_aes_setkey_dec(&aes, (const unsigned char *)keyAES, 128); + // Load initialization vector + memcpy(tmpIvec, ivec, 16); + } +#endif + + packetPtr = 0; + isOpen = true; + VERYHIGH_MSG("Segment opened: %s", path.c_str()); + return true; + } + +}// namespace Mist + diff --git a/lib/segmentreader.h b/lib/segmentreader.h new file mode 100644 index 00000000..cb4c8c40 --- /dev/null +++ b/lib/segmentreader.h @@ -0,0 +1,55 @@ +#include +#include +#include +#include +#include + +namespace Mist{ + + enum streamType {STRM_UNKN, STRM_TS, STRM_MP4}; + + class SegmentReader: public Util::DataCallback{ + public: + SegmentReader(); + void onProgress(bool (*callback)(uint8_t)); + operator bool() const {return isOpen;} + + char *packetPtr; + bool load(const std::string &path, uint64_t startAt, uint64_t stopAt, const char * ivec, const char * keyAES, Util::ResizeablePointer * bufPtr); + bool readNext(DTSC::Packet & thisPacket, uint64_t bytePos); + void setInit(const std::string & initData); + void reset(); + void close(); + void initializeMetadata(DTSC::Meta &meta, size_t tid, size_t mappingId); + + virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; + + private: + HTTP::URIReader segDL; ///< If non-buffered, reader for the data + Util::ResizeablePointer * currBuf; ///< Storage for all (non)buffered segment content + uint64_t startAtByte; ///< Start position in bytes + uint64_t stopAtByte; ///< Stop position in bytes + bool encrypted; ///< True if segment must be decrypted before parsing + bool buffered; ///< True if segment is fully buffered in memory + bool isOpen; ///< True if a segment has been successfully opened + bool (*progressCallback)(uint8_t); + + bool readTo(size_t offset); + size_t offset; + + // Parser related + streamType parser; + TS::Stream tsStream; + std::deque mp4Headers; + + +#ifdef SSL + //Encryption-related + Util::ResizeablePointer decBuffer; ///< Buffer for pre-decryption data - max 16 bytes + unsigned char tmpIvec[16]; + mbedtls_aes_context aes; ///< Decryption context +#endif + }; + +} // namespace Mist diff --git a/src/input/input_hls.cpp b/src/input/input_hls.cpp index 54355a07..7f39c957 100644 --- a/src/input/input_hls.cpp +++ b/src/input/input_hls.cpp @@ -1,14 +1,9 @@ #include "input_hls.h" -#ifdef SSL -#include "mbedtls/aes.h" -#endif -#include #include #include #include #include #include -#include #include #include #include @@ -24,6 +19,66 @@ #define SEM_TS_CLAIM "/MstTSIN%s" +/// Local RAM buffer for recently accessed segments +std::map segBufs; +/// Order of adding/accessing for local RAM buffer of segments +std::deque segBufAccs; +/// Order of adding/accessing sizes for local RAM buffer of segments +std::deque segBufSize; +size_t segBufTotalSize = 0; + +/// Read data for a segment, update buffer sizes to match +bool readNext(Mist::SegmentReader & S, DTSC::Packet & thisPacket, uint64_t bytePos){ + //Overwrite the current segment size + segBufTotalSize -= segBufSize.front(); + bool ret = S.readNext(thisPacket, bytePos); + segBufSize.front() = S.getDataCallbackPos(); + segBufTotalSize += segBufSize.front(); + return ret; +} + +/// Load a new segment, use cache if possible or create a new cache entry +bool loadSegment(Mist::SegmentReader & S, const Mist::playListEntries & entry){ + if (!segBufs.count(entry)){ + HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str()); + //Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times) + while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){ + HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str()); + segBufs.erase(segBufAccs.back()); + segBufTotalSize -= segBufSize.back(); + segBufAccs.pop_back(); + segBufSize.pop_back(); + } + segBufAccs.push_front(entry); + segBufSize.push_front(0); + }else{ + HIGH_MSG("Reading from cache: %s", entry.shortName().c_str()); + // Ensure current entry is the front entry in the deques + std::deque segBufAccsCopy = segBufAccs; + std::deque segBufSizeCopy = segBufSize; + segBufAccs.clear(); + segBufSize.clear(); + size_t thisSize = 0; + + while (segBufSizeCopy.size()){ + if (segBufAccsCopy.back() == entry){ + thisSize = segBufSizeCopy.back(); + }else{ + segBufAccs.push_front(segBufAccsCopy.back()); + segBufSize.push_front(segBufSizeCopy.back()); + } + segBufAccsCopy.pop_back(); + segBufSizeCopy.pop_back(); + } + segBufAccs.push_front(entry); + segBufSize.push_front(thisSize); + } + return S.load(entry.filename, entry.startAtByte, entry.stopAtByte, entry.ivec, entry.keyAES, &(segBufs[entry])); +} + + + + static uint64_t ISO8601toUnixmillis(const std::string &ts){ // Format examples: // 2019-12-05T09:41:16.765000+00:00 @@ -111,17 +166,6 @@ namespace Mist{ /// Save playlist objects for manual reloading static std::map playlistMapping; - /// Local RAM buffer for recently accessed segments - std::map segBufs; - - /// Order of adding/accessing for local RAM buffer of segments - std::deque segBufAccs; - - /// Order of adding/accessing sizes for local RAM buffer of segments - std::deque segBufSize; - - size_t segBufTotalSize = 0; - /// Track which segment numbers have been parsed std::map parsedSegments; @@ -239,264 +283,6 @@ namespace Mist{ } } - static std::string printhex(const char *data, size_t len){ - static const char *const lut = "0123456789ABCDEF"; - - std::string output; - output.reserve(2 * len); - for (size_t i = 0; i < len; ++i){ - const unsigned char c = data[i]; - output.push_back(lut[c >> 4]); - output.push_back(lut[c & 15]); - } - return output; - } - - SegmentDownloader::SegmentDownloader(){ - isOpen = false; - segDL.onProgress(callbackFunc); - encrypted = false; - currBuf = 0; - packetPtr = 0; - } - - /// Returns true if packetPtr is at the end of the current segment. - bool SegmentDownloader::atEnd() const{ - if (!isOpen || !currBuf){return true;} - if (buffered){return currBuf->size() <= offset + 188;} - if (stopAtByte && (stopAtByte - startAtByte) <= offset + 188){return true;} - return !segDL && currBuf->size() <= offset + 188; - // return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size(); - } - - /// Attempts to read a single TS packet from the current segment, setting packetPtr on success - bool SegmentDownloader::readNext(){ - if (encrypted){ - // Encrypted, need to decrypt -#ifdef SSL - // Are we exactly at the end of the buffer? Truncate it entirely. - if (encOffset == outData.size() || !outData.size()){ - outData.truncate(0); - packetPtr = 0; - encOffset = 0; - } - // Do we already have some data ready? Just serve it. - if (encOffset + 188 <= outData.size()){ - packetPtr = outData + encOffset; - encOffset += 188; - if (packetPtr[0] != 0x47){ - FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]); - return false; - } - return true; - } - // Alright, we need to read some more data. - // We read 192 bytes at a time: a single TS packet is 188 bytes but AES-128-CBC encryption works in 16-byte blocks. - size_t len = 0; - segDL.readSome(packetPtr, len, 192); - if (!len){return false;} - if (len % 16 != 0){ - FAIL_MSG("Read a non-16-multiple of bytes (%zu), cannot decode!", len); - return false; - } - outData.allocate(outData.size() + len); - mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, len, tmpIvec, (const unsigned char *)packetPtr, - ((unsigned char *)(char *)outData) + outData.size()); - outData.append(0, len); - // End of the segment? Remove padding data. - if (segDL.isEOF()){ - // The padding consists of X bytes of padding, all containing the raw value X. - // Since padding is mandatory, we can simply read the last byte and remove X bytes from the length. - if (outData.size() <= outData[outData.size() - 1]){ - FAIL_MSG("Encryption padding is >= one TS packet. We probably returned some invalid TS " - "data earlier :-("); - return false; - } - outData.truncate(outData.size() - outData[outData.size() - 1]); - } - // Okay, we have more data. Let's see if we can return it... - if (encOffset + 188 <= outData.size()){ - packetPtr = outData + encOffset; - encOffset += 188; - if (packetPtr[0] != 0x47){ - FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]); - return false; - } - return true; - } -#endif - // No? Then we've failed in our task :'( - FAIL_MSG("Could not load encrypted packet :'("); - return false; - }else{ - // Plaintext - if (buffered){ - if (atEnd()){return false;} - }else{ - if (!currBuf){return false;} - size_t retries = 0; - if (stopAtByte && (stopAtByte - startAtByte) <= currBuf->size()){return false;} - while (segDL && currBuf->size() < offset + 188 + 188){ - size_t preSize = currBuf->size(); - segDL.readSome(offset + 188 + 188 - currBuf->size(), *this); - if (currBuf->size() < offset + 188 + 188){ - if (!segDL){ - if (!segDL.isSeekable()){return false;} - // Only retry/resume if seekable and allocated size greater than current size - if (currBuf->rsize() > currBuf->size()){ - // Seek to current position to resume - ++retries; - if (retries > 5){ - segDL.close(); - return false; - } - segDL.seek(startAtByte+currBuf->size()); - } - } - if (currBuf->size() <= preSize){ - Util::sleep(5); - } - } - } - if (currBuf->size() < offset + 188 + 188){return false;} - } - // First packet is at offset 0, not 188. Skip increment for this one. - if (!firstPacket){ - offset += 188; - }else{ - firstPacket = false; - } - packetPtr = *currBuf + offset; - if (!packetPtr || packetPtr[0] != 0x47){ - std::stringstream packData; - if (packetPtr){ - for (uint64_t i = 0; i < 188; ++i){ - packData << std::hex << std::setw(2) << std::setfill('0') << (unsigned int)packetPtr[i]; - } - } - FAIL_MSG("Not a valid TS packet: byte %zu is not 0x47: %s", offset, packData.str().c_str()); - return false; - } - return true; - } - } - - void SegmentDownloader::dataCallback(const char *ptr, size_t size){ - currBuf->append(ptr, size); - //Overwrite the current segment size - segBufTotalSize -= segBufSize.front(); - segBufSize.front() = currBuf->size(); - segBufTotalSize += segBufSize.front(); - } - - size_t SegmentDownloader::getDataCallbackPos() const{return startAtByte+currBuf->size();} - - /// Attempts to read a single TS packet from the current segment, setting packetPtr on success - void SegmentDownloader::close(){ - packetPtr = 0; - isOpen = false; - segDL.close(); - } - - /// Loads the given segment URL into the segment buffer. - bool SegmentDownloader::loadSegment(const playListEntries &entry){ - std::string hexKey = printhex(entry.keyAES, 16); - std::string hexIvec = printhex(entry.ivec, 16); - - MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.shortName().c_str(), hexKey.c_str(), - hexIvec.c_str()); - - startAtByte = entry.startAtByte; - stopAtByte = entry.stopAtByte; - offset = 0; - firstPacket = true; - buffered = segBufs.count(entry); - if (!buffered){ - HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str()); - if (!segDL.open(entry.filename)){ - FAIL_MSG("Could not open %s", entry.shortName().c_str()); - return false; - } - if (!segDL){return false;} - //Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times) - while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){ - HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str()); - segBufs.erase(segBufAccs.back()); - segBufTotalSize -= segBufSize.back(); - segBufAccs.pop_back(); - segBufSize.pop_back(); - } - segBufAccs.push_front(entry); - segBufSize.push_front(0); - currBuf = &(segBufs[entry]); - // Non-seekable case is handled further down - if (segDL.isSeekable() && startAtByte){ - //Seek to startAtByte position, since it's not the beginning of the file - MEDIUM_MSG("Seeking to %zu", startAtByte); - segDL.seek(startAtByte); - } - }else{ - HIGH_MSG("Reading from segment cache: %s", entry.shortName().c_str()); - currBuf = &(segBufs[entry]); - if (currBuf->rsize() != currBuf->size()){ - MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize()); - buffered = false; - // We only re-open and seek if the opened URL doesn't match what we want already - HTTP::URL A = segDL.getURI(); - HTTP::URL B = HTTP::localURIResolver().link(entry.filename); - if (A != B){ - if (!segDL.open(entry.filename)){ - FAIL_MSG("Could not open %s", entry.shortName().c_str()); - return false; - } - if (!segDL){return false;} - //Seek to current position in segment for resuming - MEDIUM_MSG("Seeking to %zu", currBuf->size()+startAtByte); - segDL.seek(currBuf->size()+startAtByte); - } - } - } - if (!buffered){ - // Allocate full size if known - if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());} - // Download full segment if not seekable, pretend it was cached all along - if (!segDL.isSeekable()){ - segDL.readAll(*this); - if (startAtByte || stopAtByte){ - WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it"); - if (startAtByte){currBuf->shift(startAtByte);} - if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);} - //Overwrite the current segment size - segBufTotalSize -= segBufSize.front(); - segBufSize.front() = currBuf->size(); - segBufTotalSize += segBufSize.front(); - } - buffered = true; - } - } - - encrypted = false; - outData.truncate(0); - // If we have a non-null key, decrypt - if (entry.keyAES[0] != 0 || entry.keyAES[1] != 0 || entry.keyAES[2] != 0 || entry.keyAES[3] != 0 || - entry.keyAES[4] != 0 || entry.keyAES[5] != 0 || entry.keyAES[6] != 0 || entry.keyAES[7] != 0 || - entry.keyAES[8] != 0 || entry.keyAES[9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 || - entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){ - encrypted = true; -#ifdef SSL - // Load key - mbedtls_aes_setkey_dec(&aes, (const unsigned char *)entry.keyAES, 128); - // Load initialization vector - memcpy(tmpIvec, entry.ivec, 16); -#endif - } - - packetPtr = 0; - isOpen = true; - HIGH_MSG("Segment download complete and passed sanity checks"); - return true; - } - /// Handles both initial load and future reloads. /// Returns how many segments were added to the internal segment list. bool Playlist::reload(){ @@ -512,6 +298,9 @@ namespace Mist{ std::string keyUri; std::string keyIV; + std::string mapUri; + std::string mapRange; + int count = 0; std::istringstream urlSource; @@ -619,6 +408,59 @@ namespace Mist{ continue; } + if (key == "MAP"){ + size_t mapLen = 0, mapOffset = 0; + size_t tmpPos = val.find("BYTERANGE=\""); + size_t tmpPos2 = val.substr(tmpPos).find('"'); + if (tmpPos != std::string::npos){ + mapRange = val.substr(tmpPos + 11, tmpPos2 - tmpPos - 11); + + size_t atSign = mapRange.find('@'); + if (atSign != std::string::npos){ + std::string len = mapRange.substr(0, atSign); + std::string pos = mapRange.substr(atSign+1); + mapLen = atoll(len.c_str()); + mapOffset = atoll(pos.c_str()); + }else{ + mapLen = atoll(val.c_str()); + } + } + + tmpPos = val.find("URI=\""); + tmpPos2 = val.substr(tmpPos + 5).find('"'); + if (tmpPos != std::string::npos){ + mapUri = val.substr(tmpPos + 5, tmpPos2); + } + + // when key not found, download and store it in the map + if (!maps.count(mapUri+mapRange)){ + HTTP::URIReader mapDL; + if (!mapDL.open(root.link(mapUri)) || !mapDL){ + FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); + continue; + } + char *mapPtr; + size_t mapPLen; + mapDL.readAll(mapPtr, mapPLen); + if (mapOffset){ + if (mapOffset <= mapPLen){ + mapPtr += mapOffset; + mapPLen -= mapOffset; + }else{ + mapPLen = 0; + } + } + if (mapLen < mapPLen){mapPLen = mapLen;} + if (!mapPLen){ + FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str()); + continue; + } + maps.insert(std::pair(keyUri, std::string(mapPtr, mapPLen))); + } + continue; + } + + if (key == "PLAYLIST-TYPE"){ if (val == "VOD"){ streamIsVOD = true; @@ -657,7 +499,7 @@ namespace Mist{ memset(ivec, 0, 16); Bit::htobll(ivec + 8, bposCounter); } - addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), startByte, lenByte); + addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), mapUri+mapRange, startByte, lenByte); lastSegment = bposCounter; ++count; } @@ -681,10 +523,11 @@ namespace Mist{ /// Adds playlist segments to be processed void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos, - const std::string &key, const std::string &iv, uint64_t startByte, uint64_t lenByte){ + const std::string &key, const std::string &iv, const std::string mapName, uint64_t startByte, uint64_t lenByte){ playListEntries entry; entry.filename = absolute_filename; entry.relative_filename = filename; + entry.mapName = mapName; cleanLine(entry.filename); entry.bytePos = bpos; entry.duration = duration; @@ -771,6 +614,7 @@ namespace Mist{ currentPlaylist = 0; streamOffset = 0; isInitialRun = false; + segDowner.onProgress(callbackFunc); pidCounter = 1; @@ -819,11 +663,9 @@ namespace Mist{ capa["optional"]["bufferTime"]["default"] = 50000; option.null(); - inFile = NULL; } InputHLS::~InputHLS(){ - if (inFile){fclose(inFile);} } bool InputHLS::checkArguments(){ @@ -953,7 +795,6 @@ namespace Mist{ TS::Packet packet; char *data; size_t dataLen; - bool hasPacket = false; meta.reInit(isSingular() ? streamName : ""); tthread::lock_guard guard(entryMutex); @@ -966,7 +807,8 @@ namespace Mist{ for (std::map >::iterator pListIt = listEntries.begin(); pListIt != listEntries.end() && config->is_active; pListIt++){ - tsStream.clear(); + segDowner.reset(); + std::string lastMapName; uint32_t entId = 0; bool foundAtLeastOnePacket = false; VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex); @@ -974,80 +816,47 @@ namespace Mist{ for (std::deque::iterator entryIt = pListIt->second.begin(); entryIt != pListIt->second.end() && config->is_active; entryIt++){ ++currentSegment; - tsStream.partialClear(); - if (!segDowner.loadSegment(*entryIt)){ + if (entryIt->mapName != lastMapName){ + lastMapName = entryIt->mapName; + segDowner.setInit(playlistMapping[pListIt->first].maps[lastMapName]); + } + if (!loadSegment(segDowner, *entryIt)){ FAIL_MSG("Failed to load segment - skipping to next"); continue; } entId++; allowRemap = true; - while ((!segDowner.atEnd() || tsStream.hasPacket()) && config->is_active){ - // Wait for packets on each track to make sure the offset is set based on the earliest packet - hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); - if (hasPacket){ - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - while (headerPack){ - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); - uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - if (!streamIsLive){ - headerPack.getString("data", data, dataLen); - - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); - } - tsStream.getEarliestPacket(headerPack); - foundAtLeastOnePacket = true; - } - } - // No packets available, so read the next TS packet if available - if (segDowner.readNext()){ - packet.FromPointer(segDowner.packetPtr); - tsStream.parse(packet, entryIt->bytePos); - } - } - // get last packets - tsStream.finish(); DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - while (headerPack){ - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); - uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } + while (config->is_active && readNext(segDowner, headerPack, entryIt->bytePos)){ + if (!config->is_active){return false;} + if (headerPack){ + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(pListIt->first, tmpTrackId); + uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC); + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + segDowner.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + if (!streamIsLive){ + headerPack.getString("data", data, dataLen); - if (!streamIsLive){ - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; - DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11; + DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize); + } + foundAtLeastOnePacket = true; } - tsStream.getEarliestPacket(headerPack); } // Finally save the offset as part of the TS segment. This is required for bufferframe // to work correctly, since not every segment might have an UTC timestamp tag if (plsTimeOffset.count(pListIt->first)){ - std::deque &curList = listEntries[pListIt->first]; - curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first]; + listEntries[pListIt->first].at(entId-1).timeOffset = plsTimeOffset[pListIt->first]; }else{ - std::deque &curList = listEntries[pListIt->first]; - curList.at(entId-1).timeOffset = 0; + listEntries[pListIt->first].at(entId-1).timeOffset = 0; } //Set progress counter @@ -1055,18 +864,11 @@ namespace Mist{ streamStatus.mapped[1] = (255 * currentSegment) / totalSegments; } - // If live, don't actually parse anything. - // If non-live, we read all the segments - if (streamIsLive){ - parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex; - }else{ - parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment; - } + // If live, don't actually parse anything. If non-live, we read all the segments + parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + (streamIsLive ? 0 : currentSegment); // For still-appending streams, only parse the first segment for each playlist - if (streamIsLive && foundAtLeastOnePacket){ - break; - } + if (streamIsLive && foundAtLeastOnePacket){break;} } } if (!config->is_active){return false;} @@ -1129,7 +931,6 @@ namespace Mist{ /// \return True if the segment has been buffered successfully bool InputHLS::parseSegmentAsLive(uint64_t segmentIndex){ bool hasOffset = false; - bool hasPacket = false; uint64_t bufferTime = config->getInteger("pagetimeout"); if (config->hasOption("bufferTime")){ bufferTime = config->getInteger("bufferTime") / 1000; @@ -1147,89 +948,53 @@ namespace Mist{ FAIL_MSG("Tried to load segment with index '%" PRIu64 "', but the playlist only contains '%zu' entries!", segmentIndex, curList.size()); return false; } - if (!segDowner.loadSegment(curList.at(segmentIndex))){ + + playListEntries & ntry = curList.at(segmentIndex); + if (ntry.mapName.size()){ + segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); + } + if (!loadSegment(segDowner, ntry)){ FAIL_MSG("Failed to load segment"); return false; } - while (!segDowner.atEnd()){ - // Wait for packets on each track to make sure the offset is set based on the earliest packet - hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); - if (hasPacket){ - DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - while (headerPack){ - size_t tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); - uint64_t packetTime = headerPack.getTime(); - // Set segment offset and save it - if (!hasOffset && curList.at(segmentIndex).mUTC){ - hasOffset = true; - DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; - MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); - curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; - } - if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ - hasOffset = true; - packetTime += DVRTimeOffsets[currentPlaylist]; - HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime); - } - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - playlistMapping[currentPlaylist].tracks[idx] = true; - - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); - if (isInitialRun){ - pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; - }else{ - pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); - } - tsStream.getEarliestPacket(headerPack); - } - } - // No packets available, so read the next TS packet if available - if (segDowner.readNext()){ - packet.FromPointer(segDowner.packetPtr); - tsStream.parse(packet, curList.at(segmentIndex).bytePos); - } - } - // get last packets - tsStream.finish(); DTSC::Packet headerPack; - tsStream.getEarliestPacket(headerPack); - while (headerPack){ - int tmpTrackId = headerPack.getTrackId(); - uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); - uint64_t packetTime = headerPack.getTime(); - if (DVRTimeOffsets.count(currentPlaylist)){ - packetTime += DVRTimeOffsets[currentPlaylist]; - VERYHIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime); - } - size_t idx = M.trackIDToIndex(packetId, getpid()); - if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ - tsStream.initializeMetadata(meta, tmpTrackId, packetId); - idx = M.trackIDToIndex(packetId, getpid()); - } - playlistMapping[currentPlaylist].tracks[idx] = true; + while (readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){ + if (headerPack){ + size_t tmpTrackId = headerPack.getTrackId(); + uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId); + uint64_t packetTime = headerPack.getTime(); + // Set segment offset and save it + if (!hasOffset && curList.at(segmentIndex).mUTC){ + hasOffset = true; + DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime; + MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]); + curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist]; + } + if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){ + hasOffset = true; + packetTime += DVRTimeOffsets[currentPlaylist]; + HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime); + } + size_t idx = M.trackIDToIndex(packetId, getpid()); + if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){ + tsStream.initializeMetadata(meta, tmpTrackId, packetId); + idx = M.trackIDToIndex(packetId, getpid()); + } + playlistMapping[currentPlaylist].tracks[idx] = true; - headerPack.getString("data", data, dataLen); - // keyframe data exists, so always add 19 bytes keyframedata. - uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; - VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); - bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); - if (isInitialRun){ - pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; - }else{ - pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); + headerPack.getString("data", data, dataLen); + // keyframe data exists, so always add 19 bytes keyframedata. + uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0; + VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx); + bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe")); + if (isInitialRun){ + pageCounter[idx][getCurrentLivePage(idx)] = curTimeout; + }else{ + pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs(); + } + tsStream.getEarliestPacket(headerPack); } - tsStream.getEarliestPacket(headerPack); } return true; } @@ -1338,64 +1103,30 @@ namespace Mist{ void InputHLS::getNext(size_t idx){ INSANE_MSG("Getting next"); uint32_t tid = 0; - bool finished = false; thisPacket.null(); while (config->is_active && (needsLock() || keepAlive())){ // Check if we have a packet - bool hasPacket = false; - if (idx == INVALID_TRACK_ID){ - hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket()); - }else{ - hasPacket = tsStream.hasPacket(getMappedTrackId(M.getID(idx))); - } - - // Yes? Excellent! Read and return it. - if (hasPacket){ - // Read - if (idx == INVALID_TRACK_ID){ - tsStream.getEarliestPacket(thisPacket); + if (readNext(segDowner, thisPacket, listEntries[currentPlaylist].at(currentIndex).bytePos)){ + if (thisPacket){ tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId()); - if (!tid){ - INSANE_MSG("Track %zu on PLS %" PRIu64 " -> %" PRIu32, thisPacket.getTrackId(), currentPlaylist, tid); - continue; + // Is it one we want? + if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){ + uint64_t packetTime = thisPacket.getTime(); + if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ + packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; + }else{ + packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); + } + INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); + // overwrite trackId on success + Bit::htobl(thisPacket.getData() + 8, tid); + Bit::htobll(thisPacket.getData() + 12, packetTime); + thisTime = packetTime; + thisIdx = tid; + return; // Success! } - }else{ - tid = getMappedTrackId(M.getID(idx)); - tsStream.getPacket(tid, thisPacket); } - if (!thisPacket){ - Util::logExitReason(ER_FORMAT_SPECIFIC, "Could not getNext TS packet!"); - return; - } - - uint64_t packetTime = thisPacket.getTime(); - if (listEntries[currentPlaylist].at(currentIndex).timeOffset){ - packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset; - }else{ - packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC); - } - INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime); - // overwrite trackId on success - Bit::htobl(thisPacket.getData() + 8, tid); - Bit::htobll(thisPacket.getData() + 12, packetTime); - thisTime = packetTime; - thisIdx = tid; - return; // Success! - } - - // No? Let's read some more data and check again. - if (!segDowner.atEnd() && segDowner.readNext()){ - tsBuf.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuf, listEntries[currentPlaylist].at(currentIndex).bytePos); - continue; // check again - } - - // Okay, reading more is not possible. Let's call finish() and check again. - if (!finished && segDowner.atEnd()){ - tsStream.finish(); - finished = true; - VERYHIGH_MSG("Finishing reading TS segment"); - continue; // Check again! + continue; } // No? Then we want to try reading the next file. @@ -1416,7 +1147,6 @@ namespace Mist{ VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu64 ")", currentPlaylist); if (readNextFile()){ MEDIUM_MSG("Next segment read successfully"); - finished = false; continue; // Success! Continue regular parsing. }else{ if (userSelect.size() > 1){ @@ -1442,7 +1172,7 @@ namespace Mist{ plsTimeOffset.clear(); plsLastTime.clear(); plsInterval.clear(); - tsStream.clear(); + segDowner.reset(); uint64_t trackId = M.getID(idx); unsigned long plistEntry = 0; @@ -1478,23 +1208,18 @@ namespace Mist{ curPlaylist.size(), currentIndex); return; } - playListEntries &entry = curPlaylist.at(currentIndex); - segDowner.loadSegment(entry); + playListEntries & e = curPlaylist.at(currentIndex); + if (e.mapName.size()){ + segDowner.setInit(playlistMapping[currentPlaylist].maps[e.mapName]); + } + loadSegment(segDowner, e); // If we have an offset, load it allowRemap = false; - if (entry.timeOffset){ - HIGH_MSG("Setting time offset of this TS segment to %" PRId64, entry.timeOffset); - plsTimeOffset[currentPlaylist] = entry.timeOffset; + if (e.timeOffset){ + HIGH_MSG("Setting time offset of this TS segment to %" PRId64, e.timeOffset); + plsTimeOffset[currentPlaylist] = e.timeOffset; } } - - HIGH_MSG("readPMT()"); - TS::Packet tsBuffer; - while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){ - if (!segDowner.readNext()){break;} - tsBuffer.FromPointer(segDowner.packetPtr); - tsStream.parse(tsBuffer, listEntries[currentPlaylist].at(currentIndex).bytePos); - } } /// \brief Applies any offset to the packets original timestamp @@ -1809,7 +1534,7 @@ namespace Mist{ /// Read next .ts file from the playlist. (from the list of entries which needs /// to be processed) bool InputHLS::readNextFile(){ - tsStream.clear(); + segDowner.reset(); playListEntries ntry; // This scope limiter prevents the recursion down below from deadlocking us @@ -1837,7 +1562,10 @@ namespace Mist{ ntry = curList[currentIndex]; } - if (!segDowner.loadSegment(ntry)){ + if (ntry.mapName.size()){ + segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]); + } + if (!loadSegment(segDowner, ntry)){ ERROR_MSG("Could not download segment: %s", ntry.filename.c_str()); return readNextFile(); // Attempt to read another, if possible. } diff --git a/src/input/input_hls.h b/src/input/input_hls.h index 92f024e8..abcf349f 100644 --- a/src/input/input_hls.h +++ b/src/input/input_hls.h @@ -2,12 +2,7 @@ #include "input.h" #include #include -#include -#include -#include -//#include -#include -#include +#include #define BUFFERTIME 10 @@ -21,6 +16,7 @@ namespace Mist{ struct playListEntries{ std::string filename; std::string relative_filename; + std::string mapName; uint64_t startAtByte; ///< Byte position inside filename where to start reading uint64_t stopAtByte; ///< Byte position inside filename where to stop sending uint64_t bytePos; @@ -61,45 +57,20 @@ namespace Mist{ return a.filename < b.filename || (a.filename == b.filename && a.startAtByte < b.startAtByte); } + inline bool operator== (const playListEntries a, const playListEntries b){ + return a.filename == b.filename && a.startAtByte == b.startAtByte; + } + /// Keeps the segment entry list by playlist ID extern std::map > listEntries; - class SegmentDownloader: public Util::DataCallback{ - public: - SegmentDownloader(); - HTTP::URIReader segDL; - char *packetPtr; - bool loadSegment(const playListEntries &entry); - bool readNext(); - virtual void dataCallback(const char *ptr, size_t size); - virtual size_t getDataCallbackPos() const; - void close(); - bool atEnd() const; - - private: - uint64_t startAtByte; - uint64_t stopAtByte; - bool encrypted; - bool buffered; - size_t offset; - bool firstPacket; - Util::ResizeablePointer outData; - Util::ResizeablePointer * currBuf; - size_t encOffset; - unsigned char tmpIvec[16]; -#ifdef SSL - mbedtls_aes_context aes; -#endif - bool isOpen; - }; - class Playlist{ public: Playlist(const std::string &uriSrc = ""); bool isUrl() const; bool reload(); void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos, - const std::string &key, const std::string &keyIV, uint64_t startByte, uint64_t lenByte); + const std::string &key, const std::string &keyIV, const std::string mapName, uint64_t startByte, uint64_t lenByte); std::string uri; // link to the current playlistfile HTTP::URL root; @@ -118,6 +89,7 @@ namespace Mist{ uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist char keyAES[16]; std::map keys; + std::map maps; uint64_t firstIndex; //< the index of the first segment in the playlist uint64_t lastSegment; std::map tracks; @@ -138,7 +110,7 @@ namespace Mist{ uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header unsigned int startTime; - SegmentDownloader segDowner; + SegmentReader segDowner; int version; int targetDuration; bool endPlaylist; @@ -155,11 +127,6 @@ namespace Mist{ size_t currentIndex; std::string currentFile; - TS::Stream tsStream; ///< Used for parsing the incoming ts stream - - Socket::Connection conn; - TS::Packet tsBuf; - // Used to map packetId of packets in pidMapping int pidCounter; @@ -181,8 +148,6 @@ namespace Mist{ bool readExistingHeader(); void getNext(size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); - FILE *inFile; - FILE *tsFile; bool readIndex(); bool initPlaylist(const std::string &uri, bool fullInit = true);