Merge branch 'development' into LTS_development

# Conflicts:
#	src/output/output.cpp
#	src/output/output_http_internal.cpp
This commit is contained in:
Thulinma 2017-01-10 12:36:53 +01:00
commit dadb1ebde8
9 changed files with 190 additions and 167 deletions

View file

@ -338,6 +338,10 @@ namespace Mist {
getNext();
nProxy.userClient.keepAlive();
}
std::string reason = "Unknown";
if (!thisPacket){reason = "Invalid packet";}
if (!config->is_active){reason = "received deactivate signal";}
if (!nProxy.userClient.isAlive()){reason = "buffer shutdown";}
closeStreamSource();
@ -346,7 +350,7 @@ namespace Mist {
pullLock.post();
pullLock.close();
pullLock.unlink();
INFO_MSG("Stream input %s closing clean", streamName.c_str());
INFO_MSG("Stream input %s closing clean; reason: %s", streamName.c_str(), reason.c_str());
return;
}

View file

@ -248,68 +248,79 @@ namespace Mist {
void inputDTSC::getNext(bool smart) {
if (!needsLock()){
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTCM){
nProxy.userClient.keepAlive();
std::string cmd;
thisPacket.getString("cmd", cmd);
if (cmd == "reset"){
//Read next packet
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
//Detect new tracks
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
while (config->is_active){
if (thisPacket.getVersion() == DTSC::DTCM){
nProxy.userClient.keepAlive();
std::string cmd;
thisPacket.getString("cmd", cmd);
if (cmd == "reset"){
//Read next packet
thisPacket.reInit(srcConn);
if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
//Detect new tracks
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
}
}
}
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("Reset: adding track %d", *it);
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
}
//Detect removed tracks
std::set<unsigned int> deletedTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!newMeta.tracks.count(it->first)){
deletedTracks.insert(it->first);
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("Reset: adding track %d", *it);
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
}
}
for(std::set<unsigned int>::iterator it = deletedTracks.begin(); it != deletedTracks.end(); it++){
INFO_MSG("Reset: deleting track %d", *it);
myMeta.tracks.erase(*it);
}
//Detect removed tracks
std::set<unsigned int> deletedTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (!newMeta.tracks.count(it->first)){
deletedTracks.insert(it->first);
}
}
//Read next packet before returning
return getNext(smart);
for(std::set<unsigned int>::iterator it = deletedTracks.begin(); it != deletedTracks.end(); it++){
INFO_MSG("Reset: deleting track %d", *it);
myMeta.tracks.erase(*it);
}
thisPacket.reInit(srcConn);//read the next packet before continuing
}else{
myMeta = DTSC::Meta();
}
}else{
myMeta = DTSC::Meta();
thisPacket.reInit(srcConn);//read the next packet before continuing
}
}else{
//Read next packet before returning
thisPacket.reInit(srcConn);
}
}else if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
continue;//parse the next packet before returning
}else if (thisPacket.getVersion() == DTSC::DTSC_HEAD){
DTSC::Meta newMeta;
newMeta.reinit(thisPacket);
std::set<unsigned int> newTracks;
for (std::map<unsigned int, DTSC::Track>::iterator it = newMeta.tracks.begin(); it != newMeta.tracks.end(); it++){
if (!myMeta.tracks.count(it->first)){
newTracks.insert(it->first);
}
}
}
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("New header: adding track %d (%s)", *it, newMeta.tracks[*it].type.c_str());
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
for (std::set<unsigned int>::iterator it = newTracks.begin(); it != newTracks.end(); it++){
INFO_MSG("New header: adding track %d (%s)", *it, newMeta.tracks[*it].type.c_str());
myMeta.tracks[*it] = newMeta.tracks[*it];
continueNegotiate(*it, true);
}
thisPacket.reInit(srcConn);//read the next packet before continuing
continue;//parse the next packet before returning
}
return getNext(smart);
//We now know we have either a data packet, or an error.
if (!thisPacket.getTrackId()){
if (thisPacket.getVersion() == DTSC::DTSC_V2){
WARN_MSG("Received bad packet for stream %s: %llu@%llu", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime());
}else{
//All types except data packets are handled above, so if it's not a V2 data packet, we assume corruption
WARN_MSG("Invalid packet header for stream %s", streamName.c_str());
}
}
return;//we have a packet
}
}else{
if (smart) {

View file

@ -20,10 +20,6 @@
#include <netdb.h>
/*LTS-END*/
#ifndef MIN_DELAY
#define MIN_DELAY 2500
#endif
namespace Mist{
JSON::Value Output::capa = JSON::Value();
@ -50,7 +46,7 @@ namespace Mist{
sought = false;
isInitialized = false;
isBlocking = false;
completeKeysOnly = false;
needsLookAhead = 0;
lastStats = 0;
maxSkipAhead = 7500;
minSkipAhead = 5000;
@ -751,7 +747,7 @@ namespace Mist{
/// This function decides where in the stream initial playback starts.
/// The default implementation calls seek(0) for VoD.
/// For live, it seeks to the last sync'ed keyframe of the main track, no closer than MIN_DELAY ms from the end.
/// For live, it seeks to the last sync'ed keyframe of the main track, no closer than needsLookAhead ms from the end.
/// Unless lastms < 5000, then it seeks to the first keyframe of the main track.
/// Aborts if there is no main track or it has no keyframes.
void Output::initialSeek(){
@ -767,7 +763,7 @@ namespace Mist{
bool good = true;
//check if all tracks have data for this point in time
for (std::set<unsigned long>::iterator ti = selectedTracks.begin(); ti != selectedTracks.end(); ++ti){
if (myMeta.tracks[*ti].lastms < seekPos+MIN_DELAY){good = false; break;}
if (myMeta.tracks[*ti].lastms < seekPos+needsLookAhead){good = false; break;}
if (mainTrack == *ti){continue;}//skip self
if (!myMeta.tracks.count(*ti)){
HIGH_MSG("Skipping track %lu, not in tracks", *ti);
@ -854,41 +850,32 @@ namespace Mist{
}
}
//delay the stream until its current keyframe is complete, if only complete keys wanted
if (completeKeysOnly){
bool completeKeyReady = false;
int timeoutTries = 40;//wait default 250ms*40=10 seconds
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++){
if (it->second.keys.size() >1){
int thisTimeoutTries = ((it->second.lastms - it->second.firstms) / (it->second.keys.size()-1)) / 125;
if (thisTimeoutTries > timeoutTries) timeoutTries = thisTimeoutTries;
}
}
unsigned int mTrack = getMainSelectedTrack();
while(!completeKeyReady && timeoutTries>0){
if (!myMeta.tracks[mTrack].keys.size() || myMeta.tracks[mTrack].keys.rbegin()->getTime() <= thisPacket.getTime()){
completeKeyReady = false;
}else{
DTSC::Key & mustHaveKey = myMeta.tracks[mTrack].getKey(getKeyForTime(mTrack, thisPacket.getTime()));
unsigned long long mustHaveTime = mustHaveKey.getTime() + mustHaveKey.getLength();
completeKeyReady = true;
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].lastms < mustHaveTime){
completeKeyReady = false;
break;
//delay the stream until metadata has caught up, if needed
if (needsLookAhead){
//we sleep in 250ms increments, or less if the lookahead time itself is less
uint32_t sleepTime = std::min((uint32_t)250, needsLookAhead);
//wait at most double the look ahead time, plus ten seconds
uint32_t timeoutTries = (needsLookAhead / sleepTime) * 2 + (10000/sleepTime);
uint64_t needsTime = thisPacket.getTime() + needsLookAhead;
while(--timeoutTries){
bool lookReady = true;
for (std::set<long unsigned int>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); it++){
if (myMeta.tracks[*it].lastms <= needsTime){
if (timeoutTries == 1){
WARN_MSG("Track %lu: %llu <= %llu", *it, myMeta.tracks[*it].lastms, needsTime);
}
lookReady = false;
break;
}
}
if (!completeKeyReady){
timeoutTries--;//we count down
stats();
Util::wait(250);
updateMeta();
}
if (lookReady){break;}
Util::wait(sleepTime);
stats();
updateMeta();
}
if (timeoutTries<=0){
WARN_MSG("Waiting for key frame timed out");
completeKeysOnly = false;
if (!timeoutTries){
WARN_MSG("Waiting for lookahead timed out - resetting lookahead!");
needsLookAhead = 0;
}
}

View file

@ -114,7 +114,7 @@ namespace Mist {
unsigned int maxSkipAhead;///< Maximum ms that we will go ahead of the intended timestamps.
unsigned int minSkipAhead;///< Minimum ms that we will go ahead of the intended timestamps.
unsigned int realTime;///< Playback speed in ms of data per second. eg: 0 is infinite, 1000 real-time, 5000 is 0.2X speed, 500 = 2X speed.
bool completeKeysOnly;///< Bool if we send whole keys only, so the metadata is complete and the output knows in advance what will be sent.
uint32_t needsLookAhead;///< Amount of millis we need to be able to look ahead in the metadata
//Read/write status variables
Socket::Connection & myConn;///< Connection to the client.

View file

@ -32,6 +32,11 @@ namespace Mist {
}
void OutHTTP::onFail(){
// send logo icon
if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){
sendIcon();
return;
}
INFO_MSG("Failing: %s", H.url.c_str());
if (H.url.size() >= 3 && H.url.substr(H.url.size() - 3) == ".js"){
if (H.url.size() >= 5 && H.url.substr(0, 5) == "/json"){
@ -258,36 +263,7 @@ namespace Mist {
}
// send logo icon
if (H.url.length() > 4 && H.url.substr(H.url.length() - 4, 4) == ".ico"){
/*LTS-START*/
if (H.GetVar("s").size() && H.GetVar("s") == SUPER_SECRET){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.setCORSHeaders();
if(method == "OPTIONS" || method == "HEAD"){
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
H.SetBody("Yup");
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
/*LTS-END*/
H.Clean();
#include "../icon.h"
H.SetHeader("Content-Type", "image/x-icon");
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
H.SetHeader("Content-Length", icon_len);
H.setCORSHeaders();
if(method == "OPTIONS" || method == "HEAD"){
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
H.SendResponse("200", "OK", myConn);
myConn.SendNow((const char*)icon_data, icon_len);
H.Clean();
sendIcon();
return;
}
@ -646,4 +622,40 @@ namespace Mist {
return;
}
}
void OutHTTP::sendIcon(){
std::string method = H.method;
/*LTS-START*/
if (H.GetVar("s").size() && H.GetVar("s") == SUPER_SECRET){
H.Clean();
H.SetHeader("Server", "mistserver/" PACKAGE_VERSION);
H.setCORSHeaders();
if(method == "OPTIONS" || method == "HEAD"){
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
H.SetBody("Yup");
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
/*LTS-END*/
H.Clean();
#include "../icon.h"
H.SetHeader("Content-Type", "image/x-icon");
H.SetHeader("Server", "MistServer/" PACKAGE_VERSION);
H.SetHeader("Content-Length", icon_len);
H.setCORSHeaders();
if(method == "OPTIONS" || method == "HEAD"){
H.SendResponse("200", "OK", myConn);
H.Clean();
return;
}
H.SendResponse("200", "OK", myConn);
myConn.SendNow((const char*)icon_data, icon_len);
H.Clean();
}
}

View file

@ -11,6 +11,7 @@ namespace Mist {
virtual void onFail();
void onHTTP();
bool isReadyForPlay();
void sendIcon();
};
}