Playlist support

This commit is contained in:
Erik Zandvliet 2017-11-07 10:07:38 +01:00 committed by Thulinma
parent 7beea43d31
commit 907be3b1f4
12 changed files with 544 additions and 29 deletions

View file

@ -71,7 +71,28 @@ namespace Mist {
capa["optional"]["verimatrix-playready"]["option"] = "--verimatrix-playready";
capa["optional"]["verimatrix-playready"]["type"] = "str";
capa["optional"]["verimatrix-playready"]["default"] = "";
option.null();
option["long"] = "realtime";
option["short"] = "r";
option["help"] = "Feed the results of this input in realtime to the buffer";
config->addOption("realtime", option);
capa["optional"]["realtime"]["name"] = "Simulated Live";
capa["optional"]["realtime"]["help"] = "Make this input run as a simulated live stream";
capa["optional"]["realtime"]["option"] = "--realtime";
option.null();
option["long"] = "simulated-starttime";
option["arg"] = "integer";
option["short"] = "S";
option["help"] = "Unix timestamp on which the simulated start of the stream is based.";
option["value"].append(0);
config->addOption("simulated-starttime", option);
capa["optional"]["simulated-starttime"]["name"] = "Simulated start time";
capa["optional"]["simulated-starttime"]["help"] = "The unix timestamp on which this stream is assumed to have started playback, or 0 for automatic";
capa["optional"]["simulated-starttime"]["option"] = "--simulated-starttime";
capa["optional"]["simulated-starttime"]["type"] = "uint";
capa["optional"]["simulated-starttime"]["default"] = 0;
/*LTS-END*/
capa["optional"]["debug"]["name"] = "debug";
@ -79,6 +100,7 @@ namespace Mist {
capa["optional"]["debug"]["option"] = "--debug";
capa["optional"]["debug"]["type"] = "debug";
packTime = 0;
lastActive = Util::epoch();
playing = 0;
@ -574,7 +596,9 @@ namespace Mist {
if(isSingular()){
overrides["singular"] = "";
}
if (config->getBool("realtime")){
overrides["resume"] = "1";
}
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer
WARN_MSG("Could not start buffer, cancelling");
return;
@ -602,13 +626,71 @@ namespace Mist {
return;
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
it->second.firstms = 0;
it->second.lastms = 0;
selectedTracks.insert(it->first);
timeOffset = 0;
//If resume mode is on, find matching tracks and set timeOffset values to make sure we append to the tracks.
if (config->getBool("realtime")){
seek(0);
char nameBuf[NAME_BUFFER_SIZE];
snprintf(nameBuf, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
IPC::sharedPage curMeta(nameBuf);
static char liveSemName[NAME_BUFFER_SIZE];
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore * liveSem = new IPC::semaphore(liveSemName, O_RDWR, ACCESSPERMS, 1, !myMeta.live);
if (*liveSem){
liveSem->wait();
}else{
delete liveSem;
liveSem = 0;
}
DTSC::Packet tmpMeta(curMeta.mapped, curMeta.len, true);
if (liveSem){
liveSem->post();
delete liveSem;
liveSem = 0;
}
DTSC::Meta tmpM(tmpMeta);
unsigned int minKeepAway = 0;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); ++it){
for (std::map<unsigned int, DTSC::Track>::iterator secondIt = tmpM.tracks.begin(); secondIt != tmpM.tracks.end(); ++secondIt){
if (it->second.codec == secondIt->second.codec && it->second.init == secondIt->second.init){
timeOffset = std::max(timeOffset, (uint64_t)secondIt->second.lastms);
minKeepAway = std::max(minKeepAway, secondIt->second.minKeepAway);
}
}
}
if (timeOffset){
timeOffset += 1000;//Add an artificial second to make sure we append and not overwrite
}
}
std::string reason = streamMainLoop();
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
originalFirstms[it->first] = it->second.firstms;
it->second.firstms = timeOffset;
it->second.lastms = 0;
selectedTracks.insert(it->first);
it->second.minKeepAway = SIMULATED_LIVE_BUFFER;
}
nProxy.pagesByTrack.clear();
simStartTime = config->getInteger("simulated-starttime");
if (!simStartTime){
simStartTime = Util::bootMS();
}
std::string reason;
if (config->getBool("realtime")){
reason = realtimeMainLoop();
}else{
reason = streamMainLoop();
}
closeStreamSource();
@ -631,17 +713,44 @@ namespace Mist {
return "Unknown";
}
std::string Input::realtimeMainLoop(){
getNext();
while (thisPacket && config->is_active && nProxy.userClient.isAlive()){
while (config->is_active&& nProxy.userClient.isAlive() && Util::bootMS() + SIMULATED_LIVE_BUFFER < (thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime){
Util::sleep(std::min(((thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]) + simStartTime) - (Util::getMS() + SIMULATED_LIVE_BUFFER), (uint64_t)1000));
nProxy.userClient.keepAlive();
}
uint64_t originalTime = thisPacket.getTime();
if (originalTime >= originalFirstms[thisPacket.getTrackId()]){
if (timeOffset || originalFirstms[thisPacket.getTrackId()]){
thisPacket.setTime(thisPacket.getTime() + timeOffset - originalFirstms[thisPacket.getTrackId()]);
}
nProxy.bufferLivePacket(thisPacket, myMeta);
if (timeOffset){
thisPacket.setTime(originalTime);
}
}
getNext();
nProxy.userClient.keepAlive();
}
if (!thisPacket){return "Invalid packet";}
if (!config->is_active){return "received deactivate signal";}
if (!nProxy.userClient.isAlive()){return "buffer shutdown";}
return "Unknown";
}
void Input::finish() {
if (!standAlone){
return;
}
for (std::map<unsigned int, std::map<unsigned int, unsigned int> >::iterator it = pageCounter.begin(); it != pageCounter.end(); it++) {
for (std::map<unsigned int, unsigned int>::iterator it2 = it->second.begin(); it2 != it->second.end(); it2++) {
it2->second = 1;
}
}
removeUnused();
if (standAlone) {
for (std::map<unsigned long, IPC::sharedPage>::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) {
it->second.master = true;
}
for (std::map<unsigned long, IPC::sharedPage>::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) {
it->second.master = true;
}
}

View file

@ -5,6 +5,7 @@
#include <mist/json.h>
#include <mist/timing.h>
#include <mist/dtsc.h>
#include <mist/defines.h>
#include <mist/shared_memory.h>
#include <fstream>
@ -25,10 +26,8 @@ namespace Mist {
virtual int boot(int argc, char * argv[]);
virtual ~Input() {};
virtual bool needsLock(){return true;}
static Util::Config * config;
virtual bool needsLock(){return !config->getBool("realtime");}
protected:
static void callbackWrapper(char * data, size_t len, unsigned int id);
virtual bool checkArguments() = 0;
@ -42,7 +41,7 @@ namespace Mist {
virtual void seek(int seekTime){};
virtual void finish();
virtual bool keepRunning();
virtual bool openStreamSource() { return false; }
virtual bool openStreamSource() { return readHeader(); }
virtual void closeStreamSource() {}
virtual void parseStreamHeader() {}
void play(int until = 0);
@ -56,6 +55,7 @@ namespace Mist {
virtual void serve();
virtual void stream();
virtual std::string streamMainLoop();
virtual std::string realtimeMainLoop();
bool isAlwaysOn();
virtual void parseHeader();
@ -73,6 +73,8 @@ namespace Mist {
JSON::Value capa;
std::map<int,std::set<int> > keyTimes;
uint64_t timeOffset;
std::map<int, uint64_t> originalFirstms;
//Create server for user pages
IPC::sharedServer userPage;
@ -89,6 +91,8 @@ namespace Mist {
void readSrtHeader();
void getNextSrt(bool smart = true);
DTSC::Packet srtPack;
uint64_t simStartTime;
};
}

View file

@ -25,6 +25,9 @@
namespace Mist {
inputBuffer::inputBuffer(Util::Config * cfg) : Input(cfg) {
capa["optional"].removeMember("realtime");
liveMeta = 0;
capa["name"] = "Buffer";
JSON::Value option;
@ -453,6 +456,9 @@ namespace Mist {
eraseTrackDataPages(*it);
}
}
for (std::map<unsigned long, IPC::sharedPage>::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); ++it){
it->second.master = true;
}
}
/// \triggers
@ -637,12 +643,28 @@ namespace Mist {
}
if (activeTracks.count(value)) {
updateMeta();
eraseTrackDataPages(value);
activeTracks.erase(value);
bufferLocations.erase(value);
if (!config->getBool("resume")){
bufferLocations.erase(value);
eraseTrackDataPages(value);
}else{
//finalize key count on page. We can NOT do this through bufferFinalize, as this triggers side effects....
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(nProxy.metaPages[value].mapped + (i * 8));
int keyNum = ntohl(tmpOffset[0]);
int keyAmount = ntohl(tmpOffset[1]);
if(keyAmount == 1000){
tmpOffset[1] = htonl(myMeta.tracks[value].keys.rbegin()->getNumber() - keyNum + 1);
break;
}
}
}
}
if (!config->getBool("resume")){
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
}
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
continue;
}
}
@ -792,8 +814,8 @@ namespace Mist {
if (finalMap == -1) {
//No collision has been detected, assign a new final number
finalMap = (myMeta.tracks.size() ? myMeta.tracks.rbegin()->first : 0) + 1;
DEBUG_MSG(DLVL_DEVEL, "No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap);
/*LTS-START*/
MEDIUM_MSG("No colision detected for temporary track %lu from user %u, assigning final track number %lu", value, id, finalMap);
/*LTS-START*/
if (Triggers::shouldTrigger("STREAM_TRACK_ADD")) {
std::string payload = config->getString("streamname") + "\n" + JSON::Value(finalMap).asString() + "\n";
Triggers::doTrigger("STREAM_TRACK_ADD", payload, config->getString("streamname"));
@ -804,13 +826,16 @@ namespace Mist {
//or if the firstms of the replacement track is later than the lastms on the existing track
if (!myMeta.tracks.count(finalMap) || trackMeta.tracks.find(value)->second.keys.size() > 1 || trackMeta.tracks.find(value)->second.firstms >= myMeta.tracks[finalMap].lastms) {
if (myMeta.tracks.count(finalMap) && myMeta.tracks[finalMap].lastms > 0) {
INFO_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
MEDIUM_MSG("Resume of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
} else {
INFO_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id);
MEDIUM_MSG("New track detected, assigned track id %lu, coming from temporary track %lu of user %u", finalMap, value, id);
if (resumeMode && (myMeta.bufferWindow > 15000)){
WARN_MSG("Non-resumed track detected; playback will likely not be correct");
}
}
} else {
//Otherwise replace existing track
INFO_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
MEDIUM_MSG("Replacement of track %lu detected, coming from temporary track %lu of user %u", finalMap, value, id);
myMeta.tracks.erase(finalMap);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
updateMeta();
@ -837,7 +862,11 @@ namespace Mist {
//Write the final mapped track number and keyframe number to the user page element
//This is used to resume pushing as well as pushing new tracks
userConn.setTrackId(index, finalMap);
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
if (myMeta.tracks[finalMap].keys.size()){
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.rbegin()->getNumber());
}else{
userConn.setKeynum(index, 0);
}
//Update the metadata to reflect all changes
updateMeta();
}
@ -945,6 +974,7 @@ namespace Mist {
}
bool inputBuffer::preRun() {
//This function gets run periodically to make sure runtime updates of the config get parsed.
lastReTime = Util::epoch(); /*LTS*/
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);

View file

@ -0,0 +1,160 @@
#include "input_playlist.h"
#include <algorithm>
#include <mist/stream.h>
#include <mist/procs.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
namespace Mist {
inputPlaylist::inputPlaylist(Util::Config * cfg) : Input(cfg) {
capa["name"] = "Playlist";
capa["desc"] = "Enables Playlist Input";
capa["source_match"] = "*.pls";
capa["priority"] = 9;
capa["hardcoded"]["resume"] = 1;
capa["hardcoded"]["always_on"] = 1;
playlistIndex = 0xFFFFFFFEull;//Not FFFFFFFF on purpose!
seenValidEntry = true;
}
bool inputPlaylist::checkArguments(){
if (config->getString("input") == "-") {
std::cerr << "Input from stdin not supported" << std::endl;
return false;
}
if (!config->getString("streamname").size()){
if (config->getString("output") == "-") {
std::cerr << "Output to stdout not supported" << std::endl;
return false;
}
}else{
if (config->getString("output") != "-") {
std::cerr << "File output not supported" << std::endl;
return false;
}
}
return true;
}
void inputPlaylist::stream(){
IPC::semaphore playlistLock;
playlistLock.open(std::string("/MstPlaylist_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playlistLock){
FAIL_MSG("Could not open pull lock for stream '%s' - aborting!", streamName.c_str());
return;
}
if (!playlistLock.tryWait()){
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
playlistLock.close();
return;
}
std::map<std::string, std::string> overrides;
overrides["resume"] = "1";
if (!Util::startInput(streamName, "push://INTERNAL_ONLY:"+config->getString("input"), true, true, overrides)) {//manually override stream url to start the buffer
playlistLock.post();
playlistLock.close();
playlistLock.unlink();
WARN_MSG("Could not start buffer, cancelling");
return;
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
nProxy.userClient.countAsViewer = false;
uint64_t startTime = Util::bootMS();
while (config->is_active && nProxy.userClient.isAlive()){
nProxy.userClient.keepAlive();
reloadPlaylist();
if (!playlist.size()){
playlistLock.post();
playlistLock.close();
playlistLock.unlink();
WARN_MSG("No entries in playlist, exiting");
break;
}
++playlistIndex;
if (playlistIndex >= playlist.size()){
if (!seenValidEntry){
HIGH_MSG("Parsed entire playlist without seeing a valid entry, wait a second for any entry to become available");
Util::sleep(1000);
}
playlistIndex = 0;
seenValidEntry = false;
}
currentSource = playlist.at(playlistIndex);
std::map<std::string, std::string> overrides;
overrides["realtime"] = "1";
overrides["alwaysStart"] = "";//Just making this value "available" is enough
overrides["simulated-starttime"] = JSON::Value(startTime).asString();
std::string srcPath = config->getString("input");
if ((currentSource.size() && currentSource[0] == '/') || srcPath.rfind('/') == std::string::npos){
srcPath = currentSource;
} else {
srcPath = srcPath.substr(0, srcPath.rfind("/") + 1) + currentSource;
}
char * workingDir = getcwd(NULL, 0);
if (srcPath[0] != '/'){
srcPath = std::string(workingDir) + "/" + srcPath;
}
free(workingDir);
struct stat statRes;
if (stat(srcPath.c_str(), &statRes)){
FAIL_MSG("%s does not exist on the system, skipping it.", srcPath.c_str());
continue;
}
if ((statRes.st_mode & S_IFMT) != S_IFREG){
FAIL_MSG("%s is not a valid file, skipping it.", srcPath.c_str());
continue;
}
pid_t spawn_pid = 0;
if (!Util::startInput(streamName, srcPath, true, true, overrides, &spawn_pid)) {//manually override stream url to start the correct input
FAIL_MSG("Could not start input for source %s", srcPath.c_str());
continue;
}
seenValidEntry = true;
while (Util::Procs::isRunning(spawn_pid) && nProxy.userClient.isAlive() && config->is_active){
Util::sleep(1000);
nProxy.userClient.keepAlive();
}
if (!config->is_active && Util::Procs::isRunning(spawn_pid)){
Util::Procs::Stop(spawn_pid);
}
}
playlistLock.post();
playlistLock.close();
playlistLock.unlink();
nProxy.userClient.finish();
}
void inputPlaylist::reloadPlaylist(){
std::string playlistFile = config->getString("input");
std::ifstream inFile(playlistFile.c_str());
if (!inFile.good()){
WARN_MSG("Unable to open playlist, aborting reload!");
return;
}
std::string line;
playlist.clear();
while (inFile.good()){
std::getline(inFile, line);
if (inFile.good() && line.size() && line.at(0) != '#'){
playlist.push_back(line);
}
}
inFile.close();
}
}

View file

@ -0,0 +1,25 @@
#include "input.h"
#include <mist/dtsc.h>
#include <deque>
namespace Mist {
class inputPlaylist : public Input {
public:
inputPlaylist(Util::Config * cfg);
bool needsLock(){return false;}
protected:
bool checkArguments();
bool readHeader() { return true; }
void stream();
virtual bool needHeader(){return false;}
private:
void reloadPlaylist();
std::deque<std::string> playlist;
std::string currentSource;
size_t playlistIndex;
bool seenValidEntry;
};
}
typedef Mist::inputPlaylist mistIn;

2
src/input/input_ts.cpp Executable file → Normal file
View file

@ -623,7 +623,7 @@ namespace Mist {
//otherwise, check input param
const std::string & inpt = config->getString("input");
if (inpt.size() && inpt != "-" && inpt.substr(0,9) != "stream://" && inpt.substr(0,8) != "tsudp://" && inpt.substr(0, 8) != "ts-exec:" && inpt.substr(0, 6) != "srt://" && inpt.substr(0, 7) != "http://" && inpt.substr(0, 10) != "http-ts://" && inpt.substr(0, 8) != "https://" && inpt.substr(0, 11) != "https-ts://"){
return true;
return Input::needsLock();
}else{
return false;
}