Merge branch 'development' into LTS_development
# Conflicts: # src/input/input.cpp
This commit is contained in:
commit
1fde08e333
7 changed files with 50 additions and 10 deletions
|
@ -444,6 +444,18 @@ int Util::Procs::Count() {
|
||||||
/// Returns true if a process with this PID is currently active.
|
/// Returns true if a process with this PID is currently active.
|
||||||
bool Util::Procs::isActive(pid_t name) {
|
bool Util::Procs::isActive(pid_t name) {
|
||||||
tthread::lock_guard<tthread::mutex> guard(plistMutex);
|
tthread::lock_guard<tthread::mutex> guard(plistMutex);
|
||||||
return (plist.count(name) == 1) && (kill(name, 0) == 0);
|
return (kill(name, 0) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Forget about the given PID, keeping it running on shutdown.
|
||||||
|
void Util::Procs::forget(pid_t pid) {
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(plistMutex);
|
||||||
|
plist.erase(pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remember the given PID, killing it on shutdown.
|
||||||
|
void Util::Procs::remember(pid_t pid) {
|
||||||
|
tthread::lock_guard<tthread::mutex> guard(plistMutex);
|
||||||
|
plist.insert(pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,8 @@ namespace Util {
|
||||||
static int Count();
|
static int Count();
|
||||||
static bool isActive(pid_t name);
|
static bool isActive(pid_t name);
|
||||||
static bool isRunning(pid_t pid);
|
static bool isRunning(pid_t pid);
|
||||||
|
static void forget(pid_t pid);
|
||||||
|
static void remember(pid_t pid);
|
||||||
static std::set<int> socketList; ///< Holds sockets that should be closed before forking
|
static std::set<int> socketList; ///< Holds sockets that should be closed before forking
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,6 +142,17 @@ namespace Mist {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Default crash handler, cleans up Pull semaphore on crashes
|
||||||
|
void Input::onCrash(){
|
||||||
|
if (streamName.size() && !needsLock()) {
|
||||||
|
//we have a Pull semaphore to clean up, do it
|
||||||
|
IPC::semaphore pullLock;
|
||||||
|
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
|
pullLock.close();
|
||||||
|
pullLock.unlink();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Input::convert() {
|
void Input::convert() {
|
||||||
//check filename for no -
|
//check filename for no -
|
||||||
if (config->getString("output") != "-") {
|
if (config->getString("output") != "-") {
|
||||||
|
@ -262,12 +273,18 @@ namespace Mist {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Main loop for stream-style inputs.
|
/// Main loop for stream-style inputs.
|
||||||
/// This loop will start the buffer without resume support, and then repeatedly call ..... followed by ....
|
/// This loop will do the following, in order:
|
||||||
|
/// - exit if another stream() input is already open for this streamname
|
||||||
|
/// - start a buffer in push mode
|
||||||
|
/// - connect to it
|
||||||
|
/// - run parseStreamHeader
|
||||||
|
/// - if there are tracks, register as a non-viewer on the user page of the buffer
|
||||||
|
/// - call getNext() in a loop, buffering packets
|
||||||
void Input::stream(){
|
void Input::stream(){
|
||||||
IPC::semaphore pullLock;
|
IPC::semaphore pullLock;
|
||||||
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
pullLock.open(std::string("/MstPull_" + streamName).c_str(), O_CREAT | O_RDWR, ACCESSPERMS, 1);
|
||||||
if (!pullLock.tryWait()){
|
if (!pullLock.tryWait()){
|
||||||
DEBUG_MSG(DLVL_DEVEL, "A pull process for stream %s is already running", streamName.c_str());
|
WARN_MSG("A pull process for stream %s is already running", streamName.c_str());
|
||||||
pullLock.close();
|
pullLock.close();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ namespace Mist {
|
||||||
public:
|
public:
|
||||||
Input(Util::Config * cfg);
|
Input(Util::Config * cfg);
|
||||||
virtual int run();
|
virtual int run();
|
||||||
virtual void onCrash(){}
|
virtual void onCrash();
|
||||||
virtual void argumentsParsed(){}
|
virtual void argumentsParsed(){}
|
||||||
virtual ~Input() {};
|
virtual ~Input() {};
|
||||||
|
|
||||||
|
|
|
@ -547,7 +547,10 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
return highest;
|
return highest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Loads the page for the given trackId and keyNum into memory.
|
||||||
|
/// Overwrites any existing page for the same trackId.
|
||||||
|
/// Automatically calls thisPacket.null() if necessary.
|
||||||
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
|
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
|
||||||
if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){
|
if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){
|
||||||
WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum);
|
WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum);
|
||||||
|
@ -598,6 +601,10 @@ namespace Mist{
|
||||||
if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == (unsigned int)pageNum){
|
if (currKeyOpen.count(trackId) && currKeyOpen[trackId] == (unsigned int)pageNum){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
//If we're loading the track thisPacket is on, null it to prevent accesses.
|
||||||
|
if (thisPacket && thisPacket.getTrackId() == trackId){
|
||||||
|
thisPacket.null();
|
||||||
|
}
|
||||||
char id[NAME_BUFFER_SIZE];
|
char id[NAME_BUFFER_SIZE];
|
||||||
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum);
|
snprintf(id, NAME_BUFFER_SIZE, SHM_TRACK_DATA, streamName.c_str(), trackId, pageNum);
|
||||||
nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
|
nProxy.curPage[trackId].init(id, DEFAULT_DATA_PAGE_SIZE);
|
||||||
|
@ -951,7 +958,7 @@ namespace Mist{
|
||||||
}
|
}
|
||||||
|
|
||||||
///Attempts to prepare a new packet for output.
|
///Attempts to prepare a new packet for output.
|
||||||
///If thisPacket evaluates to false, playback has completed.
|
///If it returns true and thisPacket evaluates to false, playback has completed.
|
||||||
///Could be called repeatedly in a loop if you really really want a new packet.
|
///Could be called repeatedly in a loop if you really really want a new packet.
|
||||||
/// \returns true if thisPacket was filled with the next packet.
|
/// \returns true if thisPacket was filled with the next packet.
|
||||||
/// \returns false if we could not reliably determine the next packet yet.
|
/// \returns false if we could not reliably determine the next packet yet.
|
||||||
|
@ -1009,7 +1016,9 @@ namespace Mist{
|
||||||
//if we're going to read past the end of the data page, load the next page
|
//if we're going to read past the end of the data page, load the next page
|
||||||
//this only happens for VoD
|
//this only happens for VoD
|
||||||
if (nxt.offset >= nProxy.curPage[nxt.tid].len){
|
if (nxt.offset >= nProxy.curPage[nxt.tid].len){
|
||||||
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
|
if (thisPacket){
|
||||||
|
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
|
||||||
|
}
|
||||||
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
|
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
|
||||||
nxt.offset = 0;
|
nxt.offset = 0;
|
||||||
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
|
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
|
||||||
|
@ -1022,7 +1031,6 @@ namespace Mist{
|
||||||
buffer.insert(nxt);
|
buffer.insert(nxt);
|
||||||
}
|
}
|
||||||
}else{
|
}else{
|
||||||
thisPacket.null();
|
|
||||||
dropTrack(nxt.tid, "page load failure", true);
|
dropTrack(nxt.tid, "page load failure", true);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1063,7 +1071,6 @@ namespace Mist{
|
||||||
//The next key showed up on another page!
|
//The next key showed up on another page!
|
||||||
//We've simply reached the end of the page. Load the next key = next page.
|
//We've simply reached the end of the page. Load the next key = next page.
|
||||||
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
|
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
|
||||||
thisPacket.null();
|
|
||||||
nxt.offset = 0;
|
nxt.offset = 0;
|
||||||
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
|
if (nProxy.curPage.count(nxt.tid) && nProxy.curPage[nxt.tid].mapped){
|
||||||
unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
|
unsigned long long nextTime = getDTSCTime(nProxy.curPage[nxt.tid].mapped, nxt.offset);
|
||||||
|
|
|
@ -61,6 +61,8 @@ namespace Mist {
|
||||||
uint32_t currTrackCount() const;
|
uint32_t currTrackCount() const;
|
||||||
virtual bool isReadyForPlay();
|
virtual bool isReadyForPlay();
|
||||||
//virtuals. The optional virtuals have default implementations that do as little as possible.
|
//virtuals. The optional virtuals have default implementations that do as little as possible.
|
||||||
|
/// This function is called whenever a packet is ready for sending.
|
||||||
|
/// Inside it, thisPacket is guaranteed to contain a valid packet.
|
||||||
virtual void sendNext() {}//REQUIRED! Others are optional.
|
virtual void sendNext() {}//REQUIRED! Others are optional.
|
||||||
bool prepareNext();
|
bool prepareNext();
|
||||||
virtual void dropTrack(uint32_t trackId, std::string reason, bool probablyBad = true);
|
virtual void dropTrack(uint32_t trackId, std::string reason, bool probablyBad = true);
|
||||||
|
|
|
@ -257,7 +257,7 @@ namespace Mist {
|
||||||
// erase &
|
// erase &
|
||||||
pos = nextpos + 1;
|
pos = nextpos + 1;
|
||||||
}
|
}
|
||||||
if (trackSwitch){
|
if (trackSwitch && thisPacket){
|
||||||
seek(thisPacket.getTime());
|
seek(thisPacket.getTime());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue