Generalized recording, pimped.

This commit is contained in:
Thulinma 2017-08-31 15:41:46 +02:00
parent f8b9db9dcd
commit 8fbdafb288
9 changed files with 85 additions and 76 deletions

View file

@ -374,7 +374,7 @@ pid_t Util::startPush(const std::string & streamname, std::string & target) {
DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len); DTSC::Scan config = DTSC::Scan(mistConfOut.mapped, mistConfOut.len);
DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors"); DTSC::Scan outputs = config.getMember("capabilities").getMember("connectors");
std::string output_bin = ""; std::string output_bin = "";
std::string checkTarget = target.substr(0, target.find('?')); std::string checkTarget = target.substr(0, target.rfind('?'));
unsigned int outputs_size = outputs.getSize(); unsigned int outputs_size = outputs.getSize();
for (unsigned int i = 0; i<outputs_size && !output_bin.size(); ++i){ for (unsigned int i = 0; i<outputs_size && !output_bin.size(); ++i){
DTSC::Scan output = outputs.getIndice(i); DTSC::Scan output = outputs.getIndice(i);

View file

@ -87,42 +87,29 @@ namespace Mist{
sentHeader = false; sentHeader = false;
isRecordingToFile = false; isRecordingToFile = false;
if (config->getString("streamname").size()){ //If we have a streamname option, set internal streamname to that option
if (!streamName.size() && config->hasOption("streamname")){
streamName = config->getString("streamname"); streamName = config->getString("streamname");
} }
if(capa.isMember("push_urls")){ /*LTS-START*/
// If we have a target, scan for trailing ?, remove it, parse into targetParams
if (config->hasOption("target")){
std::string tgt = config->getString("target"); std::string tgt = config->getString("target");
struct stat tgtStat; if (tgt.rfind('?') != std::string::npos){
if (tgt.size()){ INFO_MSG("Stripping target options: %s", tgt.substr(tgt.rfind('?') + 1).c_str());
if(stat(tgt.substr(0, tgt.rfind('/')).c_str(), &tgtStat) != 0){ HTTP::parseVars(tgt.substr(tgt.rfind('?') + 1), targetParams);
INFO_MSG("could not stat %s", tgt.substr(0, tgt.rfind('/')).c_str()); config->getOption("target", true).append(tgt.substr(0, tgt.rfind('?')));
return;
}
if (!streamName.size()){
WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["nama"].asString().c_str());
conn.close();
return;
}
if (tgt == "-"){
parseData = true;
wantRequest = false;
INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), capa["nama"].asString().c_str());
return;
}
std::string params = tgt.substr(tgt.find('?') + 1);
tgt = tgt.substr(0, tgt.find('?'));
if (connectToFile(tgt)){
parseData = true;
wantRequest = false;
INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), tgt.c_str(), capa["nama"].asString().c_str());
HTTP::parseVars(params, recParams);
}else{ }else{
conn.close(); INFO_MSG("Not modifying target (%s), no options present", tgt.c_str());
} }
} }
/*LTS-END*/
} }
bool Output::isFileTarget(){
INFO_MSG("Default file target handler (false)");
return false;
} }
void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){ void Output::listener(Util::Config & conf, int (*callback)(Socket::Connection & S)){
@ -820,27 +807,29 @@ namespace Mist{
if (good){break;} if (good){break;}
} }
} }
/*LTS-START*/
if (isRecordingToFile){ if (isRecordingToFile){
if (recParams.count("recuntil")){ if (targetParams.count("recstop")){
long long endRec = atoll(recParams["recuntil"].c_str()); long long endRec = atoll(targetParams["recstop"].c_str());
if (endRec < startTime()){ if (endRec < startTime()){
FAIL_MSG("Record range not available anymore"); FAIL_MSG("Entire recording range is in the past");
config->is_active = false; onFail();
return; return;
} }
INFO_MSG("Recording will stop at %lld", endRec);
} }
if (recParams.count("recfrom") && atoll(recParams["recfrom"].c_str()) != 0){ if (targetParams.count("recstart") && atoll(targetParams["recstart"].c_str()) != 0){
unsigned long int mainTrack = getMainSelectedTrack(); unsigned long int mainTrack = getMainSelectedTrack();
long long startRec = atoll(recParams["recfrom"].c_str()); long long startRec = atoll(targetParams["recstart"].c_str());
if (startRec > myMeta.tracks[mainTrack].lastms){ if (startRec > myMeta.tracks[mainTrack].lastms){
if (myMeta.vod){ if (myMeta.vod){
FAIL_MSG("Record range out of bounds on vod file"); FAIL_MSG("Recording start past end of non-live source");
config->is_active = false; onFail();
return; return;
} }
long unsigned int streamAvail = myMeta.tracks[mainTrack].lastms; long unsigned int streamAvail = myMeta.tracks[mainTrack].lastms;
long unsigned int lastUpdated = Util::getMS(); long unsigned int lastUpdated = Util::getMS();
while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail){ while (Util::getMS() - lastUpdated < 5000 && startRec > streamAvail && keepGoing()){
Util::sleep(500); Util::sleep(500);
updateMeta(); updateMeta();
if (myMeta.tracks[mainTrack].lastms > streamAvail){ if (myMeta.tracks[mainTrack].lastms > streamAvail){
@ -851,12 +840,14 @@ namespace Mist{
} }
} }
if (startRec < startTime()){ if (startRec < startTime()){
WARN_MSG("Record begin @ %llu ms not available, starting at %llu ms instead", startRec, startTime()); WARN_MSG("Record begin at %llu ms not available, starting at %llu ms instead", startRec, startTime());
startRec = startTime(); startRec = startTime();
} }
INFO_MSG("Recording will start at %lld", startRec);
seekPos = startRec; seekPos = startRec;
} }
} }
/*LTS-END*/
MEDIUM_MSG("Initial seek to %llums", seekPos); MEDIUM_MSG("Initial seek to %llums", seekPos);
seek(seekPos); seek(seekPos);
} }
@ -944,6 +935,36 @@ namespace Mist{
/// ~~~~~~~~~~~~~~~ /// ~~~~~~~~~~~~~~~
int Output::run(){ int Output::run(){
/*LTS-START*/ /*LTS-START*/
//Connect to file target, if needed
if(isFileTarget()){
if (!streamName.size()){
WARN_MSG("Recording unconnected %s output to file! Cancelled.", capa["name"].asString().c_str());
myConn.close();
return 2;
}
initialize();
if (!myMeta.tracks.size() || !selectedTracks.size() || !keepGoing()){
INFO_MSG("Stream not available - aborting");
myConn.close();
}else{
if (config->getString("target") == "-"){
parseData = true;
wantRequest = false;
if (!targetParams.count("realtime")){realTime = 0;}
INFO_MSG("Outputting %s to stdout with %s format", streamName.c_str(), capa["name"].asString().c_str());
}else{
if (connectToFile(config->getString("target"))){
parseData = true;
wantRequest = false;
if (!targetParams.count("realtime")){realTime = 0;}
INFO_MSG("Recording %s to %s with %s format", streamName.c_str(), config->getString("target").c_str(), capa["name"].asString().c_str());
}else{
myConn.close();
}
}
}
}
//Handle CONN_OPEN trigger, if needed
if(Triggers::shouldTrigger("CONN_OPEN", streamName)){ if(Triggers::shouldTrigger("CONN_OPEN", streamName)){
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl; std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"+reqUrl;
if (!Triggers::doTrigger("CONN_OPEN", payload, streamName)){ if (!Triggers::doTrigger("CONN_OPEN", payload, streamName)){
@ -1012,19 +1033,21 @@ namespace Mist{
} }
} }
if (isRecordingToFile && recParams.count("recuntil") && atoll(recParams["recuntil"].c_str()) < lastPacketTime){ if (isRecordingToFile && targetParams.count("recstop") && atoll(targetParams["recstop"].c_str()) < lastPacketTime){
config->is_active = false; INFO_MSG("End of planned recording reached, shutting down");
}else{ if (!onFinish()){
sendNext(); break;
} }
}
sendNext();
}else{ }else{
INFO_MSG("Shutting down because of stream end");
/*LTS-START*/ /*LTS-START*/
if(Triggers::shouldTrigger("CONN_STOP", streamName)){ if(Triggers::shouldTrigger("CONN_STOP", streamName)){
std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n"; std::string payload = streamName+"\n" + getConnectedHost() +"\n"+capa["name"].asStringRef()+"\n";
Triggers::doTrigger("CONN_STOP", payload, streamName); Triggers::doTrigger("CONN_STOP", payload, streamName);
} }
/*LTS-END*/ /*LTS-END*/
INFO_MSG("Shutting down because of stream end");
if (!onFinish()){ if (!onFinish()){
break; break;
} }
@ -1142,7 +1165,7 @@ namespace Mist{
//actually drop what we found. //actually drop what we found.
//if both of the above cases occur, the next prepareNext iteration will take care of that //if both of the above cases occur, the next prepareNext iteration will take care of that
for (std::set<uint32_t>::iterator it = dropTracks.begin(); it != dropTracks.end(); ++it){ for (std::set<uint32_t>::iterator it = dropTracks.begin(); it != dropTracks.end(); ++it){
dropTrack(*it, "seek/select mismatch", true); dropTrack(*it, "seek/select mismatch");
} }
return false; return false;
} }
@ -1150,7 +1173,7 @@ namespace Mist{
sortedPageInfo nxt = *(buffer.begin()); sortedPageInfo nxt = *(buffer.begin());
if (!myMeta.tracks.count(nxt.tid)){ if (!myMeta.tracks.count(nxt.tid)){
dropTrack(nxt.tid, "disappeared from metadata", true); dropTrack(nxt.tid, "disappeared from metadata");
return false; return false;
} }
@ -1174,7 +1197,7 @@ namespace Mist{
buffer.insert(nxt); buffer.insert(nxt);
} }
}else{ }else{
dropTrack(nxt.tid, "page load failure", true); dropTrack(nxt.tid, "page load failure");
} }
return false; return false;
} }
@ -1186,6 +1209,10 @@ namespace Mist{
dropTrack(nxt.tid, "timeless empty packet"); dropTrack(nxt.tid, "timeless empty packet");
return false; return false;
} }
//for VoD, check if we've reached the end of the track, if so, drop it
if (myMeta.vod && nxt.time > myMeta.tracks[nxt.tid].lastms){
dropTrack(nxt.tid, "Reached end of track", false);
}
//if this is a live stream, we might have just reached the live point. //if this is a live stream, we might have just reached the live point.
//check where the next key is //check where the next key is
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time); nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, nxt.time);
@ -1452,6 +1479,7 @@ namespace Mist{
} }
close(outFile); close(outFile);
isRecordingToFile = true; isRecordingToFile = true;
sought = false;
return true; return true;
} }

