EBML input from S3 support

This commit is contained in:
Thulinma 2023-04-05 14:27:20 +02:00
parent 55be798e46
commit 364441c435
7 changed files with 176 additions and 61 deletions

View file

@ -108,6 +108,18 @@ namespace HTTP{
size_t URIReader::getDataCallbackPos() const{return allData.size();} size_t URIReader::getDataCallbackPos() const{return allData.size();}
/// Opens an already-open file descriptor for reading in streaming mode.
/// Closes whatever this reader had open before, sets the internal URI (and the
/// remembered original URL) to file://-, and hands the descriptor to the
/// downloader's socket.
/// \param fd File descriptor to read from.
/// \return Always true; no failure is reported from this function.
bool URIReader::open(const int fd){
  // Release any previously opened source before reconfiguring
  close();

  // Start position is zero; end position and total size are set to npos
  totalSize = std::string::npos;
  endPos = std::string::npos;
  startPos = 0;

  // From here on the reader treats its source as a stream
  stateType = HTTP::Stream;

  // fd goes in as the second argument; the first is passed as -1
  // NOTE(review): presumably -1 means "no write side" - confirm against the socket's open()
  downer.getSocket().open(-1, fd);

  // Use file://- as the URI for a descriptor-backed source
  myURI = HTTP::URL("file://-");
  originalUrl = myURI;

  return true;
}
bool URIReader::open(const HTTP::URL &uri){ bool URIReader::open(const HTTP::URL &uri){
close(); close();
myURI = uri; myURI = uri;
@ -317,15 +329,17 @@ namespace HTTP{
}else if (stateType == HTTP::HTTP){ }else if (stateType == HTTP::HTTP){
downer.continueNonBlocking(cb); downer.continueNonBlocking(cb);
}else{// streaming mode }else{// streaming mode
int s; int s = downer.getSocket().Received().bytes(wantedLen);
if ((downer.getSocket() && downer.getSocket().spool())){// || downer.getSocket().Received().size() > 0){ if (!s){
s = downer.getSocket().Received().bytes(wantedLen); if (downer.getSocket() && downer.getSocket().spool()){
std::string buf = downer.getSocket().Received().remove(s); s = downer.getSocket().Received().bytes(wantedLen);
}else{
cb.dataCallback(buf.data(), s); Util::sleep(50);
}else{ return;
Util::sleep(50); }
} }
std::string buf = downer.getSocket().Received().remove(s);
cb.dataCallback(buf.data(), s);
} }
} }

View file

