URIReader support for TS input

This commit is contained in:
Thulinma 2023-04-10 03:46:27 +02:00
parent 1df76eff16
commit f29d48154f
4 changed files with 191 additions and 173 deletions

View file

@ -15,26 +15,34 @@ tthread::recursive_mutex tMutex;
namespace TS{ namespace TS{
bool Assembler::assemble(Stream & TSStrm, char * ptr, size_t len, bool parse){ bool Assembler::assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse, uint64_t bytePos){
bool ret = false; bool ret = false;
size_t offset = 0; size_t offset = 0;
size_t amount = 188-leftData.size(); size_t amount = 188-leftData.size();
if (leftData.size() && len >= amount){ if (leftData.size()){
//Attempt to re-assemble a packet from the leftovers of last time + current head if (len >= amount){
if (len == amount || ptr[amount] == 0x47){ //Attempt to re-assemble a packet from the leftovers of last time + current head
VERYHIGH_MSG("Assembled scrap packet"); if (len == amount || ptr[amount] == 0x47){
//Success! VERYHIGH_MSG("Assembled scrap packet");
leftData.append(ptr, amount); //Success!
tsBuf.FromPointer(leftData); bytePos -= leftData.size();
if (!ret && tsBuf.getUnitStart()){ret = true;} leftData.append(ptr, amount);
if (parse){ tsBuf.FromPointer(leftData);
TSStrm.parse(tsBuf, 0); if (!ret && tsBuf.getUnitStart()){ret = true;}
}else{ if (parse){
TSStrm.add(tsBuf); TSStrm.parse(tsBuf, bytePos);
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} }else{
TSStrm.add(tsBuf);
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
}
offset = amount;
bytePos += 188;
leftData.truncate(0);
} }
offset = amount; }else{
leftData.assign(0,0); //No way to verify, we'll just append and hope for the best...
leftData.append(ptr, len);
return ret;
} }
//On failure, hope we might live to succeed another day //On failure, hope we might live to succeed another day
} }
@ -51,7 +59,7 @@ namespace TS{
tsBuf.FromPointer(ptr + offset); tsBuf.FromPointer(ptr + offset);
if (!ret && tsBuf.getUnitStart()){ret = true;} if (!ret && tsBuf.getUnitStart()){ret = true;}
if (parse){ if (parse){
TSStrm.parse(tsBuf, 0); TSStrm.parse(tsBuf, bytePos);
}else{ }else{
TSStrm.add(tsBuf); TSStrm.add(tsBuf);
if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());} if (!TSStrm.isDataTrack(tsBuf.getPID())){TSStrm.parse(tsBuf.getPID());}
@ -59,15 +67,21 @@ namespace TS{
}else{ }else{
leftData.assign(ptr + offset, len - offset); leftData.assign(ptr + offset, len - offset);
} }
bytePos += 188;
offset += 188; offset += 188;
}else{ }else{
++junk; ++junk;
++offset; ++offset;
++bytePos;
} }
} }
return ret; return ret;
} }
void Assembler::clear(){
leftData.truncate(0);
}
void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, uint32_t avail, uint64_t bPos){ void ADTSRemainder::setRemainder(const aac::adts &p, const void *source, uint32_t avail, uint64_t bPos){
if (!p.getCompleteSize()){return;} if (!p.getCompleteSize()){return;}

View file

@ -112,7 +112,8 @@ namespace TS{
class Assembler{ class Assembler{
public: public:
bool assemble(Stream & TSStrm, char * ptr, size_t len, bool parse = false); bool assemble(Stream & TSStrm, const char * ptr, size_t len, bool parse = false, uint64_t bytePos = 0);
void clear();
private: private:
Util::ResizeablePointer leftData; Util::ResizeablePointer leftData;
TS::Packet tsBuf; TS::Packet tsBuf;

View file

@ -7,7 +7,6 @@
#include <iomanip> #include <iomanip>
#include <iostream> #include <iostream>
#include <mist/defines.h> #include <mist/defines.h>
#include <mist/downloader.h>
#include <mist/flv_tag.h> #include <mist/flv_tag.h>
#include <mist/http_parser.h> #include <mist/http_parser.h>
#include <mist/mp4_generic.h> #include <mist/mp4_generic.h>
@ -163,8 +162,11 @@ namespace Mist{
/// \arg cfg Util::Config that contains all current configurations. /// \arg cfg Util::Config that contains all current configurations.
inputTS::inputTS(Util::Config *cfg) : Input(cfg){ inputTS::inputTS(Util::Config *cfg) : Input(cfg){
rawMode = false; rawMode = false;
udpMode = false;
rawIdx = INVALID_TRACK_ID; rawIdx = INVALID_TRACK_ID;
lastRawPacket = 0; lastRawPacket = 0;
readPos = 0;
unitStartSeen = false;
capa["name"] = "TS"; capa["name"] = "TS";
capa["desc"] = capa["desc"] =
"This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files " "This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files "
@ -180,6 +182,8 @@ namespace Mist{
capa["source_match"].append("http-ts://*"); capa["source_match"].append("http-ts://*");
capa["source_match"].append("https://*.ts"); capa["source_match"].append("https://*.ts");
capa["source_match"].append("https-ts://*"); capa["source_match"].append("https-ts://*");
capa["source_match"].append("s3+http://*.ts");
capa["source_match"].append("s3+https://*.ts");
// These can/may be set to always-on mode // These can/may be set to always-on mode
capa["always_match"].append("stream://*.ts"); capa["always_match"].append("stream://*.ts");
capa["always_match"].append("tsudp://*"); capa["always_match"].append("tsudp://*");
@ -188,6 +192,8 @@ namespace Mist{
capa["always_match"].append("http-ts://*"); capa["always_match"].append("http-ts://*");
capa["always_match"].append("https://*.ts"); capa["always_match"].append("https://*.ts");
capa["always_match"].append("https-ts://*"); capa["always_match"].append("https-ts://*");
capa["always_match"].append("s3+http://*.ts");
capa["always_match"].append("s3+https://*.ts");
capa["incoming_push_url"] = "udp://$host:$port"; capa["incoming_push_url"] = "udp://$host:$port";
capa["incoming_push_url_match"] = "tsudp://*"; capa["incoming_push_url_match"] = "tsudp://*";
capa["priority"] = 9; capa["priority"] = 9;
@ -199,7 +205,6 @@ namespace Mist{
capa["codecs"]["audio"].append("MP2"); capa["codecs"]["audio"].append("MP2");
capa["codecs"]["audio"].append("opus"); capa["codecs"]["audio"].append("opus");
capa["codecs"]["passthrough"].append("rawts"); capa["codecs"]["passthrough"].append("rawts");
inFile = NULL;
inputProcess = 0; inputProcess = 0;
isFinished = false; isFinished = false;
@ -256,8 +261,6 @@ namespace Mist{
} }
inputTS::~inputTS(){ inputTS::~inputTS(){
if (inFile){fclose(inFile);}
if (tcpCon){tcpCon.close();}
if (!standAlone){ if (!standAlone){
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex); tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
threadTimer.clear(); threadTimer.clear();
@ -265,6 +268,8 @@ namespace Mist{
} }
} }
bool skipPipes = false;
bool inputTS::checkArguments(){ bool inputTS::checkArguments(){
if (config->getString("input").substr(0, 6) == "srt://"){ if (config->getString("input").substr(0, 6) == "srt://"){
std::string source = config->getString("input"); std::string source = config->getString("input");
@ -272,40 +277,51 @@ namespace Mist{
config->getOption("input", true).append("ts-exec:srt-live-transmit " + srtUrl.getUrl() + " file://con"); config->getOption("input", true).append("ts-exec:srt-live-transmit " + srtUrl.getUrl() + " file://con");
INFO_MSG("Rewriting SRT source '%s' to '%s'", source.c_str(), config->getString("input").c_str()); INFO_MSG("Rewriting SRT source '%s' to '%s'", source.c_str(), config->getString("input").c_str());
} }
// We call preRun early and, if successful, close the opened reader.
// This is to ensure we have udpMode/rawMode/standAlone all set properly before the first call to needsLock.
// The reader must be closed so that the angel process does not have a reader open.
// There is no need to close a potential UDP socket, since that doesn't get opened in preRun just yet.
skipPipes = true;
if (!preRun()){return false;}
skipPipes = false;
reader.close();
return true; return true;
} }
/// Live Setup of TS Input /// Live Setup of TS Input
bool inputTS::preRun(){ bool inputTS::preRun(){
INFO_MSG("Prerun: %s", config->getString("input").c_str()); std::string const inCfg = config->getString("input");
udpMode = false;
rawMode = config->getBool("raw"); rawMode = config->getBool("raw");
if (rawMode){INFO_MSG("Entering raw mode");} if (rawMode){INFO_MSG("Entering raw mode");}
// UDP input (tsudp://[host:]port[/iface[,iface[,...]]])
if (inCfg.substr(0, 8) == "tsudp://"){
standAlone = false;
udpMode = true;
return true;
}
// streamed standard input // streamed standard input
if (config->getString("input") == "-"){ if (inCfg == "-"){
standAlone = false; standAlone = false;
tcpCon.open(fileno(stdout), fileno(stdin)); if (skipPipes){return true;}
return true; reader.open(0);
return reader;
} }
if (config->getString("input").substr(0, 7) == "http://" || //file descriptor input
config->getString("input").substr(0, 10) == "http-ts://" || if (inCfg.substr(0, 5) == "fd://"){
config->getString("input").substr(0, 8) == "https://" ||
config->getString("input").substr(0, 11) == "https-ts://"){
standAlone = false; standAlone = false;
HTTP::URL url(config->getString("input")); if (skipPipes){return true;}
if (url.protocol == "http-ts"){url.protocol = "http";} int fd = atoi(inCfg.c_str() + 5);
if (url.protocol == "https-ts"){url.protocol = "https";} INFO_MSG("Opening file descriptor %s (%d)", inCfg.c_str(), fd);
HTTP::Downloader DL; reader.open(fd);
DL.getHTTP().headerOnly = true; return reader;
if (!DL.get(url)){return false;}
tcpCon = DL.getSocket();
DL.getSocket().drop(); // Prevent shutdown of connection, keeping copy of socket open
return true;
} }
if (config->getString("input").substr(0, 8) == "ts-exec:"){ //ts-exec: input
if (inCfg.substr(0, 8) == "ts-exec:"){
standAlone = false; standAlone = false;
std::string input = config->getString("input").substr(8); if (skipPipes){return true;}
std::string input = inCfg.substr(8);
char *args[128]; char *args[128];
uint8_t argCnt = 0; uint8_t argCnt = 0;
char *startCh = 0; char *startCh = 0;
@ -328,32 +344,36 @@ namespace Mist{
int fin = -1, fout = -1; int fin = -1, fout = -1;
inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0); inputProcess = Util::Procs::StartPiped(args, &fin, &fout, 0);
tcpCon.open(-1, fout); reader.open(fout);
return true; return reader;
} }
// streamed file // streamed file
if (config->getString("input").substr(0, 9) == "stream://"){ if (inCfg.substr(0, 9) == "stream://"){
inFile = fopen(config->getString("input").c_str() + 9, "r"); reader.open(inCfg.substr(9));
tcpCon.open(-1, fileno(inFile)); FILE * inFile = fopen(inCfg.c_str() + 9, "r");
reader.open(fileno(inFile));
standAlone = false; standAlone = false;
return inFile; return inFile;
} }
//file descriptor input //Anything else, read through URIReader
if (config->getString("input").substr(0, 5) == "fd://"){ HTTP::URL url = HTTP::localURIResolver().link(inCfg);
int fd = atoi(config->getString("input").c_str() + 5); if (url.protocol == "http-ts"){url.protocol = "http";}
INFO_MSG("Opening file descriptor %s (%d)", config->getString("input").c_str(), fd); if (url.protocol == "https-ts"){url.protocol = "https";}
tcpCon.open(-1, fd); reader.open(url);
standAlone = false; standAlone = reader.isSeekable();
return tcpCon; return reader;
}
void inputTS::dataCallback(const char *ptr, size_t size){
if (standAlone){
unitStartSeen |= assembler.assemble(tsStream, ptr, size, true, readPos);
}else{
liveReadBuffer.append(ptr, size);
} }
// UDP input (tsudp://[host:]port[/iface[,iface[,...]]]) readPos += size;
if (config->getString("input").substr(0, 8) == "tsudp://"){ }
standAlone = false; size_t inputTS::getDataCallbackPos() const{
return true; return readPos;
}
// plain VoD file
inFile = fopen(config->getString("input").c_str(), "r");
return inFile;
} }
bool inputTS::needHeader(){ bool inputTS::needHeader(){
@ -366,19 +386,18 @@ namespace Mist{
/// It encounters a new PES start, it writes the currently found PES data /// It encounters a new PES start, it writes the currently found PES data
/// for a specific track to metadata. After the entire stream has been read, /// for a specific track to metadata. After the entire stream has been read,
/// it writes the remaining metadata. /// it writes the remaining metadata.
///\todo Find errors, perhaps parts can be made more modular
bool inputTS::readHeader(){ bool inputTS::readHeader(){
if (!inFile){return false;} if (!reader){return false;}
meta.reInit(isSingular() ? streamName : ""); meta.reInit(isSingular() ? streamName : "");
TS::Packet packet; // to analyse and extract data TS::Packet packet; // to analyse and extract data
DTSC::Packet headerPack; DTSC::Packet headerPack;
fseek(inFile, 0, SEEK_SET); // seek to beginning
uint64_t lastBpos = 0; while (!reader.isEOF()){
while (packet.FromFile(inFile) && !feof(inFile)){ uint64_t prePos = readPos;
tsStream.parse(packet, lastBpos); reader.readSome(188, *this);
lastBpos = Util::ftell(inFile); if (readPos == prePos){Util::sleep(50);}
if (packet.getUnitStart()){ if (unitStartSeen){
unitStartSeen = false;
while (tsStream.hasPacketOnEachTrack()){ while (tsStream.hasPacketOnEachTrack()){
tsStream.getEarliestPacket(headerPack); tsStream.getEarliestPacket(headerPack);
size_t pid = headerPack.getTrackId(); size_t pid = headerPack.getTrackId();
@ -393,10 +412,14 @@ namespace Mist{
meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen,
headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen());
} }
//Set progress counter
if (streamStatus && streamStatus.len > 1 && reader.getSize()){
streamStatus.mapped[1] = (255 * readPos) / reader.getSize();
}
} }
} }
tsStream.finish(); tsStream.finish();
INFO_MSG("Reached %s at %" PRIu64 " bytes", feof(inFile) ? "EOF" : "error", lastBpos); INFO_MSG("Reached %s at %" PRIu64 " bytes", reader.isEOF() ? "EOF" : "error", readPos);
while (tsStream.hasPacket()){ while (tsStream.hasPacket()){
tsStream.getEarliestPacket(headerPack); tsStream.getEarliestPacket(headerPack);
size_t pid = headerPack.getTrackId(); size_t pid = headerPack.getTrackId();
@ -411,8 +434,6 @@ namespace Mist{
meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen, meta.update(headerPack.getTime(), headerPack.getInt("offset"), idx, dataLen,
headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen()); headerPack.getInt("bpos"), headerPack.getFlag("keyframe"), headerPack.getDataLen());
} }
fseek(inFile, 0, SEEK_SET);
return true; return true;
} }
@ -425,17 +446,17 @@ namespace Mist{
INSANE_MSG("Getting next on track %zu", idx); INSANE_MSG("Getting next on track %zu", idx);
thisPacket.null(); thisPacket.null();
bool hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); bool hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid));
while (!hasPacket && !feof(inFile) && while (!hasPacket && !reader.isEOF() &&
(inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active){ (inputProcess == 0 || Util::Procs::childRunning(inputProcess)) && config->is_active){
tsBuf.FromFile(inFile); uint64_t prePos = readPos;
if (idx == INVALID_TRACK_ID || pid == tsBuf.getPID()){ reader.readSome(188, *this);
tsStream.parse(tsBuf, 0); // bPos == 0 if (readPos == prePos){Util::sleep(50);}
if (tsBuf.getUnitStart()){ if (unitStartSeen){
hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid)); unitStartSeen = false;
} hasPacket = (idx == INVALID_TRACK_ID ? tsStream.hasPacket() : tsStream.hasPacket(pid));
} }
} }
if (feof(inFile)){ if (reader.isEOF()){
if (!isFinished){ if (!isFinished){
tsStream.finish(); tsStream.finish();
isFinished = true; isFinished = true;
@ -459,33 +480,23 @@ namespace Mist{
if (thisIdx == INVALID_TRACK_ID){getNext(idx);} if (thisIdx == INVALID_TRACK_ID){getNext(idx);}
} }
void inputTS::readPMT(){ /// Guarantees the PMT is read and we know about all tracks.
// save current file position void inputTS::postHeader(){
uint64_t bpos = Util::ftell(inFile); if (!standAlone){return;}
if (fseek(inFile, 0, SEEK_SET)){ tsStream.clear();
FAIL_MSG("Seek to 0 failed"); assembler.clear();
return; reader.seek(0);
} readPos = reader.getPos();
TS::Packet tsBuffer; while (!tsStream.hasPacketOnEachTrack()){
while (!tsStream.hasPacketOnEachTrack() && tsBuffer.FromFile(inFile)){ uint64_t prePos = readPos;
tsStream.parse(tsBuffer, 0); reader.readSome(188, *this);
} if (readPos == prePos){Util::sleep(50);}
// Clear leaves the PMT in place
tsStream.partialClear();
// Restore original file position
if (Util::fseek(inFile, bpos, SEEK_SET)){
clearerr(inFile);
return;
} }
} }
/// Seeks to a specific time /// Seeks to a specific time
void inputTS::seek(uint64_t seekTime, size_t idx){ void inputTS::seek(uint64_t seekTime, size_t idx){
tsStream.clear();
readPMT();
uint64_t seekPos = 0xFFFFFFFFull; uint64_t seekPos = 0xFFFFFFFFull;
if (idx != INVALID_TRACK_ID){ if (idx != INVALID_TRACK_ID){
uint32_t keyNum = M.getKeyNumForTime(idx, seekTime); uint32_t keyNum = M.getKeyNumForTime(idx, seekTime);
@ -500,56 +511,48 @@ namespace Mist{
if (thisBPos < seekPos){seekPos = thisBPos;} if (thisBPos < seekPos){seekPos = thisBPos;}
} }
} }
clearerr(inFile); isFinished = false;
Util::fseek(inFile, seekPos, SEEK_SET); // seek to the correct position tsStream.partialClear();
assembler.clear();
reader.seek(seekPos);
readPos = reader.getPos();
} }
bool inputTS::openStreamSource(){ bool inputTS::openStreamSource(){
const std::string &inpt = config->getString("input"); //Non-UDP mode inputs were already opened in preRun()
if (inpt.substr(0, 8) == "tsudp://"){ if (!udpMode){return reader;}
HTTP::URL input_url(inpt); HTTP::URL input_url(config->getString("input"));
udpCon.setBlocking(false); udpCon.setBlocking(false);
udpCon.bind(input_url.getPort(), input_url.host, input_url.path); udpCon.bind(input_url.getPort(), input_url.host, input_url.path);
if (udpCon.getSock() == -1){ // This line assures memory for destination address is allocated, so we can fill it during receive later
FAIL_MSG("Could not open UDP socket. Aborting."); udpCon.allocateDestination();
return false; return (udpCon.getSock() != -1);
}
}
return true;
}
void inputTS::parseStreamHeader(){
// Placeholder empty track to force normal code to continue despite no tracks available
tmpIdx = meta.addTrack(0, 0, 0, 0);
} }
void inputTS::streamMainLoop(){ void inputTS::streamMainLoop(){
meta.removeTrack(tmpIdx);
INFO_MSG("Removed temptrack %zu", tmpIdx);
Comms::Connections statComm; Comms::Connections statComm;
uint64_t downCounter = 0;
uint64_t startTime = Util::bootSecs(); uint64_t startTime = Util::bootSecs();
uint64_t noDataSince = Util::bootSecs(); uint64_t noDataSince = Util::bootSecs();
bool gettingData = false; bool gettingData = false;
bool hasStarted = false; bool hasStarted = false;
cfgPointer = config; cfgPointer = config;
globalStreamName = streamName; globalStreamName = streamName;
Util::ResizeablePointer newData;
unsigned long long threadCheckTimer = Util::bootSecs(); unsigned long long threadCheckTimer = Util::bootSecs();
while (config->is_active){ while (config->is_active){
if (tcpCon){ if (!udpMode){
if (tcpCon.spool()){ uint64_t prePos = readPos;
while (tcpCon.Received().available(188)){ reader.readSome(188, *this);
while (tcpCon.Received().get()[0] != 0x47 && tcpCon.Received().available(188)){ if (readPos == prePos){
tcpCon.Received().remove(1); Util::sleep(50);
}else{
while (liveReadBuffer.size() >= 188){
while (liveReadBuffer[0] != 0x47 && liveReadBuffer.size() >= 188){
liveReadBuffer.shift(1);
} }
if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ if (liveReadBuffer.size() >= 188 && liveReadBuffer[0] == 0x47){
newData.truncate(0);
tcpCon.Received().remove(newData, 188);
if (rawMode){ if (rawMode){
keepAlive(); keepAlive();
rawBuffer.append(newData, newData.size()); if (liveReadBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){ if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack(); rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta"); meta.setType(rawIdx, "meta");
@ -558,23 +561,27 @@ namespace Mist{
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
} }
uint64_t packetTime = Util::bootMS(); uint64_t packetTime = Util::bootMS();
thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); uint64_t packetLen = (liveReadBuffer.size() / 188) * 188;
thisPacket.genericFill(packetTime, 0, 1, liveReadBuffer, packetLen, 0, 0);
bufferLivePacket(thisPacket); bufferLivePacket(thisPacket);
lastRawPacket = packetTime; lastRawPacket = packetTime;
rawBuffer.truncate(0); liveReadBuffer.shift(packetLen);
} }
}else { }else {
tsBuf.FromPointer(newData); size_t shiftAmount = 0;
liveStream.add(tsBuf); for (size_t offset = 0; liveReadBuffer.size() >= offset + 188; offset += 188){
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} tsBuf.FromPointer(liveReadBuffer + offset);
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
shiftAmount += 188;
}
liveReadBuffer.shift(shiftAmount);
} }
} }
} }
noDataSince = Util::bootSecs(); noDataSince = Util::bootSecs();
}else{
Util::sleep(100);
} }
if (!tcpCon){ if (!reader){
config->is_active = false; config->is_active = false;
Util::logExitReason("end of streamed input"); Util::logExitReason("end of streamed input");
return; return;
@ -582,7 +589,7 @@ namespace Mist{
}else{ }else{
bool received = false; bool received = false;
while (udpCon.Receive()){ while (udpCon.Receive()){
downCounter += udpCon.data.size(); readPos += udpCon.data.size();
received = true; received = true;
if (!gettingData){ if (!gettingData){
gettingData = true; gettingData = true;
@ -590,8 +597,8 @@ namespace Mist{
} }
if (rawMode){ if (rawMode){
keepAlive(); keepAlive();
rawBuffer.append(udpCon.data, udpCon.data.size()); liveReadBuffer.append(udpCon.data, udpCon.data.size());
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ if (liveReadBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){ if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack(); rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta"); meta.setType(rawIdx, "meta");
@ -600,10 +607,10 @@ namespace Mist{
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
} }
uint64_t packetTime = Util::bootMS(); uint64_t packetTime = Util::bootMS();
thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); thisPacket.genericFill(packetTime, 0, 1, liveReadBuffer, liveReadBuffer.size(), 0, 0);
bufferLivePacket(thisPacket); bufferLivePacket(thisPacket);
lastRawPacket = packetTime; lastRawPacket = packetTime;
rawBuffer.truncate(0); liveReadBuffer.truncate(0);
} }
}else{ }else{
assembler.assemble(liveStream, udpCon.data, udpCon.data.size()); assembler.assemble(liveStream, udpCon.data, udpCon.data.size());
@ -621,8 +628,10 @@ namespace Mist{
} }
// Check for and spawn threads here. // Check for and spawn threads here.
if (Util::bootSecs() - threadCheckTimer > 1){ if (Util::bootSecs() - threadCheckTimer > 1){
// Connect to stats for INPUT detection if (!statComm && gettingData){
statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), ""); // Connect to stats for INPUT detection
statComm.reload(streamName, getConnectedBinHost(), JSON::Value(getpid()).asString(), "INPUT:" + capa["name"].asStringRef(), "");
}
if (statComm){ if (statComm){
if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){ if (statComm.getStatus() & COMM_STATUS_REQDISCONNECT){
config->is_active = false; config->is_active = false;
@ -634,7 +643,7 @@ namespace Mist{
statComm.setStream(streamName); statComm.setStream(streamName);
statComm.setConnector("INPUT:" + capa["name"].asStringRef()); statComm.setConnector("INPUT:" + capa["name"].asStringRef());
statComm.setUp(0); statComm.setUp(0);
statComm.setDown(downCounter + tcpCon.dataDown()); statComm.setDown(readPos);
statComm.setTime(now - startTime); statComm.setTime(now - startTime);
statComm.setLastSecond(0); statComm.setLastSecond(0);
} }
@ -701,18 +710,5 @@ namespace Mist{
}while (threadCount); }while (threadCount);
} }
bool inputTS::needsLock(){
// we already know no lock will be needed
if (!standAlone){return false;}
// otherwise, check input param
const std::string &inpt = config->getString("input");
if (inpt.size() && inpt != "-" && inpt.substr(0, 9) != "stream://" && inpt.substr(0, 8) != "tsudp://" &&
inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 6) != "srt://" &&
inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://" &&
inpt.substr(0, 8) != "https://" && inpt.substr(0, 11) != "https-ts://"){
return Input::needsLock();
}
return false;
}
}// namespace Mist }// namespace Mist

View file

@ -3,49 +3,56 @@
#include <mist/nal.h> #include <mist/nal.h>
#include <mist/ts_packet.h> #include <mist/ts_packet.h>
#include <mist/ts_stream.h> #include <mist/ts_stream.h>
#include <mist/urireader.h>
#include <set> #include <set>
#include <string> #include <string>
namespace Mist{ namespace Mist{
/// This class contains all functions needed to implement TS Input /// This class contains all functions needed to implement TS Input
class inputTS : public Input{ class inputTS : public Input, public Util::DataCallback{
public: public:
inputTS(Util::Config *cfg); inputTS(Util::Config *cfg);
~inputTS(); ~inputTS();
virtual bool needsLock();
// This function can simply check standAlone because we ensure it's set in checkArguments,
// which is always called before the first call to needsLock
virtual bool needsLock(){return standAlone && Input::needsLock();}
virtual std::string getConnectedBinHost(){ virtual std::string getConnectedBinHost(){
if (tcpCon){return tcpCon.getBinHost();} if (udpMode){return udpCon.getBinDestination();}
/// \TODO Handle UDP return reader.getBinHost();
return Input::getConnectedBinHost();
} }
virtual bool publishesTracks(){return false;}
virtual void dataCallback(const char *ptr, size_t size);
virtual size_t getDataCallbackPos() const;
protected: protected:
// Private Functions // Private Functions
bool checkArguments(); bool checkArguments();
bool preRun(); bool preRun();
bool readHeader(); bool readHeader();
virtual bool needHeader(); virtual bool needHeader();
virtual void postHeader();
virtual void getNext(size_t idx = INVALID_TRACK_ID); virtual void getNext(size_t idx = INVALID_TRACK_ID);
void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID); void seek(uint64_t seekTime, size_t idx = INVALID_TRACK_ID);
void readPMT(); void readPMT();
bool openStreamSource(); bool openStreamSource();
void parseStreamHeader();
void streamMainLoop(); void streamMainLoop();
void finish(); void finish();
FILE *inFile; ///< The input file with ts data
TS::Assembler assembler; TS::Assembler assembler;
Util::ResizeablePointer liveReadBuffer;
TS::Stream tsStream; ///< Used for parsing the incoming ts stream TS::Stream tsStream; ///< Used for parsing the incoming ts stream
Socket::UDPConnection udpCon; Socket::UDPConnection udpCon;
Socket::Connection tcpCon; HTTP::URIReader reader;
TS::Packet tsBuf; TS::Packet tsBuf;
pid_t inputProcess; pid_t inputProcess;
size_t tmpIdx;
bool isFinished; bool isFinished;
bool udpMode;
bool rawMode; bool rawMode;
Util::ResizeablePointer rawBuffer;
size_t rawIdx; size_t rawIdx;
uint64_t lastRawPacket; uint64_t lastRawPacket;
uint64_t readPos;
bool unitStartSeen;
}; };
}// namespace Mist }// namespace Mist