View file

@ -96,13 +96,13 @@ namespace Mist {
int pageNumForKey(long unsigned int trackId, long long int keyNum); int pageNumForKey(long unsigned int trackId, long long int keyNum);
int pageNumMax(long unsigned int trackId); int pageNumMax(long unsigned int trackId);
bool isRecordingToFile; bool isRecordingToFile;
std::map<std::string, std::string> recParams;
unsigned int lastStats;///<Time of last sending of stats. unsigned int lastStats;///<Time of last sending of stats.
std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes. std::map<unsigned long, unsigned long> nxtKeyNum;///< Contains the number of the next key, for page seeking purposes.
std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets. std::set<sortedPageInfo> buffer;///< A sorted list of next-to-be-loaded packets.
bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext(). bool sought;///<If a seek has been done, this is set to true. Used for seeking on prepareNext().
protected://these are to be messed with by child classes protected://these are to be messed with by child classes
bool pushing; bool pushing;
std::map<std::string, std::string> targetParams; /*LTS*/
std::string UA; ///< User Agent string, if known. std::string UA; ///< User Agent string, if known.
uint16_t uaDelay;///<Seconds to wait before setting the UA. uint16_t uaDelay;///<Seconds to wait before setting the UA.
uint64_t lastRecv; uint64_t lastRecv;
@ -133,6 +133,7 @@ namespace Mist {
std::map<int,DTSCPageData> bookKeeping; std::map<int,DTSCPageData> bookKeeping;
virtual bool isRecording(); virtual bool isRecording();
virtual bool isFileTarget();
virtual bool isPushing(){return pushing;}; virtual bool isPushing(){return pushing;};
bool allowPush(const std::string & passwd); bool allowPush(const std::string & passwd);
void waitForStreamPushReady(); void waitForStreamPushReady();

View file

@ -9,9 +9,6 @@ namespace Mist {
if (config->getString("ip").size()){ if (config->getString("ip").size()){
myConn.setHost(config->getString("ip")); myConn.setHost(config->getString("ip"));
} }
if (config->getString("streamname").size()){
streamName = config->getString("streamname");
}
config->activate(); config->activate();
} }

View file

@ -11,6 +11,7 @@ namespace Mist {
void sendTS(const char * tsData, unsigned int len=188); void sendTS(const char * tsData, unsigned int len=188);
private: private:
bool isRecording(); bool isRecording();
bool isFileTarget(){return isRecording();}
}; };
} }