@ -20,6 +20,8 @@ namespace HTTP{
/// Calls open on the given relative uri during construction /// Calls open on the given relative uri during construction
/// URI is resolved relative to the current working directory /// URI is resolved relative to the current working directory
URIReader(const std::string &reluri); URIReader(const std::string &reluri);
/// Sets the internal URI to file://- and opens the given file descriptor in stream mode.
bool open(const int fd);
/// Sets the internal URI to the given URI and opens it, whatever that may mean for the given URI type. /// Sets the internal URI to the given URI and opens it, whatever that may mean for the given URI type.
bool open(const HTTP::URL &uri); bool open(const HTTP::URL &uri);
/// Links the internal URI to the given relative URI and opens it, whatever that may mean for the current URI type. /// Links the internal URI to the given relative URI and opens it, whatever that may mean for the current URI type.

View file

@ -41,7 +41,7 @@ namespace Mist{
//Exactly! I thought not! So, if the end key number == the first, we increase by one. //Exactly! I thought not! So, if the end key number == the first, we increase by one.
if (endKey == key){++endKey;} if (endKey == key){++endKey;}
if (endKey > key + 1000){endKey = key + 1000;} if (endKey > key + 1000){endKey = key + 1000;}
DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); DONTEVEN_MSG("User with ID:%zu is on %zu:%zu -> %zu (timestamp %" PRIu64 ")", id, track, key, endKey, time);
for (size_t i = key; i <= endKey; ){ for (size_t i = key; i <= endKey; ){
@ -55,10 +55,11 @@ namespace Mist{
pageIdx = j; pageIdx = j;
} }
uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); uint32_t pageNumber = tPages.getInt("firstkey", pageIdx);
if (i == key){ uint64_t pageTime = M.getTimeForKeyIndex(track, pageNumber);
if (pageTime < time){
keyLoadPriority[trackKey(track, pageNumber)] += 10000; keyLoadPriority[trackKey(track, pageNumber)] += 10000;
}else{ }else{
keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (i - key); keyLoadPriority[trackKey(track, pageNumber)] += 600 - (pageTime - time) / 1000;
} }
uint64_t cnt = tPages.getInt("keycount", pageIdx); uint64_t cnt = tPages.getInt("keycount", pageIdx);
if (pageNumber + cnt <= i){return;} if (pageNumber + cnt <= i){return;}

View file

@ -15,6 +15,26 @@ namespace Mist{
capa["source_match"].append("/*.mk3d"); capa["source_match"].append("/*.mk3d");
capa["source_match"].append("/*.mks"); capa["source_match"].append("/*.mks");
capa["source_match"].append("/*.webm"); capa["source_match"].append("/*.webm");
capa["source_match"].append("http://*.mkv");
capa["source_match"].append("http://*.mka");
capa["source_match"].append("http://*.mk3d");
capa["source_match"].append("http://*.mks");
capa["source_match"].append("http://*.webm");
capa["source_match"].append("https://*.mkv");
capa["source_match"].append("https://*.mka");
capa["source_match"].append("https://*.mk3d");
capa["source_match"].append("https://*.mks");
capa["source_match"].append("https://*.webm");
capa["source_match"].append("s3+http://*.mkv");
capa["source_match"].append("s3+http://*.mka");
capa["source_match"].append("s3+http://*.mk3d");
capa["source_match"].append("s3+http://*.mks");
capa["source_match"].append("s3+http://*.webm");
capa["source_match"].append("s3+https://*.mkv");
capa["source_match"].append("s3+https://*.mka");
capa["source_match"].append("s3+https://*.mk3d");
capa["source_match"].append("s3+https://*.mks");
capa["source_match"].append("s3+https://*.webm");
capa["source_match"].append("mkv-exec:*"); capa["source_match"].append("mkv-exec:*");
capa["always_match"].append("mkv-exec:*"); capa["always_match"].append("mkv-exec:*");
capa["source_file"] = "$source"; capa["source_file"] = "$source";
@ -45,6 +65,10 @@ namespace Mist{
bufferedPacks = 0; bufferedPacks = 0;
wantBlocks = true; wantBlocks = true;
totalBytes = 0; totalBytes = 0;
readBufferOffset = 0;
readPos = 0;
readingMinimal = true;
firstRead = true;
} }
std::string ASStoSRT(const char *ptr, uint32_t len){ std::string ASStoSRT(const char *ptr, uint32_t len){
@ -94,7 +118,8 @@ namespace Mist{
} }
bool InputEBML::needsLock(){ bool InputEBML::needsLock(){
// Standard input requires no lock, otherwise default behaviour. // Streamed input requires no lock, non-streamed does
if (!standAlone){return false;}
if (config->getString("input") == "-" || config->getString("input").substr(0, 9) == "mkv-exec:"){return false;} if (config->getString("input") == "-" || config->getString("input").substr(0, 9) == "mkv-exec:"){return false;}
return Input::needsLock(); return Input::needsLock();
} }
@ -127,57 +152,86 @@ namespace Mist{
Util::Procs::StartPiped(args, &fin, &fout, 0); Util::Procs::StartPiped(args, &fin, &fout, 0);
if (fout == -1){return false;} if (fout == -1){return false;}
dup2(fout, 0); dup2(fout, 0);
inFile = stdin; inFile.open(0);
return true; return true;
} }
if (config->getString("input") == "-"){ if (config->getString("input") == "-"){
inFile = stdin; standAlone = false;
inFile.open(0);
}else{ }else{
// open File // open File
inFile = fopen(config->getString("input").c_str(), "r"); inFile.open(config->getString("input"));
if (!inFile){return false;} if (!inFile){return false;}
standAlone = inFile.isSeekable();
} }
return true; return true;
} }
/// Data sink for the reader: this object is what gets passed to
/// inFile.readSome(), which delivers received bytes through this callback.
/// \param ptr Pointer to the bytes that were received.
/// \param size Number of bytes available at ptr.
void InputEBML::dataCallback(const char *ptr, size_t size){
  // Count the received bytes towards the running byte total
  totalBytes += size;
  // Store the bytes at the end of the read buffer for later parsing
  readBuffer.append(ptr, size);
}
/// Reports the position just past the data held so far: readPos plus the
/// number of bytes currently in readBuffer.
/// NOTE(review): readPos appears to track how many bytes have been shifted off
/// the front of readBuffer - confirm.
size_t InputEBML::getDataCallbackPos() const{
  return readPos + readBuffer.size();
}
bool InputEBML::readElement(){ bool InputEBML::readElement(){
ptr.truncate(0); uint32_t needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal);
readingMinimal = true; if (!firstRead && readBuffer.size() >= needed + readBufferOffset){
uint32_t needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal); readBufferOffset += needed;
while (ptr.size() < needed && config->is_active){ needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal);
if (!ptr.allocate(needed)){return false;} readingMinimal = true;
int64_t toRead = needed - ptr.size(); if (readBuffer.size() >= needed + readBufferOffset){
int readResult = 0;
while (!readResult){
readResult = fread(ptr + ptr.size(), toRead, 1, inFile);
if (!readResult){
if (errno == EINTR){
continue;
}
// At EOF we don't print a warning
if (!feof(inFile)){
FAIL_MSG("Could not read more data! (have %zu, need %" PRIu32 ")", ptr.size(), needed);
}
return false;
}
ptr.append(0, toRead);
}
totalBytes += toRead;
needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal);
if (ptr.size() >= needed){
// Make sure TrackEntry types are read whole // Make sure TrackEntry types are read whole
if (readingMinimal && EBML::Element(ptr).getID() == EBML::EID_TRACKENTRY){ if (readingMinimal && EBML::Element(readBuffer + readBufferOffset).getID() == EBML::EID_TRACKENTRY){
readingMinimal = false; readingMinimal = false;
needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal); needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal);
} }
} }
} }
EBML::Element E(ptr);
while (readBuffer.size() < needed + readBufferOffset && config->is_active){
if (!readBuffer.allocate(needed + readBufferOffset)){return false;}
if (!inFile){return false;}
int64_t toRead = needed - readBuffer.size() + readBufferOffset;
if (standAlone){
//If we have more than 10MiB buffered and are more than 10MiB into the buffer, shift the first 4MiB off the buffer.
//This prevents infinite growth of the read buffer for large files, but allows for some re-use of data.
if (readBuffer.size() >= 10*1024*1024 && readBufferOffset > 10*1024*1024){
readBuffer.shift(4*1024*1024);
readBufferOffset -= 4*1024*1024;
readPos += 4*1024*1024;
}
}else{
//For non-standalone mode, we know we're always live streaming, and can always cut off what we've shifted
if (readBufferOffset){
readBuffer.shift(readBufferOffset);
readPos += readBufferOffset;
readBufferOffset = 0;
}
}
size_t preSize = readBuffer.size();
inFile.readSome(toRead, *this);
if (readBuffer.size() == preSize){
Util::sleep(5);
continue;
}
needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal);
if (readBuffer.size() >= needed + readBufferOffset){
// Make sure TrackEntry types are read whole
if (readingMinimal && EBML::Element(readBuffer + readBufferOffset).getID() == EBML::EID_TRACKENTRY){
readingMinimal = false;
needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal);
}
}
}
EBML::Element E(readBuffer + readBufferOffset);
if (E.getID() == EBML::EID_CLUSTER){ if (E.getID() == EBML::EID_CLUSTER){
if (inFile == stdin){ if (!inFile.isSeekable()){
lastClusterBPos = 0; lastClusterBPos = 0;
}else{ }else{
int64_t bp = Util::ftell(inFile); int64_t bp = readPos + readBufferOffset;
if (bp == -1 && errno == ESPIPE){ if (bp == -1 && errno == ESPIPE){
lastClusterBPos = 0; lastClusterBPos = 0;
}else{ }else{
@ -190,6 +244,7 @@ namespace Mist{
lastClusterTime = E.getValUInt(); lastClusterTime = E.getValUInt();
DONTEVEN_MSG("Cluster time %" PRIu64 " ms", lastClusterTime); DONTEVEN_MSG("Cluster time %" PRIu64 " ms", lastClusterTime);
} }
firstRead = false;
return true; return true;
} }
@ -216,7 +271,7 @@ namespace Mist{
WARN_MSG("Aborting header generation due to shutdown: %s", Util::exitReason); WARN_MSG("Aborting header generation due to shutdown: %s", Util::exitReason);
return false; return false;
} }
EBML::Element E(ptr, readingMinimal); EBML::Element E(readBuffer + readBufferOffset, readingMinimal);
if (E.getID() == EBML::EID_TRACKENTRY){ if (E.getID() == EBML::EID_TRACKENTRY){
EBML::Element tmpElem = E.findChild(EBML::EID_TRACKNUMBER); EBML::Element tmpElem = E.findChild(EBML::EID_TRACKNUMBER);
if (!tmpElem){ if (!tmpElem){
@ -392,9 +447,15 @@ namespace Mist{
timeScale = ((double)timeScaleVal) / 1000000.0; timeScale = ((double)timeScaleVal) / 1000000.0;
} }
// Live streams stop parsing the header as soon as the first Cluster is encountered // Live streams stop parsing the header as soon as the first Cluster is encountered
if (E.getID() == EBML::EID_CLUSTER && !needsLock()){return true;} if (E.getID() == EBML::EID_CLUSTER){
if (!needsLock()){return true;}
//Set progress counter for non-live inputs
if (streamStatus && streamStatus.len > 1 && inFile.getSize()){
streamStatus.mapped[1] = (255 * (readPos + readBufferOffset)) / inFile.getSize();
}
}
if (E.getType() == EBML::ELEM_BLOCK){ if (E.getType() == EBML::ELEM_BLOCK){
EBML::Block B(ptr); EBML::Block B(readBuffer + readBufferOffset);
uint64_t tNum = B.getTrackNum(); uint64_t tNum = B.getTrackNum();
uint64_t newTime = lastClusterTime + B.getTimecode(); uint64_t newTime = lastClusterTime + B.getTimecode();
trackPredictor &TP = packBuf[tNum]; trackPredictor &TP = packBuf[tNum];
@ -414,15 +475,15 @@ namespace Mist{
for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){ for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){
if (frameNo){ if (frameNo){
if (M.getCodec(idx) == "AAC"){ if (M.getCodec(idx) == "AAC"){
newTime += (1000000 / M.getRate(idx)) / timeScale; // assume ~1000 samples per frame newTime += (uint64_t)(1000000 / M.getRate(idx)) / timeScale; // assume ~1000 samples per frame
}else if (M.getCodec(idx) == "MP3"){ }else if (M.getCodec(idx) == "MP3"){
newTime += (1152000 / M.getRate(idx)) / timeScale; // 1152 samples per frame newTime += (uint64_t)(1152000 / M.getRate(idx)) / timeScale; // 1152 samples per frame
}else if (M.getCodec(idx) == "DTS"){ }else if (M.getCodec(idx) == "DTS"){
// Assume 512 samples per frame (DVD default) // Assume 512 samples per frame (DVD default)
// actual amount can be calculated from data, but data // actual amount can be calculated from data, but data
// is not available during header generation... // is not available during header generation...
// See: http://www.stnsoft.com/DVD/dtshdr.html // See: http://www.stnsoft.com/DVD/dtshdr.html
newTime += (512000 / M.getRate(idx)) / timeScale; newTime += (uint64_t)(512000 / M.getRate(idx)) / timeScale;
}else{ }else{
newTime += 1 / timeScale; newTime += 1 / timeScale;
ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!", ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!",
@ -538,6 +599,7 @@ namespace Mist{
EBML::Block B; EBML::Block B;
if (wantBlocks){ if (wantBlocks){
do{ do{
if (!config->is_active){return;}
if (!readElement()){ if (!readElement()){
// Make sure we empty our buffer first // Make sure we empty our buffer first
if (bufferedPacks && packBuf.size()){ if (bufferedPacks && packBuf.size()){
@ -557,11 +619,11 @@ namespace Mist{
thisPacket.null(); thisPacket.null();
return; return;
} }
B = EBML::Block(ptr); B = EBML::Block(readBuffer + readBufferOffset);
}while (!B || B.getType() != EBML::ELEM_BLOCK || }while (!B || B.getType() != EBML::ELEM_BLOCK ||
(singleTrack && wantedID != B.getTrackNum())); (singleTrack && wantedID != B.getTrackNum()));
}else{ }else{
B = EBML::Block(ptr); B = EBML::Block(readBuffer + readBufferOffset);
} }
uint64_t tNum = B.getTrackNum(); uint64_t tNum = B.getTrackNum();
@ -590,15 +652,15 @@ namespace Mist{
for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){ for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){
if (frameNo){ if (frameNo){
if (M.getCodec(thisIdx) == "AAC"){ if (M.getCodec(thisIdx) == "AAC"){
newTime += (1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame newTime += (uint64_t)(1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame
}else if (M.getCodec(thisIdx) == "MP3"){ }else if (M.getCodec(thisIdx) == "MP3"){
newTime += (1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame newTime += (uint64_t)(1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame
}else if (M.getCodec(thisIdx) == "DTS"){ }else if (M.getCodec(thisIdx) == "DTS"){
// Assume 512 samples per frame (DVD default) // Assume 512 samples per frame (DVD default)
// actual amount can be calculated from data, but data // actual amount can be calculated from data, but data
// is not available during header generation... // is not available during header generation...
// See: http://www.stnsoft.com/DVD/dtshdr.html // See: http://www.stnsoft.com/DVD/dtshdr.html
newTime += (512000 / M.getRate(thisIdx)) / timeScale; newTime += (uint64_t)(512000 / M.getRate(thisIdx)) / timeScale;
}else{ }else{
ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!", ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!",
M.getCodec(thisIdx).c_str()); M.getCodec(thisIdx).c_str());
@ -647,7 +709,34 @@ namespace Mist{
DONTEVEN_MSG("Seeking to %" PRIu64 ", found %" PRIu64 "...", seekTime, keys.getTime(i)); DONTEVEN_MSG("Seeking to %" PRIu64 ", found %" PRIu64 "...", seekTime, keys.getTime(i));
seekPos = keys.getBpos(i); seekPos = keys.getBpos(i);
} }
Util::fseek(inFile, seekPos, SEEK_SET);
firstRead = true;
if (readPos > seekPos || seekPos > readPos + readBuffer.size() + 4*1024*1024){
readBuffer.truncate(0);
readBufferOffset = 0;
if (!inFile.seek(seekPos)){
FAIL_MSG("Seek to %" PRIu64 " failed! Aborting load", seekPos);
}
readPos = inFile.getPos();
}else{
while (seekPos > readPos + readBuffer.size() && config->is_active){
size_t preSize = readBuffer.size();
inFile.readSome(seekPos - (readPos + readBuffer.size()), *this);
if (readBuffer.size() == preSize){
Util::sleep(5);
}
}
if (seekPos > readPos + readBuffer.size()){
Util::logExitReason("Input file seek abort");
config->is_active = false;
readBufferOffset = 0;
return;
}
readBufferOffset = seekPos - readPos;
}
} }
/// Flushes all trackPredictors without deleting permanent data from them. /// Flushes all trackPredictors without deleting permanent data from them.

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "input.h" #include "input.h"
#include <mist/util.h> #include <mist/util.h>
#include <mist/urireader.h>
namespace Mist{ namespace Mist{
@ -122,12 +123,22 @@ namespace Mist{
} }
}; };
class InputEBML : public Input{ class InputEBML : public Input, public Util::DataCallback{
public: public:
InputEBML(Util::Config *cfg); InputEBML(Util::Config *cfg);
bool needsLock(); bool needsLock();
virtual bool isSingular(){return standAlone && !config->getBool("realtime");}
virtual void dataCallback(const char *ptr, size_t size);
virtual size_t getDataCallbackPos() const;
protected: protected:
HTTP::URIReader inFile;
Util::ResizeablePointer readBuffer;
uint64_t readBufferOffset;
uint64_t readPos;
bool firstRead;
virtual size_t streamByteCount(){ virtual size_t streamByteCount(){
return totalBytes; return totalBytes;
}; // For live streams: to update the stats with correct values. }; // For live streams: to update the stats with correct values.
@ -140,8 +151,6 @@ namespace Mist{
void getNext(size_t idx = INVALID_TRACK_ID); void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);
void clearPredictors(); void clearPredictors();
FILE *inFile;
Util::ResizeablePointer ptr;
bool readingMinimal; bool readingMinimal;
uint64_t lastClusterBPos; uint64_t lastClusterBPos;
uint64_t lastClusterTime; uint64_t lastClusterTime;

View file

@ -59,7 +59,7 @@ namespace Mist{
} }
} }
void setInFile(int stdin_val){ void setInFile(int stdin_val){
inFile = fdopen(stdin_val, "r"); inFile.open(stdin_val);
streamName = opt["sink"].asString(); streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();} if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString()); Util::streamVariables(streamName, opt["source"].asString());

View file

@ -443,7 +443,7 @@ namespace Mist{
} }
void EncodeInputEBML::setInFile(int stdin_val){ void EncodeInputEBML::setInFile(int stdin_val){
inFile = fdopen(stdin_val, "r"); inFile.open(stdin_val);
streamName = opt["sink"].asString(); streamName = opt["sink"].asString();
if (!streamName.size()){streamName = opt["source"].asString();} if (!streamName.size()){streamName = opt["source"].asString();}
Util::streamVariables(streamName, opt["source"].asString()); Util::streamVariables(streamName, opt["source"].asString());