Merge branch 'development' into LTS_development

# Conflicts:
#	src/output/output.cpp
#	src/output/output.h
This commit is contained in:
Thulinma 2017-07-04 13:22:37 +02:00
commit c2520f9c81
2 changed files with 54 additions and 14 deletions

View file

@ -47,16 +47,21 @@ namespace Mist{
} }
void Output::bufferLivePacket(DTSC::Packet & packet){ void Output::bufferLivePacket(DTSC::Packet & packet){
if (!pushIsOngoing){
waitForStreamPushReady();
}
if (nProxy.negTimer > 600){ if (nProxy.negTimer > 600){
WARN_MSG("No negotiation response from buffer - reconnecting."); WARN_MSG("No negotiation response from buffer - reconnecting.");
nProxy.clear(); nProxy.clear();
reconnect(); reconnect();
} }
InOutBase::bufferLivePacket(packet); InOutBase::bufferLivePacket(packet);
pushIsOngoing = true;
} }
Output::Output(Socket::Connection & conn) : myConn(conn){ Output::Output(Socket::Connection & conn) : myConn(conn){
pushing = false; pushing = false;
pushIsOngoing = false;
firstTime = 0; firstTime = 0;
firstPacketTime = 0xFFFFFFFFFFFFFFFFull; firstPacketTime = 0xFFFFFFFFFFFFFFFFull;
lastPacketTime = 0; lastPacketTime = 0;
@ -350,7 +355,7 @@ namespace Mist{
selectDefaultTracks(); selectDefaultTracks();
if (!myMeta.vod && !isReadyForPlay()){ if (!myMeta.vod && !isReadyForPlay()){
unsigned long long waitUntil = Util::epoch() + 30; unsigned long long waitUntil = Util::epoch() + 30;
while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive()){ while (!myMeta.vod && !isReadyForPlay() && nProxy.userClient.isAlive() && keepGoing()){
if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){ if (Util::epoch() > waitUntil + 45 || (!selectedTracks.size() && Util::epoch() > waitUntil)){
INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str()); INFO_MSG("Giving up waiting for playable tracks. Stream: %s, IP: %s", streamName.c_str(), getConnectedHost().c_str());
break; break;
@ -556,7 +561,7 @@ namespace Mist{
VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum); VERYHIGH_MSG("Loading track %lu, containing key %lld", trackId, keyNum);
unsigned int timeout = 0; unsigned int timeout = 0;
unsigned long pageNum = pageNumForKey(trackId, keyNum); unsigned long pageNum = pageNumForKey(trackId, keyNum);
while (config->is_active && myConn && pageNum == -1){ while (keepGoing() && pageNum == -1){
if (!timeout){ if (!timeout){
HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum); HIGH_MSG("Requesting page with key %lu:%lld", trackId, keyNum);
} }
@ -679,7 +684,7 @@ namespace Mist{
bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){ bool Output::seek(unsigned int tid, unsigned long long pos, bool getNextKey){
if (myMeta.live && myMeta.tracks[tid].lastms < pos){ if (myMeta.live && myMeta.tracks[tid].lastms < pos){
unsigned int maxTime = 0; unsigned int maxTime = 0;
while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20){ while (myMeta.tracks[tid].lastms < pos && myConn && ++maxTime <= 20 && keepGoing()){
Util::wait(500); Util::wait(500);
stats(); stats();
updateMeta(); updateMeta();
@ -726,7 +731,7 @@ namespace Mist{
}else{ }else{
VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset); VERYHIGH_MSG("Track %d no data (key %u @ %u) - waiting...", tid, getKeyForTime(tid, pos) + (getNextKey?1:0), tmp.offset);
unsigned int i = 0; unsigned int i = 0;
while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10){ while (!myMeta.live && nProxy.curPage[tid].mapped[tmp.offset] == 0 && ++i <= 10 && keepGoing()){
Util::wait(100*i); Util::wait(100*i);
stats(); stats();
} }
@ -824,7 +829,7 @@ namespace Mist{
} }
/*LTS-END*/ /*LTS-END*/
DONTEVEN_MSG("MistOut client handler started"); DONTEVEN_MSG("MistOut client handler started");
while (config->is_active && myConn && (wantRequest || parseData)){ while (keepGoing() && (wantRequest || parseData)){
if (wantRequest){ if (wantRequest){
requestHandler(); requestHandler();
} }
@ -846,11 +851,10 @@ namespace Mist{
firstPacketTime = lastPacketTime; firstPacketTime = lastPacketTime;
} }
//slow down processing, if real time speed is wanted //slow down processing, if real time speed is wanted
if (realTime){ if (realTime){
uint8_t i = 6; uint8_t i = 6;
while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && config->is_active && myConn){ while (--i && thisPacket.getTime() > (((Util::getMS() - firstTime)*1000)+maxSkipAhead)/realTime && keepGoing()){
Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu)); Util::sleep(std::min(thisPacket.getTime() - (((Util::getMS() - firstTime)*1000)+minSkipAhead)/realTime, 1000llu));
stats(); stats();
} }
@ -863,7 +867,7 @@ namespace Mist{
//wait at most double the look ahead time, plus ten seconds //wait at most double the look ahead time, plus ten seconds
uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime); uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime);
uint64_t needsTime = thisPacket.getTime() + needsLookAhead; uint64_t needsTime = thisPacket.getTime() + needsLookAhead;
while(--timeoutTries){ while(--timeoutTries && keepGoing()){
bool lookReady = true; bool lookReady = true;
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){ for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].lastms <= needsTime){ if (myMeta.tracks[*it].lastms <= needsTime){
@ -893,6 +897,7 @@ namespace Mist{
Triggers::doTrigger("CONN_STOP", payload, streamName); Triggers::doTrigger("CONN_STOP", payload, streamName);
} }
/*LTS-END*/ /*LTS-END*/
INFO_MSG("Shutting down because of stream end");
if (!onFinish()){ if (!onFinish()){
break; break;
} }
@ -1259,9 +1264,20 @@ namespace Mist{
} }
} }
if (!nProxy.userClient.isAlive()){ if (!nProxy.userClient.isAlive()){
onFinish(); if (isPushing() && !pushIsOngoing){
myConn.close(); waitForStreamPushReady();
return; if (!nProxy.userClient.isAlive()){
WARN_MSG("Failed to wait for buffer, aborting incoming push");
onFinish();
myConn.close();
return;
}
}else{
INFO_MSG("Received disconnect request from input");
onFinish();
myConn.close();
return;
}
} }
if (!isPushing()){ if (!isPushing()){
IPC::userConnection userConn(nProxy.userClient.getData()); IPC::userConnection userConn(nProxy.userClient.getData());
@ -1320,11 +1336,13 @@ namespace Mist{
// Initialize the stream source if needed, connect to it // Initialize the stream source if needed, connect to it
initialize(); initialize();
waitForStreamPushReady();
//pull the source setting from metadata //pull the source setting from metadata
strmSource = myMeta.sourceURI; strmSource = myMeta.sourceURI;
if (!strmSource.size()){ if (!strmSource.size()){
FAIL_MSG("Push rejected - stream %s not configured", streamName.c_str()); FAIL_MSG("Push rejected - stream %s not configured or unavailable", streamName.c_str());
pushing = false; pushing = false;
return false; return false;
} }
@ -1375,5 +1393,24 @@ namespace Mist{
return true; return true;
} }
/// Attempts to wait for a stream to finish shutting down if it is, then restarts and reconnects.
void Output::waitForStreamPushReady(){
uint8_t streamStatus = Util::getStreamStatus(streamName);
MEDIUM_MSG("Current status for %s buffer is %u", streamName.c_str(), streamStatus);
while (streamStatus != STRMSTAT_WAIT && streamStatus != STRMSTAT_READY && keepGoing()){
INFO_MSG("Waiting for %s buffer to be ready... (%u)", streamName.c_str(), streamStatus);
if (nProxy.userClient.getData()){
nProxy.userClient.finish();
nProxy.userClient = IPC::sharedClient();
}
Util::wait(1000);
streamStatus = Util::getStreamStatus(streamName);
if (streamStatus == STRMSTAT_OFF || streamStatus == STRMSTAT_WAIT || streamStatus == STRMSTAT_READY){
reconnect();
streamStatus = Util::getStreamStatus(streamName);
}
}
}
} }

View file

@ -87,8 +87,9 @@ namespace Mist {
std::string getCountry(std::string ip); std::string getCountry(std::string ip);
/*LTS-END*/ /*LTS-END*/
void doSync(bool force = false); void doSync(bool force = false);
inline bool keepGoing(){
return config->is_active && myConn;
}
std::map<unsigned long, unsigned int> currKeyOpen; std::map<unsigned long, unsigned int> currKeyOpen;
void loadPageForKey(long unsigned int trackId, long long int keyNum); void loadPageForKey(long unsigned int trackId, long long int keyNum);
int pageNumForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum);
@ -132,6 +133,8 @@ namespace Mist {
virtual bool isRecording(); virtual bool isRecording();
virtual bool isPushing(){return pushing;}; virtual bool isPushing(){return pushing;};
bool allowPush(const std::string & passwd); bool allowPush(const std::string & passwd);
void waitForStreamPushReady();
bool pushIsOngoing;
void bufferLivePacket(DTSC::Packet & packet); void bufferLivePacket(DTSC::Packet & packet);
uint64_t firstPacketTime; uint64_t firstPacketTime;
uint64_t lastPacketTime; uint64_t lastPacketTime;