diff --git a/CMakeLists.txt b/CMakeLists.txt index 56b13ab7..d4f58ec7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -394,6 +394,7 @@ makeInput(ISMV ismv)#LTS makeInput(MP4 mp4) makeInput(TS ts)#LTS makeInput(Folder folder)#LTS +makeInput(Playlist playlist)#LTS makeInput(Balancer balancer)#LTS makeInput(RTSP rtsp)#LTS diff --git a/lib/defines.h b/lib/defines.h index 756d4aa1..3668b1fc 100644 --- a/lib/defines.h +++ b/lib/defines.h @@ -177,3 +177,7 @@ static inline void show_stackframe(){} #define INVALID_TRACK_ID 0 +//The amount of milliseconds a simulated live stream is allowed to be "behind". +//Setting this value to lower than 2 seconds **WILL** cause stuttering in playback due to buffer negotiation. +#define SIMULATED_LIVE_BUFFER 7000 + diff --git a/src/controller/controller_api.cpp b/src/controller/controller_api.cpp index 05374ba8..e485099a 100644 --- a/src/controller/controller_api.cpp +++ b/src/controller/controller_api.cpp @@ -1,5 +1,6 @@ #include //for browse API call #include //for browse API call +#include #include #include #include @@ -31,6 +32,59 @@ std::string getChallenge(Socket::Connection & conn){ return Secure::md5(Date.str().c_str() + conn.getHost()); } +/// Executes a single Playlist-based API command. Recurses if necessary. +static void executePlsCommand(JSON::Value & cmd, std::deque & lines){ + if (!cmd.isArray() || !cmd.size()){ + FAIL_MSG("Not a valid playlist API command: %s", cmd.toString().c_str()); + return; + } + if (cmd[0u].isArray()){ + jsonForEach(cmd, it){ + executePlsCommand(*it, lines); + } + return; + } + if (!cmd[0u].isString()){ + FAIL_MSG("Not a valid playlist API command: %s", cmd.toString().c_str()); + return; + } + if (cmd[0u].asStringRef() == "append" && cmd.size() == 2 && cmd[1u].isString()){ + lines.push_back(cmd[1u].asStringRef()); + return; + } + if (cmd[0u].asStringRef() == "clear" && cmd.size() == 1){ + lines.clear(); + return; + } + if (cmd[0u].asStringRef() == "remove" && cmd.size() == 2 && cmd[1u].isString()){ + const std::string & toRemove = cmd[1u].asStringRef(); + for (std::deque::iterator it = lines.begin(); it != lines.end(); ++it){ + if ((*it) == toRemove){ + (*it) = ""; + } + } + return; + } + if (cmd[0u].asStringRef() == "line" && cmd.size() == 3 && cmd[1u].isInt() && cmd[2u].isString()){ + if (cmd[1u].asInt() >= lines.size()){ + FAIL_MSG("Line number %d does not exist in playlist - cannot modify line", (int)cmd[1u].asInt()); + return; + } + lines[cmd[1u].asInt()] = cmd[2u].asStringRef(); + return; + } + if (cmd[0u].asStringRef() == "replace" && cmd.size() == 3 && cmd[1u].isString() && cmd[2u].isString()){ + const std::string & toReplace = cmd[1u].asStringRef(); + for (std::deque::iterator it = lines.begin(); it != lines.end(); ++it){ + if ((*it) == toReplace){ + (*it) = cmd[2u].asStringRef(); + } + } + return; + } + FAIL_MSG("Not a valid playlist API command: %s", cmd.toString().c_str()); +} + ///\brief Checks an authorization request for a given user. ///\param Request The request to be parsed. ///\param Response The location to store the generated response. @@ -704,6 +758,78 @@ void Controller::handleAPICommands(JSON::Value & Request, JSON::Value & Response free(rpath); } + // Examples of valid playlist requests: + //"playlist":{"streamname": ["append", "path/to/file.ts"]} + //"playlist":{"streamname": ["remove", "path/to/file.ts"]} + //"playlist":{"streamname": ["line", 2, "path/to/file.ts"]} + //"playlist":{"streamname": true} + //"playlist":{"streamname": [["append", "path/to/file.ts"], ["remove", "path/to/file.ts"]]} + if (Request.isMember("playlist")){ + if (!Request["playlist"].isObject()){ + ERROR_MSG("Playlist API call requires object payload, no object given"); + }else{ + jsonForEach(Request["playlist"], it){ + if (!Controller::Storage["streams"].isMember(it.key()) || !Controller::Storage["streams"][it.key()].isMember("source")){ + FAIL_MSG("Playlist API call (partially) not executed: stream '%s' not configured", it.key().c_str()); + }else{ + std::string src = Controller::Storage["streams"][it.key()]["source"].asString(); + if (src.substr(src.size() - 4) != ".pls"){ + FAIL_MSG("Playlist API call (partially) not executed: stream '%s' is not playlist-based", it.key().c_str()); + }else{ + bool readFirst = true; + struct stat fileinfo; + if (stat(src.c_str(), &fileinfo) != 0){ + if (errno == EACCES){ + FAIL_MSG("Playlist API call (partially) not executed: stream '%s' playlist '%s' cannot be accessed (no file permissions)", it.key().c_str(), src.c_str()); + break; + } + if (errno == ENOENT){ + WARN_MSG("Creating playlist file: %s", src.c_str()); + readFirst = false; + } + } + std::deque lines; + if (readFirst){ + std::ifstream plsRead(src.c_str()); + if (!plsRead.good()){ + FAIL_MSG("Playlist (%s) for stream '%s' could not be opened for reading; aborting command(s)", src.c_str(), it.key().c_str()); + break; + } + std::string line; + do { + std::getline(plsRead, line); + if (line.size() || plsRead.good()){lines.push_back(line);} + } while(plsRead.good()); + } + unsigned int plsNo = 0; + for (std::deque::iterator plsIt = lines.begin(); plsIt != lines.end(); ++plsIt){ + MEDIUM_MSG("Before playlist command item %u: %s", plsNo, plsIt->c_str()); + ++plsNo; + } + if (!it->isBool()){ + executePlsCommand(*it, lines); + } + JSON::Value & outPls = Response["playlist"][it.key()]; + std::ofstream plsOutFile(src.c_str(), std::ios_base::trunc); + if (!plsOutFile.good()){ + FAIL_MSG("Could not open playlist for writing: %s", src.c_str()); + break; + } + plsNo = 0; + for (std::deque::iterator plsIt = lines.begin(); plsIt != lines.end(); ++plsIt){ + MEDIUM_MSG("After playlist command item %u: %s", plsNo, plsIt->c_str()); + ++plsNo; + outPls.append(*plsIt); + if (plsNo < lines.size() || (*plsIt).size()){ + plsOutFile << (*plsIt) << "\n"; + } + } + } + } + } + } + } + if (Request.isMember("save")){ Controller::Log("CONF", "Writing config to file on request through API"); Controller::writeConfigToDisk(); diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 909fbbb5..0bff01a1 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -115,7 +115,12 @@ namespace Controller { //new style always on if (data.isMember("always_on")){ INFO_MSG("Starting always-on input %s: %s", name.c_str(), URL.c_str()); - Util::startInput(name, URL); + std::map empty_overrides; + pid_t program = 0; + Util::startInput(name, URL, true, false, empty_overrides, &program); + if (program){ + inputProcesses[name] = program; + } } //non-VoD stream if (URL.substr(0, 1) != "/"){return;} @@ -182,6 +187,7 @@ namespace Controller { out[jit.key()] = (*jit); out[jit.key()].removeNullMembers(); out[jit.key()]["name"] = jit.key(); + checkParameters(out[jit.key()]); Log("STRM", std::string("Updated stream ") + jit.key()); } }else{ @@ -206,6 +212,7 @@ namespace Controller { out[jit.key()] = (*jit); out[jit.key()].removeNullMembers(); out[jit.key()]["name"] = jit.key(); + checkParameters(out[jit.key()]); Log("STRM", std::string("New stream ") + jit.key()); } Controller::writeStream(jit.key(), out[jit.key()]); @@ -374,5 +381,35 @@ namespace Controller { return ret; } + bool isMatch(const std::string & source, const std::string & match){ + std::string front = match.substr(0,match.find('*')); + std::string back = match.substr(match.find('*')+1); + return (source.substr(0,front.size()) == front && source.substr(source.size()-back.size()) == back); + } + + void checkParameters(JSON::Value & streamObj){ + JSON::Value & inpt = Controller::capabilities["inputs"]; + std::string match; + jsonForEach(inpt, it){ + if ((*it)["source_match"].isArray()){ + jsonForEach((*it)["source_match"], subIt){ + if (isMatch(streamObj["source"].asStringRef(), (*subIt).asStringRef())){ + match = (*it)["name"].asString(); + } + } + } + if ((*it)["source_match"].isString()){ + if (isMatch(streamObj["source"].asStringRef(), (*it)["source_match"].asStringRef())){ + match = (*it)["name"].asString(); + } + } + } + if (match != ""){ + jsonForEach(inpt[match]["hardcoded"], it){ + streamObj[it.key()] = *it; + } + } + } + } //Controller namespace diff --git a/src/controller/controller_streams.h b/src/controller/controller_streams.h index de961ab0..2ea03f21 100644 --- a/src/controller/controller_streams.h +++ b/src/controller/controller_streams.h @@ -7,6 +7,7 @@ namespace Controller { void CheckStreams(JSON::Value & in, JSON::Value & out); void AddStreams(JSON::Value & in, JSON::Value & out); int deleteStream(const std::string & name, JSON::Value & out, bool sourceFileToo = false); + void checkParameters(JSON::Value & stream); struct liveCheck { long long int lastms; diff --git a/src/input/input.cpp b/src/input/input.cpp index 8747dbed..affced17 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -71,7 +71,28 @@ namespace Mist { capa["optional"]["verimatrix-playready"]["option"] = "--verimatrix-playready"; capa["optional"]["verimatrix-playready"]["type"] = "str"; capa["optional"]["verimatrix-playready"]["default"] = ""; + option.null(); + option["long"] = "realtime"; + option["short"] = "r"; + option["help"] = "Feed the results of this input in realtime to the buffer"; + config->addOption("realtime", option); + capa["optional"]["realtime"]["name"] = "Simulated Live"; + capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream"; + capa["optional"]["realtime"]["option"] = "--realtime"; + + option.null(); + option["long"] = "simulated-starttime"; + option["arg"] = "integer"; + option["short"] = "S"; + option["help"] = "Unix timestamp on which the simulated start of the stream is based."; + option["value"].append(0); + config->addOption("simulated-starttime", option); + capa["optional"]["simulated-starttime"]["name"] = "Simulated start time"; + capa["optional"]["simulated-starttime"]["help"] = "The unix timestamp on which this stream is assumed to have started playback, or 0 for automatic"; + capa["optional"]["simulated-starttime"]["option"] = "--simulated-starttime"; + capa["optional"]["simulated-starttime"]["type"] = "uint"; + capa["optional"]["simulated-starttime"]["default"] = 0; /*LTS-END*/ capa["optional"]["debug"]["name"] = "debug"; @@ -79,6 +100,7 @@ namespace Mist { capa["optional"]["debug"]["option"] = "--debug"; capa["optional"]["debug"]["type"] = "debug"; + packTime = 0; lastActive = Util::epoch(); playing = 0; @@ -574,7 +596,9 @@ namespace Mist { if(isSingular()){ overrides["singular"] = ""; } - + if (config->getBool("realtime")){ + overrides["resume"] = "1"; + } if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer WARN_MSG("Could not start buffer, cancelling"); return; @@ -602,13 +626,71 @@ namespace Mist { return; } - for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - it->second.firstms = 0; - it->second.lastms = 0; - selectedTracks.insert(it->first); + + timeOffset = 0; + + //If resume mode is on, find matching tracks and set timeOffset values to make sure we append to the tracks. + if (config->getBool("realtime")){ + seek(0); + + + char nameBuf[NAME_BUFFER_SIZE]; + snprintf(nameBuf, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str()); + IPC::sharedPage curMeta(nameBuf); + + + static char liveSemName[NAME_BUFFER_SIZE]; + snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str()); + IPC::semaphore * liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live); + if (*liveSem){ + liveSem->wait(); + }else{ + delete liveSem; + liveSem = 0; + } + DTSC::Packet tmpMeta(curMeta.mapped, curMeta.len, true); + if (liveSem){ + liveSem->post(); + delete liveSem; + liveSem = 0; + } + DTSC::Meta tmpM(tmpMeta); + unsigned int minKeepAway = 0; + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); ++it){ + for (std::map::iterator secondIt = tmpM.tracks.begin(); secondIt != tmpM.tracks.end(); ++secondIt){ + if (it->second.codec == secondIt->second.codec && it->second.init == secondIt->second.init){ + timeOffset = std::max(timeOffset, (uint64_t)secondIt->second.lastms); + minKeepAway = std::max(minKeepAway, secondIt->second.minKeepAway); + } + } + } + + if (timeOffset){ + timeOffset += 1000;//Add an artificial second to make sure we append and not overwrite + } } - std::string reason = streamMainLoop(); + for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ + originalFirstms[it->first] = it->second.firstms; + it->second.firstms = timeOffset; + it->second.lastms = 0; + selectedTracks.insert(it->first); + it->second.minKeepAway = SIMULATED_LIVE_BUFFER; + } + nProxy.pagesByTrack.clear(); + + simStartTime = config->getInteger("simulated-starttime"); + if (!simStartTime){ + simStartTime = Util::bootMS(); + } + + + std::string reason; + if (config->getBool("realtime")){ + reason = realtimeMainLoop(); + }else{ + reason = streamMainLoop(); + } closeStreamSource(); @@ -631,17 +713,44 @@ namespace Mist { return "Unknown"; } + std::string Input::realtimeMainLoop(){ + getNext(); + while (thisPacket && config->is_active && nProxy.userClient.isAlive()){ + while (config->is_active&& nProxy.userClient.isAlive() && Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime){ + Util::sleep(std::min(((thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000)); + nProxy.userClient.keepAlive(); + } + uint64_t originalTime = thisPacket.getTime(); + if (originalTime >= originalFirstms[thisPacket.getTrackId()]){ + if (timeOffset || originalFirstms[thisPacket.getTrackId()]){ + thisPacket.setTime(thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]); + } + nProxy.bufferLivePacket(thisPacket, myMeta); + if (timeOffset){ + thisPacket.setTime(originalTime); + } + } + getNext(); + nProxy.userClient.keepAlive(); + } + if (!thisPacket){return "Invalid packet";} + if (!config->is_active){return "received deactivate signal";} + if (!nProxy.userClient.isAlive()){return "buffer shutdown";} + return "Unknown"; + } + void Input::finish() { + if (!standAlone){ + return; + } for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { for (std::map::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) { it2->second = 1; } } removeUnused(); - if (standAlone) { - for (std::map::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) { - it->second.master = true; - } + for (std::map::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) { + it->second.master = true; } } diff --git a/src/input/input.h b/src/input/input.h index cc1a9ec0..baaa7f7c 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -25,10 +26,8 @@ namespace Mist { virtual int boot(int argc, char * argv[]); virtual ~Input() {}; - virtual bool needsLock(){return true;} - static Util::Config * config; - + virtual bool needsLock(){return !config->getBool("realtime");} protected: static void callbackWrapper(char * data, size_t len, unsigned int id); virtual bool checkArguments() = 0; @@ -42,7 +41,7 @@ namespace Mist { virtual void seek(int seekTime){}; virtual void finish(); virtual bool keepRunning(); - virtual bool openStreamSource() { return false; } + virtual bool openStreamSource() { return readHeader(); } virtual void closeStreamSource() {} virtual void parseStreamHeader() {} void play(int until = 0); @@ -56,6 +55,7 @@ namespace Mist { virtual void serve(); virtual void stream(); virtual std::string streamMainLoop(); + virtual std::string realtimeMainLoop(); bool isAlwaysOn(); virtual void parseHeader(); @@ -73,6 +73,8 @@ namespace Mist { JSON::Value capa; std::map > keyTimes; + uint64_t timeOffset; + std::map originalFirstms; //Create server for user pages IPC::sharedServer userPage; @@ -89,6 +91,8 @@ namespace Mist { void readSrtHeader(); void getNextSrt(bool smart = true); DTSC::Packet srtPack; + + uint64_t simStartTime; }; } diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index 539c2eae..dee35772 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -25,6 +25,9 @@ namespace Mist { inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) { + + capa["optional"].removeMember("realtime"); + liveMeta = 0; capa["name"] = "Buffer"; JSON::Value option; @@ -453,6 +456,9 @@ namespace Mist { eraseTrackDataPages(*it); } } + for (std::map::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); ++it){ + it->second.master = true; + } } /// \triggers @@ -637,12 +643,28 @@ namespace Mist { } if (activeTracks.count(value)) { updateMeta(); - eraseTrackDataPages(value); activeTracks.erase(value); - bufferLocations.erase(value); + if (!config->getBool("resume")){ + bufferLocations.erase(value); + eraseTrackDataPages(value); + }else{ + //finalize key count on page. We can NOT do this through bufferFinalize, as this triggers side effects.... + for (int i = 0; i < 1024; i++) { + int * tmpOffset = (int *)(nProxy.metaPages[value].mapped + (i * 8)); + int keyNum = ntohl(tmpOffset[0]); + int keyAmount = ntohl(tmpOffset[1]); + if(keyAmount == 1000){ + tmpOffset[1] = htonl(myMeta.tracks[value].keys.rbegin()->getNumber() - keyNum + 1); + break; + } + } + } + } + + if (!config->getBool("resume")){ + nProxy.metaPages[value].master = true; + nProxy.metaPages.erase(value); } - nProxy.metaPages[value].master = true; - nProxy.metaPages.erase(value); continue; } } @@ -792,8 +814,8 @@ namespace Mist { if (finalMap == -1) { //No collision has been detected, assign a new final number finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1; - DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap); - /*LTS-START*/ + MEDIUM_MSG("No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap); + /*LTS-START*/ if (Triggers::shouldTrigger("STREAM_TRACK_ADD")) { std::string payload = config->getString("streamname") + "\n" + JSON::Value(finalMap).asString() + "\n"; Triggers::doTrigger("STREAM_TRACK_ADD", payload, config->getString("streamname")); @@ -804,13 +826,16 @@ namespace Mist { //or if the firstms of the replacement track is later than the lastms on the existing track if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) { if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) { - INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); + MEDIUM_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); } else { - INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id); + MEDIUM_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id); + if (resumeMode && (myMeta.bufferWindow > 15000)){ + WARN_MSG("Non-resumed track detected; playback will likely not be correct"); + } } } else { //Otherwise replace existing track - INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); + MEDIUM_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id); myMeta.tracks.erase(finalMap); //Set master to true before erasing the page, because we are responsible for cleaning up unused pages updateMeta(); @@ -837,7 +862,11 @@ namespace Mist { //Write the final mapped track number and keyframe number to the user page element //This is used to resume pushing as well as pushing new tracks userConn.setTrackId(index, finalMap); - userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size()); + if (myMeta.tracks[finalMap].keys.size()){ + userConn.setKeynum(index, myMeta.tracks[finalMap].keys.rbegin()->getNumber()); + }else{ + userConn.setKeynum(index, 0); + } //Update the metadata to reflect all changes updateMeta(); } @@ -945,6 +974,7 @@ namespace Mist { } bool inputBuffer::preRun() { + //This function gets run periodically to make sure runtime updates of the config get parsed. lastReTime = Util::epoch(); /*LTS*/ std::string strName = config->getString("streamname"); Util::sanitizeName(strName); diff --git a/src/input/input_playlist.cpp b/src/input/input_playlist.cpp new file mode 100644 index 00000000..157d5b09 --- /dev/null +++ b/src/input/input_playlist.cpp @@ -0,0 +1,160 @@ +#include "input_playlist.h" +#include +#include +#include +#include + +#include +#include + +namespace Mist { + inputPlaylist::inputPlaylist(Util::Config * cfg) : Input(cfg) { + capa["name"] = "Playlist"; + capa["desc"] = "Enables Playlist Input"; + capa["source_match"] = "*.pls"; + capa["priority"] = 9; + + capa["hardcoded"]["resume"] = 1; + capa["hardcoded"]["always_on"] = 1; + + + playlistIndex = 0xFFFFFFFEull;//Not FFFFFFFF on purpose! + seenValidEntry = true; + } + + bool inputPlaylist::checkArguments(){ + if (config->getString("input") == "-") { + std::cerr << "Input from stdin not supported" << std::endl; + return false; + } + if (!config->getString("streamname").size()){ + if (config->getString("output") == "-") { + std::cerr << "Output to stdout not supported" << std::endl; + return false; + } + }else{ + if (config->getString("output") != "-") { + std::cerr << "File output not supported" << std::endl; + return false; + } + } + return true; + } + + void inputPlaylist::stream(){ + IPC::semaphore playlistLock; + playlistLock.open(std::string("/MstPlaylist_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); + if (!playlistLock){ + FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); + return; + } + if (!playlistLock.tryWait()){ + WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); + playlistLock.close(); + return; + } + + std::map overrides; + overrides["resume"] = "1"; + if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer + playlistLock.post(); + playlistLock.close(); + playlistLock.unlink(); + WARN_MSG("Could not start buffer, cancelling"); + return; + } + + char userPageName[NAME_BUFFER_SIZE]; + snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); + nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); + nProxy.userClient.countAsViewer = false; + + uint64_t startTime = Util::bootMS(); + + while (config->is_active && nProxy.userClient.isAlive()){ + nProxy.userClient.keepAlive(); + reloadPlaylist(); + if (!playlist.size()){ + playlistLock.post(); + playlistLock.close(); + playlistLock.unlink(); + WARN_MSG("No entries in playlist, exiting"); + break; + } + ++playlistIndex; + if (playlistIndex >= playlist.size()){ + if (!seenValidEntry){ + HIGH_MSG("Parsed entire playlist without seeing a valid entry, wait a second for any entry to become available"); + Util::sleep(1000); + } + playlistIndex = 0; + seenValidEntry = false; + } + currentSource = playlist.at(playlistIndex); + + std::map overrides; + overrides["realtime"] = "1"; + overrides["alwaysStart"] = "";//Just making this value "available" is enough + overrides["simulated-starttime"] = JSON::Value(startTime).asString(); + std::string srcPath = config->getString("input"); + if ((currentSource.size() && currentSource[0] == '/') || srcPath.rfind('/') == std::string::npos){ + srcPath = currentSource; + } else { + srcPath = srcPath.substr(0, srcPath.rfind("/") + 1) + currentSource; + } + char * workingDir = getcwd(NULL, 0); + if (srcPath[0] != '/'){ + srcPath = std::string(workingDir) + "/" + srcPath; + } + free(workingDir); + + struct stat statRes; + if (stat(srcPath.c_str(), &statRes)){ + FAIL_MSG("%s does not exist on the system, skipping it.", srcPath.c_str()); + continue; + } + if ((statRes.st_mode & S_IFMT) != S_IFREG){ + FAIL_MSG("%s is not a valid file, skipping it.", srcPath.c_str()); + continue; + } + pid_t spawn_pid = 0; + if (!Util::startInput(streamName, srcPath, true, true, overrides, &spawn_pid)) {//manually override stream url to start the correct input + FAIL_MSG("Could not start input for source %s", srcPath.c_str()); + continue; + } + seenValidEntry = true; + while (Util::Procs::isRunning(spawn_pid) && nProxy.userClient.isAlive() && config->is_active){ + Util::sleep(1000); + nProxy.userClient.keepAlive(); + } + if (!config->is_active && Util::Procs::isRunning(spawn_pid)){ + Util::Procs::Stop(spawn_pid); + } + } + playlistLock.post(); + playlistLock.close(); + playlistLock.unlink(); + + nProxy.userClient.finish(); + } + + void inputPlaylist::reloadPlaylist(){ + std::string playlistFile = config->getString("input"); + std::ifstream inFile(playlistFile.c_str()); + if (!inFile.good()){ + WARN_MSG("Unable to open playlist, aborting reload!"); + return; + } + std::string line; + playlist.clear(); + while (inFile.good()){ + std::getline(inFile, line); + if (inFile.good() && line.size() && line.at(0) != '#'){ + playlist.push_back(line); + } + } + inFile.close(); + } + +} + diff --git a/src/input/input_playlist.h b/src/input/input_playlist.h new file mode 100644 index 00000000..582e14ec --- /dev/null +++ b/src/input/input_playlist.h @@ -0,0 +1,25 @@ +#include "input.h" +#include +#include + +namespace Mist { + class inputPlaylist : public Input { + public: + inputPlaylist(Util::Config * cfg); + bool needsLock(){return false;} + protected: + bool checkArguments(); + bool readHeader() { return true; } + void stream(); + virtual bool needHeader(){return false;} + private: + void reloadPlaylist(); + std::deque playlist; + std::string currentSource; + size_t playlistIndex; + bool seenValidEntry; + }; +} + +typedef Mist::inputPlaylist mistIn; + diff --git a/src/input/input_ts.cpp b/src/input/input_ts.cpp old mode 100755 new mode 100644 index 6abe13fd..8b2758de --- a/src/input/input_ts.cpp +++ b/src/input/input_ts.cpp @@ -623,7 +623,7 @@ namespace Mist { //otherwise, check input param const std::string & inpt = config->getString("input"); if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 6) != "srt://" && inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://" && inpt.substr(0, 8) != "https://" && inpt.substr(0, 11) != "https-ts://"){ - return true; + return Input::needsLock(); }else{ return false; } diff --git a/src/io.cpp b/src/io.cpp index ecd46937..23908687 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -497,7 +497,7 @@ namespace Mist { preBuffer[tid].push_back(packet); }else{ if (preBuffer[tid].size()){ - INFO_MSG("Track %lu accepted", tid); + INFO_MSG("Track %lu accepted as track %lu", tid, trackMap[tid] ); while (preBuffer[tid].size()){ bufferSinglePacket(preBuffer[tid].front(), myMeta); preBuffer[tid].pop_front(); @@ -564,7 +564,11 @@ namespace Mist { if (curPageNum.count(tid)) { nextPageNum = curPageNum[tid]; }else{ - nextPageNum = 1; + if (pagesByTrack.count(tid)){ + nextPageNum = pagesByTrack[tid].begin()->first; + }else{ + nextPageNum = 1; + } } } //If we have no pages by track, we have not received a starting keyframe yet. Drop this packet. @@ -733,6 +737,10 @@ namespace Mist { DTSC::Meta tmpMeta; tmpMeta.tracks[newTid] = myMeta.tracks[tid]; tmpMeta.tracks[newTid].trackID = newTid; + tmpMeta.tracks[newTid].fragments.clear(); + tmpMeta.tracks[newTid].keySizes.clear(); + tmpMeta.tracks[newTid].keys.clear(); + tmpMeta.tracks[newTid].parts.clear(); JSON::Value tmpVal = tmpMeta.toJSON(); if (!myMeta.tracks[tid].type.size() || !myMeta.tracks[tid].codec.size()){ FAIL_MSG("Negotiating a track without metadata. This is a serious issue, please report this to the developers."); @@ -782,6 +790,7 @@ namespace Mist { break; } + firstPage++; MEDIUM_MSG("Buffer says %s:%lu should start writing on track %lu, page %lu", streamName.c_str(), tid, finalTid, firstPage); trackMap[tid] = finalTid; if (myMeta.tracks.count(finalTid) && myMeta.tracks[finalTid].lastms){ @@ -790,8 +799,17 @@ namespace Mist { trackState[tid] = FILL_ACC; char pageName[NAME_BUFFER_SIZE]; snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), finalTid); - metaPages[tid].init(pageName, SHM_TRACK_INDEX_SIZE, true); + metaPages[tid].init(pageName, SHM_TRACK_INDEX_SIZE, false, false); + if (!metaPages[tid].mapped){ + metaPages[tid].init(pageName, SHM_TRACK_INDEX_SIZE, true); + } metaPages[tid].master = false; + + if (!pagesByTrack.count(tid) || pagesByTrack[tid].size() == 0) { + pagesByTrack[tid][firstPage].dataSize = DEFAULT_DATA_PAGE_SIZE;//Initialize op 25mb + pagesByTrack[tid][firstPage].pageNum = firstPage; + pagesByTrack[tid][firstPage].firstTime = 0; + } break; } default: