Added support for raw passthrough of MPEG2-TS data

This commit is contained in:
Phencys 2020-09-20 20:31:17 +02:00 committed by Thulinma
parent 267a74f0f6
commit 3734c90544
13 changed files with 187 additions and 7 deletions

View file

@ -134,6 +134,13 @@ void parseThread(void *mistIn){
}
}
}
//On shutdown, make sure to clean up stream buffer
if (idx != INVALID_TRACK_ID){
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
input->liveFinalize(idx);
}
std::string reason = "unknown reason";
if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";}
if (!cfgPointer->is_active){reason = "input shutting down";}
@ -155,6 +162,9 @@ namespace Mist{
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTS::inputTS(Util::Config *cfg) : Input(cfg){
rawMode = false;
rawIdx = INVALID_TRACK_ID;
lastRawPacket = 0;
capa["name"] = "TS";
capa["desc"] =
"This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files "
@ -188,6 +198,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
capa["codecs"][1u][0u].append("rawts");
inFile = NULL;
inputProcess = 0;
isFinished = false;
@ -232,6 +243,16 @@ namespace Mist{
"Alternative stream to load for playback when there is no active broadcast";
capa["optional"]["fallback_stream"]["type"] = "str";
capa["optional"]["fallback_stream"]["default"] = "";
capa["optional"]["raw"]["name"] = "Raw input mode";
capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode";
capa["optional"]["raw"]["option"] = "--raw";
JSON::Value option;
option["long"] = "raw";
option["short"] = "R";
option["help"] = "Enable raw MPEG-TS passthrough mode";
config->addOption("raw", option);
}
inputTS::~inputTS(){
@ -257,6 +278,10 @@ namespace Mist{
/// Live Setup of TS Input
bool inputTS::preRun(){
INFO_MSG("Prerun: %s", config->getString("input").c_str());
rawMode = config->getBool("raw");
if (rawMode){INFO_MSG("Entering raw mode");}
// streamed standard input
if (config->getString("input") == "-"){
standAlone = false;
@ -520,9 +545,28 @@ namespace Mist{
}
if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){
std::string newData = tcpCon.Received().remove(188);
tsBuf.FromPointer(newData.data());
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
if (rawMode){
keepAlive();
rawBuffer.append(newData);
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta");
meta.setCodec(rawIdx, "rawts");
meta.setID(rawIdx, 1);
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
}
uint64_t packetTime = Util::bootMS();
thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0);
bufferLivePacket(thisPacket);
lastRawPacket = packetTime;
rawBuffer.truncate(0);
}
}else {
tsBuf.FromPointer(newData.data());
liveStream.add(tsBuf);
if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());}
}
}
}
noDataSince = Util::bootSecs();
@ -543,7 +587,26 @@ namespace Mist{
gettingData = true;
INFO_MSG("Now receiving UDP data...");
}
assembler.assemble(liveStream, udpCon.data, udpCon.data.size());
if (rawMode){
keepAlive();
rawBuffer.append(udpCon.data, udpCon.data.size());
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta");
meta.setCodec(rawIdx, "rawts");
meta.setID(rawIdx, 1);
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
}
uint64_t packetTime = Util::bootMS();
thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0);
bufferLivePacket(thisPacket);
lastRawPacket = packetTime;
rawBuffer.truncate(0);
}
}else{
assembler.assemble(liveStream, udpCon.data, udpCon.data.size());
}
}
if (!received){
Util::sleep(100);
@ -578,7 +641,7 @@ namespace Mist{
}
std::set<size_t> activeTracks = liveStream.getActiveTracks();
{
if (!rawMode){
tthread::lock_guard<tthread::mutex> guard(threadClaimMutex);
if (hasStarted && !threadTimer.size()){
if (!isAlwaysOn()){

View file

@ -41,6 +41,11 @@ namespace Mist{
pid_t inputProcess;
size_t tmpIdx;
bool isFinished;
bool rawMode;
Util::ResizeablePointer rawBuffer;
size_t rawIdx;
uint64_t lastRawPacket;
};
}// namespace Mist

View file

@ -66,6 +66,10 @@ namespace Mist{
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTSRIST::inputTSRIST(Util::Config *cfg) : Input(cfg){
rawMode = false;
rawIdx = INVALID_TRACK_ID;
lastRawPacket = 0;
hasRaw = false;
connPtr = this;
cnfPtr = config;
@ -96,6 +100,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
capa["codecs"][1u][0u].append("rawts");
JSON::Value option;
option["arg"] = "integer";
@ -132,6 +137,15 @@ namespace Mist{
capa["optional"]["profile"]["type"] = "select";
capa["optional"]["profile"]["option"] = "--profile";
capa["optional"]["raw"]["name"] = "Raw input mode";
capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode";
capa["optional"]["raw"]["option"] = "--raw";
option["long"] = "raw";
option["short"] = "R";
option["help"] = "Enable raw MPEG-TS passthrough mode";
config->addOption("raw", option);
lastTimeStamp = 0;
timeStampOffset = 0;
receiver_ctx = 0;
@ -146,6 +160,9 @@ namespace Mist{
/// Live Setup of SRT Input. Runs only if we are the "main" thread
bool inputTSRIST::preRun(){
rawMode = config->getBool("raw");
if (rawMode){INFO_MSG("Entering raw mode");}
std::string source = config->getString("input");
standAlone = false;
HTTP::URL u(source);
@ -161,6 +178,20 @@ namespace Mist{
// Retrieve the next packet to be played from the srt connection.
void inputTSRIST::getNext(size_t idx){
thisPacket.null();
if (rawMode){
//Set to false so the other thread knows its safe to fill
hasRaw = false;
while (!hasRaw && config->is_active){
Util::sleep(50);
if (!bufferActive()){
Util::logExitReason("Buffer shut down");
return;
}
}
//if hasRaw, thisPacket has been filled by the other thread
return;
}
while (!thisPacket && config->is_active){
if (tsStream.hasPacket()){
tsStream.getEarliestPacket(thisPacket);
@ -228,8 +259,27 @@ namespace Mist{
}
void inputTSRIST::addData(const char * ptr, size_t len){
for (size_t o = 0; o <= len-188; o += 188){
tsStream.parse((char*)ptr+o, 0);
for (size_t o = 0; o+188 <= len; o += 188){
if (rawMode){
rawBuffer.append(ptr+o, 188);
if (!hasRaw && rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta");
meta.setCodec(rawIdx, "rawts");
meta.setID(rawIdx, 1);
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
}
thisTime = Util::bootMS();
thisIdx = rawIdx;
thisPacket.genericFill(thisTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0);
lastRawPacket = thisTime;
rawBuffer.truncate(0);
hasRaw = true;
}
}else{
tsStream.parse((char*)ptr+o, 0);
}
}
}

View file

@ -33,6 +33,12 @@ namespace Mist{
virtual void connStats(Comms::Statistics &statComm);
struct rist_ctx *receiver_ctx;
bool rawMode;
Util::ResizeablePointer rawBuffer;
size_t rawIdx;
uint64_t lastRawPacket;
bool hasRaw;
};
}// namespace Mist

View file

@ -25,6 +25,7 @@
Util::Config *cfgPointer = NULL;
std::string baseStreamName;
Socket::SRTServer sSock;
bool rawMode = false;
void (*oldSignal)(int, siginfo_t *,void *) = 0;
@ -49,6 +50,8 @@ namespace Mist{
/// Constructor of TS Input
/// \arg cfg Util::Config that contains all current configurations.
inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){
rawIdx = INVALID_TRACK_ID;
lastRawPacket = 0;
capa["name"] = "TSSRT";
capa["desc"] = "This input allows for processing MPEG2-TS-based SRT streams. Use mode=listener "
"for push input.";
@ -66,6 +69,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
capa["codecs"][1u][0u].append("rawts");
JSON::Value option;
option["arg"] = "integer";
@ -103,7 +107,16 @@ namespace Mist{
capa["optional"]["acceptable"]["select"][2u][0u] = 2;
capa["optional"]["acceptable"]["select"][2u][1u] = "Disallow non-matching streamid";
capa["optional"]["raw"]["name"] = "Raw input mode";
capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode";
capa["optional"]["raw"]["option"] = "--raw";
option.null();
option["long"] = "raw";
option["short"] = "R";
option["help"] = "Enable raw MPEG-TS passthrough mode";
config->addOption("raw", option);
// Setup if we are called form with a thread for push-based input.
if (s != -1){
srtConn = Socket::SRTConnection(s);
@ -131,6 +144,8 @@ namespace Mist{
/// Live Setup of SRT Input. Runs only if we are the "main" thread
bool inputTSSRT::preRun(){
rawMode = config->getBool("raw");
if (rawMode){INFO_MSG("Entering raw mode");}
if (srtConn.getSocket() == -1){
std::string source = config->getString("input");
standAlone = false;
@ -183,6 +198,26 @@ namespace Mist{
size_t recvSize = srtConn.RecvNow();
if (recvSize){
if (rawMode){
keepAlive();
rawBuffer.append(srtConn.recvbuf, recvSize);
if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){
if (rawIdx == INVALID_TRACK_ID){
rawIdx = meta.addTrack();
meta.setType(rawIdx, "meta");
meta.setCodec(rawIdx, "rawts");
meta.setID(rawIdx, 1);
userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE);
}
uint64_t packetTime = Util::bootMS();
thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0);
bufferLivePacket(thisPacket);
lastRawPacket = packetTime;
rawBuffer.truncate(0);
return;
}
continue;
}
if (assembler.assemble(tsStream, srtConn.recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();}
}else if (srtConn){
// This should not happen as the SRT socket is read blocking and won't return until there is

View file

@ -41,6 +41,10 @@ namespace Mist{
Socket::SRTConnection srtConn;
bool singularFlag;
virtual void connStats(Comms::Statistics &statComm);
Util::ResizeablePointer rawBuffer;
size_t rawIdx;
uint64_t lastRawPacket;
};
}// namespace Mist

View file

@ -292,6 +292,13 @@ namespace Mist{
tPages.setInt("avail", pageOffset + packDataLen, pageIdx);
}
/// Wraps up the buffering of a shared memory data page
/// \param idx The track index of the page to finalize
void InOutBase::liveFinalize(size_t idx){
if (!livePage.count(idx)){return;}
bufferFinalize(idx, livePage[idx]);
}
/// Wraps up the buffering of a shared memory data page
/// \param idx The track index of the page to finalize
void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){

View file

@ -21,6 +21,7 @@ namespace Mist{
bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta);
void bufferFinalize(size_t idx, IPC::sharedPage & page);
void liveFinalize(size_t idx);
bool isCurrentLivePage(size_t idx, uint32_t pageNumber);
void bufferRemove(size_t idx, uint32_t pageNumber);
void bufferLivePacket(const DTSC::Packet &packet);

View file

@ -135,6 +135,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][1u][0u].append("rawts");
capa["methods"][0u]["handler"] = "http";
capa["methods"][0u]["type"] = "html5/video/mpeg";
capa["methods"][0u]["hrn"] = "TS HTTP progressive";

View file

@ -179,6 +179,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][1u][0u].append("rawts");
cfg->addConnectorOptions(8888, capa);
config = cfg;
capa["push_urls"].append("tsudp://*");

View file

@ -75,6 +75,11 @@ namespace Mist{
size_t dataLen = 0;
thisPacket.getString("data", dataPointer, dataLen); // data
if (codec == "rawts"){
for (size_t i = 0; i+188 <= dataLen; i+=188){sendTS(dataPointer+i, 188);}
return;
}
packTime *= 90;
std::string bs;
// prepare bufferstring

View file

@ -202,6 +202,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("+AC3");
capa["codecs"][0u][1u].append("+MP2");
capa["codecs"][0u][1u].append("+opus");
capa["codecs"][1u][0u].append("rawts");
capa["optional"]["profile"]["name"] = "RIST profile";
capa["optional"]["profile"]["help"] = "RIST profile to use";

View file

@ -200,6 +200,7 @@ namespace Mist{
capa["codecs"][0u][1u].append("AC3");
capa["codecs"][0u][1u].append("MP2");
capa["codecs"][0u][1u].append("opus");
capa["codecs"][1u][0u].append("rawts");
cfg->addConnectorOptions(8889, capa);
config = cfg;
capa["push_urls"].append("srt://*");