Merge branch 'development' into LTS_development
# Conflicts: # lib/shared_memory.cpp # src/controller/controller.cpp # src/controller/controller_storage.h # src/output/output.cpp
This commit is contained in:
commit
1172768c34
20 changed files with 270 additions and 192 deletions
|
@ -34,6 +34,7 @@
|
|||
#include <ctime>
|
||||
#include <vector>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/wait.h>
|
||||
#include <mist/config.h>
|
||||
#include <mist/socket.h>
|
||||
#include <mist/http_parser.h>
|
||||
|
@ -95,6 +96,7 @@ void createAccount (std::string account){
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/// Status monitoring thread.
|
||||
/// Will check outputs, inputs and converters every five seconds
|
||||
void statusMonitor(void * np){
|
||||
|
@ -138,15 +140,14 @@ void statusMonitor(void * np){
|
|||
configLock.unlink();
|
||||
}
|
||||
|
||||
///\brief The main entry point for the controller.
|
||||
///\brief The main loop for the controller.
|
||||
///
|
||||
/// \triggers
|
||||
/// The `"SYSTEM_STOP"` trigger is global, and is ran when the controller shuts down. If cancelled, the controller does not shut down and will attempt to re-open the API socket. Its payload is:
|
||||
/// ~~~~~~~~~~~~~~~
|
||||
/// shutdown reason
|
||||
/// ~~~~~~~~~~~~~~~
|
||||
int main(int argc, char ** argv){
|
||||
|
||||
int main_loop(int argc, char ** argv){
|
||||
Controller::Storage = JSON::fromFile("config.json");
|
||||
JSON::Value stored_port = JSON::fromString("{\"long\":\"port\", \"short\":\"p\", \"arg\":\"integer\", \"help\":\"TCP port to listen on.\"}");
|
||||
stored_port["default"] = Controller::Storage["config"]["controller"]["port"];
|
||||
|
@ -163,7 +164,6 @@ int main(int argc, char ** argv){
|
|||
if ( !stored_user["default"]){
|
||||
stored_user["default"] = "root";
|
||||
}
|
||||
Controller::conf = Util::Config(argv[0]);
|
||||
Controller::conf.addOption("port", stored_port);
|
||||
Controller::conf.addOption("interface", stored_interface);
|
||||
Controller::conf.addOption("username", stored_user);
|
||||
|
@ -205,6 +205,7 @@ int main(int argc, char ** argv){
|
|||
if (pipe(pipeErr) >= 0){
|
||||
dup2(pipeErr[1], STDERR_FILENO);//cause stderr to write to the pipe
|
||||
close(pipeErr[1]);//close the unneeded pipe file descriptor
|
||||
Util::Procs::socketList.insert(pipeErr[0]);
|
||||
tthread::thread msghandler(Controller::handleMsg, (void*)(((char*)0) + pipeErr[0]));
|
||||
msghandler.detach();
|
||||
}
|
||||
|
@ -262,7 +263,7 @@ int main(int argc, char ** argv){
|
|||
}
|
||||
}else if(yna(in_string) == 'a'){
|
||||
//abort controller startup
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -284,7 +285,7 @@ int main(int argc, char ** argv){
|
|||
}
|
||||
}else if(yna(in_string) == 'a'){
|
||||
//abort controller startup
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -334,10 +335,10 @@ int main(int argc, char ** argv){
|
|||
}else{
|
||||
shutdown_reason = "socket problem (API port closed)";
|
||||
}
|
||||
/*LTS-START*/
|
||||
if (Controller::restarting){
|
||||
shutdown_reason = "update (on request)";
|
||||
shutdown_reason = "restart (on request)";
|
||||
}
|
||||
/*LTS-START*/
|
||||
if(Triggers::shouldTrigger("SYSTEM_STOP")){
|
||||
if (!Triggers::doTrigger("SYSTEM_STOP", shutdown_reason)){
|
||||
Controller::conf.is_active = true;
|
||||
|
@ -348,8 +349,10 @@ int main(int argc, char ** argv){
|
|||
Controller::Log("CONF", "Controller shutting down because of "+shutdown_reason);
|
||||
}
|
||||
}else{
|
||||
/*LTS-END*/
|
||||
Controller::conf.is_active = false;
|
||||
Controller::Log("CONF", "Controller shutting down because of "+shutdown_reason);
|
||||
/*LTS-START*/
|
||||
}
|
||||
}//indentation intentionally wrong, to minimize Pro/nonPro diffs
|
||||
/*LTS-END*/
|
||||
|
@ -374,16 +377,85 @@ int main(int argc, char ** argv){
|
|||
Util::Procs::StopAll();
|
||||
//give everything some time to print messages
|
||||
Util::wait(100);
|
||||
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
|
||||
if (Controller::restarting){
|
||||
return 42;
|
||||
}
|
||||
//close stderr to make the stderr reading thread exit
|
||||
close(STDERR_FILENO);
|
||||
std::cout << "Killed all processes, wrote config to disk. Exiting." << std::endl;
|
||||
/*LTS-START*/
|
||||
if (Controller::restarting){
|
||||
std::string myFile = Util::getMyPath() + "MistController";
|
||||
execvp(myFile.c_str(), argv);
|
||||
std::cout << "Error restarting: " << strerror(errno) << std::endl;
|
||||
}
|
||||
/*LTS-END*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
void handleUSR1(int signum, siginfo_t * sigInfo, void * ignore){
|
||||
Controller::Log("CONF", "USR1 received - restarting controller");
|
||||
Controller::restarting = true;
|
||||
raise(SIGINT); //trigger restart
|
||||
}
|
||||
|
||||
///\brief The controller angel process.
|
||||
///Starts a forked main_loop in a loop. Yes, you read that right.
|
||||
int main(int argc, char ** argv){
|
||||
Util::Procs::setHandler();//set child handler
|
||||
{
|
||||
struct sigaction new_action;
|
||||
struct sigaction cur_action;
|
||||
new_action.sa_sigaction = handleUSR1;
|
||||
sigemptyset(&new_action.sa_mask);
|
||||
new_action.sa_flags = 0;
|
||||
sigaction(SIGUSR1, &new_action, NULL);
|
||||
}
|
||||
|
||||
Controller::conf = Util::Config(argv[0]);
|
||||
Controller::conf.activate();
|
||||
uint64_t reTimer = 0;
|
||||
while (Controller::conf.is_active){
|
||||
pid_t pid = fork();
|
||||
if (pid == 0){
|
||||
Util::Procs::handler_set = false;
|
||||
Util::Procs::reaper_thread = 0;
|
||||
{
|
||||
struct sigaction new_action;
|
||||
struct sigaction cur_action;
|
||||
new_action.sa_sigaction = handleUSR1;
|
||||
sigemptyset(&new_action.sa_mask);
|
||||
new_action.sa_flags = 0;
|
||||
sigaction(SIGUSR1, &new_action, NULL);
|
||||
}
|
||||
return main_loop(argc, argv);
|
||||
}
|
||||
if (pid == -1){
|
||||
FAIL_MSG("Unable to spawn controller process!");
|
||||
return 2;
|
||||
}
|
||||
//wait for the process to exit
|
||||
int status;
|
||||
while (waitpid(pid, &status, 0) != pid && errno == EINTR){
|
||||
if (Controller::restarting){
|
||||
Controller::conf.is_active = true;
|
||||
Controller::restarting = false;
|
||||
kill(pid, SIGUSR1);
|
||||
}
|
||||
if (!Controller::conf.is_active){
|
||||
INFO_MSG("Shutting down controller because of signal interrupt...");
|
||||
Util::Procs::Stop(pid);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
//if the exit was clean, don't restart it
|
||||
if (WIFEXITED(status) && (WEXITSTATUS(status) == 0)){
|
||||
MEDIUM_MSG("Controller shut down cleanly");
|
||||
break;
|
||||
}
|
||||
if (WIFEXITED(status) && (WEXITSTATUS(status) == 42)){
|
||||
WARN_MSG("Refreshing angel process for update");
|
||||
std::string myFile = Util::getMyPath() + "MistController";
|
||||
execvp(myFile.c_str(), argv);
|
||||
FAIL_MSG("Error restarting: %s", strerror(errno));
|
||||
}
|
||||
INFO_MSG("Controller uncleanly shut down! Restarting in %llu...", reTimer);
|
||||
Util::wait(reTimer);
|
||||
reTimer += 1000;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ namespace Controller {
|
|||
tthread::mutex logMutex;
|
||||
unsigned long long logCounter = 0;
|
||||
bool configChanged = false;
|
||||
bool restarting = false;
|
||||
|
||||
///\brief Store and print a log message.
|
||||
///\param kind The type of message.
|
||||
|
|
|
@ -9,6 +9,7 @@ namespace Controller {
|
|||
extern tthread::mutex logMutex;///< Mutex for log thread.
|
||||
extern tthread::mutex configMutex;///< Mutex for server config access.
|
||||
extern bool configChanged; ///< Bool that indicates config must be written to SHM.
|
||||
extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
|
||||
extern unsigned long long logCounter; ///<Count of logged messages since boot
|
||||
|
||||
/// Store and print a log message.
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include "controller_updater.h"
|
||||
|
||||
namespace Controller {
|
||||
bool restarting = false;
|
||||
JSON::Value updates;
|
||||
std::string uniqId;
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#endif
|
||||
|
||||
namespace Controller {
|
||||
extern bool restarting;///< Signals if the controller is shutting down (false) or restarting (true).
|
||||
extern JSON::Value updates;
|
||||
extern std::string uniqId;
|
||||
|
||||
|
|
|
@ -581,5 +581,19 @@ namespace Mist {
|
|||
void Input::quitPlay() {
|
||||
playing = 0;
|
||||
}
|
||||
|
||||
bool Input::readExistingHeader(){
|
||||
DTSC::File tmpdtsh(config->getString("input") + ".dtsh");
|
||||
if (!tmpdtsh){
|
||||
return false;
|
||||
}
|
||||
if (tmpdtsh.getMeta().version != DTSH_VERSION){
|
||||
INFO_MSG("Updating wrong version header file from version %llu to %llu", tmpdtsh.getMeta().version, DTSH_VERSION);
|
||||
return false;
|
||||
}
|
||||
myMeta = tmpdtsh.getMeta();
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ namespace Mist {
|
|||
static void callbackWrapper(char * data, size_t len, unsigned int id);
|
||||
virtual bool setup() = 0;
|
||||
virtual bool readHeader() = 0;
|
||||
virtual bool readExistingHeader();
|
||||
virtual bool atKeyFrame();
|
||||
virtual void getNext(bool smart = true) {};
|
||||
virtual void seek(int seekTime){};
|
||||
|
|
|
@ -526,6 +526,7 @@ namespace Mist {
|
|||
} else if (everHadPush && !resumeMode && config->is_active) {
|
||||
INFO_MSG("Shutting down buffer because resume mode is disabled and the source disconnected");
|
||||
config->is_active = false;
|
||||
userPage.finishEach();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,23 +49,13 @@ namespace Mist {
|
|||
}
|
||||
|
||||
bool inputFLV::readHeader() {
|
||||
JSON::Value lastPack;
|
||||
if (!inFile) {
|
||||
return false;
|
||||
}
|
||||
if (!inFile){return false;}
|
||||
//See whether a separate header file exists.
|
||||
DTSC::File tmp(config->getString("input") + ".dtsh");
|
||||
if (tmp){
|
||||
myMeta = tmp.getMeta();
|
||||
if (myMeta){
|
||||
return true;
|
||||
}else{
|
||||
myMeta = DTSC::Meta();
|
||||
}
|
||||
}
|
||||
if (readExistingHeader()){return true;}
|
||||
//Create header file from FLV data
|
||||
fseek(inFile, 13, SEEK_SET);
|
||||
AMF::Object amf_storage;
|
||||
JSON::Value lastPack;
|
||||
long long int lastBytePos = 13;
|
||||
while (!feof(inFile) && !FLV::Parse_Error){
|
||||
if (tmpTag.FileLoader(inFile)){
|
||||
|
@ -80,9 +70,7 @@ namespace Mist {
|
|||
std::cerr << FLV::Error_Str << std::endl;
|
||||
return false;
|
||||
}
|
||||
std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str());
|
||||
oFile << myMeta.toJSON().toNetPacked();
|
||||
oFile.close();
|
||||
myMeta.toFile(config->getString("input") + ".dtsh");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,17 +47,9 @@ namespace Mist {
|
|||
}
|
||||
|
||||
bool inputMP3::readHeader() {
|
||||
if (!inFile) {
|
||||
return false;
|
||||
}
|
||||
if (!inFile){return false;}
|
||||
//See whether a separate header file exists.
|
||||
DTSC::File tmp(config->getString("input") + ".dtsh");
|
||||
if (tmp){
|
||||
myMeta = tmp.getMeta();
|
||||
if (myMeta){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (readExistingHeader()){return true;}
|
||||
myMeta = DTSC::Meta();
|
||||
myMeta.tracks[1].trackID = 1;
|
||||
myMeta.tracks[1].type = "audio";
|
||||
|
@ -93,9 +85,7 @@ namespace Mist {
|
|||
|
||||
fseek(inFile, 0, SEEK_SET);
|
||||
timestamp = 0;
|
||||
std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str());
|
||||
oFile << myMeta.toJSON().toNetPacked();
|
||||
oFile.close();
|
||||
myMeta.toFile(config->getString("input") + ".dtsh");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -222,11 +222,7 @@ namespace Mist {
|
|||
getNext();
|
||||
}
|
||||
|
||||
std::ofstream oFile(std::string(config->getString("input") + ".dtsh").c_str());
|
||||
oFile << myMeta.toJSON().toNetPacked();
|
||||
oFile.close();
|
||||
|
||||
//myMeta.toPrettyString(std::cout);
|
||||
myMeta.toFile(config->getString("input") + ".dtsh");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -73,6 +73,10 @@ namespace Mist {
|
|||
}
|
||||
|
||||
void Output::updateMeta(){
|
||||
//cancel if not alive
|
||||
if (!nProxy.userClient.isAlive()){
|
||||
return;
|
||||
}
|
||||
//read metadata from page to myMeta variable
|
||||
if (nProxy.metaPages[0].mapped){
|
||||
IPC::semaphore * liveSem = 0;
|
||||
|
@ -249,22 +253,30 @@ namespace Mist {
|
|||
statsPage.finish();
|
||||
myConn.resetCounter();
|
||||
}
|
||||
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
|
||||
if (nProxy.userClient.getData()){
|
||||
nProxy.userClient.finish();
|
||||
}
|
||||
char userPageName[NAME_BUFFER_SIZE];
|
||||
snprintf(userPageName, NAME_BUFFER_SIZE, SHM_USERS, streamName.c_str());
|
||||
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
|
||||
unsigned int attempts = 0;
|
||||
while (!nProxy.userClient.isAlive() && ++attempts < 20 && Util::streamAlive(streamName)){
|
||||
nProxy.userClient = IPC::sharedClient(userPageName, PLAY_EX_SIZE, true);
|
||||
}
|
||||
if (!nProxy.userClient.isAlive()){
|
||||
FAIL_MSG("Could not register as client for %s", streamName.c_str());
|
||||
onFail();
|
||||
return;
|
||||
}
|
||||
char pageId[NAME_BUFFER_SIZE];
|
||||
snprintf(pageId, NAME_BUFFER_SIZE, SHM_STREAM_INDEX, streamName.c_str());
|
||||
nProxy.metaPages.clear();
|
||||
nProxy.metaPages[0].init(pageId, DEFAULT_STRM_PAGE_SIZE);
|
||||
if (!nProxy.metaPages[0].mapped){
|
||||
FAIL_MSG("Could not connect to server for %s", streamName.c_str());
|
||||
FAIL_MSG("Could not connect to data for %s", streamName.c_str());
|
||||
onFail();
|
||||
return;
|
||||
}
|
||||
statsPage = IPC::sharedClient(SHM_STATISTICS, STAT_EX_SIZE, true);
|
||||
stats(true);
|
||||
updateMeta();
|
||||
selectDefaultTracks();
|
||||
|
@ -490,8 +502,12 @@ namespace Mist {
|
|||
}
|
||||
|
||||
void Output::loadPageForKey(long unsigned int trackId, long long int keyNum){
|
||||
if (!myMeta.tracks.count(trackId) || !myMeta.tracks[trackId].keys.size()){
|
||||
WARN_MSG("Load for track %lu key %lld aborted - track is empty", trackId, keyNum);
|
||||
return;
|
||||
}
|
||||
if (myMeta.vod && keyNum > myMeta.tracks[trackId].keys.rbegin()->getNumber()){
|
||||
INFO_MSG("Seek in track %lu to key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber());
|
||||
INFO_MSG("Load for track %lu key %lld aborted, is > %lld", trackId, keyNum, myMeta.tracks[trackId].keys.rbegin()->getNumber());
|
||||
nProxy.curPage.erase(trackId);
|
||||
currKeyOpen.erase(trackId);
|
||||
return;
|
||||
|
@ -1229,25 +1245,28 @@ namespace Mist {
|
|||
//check where the next key is
|
||||
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
|
||||
int nextPage = pageNumForKey(nxt.tid, nxtKeyNum[nxt.tid]+1);
|
||||
//are we live, and the next key hasn't shown up on another page, then we're waiting.
|
||||
if (myMeta.live && currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){
|
||||
//if the next key hasn't shown up on another page, then we're waiting.
|
||||
//VoD might be slow, so we check VoD case also, just in case
|
||||
if (currKeyOpen.count(nxt.tid) && (currKeyOpen[nxt.tid] == (unsigned int)nextPage || nextPage == -1)){
|
||||
if (++emptyCount < 100){
|
||||
Util::wait(250);
|
||||
//we're waiting for new data to show up
|
||||
if (emptyCount % 64 == 0){
|
||||
reconnect();//reconnect every 16 seconds
|
||||
}else{
|
||||
if (emptyCount % 4 == 0){
|
||||
//updating meta is only useful with live streams
|
||||
if (myMeta.live && emptyCount % 4 == 0){
|
||||
updateMeta();
|
||||
}
|
||||
}
|
||||
}else{
|
||||
//after ~25 seconds, give up and drop the track.
|
||||
dropTrack(nxt.tid, "EOP: could not reload empty packet");
|
||||
dropTrack(nxt.tid, "EOP: data wait timeout");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
//The next key showed up on another page!
|
||||
//We've simply reached the end of the page. Load the next key = next page.
|
||||
loadPageForKey(nxt.tid, ++nxtKeyNum[nxt.tid]);
|
||||
nxt.offset = 0;
|
||||
|
@ -1297,6 +1316,10 @@ namespace Mist {
|
|||
|
||||
//when live, every keyframe, check correctness of the keyframe number
|
||||
if (myMeta.live && thisPacket.getFlag("keyframe")){
|
||||
//cancel if not alive
|
||||
if (!nProxy.userClient.isAlive()){
|
||||
return false;
|
||||
}
|
||||
//Check whether returned keyframe is correct. If not, wait for approximately 10 seconds while checking.
|
||||
//Failure here will cause tracks to drop due to inconsistent internal state.
|
||||
nxtKeyNum[nxt.tid] = getKeyForTime(nxt.tid, thisPacket.getTime());
|
||||
|
|
|
@ -586,6 +586,16 @@ namespace Mist {
|
|||
void OutRTMP::parseAMFCommand(AMF::Object & amfData, int messageType, int streamId) {
|
||||
MEDIUM_MSG("Received command: %s", amfData.Print().c_str());
|
||||
HIGH_MSG("AMF0 command: %s", amfData.getContentP(0)->StrValue().c_str());
|
||||
if (amfData.getContentP(0)->StrValue() == "xsbwtest") {
|
||||
//send a _result reply
|
||||
AMF::Object amfReply("container", AMF::AMF0_DDV_CONTAINER);
|
||||
amfReply.addContent(AMF::Object("", "_error")); //result success
|
||||
amfReply.addContent(amfData.getContent(1)); //same transaction ID
|
||||
amfReply.addContent(AMF::Object("", amfData.getContentP(0)->StrValue())); //null - command info
|
||||
amfReply.addContent(AMF::Object("", "Hai XSplit user!")); //stream ID?
|
||||
sendCommand(amfReply, messageType, streamId);
|
||||
return;
|
||||
}
|
||||
if (amfData.getContentP(0)->StrValue() == "connect") {
|
||||
double objencoding = 0;
|
||||
if (amfData.getContentP(2)->getContentP("objectEncoding")) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue