Merge branch 'development' into LTS_development

# Conflicts:
#	src/input/input.cpp
#	src/output/output.cpp
This commit is contained in:
Thulinma 2016-11-14 11:29:24 +01:00
commit 3f14db4b12
12 changed files with 137 additions and 64 deletions

View file

@ -71,11 +71,8 @@ static const char * DBG_LVL_LIST[] = {"NONE", "FAIL", "ERROR", "WARN", "INFO", "
#define INPUT_TIMEOUT STATS_DELAY
#endif
/// The size used for stream header pages under Windows, where they cannot be size-detected.
#define DEFAULT_META_PAGE_SIZE 16 * 1024 * 1024
/// The size used for stream header pages under Windows, where they cannot be size-detected.
#define DEFAULT_STRM_PAGE_SIZE 4 * 1024 * 1024
/// The size used for stream headers for live streams
#define DEFAULT_STRM_PAGE_SIZE 16 * 1024 * 1024
/// The size used for stream data pages under Windows, where they cannot be size-detected.
#define DEFAULT_DATA_PAGE_SIZE SHM_DATASIZE * 1024 * 1024

View file

@ -135,7 +135,7 @@ bool Controller::authorize(JSON::Value & Request, JSON::Value & Response, Socket
return true;
}
}
if (Request["authorize"]["password"].asString() != "" && Secure::md5(Storage["account"][UserID]["password"].asString()) != Request["authorize"]["password"].asString()){
if (Request["authorize"]["password"].asString() != ""){
Log("AUTH", "Failed login attempt " + UserID + " from " + conn.getHost());
}
}

View file

@ -97,7 +97,8 @@ namespace Mist {
INSANE_MSG("No header exists to compare - ignoring header check");
return;
}
if (bufHeader.st_mtime < bufStream.st_mtime) {
//the same second is not enough - add a 15 second window where we consider it too old
if (bufHeader.st_mtime < bufStream.st_mtime + 15) {
INFO_MSG("Overwriting outdated DTSH header file: %s ", headerFile.c_str());
remove(headerFile.c_str());
}
@ -180,12 +181,6 @@ namespace Mist {
/// streamname
/// input name
/// ~~~~~~~~~~~~~~~
/// The `"STREAM_UNLOAD"` trigger is stream-specific, and is ran right before an input shuts down and stops serving a stream. If cancelled, the shut down is delayed. Its payload is:
/// ~~~~~~~~~~~~~~~
/// streamname
/// input name
/// ~~~~~~~~~~~~~~~
//
void Input::serve(){
if (!isBuffer) {
for (std::map<unsigned int, DTSC::Track>::iterator it = myMeta.tracks.begin(); it != myMeta.tracks.end(); it++) {
@ -205,11 +200,15 @@ namespace Mist {
/*LTS-END*/
DEBUG_MSG(DLVL_DEVEL, "Input for stream %s started", streamName.c_str());
long long int activityCounter = Util::bootSecs();
while (((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)) && config->is_active) { //15 second timeout
activityCounter = Util::bootSecs();
//main serve loop
while (keepRunning()) {
//load pages for connected clients on request
//through the callbackWrapper function
userPage.parseEach(callbackWrapper);
//unload pages that haven't been used for a while
removeUnused();
//If users are connected and tracks exist, reset the activity counter
if (userPage.connectedUsers) {
if (myMeta.tracks.size()){
activityCounter = Util::bootSecs();
@ -218,17 +217,7 @@ namespace Mist {
} else {
DEBUG_MSG(DLVL_INSANE, "Timer running");
}
/*LTS-START*/
if ((Util::bootSecs() - activityCounter) >= INPUT_TIMEOUT || !config->is_active){//15 second timeout
if(Triggers::shouldTrigger("STREAM_UNLOAD", config->getString("streamname"))){
std::string payload = config->getString("streamname")+"\n" +capa["name"].asStringRef()+"\n";
if (!Triggers::doTrigger("STREAM_UNLOAD", payload, config->getString("streamname"))){
activityCounter = Util::bootSecs();
config->is_active = true;
}
}
}
/*LTS-END*/
//if not shutting down, wait 1 second before looping
if (config->is_active){
Util::wait(1000);
}
@ -238,6 +227,14 @@ namespace Mist {
//end player functionality
}
bool Input::keepRunning(){
//We keep running in serve mode if the config is still active AND either
// - INPUT_TIMEOUT seconds haven't passed yet,
// - this is a live stream and at least two of the biggest fragment haven't passed yet,
bool ret = (config->is_active && ((Util::bootSecs() - activityCounter) < INPUT_TIMEOUT || (myMeta.live && (Util::bootSecs() - activityCounter) < myMeta.biggestFragment()/500)));
return ret;
}
/// Main loop for stream-style inputs.
/// This loop will start the buffer without resume support, and then repeatedly call ..... followed by ....
void Input::stream(){

View file

@ -34,6 +34,7 @@ namespace Mist {
virtual void getNext(bool smart = true) {};
virtual void seek(int seekTime){};
virtual void finish();
virtual bool keepRunning();
virtual bool openStreamSource() { return false; };
virtual void closeStreamSource() {};
virtual void parseStreamHeader() {};
@ -60,6 +61,7 @@ namespace Mist {
unsigned int benchMark;
bool isBuffer;
uint64_t activityCounter;
JSON::Value capa;

View file

@ -5,6 +5,9 @@
#include <cstdlib>
#include <cstdio>
#include <string>
#include <sys/types.h>//for stat
#include <sys/stat.h>//for stat
#include <unistd.h>//for stat
#include <mist/util.h>
#include <mist/stream.h>
#include <mist/defines.h>
@ -46,9 +49,30 @@ namespace Mist {
if (!inFile) {
return false;
}
struct stat statData;
lastModTime = 0;
if (stat(config->getString("input").c_str(), &statData) != -1){
lastModTime = statData.st_mtime;
}
return true;
}
/// Overrides the default keepRunning function to shut down
/// if the file disappears or changes, by polling the file's mtime.
/// If neither applies, calls the original function.
bool inputFLV::keepRunning(){
struct stat statData;
if (stat(config->getString("input").c_str(), &statData) == -1){
INFO_MSG("Shutting down because input file disappeared");
return false;
}
if (lastModTime != statData.st_mtime){
INFO_MSG("Shutting down because input file changed");
return false;
}
return Input::keepRunning();
}
bool inputFLV::readHeader() {
if (!inFile){return false;}
//See whether a separate header file exists.

View file

@ -13,7 +13,9 @@ namespace Mist {
void getNext(bool smart = true);
void seek(int seekTime);
void trackSelect(std::string trackSpec);
bool keepRunning();
FLV::Tag tmpTag;
uint64_t lastModTime;
FILE * inFile;
};
}

View file

@ -452,7 +452,7 @@ namespace Mist {
INSANE_MSG("Found track/codec: %s", trit->second.codec.c_str());
}
static std::string source;
if (!source.size()){
if (!myMeta.tracks.size() && !source.size()){
IPC::sharedPage serverCfg(SHM_CONF, DEFAULT_CONF_PAGE_SIZE, false, false); ///< Contains server configuration and capabilities
IPC::semaphore configLock(SEM_CONF, O_CREAT | O_RDWR, ACCESSPERMS, 1);
configLock.wait();
@ -1082,17 +1082,27 @@ namespace Mist {
//if there's a timestamp mismatch, print this.
//except for live, where we never know the time in advance
if (thisPacket.getTime() != nxt.time && nxt.time && !atLivePoint){
if (thisPacket.getTime() != nxt.time && nxt.time){
if (!atLivePoint){
static int warned = 0;
if (warned < 5){
WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
if (++warned == 5){
WARN_MSG("Further warnings about time mismatches printed on HIGH level.");
}
WARN_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(),
thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time),
myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
if (++warned == 5){WARN_MSG("Further warnings about time mismatches printed on HIGH level.");}
}else{
HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(), thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time), myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
HIGH_MSG("Loaded %s track %ld@%llu instead of %u@%llu (%dms, %s, offset %lu)", streamName.c_str(), thisPacket.getTrackId(),
thisPacket.getTime(), nxt.tid, nxt.time, (int)((long long)thisPacket.getTime() - (long long)nxt.time),
myMeta.tracks[nxt.tid].codec.c_str(), nxt.offset);
}
}
nxt.time = thisPacket.getTime();
//swap out the next object in the buffer with a new one
buffer.erase(buffer.begin());
buffer.insert(nxt);
VERYHIGH_MSG("JIT reordering %u@%llu.", nxt.tid, nxt.time);
return false;
}
//when live, every keyframe, check correctness of the keyframe number
if (myMeta.live && thisPacket.getFlag("keyframe")){

View file

@ -259,6 +259,7 @@ namespace Mist {
OutHLS::OutHLS(Socket::Connection & conn) : TSOutput(conn) {
realTime = 0;
until=0xFFFFFFFFFFFFFFFFull;
}
OutHLS::~OutHLS() {}
@ -474,6 +475,37 @@ namespace Mist {
}
}
void OutHLS::sendNext(){
//First check if we need to stop.
if (thisPacket.getTime() >= until){
stop();
wantRequest = true;
parseData = false;
//Ensure alignment of contCounters for selected tracks, to prevent discontinuities.
for (std::set<unsigned long>::iterator it = selectedTracks.begin(); it != selectedTracks.end(); ++it){
DTSC::Track & Trk = myMeta.tracks[*it];
uint32_t pkgPid = 255 + *it;
int & contPkg = contCounters[pkgPid];
if (contPkg % 16 != 0){
packData.clear();
packData.setPID(pkgPid);
packData.addStuffing();
while (contPkg % 16 != 0){
packData.setContinuityCounter(++contPkg);
sendTS(packData.checkAndGetBuffer());
}
packData.clear();
}
}
//Signal end of data
H.Chunkify("", 0, myConn);
return;
}
//Invoke the generic TS output sendNext handler
TSOutput::sendNext();
}
void OutHLS::sendTS(const char * tsData, unsigned int len) {
H.Chunkify(tsData, len, myConn);

View file

@ -8,6 +8,7 @@ namespace Mist {
~OutHLS();
static void init(Util::Config * cfg);
void sendTS(const char * tsData, unsigned int len=188);
void sendNext();
void onHTTP();
bool isReadyForPlay();
protected:
@ -25,6 +26,7 @@ namespace Mist {
int keysToSend;
unsigned int vidTrack;
unsigned int audTrack;
long long unsigned int until;
};
}

View file

@ -1144,6 +1144,7 @@ namespace Mist {
case 9: //video data
case 18: {//meta data
static std::map<unsigned int, AMF::Object> pushMeta;
static uint64_t lastTagTime = 0;
if (!isInitialized) {
MEDIUM_MSG("Received useless media data");
onFinish();
@ -1161,7 +1162,23 @@ namespace Mist {
unsigned int reTrack = next.cs_id*3 + (F.data[0] == 0x09 ? 1 : (F.data[0] == 0x08 ? 2 : 3));
F.toMeta(myMeta, *amf_storage, reTrack);
if (F.getDataLen() && !(F.needsInitData() && F.isInitData())){
thisPacket.genericFill(F.tagTime(), F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
uint64_t tagTime = next.timestamp;
//Check for decreasing timestamps - this is a connection error.
//We allow wrapping around the 32 bits maximum value if the most significant 8 bits are set.
/// \TODO Provide time continuity for wrap-around.
if (lastTagTime && tagTime < lastTagTime && lastTagTime < 0xFF000000ull){
FAIL_MSG("Timestamps went from %llu to %llu (decreased): disconnecting!", lastTagTime, tagTime);
onFinish();
break;
}
//Check if we went more than 10 minutes into the future
if (lastTagTime && tagTime > lastTagTime + 600000){
FAIL_MSG("Timestamps went from %llu to %llu (> 10m in future): disconnecting!", lastTagTime, tagTime);
onFinish();
break;
}
thisPacket.genericFill(tagTime, F.offset(), reTrack, F.getData(), F.getDataLen(), 0, F.isKeyframe);
lastTagTime = tagTime;
if (!nProxy.userClient.getData()){
char userPageName[NAME_BUFFER_SIZE];
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());

View file

@ -6,7 +6,6 @@ namespace Mist {
haveAvcc = false;
haveHvcc = false;
ts_from = 0;
until=0xFFFFFFFFFFFFFFFFull;
setBlocking(true);
sendRepeatingHeaders = false;
appleCompat=false;
@ -36,7 +35,6 @@ namespace Mist {
packData.setContinuityCounter(++contPkg);
if (firstPack){
packData.setUnitStart(1);
packData.setDiscontinuity(true);
if (video){
if (keyframe){
packData.setRandomAccess(true);
@ -69,13 +67,6 @@ namespace Mist {
char * dataPointer = 0;
unsigned int dataLen = 0;
thisPacket.getString("data", dataPointer, dataLen); //data
if (packTime >= until){ //this if should only trigger for HLS
stop();
wantRequest = true;
parseData = false;
sendTS("",0);
return;
}
//apple compatibility timestamp correction
if (appleCompat){
packTime -= ts_from;

View file

@ -14,7 +14,7 @@ namespace Mist {
public:
TSOutput(Socket::Connection & conn);
virtual ~TSOutput(){};
void sendNext();
virtual void sendNext();
virtual void sendTS(const char * tsData, unsigned int len=188){};
void fillPacket(char const * data, size_t dataLen, bool & firstPack, bool video, bool keyframe, uint32_t pkgPid, int & contPkg);
protected:
@ -33,7 +33,6 @@ namespace Mist {
/*LTS-END*/
bool sendRepeatingHeaders;
long long unsigned int ts_from;
long long unsigned int until;
long long unsigned int lastVid;
};
}