diff --git a/lib/urireader.cpp b/lib/urireader.cpp index ffc40c94..72496802 100644 --- a/lib/urireader.cpp +++ b/lib/urireader.cpp @@ -108,6 +108,18 @@ namespace HTTP{ size_t URIReader::getDataCallbackPos() const{return allData.size();} + bool URIReader::open(const int fd){ + close(); + myURI = HTTP::URL("file://-"); + originalUrl = myURI; + downer.getSocket().open(-1, fd); + stateType = HTTP::Stream; + startPos = 0; + endPos = std::string::npos; + totalSize = std::string::npos; + return true; + } + bool URIReader::open(const HTTP::URL &uri){ close(); myURI = uri; @@ -317,15 +329,17 @@ namespace HTTP{ }else if (stateType == HTTP::HTTP){ downer.continueNonBlocking(cb); }else{// streaming mode - int s; - if ((downer.getSocket() && downer.getSocket().spool())){// || downer.getSocket().Received().size() > 0){ - s = downer.getSocket().Received().bytes(wantedLen); - std::string buf = downer.getSocket().Received().remove(s); - - cb.dataCallback(buf.data(), s); - }else{ - Util::sleep(50); + int s = downer.getSocket().Received().bytes(wantedLen); + if (!s){ + if (downer.getSocket() && downer.getSocket().spool()){ + s = downer.getSocket().Received().bytes(wantedLen); + }else{ + Util::sleep(50); + return; + } } + std::string buf = downer.getSocket().Received().remove(s); + cb.dataCallback(buf.data(), s); } } diff --git a/lib/urireader.h b/lib/urireader.h index 1d4da390..dc904218 100644 --- a/lib/urireader.h +++ b/lib/urireader.h @@ -20,6 +20,8 @@ namespace HTTP{ /// Calls open on the given relative uri during construction /// URI is resolved relative to the current working directory URIReader(const std::string &reluri); + /// Sets the internal URI to file://- and opens the given file descriptor in stream mode. + bool open(const int fd); /// Sets the internal URI to the given URI and opens it, whatever that may mean for the given URI type. bool open(const HTTP::URL &uri); /// Links the internal URI to the given relative URI and opens it, whatever that may mean for the current URI type. diff --git a/src/input/input.cpp b/src/input/input.cpp index 16bae077..c76bc185 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -41,7 +41,7 @@ namespace Mist{ //Exactly! I thought not! So, if the end key number == the first, we increase by one. if (endKey == key){++endKey;} if (endKey > key + 1000){endKey = key + 1000;} - DONTEVEN_MSG("User with ID:%zu is on key %zu->%zu (timestamp %" PRIu64 ")", id, key, endKey, time); + DONTEVEN_MSG("User with ID:%zu is on %zu:%zu -> %zu (timestamp %" PRIu64 ")", id, track, key, endKey, time); for (size_t i = key; i <= endKey; ){ @@ -55,10 +55,11 @@ namespace Mist{ pageIdx = j; } uint32_t pageNumber = tPages.getInt("firstkey", pageIdx); - if (i == key){ + uint64_t pageTime = M.getTimeForKeyIndex(track, pageNumber); + if (pageTime < time){ keyLoadPriority[trackKey(track, pageNumber)] += 10000; }else{ - keyLoadPriority[trackKey(track, pageNumber)] += 1000 - (i - key); + keyLoadPriority[trackKey(track, pageNumber)] += 600 - (pageTime - time) / 1000; } uint64_t cnt = tPages.getInt("keycount", pageIdx); if (pageNumber + cnt <= i){return;} diff --git a/src/input/input_ebml.cpp b/src/input/input_ebml.cpp index 5b854495..64abb994 100644 --- a/src/input/input_ebml.cpp +++ b/src/input/input_ebml.cpp @@ -15,6 +15,26 @@ namespace Mist{ capa["source_match"].append("/*.mk3d"); capa["source_match"].append("/*.mks"); capa["source_match"].append("/*.webm"); + capa["source_match"].append("http://*.mkv"); + capa["source_match"].append("http://*.mka"); + capa["source_match"].append("http://*.mk3d"); + capa["source_match"].append("http://*.mks"); + capa["source_match"].append("http://*.webm"); + capa["source_match"].append("https://*.mkv"); + capa["source_match"].append("https://*.mka"); + capa["source_match"].append("https://*.mk3d"); + capa["source_match"].append("https://*.mks"); + capa["source_match"].append("https://*.webm"); + capa["source_match"].append("s3+http://*.mkv"); + capa["source_match"].append("s3+http://*.mka"); + capa["source_match"].append("s3+http://*.mk3d"); + capa["source_match"].append("s3+http://*.mks"); + capa["source_match"].append("s3+http://*.webm"); + capa["source_match"].append("s3+https://*.mkv"); + capa["source_match"].append("s3+https://*.mka"); + capa["source_match"].append("s3+https://*.mk3d"); + capa["source_match"].append("s3+https://*.mks"); + capa["source_match"].append("s3+https://*.webm"); capa["source_match"].append("mkv-exec:*"); capa["always_match"].append("mkv-exec:*"); capa["source_file"] = "$source"; @@ -45,6 +65,10 @@ namespace Mist{ bufferedPacks = 0; wantBlocks = true; totalBytes = 0; + readBufferOffset = 0; + readPos = 0; + readingMinimal = true; + firstRead = true; } std::string ASStoSRT(const char *ptr, uint32_t len){ @@ -94,7 +118,8 @@ namespace Mist{ } bool InputEBML::needsLock(){ - // Standard input requires no lock, otherwise default behaviour. + // Streamed input requires no lock, non-streamed does + if (!standAlone){return false;} if (config->getString("input") == "-" || config->getString("input").substr(0, 9) == "mkv-exec:"){return false;} return Input::needsLock(); } @@ -127,57 +152,86 @@ namespace Mist{ Util::Procs::StartPiped(args, &fin, &fout, 0); if (fout == -1){return false;} dup2(fout, 0); - inFile = stdin; + inFile.open(0); return true; } if (config->getString("input") == "-"){ - inFile = stdin; + standAlone = false; + inFile.open(0); }else{ // open File - inFile = fopen(config->getString("input").c_str(), "r"); + inFile.open(config->getString("input")); if (!inFile){return false;} + standAlone = inFile.isSeekable(); } return true; } + void InputEBML::dataCallback(const char *ptr, size_t size){ + readBuffer.append(ptr, size); + totalBytes += size; + } + size_t InputEBML::getDataCallbackPos() const{return readPos + readBuffer.size();} + bool InputEBML::readElement(){ - ptr.truncate(0); - readingMinimal = true; - uint32_t needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal); - while (ptr.size() < needed && config->is_active){ - if (!ptr.allocate(needed)){return false;} - int64_t toRead = needed - ptr.size(); - int readResult = 0; - while (!readResult){ - readResult = fread(ptr + ptr.size(), toRead, 1, inFile); - if (!readResult){ - if (errno == EINTR){ - continue; - } - // At EOF we don't print a warning - if (!feof(inFile)){ - FAIL_MSG("Could not read more data! (have %zu, need %" PRIu32 ")", ptr.size(), needed); - } - return false; - } - ptr.append(0, toRead); - } - totalBytes += toRead; - needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal); - if (ptr.size() >= needed){ + uint32_t needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal); + if (!firstRead && readBuffer.size() >= needed + readBufferOffset){ + readBufferOffset += needed; + needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal); + readingMinimal = true; + if (readBuffer.size() >= needed + readBufferOffset){ // Make sure TrackEntry types are read whole - if (readingMinimal && EBML::Element(ptr).getID() == EBML::EID_TRACKENTRY){ + if (readingMinimal && EBML::Element(readBuffer + readBufferOffset).getID() == EBML::EID_TRACKENTRY){ readingMinimal = false; - needed = EBML::Element::needBytes(ptr, ptr.size(), readingMinimal); + needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal); } } } - EBML::Element E(ptr); + + while (readBuffer.size() < needed + readBufferOffset && config->is_active){ + if (!readBuffer.allocate(needed + readBufferOffset)){return false;} + if (!inFile){return false;} + int64_t toRead = needed - readBuffer.size() + readBufferOffset; + + if (standAlone){ + //If we have more than 10MiB buffered and are more than 10MiB into the buffer, shift the first 4MiB off the buffer. + //This prevents infinite growth of the read buffer for large files, but allows for some re-use of data. + if (readBuffer.size() >= 10*1024*1024 && readBufferOffset > 10*1024*1024){ + readBuffer.shift(4*1024*1024); + readBufferOffset -= 4*1024*1024; + readPos += 4*1024*1024; + } + }else{ + //For non-standalone mode, we know we're always live streaming, and can always cut off what we've shifted + if (readBufferOffset){ + readBuffer.shift(readBufferOffset); + readPos += readBufferOffset; + readBufferOffset = 0; + } + } + + size_t preSize = readBuffer.size(); + inFile.readSome(toRead, *this); + if (readBuffer.size() == preSize){ + Util::sleep(5); + continue; + } + + needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal); + if (readBuffer.size() >= needed + readBufferOffset){ + // Make sure TrackEntry types are read whole + if (readingMinimal && EBML::Element(readBuffer + readBufferOffset).getID() == EBML::EID_TRACKENTRY){ + readingMinimal = false; + needed = EBML::Element::needBytes(readBuffer + readBufferOffset, readBuffer.size() - readBufferOffset, readingMinimal); + } + } + } + EBML::Element E(readBuffer + readBufferOffset); if (E.getID() == EBML::EID_CLUSTER){ - if (inFile == stdin){ + if (!inFile.isSeekable()){ lastClusterBPos = 0; }else{ - int64_t bp = Util::ftell(inFile); + int64_t bp = readPos + readBufferOffset; if (bp == -1 && errno == ESPIPE){ lastClusterBPos = 0; }else{ @@ -190,6 +244,7 @@ namespace Mist{ lastClusterTime = E.getValUInt(); DONTEVEN_MSG("Cluster time %" PRIu64 " ms", lastClusterTime); } + firstRead = false; return true; } @@ -216,7 +271,7 @@ namespace Mist{ WARN_MSG("Aborting header generation due to shutdown: %s", Util::exitReason); return false; } - EBML::Element E(ptr, readingMinimal); + EBML::Element E(readBuffer + readBufferOffset, readingMinimal); if (E.getID() == EBML::EID_TRACKENTRY){ EBML::Element tmpElem = E.findChild(EBML::EID_TRACKNUMBER); if (!tmpElem){ @@ -392,9 +447,15 @@ namespace Mist{ timeScale = ((double)timeScaleVal) / 1000000.0; } // Live streams stop parsing the header as soon as the first Cluster is encountered - if (E.getID() == EBML::EID_CLUSTER && !needsLock()){return true;} + if (E.getID() == EBML::EID_CLUSTER){ + if (!needsLock()){return true;} + //Set progress counter for non-live inputs + if (streamStatus && streamStatus.len > 1 && inFile.getSize()){ + streamStatus.mapped[1] = (255 * (readPos + readBufferOffset)) / inFile.getSize(); + } + } if (E.getType() == EBML::ELEM_BLOCK){ - EBML::Block B(ptr); + EBML::Block B(readBuffer + readBufferOffset); uint64_t tNum = B.getTrackNum(); uint64_t newTime = lastClusterTime + B.getTimecode(); trackPredictor &TP = packBuf[tNum]; @@ -414,15 +475,15 @@ namespace Mist{ for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){ if (frameNo){ if (M.getCodec(idx) == "AAC"){ - newTime += (1000000 / M.getRate(idx)) / timeScale; // assume ~1000 samples per frame + newTime += (uint64_t)(1000000 / M.getRate(idx)) / timeScale; // assume ~1000 samples per frame }else if (M.getCodec(idx) == "MP3"){ - newTime += (1152000 / M.getRate(idx)) / timeScale; // 1152 samples per frame + newTime += (uint64_t)(1152000 / M.getRate(idx)) / timeScale; // 1152 samples per frame }else if (M.getCodec(idx) == "DTS"){ // Assume 512 samples per frame (DVD default) // actual amount can be calculated from data, but data // is not available during header generation... // See: http://www.stnsoft.com/DVD/dtshdr.html - newTime += (512000 / M.getRate(idx)) / timeScale; + newTime += (uint64_t)(512000 / M.getRate(idx)) / timeScale; }else{ newTime += 1 / timeScale; ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!", @@ -538,6 +599,7 @@ namespace Mist{ EBML::Block B; if (wantBlocks){ do{ + if (!config->is_active){return;} if (!readElement()){ // Make sure we empty our buffer first if (bufferedPacks && packBuf.size()){ @@ -557,11 +619,11 @@ namespace Mist{ thisPacket.null(); return; } - B = EBML::Block(ptr); + B = EBML::Block(readBuffer + readBufferOffset); }while (!B || B.getType() != EBML::ELEM_BLOCK || (singleTrack && wantedID != B.getTrackNum())); }else{ - B = EBML::Block(ptr); + B = EBML::Block(readBuffer + readBufferOffset); } uint64_t tNum = B.getTrackNum(); @@ -590,15 +652,15 @@ namespace Mist{ for (uint64_t frameNo = 0; frameNo < B.getFrameCount(); ++frameNo){ if (frameNo){ if (M.getCodec(thisIdx) == "AAC"){ - newTime += (1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame + newTime += (uint64_t)(1000000 / M.getRate(thisIdx)) / timeScale; // assume ~1000 samples per frame }else if (M.getCodec(thisIdx) == "MP3"){ - newTime += (1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame + newTime += (uint64_t)(1152000 / M.getRate(thisIdx)) / timeScale; // 1152 samples per frame }else if (M.getCodec(thisIdx) == "DTS"){ // Assume 512 samples per frame (DVD default) // actual amount can be calculated from data, but data // is not available during header generation... // See: http://www.stnsoft.com/DVD/dtshdr.html - newTime += (512000 / M.getRate(thisIdx)) / timeScale; + newTime += (uint64_t)(512000 / M.getRate(thisIdx)) / timeScale; }else{ ERROR_MSG("Unknown frame duration for codec %s - timestamps WILL be wrong!", M.getCodec(thisIdx).c_str()); @@ -647,7 +709,34 @@ namespace Mist{ DONTEVEN_MSG("Seeking to %" PRIu64 ", found %" PRIu64 "...", seekTime, keys.getTime(i)); seekPos = keys.getBpos(i); } - Util::fseek(inFile, seekPos, SEEK_SET); + + + firstRead = true; + if (readPos > seekPos || seekPos > readPos + readBuffer.size() + 4*1024*1024){ + readBuffer.truncate(0); + readBufferOffset = 0; + if (!inFile.seek(seekPos)){ + FAIL_MSG("Seek to %" PRIu64 " failed! Aborting load", seekPos); + } + readPos = inFile.getPos(); + }else{ + while (seekPos > readPos + readBuffer.size() && config->is_active){ + size_t preSize = readBuffer.size(); + inFile.readSome(seekPos - (readPos + readBuffer.size()), *this); + if (readBuffer.size() == preSize){ + Util::sleep(5); + } + } + if (seekPos > readPos + readBuffer.size()){ + Util::logExitReason("Input file seek abort"); + config->is_active = false; + readBufferOffset = 0; + return; + } + readBufferOffset = seekPos - readPos; + } + + } /// Flushes all trackPredictors without deleting permanent data from them. diff --git a/src/input/input_ebml.h b/src/input/input_ebml.h index e7400954..1fbc6dbc 100644 --- a/src/input/input_ebml.h +++ b/src/input/input_ebml.h @@ -1,6 +1,7 @@ #pragma once #include "input.h" #include +#include namespace Mist{ @@ -122,12 +123,22 @@ namespace Mist{ } }; - class InputEBML : public Input{ + class InputEBML : public Input, public Util::DataCallback{ public: InputEBML(Util::Config *cfg); bool needsLock(); + virtual bool isSingular(){return standAlone && !config->getBool("realtime");} + virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; protected: + + HTTP::URIReader inFile; + Util::ResizeablePointer readBuffer; + uint64_t readBufferOffset; + uint64_t readPos; + bool firstRead; + virtual size_t streamByteCount(){ return totalBytes; }; // For live streams: to update the stats with correct values. @@ -140,8 +151,6 @@ namespace Mist{ void getNext(size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); void clearPredictors(); - FILE *inFile; - Util::ResizeablePointer ptr; bool readingMinimal; uint64_t lastClusterBPos; uint64_t lastClusterTime; diff --git a/src/process/process_exec.cpp b/src/process/process_exec.cpp index c19b8201..e89c2444 100644 --- a/src/process/process_exec.cpp +++ b/src/process/process_exec.cpp @@ -59,7 +59,7 @@ namespace Mist{ } } void setInFile(int stdin_val){ - inFile = fdopen(stdin_val, "r"); + inFile.open(stdin_val); streamName = opt["sink"].asString(); if (!streamName.size()){streamName = opt["source"].asString();} Util::streamVariables(streamName, opt["source"].asString()); diff --git a/src/process/process_ffmpeg.cpp b/src/process/process_ffmpeg.cpp index dad6f687..67141cea 100644 --- a/src/process/process_ffmpeg.cpp +++ b/src/process/process_ffmpeg.cpp @@ -443,7 +443,7 @@ namespace Mist{ } void EncodeInputEBML::setInFile(int stdin_val){ - inFile = fdopen(stdin_val, "r"); + inFile.open(stdin_val); streamName = opt["sink"].asString(); if (!streamName.size()){streamName = opt["source"].asString();} Util::streamVariables(streamName, opt["source"].asString());