From 3734c90544a68d03a1a69438f00f215c517fec01 Mon Sep 17 00:00:00 2001 From: Phencys Date: Sun, 20 Sep 2020 20:31:17 +0200 Subject: [PATCH] Added support for raw passthrough of MPEG2-TS data --- src/input/input_ts.cpp | 73 ++++++++++++++++++++++++++++++++--- src/input/input_ts.h | 5 +++ src/input/input_tsrist.cpp | 54 +++++++++++++++++++++++++- src/input/input_tsrist.h | 6 +++ src/input/input_tssrt.cpp | 35 +++++++++++++++++ src/input/input_tssrt.h | 4 ++ src/io.cpp | 7 ++++ src/io.h | 1 + src/output/output_httpts.cpp | 1 + src/output/output_ts.cpp | 1 + src/output/output_ts_base.cpp | 5 +++ src/output/output_tsrist.cpp | 1 + src/output/output_tssrt.cpp | 1 + 13 files changed, 187 insertions(+), 7 deletions(-) diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index 98300411..cfe3f7cd 100644 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -134,6 +134,13 @@ void parseThread(void *mistIn){ } } } + + //On shutdown, make sure to clean up stream buffer + if (idx != INVALID_TRACK_ID){ + tthread::lock_guard guard(threadClaimMutex); + input->liveFinalize(idx); + } + std::string reason = "unknown reason"; if (!(Util::bootSecs() - threadTimer[tid] < THREAD_TIMEOUT)){reason = "thread timeout";} if (!cfgPointer->is_active){reason = "input shutting down";} @@ -155,6 +162,9 @@ namespace Mist{ /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTS::inputTS(Util::Config *cfg) : Input(cfg){ + rawMode = false; + rawIdx = INVALID_TRACK_ID; + lastRawPacket = 0; capa["name"] = "TS"; capa["desc"] = "This input allows you to stream MPEG2-TS data from static files (/*.ts), streamed files " @@ -188,6 +198,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); capa["codecs"][0u][1u].append("opus"); + capa["codecs"][1u][0u].append("rawts"); inFile = NULL; inputProcess = 0; isFinished = false; @@ -232,6 +243,16 @@ namespace Mist{ "Alternative stream to load for playback when there is no active broadcast"; capa["optional"]["fallback_stream"]["type"] = "str"; capa["optional"]["fallback_stream"]["default"] = ""; + + capa["optional"]["raw"]["name"] = "Raw input mode"; + capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode"; + capa["optional"]["raw"]["option"] = "--raw"; + + JSON::Value option; + option["long"] = "raw"; + option["short"] = "R"; + option["help"] = "Enable raw MPEG-TS passthrough mode"; + config->addOption("raw", option); } inputTS::~inputTS(){ @@ -257,6 +278,10 @@ namespace Mist{ /// Live Setup of TS Input bool inputTS::preRun(){ INFO_MSG("Prerun: %s", config->getString("input").c_str()); + + rawMode = config->getBool("raw"); + if (rawMode){INFO_MSG("Entering raw mode");} + // streamed standard input if (config->getString("input") == "-"){ standAlone = false; @@ -520,9 +545,28 @@ namespace Mist{ } if (tcpCon.Received().available(188) && tcpCon.Received().get()[0] == 0x47){ std::string newData = tcpCon.Received().remove(188); - tsBuf.FromPointer(newData.data()); - liveStream.add(tsBuf); - if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} + if (rawMode){ + keepAlive(); + rawBuffer.append(newData); + if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + if (rawIdx == INVALID_TRACK_ID){ + rawIdx = meta.addTrack(); + meta.setType(rawIdx, "meta"); + meta.setCodec(rawIdx, "rawts"); + meta.setID(rawIdx, 1); + userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); + } + uint64_t packetTime = Util::bootMS(); + thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + bufferLivePacket(thisPacket); + lastRawPacket = packetTime; + rawBuffer.truncate(0); + } + }else { + tsBuf.FromPointer(newData.data()); + liveStream.add(tsBuf); + if (!liveStream.isDataTrack(tsBuf.getPID())){liveStream.parse(tsBuf.getPID());} + } } } noDataSince = Util::bootSecs(); @@ -543,7 +587,26 @@ namespace Mist{ gettingData = true; INFO_MSG("Now receiving UDP data..."); } - assembler.assemble(liveStream, udpCon.data, udpCon.data.size()); + if (rawMode){ + keepAlive(); + rawBuffer.append(udpCon.data, udpCon.data.size()); + if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + if (rawIdx == INVALID_TRACK_ID){ + rawIdx = meta.addTrack(); + meta.setType(rawIdx, "meta"); + meta.setCodec(rawIdx, "rawts"); + meta.setID(rawIdx, 1); + userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); + } + uint64_t packetTime = Util::bootMS(); + thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + bufferLivePacket(thisPacket); + lastRawPacket = packetTime; + rawBuffer.truncate(0); + } + }else{ + assembler.assemble(liveStream, udpCon.data, udpCon.data.size()); + } } if (!received){ Util::sleep(100); @@ -578,7 +641,7 @@ namespace Mist{ } std::set activeTracks = liveStream.getActiveTracks(); - { + if (!rawMode){ tthread::lock_guard guard(threadClaimMutex); if (hasStarted && !threadTimer.size()){ if (!isAlwaysOn()){ diff --git a/src/input/input_ts.h b/src/input/input_ts.h index e810ddb4..3116725d 100644 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -41,6 +41,11 @@ namespace Mist{ pid_t inputProcess; size_t tmpIdx; bool isFinished; + + bool rawMode; + Util::ResizeablePointer rawBuffer; + size_t rawIdx; + uint64_t lastRawPacket; }; }// namespace Mist diff --git a/src/input/input_tsrist.cpp b/src/input/input_tsrist.cpp index 18b05c9d..5b95aa9f 100644 --- a/src/input/input_tsrist.cpp +++ b/src/input/input_tsrist.cpp @@ -66,6 +66,10 @@ namespace Mist{ /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTSRIST::inputTSRIST(Util::Config *cfg) : Input(cfg){ + rawMode = false; + rawIdx = INVALID_TRACK_ID; + lastRawPacket = 0; + hasRaw = false; connPtr = this; cnfPtr = config; @@ -96,6 +100,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); capa["codecs"][0u][1u].append("opus"); + capa["codecs"][1u][0u].append("rawts"); JSON::Value option; option["arg"] = "integer"; @@ -132,6 +137,15 @@ namespace Mist{ capa["optional"]["profile"]["type"] = "select"; capa["optional"]["profile"]["option"] = "--profile"; + capa["optional"]["raw"]["name"] = "Raw input mode"; + capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode"; + capa["optional"]["raw"]["option"] = "--raw"; + + option["long"] = "raw"; + option["short"] = "R"; + option["help"] = "Enable raw MPEG-TS passthrough mode"; + config->addOption("raw", option); + lastTimeStamp = 0; timeStampOffset = 0; receiver_ctx = 0; @@ -146,6 +160,9 @@ namespace Mist{ /// Live Setup of SRT Input. Runs only if we are the "main" thread bool inputTSRIST::preRun(){ + rawMode = config->getBool("raw"); + if (rawMode){INFO_MSG("Entering raw mode");} + std::string source = config->getString("input"); standAlone = false; HTTP::URL u(source); @@ -161,6 +178,20 @@ namespace Mist{ // Retrieve the next packet to be played from the srt connection. void inputTSRIST::getNext(size_t idx){ thisPacket.null(); + if (rawMode){ + //Set to false so the other thread knows its safe to fill + hasRaw = false; + while (!hasRaw && config->is_active){ + Util::sleep(50); + if (!bufferActive()){ + Util::logExitReason("Buffer shut down"); + return; + } + } + //if hasRaw, thisPacket has been filled by the other thread + return; + } + while (!thisPacket && config->is_active){ if (tsStream.hasPacket()){ tsStream.getEarliestPacket(thisPacket); @@ -228,8 +259,27 @@ namespace Mist{ } void inputTSRIST::addData(const char * ptr, size_t len){ - for (size_t o = 0; o <= len-188; o += 188){ - tsStream.parse((char*)ptr+o, 0); + for (size_t o = 0; o+188 <= len; o += 188){ + if (rawMode){ + rawBuffer.append(ptr+o, 188); + if (!hasRaw && rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + if (rawIdx == INVALID_TRACK_ID){ + rawIdx = meta.addTrack(); + meta.setType(rawIdx, "meta"); + meta.setCodec(rawIdx, "rawts"); + meta.setID(rawIdx, 1); + userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); + } + thisTime = Util::bootMS(); + thisIdx = rawIdx; + thisPacket.genericFill(thisTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + lastRawPacket = thisTime; + rawBuffer.truncate(0); + hasRaw = true; + } + }else{ + tsStream.parse((char*)ptr+o, 0); + } } } diff --git a/src/input/input_tsrist.h b/src/input/input_tsrist.h index 3bc29e1c..731f9b04 100644 --- a/src/input/input_tsrist.h +++ b/src/input/input_tsrist.h @@ -33,6 +33,12 @@ namespace Mist{ virtual void connStats(Comms::Statistics &statComm); struct rist_ctx *receiver_ctx; + + bool rawMode; + Util::ResizeablePointer rawBuffer; + size_t rawIdx; + uint64_t lastRawPacket; + bool hasRaw; }; }// namespace Mist diff --git a/src/input/input_tssrt.cpp b/src/input/input_tssrt.cpp index 6cd614e7..da69be29 100644 --- a/src/input/input_tssrt.cpp +++ b/src/input/input_tssrt.cpp @@ -25,6 +25,7 @@ Util::Config *cfgPointer = NULL; std::string baseStreamName; Socket::SRTServer sSock; +bool rawMode = false; void (*oldSignal)(int, siginfo_t *,void *) = 0; @@ -49,6 +50,8 @@ namespace Mist{ /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTSSRT::inputTSSRT(Util::Config *cfg, SRTSOCKET s) : Input(cfg){ + rawIdx = INVALID_TRACK_ID; + lastRawPacket = 0; capa["name"] = "TSSRT"; capa["desc"] = "This input allows for processing MPEG2-TS-based SRT streams. Use mode=listener " "for push input."; @@ -66,6 +69,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); capa["codecs"][0u][1u].append("opus"); + capa["codecs"][1u][0u].append("rawts"); JSON::Value option; option["arg"] = "integer"; @@ -103,7 +107,16 @@ namespace Mist{ capa["optional"]["acceptable"]["select"][2u][0u] = 2; capa["optional"]["acceptable"]["select"][2u][1u] = "Disallow non-matching streamid"; + capa["optional"]["raw"]["name"] = "Raw input mode"; + capa["optional"]["raw"]["help"] = "Enable raw MPEG-TS passthrough mode"; + capa["optional"]["raw"]["option"] = "--raw"; + option.null(); + option["long"] = "raw"; + option["short"] = "R"; + option["help"] = "Enable raw MPEG-TS passthrough mode"; + config->addOption("raw", option); + // Setup if we are called form with a thread for push-based input. if (s != -1){ srtConn = Socket::SRTConnection(s); @@ -131,6 +144,8 @@ namespace Mist{ /// Live Setup of SRT Input. Runs only if we are the "main" thread bool inputTSSRT::preRun(){ + rawMode = config->getBool("raw"); + if (rawMode){INFO_MSG("Entering raw mode");} if (srtConn.getSocket() == -1){ std::string source = config->getString("input"); standAlone = false; @@ -183,6 +198,26 @@ namespace Mist{ size_t recvSize = srtConn.RecvNow(); if (recvSize){ + if (rawMode){ + keepAlive(); + rawBuffer.append(srtConn.recvbuf, recvSize); + if (rawBuffer.size() >= 1316 && (lastRawPacket == 0 || lastRawPacket != Util::bootMS())){ + if (rawIdx == INVALID_TRACK_ID){ + rawIdx = meta.addTrack(); + meta.setType(rawIdx, "meta"); + meta.setCodec(rawIdx, "rawts"); + meta.setID(rawIdx, 1); + userSelect[rawIdx].reload(streamName, rawIdx, COMM_STATUS_SOURCE); + } + uint64_t packetTime = Util::bootMS(); + thisPacket.genericFill(packetTime, 0, 1, rawBuffer, rawBuffer.size(), 0, 0); + bufferLivePacket(thisPacket); + lastRawPacket = packetTime; + rawBuffer.truncate(0); + return; + } + continue; + } if (assembler.assemble(tsStream, srtConn.recvbuf, recvSize, true)){hasPacket = tsStream.hasPacket();} }else if (srtConn){ // This should not happen as the SRT socket is read blocking and won't return until there is diff --git a/src/input/input_tssrt.h b/src/input/input_tssrt.h index 40fa05c1..4f337b48 100644 --- a/src/input/input_tssrt.h +++ b/src/input/input_tssrt.h @@ -41,6 +41,10 @@ namespace Mist{ Socket::SRTConnection srtConn; bool singularFlag; virtual void connStats(Comms::Statistics &statComm); + + Util::ResizeablePointer rawBuffer; + size_t rawIdx; + uint64_t lastRawPacket; }; }// namespace Mist diff --git a/src/io.cpp b/src/io.cpp index 6a5844c4..fea32193 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -292,6 +292,13 @@ namespace Mist{ tPages.setInt("avail", pageOffset + packDataLen, pageIdx); } + /// Wraps up the buffering of a shared memory data page + /// \param idx The track index of the page to finalize + void InOutBase::liveFinalize(size_t idx){ + if (!livePage.count(idx)){return;} + bufferFinalize(idx, livePage[idx]); + } + /// Wraps up the buffering of a shared memory data page /// \param idx The track index of the page to finalize void InOutBase::bufferFinalize(size_t idx, IPC::sharedPage & page){ diff --git a/src/io.h b/src/io.h index d839952a..e6545929 100644 --- a/src/io.h +++ b/src/io.h @@ -21,6 +21,7 @@ namespace Mist{ bool bufferStart(size_t idx, uint32_t pageNumber, IPC::sharedPage & page, DTSC::Meta & aMeta); void bufferFinalize(size_t idx, IPC::sharedPage & page); + void liveFinalize(size_t idx); bool isCurrentLivePage(size_t idx, uint32_t pageNumber); void bufferRemove(size_t idx, uint32_t pageNumber); void bufferLivePacket(const DTSC::Packet &packet); diff --git a/src/output/output_httpts.cpp b/src/output/output_httpts.cpp index 6aab2b9b..9daa9dc3 100644 --- a/src/output/output_httpts.cpp +++ b/src/output/output_httpts.cpp @@ -135,6 +135,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][1u][0u].append("rawts"); capa["methods"][0u]["handler"] = "http"; capa["methods"][0u]["type"] = "html5/video/mpeg"; capa["methods"][0u]["hrn"] = "TS HTTP progressive"; diff --git a/src/output/output_ts.cpp b/src/output/output_ts.cpp index d2addc24..561bf9aa 100644 --- a/src/output/output_ts.cpp +++ b/src/output/output_ts.cpp @@ -179,6 +179,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][1u][0u].append("rawts"); cfg->addConnectorOptions(8888, capa); config = cfg; capa["push_urls"].append("tsudp://*"); diff --git a/src/output/output_ts_base.cpp b/src/output/output_ts_base.cpp index 17b2a8ce..c4163e7f 100644 --- a/src/output/output_ts_base.cpp +++ b/src/output/output_ts_base.cpp @@ -75,6 +75,11 @@ namespace Mist{ size_t dataLen = 0; thisPacket.getString("data", dataPointer, dataLen); // data + if (codec == "rawts"){ + for (size_t i = 0; i+188 <= dataLen; i+=188){sendTS(dataPointer+i, 188);} + return; + } + packTime *= 90; std::string bs; // prepare bufferstring diff --git a/src/output/output_tsrist.cpp b/src/output/output_tsrist.cpp index ee57d8ee..c4e03f80 100644 --- a/src/output/output_tsrist.cpp +++ b/src/output/output_tsrist.cpp @@ -202,6 +202,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("+AC3"); capa["codecs"][0u][1u].append("+MP2"); capa["codecs"][0u][1u].append("+opus"); + capa["codecs"][1u][0u].append("rawts"); capa["optional"]["profile"]["name"] = "RIST profile"; capa["optional"]["profile"]["help"] = "RIST profile to use"; diff --git a/src/output/output_tssrt.cpp b/src/output/output_tssrt.cpp index fb07b11e..db07dc91 100644 --- a/src/output/output_tssrt.cpp +++ b/src/output/output_tssrt.cpp @@ -200,6 +200,7 @@ namespace Mist{ capa["codecs"][0u][1u].append("AC3"); capa["codecs"][0u][1u].append("MP2"); capa["codecs"][0u][1u].append("opus"); + capa["codecs"][1u][0u].append("rawts"); cfg->addConnectorOptions(8889, capa); config = cfg; capa["push_urls"].append("srt://*");