Spun out generalized segment reader class from HLS input
This commit is contained in:
parent
1dd27f215a
commit
b01df1f3f1
5 changed files with 594 additions and 540 deletions
|
@ -35,6 +35,7 @@ headers = [
|
|||
'rtmpchunks.h',
|
||||
'rtp_fec.h',
|
||||
'rtp.h',
|
||||
'segmentreader.h',
|
||||
'sdp.h',
|
||||
'sdp_media.h',
|
||||
'shared_memory.h',
|
||||
|
@ -105,6 +106,7 @@ libmist = library('mist',
|
|||
'rtmpchunks.cpp',
|
||||
'rtp_fec.cpp',
|
||||
'rtp.cpp',
|
||||
'segmentreader.cpp',
|
||||
'sdp.cpp',
|
||||
'sdp_media.cpp',
|
||||
'shared_memory.cpp',
|
||||
|
|
304
lib/segmentreader.cpp
Normal file
304
lib/segmentreader.cpp
Normal file
|
@ -0,0 +1,304 @@
|
|||
#include "segmentreader.h"
|
||||
#include "timing.h"
|
||||
|
||||
#ifdef SSL
|
||||
#include "mbedtls/aes.h"
|
||||
#endif
|
||||
|
||||
/// Helper function for printing encryption keys in hex format
|
||||
static std::string printhex(const char *data, size_t len){
|
||||
static const char *const lut = "0123456789ABCDEF";
|
||||
std::string output;
|
||||
output.reserve(2 * len);
|
||||
for (size_t i = 0; i < len; ++i){
|
||||
const unsigned char c = data[i];
|
||||
output.push_back(lut[c >> 4]);
|
||||
output.push_back(lut[c & 15]);
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
|
||||
|
||||
namespace Mist{
|
||||
SegmentReader::SegmentReader(){
|
||||
progressCallback = 0;
|
||||
isOpen = false;
|
||||
#ifdef SSL
|
||||
encrypted = false;
|
||||
#endif
|
||||
currBuf = 0;
|
||||
packetPtr = 0;
|
||||
}
|
||||
|
||||
void SegmentReader::onProgress(bool (*callback)(uint8_t)){
|
||||
progressCallback = callback;
|
||||
segDL.onProgress(callback);
|
||||
}
|
||||
|
||||
void SegmentReader::reset(){
|
||||
tsStream.clear();
|
||||
}
|
||||
|
||||
/// Reads the segment at least up to position _offset.
|
||||
/// Returns true if the position is available, false otherwise.
|
||||
bool SegmentReader::readTo(size_t _offset){
|
||||
// Have it? Return true right away
|
||||
if (currBuf->size() >= _offset){return true;}
|
||||
|
||||
// Buffered? Just return false - we can't download more.
|
||||
if (buffered){return false;}
|
||||
|
||||
#ifdef SSL
|
||||
// Encrypted? Round up to nearest multiple of 16
|
||||
if (encrypted && _offset % 16){
|
||||
_offset = ((size_t)(_offset / 16) + 1) * 16;
|
||||
// Clip to size of file
|
||||
if (_offset > currBuf->rsize()){_offset = currBuf->rsize();}
|
||||
}
|
||||
#endif
|
||||
|
||||
// Attempt to download what we need
|
||||
size_t retries = 0;
|
||||
while (currBuf->size() < _offset){
|
||||
size_t preSize = getDataCallbackPos();
|
||||
if (!segDL){
|
||||
if (!segDL.isSeekable()){return false;}
|
||||
// Only retry/resume if seekable and allocated size greater than current size
|
||||
if (currBuf->rsize() > currBuf->size()){
|
||||
// Seek to current position to resume
|
||||
if (retries++ > 5){
|
||||
segDL.close();
|
||||
return false;
|
||||
}
|
||||
segDL.seek(getDataCallbackPos());
|
||||
}
|
||||
}
|
||||
segDL.readSome(_offset - currBuf->size(), *this);
|
||||
|
||||
// Sleep if we made no progress
|
||||
if (getDataCallbackPos() == preSize){
|
||||
Util::sleep(5);
|
||||
if (progressCallback && !progressCallback(0)){return false;}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void SegmentReader::initializeMetadata(DTSC::Meta &meta, size_t tid, size_t mappingId){
|
||||
tsStream.initializeMetadata(meta, tid, mappingId);
|
||||
}
|
||||
|
||||
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
|
||||
bool SegmentReader::readNext(DTSC::Packet & thisPacket, uint64_t bytePos){
|
||||
while (*this){
|
||||
if (parser == STRM_UNKN){
|
||||
if (!readTo(189)){
|
||||
WARN_MSG("File format detection failed: could not read at least 189 bytes!");
|
||||
return false;
|
||||
}
|
||||
if ((*currBuf)[0] == 0x47 && (*currBuf)[188] == 0x47){
|
||||
parser = STRM_TS;
|
||||
continue;
|
||||
}
|
||||
if (!memcmp(*currBuf + 4, "ftyp", 4) || !memcmp(*currBuf + 4, "moof", 4) || !memcmp(*currBuf + 4, "moov", 4)){
|
||||
parser = STRM_MP4;
|
||||
continue;
|
||||
}
|
||||
WARN_MSG("File format detection failed: unable to recognize file format!");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (parser == STRM_TS){
|
||||
if (currBuf->size() == currBuf->rsize()){tsStream.finish();}
|
||||
if (tsStream.hasPacketOnEachTrack() || currBuf->size() == currBuf->rsize()){
|
||||
if (!tsStream.hasPacket()){return false;}
|
||||
tsStream.getEarliestPacket(thisPacket);
|
||||
return true;
|
||||
}
|
||||
if (!readTo(offset + 188)){return false;}
|
||||
tsStream.parse(*currBuf + offset, bytePos);
|
||||
offset += 188;
|
||||
}
|
||||
|
||||
if (parser == STRM_MP4){
|
||||
/// \TODO Implement parsing MP4 data
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void SegmentReader::setInit(const std::string & data){
|
||||
/// \TODO Implement detecting/parsing MP4 init data
|
||||
/*
|
||||
std::string boxType = std::string(readBuffer+4, 4);
|
||||
uint64_t boxSize = MP4::calcBoxSize(readBuffer);
|
||||
if (boxType == "moov"){
|
||||
while (readBuffer.size() < boxSize && inFile && keepRunning()){inFile.readSome(boxSize-readBuffer.size(), *this);}
|
||||
if (readBuffer.size() < boxSize){
|
||||
Util::logExitReason(ER_FORMAT_SPECIFIC, "Could not read entire MOOV box into memory");
|
||||
break;
|
||||
}
|
||||
MP4::Box moovBox(readBuffer, false);
|
||||
|
||||
// for all box in moov
|
||||
std::deque<MP4::TRAK> trak = ((MP4::MOOV*)&moovBox)->getChildren<MP4::TRAK>();
|
||||
for (std::deque<MP4::TRAK>::iterator trakIt = trak.begin(); trakIt != trak.end(); trakIt++){
|
||||
trackHeaders.push_back(MP4::TrackHeader());
|
||||
trackHeaders.rbegin()->read(*trakIt);
|
||||
}
|
||||
hasMoov = true;
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/// Stores data in currBuf, decodes if/as necessary, in whole 16-byte blocks
|
||||
void SegmentReader::dataCallback(const char *ptr, size_t size){
|
||||
#ifdef SSL
|
||||
if (encrypted){
|
||||
// Try to complete a 16-byte remainder
|
||||
if (decBuffer.size()){
|
||||
size_t toAppend = 16 - decBuffer.size();
|
||||
decBuffer.append(ptr, toAppend);
|
||||
if (decBuffer.size() != 16){
|
||||
//Not enough data yet
|
||||
return;
|
||||
}
|
||||
// Decode 16 bytes
|
||||
currBuf->allocate(currBuf->size() + 16);
|
||||
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, 16, tmpIvec, (const unsigned char *)(char*)decBuffer,
|
||||
((unsigned char *)(char *)*currBuf) + currBuf->size());
|
||||
currBuf->append(0, 16);
|
||||
// Clear remainder
|
||||
decBuffer.truncate(0);
|
||||
// Shift buffers
|
||||
ptr += toAppend;
|
||||
size -= toAppend;
|
||||
}
|
||||
// Decode any multiple of 16 bytes
|
||||
size_t toDecode = ((size_t)(size / 16)) * 16;
|
||||
if (toDecode){
|
||||
currBuf->allocate(currBuf->size() + toDecode);
|
||||
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, toDecode, tmpIvec, (const unsigned char *)ptr,
|
||||
((unsigned char *)(char *)*currBuf) + currBuf->size());
|
||||
currBuf->append(0, toDecode);
|
||||
// Shift buffers
|
||||
ptr += toDecode;
|
||||
size -= toDecode;
|
||||
}
|
||||
// Store remainder, if needed
|
||||
if (size){decBuffer.append(ptr, size);}
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
currBuf->append(ptr, size);
|
||||
}
|
||||
|
||||
size_t SegmentReader::getDataCallbackPos() const{
|
||||
#ifdef SSL
|
||||
return startAtByte+currBuf->size()+decBuffer.size();
|
||||
#else
|
||||
return startAtByte+currBuf->size();
|
||||
#endif
|
||||
}
|
||||
|
||||
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
|
||||
void SegmentReader::close(){
|
||||
packetPtr = 0;
|
||||
isOpen = false;
|
||||
segDL.close();
|
||||
}
|
||||
|
||||
/// Loads the given segment URL into the segment buffer.
|
||||
bool SegmentReader::load(const std::string &path, uint64_t startAt, uint64_t stopAt, const char * ivec, const char * keyAES, Util::ResizeablePointer * bufPtr){
|
||||
tsStream.partialClear();
|
||||
isOpen = false;
|
||||
parser = STRM_UNKN;
|
||||
if (ivec && keyAES && memcmp(keyAES, "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000", 16)){
|
||||
#ifdef SSL
|
||||
encrypted = true;
|
||||
std::string hexKey = printhex(keyAES, 16);
|
||||
std::string hexIvec = printhex(ivec, 16);
|
||||
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", path.c_str(), hexKey.c_str(), hexIvec.c_str());
|
||||
#else
|
||||
FAIL_MSG("Cannot read encrypted segment: %s", path.c_str());
|
||||
return false;
|
||||
#endif
|
||||
}else{
|
||||
encrypted = false;
|
||||
MEDIUM_MSG("Loading segment: %s", path.c_str());
|
||||
}
|
||||
|
||||
startAtByte = startAt;
|
||||
stopAtByte = stopAt;
|
||||
offset = 0;
|
||||
currBuf = bufPtr;
|
||||
|
||||
// Is there at least one byte? Check if we need to resume or have a whole buffer
|
||||
// If reserved and total size match, assume we have the whole thing
|
||||
if (currBuf->size() && (currBuf->rsize() == currBuf->size())){
|
||||
buffered = true;
|
||||
}else{
|
||||
buffered = false;
|
||||
|
||||
if (currBuf->size()){
|
||||
MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize());
|
||||
}
|
||||
|
||||
// We only re-open and seek if the opened URL doesn't match what we want already
|
||||
HTTP::URL A = segDL.getURI();
|
||||
HTTP::URL B = HTTP::localURIResolver().link(path);
|
||||
if (A != B){
|
||||
if (!segDL.open(path) || !segDL){
|
||||
FAIL_MSG("Could not open %s", path.c_str());
|
||||
return false;
|
||||
}
|
||||
if (!segDL){return false;}
|
||||
}
|
||||
|
||||
// Non-seekable case is handled further down
|
||||
if (segDL.isSeekable() && startAtByte + currBuf->size()){
|
||||
//Seek to startAtByte position, since it's not the beginning of the file
|
||||
MEDIUM_MSG("Seeking to %zu", startAtByte + currBuf->size());
|
||||
segDL.seek(startAtByte + currBuf->size());
|
||||
}
|
||||
}
|
||||
|
||||
if (!buffered){
|
||||
if (!currBuf->size() || !currBuf->rsize()){
|
||||
// Allocate full size if known
|
||||
if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());}
|
||||
}
|
||||
// Download full segment if not seekable, pretend it was cached all along
|
||||
if (!segDL.isSeekable()){
|
||||
currBuf->truncate(0);
|
||||
segDL.readAll(*this);
|
||||
if (startAtByte || stopAtByte){
|
||||
WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it");
|
||||
if (startAtByte){currBuf->shift(startAtByte);}
|
||||
if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);}
|
||||
}
|
||||
buffered = true;
|
||||
segDL.close();
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef SSL
|
||||
decBuffer.truncate(0);
|
||||
// If we have a non-null key, decrypt
|
||||
if (encrypted){
|
||||
// Load key
|
||||
mbedtls_aes_setkey_dec(&aes, (const unsigned char *)keyAES, 128);
|
||||
// Load initialization vector
|
||||
memcpy(tmpIvec, ivec, 16);
|
||||
}
|
||||
#endif
|
||||
|
||||
packetPtr = 0;
|
||||
isOpen = true;
|
||||
VERYHIGH_MSG("Segment opened: %s", path.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
}// namespace Mist
|
||||
|
55
lib/segmentreader.h
Normal file
55
lib/segmentreader.h
Normal file
|
@ -0,0 +1,55 @@
|
|||
#include <http_parser.h>
|
||||
#include <urireader.h>
|
||||
#include <dtsc.h>
|
||||
#include <ts_stream.h>
|
||||
#include <mp4_stream.h>
|
||||
|
||||
namespace Mist{
|
||||
|
||||
enum streamType {STRM_UNKN, STRM_TS, STRM_MP4};
|
||||
|
||||
class SegmentReader: public Util::DataCallback{
|
||||
public:
|
||||
SegmentReader();
|
||||
void onProgress(bool (*callback)(uint8_t));
|
||||
operator bool() const {return isOpen;}
|
||||
|
||||
char *packetPtr;
|
||||
bool load(const std::string &path, uint64_t startAt, uint64_t stopAt, const char * ivec, const char * keyAES, Util::ResizeablePointer * bufPtr);
|
||||
bool readNext(DTSC::Packet & thisPacket, uint64_t bytePos);
|
||||
void setInit(const std::string & initData);
|
||||
void reset();
|
||||
void close();
|
||||
void initializeMetadata(DTSC::Meta &meta, size_t tid, size_t mappingId);
|
||||
|
||||
virtual void dataCallback(const char *ptr, size_t size);
|
||||
virtual size_t getDataCallbackPos() const;
|
||||
|
||||
private:
|
||||
HTTP::URIReader segDL; ///< If non-buffered, reader for the data
|
||||
Util::ResizeablePointer * currBuf; ///< Storage for all (non)buffered segment content
|
||||
uint64_t startAtByte; ///< Start position in bytes
|
||||
uint64_t stopAtByte; ///< Stop position in bytes
|
||||
bool encrypted; ///< True if segment must be decrypted before parsing
|
||||
bool buffered; ///< True if segment is fully buffered in memory
|
||||
bool isOpen; ///< True if a segment has been successfully opened
|
||||
bool (*progressCallback)(uint8_t);
|
||||
|
||||
bool readTo(size_t offset);
|
||||
size_t offset;
|
||||
|
||||
// Parser related
|
||||
streamType parser;
|
||||
TS::Stream tsStream;
|
||||
std::deque<MP4::TrackHeader> mp4Headers;
|
||||
|
||||
|
||||
#ifdef SSL
|
||||
//Encryption-related
|
||||
Util::ResizeablePointer decBuffer; ///< Buffer for pre-decryption data - max 16 bytes
|
||||
unsigned char tmpIvec[16];
|
||||
mbedtls_aes_context aes; ///< Decryption context
|
||||
#endif
|
||||
};
|
||||
|
||||
} // namespace Mist
|
|
@ -1,14 +1,9 @@
|
|||
#include "input_hls.h"
|
||||
#ifdef SSL
|
||||
#include "mbedtls/aes.h"
|
||||
#endif
|
||||
#include <algorithm>
|
||||
#include <cerrno>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <fstream>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <mist/bitfields.h>
|
||||
#include <mist/defines.h>
|
||||
|
@ -24,6 +19,66 @@
|
|||
|
||||
#define SEM_TS_CLAIM "/MstTSIN%s"
|
||||
|
||||
/// Local RAM buffer for recently accessed segments
|
||||
std::map<Mist::playListEntries, Util::ResizeablePointer> segBufs;
|
||||
/// Order of adding/accessing for local RAM buffer of segments
|
||||
std::deque<Mist::playListEntries> segBufAccs;
|
||||
/// Order of adding/accessing sizes for local RAM buffer of segments
|
||||
std::deque<size_t> segBufSize;
|
||||
size_t segBufTotalSize = 0;
|
||||
|
||||
/// Read data for a segment, update buffer sizes to match
|
||||
bool readNext(Mist::SegmentReader & S, DTSC::Packet & thisPacket, uint64_t bytePos){
|
||||
//Overwrite the current segment size
|
||||
segBufTotalSize -= segBufSize.front();
|
||||
bool ret = S.readNext(thisPacket, bytePos);
|
||||
segBufSize.front() = S.getDataCallbackPos();
|
||||
segBufTotalSize += segBufSize.front();
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Load a new segment, use cache if possible or create a new cache entry
|
||||
bool loadSegment(Mist::SegmentReader & S, const Mist::playListEntries & entry){
|
||||
if (!segBufs.count(entry)){
|
||||
HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str());
|
||||
//Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times)
|
||||
while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){
|
||||
HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str());
|
||||
segBufs.erase(segBufAccs.back());
|
||||
segBufTotalSize -= segBufSize.back();
|
||||
segBufAccs.pop_back();
|
||||
segBufSize.pop_back();
|
||||
}
|
||||
segBufAccs.push_front(entry);
|
||||
segBufSize.push_front(0);
|
||||
}else{
|
||||
HIGH_MSG("Reading from cache: %s", entry.shortName().c_str());
|
||||
// Ensure current entry is the front entry in the deques
|
||||
std::deque<Mist::playListEntries> segBufAccsCopy = segBufAccs;
|
||||
std::deque<size_t> segBufSizeCopy = segBufSize;
|
||||
segBufAccs.clear();
|
||||
segBufSize.clear();
|
||||
size_t thisSize = 0;
|
||||
|
||||
while (segBufSizeCopy.size()){
|
||||
if (segBufAccsCopy.back() == entry){
|
||||
thisSize = segBufSizeCopy.back();
|
||||
}else{
|
||||
segBufAccs.push_front(segBufAccsCopy.back());
|
||||
segBufSize.push_front(segBufSizeCopy.back());
|
||||
}
|
||||
segBufAccsCopy.pop_back();
|
||||
segBufSizeCopy.pop_back();
|
||||
}
|
||||
segBufAccs.push_front(entry);
|
||||
segBufSize.push_front(thisSize);
|
||||
}
|
||||
return S.load(entry.filename, entry.startAtByte, entry.stopAtByte, entry.ivec, entry.keyAES, &(segBufs[entry]));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
static uint64_t ISO8601toUnixmillis(const std::string &ts){
|
||||
// Format examples:
|
||||
// 2019-12-05T09:41:16.765000+00:00
|
||||
|
@ -111,17 +166,6 @@ namespace Mist{
|
|||
/// Save playlist objects for manual reloading
|
||||
static std::map<uint64_t, Playlist> playlistMapping;
|
||||
|
||||
/// Local RAM buffer for recently accessed segments
|
||||
std::map<playListEntries, Util::ResizeablePointer> segBufs;
|
||||
|
||||
/// Order of adding/accessing for local RAM buffer of segments
|
||||
std::deque<playListEntries> segBufAccs;
|
||||
|
||||
/// Order of adding/accessing sizes for local RAM buffer of segments
|
||||
std::deque<size_t> segBufSize;
|
||||
|
||||
size_t segBufTotalSize = 0;
|
||||
|
||||
/// Track which segment numbers have been parsed
|
||||
std::map<uint64_t, uint64_t> parsedSegments;
|
||||
|
||||
|
@ -239,264 +283,6 @@ namespace Mist{
|
|||
}
|
||||
}
|
||||
|
||||
static std::string printhex(const char *data, size_t len){
|
||||
static const char *const lut = "0123456789ABCDEF";
|
||||
|
||||
std::string output;
|
||||
output.reserve(2 * len);
|
||||
for (size_t i = 0; i < len; ++i){
|
||||
const unsigned char c = data[i];
|
||||
output.push_back(lut[c >> 4]);
|
||||
output.push_back(lut[c & 15]);
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
SegmentDownloader::SegmentDownloader(){
|
||||
isOpen = false;
|
||||
segDL.onProgress(callbackFunc);
|
||||
encrypted = false;
|
||||
currBuf = 0;
|
||||
packetPtr = 0;
|
||||
}
|
||||
|
||||
/// Returns true if packetPtr is at the end of the current segment.
|
||||
bool SegmentDownloader::atEnd() const{
|
||||
if (!isOpen || !currBuf){return true;}
|
||||
if (buffered){return currBuf->size() <= offset + 188;}
|
||||
if (stopAtByte && (stopAtByte - startAtByte) <= offset + 188){return true;}
|
||||
return !segDL && currBuf->size() <= offset + 188;
|
||||
// return (packetPtr - segDL.const_data().data() + 188) > segDL.const_data().size();
|
||||
}
|
||||
|
||||
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
|
||||
bool SegmentDownloader::readNext(){
|
||||
if (encrypted){
|
||||
// Encrypted, need to decrypt
|
||||
#ifdef SSL
|
||||
// Are we exactly at the end of the buffer? Truncate it entirely.
|
||||
if (encOffset == outData.size() || !outData.size()){
|
||||
outData.truncate(0);
|
||||
packetPtr = 0;
|
||||
encOffset = 0;
|
||||
}
|
||||
// Do we already have some data ready? Just serve it.
|
||||
if (encOffset + 188 <= outData.size()){
|
||||
packetPtr = outData + encOffset;
|
||||
encOffset += 188;
|
||||
if (packetPtr[0] != 0x47){
|
||||
FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
// Alright, we need to read some more data.
|
||||
// We read 192 bytes at a time: a single TS packet is 188 bytes but AES-128-CBC encryption works in 16-byte blocks.
|
||||
size_t len = 0;
|
||||
segDL.readSome(packetPtr, len, 192);
|
||||
if (!len){return false;}
|
||||
if (len % 16 != 0){
|
||||
FAIL_MSG("Read a non-16-multiple of bytes (%zu), cannot decode!", len);
|
||||
return false;
|
||||
}
|
||||
outData.allocate(outData.size() + len);
|
||||
mbedtls_aes_crypt_cbc(&aes, MBEDTLS_AES_DECRYPT, len, tmpIvec, (const unsigned char *)packetPtr,
|
||||
((unsigned char *)(char *)outData) + outData.size());
|
||||
outData.append(0, len);
|
||||
// End of the segment? Remove padding data.
|
||||
if (segDL.isEOF()){
|
||||
// The padding consists of X bytes of padding, all containing the raw value X.
|
||||
// Since padding is mandatory, we can simply read the last byte and remove X bytes from the length.
|
||||
if (outData.size() <= outData[outData.size() - 1]){
|
||||
FAIL_MSG("Encryption padding is >= one TS packet. We probably returned some invalid TS "
|
||||
"data earlier :-(");
|
||||
return false;
|
||||
}
|
||||
outData.truncate(outData.size() - outData[outData.size() - 1]);
|
||||
}
|
||||
// Okay, we have more data. Let's see if we can return it...
|
||||
if (encOffset + 188 <= outData.size()){
|
||||
packetPtr = outData + encOffset;
|
||||
encOffset += 188;
|
||||
if (packetPtr[0] != 0x47){
|
||||
FAIL_MSG("Not TS! Starts with byte %" PRIu8, (uint8_t)packetPtr[0]);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
// No? Then we've failed in our task :'(
|
||||
FAIL_MSG("Could not load encrypted packet :'(");
|
||||
return false;
|
||||
}else{
|
||||
// Plaintext
|
||||
if (buffered){
|
||||
if (atEnd()){return false;}
|
||||
}else{
|
||||
if (!currBuf){return false;}
|
||||
size_t retries = 0;
|
||||
if (stopAtByte && (stopAtByte - startAtByte) <= currBuf->size()){return false;}
|
||||
while (segDL && currBuf->size() < offset + 188 + 188){
|
||||
size_t preSize = currBuf->size();
|
||||
segDL.readSome(offset + 188 + 188 - currBuf->size(), *this);
|
||||
if (currBuf->size() < offset + 188 + 188){
|
||||
if (!segDL){
|
||||
if (!segDL.isSeekable()){return false;}
|
||||
// Only retry/resume if seekable and allocated size greater than current size
|
||||
if (currBuf->rsize() > currBuf->size()){
|
||||
// Seek to current position to resume
|
||||
++retries;
|
||||
if (retries > 5){
|
||||
segDL.close();
|
||||
return false;
|
||||
}
|
||||
segDL.seek(startAtByte+currBuf->size());
|
||||
}
|
||||
}
|
||||
if (currBuf->size() <= preSize){
|
||||
Util::sleep(5);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (currBuf->size() < offset + 188 + 188){return false;}
|
||||
}
|
||||
// First packet is at offset 0, not 188. Skip increment for this one.
|
||||
if (!firstPacket){
|
||||
offset += 188;
|
||||
}else{
|
||||
firstPacket = false;
|
||||
}
|
||||
packetPtr = *currBuf + offset;
|
||||
if (!packetPtr || packetPtr[0] != 0x47){
|
||||
std::stringstream packData;
|
||||
if (packetPtr){
|
||||
for (uint64_t i = 0; i < 188; ++i){
|
||||
packData << std::hex << std::setw(2) << std::setfill('0') << (unsigned int)packetPtr[i];
|
||||
}
|
||||
}
|
||||
FAIL_MSG("Not a valid TS packet: byte %zu is not 0x47: %s", offset, packData.str().c_str());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void SegmentDownloader::dataCallback(const char *ptr, size_t size){
|
||||
currBuf->append(ptr, size);
|
||||
//Overwrite the current segment size
|
||||
segBufTotalSize -= segBufSize.front();
|
||||
segBufSize.front() = currBuf->size();
|
||||
segBufTotalSize += segBufSize.front();
|
||||
}
|
||||
|
||||
size_t SegmentDownloader::getDataCallbackPos() const{return startAtByte+currBuf->size();}
|
||||
|
||||
/// Attempts to read a single TS packet from the current segment, setting packetPtr on success
|
||||
void SegmentDownloader::close(){
|
||||
packetPtr = 0;
|
||||
isOpen = false;
|
||||
segDL.close();
|
||||
}
|
||||
|
||||
/// Loads the given segment URL into the segment buffer.
|
||||
bool SegmentDownloader::loadSegment(const playListEntries &entry){
|
||||
std::string hexKey = printhex(entry.keyAES, 16);
|
||||
std::string hexIvec = printhex(entry.ivec, 16);
|
||||
|
||||
MEDIUM_MSG("Loading segment: %s, key: %s, ivec: %s", entry.shortName().c_str(), hexKey.c_str(),
|
||||
hexIvec.c_str());
|
||||
|
||||
startAtByte = entry.startAtByte;
|
||||
stopAtByte = entry.stopAtByte;
|
||||
offset = 0;
|
||||
firstPacket = true;
|
||||
buffered = segBufs.count(entry);
|
||||
if (!buffered){
|
||||
HIGH_MSG("Reading non-cache: %s", entry.shortName().c_str());
|
||||
if (!segDL.open(entry.filename)){
|
||||
FAIL_MSG("Could not open %s", entry.shortName().c_str());
|
||||
return false;
|
||||
}
|
||||
if (!segDL){return false;}
|
||||
//Remove cache entries while above 16MiB in total size, unless we only have 1 entry (we keep two at least at all times)
|
||||
while (segBufTotalSize > 16 * 1024 * 1024 && segBufs.size() > 1){
|
||||
HIGH_MSG("Dropping from segment cache: %s", segBufAccs.back().shortName().c_str());
|
||||
segBufs.erase(segBufAccs.back());
|
||||
segBufTotalSize -= segBufSize.back();
|
||||
segBufAccs.pop_back();
|
||||
segBufSize.pop_back();
|
||||
}
|
||||
segBufAccs.push_front(entry);
|
||||
segBufSize.push_front(0);
|
||||
currBuf = &(segBufs[entry]);
|
||||
// Non-seekable case is handled further down
|
||||
if (segDL.isSeekable() && startAtByte){
|
||||
//Seek to startAtByte position, since it's not the beginning of the file
|
||||
MEDIUM_MSG("Seeking to %zu", startAtByte);
|
||||
segDL.seek(startAtByte);
|
||||
}
|
||||
}else{
|
||||
HIGH_MSG("Reading from segment cache: %s", entry.shortName().c_str());
|
||||
currBuf = &(segBufs[entry]);
|
||||
if (currBuf->rsize() != currBuf->size()){
|
||||
MEDIUM_MSG("Cache was incomplete (%zu/%" PRIu32 "), resuming", currBuf->size(), currBuf->rsize());
|
||||
buffered = false;
|
||||
// We only re-open and seek if the opened URL doesn't match what we want already
|
||||
HTTP::URL A = segDL.getURI();
|
||||
HTTP::URL B = HTTP::localURIResolver().link(entry.filename);
|
||||
if (A != B){
|
||||
if (!segDL.open(entry.filename)){
|
||||
FAIL_MSG("Could not open %s", entry.shortName().c_str());
|
||||
return false;
|
||||
}
|
||||
if (!segDL){return false;}
|
||||
//Seek to current position in segment for resuming
|
||||
MEDIUM_MSG("Seeking to %zu", currBuf->size()+startAtByte);
|
||||
segDL.seek(currBuf->size()+startAtByte);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!buffered){
|
||||
// Allocate full size if known
|
||||
if (stopAtByte || segDL.getSize() != std::string::npos){currBuf->allocate(stopAtByte?(stopAtByte - startAtByte):segDL.getSize());}
|
||||
// Download full segment if not seekable, pretend it was cached all along
|
||||
if (!segDL.isSeekable()){
|
||||
segDL.readAll(*this);
|
||||
if (startAtByte || stopAtByte){
|
||||
WARN_MSG("Wasting data: downloaded whole segment due to unavailability of range requests, but caching only part of it");
|
||||
if (startAtByte){currBuf->shift(startAtByte);}
|
||||
if (stopAtByte){currBuf->truncate(stopAtByte - startAtByte);}
|
||||
//Overwrite the current segment size
|
||||
segBufTotalSize -= segBufSize.front();
|
||||
segBufSize.front() = currBuf->size();
|
||||
segBufTotalSize += segBufSize.front();
|
||||
}
|
||||
buffered = true;
|
||||
}
|
||||
}
|
||||
|
||||
encrypted = false;
|
||||
outData.truncate(0);
|
||||
// If we have a non-null key, decrypt
|
||||
if (entry.keyAES[0] != 0 || entry.keyAES[1] != 0 || entry.keyAES[2] != 0 || entry.keyAES[3] != 0 ||
|
||||
entry.keyAES[4] != 0 || entry.keyAES[5] != 0 || entry.keyAES[6] != 0 || entry.keyAES[7] != 0 ||
|
||||
entry.keyAES[8] != 0 || entry.keyAES[9] != 0 || entry.keyAES[10] != 0 || entry.keyAES[11] != 0 ||
|
||||
entry.keyAES[12] != 0 || entry.keyAES[13] != 0 || entry.keyAES[14] != 0 || entry.keyAES[15] != 0){
|
||||
encrypted = true;
|
||||
#ifdef SSL
|
||||
// Load key
|
||||
mbedtls_aes_setkey_dec(&aes, (const unsigned char *)entry.keyAES, 128);
|
||||
// Load initialization vector
|
||||
memcpy(tmpIvec, entry.ivec, 16);
|
||||
#endif
|
||||
}
|
||||
|
||||
packetPtr = 0;
|
||||
isOpen = true;
|
||||
HIGH_MSG("Segment download complete and passed sanity checks");
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Handles both initial load and future reloads.
|
||||
/// Returns how many segments were added to the internal segment list.
|
||||
bool Playlist::reload(){
|
||||
|
@ -512,6 +298,9 @@ namespace Mist{
|
|||
std::string keyUri;
|
||||
std::string keyIV;
|
||||
|
||||
std::string mapUri;
|
||||
std::string mapRange;
|
||||
|
||||
int count = 0;
|
||||
|
||||
std::istringstream urlSource;
|
||||
|
@ -619,6 +408,59 @@ namespace Mist{
|
|||
continue;
|
||||
}
|
||||
|
||||
if (key == "MAP"){
|
||||
size_t mapLen = 0, mapOffset = 0;
|
||||
size_t tmpPos = val.find("BYTERANGE=\"");
|
||||
size_t tmpPos2 = val.substr(tmpPos).find('"');
|
||||
if (tmpPos != std::string::npos){
|
||||
mapRange = val.substr(tmpPos + 11, tmpPos2 - tmpPos - 11);
|
||||
|
||||
size_t atSign = mapRange.find('@');
|
||||
if (atSign != std::string::npos){
|
||||
std::string len = mapRange.substr(0, atSign);
|
||||
std::string pos = mapRange.substr(atSign+1);
|
||||
mapLen = atoll(len.c_str());
|
||||
mapOffset = atoll(pos.c_str());
|
||||
}else{
|
||||
mapLen = atoll(val.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
tmpPos = val.find("URI=\"");
|
||||
tmpPos2 = val.substr(tmpPos + 5).find('"');
|
||||
if (tmpPos != std::string::npos){
|
||||
mapUri = val.substr(tmpPos + 5, tmpPos2);
|
||||
}
|
||||
|
||||
// when key not found, download and store it in the map
|
||||
if (!maps.count(mapUri+mapRange)){
|
||||
HTTP::URIReader mapDL;
|
||||
if (!mapDL.open(root.link(mapUri)) || !mapDL){
|
||||
FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str());
|
||||
continue;
|
||||
}
|
||||
char *mapPtr;
|
||||
size_t mapPLen;
|
||||
mapDL.readAll(mapPtr, mapPLen);
|
||||
if (mapOffset){
|
||||
if (mapOffset <= mapPLen){
|
||||
mapPtr += mapOffset;
|
||||
mapPLen -= mapOffset;
|
||||
}else{
|
||||
mapPLen = 0;
|
||||
}
|
||||
}
|
||||
if (mapLen < mapPLen){mapPLen = mapLen;}
|
||||
if (!mapPLen){
|
||||
FAIL_MSG("Could not retrieve map from '%s'", root.link(mapUri).getUrl().c_str());
|
||||
continue;
|
||||
}
|
||||
maps.insert(std::pair<std::string, std::string>(keyUri, std::string(mapPtr, mapPLen)));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (key == "PLAYLIST-TYPE"){
|
||||
if (val == "VOD"){
|
||||
streamIsVOD = true;
|
||||
|
@ -657,7 +499,7 @@ namespace Mist{
|
|||
memset(ivec, 0, 16);
|
||||
Bit::htobll(ivec + 8, bposCounter);
|
||||
}
|
||||
addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), startByte, lenByte);
|
||||
addEntry(root.link(line).getUrl(), line, segDur, bposCounter, keys[keyUri], std::string(ivec, 16), mapUri+mapRange, startByte, lenByte);
|
||||
lastSegment = bposCounter;
|
||||
++count;
|
||||
}
|
||||
|
@ -681,10 +523,11 @@ namespace Mist{
|
|||
|
||||
/// Adds playlist segments to be processed
|
||||
void Playlist::addEntry(const std::string &absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
|
||||
const std::string &key, const std::string &iv, uint64_t startByte, uint64_t lenByte){
|
||||
const std::string &key, const std::string &iv, const std::string mapName, uint64_t startByte, uint64_t lenByte){
|
||||
playListEntries entry;
|
||||
entry.filename = absolute_filename;
|
||||
entry.relative_filename = filename;
|
||||
entry.mapName = mapName;
|
||||
cleanLine(entry.filename);
|
||||
entry.bytePos = bpos;
|
||||
entry.duration = duration;
|
||||
|
@ -771,6 +614,7 @@ namespace Mist{
|
|||
currentPlaylist = 0;
|
||||
streamOffset = 0;
|
||||
isInitialRun = false;
|
||||
segDowner.onProgress(callbackFunc);
|
||||
|
||||
pidCounter = 1;
|
||||
|
||||
|
@ -819,11 +663,9 @@ namespace Mist{
|
|||
capa["optional"]["bufferTime"]["default"] = 50000;
|
||||
option.null();
|
||||
|
||||
inFile = NULL;
|
||||
}
|
||||
|
||||
InputHLS::~InputHLS(){
|
||||
if (inFile){fclose(inFile);}
|
||||
}
|
||||
|
||||
bool InputHLS::checkArguments(){
|
||||
|
@ -953,7 +795,6 @@ namespace Mist{
|
|||
TS::Packet packet;
|
||||
char *data;
|
||||
size_t dataLen;
|
||||
bool hasPacket = false;
|
||||
meta.reInit(isSingular() ? streamName : "");
|
||||
|
||||
tthread::lock_guard<tthread::mutex> guard(entryMutex);
|
||||
|
@ -966,7 +807,8 @@ namespace Mist{
|
|||
|
||||
for (std::map<uint32_t, std::deque<playListEntries> >::iterator pListIt = listEntries.begin();
|
||||
pListIt != listEntries.end() && config->is_active; pListIt++){
|
||||
tsStream.clear();
|
||||
segDowner.reset();
|
||||
std::string lastMapName;
|
||||
uint32_t entId = 0;
|
||||
bool foundAtLeastOnePacket = false;
|
||||
VERYHIGH_MSG("Playlist %" PRIu32 " starts at media index %lu", pListIt->first, playlistMapping[pListIt->first].firstIndex);
|
||||
|
@ -974,80 +816,47 @@ namespace Mist{
|
|||
for (std::deque<playListEntries>::iterator entryIt = pListIt->second.begin();
|
||||
entryIt != pListIt->second.end() && config->is_active; entryIt++){
|
||||
++currentSegment;
|
||||
tsStream.partialClear();
|
||||
|
||||
if (!segDowner.loadSegment(*entryIt)){
|
||||
if (entryIt->mapName != lastMapName){
|
||||
lastMapName = entryIt->mapName;
|
||||
segDowner.setInit(playlistMapping[pListIt->first].maps[lastMapName]);
|
||||
}
|
||||
if (!loadSegment(segDowner, *entryIt)){
|
||||
FAIL_MSG("Failed to load segment - skipping to next");
|
||||
continue;
|
||||
}
|
||||
entId++;
|
||||
allowRemap = true;
|
||||
while ((!segDowner.atEnd() || tsStream.hasPacket()) && config->is_active){
|
||||
// Wait for packets on each track to make sure the offset is set based on the earliest packet
|
||||
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
|
||||
if (hasPacket){
|
||||
DTSC::Packet headerPack;
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
while (headerPack){
|
||||
size_t tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
|
||||
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
if (!streamIsLive){
|
||||
headerPack.getString("data", data, dataLen);
|
||||
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
|
||||
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
|
||||
}
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
foundAtLeastOnePacket = true;
|
||||
}
|
||||
}
|
||||
// No packets available, so read the next TS packet if available
|
||||
if (segDowner.readNext()){
|
||||
packet.FromPointer(segDowner.packetPtr);
|
||||
tsStream.parse(packet, entryIt->bytePos);
|
||||
}
|
||||
}
|
||||
// get last packets
|
||||
tsStream.finish();
|
||||
DTSC::Packet headerPack;
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
while (headerPack){
|
||||
size_t tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
|
||||
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
while (config->is_active && readNext(segDowner, headerPack, entryIt->bytePos)){
|
||||
if (!config->is_active){return false;}
|
||||
if (headerPack){
|
||||
size_t tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(pListIt->first, tmpTrackId);
|
||||
uint64_t packetTime = getPacketTime(headerPack.getTime(), tmpTrackId, pListIt->first, entryIt->mUTC);
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
segDowner.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
if (!streamIsLive){
|
||||
headerPack.getString("data", data, dataLen);
|
||||
|
||||
if (!streamIsLive){
|
||||
headerPack.getString("data", data, dataLen);
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
|
||||
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
size_t packSendSize = 24 + (packOffset ? 17 : 0) + (entId >= 0 ? 15 : 0) + 19 + dataLen + 11;
|
||||
DONTEVEN_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
meta.update(packetTime, packOffset, idx, dataLen, entryIt->bytePos, headerPack.hasMember("keyframe"), packSendSize);
|
||||
}
|
||||
foundAtLeastOnePacket = true;
|
||||
}
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
}
|
||||
// Finally save the offset as part of the TS segment. This is required for bufferframe
|
||||
// to work correctly, since not every segment might have an UTC timestamp tag
|
||||
if (plsTimeOffset.count(pListIt->first)){
|
||||
std::deque<playListEntries> &curList = listEntries[pListIt->first];
|
||||
curList.at(entId-1).timeOffset = plsTimeOffset[pListIt->first];
|
||||
listEntries[pListIt->first].at(entId-1).timeOffset = plsTimeOffset[pListIt->first];
|
||||
}else{
|
||||
std::deque<playListEntries> &curList = listEntries[pListIt->first];
|
||||
curList.at(entId-1).timeOffset = 0;
|
||||
listEntries[pListIt->first].at(entId-1).timeOffset = 0;
|
||||
}
|
||||
|
||||
//Set progress counter
|
||||
|
@ -1055,18 +864,11 @@ namespace Mist{
|
|||
streamStatus.mapped[1] = (255 * currentSegment) / totalSegments;
|
||||
}
|
||||
|
||||
// If live, don't actually parse anything.
|
||||
// If non-live, we read all the segments
|
||||
if (streamIsLive){
|
||||
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex;
|
||||
}else{
|
||||
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + currentSegment;
|
||||
}
|
||||
// If live, don't actually parse anything. If non-live, we read all the segments
|
||||
parsedSegments[pListIt->first] = playlistMapping[pListIt->first].firstIndex + (streamIsLive ? 0 : currentSegment);
|
||||
|
||||
// For still-appending streams, only parse the first segment for each playlist
|
||||
if (streamIsLive && foundAtLeastOnePacket){
|
||||
break;
|
||||
}
|
||||
if (streamIsLive && foundAtLeastOnePacket){break;}
|
||||
}
|
||||
}
|
||||
if (!config->is_active){return false;}
|
||||
|
@ -1129,7 +931,6 @@ namespace Mist{
|
|||
/// \return True if the segment has been buffered successfully
|
||||
bool InputHLS::parseSegmentAsLive(uint64_t segmentIndex){
|
||||
bool hasOffset = false;
|
||||
bool hasPacket = false;
|
||||
uint64_t bufferTime = config->getInteger("pagetimeout");
|
||||
if (config->hasOption("bufferTime")){
|
||||
bufferTime = config->getInteger("bufferTime") / 1000;
|
||||
|
@ -1147,89 +948,53 @@ namespace Mist{
|
|||
FAIL_MSG("Tried to load segment with index '%" PRIu64 "', but the playlist only contains '%zu' entries!", segmentIndex, curList.size());
|
||||
return false;
|
||||
}
|
||||
if (!segDowner.loadSegment(curList.at(segmentIndex))){
|
||||
|
||||
playListEntries & ntry = curList.at(segmentIndex);
|
||||
if (ntry.mapName.size()){
|
||||
segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]);
|
||||
}
|
||||
if (!loadSegment(segDowner, ntry)){
|
||||
FAIL_MSG("Failed to load segment");
|
||||
return false;
|
||||
}
|
||||
|
||||
while (!segDowner.atEnd()){
|
||||
// Wait for packets on each track to make sure the offset is set based on the earliest packet
|
||||
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
|
||||
if (hasPacket){
|
||||
DTSC::Packet headerPack;
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
while (headerPack){
|
||||
size_t tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
|
||||
uint64_t packetTime = headerPack.getTime();
|
||||
// Set segment offset and save it
|
||||
if (!hasOffset && curList.at(segmentIndex).mUTC){
|
||||
hasOffset = true;
|
||||
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
|
||||
MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
|
||||
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
|
||||
}
|
||||
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){
|
||||
hasOffset = true;
|
||||
packetTime += DVRTimeOffsets[currentPlaylist];
|
||||
HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime);
|
||||
}
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
playlistMapping[currentPlaylist].tracks[idx] = true;
|
||||
|
||||
headerPack.getString("data", data, dataLen);
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
|
||||
if (isInitialRun){
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
|
||||
}else{
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
|
||||
}
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
}
|
||||
}
|
||||
// No packets available, so read the next TS packet if available
|
||||
if (segDowner.readNext()){
|
||||
packet.FromPointer(segDowner.packetPtr);
|
||||
tsStream.parse(packet, curList.at(segmentIndex).bytePos);
|
||||
}
|
||||
}
|
||||
// get last packets
|
||||
tsStream.finish();
|
||||
DTSC::Packet headerPack;
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
while (headerPack){
|
||||
int tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
|
||||
uint64_t packetTime = headerPack.getTime();
|
||||
if (DVRTimeOffsets.count(currentPlaylist)){
|
||||
packetTime += DVRTimeOffsets[currentPlaylist];
|
||||
VERYHIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime);
|
||||
}
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
playlistMapping[currentPlaylist].tracks[idx] = true;
|
||||
while (readNext(segDowner, headerPack, curList.at(segmentIndex).bytePos)){
|
||||
if (headerPack){
|
||||
size_t tmpTrackId = headerPack.getTrackId();
|
||||
uint64_t packetId = getPacketID(currentPlaylist, tmpTrackId);
|
||||
uint64_t packetTime = headerPack.getTime();
|
||||
// Set segment offset and save it
|
||||
if (!hasOffset && curList.at(segmentIndex).mUTC){
|
||||
hasOffset = true;
|
||||
DVRTimeOffsets[currentPlaylist] = (curList.at(segmentIndex).mUTC - zUTC) - packetTime;
|
||||
MEDIUM_MSG("Setting current live segment time offset to %" PRId64, DVRTimeOffsets[currentPlaylist]);
|
||||
curList.at(segmentIndex).timeOffset = DVRTimeOffsets[currentPlaylist];
|
||||
}
|
||||
if (hasOffset || DVRTimeOffsets.count(currentPlaylist)){
|
||||
hasOffset = true;
|
||||
packetTime += DVRTimeOffsets[currentPlaylist];
|
||||
HIGH_MSG("Adjusting current packet timestamp %" PRIu64 " -> %" PRIu64, headerPack.getTime(), packetTime);
|
||||
}
|
||||
size_t idx = M.trackIDToIndex(packetId, getpid());
|
||||
if (idx == INVALID_TRACK_ID || !M.getCodec(idx).size()){
|
||||
tsStream.initializeMetadata(meta, tmpTrackId, packetId);
|
||||
idx = M.trackIDToIndex(packetId, getpid());
|
||||
}
|
||||
playlistMapping[currentPlaylist].tracks[idx] = true;
|
||||
|
||||
headerPack.getString("data", data, dataLen);
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
|
||||
if (isInitialRun){
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
|
||||
}else{
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
|
||||
headerPack.getString("data", data, dataLen);
|
||||
// keyframe data exists, so always add 19 bytes keyframedata.
|
||||
uint32_t packOffset = headerPack.hasMember("offset") ? headerPack.getInt("offset") : 0;
|
||||
VERYHIGH_MSG("Adding packet (%zuB) at %" PRIu64 " with an offset of %" PRIu32 " on track %zu", dataLen, packetTime, packOffset, idx);
|
||||
bufferLivePacket(packetTime, packOffset, idx, data, dataLen, curList.at(segmentIndex).bytePos, headerPack.hasMember("keyframe"));
|
||||
if (isInitialRun){
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = curTimeout;
|
||||
}else{
|
||||
pageCounter[idx][getCurrentLivePage(idx)] = Util::bootSecs();
|
||||
}
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
}
|
||||
tsStream.getEarliestPacket(headerPack);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -1338,64 +1103,30 @@ namespace Mist{
|
|||
void InputHLS::getNext(size_t idx){
|
||||
INSANE_MSG("Getting next");
|
||||
uint32_t tid = 0;
|
||||
bool finished = false;
|
||||
thisPacket.null();
|
||||
while (config->is_active && (needsLock() || keepAlive())){
|
||||
// Check if we have a packet
|
||||
bool hasPacket = false;
|
||||
if (idx == INVALID_TRACK_ID){
|
||||
hasPacket = tsStream.hasPacketOnEachTrack() || (segDowner.atEnd() && tsStream.hasPacket());
|
||||
}else{
|
||||
hasPacket = tsStream.hasPacket(getMappedTrackId(M.getID(idx)));
|
||||
}
|
||||
|
||||
// Yes? Excellent! Read and return it.
|
||||
if (hasPacket){
|
||||
// Read
|
||||
if (idx == INVALID_TRACK_ID){
|
||||
tsStream.getEarliestPacket(thisPacket);
|
||||
if (readNext(segDowner, thisPacket, listEntries[currentPlaylist].at(currentIndex).bytePos)){
|
||||
if (thisPacket){
|
||||
tid = getOriginalTrackId(currentPlaylist, thisPacket.getTrackId());
|
||||
if (!tid){
|
||||
INSANE_MSG("Track %zu on PLS %" PRIu64 " -> %" PRIu32, thisPacket.getTrackId(), currentPlaylist, tid);
|
||||
continue;
|
||||
// Is it one we want?
|
||||
if (idx == INVALID_TRACK_ID || getMappedTrackId(M.getID(idx)) == thisPacket.getTrackId()){
|
||||
uint64_t packetTime = thisPacket.getTime();
|
||||
if (listEntries[currentPlaylist].at(currentIndex).timeOffset){
|
||||
packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset;
|
||||
}else{
|
||||
packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC);
|
||||
}
|
||||
INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime);
|
||||
// overwrite trackId on success
|
||||
Bit::htobl(thisPacket.getData() + 8, tid);
|
||||
Bit::htobll(thisPacket.getData() + 12, packetTime);
|
||||
thisTime = packetTime;
|
||||
thisIdx = tid;
|
||||
return; // Success!
|
||||
}
|
||||
}else{
|
||||
tid = getMappedTrackId(M.getID(idx));
|
||||
tsStream.getPacket(tid, thisPacket);
|
||||
}
|
||||
if (!thisPacket){
|
||||
Util::logExitReason(ER_FORMAT_SPECIFIC, "Could not getNext TS packet!");
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t packetTime = thisPacket.getTime();
|
||||
if (listEntries[currentPlaylist].at(currentIndex).timeOffset){
|
||||
packetTime += listEntries[currentPlaylist].at(currentIndex).timeOffset;
|
||||
}else{
|
||||
packetTime = getPacketTime(thisPacket.getTime(), tid, currentPlaylist, nUTC);
|
||||
}
|
||||
INSANE_MSG("Packet %" PRIu32 "@%" PRIu64 "ms -> %" PRIu64 "ms", tid, thisPacket.getTime(), packetTime);
|
||||
// overwrite trackId on success
|
||||
Bit::htobl(thisPacket.getData() + 8, tid);
|
||||
Bit::htobll(thisPacket.getData() + 12, packetTime);
|
||||
thisTime = packetTime;
|
||||
thisIdx = tid;
|
||||
return; // Success!
|
||||
}
|
||||
|
||||
// No? Let's read some more data and check again.
|
||||
if (!segDowner.atEnd() && segDowner.readNext()){
|
||||
tsBuf.FromPointer(segDowner.packetPtr);
|
||||
tsStream.parse(tsBuf, listEntries[currentPlaylist].at(currentIndex).bytePos);
|
||||
continue; // check again
|
||||
}
|
||||
|
||||
// Okay, reading more is not possible. Let's call finish() and check again.
|
||||
if (!finished && segDowner.atEnd()){
|
||||
tsStream.finish();
|
||||
finished = true;
|
||||
VERYHIGH_MSG("Finishing reading TS segment");
|
||||
continue; // Check again!
|
||||
continue;
|
||||
}
|
||||
|
||||
// No? Then we want to try reading the next file.
|
||||
|
@ -1416,7 +1147,6 @@ namespace Mist{
|
|||
VERYHIGH_MSG("Moving on to next TS segment (variant %" PRIu64 ")", currentPlaylist);
|
||||
if (readNextFile()){
|
||||
MEDIUM_MSG("Next segment read successfully");
|
||||
finished = false;
|
||||
continue; // Success! Continue regular parsing.
|
||||
}else{
|
||||
if (userSelect.size() > 1){
|
||||
|
@ -1442,7 +1172,7 @@ namespace Mist{
|
|||
plsTimeOffset.clear();
|
||||
plsLastTime.clear();
|
||||
plsInterval.clear();
|
||||
tsStream.clear();
|
||||
segDowner.reset();
|
||||
uint64_t trackId = M.getID(idx);
|
||||
|
||||
unsigned long plistEntry = 0;
|
||||
|
@ -1478,23 +1208,18 @@ namespace Mist{
|
|||
curPlaylist.size(), currentIndex);
|
||||
return;
|
||||
}
|
||||
playListEntries &entry = curPlaylist.at(currentIndex);
|
||||
segDowner.loadSegment(entry);
|
||||
playListEntries & e = curPlaylist.at(currentIndex);
|
||||
if (e.mapName.size()){
|
||||
segDowner.setInit(playlistMapping[currentPlaylist].maps[e.mapName]);
|
||||
}
|
||||
loadSegment(segDowner, e);
|
||||
// If we have an offset, load it
|
||||
allowRemap = false;
|
||||
if (entry.timeOffset){
|
||||
HIGH_MSG("Setting time offset of this TS segment to %" PRId64, entry.timeOffset);
|
||||
plsTimeOffset[currentPlaylist] = entry.timeOffset;
|
||||
if (e.timeOffset){
|
||||
HIGH_MSG("Setting time offset of this TS segment to %" PRId64, e.timeOffset);
|
||||
plsTimeOffset[currentPlaylist] = e.timeOffset;
|
||||
}
|
||||
}
|
||||
|
||||
HIGH_MSG("readPMT()");
|
||||
TS::Packet tsBuffer;
|
||||
while (!tsStream.hasPacketOnEachTrack() && !segDowner.atEnd()){
|
||||
if (!segDowner.readNext()){break;}
|
||||
tsBuffer.FromPointer(segDowner.packetPtr);
|
||||
tsStream.parse(tsBuffer, listEntries[currentPlaylist].at(currentIndex).bytePos);
|
||||
}
|
||||
}
|
||||
|
||||
/// \brief Applies any offset to the packets original timestamp
|
||||
|
@ -1809,7 +1534,7 @@ namespace Mist{
|
|||
/// Read next .ts file from the playlist. (from the list of entries which needs
|
||||
/// to be processed)
|
||||
bool InputHLS::readNextFile(){
|
||||
tsStream.clear();
|
||||
segDowner.reset();
|
||||
|
||||
playListEntries ntry;
|
||||
// This scope limiter prevents the recursion down below from deadlocking us
|
||||
|
@ -1837,7 +1562,10 @@ namespace Mist{
|
|||
ntry = curList[currentIndex];
|
||||
}
|
||||
|
||||
if (!segDowner.loadSegment(ntry)){
|
||||
if (ntry.mapName.size()){
|
||||
segDowner.setInit(playlistMapping[currentPlaylist].maps[ntry.mapName]);
|
||||
}
|
||||
if (!loadSegment(segDowner, ntry)){
|
||||
ERROR_MSG("Could not download segment: %s", ntry.filename.c_str());
|
||||
return readNextFile(); // Attempt to read another, if possible.
|
||||
}
|
||||
|
|
|
@ -2,12 +2,7 @@
|
|||
#include "input.h"
|
||||
#include <mist/dtsc.h>
|
||||
#include <mist/nal.h>
|
||||
#include <mist/ts_packet.h>
|
||||
#include <mist/ts_stream.h>
|
||||
#include <string>
|
||||
//#include <stdint.h>
|
||||
#include <mist/http_parser.h>
|
||||
#include <mist/urireader.h>
|
||||
#include <mist/segmentreader.h>
|
||||
|
||||
#define BUFFERTIME 10
|
||||
|
||||
|
@ -21,6 +16,7 @@ namespace Mist{
|
|||
struct playListEntries{
|
||||
std::string filename;
|
||||
std::string relative_filename;
|
||||
std::string mapName;
|
||||
uint64_t startAtByte; ///< Byte position inside filename where to start reading
|
||||
uint64_t stopAtByte; ///< Byte position inside filename where to stop sending
|
||||
uint64_t bytePos;
|
||||
|
@ -61,45 +57,20 @@ namespace Mist{
|
|||
return a.filename < b.filename || (a.filename == b.filename && a.startAtByte < b.startAtByte);
|
||||
}
|
||||
|
||||
inline bool operator== (const playListEntries a, const playListEntries b){
|
||||
return a.filename == b.filename && a.startAtByte == b.startAtByte;
|
||||
}
|
||||
|
||||
/// Keeps the segment entry list by playlist ID
|
||||
extern std::map<uint32_t, std::deque<playListEntries> > listEntries;
|
||||
|
||||
class SegmentDownloader: public Util::DataCallback{
|
||||
public:
|
||||
SegmentDownloader();
|
||||
HTTP::URIReader segDL;
|
||||
char *packetPtr;
|
||||
bool loadSegment(const playListEntries &entry);
|
||||
bool readNext();
|
||||
virtual void dataCallback(const char *ptr, size_t size);
|
||||
virtual size_t getDataCallbackPos() const;
|
||||
void close();
|
||||
bool atEnd() const;
|
||||
|
||||
private:
|
||||
uint64_t startAtByte;
|
||||
uint64_t stopAtByte;
|
||||
bool encrypted;
|
||||
bool buffered;
|
||||
size_t offset;
|
||||
bool firstPacket;
|
||||
Util::ResizeablePointer outData;
|
||||
Util::ResizeablePointer * currBuf;
|
||||
size_t encOffset;
|
||||
unsigned char tmpIvec[16];
|
||||
#ifdef SSL
|
||||
mbedtls_aes_context aes;
|
||||
#endif
|
||||
bool isOpen;
|
||||
};
|
||||
|
||||
class Playlist{
|
||||
public:
|
||||
Playlist(const std::string &uriSrc = "");
|
||||
bool isUrl() const;
|
||||
bool reload();
|
||||
void addEntry(const std::string & absolute_filename, const std::string &filename, float duration, uint64_t &bpos,
|
||||
const std::string &key, const std::string &keyIV, uint64_t startByte, uint64_t lenByte);
|
||||
const std::string &key, const std::string &keyIV, const std::string mapName, uint64_t startByte, uint64_t lenByte);
|
||||
|
||||
std::string uri; // link to the current playlistfile
|
||||
HTTP::URL root;
|
||||
|
@ -118,6 +89,7 @@ namespace Mist{
|
|||
uint64_t nextUTC; ///< If non-zero, the UTC timestamp of the next segment on this playlist
|
||||
char keyAES[16];
|
||||
std::map<std::string, std::string> keys;
|
||||
std::map<std::string, std::string> maps;
|
||||
uint64_t firstIndex; //< the index of the first segment in the playlist
|
||||
uint64_t lastSegment;
|
||||
std::map<size_t, bool> tracks;
|
||||
|
@ -138,7 +110,7 @@ namespace Mist{
|
|||
uint64_t nUTC; ///< Next packet timestamp in UTC unix time millis
|
||||
int64_t streamOffset; ///< bootMsOffset we need to set once we have parsed the header
|
||||
unsigned int startTime;
|
||||
SegmentDownloader segDowner;
|
||||
SegmentReader segDowner;
|
||||
int version;
|
||||
int targetDuration;
|
||||
bool endPlaylist;
|
||||
|
@ -155,11 +127,6 @@ namespace Mist{
|
|||
size_t currentIndex;
|
||||
std::string currentFile;
|
||||
|
||||
TS::Stream tsStream; ///< Used for parsing the incoming ts stream
|
||||
|
||||
Socket::Connection conn;
|
||||
TS::Packet tsBuf;
|
||||
|
||||
// Used to map packetId of packets in pidMapping
|
||||
int pidCounter;
|
||||
|
||||
|
@ -181,8 +148,6 @@ namespace Mist{
|
|||
bool readExistingHeader();
|
||||
void getNext(size_t idx = INVALID_TRACK_ID);
|
||||
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);
|
||||
FILE *inFile;
|
||||
FILE *tsFile;
|
||||
|
||||
bool readIndex();
|
||||
bool initPlaylist(const std::string &uri, bool fullInit = true);
|
||||
|
|
Loading…
Add table
Reference in a new issue