View file

@ -12,6 +12,7 @@ namespace Mist {
private: private:
FLV::Tag tag; FLV::Tag tag;
bool isRecording(); bool isRecording();
bool isFileTarget(){return isRecording();}
}; };
} }

View file

@ -1,28 +1,7 @@
#include "output_progressive_mp3.h" #include "output_progressive_mp3.h"
namespace Mist { namespace Mist {
OutProgressiveMP3::OutProgressiveMP3(Socket::Connection & conn) : HTTPOutput(conn){ OutProgressiveMP3::OutProgressiveMP3(Socket::Connection & conn) : HTTPOutput(conn){}
if (config->getString("target").size()){
if (!streamName.size()){
WARN_MSG("Recording unconnected MP3 output to file! Cancelled.");
conn.close();
return;
}
if (config->getString("target") == "-"){
parseData = true;
wantRequest = false;
INFO_MSG("Outputting %s to stdout in MP3 format", streamName.c_str());
return;
}
if (connectToFile(config->getString("target"))){
parseData = true;
wantRequest = false;
INFO_MSG("Recording %s to %s in MP3 format", streamName.c_str(), config->getString("target").c_str());
}else{
conn.close();
}
}
}
void OutProgressiveMP3::init(Util::Config * cfg){ void OutProgressiveMP3::init(Util::Config * cfg){
HTTPOutput::init(cfg); HTTPOutput::init(cfg);

View file

@ -11,6 +11,7 @@ namespace Mist {
void sendHeader(); void sendHeader();
private: private:
bool isRecording(); bool isRecording();
bool isFileTarget(){return isRecording();}
}; };
} }

View file

@ -11,6 +11,7 @@ namespace Mist {
void sendHeader(); void sendHeader();
private: private:
bool isRecording(); bool isRecording();
bool isFileTarget(){return isRecording();}
}; };
} }