diff --git a/lib/dtsc.h b/lib/dtsc.h index d187b58c..5ad31216 100644 --- a/lib/dtsc.h +++ b/lib/dtsc.h @@ -86,6 +86,8 @@ namespace DTSC { Scan getMember(const std::string & indice) const; Scan getMember(const char * indice) const; Scan getMember(const char * indice, size_t ind_len) const; + void nullMember(const std::string & indice); + void nullMember(const char * indice, size_t ind_len); Scan getIndice(size_t num) const; std::string getIndiceName(size_t num) const; size_t getSize() const; @@ -132,6 +134,7 @@ namespace DTSC { void setKeyFrame(bool kf); virtual uint64_t getTime() const; void setTime(uint64_t _time); + void nullMember(const std::string & memb); size_t getTrackId() const; char * getData() const; size_t getDataLen() const; @@ -141,6 +144,7 @@ namespace DTSC { JSON::Value toJSON() const; std::string toSummary() const; Scan getScan() const; + Scan getScan(); protected: bool master; packType version; @@ -366,6 +370,7 @@ namespace DTSC { Meta(); Meta(const DTSC::Packet & source); Meta(JSON::Value & meta); + bool nextIsKey; inline operator bool() const { //returns if the object contains valid meta data BY LOOKING AT vod/live FLAGS return vod || live; diff --git a/lib/dtscmeta.cpp b/lib/dtscmeta.cpp index 045f94fd..4642cc9e 100644 --- a/lib/dtscmeta.cpp +++ b/lib/dtscmeta.cpp @@ -485,6 +485,14 @@ namespace DTSC { Bit::htobll(data + 12, _time); } + void Packet::nullMember(const std::string & memb){ + if (!master){ + INFO_MSG("Can't null '%s' for this packet, as it is not master.", memb.c_str()); + return; + } + getScan().nullMember(memb); + } + ///\brief Returns the track id of the packet. ///\return The track id of this packet. size_t Packet::getTrackId() const { @@ -525,6 +533,15 @@ namespace DTSC { return Scan(data + (getDataLen() - getPayloadLen()), getPayloadLen()); } + /// Returns a DTSC::Scan instance to the contents of this packet. + /// May return an invalid instance if this packet is invalid. + Scan Packet::getScan(){ + if (!*this || !getDataLen() || !getPayloadLen() || getDataLen() <= getPayloadLen()){ + return Scan(); + } + return Scan(data + (getDataLen() - getPayloadLen()), getPayloadLen()); + } + ///\brief Converts the packet into a JSON value ///\return A JSON::Value representation of this packet. JSON::Value Packet::toJSON() const { @@ -600,6 +617,32 @@ namespace DTSC { return Scan(); } + /// If this is an object type and contains the given indice/len, sets the indice name to all zeroes. + void Scan::nullMember(const std::string & indice){ + nullMember(indice.data(), indice.size()); + } + + /// If this is an object type and contains the given indice/len, sets the indice name to all zeroes. + void Scan::nullMember(const char * indice, const size_t ind_len){ + if (getType() != DTSC_OBJ && getType() != DTSC_CON){return;} + char * i = p + 1; + //object, scan contents + while (i[0] + i[1] != 0 && i < p + len) { //while not encountering 0x0000 (we assume 0x0000EE) + if (i + 2 >= p + len) { + return;//out of packet! + } + uint16_t strlen = Bit::btohs(i); + i += 2; + if (ind_len == strlen && strncmp(indice, i, strlen) == 0) { + memset(i, 0, strlen); + return; + } + i = skipDTSC(i + strlen, p + len); + if (!i) {return;} + } + return; + } + /// Returns an object representing the named indice of this object. /// Returns an invalid object if this indice doesn't exist or this isn't an object type. bool Scan::hasMember(const std::string & indice) const{ @@ -1476,6 +1519,7 @@ namespace DTSC { ///\brief Creates an empty meta object Meta::Meta() { + nextIsKey = false; vod = false; live = false; version = DTSH_VERSION; @@ -1582,6 +1626,10 @@ namespace DTSC { } void Meta::update(long long packTime, long long packOffset, long long packTrack, long long packDataSize, uint64_t packBytePos, bool isKeyframe, long long packSendSize, unsigned long segment_size, const char * ivec){ + if (nextIsKey){ + isKeyframe = true; + nextIsKey = false; + } DONTEVEN_MSG("Updating meta with: t=%lld, o=%lld, s=%lld, t=%lld, p=%lld", packTime, packOffset, packDataSize, packTrack, packBytePos); if (!packSendSize){ //time and trackID are part of the 20-byte header. diff --git a/lib/stream.cpp b/lib/stream.cpp index 47ec83c6..31c86ce1 100644 --- a/lib/stream.cpp +++ b/lib/stream.cpp @@ -354,6 +354,8 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir filename = stream_cfg["source"].asStringRef(); } + bool hadOriginal = getenv("MIST_ORIGINAL_SOURCE"); + if (!hadOriginal){setenv("MIST_ORIGINAL_SOURCE", filename.c_str(), 1);} streamVariables(filename, streamname); const JSON::Value input = getInputBySource(filename, isProvider); if (!input){return false;} @@ -366,13 +368,15 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir if (!prm->isMember("option")){continue;} const std::string opt = (*prm)["option"].asStringRef(); // check for overrides - if (overrides.count(opt)){ - str_args[opt] = overrides.at(opt); + if (overrides.count(prm.key())){ + HIGH_MSG("Overriding option '%s' to '%s'", prm.key().c_str(), overrides.at(prm.key()).c_str()); + str_args[opt] = overrides.at(prm.key()); }else{ if (!stream_cfg.isMember(prm.key())){ FAIL_MSG("Required parameter %s for stream %s missing", prm.key().c_str(), streamname.c_str()); return false; } + HIGH_MSG("Setting option '%s' to '%s'", opt.c_str(), stream_cfg[prm.key()].asStringRef().c_str()); str_args[opt] = stream_cfg[opt].asStringRef(); } } @@ -383,10 +387,14 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir if (!prm->isMember("option")){continue;} const std::string opt = (*prm)["option"].asStringRef(); // check for overrides - if (overrides.count(opt)){ - str_args[opt] = overrides.at(opt); + if (overrides.count(prm.key())){ + HIGH_MSG("Overriding option '%s' to '%s'", prm.key().c_str(), overrides.at(prm.key()).c_str()); + str_args[opt] = overrides.at(prm.key()); }else{ - if (stream_cfg.isMember(prm.key())){str_args[opt] = stream_cfg[prm.key()].asStringRef();} + if (stream_cfg.isMember(prm.key()) && stream_cfg[prm.key()].asStringRef().size()){ + HIGH_MSG("Setting option '%s' to '%s'", opt.c_str(), stream_cfg[prm.key()].asStringRef().c_str()); + str_args[opt] = stream_cfg[prm.key()].asStringRef(); + } } if (!prm->isMember("type") && str_args.count(opt)){str_args[opt] = "";} } @@ -398,7 +406,6 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } std::string player_bin = Util::getMyPath() + "MistIn" + input["name"].asStringRef(); - INFO_MSG("Starting %s -s %s %s", player_bin.c_str(), streamname.c_str(), filename.c_str()); char *argv[30] ={(char *)player_bin.c_str(), (char *)"-s", (char *)streamname.c_str(), (char *)filename.c_str()}; int argNum = 3; @@ -422,6 +429,7 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir pid = fork(); if (pid == -1){ FAIL_MSG("Forking process for stream %s failed: %s", streamname.c_str(), strerror(errno)); + if (!hadOriginal){unsetenv("MIST_ORIGINAL_SOURCE");} return false; } if (pid && overrides.count("singular")){ @@ -438,13 +446,19 @@ bool Util::startInput(std::string streamname, std::string filename, bool forkFir } Socket::Connection io(0, 1); io.drop(); - DONTEVEN_MSG("execvp"); + std::stringstream args; + for (size_t i = 0; i < 30; ++i){ + if (!argv[i] || !argv[i][0]){break;} + args << argv[i] << " "; + } + INFO_MSG("Starting %s", args.str().c_str()); execvp(argv[0], argv); - FAIL_MSG("Starting process %s for stream %s failed: %s", argv[0], streamname.c_str(), strerror(errno)); + FAIL_MSG("Starting process %s failed: %s", argv[0], strerror(errno)); _exit(42); }else if (spawn_pid != NULL){ *spawn_pid = pid; } + if (!hadOriginal){unsetenv("MIST_ORIGINAL_SOURCE");} unsigned int waiting = 0; while (!streamAlive(streamname) && ++waiting < 240){ @@ -947,7 +961,7 @@ std::set Util::getSupportedTracks(const DTSC::Meta &M, const JSON::Value if (found){break;} } if (!found){ - HIGH_MSG("Track %zu with codec %s not supported!", *it, codec.c_str()); + HIGH_MSG("Track %u with codec %s not supported!", it->first, codec.c_str()); continue; } } diff --git a/src/controller/controller_streams.cpp b/src/controller/controller_streams.cpp index 0bff01a1..0fcd2dac 100644 --- a/src/controller/controller_streams.cpp +++ b/src/controller/controller_streams.cpp @@ -124,6 +124,7 @@ namespace Controller { } //non-VoD stream if (URL.substr(0, 1) != "/"){return;} + Util::streamVariables(URL, name, ""); //VoD-style stream struct stat fileinfo; if (stat(URL.c_str(), &fileinfo) != 0){ diff --git a/src/input/input.cpp b/src/input/input.cpp index affced17..7ddb3f7a 100644 --- a/src/input/input.cpp +++ b/src/input/input.cpp @@ -247,7 +247,7 @@ namespace Mist { return 0; } - INFO_MSG("Booting input for stream %s", streamName.c_str()); + INFO_MSG("Input booting"); if (!checkArguments()) { FAIL_MSG("Setup failed - exiting"); @@ -410,7 +410,7 @@ namespace Mist { return 0; }else{ timer = Util::bootMS() - timer; - DEBUG_MSG(DLVL_DEVEL, "Read header for '%s' in %llums", streamName.c_str(), timer); + DEBUG_MSG(DLVL_DEVEL, "Read header for '%s' in %" PRIu64 "ms", streamName.c_str(), timer); } } if (myMeta.vod){ @@ -586,7 +586,7 @@ namespace Mist { /// - call getNext() in a loop, buffering packets void Input::stream(){ - if (Util::streamAlive(streamName)){ + if (!config->getBool("realtime") && Util::streamAlive(streamName)){ WARN_MSG("Stream already online, cancelling"); return; } @@ -596,7 +596,7 @@ namespace Mist { if(isSingular()){ overrides["singular"] = ""; } - if (config->getBool("realtime")){ + if (config->getBool("realtime") || (capa.isMember("hardcoded") && capa["hardcoded"].isMember("resume") && capa["hardcoded"]["resume"])){ overrides["resume"] = "1"; } if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer @@ -604,9 +604,6 @@ namespace Mist { return; } - - INFO_MSG("Input for stream %s started", streamName.c_str()); - if (!openStreamSource()){ FAIL_MSG("Unable to connect to source"); return; @@ -628,6 +625,7 @@ namespace Mist { timeOffset = 0; + uint64_t minFirstMs = 0; //If resume mode is on, find matching tracks and set timeOffset values to make sure we append to the tracks. if (config->getBool("realtime")){ @@ -655,24 +653,39 @@ namespace Mist { liveSem = 0; } DTSC::Meta tmpM(tmpMeta); - unsigned int minKeepAway = 0; + minFirstMs = 0xFFFFFFFFFFFFFFFFull; + uint64_t maxFirstMs = 0; + uint64_t minLastMs = 0xFFFFFFFFFFFFFFFFull; + uint64_t maxLastMs = 0; + + //track lowest firstms value for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); ++it){ - for (std::map::iterator secondIt = tmpM.tracks.begin(); secondIt != tmpM.tracks.end(); ++secondIt){ - if (it->second.codec == secondIt->second.codec && it->second.init == secondIt->second.init){ - timeOffset = std::max(timeOffset, (uint64_t)secondIt->second.lastms); - minKeepAway = std::max(minKeepAway, secondIt->second.minKeepAway); - } - } + if (it->second.firstms < minFirstMs){minFirstMs = it->second.firstms;} + if (it->second.firstms > maxFirstMs){maxFirstMs = it->second.firstms;} + if (it->second.lastms < minLastMs){minLastMs = it->second.lastms;} + if (it->second.lastms > maxLastMs){maxLastMs = it->second.lastms;} + } + if (maxFirstMs - minFirstMs > 500){ + WARN_MSG("Begin timings of tracks for this file are %" PRIu64 " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 "ms - %" PRIu64 "ms)", maxFirstMs-minFirstMs, minFirstMs, maxFirstMs); + } + if (maxLastMs - minLastMs > 500){ + WARN_MSG("Stop timings of tracks for this file are %" PRIu64 " ms apart. This may mess up playback to some degree. (Range: %" PRIu64 "ms - %" PRIu64 "ms)", maxLastMs-minLastMs, minLastMs, maxLastMs); + } + //find highest current time + for (std::map::iterator secondIt = tmpM.tracks.begin(); secondIt != tmpM.tracks.end(); ++secondIt){ + timeOffset = std::max(timeOffset, (int64_t)secondIt->second.lastms); } if (timeOffset){ - timeOffset += 1000;//Add an artificial second to make sure we append and not overwrite + if (minFirstMs == 0xFFFFFFFFFFFFFFFFull){minFirstMs = 0;} + MEDIUM_MSG("Offset is %" PRId64 ", adding 1s and subtracting the start time of %" PRIu64, timeOffset, minFirstMs); + timeOffset += 1000;//Add an artificial frame at 25 FPS to make sure we append, not overwrite + timeOffset -= minFirstMs;//we don't need to add the lowest firstms value to the offset, as it's already there } } for (std::map::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){ - originalFirstms[it->first] = it->second.firstms; - it->second.firstms = timeOffset; + it->second.firstms += timeOffset; it->second.lastms = 0; selectedTracks.insert(it->first); it->second.minKeepAway = SIMULATED_LIVE_BUFFER; @@ -680,9 +693,7 @@ namespace Mist { nProxy.pagesByTrack.clear(); simStartTime = config->getInteger("simulated-starttime"); - if (!simStartTime){ - simStartTime = Util::bootMS(); - } + if (!simStartTime){simStartTime = Util::bootMS();} std::string reason; @@ -716,31 +727,26 @@ namespace Mist { std::string Input::realtimeMainLoop(){ getNext(); while (thisPacket && config->is_active && nProxy.userClient.isAlive()){ - while (config->is_active&& nProxy.userClient.isAlive() && Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime){ - Util::sleep(std::min(((thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000)); + thisPacket.nullMember("bpos"); + while (config->is_active&& nProxy.userClient.isAlive() && Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset) + simStartTime){ + Util::sleep(std::min(((thisPacket.getTime() + timeOffset) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000)); nProxy.userClient.keepAlive(); } uint64_t originalTime = thisPacket.getTime(); - if (originalTime >= originalFirstms[thisPacket.getTrackId()]){ - if (timeOffset || originalFirstms[thisPacket.getTrackId()]){ - thisPacket.setTime(thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]); - } - nProxy.bufferLivePacket(thisPacket, myMeta); - if (timeOffset){ - thisPacket.setTime(originalTime); - } - } + thisPacket.setTime(originalTime + timeOffset); + nProxy.bufferLivePacket(thisPacket, myMeta); + thisPacket.setTime(originalTime); getNext(); nProxy.userClient.keepAlive(); } - if (!thisPacket){return "Invalid packet";} + if (!thisPacket){return "end of file";} if (!config->is_active){return "received deactivate signal";} if (!nProxy.userClient.isAlive()){return "buffer shutdown";} return "Unknown"; } void Input::finish() { - if (!standAlone){ + if (!standAlone || config->getBool("realtime")){ return; } for (std::map >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) { @@ -914,7 +920,7 @@ namespace Mist { VERYHIGH_MSG("Buffering stream %s, track %u, key %u", streamName.c_str(), track, keyNum); if (keyNum > myMeta.tracks[track].keys.size()) { //End of movie here, returning true to avoid various error messages - WARN_MSG("Key %llu is higher than total (%llu). Cancelling buffering.", keyNum, myMeta.tracks[track].keys.size()); + WARN_MSG("Key %u is higher than total (%zu). Cancelling buffering.", keyNum, myMeta.tracks[track].keys.size()); return true; } if (keyNum < 1) { @@ -981,7 +987,7 @@ namespace Mist { getNext(); //in case earlier seeking was inprecise, seek to the exact point while (thisPacket && thisPacket.getTime() < (unsigned long long)myMeta.tracks[track].keys[keyNum - 1].getTime()) { - DONTEVEN_MSG("Skipping packet: %d@%llu, %llub", track, thisPacket.getTime(), thisPacket.getDataLen()); + DONTEVEN_MSG("Skipping packet: %u@%" PRIu64 ", %zub", track, thisPacket.getTime(), thisPacket.getDataLen()); getNext(); } } @@ -1001,7 +1007,7 @@ namespace Mist { }else{ while (thisPacket && thisPacket.getTime() < stopTime) { if (thisPacket.getTime() >= lastBuffered){ - DONTEVEN_MSG("Buffering packet: %d@%llu, %llub", track, thisPacket.getTime(), thisPacket.getDataLen()); + DONTEVEN_MSG("Buffering packet: %u@%" PRIu64 ", %zub", track, thisPacket.getTime(), thisPacket.getDataLen()); bufferNext(thisPacket); ++packCounter; byteCounter += thisPacket.getDataLen(); @@ -1056,7 +1062,7 @@ namespace Mist { return false; } if (tmpdtsh.getMeta().version != DTSH_VERSION){ - INFO_MSG("Updating wrong version header file from version %llu to %llu", tmpdtsh.getMeta().version, DTSH_VERSION); + INFO_MSG("Updating wrong version header file from version %" PRIu16 " to %d", tmpdtsh.getMeta().version, DTSH_VERSION); return false; } myMeta = tmpdtsh.getMeta(); diff --git a/src/input/input.h b/src/input/input.h index baaa7f7c..d9c3f50a 100644 --- a/src/input/input.h +++ b/src/input/input.h @@ -34,7 +34,7 @@ namespace Mist { virtual bool readHeader() = 0; virtual bool needHeader(){return !readExistingHeader();} virtual bool preRun(){return true;} - virtual bool isSingular(){return true;} + virtual bool isSingular(){return !config->getBool("realtime");} virtual bool readExistingHeader(); virtual bool atKeyFrame(); virtual void getNext(bool smart = true) {} @@ -73,8 +73,7 @@ namespace Mist { JSON::Value capa; std::map > keyTimes; - uint64_t timeOffset; - std::map originalFirstms; + int64_t timeOffset; //Create server for user pages IPC::sharedServer userPage; diff --git a/src/input/input_buffer.cpp b/src/input/input_buffer.cpp index dee35772..5f25d2a7 100644 --- a/src/input/input_buffer.cpp +++ b/src/input/input_buffer.cpp @@ -960,6 +960,8 @@ namespace Mist { } lastUpdated[tNum] = Util::bootSecs(); while (tmpPack) { + //Make sure the first item on a page is always marked as key frame + myMeta.nextIsKey = !pageData.curOffset; //Update the metadata with this packet myMeta.update(tmpPack, segmentSize);/*LTS*/ //Set the first time when appropriate diff --git a/src/input/input_ebml.cpp b/src/input/input_ebml.cpp index 81c89093..2816e84c 100644 --- a/src/input/input_ebml.cpp +++ b/src/input/input_ebml.cpp @@ -84,12 +84,9 @@ namespace Mist{ } bool InputEBML::needsLock() { - //Standard input requires no lock, everything else does. - if (config->getString("input") != "-"){ - return true; - }else{ - return false; - } + //Standard input requires no lock, otherwise default behaviour. + if (config->getString("input") == "-"){return false;} + return Input::needsLock(); } bool InputEBML::preRun(){ diff --git a/src/input/input_playlist.cpp b/src/input/input_playlist.cpp index 157d5b09..d8604698 100644 --- a/src/input/input_playlist.cpp +++ b/src/input/input_playlist.cpp @@ -12,14 +12,12 @@ namespace Mist { capa["name"] = "Playlist"; capa["desc"] = "Enables Playlist Input"; capa["source_match"] = "*.pls"; + capa["always_match"] = "*.pls"; + capa["variables_match"] = "*.pls"; capa["priority"] = 9; - capa["hardcoded"]["resume"] = 1; - capa["hardcoded"]["always_on"] = 1; - playlistIndex = 0xFFFFFFFEull;//Not FFFFFFFF on purpose! - seenValidEntry = true; } bool inputPlaylist::checkArguments(){ @@ -41,55 +39,36 @@ namespace Mist { return true; } - void inputPlaylist::stream(){ - IPC::semaphore playlistLock; - playlistLock.open(std::string("/MstPlaylist_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1); - if (!playlistLock){ - FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str()); - return; - } - if (!playlistLock.tryWait()){ - WARN_MSG("A pull process for stream %s is already running", streamName.c_str()); - playlistLock.close(); - return; - } - - std::map overrides; - overrides["resume"] = "1"; - if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer - playlistLock.post(); - playlistLock.close(); - playlistLock.unlink(); - WARN_MSG("Could not start buffer, cancelling"); - return; - } - - char userPageName[NAME_BUFFER_SIZE]; - snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str()); - nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true); - nProxy.userClient.countAsViewer = false; - + std::string inputPlaylist::streamMainLoop(){ + bool seenValidEntry = true; uint64_t startTime = Util::bootMS(); - while (config->is_active && nProxy.userClient.isAlive()){ + struct tm * wTime; + time_t nowTime = time(0); + wTime = localtime(&nowTime); + wallTime = wTime->tm_hour*60+wTime->tm_min; nProxy.userClient.keepAlive(); reloadPlaylist(); if (!playlist.size()){ - playlistLock.post(); - playlistLock.close(); - playlistLock.unlink(); - WARN_MSG("No entries in playlist, exiting"); - break; + return "No entries in playlist"; } ++playlistIndex; if (playlistIndex >= playlist.size()){ if (!seenValidEntry){ - HIGH_MSG("Parsed entire playlist without seeing a valid entry, wait a second for any entry to become available"); + HIGH_MSG("Parsed entire playlist without seeing a valid entry, waiting for any entry to become available"); Util::sleep(1000); } playlistIndex = 0; seenValidEntry = false; } + if (minIndex != std::string::npos && playlistIndex < minIndex){ + INFO_MSG("Clipping playlist index from %zu to %zu to stay within playback timing schedule", playlistIndex, minIndex); + playlistIndex = minIndex; + } + if (maxIndex != std::string::npos && playlistIndex > maxIndex){ + INFO_MSG("Clipping playlist index from %zu to %zu to stay within playback timing schedule", playlistIndex, maxIndex); + playlistIndex = maxIndex; + } currentSource = playlist.at(playlistIndex); std::map overrides; @@ -107,6 +86,7 @@ namespace Mist { srcPath = std::string(workingDir) + "/" + srcPath; } free(workingDir); + Util::streamVariables(srcPath, streamName, ""); struct stat statRes; if (stat(srcPath.c_str(), &statRes)){ @@ -118,39 +98,80 @@ namespace Mist { continue; } pid_t spawn_pid = 0; - if (!Util::startInput(streamName, srcPath, true, true, overrides, &spawn_pid)) {//manually override stream url to start the correct input + //manually override stream url to start the correct input + if (!Util::startInput(streamName, srcPath, true, true, overrides, &spawn_pid)){ FAIL_MSG("Could not start input for source %s", srcPath.c_str()); continue; } seenValidEntry = true; while (Util::Procs::isRunning(spawn_pid) && nProxy.userClient.isAlive() && config->is_active){ Util::sleep(1000); + if (reloadOn != 0xFFFF){ + time_t nowTime = time(0); + wTime = localtime(&nowTime); + wallTime = wTime->tm_hour*60+wTime->tm_min; + if (wallTime >= reloadOn){ + reloadPlaylist(); + } + if ((minIndex != std::string::npos && playlistIndex < minIndex) || (maxIndex != std::string::npos && playlistIndex > maxIndex)){ + INFO_MSG("Killing current playback to stay within min/max playlist entry for current time of day"); + Util::Procs::Stop(spawn_pid); + } + } nProxy.userClient.keepAlive(); } if (!config->is_active && Util::Procs::isRunning(spawn_pid)){ Util::Procs::Stop(spawn_pid); } } - playlistLock.post(); - playlistLock.close(); - playlistLock.unlink(); - - nProxy.userClient.finish(); + if (!config->is_active){return "received deactivate signal";} + if (!nProxy.userClient.isAlive()){return "buffer shutdown";} + return "Unknown"; } void inputPlaylist::reloadPlaylist(){ - std::string playlistFile = config->getString("input"); + minIndex = std::string::npos; + maxIndex = std::string::npos; + std::string playlistFile; + char * origSource = getenv("MIST_ORIGINAL_SOURCE"); + if (origSource){ + playlistFile = origSource; + }else{ + playlistFile = config->getString("input"); + } + MEDIUM_MSG("Reloading playlist '%s'", playlistFile.c_str()); + Util::streamVariables(playlistFile, streamName, playlistFile); std::ifstream inFile(playlistFile.c_str()); if (!inFile.good()){ - WARN_MSG("Unable to open playlist, aborting reload!"); + WARN_MSG("Unable to open playlist '%s', aborting reload!", playlistFile.c_str()); return; } std::string line; + uint16_t plsStartTime = 0xFFFF; + reloadOn = 0xFFFF; playlist.clear(); + playlist_startTime.clear(); while (inFile.good()){ std::getline(inFile, line); if (inFile.good() && line.size() && line.at(0) != '#'){ playlist.push_back(line); + playlist_startTime.push_back(plsStartTime); + if (plsStartTime != 0xFFFF){ + //If the newest entry has a time under the current time, we know we should never play earlier than this + if (plsStartTime <= wallTime){minIndex = playlist.size() - 1;} + //If the newest entry has a time above the current time, we know we should never play it + if (plsStartTime > wallTime && maxIndex == std::string::npos){ + maxIndex = playlist.size() - 2; + reloadOn = plsStartTime; + } + HIGH_MSG("Start %s on %d (min: %zu, max: %zu)", line.c_str(), plsStartTime, minIndex, maxIndex); + } + plsStartTime = 0xFFFF; + }else{ + if (line.size() > 13 && line.at(0) == '#' && line.substr(0, 13) == "#X-STARTTIME:"){ + int hour, min; + if (sscanf(line.c_str()+13, "%d:%d", &hour, &min) == 2){plsStartTime = hour*60+min;} + } } } inFile.close(); diff --git a/src/input/input_playlist.h b/src/input/input_playlist.h index 582e14ec..1e19fb20 100644 --- a/src/input/input_playlist.h +++ b/src/input/input_playlist.h @@ -10,14 +10,19 @@ namespace Mist { protected: bool checkArguments(); bool readHeader() { return true; } - void stream(); + virtual void parseStreamHeader() {myMeta.tracks[1].codec = "PLACEHOLDER";} + std::string streamMainLoop(); virtual bool needHeader(){return false;} private: void reloadPlaylist(); std::deque playlist; + std::deque playlist_startTime; std::string currentSource; size_t playlistIndex; + size_t minIndex, maxIndex; bool seenValidEntry; + uint32_t wallTime; + uint32_t reloadOn; }; }