Backported various little edits from Pro edition.

This commit is contained in:
Thulinma 2016-05-30 15:17:54 +02:00
parent ef9938da0c
commit 4c9c6fa7ba
78 changed files with 2334 additions and 1266 deletions

View file

@ -2,6 +2,7 @@
#include <fcntl.h>
#include <sys/stat.h>
#include <mist/stream.h>
#include <mist/defines.h>
#include "input.h"
#include <sstream>
@ -69,7 +70,7 @@ namespace Mist {
}
void Input::checkHeaderTimes(std::string streamFile){
if ( streamFile == "-" ){
if (streamFile == "-" || streamFile == "push://") {
return;
}
std::string headerFile = streamFile + ".dtsh";
@ -99,15 +100,22 @@ namespace Mist {
}
int Input::run() {
streamName = config->getString("streamname");
if (config->getBool("json")) {
std::cout << capa.toString() << std::endl;
return 0;
}
if (streamName != "") {
config->getOption("streamname") = streamName;
}
streamName = config->getString("streamname");
nProxy.streamName = streamName;
if (!setup()){
std::cerr << config->getString("cmd") << " setup failed." << std::endl;
return 0;
}
checkHeaderTimes(config->getString("input"));
if (!readHeader()){
std::cerr << "Reading header for " << config->getString("input") << " failed." << std::endl;
@ -115,7 +123,7 @@ namespace Mist {
}
parseHeader();
if (!config->getString("streamname").size()){
if (!streamName.size()) {
convert();
}else{
serve();
@ -155,28 +163,32 @@ namespace Mist {
}
void Input::serve(){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userPage.init(userPageName, PLAY_EX_SIZE, true);
if (!isBuffer){
for (std::map<unsigned int,DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
bufferFrame(it->first, 1);
}
}
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
userPage.init(userPageName, PLAY_EX_SIZE, true);
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs();
while ((Util::bootSecs() - activityCounter) < 10 && config->is_active){//10 second timeout
Util::wait(1000);
removeUnused();
while ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT && config->is_active) { //15 second timeout
userPage.parseEach(callbackWrapper);
if (userPage.amount){
removeUnused();
if (userPage.connectedUsers) {
if (myMeta.tracks.size()){
activityCounter = Util::bootSecs();
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.amount);
}
DEBUG_MSG(DLVL_INSANE, "Connected users: %d", userPage.connectedUsers);
}else{
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
if (config->is_active){
Util::wait(1000);
}
}
finish();
DEBUG_MSG(DLVL_DEVEL,"Input for stream %s closing clean", streamName.c_str());
@ -191,7 +203,7 @@ namespace Mist {
}
removeUnused();
if (standAlone){
for (std::map<unsigned long, IPC::sharedPage>::iterator it = metaPages.begin(); it != metaPages.end(); it++){
for (std::map<unsigned long, IPC::sharedPage>::iterator it = nProxy.metaPages.begin(); it != nProxy.metaPages.end(); it++) {
it->second.master = true;
}
}
@ -210,9 +222,9 @@ namespace Mist {
bufferRemove(it->first, it2->first);
pageCounter[it->first].erase(it2->first);
for (int i = 0; i < 8192; i += 8){
unsigned int thisKeyNum = ntohl(((((long long int *)(metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
unsigned int thisKeyNum = ntohl(((((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) >> 32) & 0xFFFFFFFF);
if (thisKeyNum == it2->first){
(((long long int *)(metaPages[it->first].mapped + i))[0]) = 0;
(((long long int *)(nProxy.metaPages[it->first].mapped + i))[0]) = 0;
}
}
change = true;
@ -253,13 +265,13 @@ namespace Mist {
for (int i = 0; i < it->second.keys.size(); i++){
if (newData){
//i+1 because keys are 1-indexed
pagesByTrack[it->first][i+1].firstTime = it->second.keys[i].getTime();
nProxy.pagesByTrack[it->first][i + 1].firstTime = it->second.keys[i].getTime();
newData = false;
}
pagesByTrack[it->first].rbegin()->second.keyNum++;
pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
if (pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE){
nProxy.pagesByTrack[it->first].rbegin()->second.keyNum++;
nProxy.pagesByTrack[it->first].rbegin()->second.partNum += it->second.keys[i].getParts();
nProxy.pagesByTrack[it->first].rbegin()->second.dataSize += it->second.keySizes[i];
if (nProxy.pagesByTrack[it->first].rbegin()->second.dataSize > FLIP_DATA_PAGE_SIZE) {
newData = true;
}
}
@ -292,7 +304,7 @@ namespace Mist {
}
if (myMeta.tracks[tid].keys[bookKeeping[tid].curKey].getParts() + 1 == curData[tid].partNum){
if (curData[tid].dataSize > FLIP_DATA_PAGE_SIZE) {
pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
nProxy.pagesByTrack[tid][bookKeeping[tid].first] = curData[tid];
bookKeeping[tid].first += curData[tid].keyNum;
curData[tid].keyNum = 0;
curData[tid].dataSize = 0;
@ -309,17 +321,17 @@ namespace Mist {
getNext(false);
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (curData.count(it->first) && !pagesByTrack[it->first].count(bookKeeping[it->first].first)){
pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first];
if (curData.count(it->first) && !nProxy.pagesByTrack[it->first].count(bookKeeping[it->first].first)) {
nProxy.pagesByTrack[it->first][bookKeeping[it->first].first] = curData[it->first];
}
}
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!pagesByTrack.count(it->first)){
if (!nProxy.pagesByTrack.count(it->first)) {
DEBUG_MSG(DLVL_WARN, "No pages for track %d found", it->first);
}else{
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), pagesByTrack[it->first].size());
for (std::map<unsigned long, DTSCPageData>::iterator it2 = pagesByTrack[it->first].begin(); it2 != pagesByTrack[it->first].end(); it2++){
DEBUG_MSG(DLVL_MEDIUM, "Track %d (%s) split into %lu pages", it->first, myMeta.tracks[it->first].codec.c_str(), nProxy.pagesByTrack[it->first].size());
for (std::map<unsigned long, DTSCPageData>::iterator it2 = nProxy.pagesByTrack[it->first].begin(); it2 != nProxy.pagesByTrack[it->first].end(); it2++) {
DEBUG_MSG(DLVL_VERYHIGH, "Page %lu-%lu, (%llu bytes)", it2->first, it2->first + it2->second.keyNum - 1, it2->second.dataSize);
}
}
@ -328,29 +340,38 @@ namespace Mist {
bool Input::bufferFrame(unsigned int track, unsigned int keyNum){
VERYHIGH_MSG("bufferFrame for stream %s, track %u, key %u", streamName.c_str(), track, keyNum);
VERYHIGH_MSG("Buffering stream %s, track %u, key %u", streamName.c_str(), track, keyNum);
if (keyNum > myMeta.tracks[track].keys.size()){
//End of movie here, returning true to avoid various error messages
VERYHIGH_MSG("Key number is higher than total key count. Cancelling bufferFrame");
WARN_MSG("Key %llu is higher than total (%llu). Cancelling buffering.", keyNum, myMeta.tracks[track].keys.size());
return true;
}
if (keyNum < 1){keyNum = 1;}
//abort in case already buffered
int pageNumber = bufferedOnPage(track, keyNum);
if (pageNumber){
if (keyNum < 1) {
keyNum = 1;
}
if (nProxy.isBuffered(track, keyNum)) {
//get corresponding page number
int pageNumber = 0;
for (std::map<unsigned long, DTSCPageData>::iterator it = nProxy.pagesByTrack[track].begin(); it != nProxy.pagesByTrack[track].end(); it++) {
if (it->first <= keyNum) {
pageNumber = it->first;
} else {
break;
}
}
pageCounter[track][pageNumber] = 15;
VERYHIGH_MSG("Track %u, key %u is already buffered in page %d. Cancelling bufferFrame", track, keyNum, pageNumber);
return true;
}
if (!pagesByTrack.count(track)){
if (!nProxy.pagesByTrack.count(track)) {
WARN_MSG("No pages for track %u found! Cancelling bufferFrame", track);
return false;
}
//Update keynum to point to the corresponding page
INFO_MSG("Loading key %u from page %lu", keyNum, (--(pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(pagesByTrack[track].upper_bound(keyNum)))->first;
INFO_MSG("Loading key %u from page %lu", keyNum, (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first);
keyNum = (--(nProxy.pagesByTrack[track].upper_bound(keyNum)))->first;
if (!bufferStart(track, keyNum)){
WARN_MSG("bufferStart failed! Cancelling bufferFrame", track);
WARN_MSG("bufferStart failed! Cancelling bufferFrame");
return false;
}
@ -359,8 +380,8 @@ namespace Mist {
trackSelect(trackSpec.str());
seek(myMeta.tracks[track].keys[keyNum - 1].getTime());
long long unsigned int stopTime = myMeta.tracks[track].lastms + 1;
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + pagesByTrack[track][keyNum].keyNum){
stopTime = myMeta.tracks[track].keys[keyNum - 1 + pagesByTrack[track][keyNum].keyNum].getTime();
if ((int)myMeta.tracks[track].keys.size() > keyNum - 1 + nProxy.pagesByTrack[track][keyNum].keyNum) {
stopTime = myMeta.tracks[track].keys[keyNum - 1 + nProxy.pagesByTrack[track][keyNum].keyNum].getTime();
}
DEBUG_MSG(DLVL_HIGH, "Playing from %llu to %llu", myMeta.tracks[track].keys[keyNum - 1].getTime(), stopTime);
getNext();

View file

@ -20,7 +20,11 @@ namespace Mist {
public:
Input(Util::Config * cfg);
virtual int run();
virtual void onCrash(){}
virtual void argumentsParsed(){}
virtual ~Input() {};
virtual bool needsLock(){return true;}
protected:
static void callbackWrapper(char * data, size_t len, unsigned int id);
virtual bool setup() = 0;
@ -29,6 +33,9 @@ namespace Mist {
virtual void getNext(bool smart = true) {};
virtual void seek(int seekTime){};
virtual void finish();
virtual bool openStreamSource() { return false; };
virtual void closeStreamSource() {};
virtual void parseStreamHeader() {};
void play(int until = 0);
void playOnce();
void quitPlay();
@ -36,11 +43,11 @@ namespace Mist {
virtual void removeUnused();
virtual void trackSelect(std::string trackSpec){};
virtual void userCallback(char * data, size_t len, unsigned int id);
void serve();
void convert();
virtual void convert();
virtual void serve();
void parseHeader();
virtual void parseHeader();
bool bufferFrame(unsigned int track, unsigned int keyNum);
unsigned int packTime;///Media-timestamp of the last packet.

View file

@ -13,7 +13,7 @@
#include "input_buffer.h"
#ifndef TIMEOUTMULTIPLIER
#define TIMEOUTMULTIPLIER 10
#define TIMEOUTMULTIPLIER 2
#endif
namespace Mist {
@ -41,6 +41,8 @@ namespace Mist {
singleton = this;
bufferTime = 50000;
cutTime = 0;
hasPush = false;
resumeMode = false;
}
inputBuffer::~inputBuffer() {
@ -48,16 +50,111 @@ namespace Mist {
if (myMeta.tracks.size()) {
DEBUG_MSG(DLVL_DEVEL, "Cleaning up, removing last keyframes");
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
while (removeKey(it->first)) {}
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[it->first];
if (!nProxy.metaPages.count(it->first) || !nProxy.metaPages[it->first].mapped) {
continue;
}
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
int * tmpOffset = (int *)(nProxy.metaPages[it->first].mapped + i);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0) {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) {
locations[keyNum].curOffset = 0;
}
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
}
for (std::map<unsigned long, DTSCPageData>::iterator it2 = locations.begin(); it2 != locations.end(); it2++) {
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), it->first, it2->first);
IPC::sharedPage erasePage(thisPageName, 20971520);
erasePage.master = true;
}
}
}
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.unlink();
}
///Cleans up any left-over data for the current stream
void inputBuffer::onCrash(){
WARN_MSG("Buffer crashed. Cleaning.");
streamName = config->getString("streamname");
char pageName[NAME_BUFFER_SIZE];
//Set userpage to all 0xFF's, will disconnect all current clients.
snprintf(pageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
std::string baseName = pageName;
for (long unsigned i = 0; i < 15; ++i){
unsigned int size = std::min(((8192 * 2) << i), (32 * 1024 * 1024));
IPC::sharedPage tmp(std::string(baseName + (char)(i + (int)'A')), size, false, false);
if (tmp.mapped){
tmp.master = true;
WARN_MSG("Wiping %s", std::string(baseName + (char)(i + (int)'A')).c_str());
memset(tmp.mapped, 0xFF, size);
}
}
{
//Delete the live stream semaphore, if any.
snprintf(pageName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(pageName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.unlink();
}{
//Delete the stream index metapage.
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
IPC::sharedPage erasePage(pageName, DEFAULT_STRM_PAGE_SIZE, false, false);
erasePage.master = true;
}
//Delete most if not all temporary track metadata pages.
for (long unsigned i = 1001; i <= 1024; ++i){
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_META, streamName.c_str(), i);
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
}
//Delete most if not all track indexes and data pages.
for (long unsigned i = 1; i <= 24; ++i){
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, streamName.c_str(), i);
IPC::sharedPage indexPage(pageName, SHM_TRACK_INDEX_SIZE, false, false);
indexPage.master = true;
if (indexPage.mapped){
char * mappedPointer = indexPage.mapped;
for (int j = 0; j < 8192; j += 8) {
int * tmpOffset = (int *)(mappedPointer + j);
if (tmpOffset[0] == 0 && tmpOffset[1] == 0){
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
snprintf(pageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), i, keyNum);
IPC::sharedPage erasePage(pageName, 1024, false, false);
erasePage.master = true;
}
}
}
}
/// \triggers
/// The `"STREAM_BUFFER"` trigger is stream-specific, and is ran whenever the buffer changes state between playable (FULL) or not (EMPTY). It cannot be cancelled. It is possible to receive multiple EMPTY calls without FULL calls in between, as EMPTY is always generated when a stream is unloaded from memory, even if this stream never reached playable state in the first place (e.g. a broadcast was cancelled before filling enough buffer to be playable). Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// FULL or EMPTY (depending on current state)
/// ~~~~~~~~~~~~~~~
void inputBuffer::updateMeta() {
long long unsigned int firstms = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int lastms = 0;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
if (it->second.type == "meta" || !it->second.type.size()){continue;}
if (it->second.type == "meta" || !it->second.type.size()) {
continue;
}
if (it->second.init.size()){
if (!initData.count(it->first) || initData[it->first] != it->second.init){
initData[it->first] = it->second.init;
@ -81,14 +178,14 @@ namespace Mist {
snprintf(liveSemName, NAME_BUFFER_SIZE, SEM_LIVE, streamName.c_str());
IPC::semaphore liveMeta(liveSemName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
liveMeta.wait();
if (!metaPages.count(0) || !metaPages[0].mapped) {
if (!nProxy.metaPages.count(0) || !nProxy.metaPages[0].mapped) {
char pageName[NAME_BUFFER_SIZE];
snprintf(pageName, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
metaPages[0].init(pageName, DEFAULT_META_PAGE_SIZE, true);
metaPages[0].master = false;
nProxy.metaPages[0].init(pageName, DEFAULT_STRM_PAGE_SIZE, true);
nProxy.metaPages[0].master = false;
}
myMeta.writeTo(metaPages[0].mapped);
memset(metaPages[0].mapped + myMeta.getSendLen(), 0, (metaPages[0].len > myMeta.getSendLen() ? std::min(metaPages[0].len - myMeta.getSendLen(), 4ll) : 0));
myMeta.writeTo(nProxy.metaPages[0].mapped);
memset(nProxy.metaPages[0].mapped + myMeta.getSendLen(), 0, (nProxy.metaPages[0].len > myMeta.getSendLen() ? std::min(nProxy.metaPages[0].len - myMeta.getSendLen(), 4ll) : 0));
liveMeta.post();
}
@ -118,30 +215,15 @@ namespace Mist {
if (bufferLocations[tid].size() > 1) {
//Check if the first key starts on the second page or higher
if (myMeta.tracks[tid].keys[0].getNumber() >= (++(bufferLocations[tid].begin()))->first || !config->is_active) {
//Find page in indexpage and null it
for (int i = 0; i < 8192; i += 8) {
unsigned int thisKeyNum = ((((long long int *)(metaPages[tid].mapped + i))[0]) >> 32) & 0xFFFFFFFF;
if (thisKeyNum == htonl(bufferLocations[tid].begin()->first) && ((((long long int *)(metaPages[tid].mapped + i))[0]) != 0)){
(((long long int *)(metaPages[tid].mapped + i))[0]) = 0;
}
}
DEBUG_MSG(DLVL_DEVEL, "Erasing track %d, keys %lu-%lu from buffer", tid, bufferLocations[tid].begin()->first, bufferLocations[tid].begin()->first + bufferLocations[tid].begin()->second.keyNum - 1);
bufferRemove(tid, bufferLocations[tid].begin()->first);
for (int i = 0; i < 1024; i++) {
int * tmpOffset = (int *)(metaPages[tid].mapped + (i * 8));
int tmpNum = ntohl(tmpOffset[0]);
if (tmpNum == bufferLocations[tid].begin()->first) {
tmpOffset[0] = 0;
tmpOffset[1] = 0;
}
}
curPageNum.erase(tid);
nProxy.curPageNum.erase(tid);
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
curPage[tid].init(thisPageName, 20971520);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin());
} else {
@ -158,13 +240,13 @@ namespace Mist {
for (std::map<unsigned long, DTSCPageData>::iterator it = bufferLocations[tid].begin(); it != bufferLocations[tid].end(); it++){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tid, it->first);
curPage[tid].init(thisPageName, 20971520, false, false);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520, false, false);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
}
bufferLocations.erase(tid);
metaPages[tid].master = true;
metaPages.erase(tid);
nProxy.metaPages[tid].master = true;
nProxy.metaPages.erase(tid);
}
void inputBuffer::finish() {
@ -189,11 +271,13 @@ namespace Mist {
long long unsigned int time = Util::bootSecs();
long long unsigned int compareFirst = 0xFFFFFFFFFFFFFFFFull;
long long unsigned int compareLast = 0;
std::set<std::string> activeTypes;
//for tracks that were updated in the last 5 seconds, get the first and last ms edges.
for (std::map<unsigned int, DTSC::Track>::iterator it2 = myMeta.tracks.begin(); it2 != myMeta.tracks.end(); it2++) {
if ((time - lastUpdated[it2->first]) > 5) {
continue;
}
activeTypes.insert(it2->second.type);
if (it2->second.lastms > compareLast) {
compareLast = it2->second.lastms;
}
@ -204,7 +288,7 @@ namespace Mist {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//if not updated for an entire buffer duration, or last updated track and this track differ by an entire buffer duration, erase the track.
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000) ||
(compareLast && (long long int)(time - lastUpdated[it->first]) > 5 && (
(compareLast && activeTypes.count(it->second.type) && (long long int)(time - lastUpdated[it->first]) > 5 && (
(compareLast < it->second.firstms && (long long int)(it->second.firstms - compareLast) > bufferTime)
||
(compareFirst > it->second.lastms && (long long int)(compareFirst - it->second.lastms) > bufferTime)
@ -213,26 +297,26 @@ namespace Mist {
unsigned int tid = it->first;
//erase this track
if ((long long int)(time - lastUpdated[it->first]) > (long long int)(bufferTime / 1000)){
INFO_MSG("Erasing track %d because not updated for %ds (> %ds)", it->first, (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000));
WARN_MSG("Erasing %s track %d (%s/%s) because not updated for %ds (> %ds)", streamName.c_str(), it->first, it->second.type.c_str(), it->second.codec.c_str(), (long long int)(time - lastUpdated[it->first]), (long long int)(bufferTime / 1000));
}else{
INFO_MSG("Erasing inactive track %u because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", it->first, it->second.firstms / 1000, it->second.lastms / 1000, compareFirst/1000, compareLast/1000, bufferTime / 1000);
WARN_MSG("Erasing %s inactive track %u (%s/%s) because it was inactive for 5+ seconds and contains data (%us - %us), while active tracks are (%us - %us), which is more than %us seconds apart.", streamName.c_str(), it->first, it->second.type.c_str(), it->second.codec.c_str(), it->second.firstms / 1000, it->second.lastms / 1000, compareFirst / 1000, compareLast / 1000, bufferTime / 1000);
}
lastUpdated.erase(tid);
/// \todo Consider replacing with eraseTrackDataPages(it->first)?
while (bufferLocations[tid].size()){
char thisPageName[NAME_BUFFER_SIZE];
snprintf(thisPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), (unsigned long)tid, bufferLocations[tid].begin()->first);
curPage[tid].init(thisPageName, 20971520);
curPage[tid].master = true;
curPage.erase(tid);
nProxy.curPage[tid].init(thisPageName, 20971520);
nProxy.curPage[tid].master = true;
nProxy.curPage.erase(tid);
bufferLocations[tid].erase(bufferLocations[tid].begin());
}
if (pushLocation.count(it->first)){
pushLocation.erase(it->first);
}
curPageNum.erase(it->first);
metaPages[it->first].master = true;
metaPages.erase(it->first);
nProxy.curPageNum.erase(it->first);
nProxy.metaPages[it->first].master = true;
nProxy.metaPages.erase(it->first);
activeTracks.erase(it->first);
myMeta.tracks.erase(it->first);
changed = true;
@ -251,8 +335,9 @@ namespace Mist {
}
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
//non-video tracks need to have a second keyframe that is <= firstVideo
//firstVideo = 1 happens when there are no tracks, in which case we don't care any more
if (it->second.type != "video") {
if (it->second.keys.size() < 2 || it->second.keys[1].getTime() > firstVideo) {
if (it->second.keys.size() < 2 || (it->second.keys[1].getTime() > firstVideo && firstVideo != 1)){
continue;
}
}
@ -270,6 +355,14 @@ namespace Mist {
}
}
updateMeta();
static bool everHadPush = false;
if (hasPush) {
hasPush = false;
everHadPush = true;
} else if (everHadPush && !resumeMode && config->is_active) {
INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected");
config->is_active = false;
}
}
void inputBuffer::userCallback(char * data, size_t len, unsigned int id) {
@ -278,10 +371,10 @@ namespace Mist {
//Get the counter of this user
char counter = (*(data - 1));
//Each user can have at maximum SIMUL_TRACKS elements in their userpage.
IPC::userConnection userConn(data);
for (int index = 0; index < SIMUL_TRACKS; index++) {
char * thisData = data + (index * 6);
//Get the track id from the current element
unsigned long value = ((long)(thisData[0]) << 24) | ((long)(thisData[1]) << 16) | ((long)(thisData[2]) << 8) | thisData[3];
unsigned long value = userConn.getTrackId(index);
//Skip value 0xFFFFFFFF as this indicates a previously declined track
if (value == 0xFFFFFFFF) {
continue;
@ -294,12 +387,10 @@ namespace Mist {
//If the current value indicates a valid trackid, and it is pushed from this user
if (pushLocation[value] == data) {
//Check for timeouts, and erase the track if necessary
if (counter == 126 || counter == 127 || counter == 254 || counter == 255) {
if (counter == 126 || counter == 127){
pushLocation.erase(value);
if (negotiatingTracks.count(value)) {
negotiatingTracks.erase(value);
metaPages[value].master = true;
metaPages.erase(value);
}
if (activeTracks.count(value)) {
updateMeta();
@ -307,8 +398,8 @@ namespace Mist {
activeTracks.erase(value);
bufferLocations.erase(value);
}
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
continue;
}
}
@ -320,28 +411,24 @@ namespace Mist {
//Add the temporary track id to the list of tracks that are currently being negotiated
negotiatingTracks.insert(tempMapping);
//Write the temporary id to the userpage element
thisData[0] = (tempMapping >> 24) & 0xFF;
thisData[1] = (tempMapping >> 16) & 0xFF;
thisData[2] = (tempMapping >> 8) & 0xFF;
thisData[3] = (tempMapping) & 0xFF;
userConn.setTrackId(index, tempMapping);
//Obtain the original track number for the pushing process
unsigned long originalTrack = ((long)(thisData[4]) << 8) | thisData[5];
unsigned long originalTrack = userConn.getKeynum(index);
//Overwrite it with 0xFFFF
thisData[4] = 0xFF;
thisData[5] = 0xFF;
userConn.setKeynum(index, 0xFFFF);
DEBUG_MSG(DLVL_HIGH, "Incoming track %lu from pushing process %d has now been assigned temporary id %llu", originalTrack, id, tempMapping);
}
//The track id is set to the value of a track that we are currently negotiating about
if (negotiatingTracks.count(value)) {
//If the metadata page for this track is not yet registered, initialize it
if (!metaPages.count(value) || !metaPages[value].mapped) {
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) {
char tempMetaName[NAME_BUFFER_SIZE];
snprintf(tempMetaName, NAME_BUFFER_SIZE, SHM_TRACK_META, config->getString("streamname").c_str(), value);
metaPages[value].init(tempMetaName, 8388608, false, false);
nProxy.metaPages[value].init(tempMetaName, 8388608, false, false);
}
//If this tracks metdata page is not initialize, skip the entire element for now. It will be instantiated later
if (!metaPages[value].mapped) {
if (!nProxy.metaPages[value].mapped) {
//remove the negotiation if it has timed out
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
@ -353,13 +440,13 @@ namespace Mist {
//The page exist, now we try to read in the metadata of the track
//Store the size of the dtsc packet to read.
unsigned int len = ntohl(((int *)metaPages[value].mapped)[1]);
unsigned int len = ntohl(((int *)nProxy.metaPages[value].mapped)[1]);
//Temporary variable, won't be used again
unsigned int tempForReadingMeta = 0;
//Read in the metadata through a temporary JSON object
///\todo Optimize this part. Find a way to not have to store the metadata in JSON first, but read it from the page immediately
JSON::Value tempJSONForMeta;
JSON::fromDTMI((const unsigned char *)metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
JSON::fromDTMI((const unsigned char *)nProxy.metaPages[value].mapped + 8, len, tempForReadingMeta, tempJSONForMeta);
//Construct a metadata object for the current track
DTSC::Meta trackMeta(tempJSONForMeta);
//If the track metadata does not contain the negotiated track, assume the metadata is currently being written, and skip the element for now. It will be instantiated in the next call.
@ -368,8 +455,8 @@ namespace Mist {
if (++negotiationTimeout[value] >= 1000){
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
negotiationTimeout.erase(value);
}
continue;
@ -381,8 +468,8 @@ namespace Mist {
//Remove the "negotiate" status in either case
negotiatingTracks.erase(value);
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
metaPages[value].master = true;
metaPages.erase(value);
nProxy.metaPages[value].master = true;
nProxy.metaPages.erase(value);
int finalMap = 3;
if (trackMeta.tracks.find(value)->second.type == "video"){finalMap = 1;}
@ -402,8 +489,8 @@ namespace Mist {
//Set master to true before erasing the page, because we are responsible for cleaning up unused pages
updateMeta();
eraseTrackDataPages(value);
metaPages[finalMap].master = true;
metaPages.erase(finalMap);
nProxy.metaPages[finalMap].master = true;
nProxy.metaPages.erase(finalMap);
bufferLocations.erase(finalMap);
}
@ -415,43 +502,39 @@ namespace Mist {
pushLocation[finalMap] = data;
//Initialize the metadata for this track if it was not in place yet.
if (!myMeta.tracks.count(finalMap)) {
DEBUG_MSG(DLVL_HIGH, "Inserting metadata for track number %d", finalMap);
DEBUG_MSG(DLVL_MEDIUM, "Inserting metadata for track number %d", finalMap);
myMeta.tracks[finalMap] = trackMeta.tracks.begin()->second;
myMeta.tracks[finalMap].trackID = finalMap;
}
//Write the final mapped track number to the user page element
thisData[0] = (finalMap >> 24) & 0xFF;
thisData[1] = (finalMap >> 16) & 0xFF;
thisData[2] = (finalMap >> 8) & 0xFF;
thisData[3] = (finalMap) & 0xFF;
//Write the key number to start pushing from to to the userpage element.
//Write the final mapped track number and keyframe number to the user page element
//This is used to resume pushing as well as pushing new tracks
unsigned long keyNum = myMeta.tracks[finalMap].keys.size();
thisData[4] = (keyNum >> 8) & 0xFF;
thisData[5] = keyNum & 0xFF;
userConn.setTrackId(index, finalMap);
userConn.setKeynum(index, myMeta.tracks[finalMap].keys.size());
//Update the metadata to reflect all changes
updateMeta();
}
//If the track is active, and this is the element responsible for pushing it
if (activeTracks.count(value) && pushLocation[value] == data) {
//Open the track index page if we dont have it open yet
if (!metaPages.count(value) || !metaPages[value].mapped) {
if (!nProxy.metaPages.count(value) || !nProxy.metaPages[value].mapped) {
char firstPage[NAME_BUFFER_SIZE];
snprintf(firstPage, NAME_BUFFER_SIZE, SHM_TRACK_INDEX, config->getString("streamname").c_str(), value);
metaPages[value].init(firstPage, 8192, false, false);
nProxy.metaPages[value].init(firstPage, SHM_TRACK_INDEX_SIZE, false, false);
}
if (metaPages[value].mapped) {
if (nProxy.metaPages[value].mapped) {
//Update the metadata for this track
updateTrackMeta(value);
hasPush = true;
}
}
}
}
void inputBuffer::updateTrackMeta(unsigned long tNum) {
VERYHIGH_MSG("Updating meta for track %d", tNum);
//Store a reference for easier access
std::map<unsigned long, DTSCPageData> & locations = bufferLocations[tNum];
char * mappedPointer = metaPages[tNum].mapped;
char * mappedPointer = nProxy.metaPages[tNum].mapped;
//First detect all entries on metaPage
for (int i = 0; i < 8192; i += 8) {
@ -460,6 +543,7 @@ namespace Mist {
continue;
}
unsigned long keyNum = ntohl(tmpOffset[0]);
INSANE_MSG("Page %d detected, with %d keys", keyNum, ntohl(tmpOffset[1]));
//Add an entry into bufferLocations[tNum] for the pages we haven't handled yet.
if (!locations.count(keyNum)) {
@ -468,7 +552,6 @@ namespace Mist {
locations[keyNum].pageNum = keyNum;
locations[keyNum].keyNum = ntohl(tmpOffset[1]);
}
//Since the map is ordered by keynumber, this loop updates the metadata for each page from oldest to newest
for (std::map<unsigned long, DTSCPageData>::iterator pageIt = locations.begin(); pageIt != locations.end(); pageIt++) {
updateMetaFromPage(tNum, pageIt->first);
@ -477,6 +560,7 @@ namespace Mist {
}
void inputBuffer::updateMetaFromPage(unsigned long tNum, unsigned long pageNum) {
VERYHIGH_MSG("Updating meta for track %d page %d", tNum, pageNum);
DTSCPageData & pageData = bufferLocations[tNum][pageNum];
//If the current page is over its 8mb "splitting" boundary
@ -491,27 +575,27 @@ namespace Mist {
//Otherwise open and parse the page
//Open the page if it is not yet open
if (!curPageNum.count(tNum) || curPageNum[tNum] != pageNum || !curPage[tNum].mapped){
if (!nProxy.curPageNum.count(tNum) || nProxy.curPageNum[tNum] != pageNum || !nProxy.curPage[tNum].mapped) {
//DO NOT ERASE THE PAGE HERE, master is not set to true
curPageNum.erase(tNum);
nProxy.curPageNum.erase(tNum);
char nextPageName[NAME_BUFFER_SIZE];
snprintf(nextPageName, NAME_BUFFER_SIZE, SHM_TRACK_DATA, config->getString("streamname").c_str(), tNum, pageNum);
curPage[tNum].init(nextPageName, 20971520);
nProxy.curPage[tNum].init(nextPageName, 20971520);
//If the page can not be opened, stop here
if (!curPage[tNum].mapped) {
if (!nProxy.curPage[tNum].mapped) {
WARN_MSG("Could not open page: %s", nextPageName);
return;
}
curPageNum[tNum] = pageNum;
nProxy.curPageNum[tNum] = pageNum;
}
DTSC::Packet tmpPack;
if (!curPage[tNum].mapped[pageData.curOffset]){
if (!nProxy.curPage[tNum].mapped[pageData.curOffset]) {
VERYHIGH_MSG("No packet on page %lu for track %lu, waiting...", pageNum, tNum);
return;
}
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0);
//No new data has been written on the page since last update
if (!tmpPack) {
return;
@ -527,7 +611,7 @@ namespace Mist {
//Update the offset on the page with the size of the current packet
pageData.curOffset += tmpPack.getDataLen();
//Attempt to read in the next packet
tmpPack.reInit(curPage[tNum].mapped + pageData.curOffset, 0);
tmpPack.reInit(nProxy.curPage[tNum].mapped + pageData.curOffset, 0);
}
}
@ -535,8 +619,8 @@ namespace Mist {
std::string strName = config->getString("streamname");
Util::sanitizeName(strName);
strName = strName.substr(0, (strName.find_first_of("+ ")));
IPC::sharedPage serverCfg("!mistConfig", DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock("!mistConfLock", O_CREAT | O_RDWR, ACCESSPERMS, 1);
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
DTSC::Scan streamCfg = DTSC::Scan(serverCfg.mapped, serverCfg.len).getMember("streams").getMember(strName);
long long tmpNum;
@ -553,7 +637,9 @@ namespace Mist {
tmpNum = config->getOption("bufferTime").asInt();
}
}
if (tmpNum < 1000){tmpNum = 1000;}
if (tmpNum < 1000) {
tmpNum = 1000;
}
//if the new value is different, print a message and apply it
if (bufferTime != tmpNum) {
DEBUG_MSG(DLVL_DEVEL, "Setting bufferTime from %u to new value of %lli", bufferTime, tmpNum);

View file

@ -7,9 +7,12 @@ namespace Mist {
public:
inputBuffer(Util::Config * cfg);
~inputBuffer();
void onCrash();
private:
unsigned int bufferTime;
unsigned int cutTime;
bool hasPush;
bool resumeMode;
protected:
//Private Functions
bool setup();
@ -33,8 +36,7 @@ namespace Mist {
std::map<unsigned long, std::map<unsigned long, DTSCPageData> > bufferLocations;
std::map<unsigned long, char *> pushLocation;
inputBuffer * singleton;
//This is used for an ugly fix to prevent metadata from dissapearing in some cases.
//This is used for an ugly fix to prevent metadata from disappearing in some cases.
std::map<unsigned long, std::string> initData;
};
}

View file

@ -7,6 +7,9 @@
#include <mist/stream.h>
#include <mist/defines.h>
#include <mist/util.h>
#include <mist/bitfields.h>
#include "input_dtsc.h"
namespace Mist {
@ -14,7 +17,8 @@ namespace Mist {
capa["name"] = "DTSC";
capa["desc"] = "Enables DTSC Input";
capa["priority"] = 9ll;
capa["source_match"] = "/*.dtsc";
capa["source_match"].append("/*.dtsc");
capa["source_match"].append("dtsc://*");
capa["codecs"][0u][0u].append("H264");
capa["codecs"][0u][0u].append("H263");
capa["codecs"][0u][0u].append("VP6");
@ -24,7 +28,142 @@ namespace Mist {
capa["codecs"][0u][1u].append("vorbis");
}
bool inputDTSC::needsLock(){
return config->getString("input").substr(0, 7) != "dtsc://";
}
void parseDTSCURI(const std::string & src, std::string & host, uint16_t & port, std::string & password, std::string & streamName) {
host = "";
port = 4200;
password = "";
streamName = "";
std::deque<std::string> matches;
if (Util::stringScan(src, "%s:%s@%s/%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
password = matches[2];
streamName = matches[3];
return;
}
//Using default streamname
if (Util::stringScan(src, "%s:%s@%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
password = matches[2];
return;
}
//Without password
if (Util::stringScan(src, "%s:%s/%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
streamName = matches[2];
return;
}
//Using default port
if (Util::stringScan(src, "%s@%s/%s", matches)) {
host = matches[0];
password = matches[1];
streamName = matches[2];
return;
}
//Default port, no password
if (Util::stringScan(src, "%s/%s", matches)) {
host = matches[0];
streamName = matches[1];
return;
}
//No password, default streamname
if (Util::stringScan(src, "%s:%s", matches)) {
host = matches[0];
port = atoi(matches[1].c_str());
return;
}
//Default port and streamname
if (Util::stringScan(src, "%s@%s", matches)) {
host = matches[0];
password = matches[1];
return;
}
//Default port and streamname, no password
if (Util::stringScan(src, "%s", matches)) {
host = matches[0];
return;
}
}
void inputDTSC::parseStreamHeader() {
while (srcConn.connected()){
srcConn.spool();
if (srcConn.Received().available(8)){
if (srcConn.Received().copy(4) == "DTCM" || srcConn.Received().copy(4) == "DTSC") {
// Command message
std::string toRec = srcConn.Received().copy(8);
unsigned long rSize = Bit::btohl(toRec.c_str() + 4);
if (!srcConn.Received().available(8 + rSize)) {
continue; //abort - not enough data yet
}
//Ignore initial DTCM message, as this is a "hi" message from the server
if (srcConn.Received().copy(4) == "DTCM"){
srcConn.Received().remove(8 + rSize);
}else{
std::string dataPacket = srcConn.Received().remove(8+rSize);
DTSC::Packet metaPack(dataPacket.data(), dataPacket.size());
myMeta.reinit(metaPack);
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
continueNegotiate(it->first, true);
}
break;
}
}else{
INFO_MSG("Received a wrong type of packet - '%s'", srcConn.Received().copy(4).c_str());
break;
}
}
}
}
bool inputDTSC::openStreamSource() {
std::string source = config->getString("input");
if (source.find("dtsc://") == 0) {
source.erase(0, 7);
}
std::string host;
uint16_t port;
std::string password;
std::string streamName;
parseDTSCURI(source, host, port, password, streamName);
std::string givenStream = config->getString("streamname");
if (streamName == "") {
streamName = givenStream;
}else{
if (givenStream.find("+") != std::string::npos){
streamName += givenStream.substr(givenStream.find("+"));
}
}
srcConn = Socket::Connection(host, port, true);
if (!srcConn.connected()){
return false;
}
JSON::Value prep;
prep["cmd"] = "play";
prep["version"] = "MistServer " PACKAGE_VERSION;
prep["stream"] = streamName;
srcConn.SendNow("DTCM");
char sSize[4] = {0, 0, 0, 0};
Bit::htobl(sSize, prep.packedSize());
srcConn.SendNow(sSize, 4);
prep.sendTo(srcConn);
return true;
}
void inputDTSC::closeStreamSource(){
srcConn.close();
}
bool inputDTSC::setup() {
if (!needsLock()) {
return true;
} else {
if (config->getString("input") == "-") {
std::cerr << "Input from stdin not yet supported" << std::endl;
return false;
@ -46,10 +185,14 @@ namespace Mist {
if (!inFile) {
return false;
}
}
return true;
}
bool inputDTSC::readHeader() {
if (!needsLock()) {
return true;
}
if (!inFile) {
return false;
}
@ -69,12 +212,62 @@ namespace Mist {
}
void inputDTSC::getNext(bool smart) {
if (!needsLock()){
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTCM){
std::string cmd;
thisPacket.getString("cmd", cmd);
if (cmd == "reset"){
//Read next packet
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
//Detect new tracks
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
}
}
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("Adding track %d to internal metadata", *it);
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
}
//Detect removed tracks
std::set<unsigned int> deletedTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!newMeta.tracks.count(it->first)){
deletedTracks.insert(it->first);
}
}
for(std::set<unsigned int>::iterator it = deletedTracks.begin(); it != deletedTracks.end(); it++){
INFO_MSG("Deleting track %d from internal metadata", *it);
myMeta.tracks.erase(*it);
}
//Read next packet before returning
thisPacket.reInit(srcConn);
}else{
myMeta = DTSC::Meta();
}
}else{
//Read next packet before returning
thisPacket.reInit(srcConn);
}
}
}else{
if (smart){
inFile.seekNext();
}else{
inFile.parseNext();
}
thisPacket = inFile.getPacket();
}
}
void inputDTSC::seek(int seekTime) {

View file

@ -5,8 +5,12 @@ namespace Mist {
class inputDTSC : public Input {
public:
inputDTSC(Util::Config * cfg);
bool needsLock();
protected:
//Private Functions
bool openStreamSource();
void closeStreamSource();
void parseStreamHeader();
bool setup();
bool readHeader();
void getNext(bool smart = true);
@ -14,6 +18,8 @@ namespace Mist {
void trackSelect(std::string trackSpec);
DTSC::File inFile;
Socket::Connection srcConn;
};
}

View file

@ -122,7 +122,7 @@ namespace Mist {
}
}
if (!offset){
DEBUG_MSG(DLVL_FAIL, "Sync byte not found from offset %llu", filePos);
DEBUG_MSG(DLVL_FAIL, "Sync byte not found from offset %lu", filePos);
return;
}
filePos += offset;

View file

@ -16,24 +16,34 @@ int main(int argc, char * argv[]) {
mistIn conv(&conf);
if (conf.parseArgs(argc, argv)) {
std::string streamName = conf.getString("streamname");
conv.argumentsParsed();
IPC::semaphore playerLock;
if (streamName.size()){
playerLock.open(std::string("/lock_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
if (conv.needsLock()){
if (streamName.size()){
char semName[NAME_BUFFER_SIZE];
snprintf(semName, NAME_BUFFER_SIZE, SEM_INPUT, streamName.c_str());
playerLock.open(semName, O_CREAT | O_RDWR, ACCESSPERMS, 1);
if (!playerLock.tryWait()){
DEBUG_MSG(DLVL_DEVEL, "A player for stream %s is already running", streamName.c_str());
return 1;
}
}
}
conf.activate();
while (conf.is_active){
pid_t pid = fork();
if (pid == 0){
playerLock.close();
if (conv.needsLock()){
playerLock.close();
}
return conv.run();
}
if (pid == -1){
DEBUG_MSG(DLVL_FAIL, "Unable to spawn player process");
playerLock.post();
if (conv.needsLock()){
playerLock.post();
}
return 2;
}
//wait for the process to exit
@ -50,6 +60,11 @@ int main(int argc, char * argv[]) {
DEBUG_MSG(DLVL_MEDIUM, "Input for stream %s shut down cleanly", streamName.c_str());
break;
}
#if DEBUG >= DLVL_DEVEL
WARN_MSG("Aborting autoclean; this is a development build.");
#else
conv.onCrash();
#endif
if (DEBUG >= DLVL_DEVEL){
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Aborting restart; this is a development build.", streamName.c_str());
break;
@ -57,8 +72,11 @@ int main(int argc, char * argv[]) {
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s uncleanly shut down! Restarting...", streamName.c_str());
}
}
playerLock.post();
playerLock.close();
if (conv.needsLock()){
playerLock.post();
playerLock.unlink();
playerLock.close();
}
}
return 0;
}