diff --git a/CMakeLists.txt b/CMakeLists.txt index a9472bc1..d5343e57 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -295,9 +295,6 @@ macro(makeInput inputName format) #Set compile definitions unset(my_definitions) - if (";${ARGN};" MATCHES ";tslive;") - list(APPEND my_definitions "TSLIVE_INPUT") - endif() list(APPEND my_definitions "INPUTTYPE=\"input_${format}.h\"") set_target_properties(MistIn${inputName} @@ -326,7 +323,6 @@ makeInput(Buffer buffer) makeInput(ISMV ismv)#LTS makeInput(MP4 mp4)#LTS makeInput(TS ts)#LTS -makeInput(TSStream ts tslive)#LTS makeInput(Folder folder folder)#LTS ######################################## diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index f30e1a29..fa32badf 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -68,7 +68,7 @@ namespace Controller { // False: start TS input INFO_MSG("No TS Input running on port %s for stream %s, starting it", udpPort.c_str(), name.c_str()); std::deque command; - command.push_back(Util::getMyPath() + "MistInTSStream"); + command.push_back(Util::getMyPath() + "MistInTS"); command.push_back("-s"); command.push_back(name); command.push_back("-p"); diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp index ea51c939..b0b8d799 100755 --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -19,22 +19,6 @@ #define SEM_TS_CLAIM "/MstTSIN%s" - -/// \todo Implement this trigger equivalent... -/* -if(Triggers::shouldTrigger("STREAM_PUSH", smp)){ - std::string payload = streamName+"\n" + myConn.getHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; - if (!Triggers::doTrigger("STREAM_PUSH", payload, smp)){ - DEBUG_MSG(DLVL_FAIL, "Push from %s to %s rejected - STREAM_PUSH trigger denied the push", myConn.getHost().c_str(), streamName.c_str()); - myConn.close(); - configLock.post(); - configLock.close(); - return; - } -} -*/ - -#ifdef TSLIVE_INPUT std::string globalStreamName; TS::Stream liveStream(true); Util::Config * cfgPointer = NULL; @@ -108,16 +92,15 @@ void parseThread(void * ignored) { myProxy.userClient.finish(); } -#endif - namespace Mist { /// Constructor of TS Input /// \arg cfg Util::Config that contains all current configurations. inputTS::inputTS(Util::Config * cfg) : Input(cfg) { capa["name"] = "TS"; - capa["decs"] = "Enables TS Input"; - capa["source_match"] = "/*.ts"; + capa["decs"] = "MPEG2-TS input from static files, streamed files, or multicast/unicast UDP socket"; + capa["source_match"].append("/*.ts"); + capa["source_match"].append("stream://*.ts"); capa["priority"] = 9ll; capa["codecs"][0u][0u].append("H264"); capa["codecs"][0u][0u].append("HEVC"); @@ -138,68 +121,59 @@ namespace Mist { capa["optional"]["multicastinterface"]["type"] = "str"; capa["optional"]["multicastinterface"]["default"] = ""; cfg->addOption("multicastinterface", - JSON::fromString("{\"arg\":\"string\",\"value\":\"\",\"short\":\"M\",\"long\":\"multicast-interface\",\"help\":\"The interfaces on which to listen for UDP Multicast packets, space separatered.\"}")); + JSON::fromString("{\"arg\":\"string\",\"value\":\"\",\"short\":\"M\",\"long\":\"multicast-interface\",\"help\":\"The interfaces on which to listen for UDP Multicast packets, space separated.\"}")); - pushing = false; inFile = NULL; - -#ifdef TSLIVE_INPUT - standAlone = false; -#endif } inputTS::~inputTS() { if (inFile) { fclose(inFile); } -#ifdef TSLIVE_INPUT - char semName[NAME_BUFFER_SIZE]; - snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); - IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); - lock.wait(); - threadTimer.clear(); - claimableThreads.clear(); - lock.post(); - lock.unlink(); -#endif + if (!standAlone){ + char semName[NAME_BUFFER_SIZE]; + snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); + IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); + lock.wait(); + threadTimer.clear(); + claimableThreads.clear(); + lock.post(); + lock.unlink(); + } } -#ifdef TSLIVE_INPUT ///Live Setup of TS Input bool inputTS::setup() { - INFO_MSG("Setup start"); - if (config->getString("input") == "-") { - inFile = stdin; - } else { - pushing = true; - udpCon.setBlocking(false); - std::string ipPort = config->getString("port"); - size_t colon = ipPort.rfind(':'); - if (colon != std::string::npos) { - udpCon.bind(JSON::Value(ipPort.substr(colon + 1)).asInt(), ipPort.substr(0, colon), config->getString("multicastinterface")); + const std::string & inpt = config->getString("input"); + if (inpt.size() && (inpt != "-" || inpt.substr(0,9) == "stream://")){ + if (inpt.substr(0,9) == "stream://"){ + inFile = fopen(inpt.c_str()+9, "r"); + standAlone = false; + }else{ + inFile = fopen(inpt.c_str(), "r"); + } + if (!inFile) { + return false; + } + }else{ + standAlone = false; + if (inpt == "-") { + inFile = stdin; } else { - udpCon.bind(JSON::Value(ipPort).asInt(), "", config->getString("multicastinterface")); + udpCon.setBlocking(false); + std::string ipPort = config->getString("port"); + size_t colon = ipPort.rfind(':'); + if (colon != std::string::npos) { + udpCon.bind(JSON::Value(ipPort.substr(colon + 1)).asInt(), ipPort.substr(0, colon), config->getString("multicastinterface")); + } else { + udpCon.bind(JSON::Value(ipPort).asInt(), "", config->getString("multicastinterface")); + } } } - INFO_MSG("Setup complete"); return true; } -#else - - ///Setup of TS Input - bool inputTS::setup() { - if (config->getString("input") != "-") { - inFile = fopen(config->getString("input").c_str(), "r"); - } - if (!inFile) { - return false; - } - return true; - } - -#endif ///Track selector of TS Input ///\arg trackSpec specifies which tracks are to be selected @@ -219,13 +193,6 @@ namespace Mist { } -#ifdef TSLIVE_INPUT - //This implementation in used in the live version of TS input, where no header is available in advance. - //Reading the header returns true in this case, to continue parsing the actual stream. - bool inputTS::readHeader() { - return true; - } -#else ///Reads headers from a TS stream, and saves them into metadata ///It works by going through the entire TS stream, and every time ///It encounters a new PES start, it writes the currently found PES data @@ -233,6 +200,7 @@ namespace Mist { ///it writes the remaining metadata. ///\todo Find errors, perhaps parts can be made more modular bool inputTS::readHeader() { + if (!standAlone){return true;} if (!inFile){return false;} //See whether a separate header file exists. if (readExistingHeader()){return true;} @@ -259,7 +227,6 @@ namespace Mist { myMeta.toFile(config->getString("input") + ".dtsh"); return true; } -#endif ///Gets the next packet that is to be sent ///At the moment, the logic of sending the last packet that was finished has been implemented, @@ -269,37 +236,18 @@ namespace Mist { INSANE_MSG("Getting next"); thisPacket.null(); bool hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); - while (!hasPacket && (pushing || !feof(inFile)) && config->is_active) { - if (!pushing) { - unsigned int bPos = ftell(inFile); - tsBuf.FromFile(inFile); - if (selectedTracks.count(tsBuf.getPID())) { - tsStream.parse(tsBuf, bPos); - } - } else { - while (udpCon.Receive()) { - udpDataBuffer.append(udpCon.data, udpCon.data_len); - while (udpDataBuffer.size() > 188 && (udpDataBuffer[0] != 0x47 || udpDataBuffer[188] != 0x47)) { - size_t syncPos = udpDataBuffer.find("\107", 1); - udpDataBuffer.erase(0, syncPos); - } - while (udpDataBuffer.size() >= 188) { - tsBuf.FromPointer(udpDataBuffer.data()); - tsStream.parse(tsBuf, 0); - udpDataBuffer.erase(0, 188); - } - } - Util::sleep(500); + while (!hasPacket && !feof(inFile) && config->is_active) { + unsigned int bPos = ftell(inFile); + tsBuf.FromFile(inFile); + if (selectedTracks.count(tsBuf.getPID())) { + tsStream.parse(tsBuf, bPos); } hasPacket = (selectedTracks.size() == 1 ? tsStream.hasPacket(*selectedTracks.begin()) : tsStream.hasPacketOnEachTrack()); } if (!hasPacket) { - if (inFile && !feof(inFile)) { + if (!feof(inFile)) { getNext(); } - if (pushing) { - sleep(500); - } return; } if (selectedTracks.size() == 1) { @@ -361,17 +309,27 @@ namespace Mist { fseek(inFile, seekPos, SEEK_SET);//seek to the correct position } -#ifdef TSLIVE_INPUT void inputTS::stream() { + if (!Util::startInput(streamName, "push://")) {//manually override stream url to start the buffer + FAIL_MSG("Could not start buffer for %s", streamName.c_str()); + return; + } + IPC::sharedClient statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); + uint64_t downCounter = 0; + uint64_t startTime = Util::epoch(); cfgPointer = config; globalStreamName = streamName; unsigned long long threadCheckTimer = Util::bootSecs(); while (config->is_active) { - if (!pushing) { - unsigned int bPos = ftell(inFile); + if (inFile) { + if (feof(inFile)){ + config->is_active = false; + INFO_MSG("Reached end of file on streamed input"); + } int ctr = 0; - while (ctr < 20 && tsBuf.FromFile(inFile)){ + while (ctr < 20 && tsBuf.FromFile(inFile) && !feof(inFile)){ liveStream.add(tsBuf); + downCounter += 188; ctr++; } } else { @@ -384,6 +342,7 @@ namespace Mist { if (udpCon.data[0] == 0x47){//check for sync byte if (offset + 188 <= udpCon.data_len){ liveStream.add(udpCon.data + offset); + downCounter += 188; }else{ leftData.append(udpCon.data + offset, udpCon.data_len - offset); } @@ -393,6 +352,7 @@ namespace Mist { leftData.append(udpCon.data + offset, 1); if (leftData.size() >= 188){ liveStream.add((char*)leftData.data()); + downCounter += 188; leftData.erase(0, 188); } } @@ -403,6 +363,28 @@ namespace Mist { } //Check for and spawn threads here. if (Util::bootSecs() - threadCheckTimer > 2) { + //Connect to stats for INPUT detection + uint64_t now = Util::epoch(); + if (!statsPage.getData()){ + statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true); + } + if (statsPage.getData()){ + if (!statsPage.isAlive()){ + config->is_active = false; + return; + } + IPC::statExchange tmpEx(statsPage.getData()); + tmpEx.now(now); + tmpEx.crc(getpid()); + tmpEx.streamName(streamName); + tmpEx.connector("INPUT"); + tmpEx.up(0); + tmpEx.down(downCounter); + tmpEx.time(now - startTime); + tmpEx.lastSecond(0); + statsPage.keepAlive(); + } + std::set activeTracks = liveStream.getActiveTracks(); char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); @@ -426,7 +408,7 @@ namespace Mist { lock.post(); threadCheckTimer = Util::bootSecs(); } - if (pushing){ + if (!inFile){ Util::sleep(100); } } @@ -435,6 +417,10 @@ namespace Mist { } void inputTS::finish() { + if (standAlone){ + Input::finish(); + return; + } char semName[NAME_BUFFER_SIZE]; snprintf(semName, NAME_BUFFER_SIZE, SEM_TS_CLAIM, globalStreamName.c_str()); IPC::semaphore lock(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1); @@ -452,9 +438,16 @@ namespace Mist { } bool inputTS::needsLock() { - return false; + //we already know no lock will be needed + if (!standAlone){return false;} + //otherwise, check input param + const std::string & inpt = config->getString("input"); + if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://"){ + return true; + }else{ + return false; + } } -#endif } diff --git a/src/input/input_ts.h b/src/input/input_ts.h index 09976ca9..f0a4829a 100755 --- a/src/input/input_ts.h +++ b/src/input/input_ts.h @@ -13,9 +13,7 @@ namespace Mist { public: inputTS(Util::Config * cfg); ~inputTS(); -#ifdef TSLIVE_INPUT bool needsLock(); -#endif protected: //Private Functions bool setup(); @@ -24,25 +22,12 @@ namespace Mist { void seek(int seekTime); void trackSelect(std::string trackSpec); void readPMT(); - -#ifdef TSLIVE_INPUT - //Live tsinput does not have a header, so parseheader should do nothing - void parseHeader() { } - //In case of live TS Input, we override the default serve function void stream(); void finish(); -#endif - - - - FILE * inFile;///