From f29d48154fd8893a2f3a690b9d74ce678128bb47 Mon Sep 17 00:00:00 2001 From: Thulinma Date: Mon, 10 Apr 2023 03:46:27 +0200 Subject: [PATCH] URIReader support for TS input --- lib/ts_stream.cpp | 48 ++++--- lib/ts_stream.h | 3 +- src/input/input_ts.cpp | 286 ++++++++++++++++++++--------------------- src/input/input_ts.h | 27 ++-- 4 files changed, 191 insertions(+), 173 deletions(-) diff --git a/lib/ts_stream.cpp b/lib/ts_stream.cpp index db1c45ac..38b224f4 100644 --- a/lib/ts_stream.cpp +++ b/lib/ts_stream.cpp @@ -15,26 +15,34 @@ tthread::recursive_mutex tMutex; namespace TS{ - bool Assembler::assemble(Stream & TSStrm, char * ptr, size_t len, bool parse){ + bool Assembler::assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse, uint64_t bytePos){ bool ret = false; size_t offset = 0; size_t amount = 188-leftData.size(); - if (leftData.size() && len >= amount){ - //Attempt to re-assemble a packet from the leftovers of last time + current head - if (len == amount || ptr[amount] == 0x47){ - VERYHIGH_MSG("Assembled scrap packet"); - //Success! - leftData.append(ptr, amount); - tsBuf.FromPointer(leftData); - if (!ret && tsBuf.getUnitStart()){ret = true;} - if (parse){ - TSStrm.parse(tsBuf, 0); - }else{ - TSStrm.add(tsBuf); - if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} + if (leftData.size()){ + if (len >= amount){ + //Attempt to re-assemble a packet from the leftovers of last time + current head + if (len == amount || ptr[amount] == 0x47){ + VERYHIGH_MSG("Assembled scrap packet"); + //Success! + bytePos -= leftData.size(); + leftData.append(ptr, amount); + tsBuf.FromPointer(leftData); + if (!ret && tsBuf.getUnitStart()){ret = true;} + if (parse){ + TSStrm.parse(tsBuf, bytePos); + }else{ + TSStrm.add(tsBuf); + if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} + } + offset = amount; + bytePos += 188; + leftData.truncate(0); } - offset = amount; - leftData.assign(0,0); + }else{ + //No way to verify, we'll just append and hope for the best... + leftData.append(ptr, len); + return ret; } //On failure, hope we might live to succeed another day } @@ -51,7 +59,7 @@ namespace TS{ tsBuf.FromPointer(ptr + offset); if (!ret && tsBuf.getUnitStart()){ret = true;} if (parse){ - TSStrm.parse(tsBuf, 0); + TSStrm.parse(tsBuf, bytePos); }else{ TSStrm.add(tsBuf); if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} @@ -59,15 +67,21 @@ namespace TS{ }else{ leftData.assign(ptr + offset, len - offset); } + bytePos += 188; offset += 188; }else{ ++junk; ++offset; + ++bytePos; } } return ret; } + void Assembler::clear(){ + leftData.truncate(0); + } + void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, uint32_t avail, uint64_t bPos){ if (!p.getCompleteSize()){return;} diff --git a/lib/ts_stream.h b/lib/ts_stream.h index 0cc1da7e..aef91ed8 100644 --- a/lib/ts_stream.h +++ b/lib/ts_stream.h @@ -112,7 +112,8 @@ namespace TS{ class Assembler{ public: - bool assemble(Stream & TSStrm, char * ptr, size_t len, bool parse = false); + bool assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse = false, uint64_t bytePos = 0); + void clear(); private: Util::ResizeablePointer leftData; TS::Packet tsBuf; diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 18b8572d..36a0cae5 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -163,8 +162,11 @@ namespace Mist{ /// \arg cfg Util::Config that contains all current configurations. inputTS::inputTS(Util::Config *cfg) : Input(cfg){ rawMode = false; + udpMode = false; rawIdx = INVALID_TRACK_ID; lastRawPacket = 0; + readPos = 0; + unitStartSeen = false; capa["name"] = "TS"; capa["desc"] = "This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files " @@ -180,6 +182,8 @@ namespace Mist{ capa["source_match"].append("http-ts://*"); capa["source_match"].append("https://*.ts"); capa["source_match"].append("https-ts://*"); + capa["source_match"].append("s3+http://*.ts"); + capa["source_match"].append("s3+https://*.ts"); // These can/may be set to always-on mode capa["always_match"].append("stream://*.ts"); capa["always_match"].append("tsudp://*"); @@ -188,6 +192,8 @@ namespace Mist{ capa["always_match"].append("http-ts://*"); capa["always_match"].append("https://*.ts"); capa["always_match"].append("https-ts://*"); + capa["always_match"].append("s3+http://*.ts"); + capa["always_match"].append("s3+https://*.ts"); capa["incoming_push_url"] = "udp://$host:$port"; capa["incoming_push_url_match"] = "tsudp://*"; capa["priority"] = 9; @@ -199,7 +205,6 @@ namespace Mist{ capa["codecs"]["audio"].append("MP2"); capa["codecs"]["audio"].append("opus"); capa["codecs"]["passthrough"].append("rawts"); - inFile = NULL; inputProcess = 0; isFinished = false; @@ -256,8 +261,6 @@ namespace Mist{ } inputTS::~inputTS(){ - if (inFile){fclose(inFile);} - if (tcpCon){tcpCon.close();} if (!standAlone){ tthread::lock_guard guard(threadClaimMutex); threadTimer.clear(); @@ -265,6 +268,8 @@ namespace Mist{ } } + bool skipPipes = false; + bool inputTS::checkArguments(){ if (config->getString("input").substr(0, 6) == "srt://"){ std::string source = config->getString("input"); @@ -272,40 +277,51 @@ namespace Mist{ config->getOption("input", true).append("ts-exec:srt-live-transmit " + srtUrl.getUrl() + " file://con"); INFO_MSG("Rewriting SRT source '%s' to '%s'", source.c_str(), config->getString("input").c_str()); } + // We call preRun early and, if successful, close the opened reader. + // This is to ensure we have udpMode/rawMode/standAlone all set properly before the first call to needsLock. + // The reader must be closed so that the angel process does not have a reader open. + // There is no need to close a potential UDP socket, since that doesn't get opened in preRun just yet. + skipPipes = true; + if (!preRun()){return false;} + skipPipes = false; + reader.close(); return true; } /// Live Setup of TS Input bool inputTS::preRun(){ - INFO_MSG("Prerun: %s", config->getString("input").c_str()); - + std::string const inCfg = config->getString("input"); + udpMode = false; rawMode = config->getBool("raw"); if (rawMode){INFO_MSG("Entering raw mode");} + // UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) + if (inCfg.substr(0, 8) == "tsudp://"){ + standAlone = false; + udpMode = true; + return true; + } // streamed standard input - if (config->getString("input") == "-"){ + if (inCfg == "-"){ standAlone = false; - tcpCon.open(fileno(stdout), fileno(stdin)); - return true; + if (skipPipes){return true;} + reader.open(0); + return reader; } - if (config->getString("input").substr(0, 7) == "http://" || - config->getString("input").substr(0, 10) == "http-ts://" || - config->getString("input").substr(0, 8) == "https://" || - config->getString("input").substr(0, 11) == "https-ts://"){ + //file descriptor input + if (inCfg.substr(0, 5) == "fd://"){ standAlone = false; - HTTP::URL url(config->getString("input")); - if (url.protocol == "http-ts"){url.protocol = "http";} - if (url.protocol == "https-ts"){url.protocol = "https";} - HTTP::Downloader DL; - DL.getHTTP().headerOnly = true; - if (!DL.get(url)){return false;} - tcpCon = DL.getSocket(); - DL.getSocket().drop(); // Prevent shutdown of connection, keeping copy of socket open - return true; + if (skipPipes){return true;} + int fd = atoi(inCfg.c_str() + 5); + INFO_MSG("Opening file descriptor %s (%d)", inCfg.c_str(), fd); + reader.open(fd); + return reader; } - if (config->getString("input").substr(0, 8) == "ts-exec:"){ + //ts-exec: input + if (inCfg.substr(0, 8) == "ts-exec:"){ standAlone = false; - std::string input = config->getString("input").substr(8); + if (skipPipes){return true;} + std::string input = inCfg.substr(8); char *args[128]; uint8_t argCnt = 0; char *startCh = 0; @@ -328,32 +344,36 @@ namespace Mist{ int fin = -1, fout = -1; inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); - tcpCon.open(-1, fout); - return true; + reader.open(fout); + return reader; } // streamed file - if (config->getString("input").substr(0, 9) == "stream://"){ - inFile = fopen(config->getString("input").c_str() + 9, "r"); - tcpCon.open(-1, fileno(inFile)); + if (inCfg.substr(0, 9) == "stream://"){ + reader.open(inCfg.substr(9)); + FILE * inFile = fopen(inCfg.c_str() + 9, "r"); + reader.open(fileno(inFile)); standAlone = false; return inFile; } - //file descriptor input - if (config->getString("input").substr(0, 5) == "fd://"){ - int fd = atoi(config->getString("input").c_str() + 5); - INFO_MSG("Opening file descriptor %s (%d)", config->getString("input").c_str(), fd); - tcpCon.open(-1, fd); - standAlone = false; - return tcpCon; + //Anything else, read through URIReader + HTTP::URL url = HTTP::localURIResolver().link(inCfg); + if (url.protocol == "http-ts"){url.protocol = "http";} + if (url.protocol == "https-ts"){url.protocol = "https";} + reader.open(url); + standAlone = reader.isSeekable(); + return reader; + } + + void inputTS::dataCallback(const char *ptr, size_t size){ + if (standAlone){ + unitStartSeen |= assembler.assemble(tsStream, ptr, size, true, readPos); + }else{ + liveReadBuffer.append(ptr, size); } - // UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) - if (config->getString("input").substr(0, 8) == "tsudp://"){ - standAlone = false; - return true; - } - // plain VoD file - inFile = fopen(config->getString("input").c_str(), "r"); - return inFile; + readPos += size; + } + size_t inputTS::getDataCallbackPos() const{ + return readPos; } bool inputTS::needHeader(){ @@ -366,19 +386,18 @@ namespace Mist{ /// It encounters a new PES start, it writes the currently found PES data /// for a specific track to metadata. After the entire stream has been read, /// it writes the remaining metadata. - ///\todo Find errors, perhaps parts can be made more modular bool inputTS::readHeader(){ - if (!inFile){return false;} + if (!reader){return false;} meta.reInit(isSingular() ? streamName : ""); TS::Packet packet; // to analyse and extract data DTSC::Packet headerPack; - fseek(inFile, 0, SEEK_SET); // seek to beginning - uint64_t lastBpos = 0; - while (packet.FromFile(inFile) && !feof(inFile)){ - tsStream.parse(packet, lastBpos); - lastBpos = Util::ftell(inFile); - if (packet.getUnitStart()){ + while (!reader.isEOF()){ + uint64_t prePos = readPos; + reader.readSome(188, *this); + if (readPos == prePos){Util::sleep(50);} + if (unitStartSeen){ + unitStartSeen = false; while (tsStream.hasPacketOnEachTrack()){ tsStream.getEarliestPacket(headerPack); size_t pid = headerPack.getTrackId(); @@ -393,10 +412,14 @@ namespace Mist{ meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); } + //Set progress counter + if (streamStatus && streamStatus.len > 1 && reader.getSize()){ + streamStatus.mapped[1] = (255 * readPos) / reader.getSize(); + } } } tsStream.finish(); - INFO_MSG("Reached %s at %" PRIu64 " bytes", feof(inFile) ? "EOF" : "error", lastBpos); + INFO_MSG("Reached %s at %" PRIu64 " bytes", reader.isEOF() ? "EOF" : "error", readPos); while (tsStream.hasPacket()){ tsStream.getEarliestPacket(headerPack); size_t pid = headerPack.getTrackId(); @@ -411,8 +434,6 @@ namespace Mist{ meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); } - - fseek(inFile, 0, SEEK_SET); return true; } @@ -425,17 +446,17 @@ namespace Mist{ INSANE_MSG("Getting next on track %zu", idx); thisPacket.null(); bool hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); - while (!hasPacket && !feof(inFile) && + while (!hasPacket && !reader.isEOF() && (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active){ - tsBuf.FromFile(inFile); - if (idx == INVALID_TRACK_ID || pid == tsBuf.getPID()){ - tsStream.parse(tsBuf, 0); // bPos == 0 - if (tsBuf.getUnitStart()){ - hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); - } + uint64_t prePos = readPos; + reader.readSome(188, *this); + if (readPos == prePos){Util::sleep(50);} + if (unitStartSeen){ + unitStartSeen = false; + hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); } } - if (feof(inFile)){ + if (reader.isEOF()){ if (!isFinished){ tsStream.finish(); isFinished = true; @@ -459,33 +480,23 @@ namespace Mist{ if (thisIdx == INVALID_TRACK_ID){getNext(idx);} } - void inputTS::readPMT(){ - // save current file position - uint64_t bpos = Util::ftell(inFile); - if (fseek(inFile, 0, SEEK_SET)){ - FAIL_MSG("Seek to 0 failed"); - return; - } + /// Guarantees the PMT is read and we know about all tracks. + void inputTS::postHeader(){ + if (!standAlone){return;} + tsStream.clear(); + assembler.clear(); + reader.seek(0); + readPos = reader.getPos(); - TS::Packet tsBuffer; - while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)){ - tsStream.parse(tsBuffer, 0); - } - - // Clear leaves the PMT in place - tsStream.partialClear(); - - // Restore original file position - if (Util::fseek(inFile, bpos, SEEK_SET)){ - clearerr(inFile); - return; + while (!tsStream.hasPacketOnEachTrack()){ + uint64_t prePos = readPos; + reader.readSome(188, *this); + if (readPos == prePos){Util::sleep(50);} } } /// Seeks to a specific time void inputTS::seek(uint64_t seekTime, size_t idx){ - tsStream.clear(); - readPMT(); uint64_t seekPos = 0xFFFFFFFFull; if (idx != INVALID_TRACK_ID){ uint32_t keyNum = M.getKeyNumForTime(idx, seekTime); @@ -500,56 +511,48 @@ namespace Mist{ if (thisBPos < seekPos){seekPos = thisBPos;} } } - clearerr(inFile); - Util::fseek(inFile, seekPos, SEEK_SET); // seek to the correct position + isFinished = false; + tsStream.partialClear(); + assembler.clear(); + reader.seek(seekPos); + readPos = reader.getPos(); } bool inputTS::openStreamSource(){ - const std::string &inpt = config->getString("input"); - if (inpt.substr(0, 8) == "tsudp://"){ - HTTP::URL input_url(inpt); - udpCon.setBlocking(false); - udpCon.bind(input_url.getPort(), input_url.host, input_url.path); - if (udpCon.getSock() == -1){ - FAIL_MSG("Could not open UDP socket. Aborting."); - return false; - } - } - return true; - } - - void inputTS::parseStreamHeader(){ - // Placeholder empty track to force normal code to continue despite no tracks available - tmpIdx = meta.addTrack(0, 0, 0, 0); + //Non-UDP mode inputs were already opened in preRun() + if (!udpMode){return reader;} + HTTP::URL input_url(config->getString("input")); + udpCon.setBlocking(false); + udpCon.bind(input_url.getPort(), input_url.host, input_url.path); + // This line assures memory for destination address is allocated, so we can fill it during receive later + udpCon.allocateDestination(); + return (udpCon.getSock() != -1); } void inputTS::streamMainLoop(){ - meta.removeTrack(tmpIdx); - INFO_MSG("Removed temptrack %zu", tmpIdx); Comms::Connections statComm; - uint64_t downCounter = 0; uint64_t startTime = Util::bootSecs(); uint64_t noDataSince = Util::bootSecs(); bool gettingData = false; bool hasStarted = false; cfgPointer = config; globalStreamName = streamName; - Util::ResizeablePointer newData; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active){ - if (tcpCon){ - if (tcpCon.spool()){ - while (tcpCon.Received().available(188)){ - while (tcpCon.Received().get()[0] != 0x47 && tcpCon.Received().available(188)){ - tcpCon.Received().remove(1); + if (!udpMode){ + uint64_t prePos = readPos; + reader.readSome(188, *this); + if (readPos == prePos){ + Util::sleep(50); + }else{ + while (liveReadBuffer.size() >= 188){ + while (liveReadBuffer[0] != 0x47 && liveReadBuffer.size() >= 188){ + liveReadBuffer.shift(1); } - if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ - newData.truncate(0); - tcpCon.Received().remove(newData, 188); + if (liveReadBuffer.size() >= 188 && liveReadBuffer[0] == 0x47){ if (rawMode){ keepAlive(); - rawBuffer.append(newData, newData.size()); - if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + if (liveReadBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ if (rawIdx == INVALID_TRACK_ID){ rawIdx = meta.addTrack(); meta.setType(rawIdx, "meta"); @@ -558,23 +561,27 @@ namespace Mist{ userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); } uint64_t packetTime = Util::bootMS(); - thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + uint64_t packetLen = (liveReadBuffer.size() / 188) * 188; + thisPacket.genericFill(packetTime, 0, 1, liveReadBuffer, packetLen, 0, 0); bufferLivePacket(thisPacket); lastRawPacket = packetTime; - rawBuffer.truncate(0); + liveReadBuffer.shift(packetLen); } }else { - tsBuf.FromPointer(newData); - liveStream.add(tsBuf); - if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} + size_t shiftAmount = 0; + for (size_t offset = 0; liveReadBuffer.size() >= offset + 188; offset += 188){ + tsBuf.FromPointer(liveReadBuffer + offset); + liveStream.add(tsBuf); + if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} + shiftAmount += 188; + } + liveReadBuffer.shift(shiftAmount); } } } noDataSince = Util::bootSecs(); - }else{ - Util::sleep(100); } - if (!tcpCon){ + if (!reader){ config->is_active = false; Util::logExitReason("end of streamed input"); return; @@ -582,7 +589,7 @@ namespace Mist{ }else{ bool received = false; while (udpCon.Receive()){ - downCounter += udpCon.data.size(); + readPos += udpCon.data.size(); received = true; if (!gettingData){ gettingData = true; @@ -590,8 +597,8 @@ namespace Mist{ } if (rawMode){ keepAlive(); - rawBuffer.append(udpCon.data, udpCon.data.size()); - if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + liveReadBuffer.append(udpCon.data, udpCon.data.size()); + if (liveReadBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ if (rawIdx == INVALID_TRACK_ID){ rawIdx = meta.addTrack(); meta.setType(rawIdx, "meta"); @@ -600,10 +607,10 @@ namespace Mist{ userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); } uint64_t packetTime = Util::bootMS(); - thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + thisPacket.genericFill(packetTime, 0, 1, liveReadBuffer, liveReadBuffer.size(), 0, 0); bufferLivePacket(thisPacket); lastRawPacket = packetTime; - rawBuffer.truncate(0); + liveReadBuffer.truncate(0); } }else{ assembler.assemble(liveStream, udpCon.data, udpCon.data.size()); @@ -621,8 +628,10 @@ namespace Mist{ } // Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 1){ - // Connect to stats for INPUT detection - statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); + if (!statComm && gettingData){ + // Connect to stats for INPUT detection + statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); + } if (statComm){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ config->is_active = false; @@ -634,7 +643,7 @@ namespace Mist{ statComm.setStream(streamName); statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setUp(0); - statComm.setDown(downCounter + tcpCon.dataDown()); + statComm.setDown(readPos); statComm.setTime(now - startTime); statComm.setLastSecond(0); } @@ -701,18 +710,5 @@ namespace Mist{ }while (threadCount); } - bool inputTS::needsLock(){ - // we already know no lock will be needed - if (!standAlone){return false;} - // otherwise, check input param - const std::string &inpt = config->getString("input"); - if (inpt.size() && inpt != "-" && inpt.substr(0, 9) != "stream://" && inpt.substr(0, 8) != "tsudp://" && - inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 6) != "srt://" && - inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://" && - inpt.substr(0, 8) != "https://" && inpt.substr(0, 11) != "https-ts://"){ - return Input::needsLock(); - } - return false; - } }// namespace Mist diff --git a/src/input/input_ts.h b/src/input/input_ts.h index 3116725d..8d2655af 100644 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -3,49 +3,56 @@ #include #include #include +#include #include #include namespace Mist{ /// This class contains all functions needed to implement TS Input - class inputTS : public Input{ + class inputTS : public Input, public Util::DataCallback{ public: inputTS(Util::Config *cfg); ~inputTS(); - virtual bool needsLock(); + + // This function can simply check standAlone because we ensure it's set in checkArguments, + // which is always called before the first call to needsLock + virtual bool needsLock(){return standAlone && Input::needsLock();} virtual std::string getConnectedBinHost(){ - if (tcpCon){return tcpCon.getBinHost();} - /// \TODO Handle UDP - return Input::getConnectedBinHost(); + if (udpMode){return udpCon.getBinDestination();} + return reader.getBinHost(); } + virtual bool publishesTracks(){return false;} + virtual void dataCallback(const char *ptr, size_t size); + virtual size_t getDataCallbackPos() const; protected: // Private Functions bool checkArguments(); bool preRun(); bool readHeader(); virtual bool needHeader(); + virtual void postHeader(); virtual void getNext(size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); void readPMT(); bool openStreamSource(); - void parseStreamHeader(); void streamMainLoop(); void finish(); - FILE *inFile; ///< The input file with ts data TS::Assembler assembler; + Util::ResizeablePointer liveReadBuffer; TS::Stream tsStream; ///< Used for parsing the incoming ts stream Socket::UDPConnection udpCon; - Socket::Connection tcpCon; + HTTP::URIReader reader; TS::Packet tsBuf; pid_t inputProcess; - size_t tmpIdx; bool isFinished; + bool udpMode; bool rawMode; - Util::ResizeablePointer rawBuffer; size_t rawIdx; uint64_t lastRawPacket; + uint64_t readPos; + bool unitStartSeen; }; }// namespace Mist